lakehouse_engine.io.writers.jdbc_writer

Module that defines the behaviour to write to JDBC targets.

 1"""Module that defines the behaviour to write to JDBC targets."""
 2
 3from typing import Callable, OrderedDict
 4
 5from pyspark.sql import DataFrame
 6
 7from lakehouse_engine.core.definitions import OutputSpec
 8from lakehouse_engine.io.writer import Writer
 9
10
11class JDBCWriter(Writer):
12    """Class to write to JDBC targets."""
13
14    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
15        """Construct JDBCWriter instances.
16
17        Args:
18            output_spec: output specification.
19            df: dataframe to be writen.
20            data: list of all dfs generated on previous steps before writer.
21        """
22        super().__init__(output_spec, df, data)
23
24    def write(self) -> None:
25        """Write data into JDBC target."""
26        if not self._df.isStreaming:
27            self._write_to_jdbc_in_batch_mode(self._df, self._output_spec)
28        else:
29            stream_df = (
30                self._df.writeStream.trigger(
31                    **Writer.get_streaming_trigger(self._output_spec)
32                )
33                .options(
34                    **self._output_spec.options if self._output_spec.options else {}
35                )
36                .foreachBatch(
37                    self._write_transformed_micro_batch(self._output_spec, self._data)
38                )
39                .start()
40            )
41
42            if self._output_spec.streaming_await_termination:
43                stream_df.awaitTermination(
44                    self._output_spec.streaming_await_termination_timeout
45                )
46
47    @staticmethod
48    def _write_to_jdbc_in_batch_mode(df: DataFrame, output_spec: OutputSpec) -> None:
49        """Write to jdbc in batch mode.
50
51        Args:
52            df: dataframe to write.
53            output_spec: output specification.
54        """
55        df.write.format(output_spec.data_format).partitionBy(
56            output_spec.partitions
57        ).options(**output_spec.options if output_spec.options else {}).mode(
58            output_spec.write_type
59        ).save(
60            output_spec.location
61        )
62
63    @staticmethod
64    def _write_transformed_micro_batch(  # type: ignore
65        output_spec: OutputSpec, data: OrderedDict
66    ) -> Callable:
67        """Define how to write a streaming micro batch after transforming it.
68
69        Args:
70            output_spec: output specification.
71            data: list of all dfs generated on previous steps before writer.
72
73        Returns:
74            A function to be executed in the foreachBatch spark write method.
75        """
76
77        def inner(batch_df: DataFrame, batch_id: int) -> None:
78            transformed_df = Writer.get_transformed_micro_batch(
79                output_spec, batch_df, batch_id, data
80            )
81
82            if output_spec.streaming_micro_batch_dq_processors:
83                transformed_df = Writer.run_micro_batch_dq_process(
84                    transformed_df, output_spec.streaming_micro_batch_dq_processors
85                )
86
87            JDBCWriter._write_to_jdbc_in_batch_mode(transformed_df, output_spec)
88
89        return inner
class JDBCWriter(lakehouse_engine.io.writer.Writer):

Class to write to JDBC targets.
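
A minimal batch-mode usage sketch (assumed, not taken from the library's docs). The OutputSpec fields below mirror the attributes JDBCWriter reads (data_format, write_type, partitions, options, location); spec_id and input_id, the exact OutputSpec constructor signature, and the PostgreSQL connection details are assumptions and may differ in your engine version.

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import OutputSpec
from lakehouse_engine.io.writers.jdbc_writer import JDBCWriter

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

# Connection details go through `options`; `location` is left unset here because
# the JDBC target is resolved from the options rather than a filesystem path.
output_spec = OutputSpec(
    spec_id="sample_to_jdbc",       # assumed identifier field
    input_id="transformed_sample",  # assumed reference to the upstream step
    data_format="jdbc",
    write_type="append",
    location=None,
    partitions=[],
    options={
        "url": "jdbc:postgresql://host:5432/db",
        "dbtable": "public.sample_table",
        "user": "user",
        "password": "password",
        "driver": "org.postgresql.Driver",
    },
)

JDBCWriter(output_spec=output_spec, df=df, data=OrderedDict()).write()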

JDBCWriter( output_spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict)

Construct JDBCWriter instances.

Arguments:
  • output_spec: output specification.
  • df: dataframe to be written.
  • data: ordered dict of all dataframes generated in previous steps before the writer (see the sketch below).
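
A hedged sketch of the assumed shape of the data argument: an OrderedDict whose keys are step identifiers (the key convention is an assumption here) and whose values are the DataFrames earlier steps produced, which streaming micro-batch transformations can reference.

from collections import OrderedDict

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative step names as keys; DataFrames from earlier pipeline steps as values.
data = OrderedDict(
    raw_orders=spark.createDataFrame([(1, "new")], ["id", "status"]),
    enriched_orders=spark.createDataFrame(
        [(1, "new", "EU")], ["id", "status", "region"]
    ),
)
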
def write(self) -> None:

Write data into JDBC target.
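
write() dispatches on the input DataFrame: a non-streaming DataFrame is written directly in batch mode, while a streaming DataFrame starts a Structured Streaming query that writes every micro batch through foreachBatch and optionally awaits termination. The sketch below is a rough plain-PySpark illustration of that streaming branch against an assumed PostgreSQL target; it is not the engine's own code and it omits the micro-batch transformation and data-quality steps JDBCWriter applies before writing.

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
stream_df = spark.readStream.format("rate").load()


def write_micro_batch(batch_df: DataFrame, batch_id: int) -> None:
    # JDBCWriter first transforms the batch (Writer.get_transformed_micro_batch)
    # and optionally runs DQ processors (Writer.run_micro_batch_dq_process),
    # then reuses the same batch-mode JDBC write sketched here.
    batch_df.write.format("jdbc").options(
        url="jdbc:postgresql://host:5432/db",  # illustrative connection options
        dbtable="public.rate_events",
        user="user",
        password="password",
        driver="org.postgresql.Driver",
    ).mode("append").save()


query = (
    stream_df.writeStream.trigger(processingTime="30 seconds")
    .foreachBatch(write_micro_batch)
    .start()
)
query.awaitTermination(60)  # mirrors streaming_await_termination(_timeout)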