lakehouse_engine.transformers.unions
Module with union transformers.
1"""Module with union transformers.""" 2 3from functools import reduce 4from typing import Callable, List 5 6from pyspark.sql import DataFrame 7 8from lakehouse_engine.utils.logging_handler import LoggingHandler 9 10 11class Unions(object): 12 """Class containing union transformers.""" 13 14 _logger = LoggingHandler(__name__).get_logger() 15 16 @classmethod 17 def union( 18 cls, 19 union_with: List[DataFrame], 20 deduplication: bool = True, 21 ) -> Callable: 22 """Union dataframes, resolving columns by position (not by name). 23 24 Args: 25 union_with: list of dataframes to union. 26 deduplication: whether to perform deduplication of elements or not. 27 28 Returns: 29 A function to be called in .transform() spark function. 30 """ 31 32 def inner(df: DataFrame) -> DataFrame: 33 union_df = reduce(DataFrame.union, [df] + union_with) 34 35 return union_df.distinct() if deduplication else union_df 36 37 return inner 38 39 @classmethod 40 def union_by_name( 41 cls, 42 union_with: List[DataFrame], 43 deduplication: bool = True, 44 allow_missing_columns: bool = True, 45 ) -> Callable: 46 """Union dataframes, resolving columns by name (not by position). 47 48 Args: 49 union_with: list of dataframes to union. 50 deduplication: whether to perform deduplication of elements or not. 51 allow_missing_columns: allow the union of DataFrames with different 52 schemas. 53 54 Returns: 55 A function to be called in .transform() spark function. 56 """ 57 58 def inner(df: DataFrame) -> DataFrame: 59 union_df = reduce( 60 lambda x, y: x.unionByName( 61 y, allowMissingColumns=allow_missing_columns 62 ), 63 [df] + union_with, 64 ) 65 66 return union_df.distinct() if deduplication else union_df 67 68 return inner
class Unions:
class Unions(object):
    """Class containing union transformers."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def union(
        cls,
        union_with: List[DataFrame],
        deduplication: bool = True,
    ) -> Callable:
        """Union dataframes, resolving columns by position (not by name).

        Args:
            union_with: list of dataframes to union.
            deduplication: whether to perform deduplication of elements or not.

        Returns:
            A function to be called in .transform() spark function.
        """

        def inner(df: DataFrame) -> DataFrame:
            # Seed the fold with the incoming dataframe; columns are matched
            # purely by position.
            result = reduce(DataFrame.union, union_with, df)
            if deduplication:
                return result.distinct()
            return result

        return inner

    @classmethod
    def union_by_name(
        cls,
        union_with: List[DataFrame],
        deduplication: bool = True,
        allow_missing_columns: bool = True,
    ) -> Callable:
        """Union dataframes, resolving columns by name (not by position).

        Args:
            union_with: list of dataframes to union.
            deduplication: whether to perform deduplication of elements or not.
            allow_missing_columns: allow the union of DataFrames with different
                schemas.

        Returns:
            A function to be called in .transform() spark function.
        """

        def inner(df: DataFrame) -> DataFrame:
            # Seed the fold with the incoming dataframe; columns are matched
            # by name, optionally tolerating schema differences.
            result = reduce(
                lambda left, right: left.unionByName(
                    right, allowMissingColumns=allow_missing_columns
                ),
                union_with,
                df,
            )
            if deduplication:
                return result.distinct()
            return result

        return inner
Class containing union transformers.
@classmethod
def union(cls, union_with: List[DataFrame], deduplication: bool = True) -> Callable:
@classmethod
def union(
    cls,
    union_with: List[DataFrame],
    deduplication: bool = True,
) -> Callable:
    """Union dataframes, resolving columns by position (not by name).

    Args:
        union_with: list of dataframes to union.
        deduplication: whether to perform deduplication of elements or not.

    Returns:
        A function to be called in .transform() spark function.
    """

    def inner(df: DataFrame) -> DataFrame:
        # Fold every dataframe into one, matching columns by position.
        unioned = reduce(lambda acc, nxt: acc.union(nxt), [df, *union_with])
        if deduplication:
            unioned = unioned.distinct()
        return unioned

    return inner
Union dataframes, resolving columns by position (not by name).
Arguments:
- union_with: list of dataframes to union.
- deduplication: whether to perform deduplication of elements or not.
Returns:
A function to be called in .transform() spark function.
@classmethod
def union_by_name(cls, union_with: List[DataFrame], deduplication: bool = True, allow_missing_columns: bool = True) -> Callable:
@classmethod
def union_by_name(
    cls,
    union_with: List[DataFrame],
    deduplication: bool = True,
    allow_missing_columns: bool = True,
) -> Callable:
    """Union dataframes, resolving columns by name (not by position).

    Args:
        union_with: list of dataframes to union.
        deduplication: whether to perform deduplication of elements or not.
        allow_missing_columns: allow the union of DataFrames with different
            schemas.

    Returns:
        A function to be called in .transform() spark function.
    """

    def inner(df: DataFrame) -> DataFrame:
        # Accumulate unions one dataframe at a time, matching columns by
        # name and (optionally) tolerating missing columns on either side.
        merged = df
        for other in union_with:
            merged = merged.unionByName(
                other, allowMissingColumns=allow_missing_columns
            )
        return merged.distinct() if deduplication else merged

    return inner
Union dataframes, resolving columns by name (not by position).
Arguments:
- union_with: list of dataframes to union.
- deduplication: whether to perform deduplication of elements or not.
- allow_missing_columns: allow the union of DataFrames with different schemas.
Returns:
A function to be called in .transform() spark function.