lakehouse_engine.io.writer_factory

Module for the writer factory.

 1"""Module for writer factory."""
 2
 3from abc import ABC
 4from typing import OrderedDict
 5
 6from pyspark.sql import DataFrame
 7
 8from lakehouse_engine.core.definitions import (
 9    FILE_OUTPUT_FORMATS,
10    OutputFormat,
11    OutputSpec,
12    WriteType,
13)
14from lakehouse_engine.io.writer import Writer
15from lakehouse_engine.io.writers.console_writer import ConsoleWriter
16from lakehouse_engine.io.writers.dataframe_writer import DataFrameWriter
17from lakehouse_engine.io.writers.delta_merge_writer import DeltaMergeWriter
18from lakehouse_engine.io.writers.file_writer import FileWriter
19from lakehouse_engine.io.writers.jdbc_writer import JDBCWriter
20from lakehouse_engine.io.writers.kafka_writer import KafkaWriter
21from lakehouse_engine.io.writers.rest_api_writer import RestApiWriter
22from lakehouse_engine.io.writers.table_writer import TableWriter
23
24
class WriterFactory(ABC):  # noqa: B024
    """Factory that selects and instantiates the writer for an output spec.

    Writers are looked up in ``AVAILABLE_WRITERS`` using a writer name that is
    derived from the output specification (see ``_get_writer_name``).
    """

    # Maps a writer name (an ``OutputFormat`` value) to the writer class that
    # ``get_writer`` instantiates for it.
    AVAILABLE_WRITERS = {
        OutputFormat.TABLE.value: TableWriter,
        OutputFormat.DELTAFILES.value: DeltaMergeWriter,
        OutputFormat.JDBC.value: JDBCWriter,
        OutputFormat.FILE.value: FileWriter,
        OutputFormat.KAFKA.value: KafkaWriter,
        OutputFormat.CONSOLE.value: ConsoleWriter,
        OutputFormat.DATAFRAME.value: DataFrameWriter,
        OutputFormat.REST_API.value: RestApiWriter,
    }

    @classmethod
    def _get_writer_name(cls, spec: OutputSpec) -> str:
        """Get the writer name according to the output specification.

        Resolution order (first match wins):

        1. ``spec.db_table`` is set and the write type is not merge: table writer.
        2. The write type is merge and either ``spec.data_format`` is the delta
           files format or ``spec.db_table`` is set: delta merge writer.
        3. ``spec.data_format`` is one of ``FILE_OUTPUT_FORMATS``: file writer.
        4. Otherwise ``spec.data_format`` itself is used as the writer name.

        Args:
            spec: output specification to write data.

        Returns:
            The writer name used to look up ``AVAILABLE_WRITERS``.
        """
        is_merge = spec.write_type == WriteType.MERGE.value

        if spec.db_table and not is_merge:
            return OutputFormat.TABLE.value
        if is_merge and (
            spec.data_format == OutputFormat.DELTAFILES.value or spec.db_table
        ):
            return OutputFormat.DELTAFILES.value
        if spec.data_format in FILE_OUTPUT_FORMATS:
            return OutputFormat.FILE.value
        return spec.data_format

    @classmethod
    def get_writer(cls, spec: OutputSpec, df: DataFrame, data: OrderedDict) -> Writer:
        """Get a writer according to the output specification using a factory pattern.

        Args:
            spec: output specification to write data.
            df: dataframe to be written.
            data: ordered dictionary with all the dataframes generated on
                previous steps before the writer.

        Returns:
            Writer: writer that will write the data.

        Raises:
            NotImplementedError: if no writer is registered for the writer name
                resolved from ``spec``.
        """
        writer_name = cls._get_writer_name(spec)
        writer = cls.AVAILABLE_WRITERS.get(writer_name)

        if not writer:
            raise NotImplementedError(
                f"The requested output spec format {spec.data_format} is not supported."
            )

        return writer(output_spec=spec, df=df, data=data)  # type: ignore
