lakehouse_engine.io.writers.console_writer
Module to define behaviour to write to console.
1"""Module to define behaviour to write to console.""" 2 3from typing import Callable, OrderedDict 4 5from pyspark.sql import DataFrame 6 7from lakehouse_engine.core.definitions import OutputSpec 8from lakehouse_engine.io.writer import Writer 9from lakehouse_engine.utils.logging_handler import LoggingHandler 10 11 12class ConsoleWriter(Writer): 13 """Class to write data to console.""" 14 15 _logger = LoggingHandler(__name__).get_logger() 16 17 def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict): 18 """Construct ConsoleWriter instances. 19 20 Args: 21 output_spec: output specification 22 df: dataframe to be written. 23 data: list of all dfs generated on previous steps before writer. 24 """ 25 super().__init__(output_spec, df, data) 26 27 def write(self) -> None: 28 """Write data to console.""" 29 self._output_spec.options = ( 30 self._output_spec.options if self._output_spec.options else {} 31 ) 32 if not self._df.isStreaming: 33 self._logger.info("Dataframe preview:") 34 self._show_df(self._df, self._output_spec) 35 else: 36 self._logger.info("Stream Dataframe preview:") 37 self._write_to_console_in_streaming_mode( 38 self._df, self._output_spec, self._data 39 ) 40 41 @staticmethod 42 def _show_df(df: DataFrame, output_spec: OutputSpec) -> None: 43 """Given a dataframe it applies Spark's show function to show it. 44 45 Args: 46 df: dataframe to be shown. 47 output_spec: output specification. 48 """ 49 df.show( 50 n=output_spec.options.get("limit", 20), 51 truncate=output_spec.options.get("truncate", True), 52 vertical=output_spec.options.get("vertical", False), 53 ) 54 55 @staticmethod 56 def _show_streaming_df(output_spec: OutputSpec) -> Callable: 57 """Define how to show a streaming df. 58 59 Args: 60 output_spec: output specification. 61 62 Returns: 63 A function to show df in the foreachBatch spark write method. 64 """ 65 66 def inner(batch_df: DataFrame, batch_id: int) -> None: 67 ConsoleWriter._logger.info(f"Showing DF for batch {batch_id}") 68 ConsoleWriter._show_df(batch_df, output_spec) 69 70 return inner 71 72 @staticmethod 73 def _write_to_console_in_streaming_mode( 74 df: DataFrame, output_spec: OutputSpec, data: OrderedDict 75 ) -> None: 76 """Write to console in streaming mode. 77 78 Args: 79 df: dataframe to write. 80 output_spec: output specification. 81 data: list of all dfs generated on previous steps before writer. 82 """ 83 df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec)) 84 85 if ( 86 output_spec.streaming_micro_batch_transformers 87 or output_spec.streaming_micro_batch_dq_processors 88 ): 89 stream_df = df_writer.foreachBatch( 90 ConsoleWriter._write_transformed_micro_batch(output_spec, data) 91 ).start() 92 else: 93 stream_df = df_writer.foreachBatch( 94 ConsoleWriter._show_streaming_df(output_spec) 95 ).start() 96 97 if output_spec.streaming_await_termination: 98 stream_df.awaitTermination(output_spec.streaming_await_termination_timeout) 99 100 @staticmethod 101 def _write_transformed_micro_batch( # type: ignore 102 output_spec: OutputSpec, data: OrderedDict 103 ) -> Callable: 104 """Define how to write a streaming micro batch after transforming it. 105 106 Args: 107 output_spec: output specification. 108 data: list of all dfs generated on previous steps before writer. 109 110 Returns: 111 A function to be executed in the foreachBatch spark write method. 
112 """ 113 114 def inner(batch_df: DataFrame, batch_id: int) -> None: 115 transformed_df = Writer.get_transformed_micro_batch( 116 output_spec, batch_df, batch_id, data 117 ) 118 119 if output_spec.streaming_micro_batch_dq_processors: 120 transformed_df = Writer.run_micro_batch_dq_process( 121 transformed_df, output_spec.streaming_micro_batch_dq_processors 122 ) 123 124 ConsoleWriter._show_df(transformed_df, output_spec) 125 126 return inner
Class to write data to console.
ConsoleWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)
Construct ConsoleWriter instances.
Arguments:
- output_spec: output specification.
- df: dataframe to be written.
- data: ordered dict of all dataframes generated in previous steps, before the writer.
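A minimal construction sketch follows. The `OutputSpec` keyword arguments other than `options` (`spec_id`, `input_id`, `data_format`) are assumed field names for illustration, not confirmed by this page; `options` is the only field this writer reads directly.

```python
from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import OutputSpec
from lakehouse_engine.io.writers.console_writer import ConsoleWriter

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# spec_id, input_id and data_format are hypothetical field names here;
# `options` is the only OutputSpec field ConsoleWriter reads itself.
spec = OutputSpec(
    spec_id="console_preview",
    input_id="transformed_data",
    data_format="console",
    options={"limit": 10, "truncate": False},
)

# `data` carries dataframes from previous steps; empty here because no
# micro-batch transformers are configured.
ConsoleWriter(output_spec=spec, df=df, data=OrderedDict()).write()
```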
def write(self) -> None:
Write data to console.
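For streaming dataframes, `write` previews each micro batch through `foreachBatch` instead of a single `show`. The sketch below shows the equivalent wiring with a plain PySpark rate source; the source, trigger interval, and timeout are illustrative stand-ins for what the engine derives from the output spec:

```python
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Illustrative unbounded source; in the engine, the streaming df arrives
# from the previous steps of the load instead.
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

def show_batch(batch_df: DataFrame, batch_id: int) -> None:
    # Same shape as the inner function returned by _show_streaming_df.
    print(f"Showing DF for batch {batch_id}")
    batch_df.show(n=20, truncate=True, vertical=False)

query = (
    stream_df.writeStream.trigger(processingTime="5 seconds")
    .foreachBatch(show_batch)
    .start()
)

# Mirrors the streaming_await_termination / _timeout handling (seconds).
query.awaitTermination(30)
```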