lakehouse_engine.terminators.cdf_processor

Defines change data feed processor behaviour.

  1"""Defines change data feed processor behaviour."""
  2
  3from datetime import datetime, timedelta
  4from typing import OrderedDict
  5
  6from delta.tables import DeltaTable
  7from pyspark.sql import DataFrame
  8from pyspark.sql.functions import col, date_format
  9
 10from lakehouse_engine.core.definitions import (
 11    InputSpec,
 12    OutputFormat,
 13    OutputSpec,
 14    ReadType,
 15    TerminatorSpec,
 16    WriteType,
 17)
 18from lakehouse_engine.core.exec_env import ExecEnv
 19from lakehouse_engine.io.reader_factory import ReaderFactory
 20from lakehouse_engine.io.writer_factory import WriterFactory
 21from lakehouse_engine.utils.logging_handler import LoggingHandler
 22
 23
 24class CDFProcessor(object):
 25    """Change data feed processor class."""
 26
 27    _logger = LoggingHandler(__name__).get_logger()
 28
 29    @classmethod
 30    def expose_cdf(cls, spec: TerminatorSpec) -> None:
 31        """Expose CDF to external location.
 32
 33        Args:
 34            spec: terminator specification.
 35        """
 36        cls._logger.info("Reading CDF from input table...")
 37
 38        df_cdf = ReaderFactory.get_data(cls._get_table_cdf_input_specs(spec))
 39        new_df_cdf = df_cdf.withColumn(
 40            "_commit_timestamp",
 41            date_format(col("_commit_timestamp"), "yyyyMMddHHmmss"),
 42        )
 43
 44        cls._logger.info("Writing CDF to external table...")
 45        cls._write_cdf_to_external(
 46            spec,
 47            new_df_cdf.repartition(
 48                spec.args.get(
 49                    "materialized_cdf_num_partitions", col("_commit_timestamp")
 50                )
 51            ),
 52        )
 53
 54        # used to delete old data on CDF table (don't remove parquet).
 55        if spec.args.get("clean_cdf", True):
 56            cls._logger.info("Cleaning CDF table...")
 57            cls.delete_old_data(spec)
 58
 59        # used to delete old parquet files.
 60        if spec.args.get("vacuum_cdf", False):
 61            cls._logger.info("Vacuuming CDF table...")
 62            cls.vacuum_cdf_data(spec)
 63
 64    @staticmethod
 65    def _write_cdf_to_external(
 66        spec: TerminatorSpec, df: DataFrame, data: OrderedDict = None
 67    ) -> None:
 68        """Write cdf results dataframe.
 69
 70        Args:
 71            spec: terminator specification.
 72            df: dataframe with cdf results to write.
 73            data: list of all dfs generated on previous steps before writer.
 74        """
 75        WriterFactory.get_writer(
 76            spec=OutputSpec(
 77                spec_id="materialized_cdf",
 78                input_id="input_table",
 79                location=spec.args["materialized_cdf_location"],
 80                write_type=WriteType.APPEND.value,
 81                data_format=spec.args.get("data_format", OutputFormat.DELTAFILES.value),
 82                options=spec.args["materialized_cdf_options"],
 83                partitions=["_commit_timestamp"],
 84            ),
 85            df=df,
 86            data=data,
 87        ).write()
 88
 89    @staticmethod
 90    def _get_table_cdf_input_specs(spec: TerminatorSpec) -> InputSpec:
 91        """Get the input specifications from a terminator spec.
 92
 93        Args:
 94            spec: terminator specifications.
 95
 96        Returns:
 97            List of input specifications.
 98        """
 99        options = {
100            "readChangeFeed": "true",
101            **spec.args.get("db_table_options", {}),
102        }
103
104        input_specs = InputSpec(
105            spec_id="input_table",
106            db_table=spec.args["db_table"],
107            read_type=ReadType.STREAMING.value,
108            data_format=OutputFormat.DELTAFILES.value,
109            options=options,
110        )
111
112        return input_specs
113
114    @classmethod
115    def delete_old_data(cls, spec: TerminatorSpec) -> None:
116        """Delete old data from cdf delta table.
117
118        Args:
119            spec: terminator specifications.
120        """
121        today_datetime = datetime.today()
122        limit_date = today_datetime + timedelta(
123            days=spec.args.get("days_to_keep", 30) * -1
124        )
125        limit_timestamp = limit_date.strftime("%Y%m%d%H%M%S")
126
127        cdf_delta_table = DeltaTable.forPath(
128            ExecEnv.SESSION, spec.args["materialized_cdf_location"]
129        )
130
131        cdf_delta_table.delete(col("_commit_timestamp") < limit_timestamp)
132
133    @classmethod
134    def vacuum_cdf_data(cls, spec: TerminatorSpec) -> None:
135        """Vacuum old data from cdf delta table.
136
137        Args:
138            spec: terminator specifications.
139        """
140        cdf_delta_table = DeltaTable.forPath(
141            ExecEnv.SESSION, spec.args["materialized_cdf_location"]
142        )
143
144        cdf_delta_table.vacuum(spec.args.get("vacuum_hours", 168))
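
To make the timestamp handling in expose_cdf concrete, here is a small, self-contained sketch (using a local Spark session, independent of the engine) of what the date_format rewrite produces before the dataframe is repartitioned and written:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format

spark = SparkSession.builder.getOrCreate()

# A one-row stand-in for the CDF dataframe read by the processor.
df = (
    spark.createDataFrame([("2024-05-15 12:34:56",)], ["_commit_timestamp"])
    .withColumn("_commit_timestamp", col("_commit_timestamp").cast("timestamp"))
)

# Same rewrite as expose_cdf: the timestamp becomes a sortable string that
# doubles as the partition value of the materialized CDF table.
df.withColumn(
    "_commit_timestamp", date_format(col("_commit_timestamp"), "yyyyMMddHHmmss")
).show()
# +-----------------+
# |_commit_timestamp|
# +-----------------+
# |   20240515123456|
# +-----------------+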
class CDFProcessor:

Change data feed processor class: materializes a Delta table's change data feed (CDF) to an external location and manages retention of the materialized data.

@classmethod
def expose_cdf(cls, spec: lakehouse_engine.core.definitions.TerminatorSpec) -> None:

Expose CDF to an external location.

Arguments:
  • spec: terminator specification.
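
A minimal usage sketch, assuming TerminatorSpec is built from the terminator function name plus the args dict this class reads; every path and option value below is illustrative, not prescribed by the engine:

from lakehouse_engine.core.definitions import TerminatorSpec
from lakehouse_engine.terminators.cdf_processor import CDFProcessor

spec = TerminatorSpec(
    function="expose_cdf",  # assumed terminator function name
    args={
        "db_table": "my_db.my_table",  # CDF-enabled source table
        "materialized_cdf_location": "s3://my-bucket/cdf/",  # illustrative path
        "materialized_cdf_options": {
            # illustrative streaming writer option
            "checkpointLocation": "s3://my-bucket/cdf/_checkpoints/",
        },
        "clean_cdf": True,  # default True: delete rows older than days_to_keep
        "days_to_keep": 30,  # retention window used by delete_old_data
        "vacuum_cdf": False,  # default False: also vacuum old parquet files
    },
)

CDFProcessor.expose_cdf(spec)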
@classmethod
def delete_old_data(cls, spec: lakehouse_engine.core.definitions.TerminatorSpec) -> None:

Delete old data from the CDF delta table.

Arguments:
  • spec: terminator specification.
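
The retention cutoff can be reproduced outside the class; the following restates the same computation, showing why the yyyyMMddHHmmss string compares correctly against the materialized _commit_timestamp column:

from datetime import datetime, timedelta

days_to_keep = 30  # spec.args.get("days_to_keep", 30)
limit_timestamp = (
    datetime.today() - timedelta(days=days_to_keep)
).strftime("%Y%m%d%H%M%S")

# e.g. on 2024-05-15 12:00:00 this yields "20240415120000". Because the
# format orders fields from year down to second, lexicographic comparison
# of these fixed-width strings matches chronological order, so
# col("_commit_timestamp") < limit_timestamp deletes exactly the rows
# older than the retention window.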
@classmethod
def vacuum_cdf_data(cls, spec: lakehouse_engine.core.definitions.TerminatorSpec) -> None:

Vacuum old data from the CDF delta table.

Arguments:
  • spec: terminator specification.
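
The default of 168 hours keeps seven days of file history. A sketch of the equivalent direct Delta call, with an illustrative table location standing in for spec.args["materialized_cdf_location"]:

from delta.tables import DeltaTable
from lakehouse_engine.core.exec_env import ExecEnv

cdf_delta_table = DeltaTable.forPath(
    ExecEnv.SESSION, "s3://my-bucket/cdf/"  # illustrative location
)
cdf_delta_table.vacuum(168)  # spec.args.get("vacuum_hours", 168) -> 7 days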