lakehouse_engine.io.writers.dataframe_writer

Module to define behaviour to write to dataframe.

  1"""Module to define behaviour to write to dataframe."""
  2
  3from typing import Callable, Optional, OrderedDict
  4
  5from pyspark.sql import DataFrame
  6from pyspark.sql.types import StructType
  7
  8from lakehouse_engine.core.definitions import OutputFormat, OutputSpec
  9from lakehouse_engine.core.exec_env import ExecEnv
 10from lakehouse_engine.io.exceptions import NotSupportedException
 11from lakehouse_engine.io.writer import Writer
 12from lakehouse_engine.utils.logging_handler import LoggingHandler
 13
 14
 15class DataFrameWriter(Writer):
 16    """Class to write data to dataframe."""
 17
 18    _logger = LoggingHandler(__name__).get_logger()
 19
 20    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
 21        """Construct DataFrameWriter instances.
 22
 23        Args:
 24            output_spec: output specification.
 25            df: dataframe to be written.
 26            data: ordered dict of all dfs generated on previous steps before writer.
 27        """
 28        super().__init__(output_spec, df, data)
 29
 30    def write(self) -> Optional[OrderedDict]:
 31        """Write data to dataframe."""
 32        self._output_spec.options = (
 33            self._output_spec.options if self._output_spec.options else {}
 34        )
 35        written_dfs: OrderedDict = OrderedDict({})
 36
 37        if (
 38            self._output_spec.streaming_processing_time
 39            or self._output_spec.streaming_continuous
 40        ):
 41            raise NotSupportedException(
 42                f"DataFrame writer doesn't support "
 43                f"processing time or continuous streaming "
 44                f"for step ${self._output_spec.spec_id}."
 45            )
 46
 47        if self._df.isStreaming:
 48            output_df = self._write_to_dataframe_in_streaming_mode(
 49                self._df, self._output_spec, self._data
 50            )
 51        else:
 52            output_df = self._df
 53
 54        written_dfs[self._output_spec.spec_id] = output_df
 55
 56        return written_dfs
 57
 58    @staticmethod
 59    def _create_global_view(df: DataFrame, stream_df_view_name: str) -> None:
 60        """Given a dataframe create a global temp view to be available for consumption.
 61
 62        Args:
 63            df: dataframe to be exposed through the global temp view.
 64            stream_df_view_name: stream df view name.
 65        """
 66        if DataFrameWriter._table_exists(stream_df_view_name):
 67            DataFrameWriter._logger.info("Global temp view exists")
 68            existing_data = ExecEnv.SESSION.table(f"global_temp.{stream_df_view_name}")
 69            df = existing_data.union(df)
 70
 71        df.createOrReplaceGlobalTempView(f"{stream_df_view_name}")
 72
 73    @staticmethod
 74    def _write_streaming_df(stream_df_view_name: str) -> Callable:
 75        """Define how to create a df from streaming df.
 76
 77        Args:
 78            stream_df_view_name: stream df view name.
 79
 80        Returns:
 81            A function to be executed in the foreachBatch spark write method.
 82        """
 83
 84        def inner(batch_df: DataFrame, batch_id: int) -> None:
 85            DataFrameWriter._create_global_view(batch_df, stream_df_view_name)
 86
 87        return inner
 88
 89    @staticmethod
 90    def _write_to_dataframe_in_streaming_mode(
 91        df: DataFrame, output_spec: OutputSpec, data: OrderedDict
 92    ) -> DataFrame:
 93        """Write to DataFrame in streaming mode.
 94
 95        Args:
 96            df: dataframe to write.
 97            output_spec: output specification.
 99            data: ordered dict of all dfs generated on previous steps before writer.
 99        """
100        app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id")
101        stream_df_view_name = f"`{app_id}_{output_spec.spec_id}`"
102        DataFrameWriter._logger.info("Drop temp view if exists")
103
104        if DataFrameWriter._table_exists(stream_df_view_name):
105            # Cleaning global temp view to not maintain state and impact other acon runs
106            view_name = stream_df_view_name.strip("`")
107            ExecEnv.SESSION.sql(f"DROP VIEW global_temp.`{view_name}`")
108
109        df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec))
110
111        if (
112            output_spec.streaming_micro_batch_transformers
113            or output_spec.streaming_micro_batch_dq_processors
114        ):
115            stream_df = (
116                df_writer.options(**output_spec.options if output_spec.options else {})
117                .format(OutputFormat.NOOP.value)
118                .foreachBatch(
119                    DataFrameWriter._write_transformed_micro_batch(
120                        output_spec, data, stream_df_view_name
121                    )
122                )
123                .start()
124            )
125        else:
126            stream_df = (
127                df_writer.options(**output_spec.options if output_spec.options else {})
128                .format(OutputFormat.NOOP.value)
129                .foreachBatch(DataFrameWriter._write_streaming_df(stream_df_view_name))
130                .start()
131            )
132
133        if output_spec.streaming_await_termination:
134            stream_df.awaitTermination(output_spec.streaming_await_termination_timeout)
135
136        DataFrameWriter._logger.info("Reading stream data as df if exists")
137        if DataFrameWriter._table_exists(stream_df_view_name):
138            stream_data_as_df = ExecEnv.SESSION.table(
139                f"global_temp.{stream_df_view_name}"
140            )
141        else:
142            DataFrameWriter._logger.info(
143                f"DataFrame writer couldn't find any data to return "
144                f"for streaming, check if you are using checkpoint "
145                f"for step {output_spec.spec_id}."
146            )
147            stream_data_as_df = ExecEnv.SESSION.createDataFrame(
148                data=[], schema=StructType([])
149            )
150
151        return stream_data_as_df
152
153    @staticmethod
154    def _table_exists(  # type: ignore
155        table_name: str, db_name: str = "global_temp"
156    ) -> bool:
157        """Check if the table exists in the session catalog.
158
159        Args:
160            table_name: table/view name to check if exists in the session.
161            db_name: database name that you want to check if the table/view exists,
162                default value is the global_temp.
163
164        Returns:
165            A bool representing if the table/view exists.
166        """
167        table_name = table_name.strip("`")
168        return (
169            len(
170                ExecEnv.SESSION.sql(f"SHOW  TABLES  IN `{db_name}`")
171                .filter(f"tableName = '{table_name}'")
172                .collect()
173            )
174            > 0
175        )
176
177    @staticmethod
178    def _write_transformed_micro_batch(  # type: ignore
179        output_spec: OutputSpec, data: OrderedDict, stream_as_df_view: str
180    ) -> Callable:
181        """Define how to write a streaming micro batch after transforming it.
182
183        Args:
184            output_spec: output specification.
185            data: ordered dict of all dfs generated on previous steps before writer.
186            stream_as_df_view: stream df view name.
187
188        Returns:
189            A function to be executed in the foreachBatch spark write method.
190        """
191
192        def inner(batch_df: DataFrame, batch_id: int) -> None:
193            transformed_df = Writer.get_transformed_micro_batch(
194                output_spec, batch_df, batch_id, data
195            )
196
197            if output_spec.streaming_micro_batch_dq_processors:
198                transformed_df = Writer.run_micro_batch_dq_process(
199                    transformed_df, output_spec.streaming_micro_batch_dq_processors
200                )
201
202            DataFrameWriter._create_global_view(transformed_df, stream_as_df_view)
203
204        return inner
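
The streaming branch of this writer never persists data to an external sink: it starts the query with the noop output format and uses foreachBatch only to accumulate each micro batch into a global temp view, which is then read back as an ordinary dataframe. The standalone sketch below reproduces the foreachBatch/global-temp-view part of that pattern in plain PySpark; the rate source, the view name demo_stream_view, and the helper names are illustrative assumptions and are not part of the lakehouse engine.

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

VIEW_NAME = "demo_stream_view"  # illustrative view name


def view_exists(name: str) -> bool:
    # Same kind of check DataFrameWriter._table_exists performs via SHOW TABLES.
    existing = spark.sql("SHOW TABLES IN global_temp").filter(f"tableName = '{name}'")
    return existing.count() > 0


def accumulate_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Mirrors DataFrameWriter._create_global_view: union every micro batch
    # into one global temp view instead of writing it anywhere.
    if view_exists(VIEW_NAME):
        batch_df = spark.table(f"global_temp.{VIEW_NAME}").union(batch_df)
    batch_df.createOrReplaceGlobalTempView(VIEW_NAME)


stream_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
query = stream_df.writeStream.foreachBatch(accumulate_batch).start()
query.awaitTermination(10)  # let a few micro batches accumulate
query.stop()

# The accumulated stream is now queryable as a regular dataframe.
spark.table(f"global_temp.{VIEW_NAME}").show()
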
class DataFrameWriter(lakehouse_engine.io.writer.Writer):

Class to write data to dataframe.

DataFrameWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)

Construct DataFrameWriter instances.

Arguments:
  • output_spec: output specification.
  • df: dataframe to be written.
  • data: ordered dict of all dfs generated on previous steps before writer.
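
As a rough orientation, constructing the writer for a batch dataframe could look like the sketch below. Only the three constructor arguments follow the signature documented above; the OutputSpec field names other than spec_id and the assumption that ExecEnv.SESSION already holds an initialised Spark session are illustrative and may not match the real definitions.

from collections import OrderedDict

from lakehouse_engine.core.definitions import OutputFormat, OutputSpec
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.io.writers.dataframe_writer import DataFrameWriter

# Hypothetical output spec: apart from spec_id, the field names below and the
# OutputFormat.DATAFRAME member are assumptions (only NOOP is referenced in this module).
output_spec = OutputSpec(
    spec_id="orders_as_df",
    input_id="orders_transformed",
    data_format=OutputFormat.DATAFRAME.value,
)

# Batch dataframe to hand over; ExecEnv.SESSION is assumed to be the active Spark session.
df = ExecEnv.SESSION.createDataFrame([(1, "a"), (2, "b")], "id int, value string")

# data carries the dataframes produced by previous steps; empty in this sketch.
writer = DataFrameWriter(output_spec=output_spec, df=df, data=OrderedDict())
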
def write(self) -> Optional[OrderedDict]:

Write data to dataframe.
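
For a batch input the dataframe is returned as-is; for a streaming input the accumulated global temp view is returned instead. Either way the result is an OrderedDict keyed by the output spec's spec_id, so, continuing the hypothetical construction sketch above, the produced dataframe can be picked up like this:

# write() returns an OrderedDict mapping spec_id to the resulting dataframe.
written = writer.write()
result_df = written[output_spec.spec_id]
result_df.show()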