lakehouse_engine.transformers.aggregators
Aggregators module.
"""Aggregators module."""

from typing import Callable

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, max  # noqa: A004

from lakehouse_engine.utils.logging_handler import LoggingHandler


class Aggregators:
    """Class containing all aggregation functions."""

    _logger = LoggingHandler(__name__).get_logger()

    @staticmethod
    def get_max_value(
        input_col: str, output_col: str = "latest"
    ) -> Callable[[DataFrame], DataFrame]:
        """Build a function that computes the maximum value of a column.

        Args:
            input_col: name of the input column.
            output_col: name of the output column (defaults to "latest").

        Returns:
            A function to be executed in the .transform() spark function. It
            aggregates the whole input DataFrame (no grouping) into a single
            row with one column, named `output_col`, holding the maximum of
            `input_col`.
        """

        def inner(df: DataFrame) -> DataFrame:
            # Aggregate directly on `df`: a preceding select() of the same
            # column adds nothing to the result, and it would rename a nested
            # field reference like "a.b" to "b", breaking the max() lookup.
            return df.agg(max(col(input_col)).alias(output_col))

        return inner
class
Aggregators:
class Aggregators:
    """Class containing all aggregation functions."""

    _logger = LoggingHandler(__name__).get_logger()

    @staticmethod
    def get_max_value(
        input_col: str, output_col: str = "latest"
    ) -> Callable[[DataFrame], DataFrame]:
        """Build a function that computes the maximum value of a column.

        Args:
            input_col: name of the input column.
            output_col: name of the output column (defaults to "latest").

        Returns:
            A function to be executed in the .transform() spark function. It
            aggregates the whole input DataFrame (no grouping) into a single
            row with one column, named `output_col`, holding the maximum of
            `input_col`.
        """

        def inner(df: DataFrame) -> DataFrame:
            # Aggregate directly on `df`: a preceding select() of the same
            # column adds nothing to the result, and it would rename a nested
            # field reference like "a.b" to "b", breaking the max() lookup.
            return df.agg(max(col(input_col)).alias(output_col))

        return inner
Class containing all aggregation functions.
@staticmethod
def
get_max_value(input_col: str, output_col: str = 'latest') -> Callable:
@staticmethod
def get_max_value(
    input_col: str, output_col: str = "latest"
) -> Callable[[DataFrame], DataFrame]:
    """Build a function that computes the maximum value of a column.

    Args:
        input_col: name of the input column.
        output_col: name of the output column (defaults to "latest").

    Returns:
        A function to be executed in the .transform() spark function. It
        aggregates the whole input DataFrame (no grouping) into a single
        row with one column, named `output_col`, holding the maximum of
        `input_col`.
    """

    def inner(df: DataFrame) -> DataFrame:
        # Aggregate directly on `df`: a preceding select() of the same
        # column adds nothing to the result, and it would rename a nested
        # field reference like "a.b" to "b", breaking the max() lookup.
        return df.agg(max(col(input_col)).alias(output_col))

    return inner
Build a function that computes the maximum value of a given column of a DataFrame.
Arguments:
- input_col: name of the input column.
- output_col: name of the output column (defaults to "latest").
Returns:
A function to be passed to Spark's DataFrame.transform(); it returns a single-row DataFrame whose only column, named output_col, holds the maximum of input_col.