lakehouse_engine.transformers.column_creators

Column creators transformers module.

  1"""Column creators transformers module."""
  2
  3from typing import Any, Callable, Dict
  4
  5from pyspark.sql import DataFrame, Window
  6from pyspark.sql.functions import col, lit, monotonically_increasing_id, row_number
  7from pyspark.sql.types import IntegerType
  8
  9from lakehouse_engine.transformers.exceptions import (
 10    UnsupportedStreamingTransformerException,
 11)
 12from lakehouse_engine.utils.logging_handler import LoggingHandler
 13
 14
 15class ColumnCreators(object):
 16    """Class containing all functions that can create columns to add value."""
 17
 18    _logger = LoggingHandler(__name__).get_logger()
 19
 20    @classmethod
 21    def with_row_id(
 22        cls,
 23        output_col: str = "lhe_row_id",
 24    ) -> Callable:
 25        """Create a sequential but not consecutive id.
 26
 27        Args:
 28            output_col: optional name of the output column.
 29
 30        Returns:
 31            A function to be executed in the .transform() spark function.
 32        """
 33
 34        def inner(df: DataFrame) -> DataFrame:
 35            if not df.isStreaming:
 36                return df.withColumn(output_col, monotonically_increasing_id())
 37            else:
 38                raise UnsupportedStreamingTransformerException(
 39                    "Transformer with_row_id is not supported in streaming mode."
 40                )
 41
 42        return inner
 43
 44    @classmethod
 45    def with_auto_increment_id(
 46        cls, output_col: str = "lhe_row_id", rdd: bool = True
 47    ) -> Callable:
 48        """Create a sequential and consecutive id.
 49
 50        Args:
 51            output_col: optional name of the output column.
 52            rdd: optional parameter to use spark rdd.
 53
 54        Returns:
 55            A function to be executed in the .transform() spark function.
 56        """
 57
 58        def inner(df: DataFrame) -> DataFrame:
 59            if not df.isStreaming:
 60                if len(df.take(1)) == 0:
 61                    # if df is empty we have to prevent the algorithm from failing
 62                    return df.withColumn(output_col, lit(None).cast(IntegerType()))
 63                elif rdd:
 64                    return (
 65                        df.rdd.zipWithIndex()
 66                        .toDF()
 67                        .select(col("_1.*"), col("_2").alias(output_col))
 68                    )
 69                else:
 70                    w = Window.orderBy(monotonically_increasing_id())
 71                    return df.withColumn(output_col, (row_number().over(w)) - 1)
 72
 73            else:
 74                raise UnsupportedStreamingTransformerException(
 75                    "Transformer with_auto_increment_id is not supported in "
 76                    "streaming mode."
 77                )
 78
 79        return inner
 80
 81    @classmethod
 82    def with_literals(
 83        cls,
 84        literals: Dict[str, Any],
 85    ) -> Callable:
 86        """Create columns given a map of column names and literal values (constants).
 87
 88        Args:
 89            Dict[str, Any] literals: map of column names and literal values (constants).
 90
 91        Returns:
 92            Callable: A function to be executed in the .transform() spark function.
 93        """
 94
 95        def inner(df: DataFrame) -> DataFrame:
 96            df_with_literals = df
 97            for name, value in literals.items():
 98                df_with_literals = df_with_literals.withColumn(name, lit(value))
 99            return df_with_literals
100
101        return inner
class ColumnCreators:

Class containing all functions that can create columns to add value.

@classmethod
def with_row_id(cls, output_col: str = 'lhe_row_id') -> Callable:

Create a sequential but not consecutive id.

Arguments:
  • output_col: optional name of the output column.
Returns:
  A function to be executed in the .transform() spark function.

Example:

{
    "function": "with_row_id"
}
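
For illustration, a minimal usage sketch with Spark's DataFrame.transform; the SparkSession setup, the sample data, and the "value" column name are assumptions, not part of the module:

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_creators import ColumnCreators

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["value"])  # assumed sample data

# Adds an "lhe_row_id" column; ids increase monotonically but are not
# guaranteed to be consecutive, since monotonically_increasing_id encodes
# the partition id in the upper bits of each value.
df_with_id = df.transform(ColumnCreators.with_row_id())
df_with_id.show()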


@classmethod
def with_auto_increment_id(cls, output_col: str = 'lhe_row_id', rdd: bool = True) -> Callable:

Create a sequential and consecutive id.

Arguments:
  • output_col: optional name of the output column.
  • rdd: optional parameter to use spark rdd.
Returns:
  A function to be executed in the .transform() spark function.

Example:

{
    "function": "with_auto_increment_id"
}
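
As a sketch of the two batch code paths, reusing the assumed spark session and df from the with_row_id example above (the "row_nr" column name is also an assumption):

# RDD path (rdd=True, the default): zipWithIndex assigns consecutive
# 0-based ids without shuffling all rows into a single partition.
df_rdd = df.transform(ColumnCreators.with_auto_increment_id(output_col="row_nr"))

# Window path (rdd=False): row_number() - 1 over a global window; Spark
# warns here because the unpartitioned window moves all data to one partition.
df_window = df.transform(
    ColumnCreators.with_auto_increment_id(output_col="row_nr", rdd=False)
)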


@classmethod
def with_literals(cls, literals: Dict[str, Any]) -> Callable:

Create columns given a map of column names and literal values (constants).

Arguments:
  • literals: map of column names and literal values (constants).

Returns:
  A function to be executed in the .transform() spark function.

Example:

{
    "function": "with_literals",
    "args": {
        "literals": {
            "dummy_string": "this is a string",
            "dummy_int": 100,
            "dummy_double": 10.2,
            "dummy_boolean": true
        }
    }
}
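
A minimal sketch mirroring the ACON example above in plain PySpark, using the same assumed df as in the earlier examples:

df_const = df.transform(
    ColumnCreators.with_literals(
        {
            "dummy_string": "this is a string",
            "dummy_int": 100,
            "dummy_double": 10.2,
            "dummy_boolean": True,
        }
    )
)
# Every row now carries the same constant value in each of the new columns.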