lakehouse_engine.transformers.unions

Module with union transformers.

 1"""Module with union transformers."""
 2
 3from functools import reduce
 4from typing import Callable, List
 5
 6from pyspark.sql import DataFrame
 7
 8from lakehouse_engine.utils.logging_handler import LoggingHandler
 9
10
11class Unions(object):
12    """Class containing union transformers."""
13
14    _logger = LoggingHandler(__name__).get_logger()
15
16    @classmethod
17    def union(
18        cls,
19        union_with: List[DataFrame],
20        deduplication: bool = True,
21    ) -> Callable:
22        """Union dataframes, resolving columns by position (not by name).
23
24        Args:
25            union_with: list of dataframes to union.
26            deduplication: whether to perform deduplication of elements or not.
27
28        Returns:
29            A function to be called in .transform() spark function.
30        """
31
32        def inner(df: DataFrame) -> DataFrame:
33            union_df = reduce(DataFrame.union, [df] + union_with)
34
35            return union_df.distinct() if deduplication else union_df
36
37        return inner
38
39    @classmethod
40    def union_by_name(
41        cls,
42        union_with: List[DataFrame],
43        deduplication: bool = True,
44        allow_missing_columns: bool = True,
45    ) -> Callable:
46        """Union dataframes, resolving columns by name (not by position).
47
48        Args:
49            union_with: list of dataframes to union.
50            deduplication: whether to perform deduplication of elements or not.
51            allow_missing_columns: allow the union of DataFrames with different
52                schemas.
53
54        Returns:
55            A function to be called in .transform() spark function.
56        """
57
58        def inner(df: DataFrame) -> DataFrame:
59            union_df = reduce(
60                lambda x, y: x.unionByName(
61                    y, allowMissingColumns=allow_missing_columns
62                ),
63                [df] + union_with,
64            )
65
66            return union_df.distinct() if deduplication else union_df
67
68        return inner
class Unions:

Class containing union transformers.

@classmethod
def union(cls, union_with: List[pyspark.sql.dataframe.DataFrame], deduplication: bool = True) -> Callable:

Union dataframes, resolving columns by position (not by name).

Arguments:
  • union_with: list of dataframes to union.
  • deduplication: whether to perform deduplication of elements or not.
Returns:
  A function to be called in the .transform() Spark function.
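For illustration only (not part of the module), the returned callable plugs into DataFrame.transform; the dataframe names and data below are hypothetical:

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.unions import Unions

spark = SparkSession.builder.getOrCreate()

# Hypothetical dataframes sharing the same column order.
daily_sales = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "category"])
backfill_sales = spark.createDataFrame([(2, "B"), (3, "C")], ["id", "category"])

# Unions.union(...) returns a function; .transform() applies it to daily_sales.
# With the default deduplication=True, the duplicated (2, "B") row is dropped.
unioned = daily_sales.transform(Unions.union(union_with=[backfill_sales]))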

@classmethod
def union_by_name(cls, union_with: List[pyspark.sql.dataframe.DataFrame], deduplication: bool = True, allow_missing_columns: bool = True) -> Callable:

Union dataframes, resolving columns by name (not by position).

Arguments:
  • union_with: list of dataframes to union.
  • deduplication: whether to perform deduplication of elements or not.
  • allow_missing_columns: allow the union of DataFrames with different schemas.
Returns:
  A function to be called in the .transform() Spark function.
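As a sketch (not part of the module), union_by_name matches columns by name, and with allow_missing_columns=True any column absent from one dataframe is filled with nulls; the dataframe names and data below are hypothetical:

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.unions import Unions

spark = SparkSession.builder.getOrCreate()

# Hypothetical dataframes with different column order and different schemas.
orders = spark.createDataFrame([(1, "open")], ["id", "status"])
legacy_orders = spark.createDataFrame([("closed", 2, "EU")], ["status", "id", "region"])

# Columns are matched by name; "region" does not exist in orders, so its rows
# get null in that column because allow_missing_columns defaults to True.
unioned = orders.transform(
    Unions.union_by_name(union_with=[legacy_orders], deduplication=False)
)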