lakehouse_engine.io.writers.console_writer

Module to define behaviour to write to console.

  1"""Module to define behaviour to write to console."""
  2
  3from typing import Callable, OrderedDict
  4
  5from pyspark.sql import DataFrame
  6
  7from lakehouse_engine.core.definitions import OutputSpec
  8from lakehouse_engine.io.writer import Writer
  9from lakehouse_engine.utils.logging_handler import LoggingHandler
 10
 11
 12class ConsoleWriter(Writer):
 13    """Class to write data to console."""
 14
 15    _logger = LoggingHandler(__name__).get_logger()
 16
 17    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
 18        """Construct ConsoleWriter instances.
 19
 20        Args:
 21            output_spec: output specification
 22            df: dataframe to be written.
 23            data: list of all dfs generated on previous steps before writer.
 24        """
 25        super().__init__(output_spec, df, data)
 26
 27    def write(self) -> None:
 28        """Write data to console."""
 29        self._output_spec.options = (
 30            self._output_spec.options if self._output_spec.options else {}
 31        )
 32        if not self._df.isStreaming:
 33            self._logger.info("Dataframe preview:")
 34            self._show_df(self._df, self._output_spec)
 35        else:
 36            self._logger.info("Stream Dataframe preview:")
 37            self._write_to_console_in_streaming_mode(
 38                self._df, self._output_spec, self._data
 39            )
 40
 41    @staticmethod
 42    def _show_df(df: DataFrame, output_spec: OutputSpec) -> None:
 43        """Given a dataframe it applies Spark's show function to show it.
 44
 45        Args:
 46            df: dataframe to be shown.
 47            output_spec: output specification.
 48        """
 49        df.show(
 50            n=output_spec.options.get("limit", 20),
 51            truncate=output_spec.options.get("truncate", True),
 52            vertical=output_spec.options.get("vertical", False),
 53        )
 54
 55    @staticmethod
 56    def _show_streaming_df(output_spec: OutputSpec) -> Callable:
 57        """Define how to show a streaming df.
 58
 59        Args:
 60            output_spec: output specification.
 61
 62        Returns:
 63            A function to show df in the foreachBatch spark write method.
 64        """
 65
 66        def inner(batch_df: DataFrame, batch_id: int) -> None:
 67            ConsoleWriter._logger.info(f"Showing DF for batch {batch_id}")
 68            ConsoleWriter._show_df(batch_df, output_spec)
 69
 70        return inner
 71
 72    @staticmethod
 73    def _write_to_console_in_streaming_mode(
 74        df: DataFrame, output_spec: OutputSpec, data: OrderedDict
 75    ) -> None:
 76        """Write to console in streaming mode.
 77
 78        Args:
 79            df: dataframe to write.
 80            output_spec: output specification.
 81            data: list of all dfs generated on previous steps before writer.
 82        """
 83        df_writer = df.writeStream.trigger(**Writer.get_streaming_trigger(output_spec))
 84
 85        if (
 86            output_spec.streaming_micro_batch_transformers
 87            or output_spec.streaming_micro_batch_dq_processors
 88        ):
 89            stream_df = df_writer.foreachBatch(
 90                ConsoleWriter._write_transformed_micro_batch(output_spec, data)
 91            ).start()
 92        else:
 93            stream_df = df_writer.foreachBatch(
 94                ConsoleWriter._show_streaming_df(output_spec)
 95            ).start()
 96
 97        if output_spec.streaming_await_termination:
 98            stream_df.awaitTermination(output_spec.streaming_await_termination_timeout)
 99
100    @staticmethod
101    def _write_transformed_micro_batch(  # type: ignore
102        output_spec: OutputSpec, data: OrderedDict
103    ) -> Callable:
104        """Define how to write a streaming micro batch after transforming it.
105
106        Args:
107            output_spec: output specification.
108            data: list of all dfs generated on previous steps before writer.
109
110        Returns:
111            A function to be executed in the foreachBatch spark write method.
112        """
113
114        def inner(batch_df: DataFrame, batch_id: int) -> None:
115            transformed_df = Writer.get_transformed_micro_batch(
116                output_spec, batch_df, batch_id, data
117            )
118
119            if output_spec.streaming_micro_batch_dq_processors:
120                transformed_df = Writer.run_micro_batch_dq_process(
121                    transformed_df, output_spec.streaming_micro_batch_dq_processors
122                )
123
124            ConsoleWriter._show_df(transformed_df, output_spec)
125
126        return inner
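The three option keys read by _show_df map directly onto the parameters of Spark's DataFrame.show: "limit" (default 20 rows), "truncate" (default True) and "vertical" (default False). A minimal usage sketch, assuming spec is an already-built OutputSpec and my_df a batch dataframe (both hypothetical names, not defined in this module):

from collections import OrderedDict

from lakehouse_engine.io.writers.console_writer import ConsoleWriter

# Only limit/truncate/vertical are read by _show_df; any other keys
# in options are ignored by the console writer.
spec.options = {"limit": 5, "truncate": False, "vertical": True}
ConsoleWriter(output_spec=spec, df=my_df, data=OrderedDict()).write()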
class ConsoleWriter(lakehouse_engine.io.writer.Writer):

Class to write data to console.

ConsoleWriter(output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)

Construct ConsoleWriter instances.

Arguments:
  • output_spec: output specification.
  • df: dataframe to be written.
  • data: ordered dict of all dataframes generated in previous steps, before the writer.
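A minimal construction sketch (spec and my_df are hypothetical names standing for a pre-built OutputSpec and the dataframe produced by earlier steps):

from collections import OrderedDict

# data is only consulted when streaming micro batch transformers or DQ
# processors are configured, so an empty OrderedDict suffices otherwise.
writer = ConsoleWriter(output_spec=spec, df=my_df, data=OrderedDict())
writer.write()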
def write(self) -> None:

Write data to console.
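For a batch dataframe this reduces to a plain show with the _show_df defaults; for a streaming dataframe it instead starts a foreachBatch query. A rough Spark-level sketch of the two paths (not the exact internals: the real trigger settings come from Writer.get_streaming_trigger, and the processingTime below is only illustrative):

# Batch path: preview using the _show_df defaults.
df.show(n=20, truncate=True, vertical=False)

# Streaming path without micro batch transformers or DQ processors:
# each micro batch is shown by the closure built in _show_streaming_df.
query = (
    df.writeStream.trigger(processingTime="10 seconds")
    .foreachBatch(lambda batch_df, batch_id: batch_df.show())
    .start()
)
# Only done when output_spec.streaming_await_termination is set.
query.awaitTermination()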