Skip to content

Writer factory

Module for writer factory.

WriterFactory

Bases: ABC

Class for writer factory.

Source code in mkdocs/lakehouse_engine/packages/io/writer_factory.py
class WriterFactory(ABC):  # noqa: B024
    """Factory that maps an output specification to a concrete writer."""

    # Writer class to instantiate for each supported writer name.
    AVAILABLE_WRITERS = {
        OutputFormat.TABLE.value: TableWriter,
        OutputFormat.DELTAFILES.value: DeltaMergeWriter,
        OutputFormat.JDBC.value: JDBCWriter,
        OutputFormat.FILE.value: FileWriter,
        OutputFormat.KAFKA.value: KafkaWriter,
        OutputFormat.CONSOLE.value: ConsoleWriter,
        OutputFormat.DATAFRAME.value: DataFrameWriter,
        OutputFormat.REST_API.value: RestApiWriter,
        OutputFormat.SHAREPOINT.value: SharepointWriter,
    }

    @classmethod
    def _get_writer_name(cls, spec: OutputSpec) -> str:
        """Resolve the writer name for an output specification.

        A table target or a merge write takes precedence over the plain
        data format of the specification.

        Args:
            spec: output specification to write data.

        Returns:
            Name of the writer, used as lookup key in AVAILABLE_WRITERS.
        """
        merge_type = WriteType.MERGE.value
        targets_table = bool(spec.db_table)

        # Non-merge writes into a table always go through the table writer.
        if targets_table and spec.write_type != merge_type:
            return OutputFormat.TABLE.value

        # Merge writes, either into a table or into delta files.
        if (
            targets_table or spec.data_format == OutputFormat.DELTAFILES.value
        ) and spec.write_type == merge_type:
            return OutputFormat.DELTAFILES.value

        # Every file-based format shares the same writer.
        if spec.data_format in FILE_OUTPUT_FORMATS:
            return OutputFormat.FILE.value

        # Otherwise the data format itself is the writer name.
        return spec.data_format

    @classmethod
    def get_writer(cls, spec: OutputSpec, df: DataFrame, data: OrderedDict) -> Writer:
        """Build the writer matching the output specification.

        Args:
            spec: output specification to write data.
            df: dataframe to be written.
            data: dataframes generated on previous steps before the writer.

        Returns:
            Writer instantiated with the spec, the dataframe and the data.

        Raises:
            NotImplementedError: if no writer is found for the resolved name.
        """
        writer_cls = cls.AVAILABLE_WRITERS.get(cls._get_writer_name(spec))

        if not writer_cls:
            raise NotImplementedError(
                f"The requested output spec format {spec.data_format} is not supported."
            )
        return writer_cls(output_spec=spec, df=df, data=data)  # type: ignore

get_writer(spec, df, data) classmethod

Get a writer according to the output specification using a factory pattern.

Parameters:

Name Type Description Default
spec OutputSpec

output specification to write data.

required
df DataFrame

dataframe to be written.

required
data OrderedDict

ordered mapping of all dataframes generated on previous steps before the writer.

required

Returns:

Name Type Description
Writer Writer

writer that will write the data.

Source code in mkdocs/lakehouse_engine/packages/io/writer_factory.py
@classmethod
def get_writer(cls, spec: OutputSpec, df: DataFrame, data: OrderedDict) -> Writer:
    """Build the writer matching the output specification.

    Args:
        spec: output specification to write data.
        df: dataframe to be written.
        data: dataframes generated on previous steps before the writer.

    Returns:
        Writer instantiated with the spec, the dataframe and the data.

    Raises:
        NotImplementedError: if no writer is found for the resolved name.
    """
    writer_cls = cls.AVAILABLE_WRITERS.get(cls._get_writer_name(spec))

    if not writer_cls:
        raise NotImplementedError(
            f"The requested output spec format {spec.data_format} is not supported."
        )
    return writer_cls(output_spec=spec, df=df, data=data)  # type: ignore