lakehouse_engine.transformers.transformer_factory

Module with the factory pattern to return transformers.

  1"""Module with the factory pattern to return transformers."""
  2
  3from typing import Callable, OrderedDict
  4
  5from lakehouse_engine.core.definitions import TransformerSpec
  6from lakehouse_engine.transformers.aggregators import Aggregators
  7from lakehouse_engine.transformers.column_creators import ColumnCreators
  8from lakehouse_engine.transformers.column_reshapers import ColumnReshapers
  9from lakehouse_engine.transformers.condensers import Condensers
 10from lakehouse_engine.transformers.custom_transformers import CustomTransformers
 11from lakehouse_engine.transformers.data_maskers import DataMaskers
 12from lakehouse_engine.transformers.date_transformers import DateTransformers
 13from lakehouse_engine.transformers.filters import Filters
 14from lakehouse_engine.transformers.joiners import Joiners
 15from lakehouse_engine.transformers.null_handlers import NullHandlers
 16from lakehouse_engine.transformers.optimizers import Optimizers
 17from lakehouse_engine.transformers.regex_transformers import RegexTransformers
 18from lakehouse_engine.transformers.repartitioners import Repartitioners
 19from lakehouse_engine.transformers.unions import Unions
 20from lakehouse_engine.transformers.watermarker import Watermarker
 21from lakehouse_engine.utils.logging_handler import LoggingHandler
 22
 23
 24class TransformerFactory(object):
 25    """TransformerFactory class following the factory pattern."""
 26
 27    _logger = LoggingHandler(__name__).get_logger()
 28
 29    UNSUPPORTED_STREAMING_TRANSFORMERS = [
 30        "condense_record_mode_cdc",
 31        "group_and_rank",
 32        "with_auto_increment_id",
 33        "with_row_id",
 34    ]
 35
 36    AVAILABLE_TRANSFORMERS = {
 37        "add_current_date": DateTransformers.add_current_date,
 38        "cache": Optimizers.cache,
 39        "cast": ColumnReshapers.cast,
 40        "coalesce": Repartitioners.coalesce,
 41        "column_dropper": DataMaskers.column_dropper,
 42        "column_filter_exp": Filters.column_filter_exp,
 43        "column_selector": ColumnReshapers.column_selector,
 44        "condense_record_mode_cdc": Condensers.condense_record_mode_cdc,
 45        "convert_to_date": DateTransformers.convert_to_date,
 46        "convert_to_timestamp": DateTransformers.convert_to_timestamp,
 47        "custom_transformation": CustomTransformers.custom_transformation,
 48        "drop_duplicate_rows": Filters.drop_duplicate_rows,
 49        "expression_filter": Filters.expression_filter,
 50        "format_date": DateTransformers.format_date,
 51        "flatten_schema": ColumnReshapers.flatten_schema,
 52        "explode_columns": ColumnReshapers.explode_columns,
 53        "from_avro": ColumnReshapers.from_avro,
 54        "from_avro_with_registry": ColumnReshapers.from_avro_with_registry,
 55        "from_json": ColumnReshapers.from_json,
 56        "get_date_hierarchy": DateTransformers.get_date_hierarchy,
 57        "get_max_value": Aggregators.get_max_value,
 58        "group_and_rank": Condensers.group_and_rank,
 59        "hash_masker": DataMaskers.hash_masker,
 60        "incremental_filter": Filters.incremental_filter,
 61        "join": Joiners.join,
 62        "persist": Optimizers.persist,
 63        "rename": ColumnReshapers.rename,
 64        "repartition": Repartitioners.repartition,
 65        "replace_nulls": NullHandlers.replace_nulls,
 66        "sql_transformation": CustomTransformers.sql_transformation,
 67        "to_json": ColumnReshapers.to_json,
 68        "union": Unions.union,
 69        "union_by_name": Unions.union_by_name,
 70        "with_watermark": Watermarker.with_watermark,
 71        "unpersist": Optimizers.unpersist,
 72        "with_auto_increment_id": ColumnCreators.with_auto_increment_id,
 73        "with_expressions": ColumnReshapers.with_expressions,
 74        "with_literals": ColumnCreators.with_literals,
 75        "with_regex_value": RegexTransformers.with_regex_value,
 76        "with_row_id": ColumnCreators.with_row_id,
 77    }
 78
 79    @staticmethod
 80    def get_transformer(spec: TransformerSpec, data: OrderedDict = None) -> Callable:
 81        """Get a transformer following the factory pattern.
 82
 83        Args:
 84            spec: transformer specification (individual transformation... not to be
 85                confused with list of all transformations).
 86            data: ordered dict of dataframes to be transformed. Needed when a
 87                transformer requires more than one dataframe as input.
 88
 89        Returns:
 90            Transformer function to be executed in .transform() spark function.
 91        """
 92        if spec.function == "incremental_filter":
 93            # incremental_filter optionally expects a DataFrame as input, so find it.
 94            if "increment_df" in spec.args:
 95                spec.args["increment_df"] = data[spec.args["increment_df"]]
 96            return TransformerFactory.AVAILABLE_TRANSFORMERS[  # type: ignore
 97                spec.function
 98            ](**spec.args)
 99        elif spec.function == "join":
100            # get the dataframe given the input_id in the input specs of the acon.
101            spec.args["join_with"] = data[spec.args["join_with"]]
102            return TransformerFactory.AVAILABLE_TRANSFORMERS[  # type: ignore
103                spec.function
104            ](**spec.args)
105        elif spec.function == "union" or spec.function == "union_by_name":
106            # get the list of dataframes given the input_id in the input specs
107            # of the acon.
108            df_to_transform = spec.args["union_with"]
109            spec.args["union_with"] = []
110            for item in df_to_transform:
111                spec.args["union_with"].append(data[item])
112            return TransformerFactory.AVAILABLE_TRANSFORMERS[  # type: ignore
113                spec.function
114            ](**spec.args)
115        elif spec.function in TransformerFactory.AVAILABLE_TRANSFORMERS:
116            return TransformerFactory.AVAILABLE_TRANSFORMERS[  # type: ignore
117                spec.function
118            ](**spec.args)
119        else:
120            raise NotImplementedError(
121                f"The requested transformer {spec.function} is not implemented."
122            )
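
To make the multi-input dispatch concrete, the following is a minimal, hypothetical sketch of how a union_by_name spec is resolved: union_with arrives as a list of input_id strings, and get_transformer swaps each one for the matching DataFrame from data before building the transformer. Constructing TransformerSpec with function and args matches the attributes read in the source above; the SparkSession setup and dataframe contents are illustrative.

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import TransformerSpec
from lakehouse_engine.transformers.transformer_factory import TransformerFactory

spark = SparkSession.builder.getOrCreate()

# Dataframes keyed by input_id, as produced by the input specs of the acon.
data = OrderedDict(
    base=spark.createDataFrame([(1, "a")], ["id", "val"]),
    other=spark.createDataFrame([(2, "b")], ["id", "val"]),
)

# union_with holds input_ids; the factory replaces them with the DataFrames.
spec = TransformerSpec(
    function="union_by_name",
    args={"union_with": ["other"]},
)

# The returned callable plugs straight into Spark's DataFrame.transform().
unioned_df = data["base"].transform(TransformerFactory.get_transformer(spec, data))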
class TransformerFactory:

TransformerFactory class following the factory pattern.

UNSUPPORTED_STREAMING_TRANSFORMERS = ['condense_record_mode_cdc', 'group_and_rank', 'with_auto_increment_id', 'with_row_id']
AVAILABLE_TRANSFORMERS = {'add_current_date': DateTransformers.add_current_date, 'cache': Optimizers.cache, 'cast': ColumnReshapers.cast, 'coalesce': Repartitioners.coalesce, 'column_dropper': DataMaskers.column_dropper, 'column_filter_exp': Filters.column_filter_exp, 'column_selector': ColumnReshapers.column_selector, 'condense_record_mode_cdc': Condensers.condense_record_mode_cdc, 'convert_to_date': DateTransformers.convert_to_date, 'convert_to_timestamp': DateTransformers.convert_to_timestamp, 'custom_transformation': CustomTransformers.custom_transformation, 'drop_duplicate_rows': Filters.drop_duplicate_rows, 'expression_filter': Filters.expression_filter, 'format_date': DateTransformers.format_date, 'flatten_schema': ColumnReshapers.flatten_schema, 'explode_columns': ColumnReshapers.explode_columns, 'from_avro': ColumnReshapers.from_avro, 'from_avro_with_registry': ColumnReshapers.from_avro_with_registry, 'from_json': ColumnReshapers.from_json, 'get_date_hierarchy': DateTransformers.get_date_hierarchy, 'get_max_value': Aggregators.get_max_value, 'group_and_rank': Condensers.group_and_rank, 'hash_masker': DataMaskers.hash_masker, 'incremental_filter': Filters.incremental_filter, 'join': Joiners.join, 'persist': Optimizers.persist, 'rename': ColumnReshapers.rename, 'repartition': Repartitioners.repartition, 'replace_nulls': NullHandlers.replace_nulls, 'sql_transformation': CustomTransformers.sql_transformation, 'to_json': ColumnReshapers.to_json, 'union': Unions.union, 'union_by_name': Unions.union_by_name, 'with_watermark': Watermarker.with_watermark, 'unpersist': Optimizers.unpersist, 'with_auto_increment_id': ColumnCreators.with_auto_increment_id, 'with_expressions': ColumnReshapers.with_expressions, 'with_literals': ColumnCreators.with_literals, 'with_regex_value': RegexTransformers.with_regex_value, 'with_row_id': ColumnCreators.with_row_id}
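
The transformers in UNSUPPORTED_STREAMING_TRANSFORMERS presumably rely on operations that are unavailable on unbounded streaming dataframes (e.g. global ranking and row-id assignment). As a minimal, hypothetical sketch, a caller could use the list to fail fast before running a streaming pipeline; validate_streaming_specs is an illustrative helper, not part of the engine.

from typing import List

from lakehouse_engine.core.definitions import TransformerSpec
from lakehouse_engine.transformers.transformer_factory import TransformerFactory

def validate_streaming_specs(specs: List[TransformerSpec]) -> None:
    """Illustrative helper: reject specs that cannot run on a streaming df."""
    unsupported = [
        spec.function
        for spec in specs
        if spec.function in TransformerFactory.UNSUPPORTED_STREAMING_TRANSFORMERS
    ]
    if unsupported:
        raise NotImplementedError(
            f"Transformers not supported in streaming mode: {unsupported}"
        )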
@staticmethod
def get_transformer(spec: lakehouse_engine.core.definitions.TransformerSpec, data: Optional[OrderedDict] = None) -> Callable:

Get a transformer following the factory pattern.

Arguments:
  • spec: transformer specification (an individual transformation, not to be confused with the list of all transformations).
  • data: ordered dict of dataframes to be transformed. Needed when a transformer requires more than one dataframe as input.

Returns:
  Transformer function to be executed by Spark's .transform() function.
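
A minimal usage sketch for the single-dataframe case, assuming TransformerSpec accepts function and args (the attributes this method reads) and that each transformer returns a DataFrame-to-DataFrame callable; the argument shape for with_literals and the column values are illustrative assumptions.

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import TransformerSpec
from lakehouse_engine.transformers.transformer_factory import TransformerFactory

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a")], ["id", "val"])

# Single-dataframe transformer: no data dict is needed.
spec = TransformerSpec(
    function="with_literals",
    args={"literals": {"source_system": "erp"}},  # assumed argument shape
)

# get_transformer returns a callable that DataFrame.transform accepts.
enriched_df = df.transform(TransformerFactory.get_transformer(spec))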