class WriterFactory(abc.ABC):
class WriterFactory(ABC):  # noqa: B024
    """Factory that selects and instantiates the writer for an output spec.

    Writers are looked up in ``AVAILABLE_WRITERS`` using a writer name that is
    derived from the output specification (see ``_get_writer_name``).
    """

    # Maps a writer name (an ``OutputFormat`` value) to the writer class that
    # ``get_writer`` instantiates for it.
    AVAILABLE_WRITERS = {
        OutputFormat.TABLE.value: TableWriter,
        OutputFormat.DELTAFILES.value: DeltaMergeWriter,
        OutputFormat.JDBC.value: JDBCWriter,
        OutputFormat.FILE.value: FileWriter,
        OutputFormat.KAFKA.value: KafkaWriter,
        OutputFormat.CONSOLE.value: ConsoleWriter,
        OutputFormat.DATAFRAME.value: DataFrameWriter,
        OutputFormat.REST_API.value: RestApiWriter,
    }

    @classmethod
    def _get_writer_name(cls, spec: OutputSpec) -> str:
        """Get the writer name according to the output specification.

        Resolution order (first match wins):

        1. ``spec.db_table`` is set and the write type is not merge: table writer.
        2. The write type is merge and either ``spec.data_format`` is the delta
           files format or ``spec.db_table`` is set: delta merge writer.
        3. ``spec.data_format`` is one of ``FILE_OUTPUT_FORMATS``: file writer.
        4. Otherwise ``spec.data_format`` itself is used as the writer name.

        Args:
            spec: output specification to write data.

        Returns:
            The writer name used to look up ``AVAILABLE_WRITERS``.
        """
        is_merge = spec.write_type == WriteType.MERGE.value

        if spec.db_table and not is_merge:
            return OutputFormat.TABLE.value
        if is_merge and (
            spec.data_format == OutputFormat.DELTAFILES.value or spec.db_table
        ):
            return OutputFormat.DELTAFILES.value
        if spec.data_format in FILE_OUTPUT_FORMATS:
            return OutputFormat.FILE.value
        return spec.data_format

    @classmethod
    def get_writer(cls, spec: OutputSpec, df: DataFrame, data: OrderedDict) -> Writer:
        """Get a writer according to the output specification using a factory pattern.

        Args:
            spec: output specification to write data.
            df: dataframe to be written.
            data: ordered dictionary with all the dataframes generated on
                previous steps before the writer.

        Returns:
            Writer: writer that will write the data.

        Raises:
            NotImplementedError: if no writer is registered for the writer name
                resolved from ``spec``.
        """
        writer_name = cls._get_writer_name(spec)
        writer = cls.AVAILABLE_WRITERS.get(writer_name)

        if not writer:
            raise NotImplementedError(
                f"The requested output spec format {spec.data_format} is not supported."
            )

        return writer(output_spec=spec, df=df, data=data)  # type: ignore

Factory class that selects and creates the writer for a given output specification.

@classmethod
def get_writer( cls, spec: lakehouse_engine.core.definitions.OutputSpec, df: pyspark.sql.dataframe.DataFrame, data: OrderedDict) -> lakehouse_engine.io.writer.Writer:
@classmethod
def get_writer(cls, spec: OutputSpec, df: DataFrame, data: OrderedDict) -> Writer:
    """Get a writer according to the output specification using a factory pattern.

    Args:
        spec: output specification to write data.
        df: dataframe to be written.
        data: ordered dictionary with all the dataframes generated on
            previous steps before the writer.

    Returns:
        Writer: writer that will write the data.

    Raises:
        NotImplementedError: if no writer is registered for the writer name
            resolved from ``spec``.
    """
    writer_name = cls._get_writer_name(spec)
    writer = cls.AVAILABLE_WRITERS.get(writer_name)

    if not writer:
        raise NotImplementedError(
            f"The requested output spec format {spec.data_format} is not supported."
        )

    return writer(output_spec=spec, df=df, data=data)  # type: ignore

Get a writer according to the output specification using a factory pattern.

Arguments:
  • spec: output specification to write data.
  • df: dataframe to be written.
  • data: ordered dictionary with all the dataframes generated on previous steps before the writer.
Returns:

Writer: writer that will write the data.