lakehouse_engine.io.writers.dataframe_writer
Module to define behaviour to write to dataframe.
1"""Module to define behaviour to write to dataframe.""" 2 3from typing import Callable, Optional, OrderedDict 4 5from pyspark.sql import DataFrame 6from pyspark.sql.types import StructType 7 8from lakehouse_engine.core.definitions import OutputFormat, OutputSpec 9from lakehouse_engine.core.exec_env import ExecEnv 10from lakehouse_engine.io.exceptions import NotSupportedException 11from lakehouse_engine.io.writer import Writer 12from lakehouse_engine.utils.logging_handler import LoggingHandler 13 14 15class DataFrameWriter(Writer): 16 """Class to write data to dataframe.""" 17 18 _logger = LoggingHandler(__name__).get_logger() 19 20 def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict): 21 """Construct DataFrameWriter instances. 22 23 Args: 24 output_spec: output specification. 25 df: dataframe to be written. 26 data: list of all dfs generated on previous steps before writer. 27 """ 28 super().__init__(output_spec, df, data) 29 30 def write(self) -> Optional[OrderedDict]: 31 """Write data to dataframe.""" 32 self._output_spec.options = ( 33 self._output_spec.options if self._output_spec.options else {} 34 ) 35 written_dfs: OrderedDict = OrderedDict({}) 36 37 if ( 38 self._output_spec.streaming_processing_time 39 or self._output_spec.streaming_continuous 40 ): 41 raise NotSupportedException( 42 f"DataFrame writer doesn't support " 43 f"processing time or continuous streaming " 44 f"for step ${self._output_spec.spec_id}." 45 ) 46 47 if self._df.isStreaming: 48 output_df = self._write_to_dataframe_in_streaming_mode( 49 self._df, self._output_spec, self._data 50 ) 51 else: 52 output_df = self._df 53 54 written_dfs[self._output_spec.spec_id] = output_df 55 56 return written_dfs 57 58 @staticmethod 59 def _create_global_view(df: DataFrame, stream_df_view_name: str) -> None: 60 """Given a dataframe create a global temp view to be available for consumption. 61 62 Args: 63 df: dataframe to be shown. 64 stream_df_view_name: stream df view name. 65 """ 66 if DataFrameWriter._table_exists(stream_df_view_name): 67 DataFrameWriter._logger.info("Global temp view exists") 68 existing_data = ExecEnv.SESSION.table(f"global_temp.{stream_df_view_name}") 69 df = existing_data.union(df) 70 71 df.createOrReplaceGlobalTempView(f"{stream_df_view_name}") 72 73 @staticmethod 74 def _write_streaming_df(stream_df_view_name: str) -> Callable: 75 """Define how to create a df from streaming df. 76 77 Args: 78 stream_df_view_name: stream df view name. 79 80 Returns: 81 A function to show df in the foreachBatch spark write method. 82 """ 83 84 def inner(batch_df: DataFrame, batch_id: int) -> None: 85 DataFrameWriter._create_global_view(batch_df, stream_df_view_name) 86 87 return inner 88 89 @staticmethod 90 def _write_to_dataframe_in_streaming_mode( 91 df: DataFrame, output_spec: OutputSpec, data: OrderedDict 92 ) -> DataFrame: 93 """Write to DataFrame in streaming mode. 94 95 Args: 96 df: dataframe to write. 97 output_spec: output specification. 98 data: list of all dfs generated on previous steps before writer. 
99 """ 100 app_id = ExecEnv.SESSION.getActiveSession().conf.get("spark.app.id") 101 stream_df_view_name = f"`{app_id}_{output_spec.spec_id}`" 102 DataFrameWriter._logger.info("Drop temp view if exists") 103 104 if DataFrameWriter._table_exists(stream_df_view_name): 105 # Cleaning global temp view to not maintain state and impact other acon runs 106 view_name = stream_df_view_name.strip("`") 107 ExecEnv.SESSION.sql(f"DROP VIEW global_temp.`{view_name}`") 108 109 df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec)) 110 111 if ( 112 output_spec.streaming_micro_batch_transformers 113 or output_spec.streaming_micro_batch_dq_processors 114 ): 115 stream_df = ( 116 df_writer.options(**output_spec.options if output_spec.options else {}) 117 .format(OutputFormat.NOOP.value) 118 .foreachBatch( 119 DataFrameWriter._write_transformed_micro_batch( 120 output_spec, data, stream_df_view_name 121 ) 122 ) 123 .start() 124 ) 125 else: 126 stream_df = ( 127 df_writer.options(**output_spec.options if output_spec.options else {}) 128 .format(OutputFormat.NOOP.value) 129 .foreachBatch(DataFrameWriter._write_streaming_df(stream_df_view_name)) 130 .start() 131 ) 132 133 if output_spec.streaming_await_termination: 134 stream_df.awaitTermination(output_spec.streaming_await_termination_timeout) 135 136 DataFrameWriter._logger.info("Reading stream data as df if exists") 137 if DataFrameWriter._table_exists(stream_df_view_name): 138 stream_data_as_df = ExecEnv.SESSION.table( 139 f"global_temp.{stream_df_view_name}" 140 ) 141 else: 142 DataFrameWriter._logger.info( 143 f"DataFrame writer couldn't find any data to return " 144 f"for streaming, check if you are using checkpoint " 145 f"for step {output_spec.spec_id}." 146 ) 147 stream_data_as_df = ExecEnv.SESSION.createDataFrame( 148 data=[], schema=StructType([]) 149 ) 150 151 return stream_data_as_df 152 153 @staticmethod 154 def _table_exists( # type: ignore 155 table_name: str, db_name: str = "global_temp" 156 ) -> bool: 157 """Check if the table exists in the session catalog. 158 159 Args: 160 table_name: table/view name to check if exists in the session. 161 db_name: database name that you want to check if the table/view exists, 162 default value is the global_temp. 163 164 Returns: 165 A bool representing if the table/view exists. 166 """ 167 table_name = table_name.strip("`") 168 return ( 169 len( 170 ExecEnv.SESSION.sql(f"SHOW TABLES IN `{db_name}`") 171 .filter(f"tableName = '{table_name}'") 172 .collect() 173 ) 174 > 0 175 ) 176 177 @staticmethod 178 def _write_transformed_micro_batch( # type: ignore 179 output_spec: OutputSpec, data: OrderedDict, stream_as_df_view 180 ) -> Callable: 181 """Define how to write a streaming micro batch after transforming it. 182 183 Args: 184 output_spec: output specification. 185 data: list of all dfs generated on previous steps before writer. 186 stream_as_df_view: stream df view name. 187 188 Returns: 189 A function to be executed in the foreachBatch spark write method. 190 """ 191 192 def inner(batch_df: DataFrame, batch_id: int) -> None: 193 transformed_df = Writer.get_transformed_micro_batch( 194 output_spec, batch_df, batch_id, data 195 ) 196 197 if output_spec.streaming_micro_batch_dq_processors: 198 transformed_df = Writer.run_micro_batch_dq_process( 199 transformed_df, output_spec.streaming_micro_batch_dq_processors 200 ) 201 202 DataFrameWriter._create_global_view(transformed_df, stream_as_df_view) 203 204 return inner
class DataFrameWriter(Writer):
Class to write data to dataframe.
DataFrameWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)
def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
    """Construct DataFrameWriter instances.

    Args:
        output_spec: output specification.
        df: dataframe to be written.
        data: list of all dfs generated on previous steps before writer.
    """
    super().__init__(output_spec, df, data)
Construct DataFrameWriter instances.
Arguments:
- output_spec: output specification.
- df: dataframe to be written.
- data: list of all dfs generated on previous steps before writer.
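
A hedged construction sketch for the batch path is shown below. It assumes the base Writer constructor only stores its arguments (as _output_spec, _df and _data) and uses a types.SimpleNamespace stand-in exposing just the attributes this writer reads; in normal use the OutputSpec is built by the engine from the acon's output specification, and the spec_id value "my_output" is purely illustrative.

from collections import OrderedDict
from types import SimpleNamespace

from pyspark.sql import SparkSession

from lakehouse_engine.io.writers.dataframe_writer import DataFrameWriter

spark = SparkSession.builder.appName("dataframe-writer-ctor-sketch").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# Stand-in for a real OutputSpec, exposing only what the batch path reads (assumption).
output_spec = SimpleNamespace(
    spec_id="my_output",             # key under which the written df is returned
    options=None,                    # normalised to {} inside write()
    streaming_processing_time=None,  # must be falsy: not supported by this writer
    streaming_continuous=None,       # must be falsy: not supported by this writer
)

previous_dfs: OrderedDict = OrderedDict()  # dfs from earlier steps, keyed by spec_id

writer = DataFrameWriter(output_spec=output_spec, df=df, data=previous_dfs)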
def write(self) -> Optional[OrderedDict]:
def write(self) -> Optional[OrderedDict]:
    """Write data to dataframe."""
    self._output_spec.options = (
        self._output_spec.options if self._output_spec.options else {}
    )
    written_dfs: OrderedDict = OrderedDict({})

    if (
        self._output_spec.streaming_processing_time
        or self._output_spec.streaming_continuous
    ):
        raise NotSupportedException(
            f"DataFrame writer doesn't support "
            f"processing time or continuous streaming "
            f"for step {self._output_spec.spec_id}."
        )

    if self._df.isStreaming:
        output_df = self._write_to_dataframe_in_streaming_mode(
            self._df, self._output_spec, self._data
        )
    else:
        output_df = self._df

    written_dfs[self._output_spec.spec_id] = output_df

    return written_dfs
Write data to dataframe.
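
Continuing the constructor sketch above (same assumptions), write() returns an OrderedDict keyed by spec_id. For a batch input the value is the input dataframe itself; for a streaming input it is the accumulated global temp view read back as a batch dataframe.

written_dfs = writer.write()          # OrderedDict: spec_id -> DataFrame
result_df = written_dfs["my_output"]  # "my_output" is the illustrative spec_id above
result_df.show()                      # batch case: same rows as the input df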