
Column creators

Column creators transformers module.

ColumnCreators

Bases: object

Class containing all functions that can create columns to add value.

Source code in mkdocs/lakehouse_engine/packages/transformers/column_creators.py
class ColumnCreators(object):
    """Class containing all functions that can create columns to add value."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def with_row_id(
        cls,
        output_col: str = "lhe_row_id",
    ) -> Callable:
        """Create a sequential but not consecutive id.

        Args:
            output_col: optional name of the output column.

        Returns:
            A function to be executed in the .transform() spark function.

        {{get_example(method_name='with_row_id')}}
        """

        def inner(df: DataFrame) -> DataFrame:
            if not df.isStreaming:
                return df.withColumn(output_col, monotonically_increasing_id())
            else:
                raise UnsupportedStreamingTransformerException(
                    "Transformer with_row_id is not supported in streaming mode."
                )

        return inner

    @classmethod
    def with_auto_increment_id(
        cls, output_col: str = "lhe_row_id", rdd: bool = True
    ) -> Callable:
        """Create a sequential and consecutive id.

        Args:
            output_col: optional name of the output column.
            rdd: optional parameter to use spark rdd.

        Returns:
            A function to be executed in the .transform() spark function.

        {{get_example(method_name='with_auto_increment_id')}}
        """

        def inner(df: DataFrame) -> DataFrame:
            if not df.isStreaming:
                if len(df.take(1)) == 0:
                    # if df is empty we have to prevent the algorithm from failing
                    return df.withColumn(output_col, lit(None).cast(IntegerType()))
                elif rdd:
                    return (
                        df.rdd.zipWithIndex()
                        .toDF()
                        .select(col("_1.*"), col("_2").alias(output_col))
                    )
                else:
                    w = Window.orderBy(monotonically_increasing_id())
                    return df.withColumn(output_col, (row_number().over(w)) - 1)

            else:
                raise UnsupportedStreamingTransformerException(
                    "Transformer with_auto_increment_id is not supported in "
                    "streaming mode."
                )

        return inner

    @classmethod
    def with_literals(
        cls,
        literals: Dict[str, Any],
    ) -> Callable:
        """Create columns given a map of column names and literal values (constants).

        Args:
            literals: map of column names and literal values (constants).

        Returns:
            Callable: A function to be executed in the .transform() spark function.

        {{get_example(method_name='with_literals')}}
        """

        def inner(df: DataFrame) -> DataFrame:
            df_with_literals = df
            for name, value in literals.items():
                df_with_literals = df_with_literals.withColumn(name, lit(value))
            return df_with_literals

        return inner
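
All three creators follow the same calling convention: each classmethod returns a closure from DataFrame to DataFrame, meant to be passed to Spark's .transform(). Below is a minimal usage sketch, assuming a local SparkSession and that the module is importable as lakehouse_engine.transformers.column_creators (the path is inferred from the source file shown above):

```python
from pyspark.sql import SparkSession

# Assumed import path, inferred from the source file location above.
from lakehouse_engine.transformers.column_creators import ColumnCreators

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["letter"])

# Each classmethod returns a DataFrame -> DataFrame function,
# so creators can be chained through .transform().
df_out = df.transform(ColumnCreators.with_row_id()).transform(
    ColumnCreators.with_literals({"source": "docs_example"})
)
df_out.show()
```

The "source" literal above is purely illustrative; any map of column names to constants works.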

with_auto_increment_id(output_col='lhe_row_id', rdd=True) classmethod

Create a sequential and consecutive id.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| output_col | str | optional name of the output column. | 'lhe_row_id' |
| rdd | bool | optional parameter to use spark rdd. | True |

Returns:

| Type | Description |
| --- | --- |
| Callable | A function to be executed in the .transform() spark function. |

View Example of with_auto_increment_id (See full example here)
{
    "function": "with_auto_increment_id"
}
Source code in mkdocs/lakehouse_engine/packages/transformers/column_creators.py
@classmethod
def with_auto_increment_id(
    cls, output_col: str = "lhe_row_id", rdd: bool = True
) -> Callable:
    """Create a sequential and consecutive id.

    Args:
        output_col: optional name of the output column.
        rdd: optional parameter to use spark rdd.

    Returns:
        A function to be executed in the .transform() spark function.

    {{get_example(method_name='with_auto_increment_id')}}
    """

    def inner(df: DataFrame) -> DataFrame:
        if not df.isStreaming:
            if len(df.take(1)) == 0:
                # if df is empty we have to prevent the algorithm from failing
                return df.withColumn(output_col, lit(None).cast(IntegerType()))
            elif rdd:
                return (
                    df.rdd.zipWithIndex()
                    .toDF()
                    .select(col("_1.*"), col("_2").alias(output_col))
                )
            else:
                w = Window.orderBy(monotonically_increasing_id())
                return df.withColumn(output_col, (row_number().over(w)) - 1)

        else:
            raise UnsupportedStreamingTransformerException(
                "Transformer with_auto_increment_id is not supported in "
                "streaming mode."
            )

    return inner
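
The two branches trade distribution for simplicity: the default rdd path uses zipWithIndex, which assigns consecutive ids without moving all rows to one executor, while the rdd=False path orders an unpartitioned window, so Spark pulls every row into a single partition (and typically logs a warning about it). A small sketch contrasting the two, under the same import assumption as above:

```python
from pyspark.sql import SparkSession

# Assumed import path, inferred from the source file location above.
from lakehouse_engine.transformers.column_creators import ColumnCreators

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["letter"]).repartition(2)

# Default branch: zipWithIndex stays distributed and yields 0..n-1.
ids_rdd = [
    r.lhe_row_id
    for r in df.transform(ColumnCreators.with_auto_increment_id()).collect()
]

# Window branch: row_number() over an unpartitioned window, shifted to 0-based.
ids_win = [
    r.lhe_row_id
    for r in df.transform(ColumnCreators.with_auto_increment_id(rdd=False)).collect()
]

# Both branches produce the same consecutive id range.
assert sorted(ids_rdd) == sorted(ids_win) == [0, 1, 2]
```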

with_literals(literals) classmethod

Create columns given a map of column names and literal values (constants).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| literals | Dict[str, Any] | map of column names and literal values (constants). | required |

Returns:

| Type | Description |
| --- | --- |
| Callable | A function to be executed in the .transform() spark function. |

View Example of with_literals (See full example here)
{
    "function": "with_literals",
    "args": {
        "literals": {
            "dummy_string": "this is a string",
            "dummy_int": 100,
            "dummy_double": 10.2,
            "dummy_boolean": true
        }
    }
}
Source code in mkdocs/lakehouse_engine/packages/transformers/column_creators.py
@classmethod
def with_literals(
    cls,
    literals: Dict[str, Any],
) -> Callable:
    """Create columns given a map of column names and literal values (constants).

    Args:
        literals: map of column names and literal values (constants).

    Returns:
        Callable: A function to be executed in the .transform() spark function.

    {{get_example(method_name='with_literals')}}
    """

    def inner(df: DataFrame) -> DataFrame:
        df_with_literals = df
        for name, value in literals.items():
            df_with_literals = df_with_literals.withColumn(name, lit(value))
        return df_with_literals

    return inner
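
As a sketch, here are the literals from the JSON example above applied directly in PySpark (same import assumption); lit() infers each column's Spark type from the Python value:

```python
from pyspark.sql import SparkSession

# Assumed import path, inferred from the source file location above.
from lakehouse_engine.transformers.column_creators import ColumnCreators

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ["id"])

df_lit = df.transform(
    ColumnCreators.with_literals(
        {
            "dummy_string": "this is a string",
            "dummy_int": 100,
            "dummy_double": 10.2,
            "dummy_boolean": True,
        }
    )
)
df_lit.show()  # every row carries the same four constant columns
```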

with_row_id(output_col='lhe_row_id') classmethod

Create a sequential but not consecutive id.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| output_col | str | optional name of the output column. | 'lhe_row_id' |

Returns:

| Type | Description |
| --- | --- |
| Callable | A function to be executed in the .transform() spark function. |

View Example of with_row_id (See full example here)
{
    "function": "with_row_id"
}
Source code in mkdocs/lakehouse_engine/packages/transformers/column_creators.py
@classmethod
def with_row_id(
    cls,
    output_col: str = "lhe_row_id",
) -> Callable:
    """Create a sequential but not consecutive id.

    Args:
        output_col: optional name of the output column.

    Returns:
        A function to be executed in the .transform() spark function.

    {{get_example(method_name='with_row_id')}}
    """

    def inner(df: DataFrame) -> DataFrame:
        if not df.isStreaming:
            return df.withColumn(output_col, monotonically_increasing_id())
        else:
            raise UnsupportedStreamingTransformerException(
                "Transformer with_row_id is not supported in streaming mode."
            )

    return inner
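
The id is "sequential but not consecutive" because monotonically_increasing_id() gives each partition its own base (the partition index shifted left by 33 bits), so ids ascend but can jump between partitions. A sketch that makes the gaps visible, under the same import assumption as above:

```python
from pyspark.sql import SparkSession

# Assumed import path, inferred from the source file location above.
from lakehouse_engine.transformers.column_creators import ColumnCreators

spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame([(x,) for x in "abcd"], ["letter"]).repartition(2)

ids = [r.lhe_row_id for r in df.transform(ColumnCreators.with_row_id()).collect()]

# Ids are unique and strictly increasing in partition order, but not 0..3:
# rows in the second partition start at 8589934592 (1 << 33).
assert ids == sorted(ids) and len(set(ids)) == len(ids)
```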