lakehouse_engine.transformers.watermarker

Watermarker module.

 1"""Watermarker module."""
 2
 3from typing import Callable
 4
 5from pyspark.sql import DataFrame
 6
 7from lakehouse_engine.utils.logging_handler import LoggingHandler
 8
 9
class Watermarker(object):
    """Namespace class grouping the watermark-related transformers."""

    # Class-level logger obtained through the engine's LoggingHandler.
    _logger = LoggingHandler(__name__).get_logger()

    @staticmethod
    def with_watermark(
        watermarker_column: str,
        watermarker_time: str,
    ) -> Callable:
        """Build a transformer that defines a watermark on a dataframe.

        Args:
            watermarker_column: name of the input column to be used for the
                watermarking. Note: it must be a timestamp.
            watermarker_time: time window that defines the watermark value
                (for example "2 days").

        Returns:
            A function that receives a dataframe and returns the result of
            calling ``withWatermark`` on it with the two arguments above.
        """

        def apply_watermark(df: DataFrame) -> DataFrame:
            # Both arguments are captured from the enclosing call.
            watermarked_df = df.withWatermark(watermarker_column, watermarker_time)
            return watermarked_df

        return apply_watermark
class Watermarker:
class Watermarker(object):
    """Namespace class grouping the watermark-related transformers."""

    # Class-level logger obtained through the engine's LoggingHandler.
    _logger = LoggingHandler(__name__).get_logger()

    @staticmethod
    def with_watermark(
        watermarker_column: str,
        watermarker_time: str,
    ) -> Callable:
        """Build a transformer that defines a watermark on a dataframe.

        Args:
            watermarker_column: name of the input column to be used for the
                watermarking. Note: it must be a timestamp.
            watermarker_time: time window that defines the watermark value
                (for example "2 days").

        Returns:
            A function that receives a dataframe and returns the result of
            calling ``withWatermark`` on it with the two arguments above.
        """

        def apply_watermark(df: DataFrame) -> DataFrame:
            # Both arguments are captured from the enclosing call.
            watermarked_df = df.withWatermark(watermarker_column, watermarker_time)
            return watermarked_df

        return apply_watermark

Class containing all watermarker transformers.

@staticmethod
def with_watermark(watermarker_column: str, watermarker_time: str) -> Callable:
16    @staticmethod
17    def with_watermark(watermarker_column: str, watermarker_time: str) -> Callable:
18        """Get the dataframe with watermarker defined.
19
20        Args:
21            watermarker_column: name of the input column to be considered for
22                the watermarking. Note: it must be a timestamp.
23            watermarker_time: time window to define the watermark value.
24
25        Returns:
26            A function to be executed on other transformers.
27        """
28
29        def inner(df: DataFrame) -> DataFrame:
30            return df.withWatermark(watermarker_column, watermarker_time)
31
32        return inner

Get the dataframe with watermarker defined.

Arguments:
  • watermarker_column: name of the input column to be considered for the watermarking. Note: it must be a timestamp.
  • watermarker_time: time window to define the watermark value.
Returns:

A function to be executed on other transformers.

View Example
21{
22    "function": "with_watermark",
23    "args": {
24        "watermarker_column": "date",
25        "watermarker_time": "2 days"
26    }
27}
View Full Acon