lakehouse_engine.io.writer_factory
Module for writer factory.
1"""Module for writer factory.""" 2 3from abc import ABC 4from typing import OrderedDict 5 6from pyspark.sql import DataFrame 7 8from lakehouse_engine.core.definitions import ( 9 FILE_OUTPUT_FORMATS, 10 OutputFormat, 11 OutputSpec, 12 WriteType, 13) 14from lakehouse_engine.io.writer import Writer 15from lakehouse_engine.io.writers.console_writer import ConsoleWriter 16from lakehouse_engine.io.writers.dataframe_writer import DataFrameWriter 17from lakehouse_engine.io.writers.delta_merge_writer import DeltaMergeWriter 18from lakehouse_engine.io.writers.file_writer import FileWriter 19from lakehouse_engine.io.writers.jdbc_writer import JDBCWriter 20from lakehouse_engine.io.writers.kafka_writer import KafkaWriter 21from lakehouse_engine.io.writers.rest_api_writer import RestApiWriter 22from lakehouse_engine.io.writers.table_writer import TableWriter 23 24 25class WriterFactory(ABC): # noqa: B024 26 """Class for writer factory.""" 27 28 AVAILABLE_WRITERS = { 29 OutputFormat.TABLE.value: TableWriter, 30 OutputFormat.DELTAFILES.value: DeltaMergeWriter, 31 OutputFormat.JDBC.value: JDBCWriter, 32 OutputFormat.FILE.value: FileWriter, 33 OutputFormat.KAFKA.value: KafkaWriter, 34 OutputFormat.CONSOLE.value: ConsoleWriter, 35 OutputFormat.DATAFRAME.value: DataFrameWriter, 36 OutputFormat.REST_API.value: RestApiWriter, 37 } 38 39 @classmethod 40 def _get_writer_name(cls, spec: OutputSpec) -> str: 41 """Get the writer name according to the output specification. 42 43 Args: 44 OutputSpec spec: output specification to write data. 45 46 Returns: 47 Writer: writer name that will be created to write the data. 48 """ 49 if spec.db_table and spec.write_type != WriteType.MERGE.value: 50 writer_name = OutputFormat.TABLE.value 51 elif ( 52 spec.data_format == OutputFormat.DELTAFILES.value or spec.db_table 53 ) and spec.write_type == WriteType.MERGE.value: 54 writer_name = OutputFormat.DELTAFILES.value 55 elif spec.data_format in FILE_OUTPUT_FORMATS: 56 writer_name = OutputFormat.FILE.value 57 else: 58 writer_name = spec.data_format 59 return writer_name 60 61 @classmethod 62 def get_writer(cls, spec: OutputSpec, df: DataFrame, data: OrderedDict) -> Writer: 63 """Get a writer according to the output specification using a factory pattern. 64 65 Args: 66 spec: output specification to write data. 67 df: dataframe to be written. 68 data: list of all dfs generated on previous steps before writer. 69 70 Returns: 71 Writer: writer that will write the data. 72 """ 73 writer_name = cls._get_writer_name(spec) 74 writer = cls.AVAILABLE_WRITERS.get(writer_name) 75 76 if writer: 77 return writer(output_spec=spec, df=df, data=data) # type: ignore 78 else: 79 raise NotImplementedError( 80 f"The requested output spec format {spec.data_format} is not supported." 81 )
class WriterFactory(abc.ABC):
Class for writer factory.
AVAILABLE_WRITERS =
{
    'table': <class 'lakehouse_engine.io.writers.table_writer.TableWriter'>,
    'delta': <class 'lakehouse_engine.io.writers.delta_merge_writer.DeltaMergeWriter'>,
    'jdbc': <class 'lakehouse_engine.io.writers.jdbc_writer.JDBCWriter'>,
    'file': <class 'lakehouse_engine.io.writers.file_writer.FileWriter'>,
    'kafka': <class 'lakehouse_engine.io.writers.kafka_writer.KafkaWriter'>,
    'console': <class 'lakehouse_engine.io.writers.console_writer.ConsoleWriter'>,
    'dataframe': <class 'lakehouse_engine.io.writers.dataframe_writer.DataFrameWriter'>,
    'rest_api': <class 'lakehouse_engine.io.writers.rest_api_writer.RestApiWriter'>,
}
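The mapping keys are the OutputFormat enum values, so the registered writer class for a given format can be looked up directly. A small illustrative sketch:

from lakehouse_engine.core.definitions import OutputFormat
from lakehouse_engine.io.writer_factory import WriterFactory
from lakehouse_engine.io.writers.kafka_writer import KafkaWriter

# Look up the writer class registered for the Kafka output format.
writer_cls = WriterFactory.AVAILABLE_WRITERS.get(OutputFormat.KAFKA.value)
assert writer_cls is KafkaWriter

# An unregistered format value yields None, which is what makes
# WriterFactory.get_writer raise NotImplementedError for unsupported formats.
assert WriterFactory.AVAILABLE_WRITERS.get("unsupported_format") is None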
@classmethod
def get_writer(
    cls,
    spec: lakehouse_engine.core.definitions.OutputSpec,
    df: pyspark.sql.dataframe.DataFrame,
    data: OrderedDict,
) -> lakehouse_engine.io.writer.Writer:
Get a writer according to the output specification using a factory pattern.
Arguments:
- spec: output specification to write data.
- df: dataframe to be written.
- data: ordered dict of all dataframes generated in previous steps, before the writer.
Returns:
Writer: writer that will write the data.
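A short usage sketch follows. The OutputSpec constructor arguments shown (spec_id, input_id, location) are assumptions made for illustration; only db_table, write_type and data_format are referenced by this module, and the authoritative field list lives in lakehouse_engine.core.definitions.OutputSpec.

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import OutputFormat, OutputSpec, WriteType
from lakehouse_engine.io.writer_factory import WriterFactory

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

# OutputSpec fields below are assumed for illustration; check
# lakehouse_engine.core.definitions.OutputSpec for the real signature.
spec = OutputSpec(
    spec_id="sales_silver",                    # hypothetical identifier
    input_id="sales_bronze",                   # hypothetical identifier
    write_type=WriteType.MERGE.value,
    data_format=OutputFormat.DELTAFILES.value,
    location="s3://my-bucket/silver/sales",    # hypothetical path
)

# Delta format + merge write type -> an instance of DeltaMergeWriter is returned;
# an unsupported data_format would raise NotImplementedError instead.
writer = WriterFactory.get_writer(spec=spec, df=df, data=OrderedDict())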