lakehouse_engine.transformers.transformer_factory
Module with the factory pattern to return transformers.
1"""Module with the factory pattern to return transformers.""" 2 3from typing import Callable, OrderedDict 4 5from lakehouse_engine.core.definitions import TransformerSpec 6from lakehouse_engine.transformers.aggregators import Aggregators 7from lakehouse_engine.transformers.column_creators import ColumnCreators 8from lakehouse_engine.transformers.column_reshapers import ColumnReshapers 9from lakehouse_engine.transformers.condensers import Condensers 10from lakehouse_engine.transformers.custom_transformers import CustomTransformers 11from lakehouse_engine.transformers.data_maskers import DataMaskers 12from lakehouse_engine.transformers.date_transformers import DateTransformers 13from lakehouse_engine.transformers.filters import Filters 14from lakehouse_engine.transformers.joiners import Joiners 15from lakehouse_engine.transformers.null_handlers import NullHandlers 16from lakehouse_engine.transformers.optimizers import Optimizers 17from lakehouse_engine.transformers.regex_transformers import RegexTransformers 18from lakehouse_engine.transformers.repartitioners import Repartitioners 19from lakehouse_engine.transformers.unions import Unions 20from lakehouse_engine.transformers.watermarker import Watermarker 21from lakehouse_engine.utils.logging_handler import LoggingHandler 22 23 24class TransformerFactory(object): 25 """TransformerFactory class following the factory pattern.""" 26 27 _logger = LoggingHandler(__name__).get_logger() 28 29 UNSUPPORTED_STREAMING_TRANSFORMERS = [ 30 "condense_record_mode_cdc", 31 "group_and_rank", 32 "with_auto_increment_id", 33 "with_row_id", 34 ] 35 36 AVAILABLE_TRANSFORMERS = { 37 "add_current_date": DateTransformers.add_current_date, 38 "cache": Optimizers.cache, 39 "cast": ColumnReshapers.cast, 40 "coalesce": Repartitioners.coalesce, 41 "column_dropper": DataMaskers.column_dropper, 42 "column_filter_exp": Filters.column_filter_exp, 43 "column_selector": ColumnReshapers.column_selector, 44 "condense_record_mode_cdc": Condensers.condense_record_mode_cdc, 45 "convert_to_date": DateTransformers.convert_to_date, 46 "convert_to_timestamp": DateTransformers.convert_to_timestamp, 47 "custom_transformation": CustomTransformers.custom_transformation, 48 "drop_duplicate_rows": Filters.drop_duplicate_rows, 49 "expression_filter": Filters.expression_filter, 50 "format_date": DateTransformers.format_date, 51 "flatten_schema": ColumnReshapers.flatten_schema, 52 "explode_columns": ColumnReshapers.explode_columns, 53 "from_avro": ColumnReshapers.from_avro, 54 "from_avro_with_registry": ColumnReshapers.from_avro_with_registry, 55 "from_json": ColumnReshapers.from_json, 56 "get_date_hierarchy": DateTransformers.get_date_hierarchy, 57 "get_max_value": Aggregators.get_max_value, 58 "group_and_rank": Condensers.group_and_rank, 59 "hash_masker": DataMaskers.hash_masker, 60 "incremental_filter": Filters.incremental_filter, 61 "join": Joiners.join, 62 "persist": Optimizers.persist, 63 "rename": ColumnReshapers.rename, 64 "repartition": Repartitioners.repartition, 65 "replace_nulls": NullHandlers.replace_nulls, 66 "sql_transformation": CustomTransformers.sql_transformation, 67 "to_json": ColumnReshapers.to_json, 68 "union": Unions.union, 69 "union_by_name": Unions.union_by_name, 70 "with_watermark": Watermarker.with_watermark, 71 "unpersist": Optimizers.unpersist, 72 "with_auto_increment_id": ColumnCreators.with_auto_increment_id, 73 "with_expressions": ColumnReshapers.with_expressions, 74 "with_literals": ColumnCreators.with_literals, 75 "with_regex_value": 
RegexTransformers.with_regex_value, 76 "with_row_id": ColumnCreators.with_row_id, 77 } 78 79 @staticmethod 80 def get_transformer(spec: TransformerSpec, data: OrderedDict = None) -> Callable: 81 """Get a transformer following the factory pattern. 82 83 Args: 84 spec: transformer specification (individual transformation... not to be 85 confused with list of all transformations). 86 data: ordered dict of dataframes to be transformed. Needed when a 87 transformer requires more than one dataframe as input. 88 89 Returns: 90 Transformer function to be executed in .transform() spark function. 91 """ 92 if spec.function == "incremental_filter": 93 # incremental_filter optionally expects a DataFrame as input, so find it. 94 if "increment_df" in spec.args: 95 spec.args["increment_df"] = data[spec.args["increment_df"]] 96 return TransformerFactory.AVAILABLE_TRANSFORMERS[ # type: ignore 97 spec.function 98 ](**spec.args) 99 elif spec.function == "join": 100 # get the dataframe given the input_id in the input specs of the acon. 101 spec.args["join_with"] = data[spec.args["join_with"]] 102 return TransformerFactory.AVAILABLE_TRANSFORMERS[ # type: ignore 103 spec.function 104 ](**spec.args) 105 elif spec.function == "union" or spec.function == "union_by_name": 106 # get the list of dataframes given the input_id in the input specs 107 # of the acon. 108 df_to_transform = spec.args["union_with"] 109 spec.args["union_with"] = [] 110 for item in df_to_transform: 111 spec.args["union_with"].append(data[item]) 112 return TransformerFactory.AVAILABLE_TRANSFORMERS[ # type: ignore 113 spec.function 114 ](**spec.args) 115 elif spec.function in TransformerFactory.AVAILABLE_TRANSFORMERS: 116 return TransformerFactory.AVAILABLE_TRANSFORMERS[ # type: ignore 117 spec.function 118 ](**spec.args) 119 else: 120 raise NotImplementedError( 121 f"The requested transformer {spec.function} is not implemented." 122 )
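In practice, the factory is consumed by building a TransformerSpec and applying the returned callable through Spark's DataFrame.transform(). A minimal, hypothetical sketch; the TransformerSpec constructor and the expression_filter argument name "exp" are assumptions inferred from the source above:

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import TransformerSpec
from lakehouse_engine.transformers.transformer_factory import TransformerFactory

spark = SparkSession.builder.getOrCreate()
input_df = spark.createDataFrame([(1, 10.0), (2, -5.0)], ["id", "amount"])

# Hypothetical spec: keep only rows matching a SQL expression. The keyword
# arguments mirror the spec.function / spec.args attributes read by
# get_transformer; the exact TransformerSpec constructor may differ.
spec = TransformerSpec(function="expression_filter", args={"exp": "amount > 0"})

# The factory returns a callable that plugs into DataFrame.transform().
transformer = TransformerFactory.get_transformer(spec)
filtered_df = input_df.transform(transformer)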
class TransformerFactory:
TransformerFactory class following the factory pattern.
UNSUPPORTED_STREAMING_TRANSFORMERS = ['condense_record_mode_cdc', 'group_and_rank', 'with_auto_increment_id', 'with_row_id']
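The engine presumably checks this list before applying transformers to streaming dataframes, since these transformers rely on operations (e.g. full-dataset window functions and row-id generation) that Spark Structured Streaming does not support. A hypothetical guard illustrating the constant's purpose; the engine's own streaming validation may differ:

from lakehouse_engine.core.definitions import TransformerSpec
from lakehouse_engine.transformers.transformer_factory import TransformerFactory

def assert_streaming_compatible(spec: TransformerSpec) -> None:
    """Fail fast if a transformer cannot run on a streaming DataFrame."""
    if spec.function in TransformerFactory.UNSUPPORTED_STREAMING_TRANSFORMERS:
        raise NotImplementedError(
            f"Transformer {spec.function} is not supported in streaming mode."
        )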
AVAILABLE_TRANSFORMERS = {
    'add_current_date': DateTransformers.add_current_date,
    'cache': Optimizers.cache,
    'cast': ColumnReshapers.cast,
    'coalesce': Repartitioners.coalesce,
    'column_dropper': DataMaskers.column_dropper,
    'column_filter_exp': Filters.column_filter_exp,
    'column_selector': ColumnReshapers.column_selector,
    'condense_record_mode_cdc': Condensers.condense_record_mode_cdc,
    'convert_to_date': DateTransformers.convert_to_date,
    'convert_to_timestamp': DateTransformers.convert_to_timestamp,
    'custom_transformation': CustomTransformers.custom_transformation,
    'drop_duplicate_rows': Filters.drop_duplicate_rows,
    'expression_filter': Filters.expression_filter,
    'format_date': DateTransformers.format_date,
    'flatten_schema': ColumnReshapers.flatten_schema,
    'explode_columns': ColumnReshapers.explode_columns,
    'from_avro': ColumnReshapers.from_avro,
    'from_avro_with_registry': ColumnReshapers.from_avro_with_registry,
    'from_json': ColumnReshapers.from_json,
    'get_date_hierarchy': DateTransformers.get_date_hierarchy,
    'get_max_value': Aggregators.get_max_value,
    'group_and_rank': Condensers.group_and_rank,
    'hash_masker': DataMaskers.hash_masker,
    'incremental_filter': Filters.incremental_filter,
    'join': Joiners.join,
    'persist': Optimizers.persist,
    'rename': ColumnReshapers.rename,
    'repartition': Repartitioners.repartition,
    'replace_nulls': NullHandlers.replace_nulls,
    'sql_transformation': CustomTransformers.sql_transformation,
    'to_json': ColumnReshapers.to_json,
    'union': Unions.union,
    'union_by_name': Unions.union_by_name,
    'with_watermark': Watermarker.with_watermark,
    'unpersist': Optimizers.unpersist,
    'with_auto_increment_id': ColumnCreators.with_auto_increment_id,
    'with_expressions': ColumnReshapers.with_expressions,
    'with_literals': ColumnCreators.with_literals,
    'with_regex_value': RegexTransformers.with_regex_value,
    'with_row_id': ColumnCreators.with_row_id,
}
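Because AVAILABLE_TRANSFORMERS is a plain class-level dict, an extra transformer can in principle be registered at runtime. A sketch under that assumption; this is not a documented extension point, and custom_transformation is the supported route for custom logic:

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

from lakehouse_engine.transformers.transformer_factory import TransformerFactory

def with_source_tag(tag: str):
    """Return a transformer that adds a literal 'source_tag' column."""
    def _transform(df: DataFrame) -> DataFrame:
        return df.withColumn("source_tag", lit(tag))
    return _transform

# Hypothetical runtime registration; the key and function are illustrative only.
TransformerFactory.AVAILABLE_TRANSFORMERS["with_source_tag"] = with_source_tag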
@staticmethod
def get_transformer(spec: lakehouse_engine.core.definitions.TransformerSpec, data: OrderedDict = None) -> Callable:
Get a transformer following the factory pattern.
Arguments:
- spec: transformer specification (a single transformation, not to be confused with the list of all transformations).
- data: ordered dict of dataframes to be transformed. Needed when a transformer requires more than one dataframe as input.
Returns:
Transformer function to be passed to the DataFrame.transform() Spark function.
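For transformers that consume more than one dataframe (join, union, union_by_name, and optionally incremental_filter), spec.args reference the other inputs by their input_id, and get_transformer swaps those ids for the dataframes found in data. A sketch, assuming a Spark session and Joiners.join argument names (join_condition, join_type) that may not match the real signature:

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.core.definitions import TransformerSpec
from lakehouse_engine.transformers.transformer_factory import TransformerFactory

spark = SparkSession.builder.getOrCreate()
orders_df = spark.createDataFrame([(1, 100.0)], ["customer_id", "amount"])
customers_df = spark.createDataFrame([(1, "ACME")], ["customer_id", "name"])

# input_id -> DataFrame, as resolved from the acon's input specs.
data = OrderedDict(orders=orders_df, customers=customers_df)

join_spec = TransformerSpec(
    function="join",
    args={
        "join_with": "customers",  # input_id; replaced by data["customers"]
        # The remaining argument names/values are assumptions for illustration;
        # consult Joiners.join for the real signature.
        "join_condition": "orders.customer_id = customers.customer_id",
        "join_type": "left",
    },
)

joined_df = orders_df.transform(TransformerFactory.get_transformer(join_spec, data))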