lakehouse_engine.terminators.cdf_processor
Defines change data feed processor behaviour.
1"""Defines change data feed processor behaviour.""" 2 3from datetime import datetime, timedelta 4from typing import OrderedDict 5 6from delta.tables import DeltaTable 7from pyspark.sql import DataFrame 8from pyspark.sql.functions import col, date_format 9 10from lakehouse_engine.core.definitions import ( 11 InputSpec, 12 OutputFormat, 13 OutputSpec, 14 ReadType, 15 TerminatorSpec, 16 WriteType, 17) 18from lakehouse_engine.core.exec_env import ExecEnv 19from lakehouse_engine.io.reader_factory import ReaderFactory 20from lakehouse_engine.io.writer_factory import WriterFactory 21from lakehouse_engine.utils.logging_handler import LoggingHandler 22 23 24class CDFProcessor(object): 25 """Change data feed processor class.""" 26 27 _logger = LoggingHandler(__name__).get_logger() 28 29 @classmethod 30 def expose_cdf(cls, spec: TerminatorSpec) -> None: 31 """Expose CDF to external location. 32 33 Args: 34 spec: terminator specification. 35 """ 36 cls._logger.info("Reading CDF from input table...") 37 38 df_cdf = ReaderFactory.get_data(cls._get_table_cdf_input_specs(spec)) 39 new_df_cdf = df_cdf.withColumn( 40 "_commit_timestamp", 41 date_format(col("_commit_timestamp"), "yyyyMMddHHmmss"), 42 ) 43 44 cls._logger.info("Writing CDF to external table...") 45 cls._write_cdf_to_external( 46 spec, 47 new_df_cdf.repartition( 48 spec.args.get( 49 "materialized_cdf_num_partitions", col("_commit_timestamp") 50 ) 51 ), 52 ) 53 54 # used to delete old data on CDF table (don't remove parquet). 55 if spec.args.get("clean_cdf", True): 56 cls._logger.info("Cleaning CDF table...") 57 cls.delete_old_data(spec) 58 59 # used to delete old parquet files. 60 if spec.args.get("vacuum_cdf", False): 61 cls._logger.info("Vacuuming CDF table...") 62 cls.vacuum_cdf_data(spec) 63 64 @staticmethod 65 def _write_cdf_to_external( 66 spec: TerminatorSpec, df: DataFrame, data: OrderedDict = None 67 ) -> None: 68 """Write cdf results dataframe. 69 70 Args: 71 spec: terminator specification. 72 df: dataframe with cdf results to write. 73 data: list of all dfs generated on previous steps before writer. 74 """ 75 WriterFactory.get_writer( 76 spec=OutputSpec( 77 spec_id="materialized_cdf", 78 input_id="input_table", 79 location=spec.args["materialized_cdf_location"], 80 write_type=WriteType.APPEND.value, 81 data_format=spec.args.get("data_format", OutputFormat.DELTAFILES.value), 82 options=spec.args["materialized_cdf_options"], 83 partitions=["_commit_timestamp"], 84 ), 85 df=df, 86 data=data, 87 ).write() 88 89 @staticmethod 90 def _get_table_cdf_input_specs(spec: TerminatorSpec) -> InputSpec: 91 """Get the input specifications from a terminator spec. 92 93 Args: 94 spec: terminator specifications. 95 96 Returns: 97 List of input specifications. 98 """ 99 options = { 100 "readChangeFeed": "true", 101 **spec.args.get("db_table_options", {}), 102 } 103 104 input_specs = InputSpec( 105 spec_id="input_table", 106 db_table=spec.args["db_table"], 107 read_type=ReadType.STREAMING.value, 108 data_format=OutputFormat.DELTAFILES.value, 109 options=options, 110 ) 111 112 return input_specs 113 114 @classmethod 115 def delete_old_data(cls, spec: TerminatorSpec) -> None: 116 """Delete old data from cdf delta table. 117 118 Args: 119 spec: terminator specifications. 
120 """ 121 today_datetime = datetime.today() 122 limit_date = today_datetime + timedelta( 123 days=spec.args.get("days_to_keep", 30) * -1 124 ) 125 limit_timestamp = limit_date.strftime("%Y%m%d%H%M%S") 126 127 cdf_delta_table = DeltaTable.forPath( 128 ExecEnv.SESSION, spec.args["materialized_cdf_location"] 129 ) 130 131 cdf_delta_table.delete(col("_commit_timestamp") < limit_timestamp) 132 133 @classmethod 134 def vacuum_cdf_data(cls, spec: TerminatorSpec) -> None: 135 """Vacuum old data from cdf delta table. 136 137 Args: 138 spec: terminator specifications. 139 """ 140 cdf_delta_table = DeltaTable.forPath( 141 ExecEnv.SESSION, spec.args["materialized_cdf_location"] 142 ) 143 144 cdf_delta_table.vacuum(spec.args.get("vacuum_hours", 168))
class CDFProcessor:
Change data feed processor class.
@classmethod
def expose_cdf(cls, spec: lakehouse_engine.core.definitions.TerminatorSpec) -> None:
Expose the CDF to an external location.
Arguments:
- spec: terminator specification.
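A minimal usage sketch follows, assuming TerminatorSpec is constructed with a function name and an args dict (the exact constructor fields are not shown in this module). The argument keys are the ones expose_cdf reads from spec.args above; the table name, output location, writer options and partition count are hypothetical.

from lakehouse_engine.core.definitions import TerminatorSpec
from lakehouse_engine.terminators.cdf_processor import CDFProcessor

# Hypothetical table, location and options; keys match what expose_cdf reads from spec.args.
cdf_spec = TerminatorSpec(
    function="expose_cdf",  # assumed constructor field
    args={
        "db_table": "my_database.my_table",
        "materialized_cdf_location": "s3://my-bucket/cdf/my_table",
        "materialized_cdf_options": {
            "checkpointLocation": "s3://my-bucket/cdf/my_table/_checkpoints",
        },
        "materialized_cdf_num_partitions": 10,
        "clean_cdf": True,
        "days_to_keep": 30,
        "vacuum_cdf": False,
    },
)

# Reads the table's change feed as a stream and appends it, partitioned by
# _commit_timestamp, to the external location.
CDFProcessor.expose_cdf(cdf_spec)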
@classmethod
def delete_old_data(cls, spec: lakehouse_engine.core.definitions.TerminatorSpec) -> None:
Delete old data from the CDF delta table.
Arguments:
- spec: terminator specification.
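Continuing the sketch above, old rows can also be trimmed explicitly; expose_cdf already calls this when clean_cdf is true (the default). days_to_keep defaults to 30 and the retention value below is illustrative.

# Keep only the last 7 days of materialized CDF rows (hypothetical retention).
cdf_spec.args["days_to_keep"] = 7
CDFProcessor.delete_old_data(cdf_spec)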
@classmethod
def vacuum_cdf_data(cls, spec: lakehouse_engine.core.definitions.TerminatorSpec) -> None:
Vacuum old data from the CDF delta table.
Arguments:
- spec: terminator specification.
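And, continuing the same sketch, the underlying parquet files can be vacuumed; expose_cdf only does this when vacuum_cdf is set to true. vacuum_hours defaults to 168 (7 days) and the value below is illustrative.

# Physically remove parquet files no longer referenced and older than 240 hours.
cdf_spec.args["vacuum_hours"] = 240
CDFProcessor.vacuum_cdf_data(cdf_spec)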