lakehouse_engine.transformers.aggregators

Aggregators module.

 1"""Aggregators module."""
 2
 3from typing import Callable
 4
 5from pyspark.sql import DataFrame
 6from pyspark.sql.functions import col, max  # noqa: A004
 7
 8from lakehouse_engine.utils.logging_handler import LoggingHandler
 9
10
11class Aggregators(object):
12    """Class containing all aggregation functions."""
13
14    _logger = LoggingHandler(__name__).get_logger()
15
16    @staticmethod
17    def get_max_value(input_col: str, output_col: str = "latest") -> Callable:
18        """Get the maximum value of a given column of a dataframe.
19
20        Args:
21            input_col: name of the input column.
22            output_col: name of the output column (defaults to "latest").
23
24        Returns:
25            A function to be executed in the .transform() spark function.
26        """
27
28        def inner(df: DataFrame) -> DataFrame:
29            return df.select(col(input_col)).agg(max(input_col).alias(output_col))
30
31        return inner
class Aggregators:
12class Aggregators(object):
13    """Class containing all aggregation functions."""
14
15    _logger = LoggingHandler(__name__).get_logger()
16
17    @staticmethod
18    def get_max_value(input_col: str, output_col: str = "latest") -> Callable:
19        """Get the maximum value of a given column of a dataframe.
20
21        Args:
22            input_col: name of the input column.
23            output_col: name of the output column (defaults to "latest").
24
25        Returns:
26            A function to be executed in the .transform() spark function.
27        """
28
29        def inner(df: DataFrame) -> DataFrame:
30            return df.select(col(input_col)).agg(max(input_col).alias(output_col))
31
32        return inner

Class containing all aggregation functions.

@staticmethod
def get_max_value(input_col: str, output_col: str = 'latest') -> Callable:
17    @staticmethod
18    def get_max_value(input_col: str, output_col: str = "latest") -> Callable:
19        """Get the maximum value of a given column of a dataframe.
20
21        Args:
22            input_col: name of the input column.
23            output_col: name of the output column (defaults to "latest").
24
25        Returns:
26            A function to be executed in the .transform() spark function.
27        """
28
29        def inner(df: DataFrame) -> DataFrame:
30            return df.select(col(input_col)).agg(max(input_col).alias(output_col))
31
32        return inner

Build a transformation function that gets the maximum value of a given column of a dataframe.

Arguments:
  • input_col: name of the input column.
  • output_col: name of the output column (defaults to "latest").
Returns:

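A function to be passed to Spark's DataFrame.transform(): it takes a DataFrame and returns a DataFrame with a single column, named output_col, holding the maximum value of input_col.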

View Example
28{
29    "function": "get_max_value",
30    "args": {
31        "input_col": "extraction_date"
32    }
33}
View Full Acon