lakehouse_engine.transformers.column_creators
Column creators transformers module.
1"""Column creators transformers module.""" 2 3from typing import Any, Callable, Dict 4 5from pyspark.sql import DataFrame, Window 6from pyspark.sql.functions import col, lit, monotonically_increasing_id, row_number 7from pyspark.sql.types import IntegerType 8 9from lakehouse_engine.transformers.exceptions import ( 10 UnsupportedStreamingTransformerException, 11) 12from lakehouse_engine.utils.logging_handler import LoggingHandler 13 14 15class ColumnCreators(object): 16 """Class containing all functions that can create columns to add value.""" 17 18 _logger = LoggingHandler(__name__).get_logger() 19 20 @classmethod 21 def with_row_id( 22 cls, 23 output_col: str = "lhe_row_id", 24 ) -> Callable: 25 """Create a sequential but not consecutive id. 26 27 Args: 28 output_col: optional name of the output column. 29 30 Returns: 31 A function to be executed in the .transform() spark function. 32 """ 33 34 def inner(df: DataFrame) -> DataFrame: 35 if not df.isStreaming: 36 return df.withColumn(output_col, monotonically_increasing_id()) 37 else: 38 raise UnsupportedStreamingTransformerException( 39 "Transformer with_row_id is not supported in streaming mode." 40 ) 41 42 return inner 43 44 @classmethod 45 def with_auto_increment_id( 46 cls, output_col: str = "lhe_row_id", rdd: bool = True 47 ) -> Callable: 48 """Create a sequential and consecutive id. 49 50 Args: 51 output_col: optional name of the output column. 52 rdd: optional parameter to use spark rdd. 53 54 Returns: 55 A function to be executed in the .transform() spark function. 56 """ 57 58 def inner(df: DataFrame) -> DataFrame: 59 if not df.isStreaming: 60 if len(df.take(1)) == 0: 61 # if df is empty we have to prevent the algorithm from failing 62 return df.withColumn(output_col, lit(None).cast(IntegerType())) 63 elif rdd: 64 return ( 65 df.rdd.zipWithIndex() 66 .toDF() 67 .select(col("_1.*"), col("_2").alias(output_col)) 68 ) 69 else: 70 w = Window.orderBy(monotonically_increasing_id()) 71 return df.withColumn(output_col, (row_number().over(w)) - 1) 72 73 else: 74 raise UnsupportedStreamingTransformerException( 75 "Transformer with_auto_increment_id is not supported in " 76 "streaming mode." 77 ) 78 79 return inner 80 81 @classmethod 82 def with_literals( 83 cls, 84 literals: Dict[str, Any], 85 ) -> Callable: 86 """Create columns given a map of column names and literal values (constants). 87 88 Args: 89 Dict[str, Any] literals: map of column names and literal values (constants). 90 91 Returns: 92 Callable: A function to be executed in the .transform() spark function. 93 """ 94 95 def inner(df: DataFrame) -> DataFrame: 96 df_with_literals = df 97 for name, value in literals.items(): 98 df_with_literals = df_with_literals.withColumn(name, lit(value)) 99 return df_with_literals 100 101 return inner
class ColumnCreators:
Class containing all functions that can create columns to add value.
@classmethod
def with_row_id(cls, output_col: str = 'lhe_row_id') -> Callable:
Create a sequential, but not consecutive, id.
Arguments:
- output_col: optional name of the output column.
Returns:
A function to be executed in the Spark .transform() function.
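For context, a minimal batch usage sketch; the sample data, column name, and Spark session setup are illustrative and not part of the library:

```python
from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_creators import ColumnCreators

spark = SparkSession.builder.getOrCreate()

# Illustrative batch DataFrame (a streaming DataFrame would raise
# UnsupportedStreamingTransformerException).
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["letter"])

# with_row_id returns a function, so it plugs directly into DataFrame.transform.
df_with_id = df.transform(ColumnCreators.with_row_id(output_col="lhe_row_id"))

# Ids are increasing and unique, but may have gaps across partitions,
# e.g. 0, 1, 8589934592, ...
df_with_id.show()
```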
@classmethod
def with_auto_increment_id(cls, output_col: str = 'lhe_row_id', rdd: bool = True) -> Callable:
Create a sequential and consecutive id.
Arguments:
- output_col: optional name of the output column.
- rdd: whether to use the Spark RDD zipWithIndex strategy (default) or a window function instead.
Returns:
A function to be executed in the Spark .transform() function.
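A short sketch of both strategies, with illustrative sample data. Note that the window branch orders by monotonically_increasing_id() without a partitionBy, which pulls all rows into a single partition, so it is best reserved for small DataFrames:

```python
from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_creators import ColumnCreators

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a",), ("b",), ("c",)], ["letter"])

# Default strategy: RDD zipWithIndex, yielding consecutive ids 0, 1, 2, ...
df_rdd = df.transform(ColumnCreators.with_auto_increment_id())

# Alternative strategy: row_number() over a window, minus 1 so ids also
# start at 0. Same result, different execution plan.
df_window = df.transform(ColumnCreators.with_auto_increment_id(rdd=False))

df_rdd.show()     # letter | lhe_row_id: a|0, b|1, c|2
df_window.show()  # identical ids
```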
@classmethod
def with_literals(cls, literals: Dict[str, Any]) -> Callable:
Create columns given a map of column names and literal values (constants).
Arguments:
- literals: map of column names and literal values (constants).
Returns:
A function to be executed in the Spark .transform() function.
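A minimal sketch; the column names and literal values are illustrative:

```python
from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_creators import ColumnCreators

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ["id"])

# Each map entry becomes a constant column; lit() derives the Spark type
# from the Python value (str, int, bool, ...).
df_enriched = df.transform(
    ColumnCreators.with_literals({"source_system": "erp", "is_active": True})
)
df_enriched.show()
# +---+-------------+---------+
# | id|source_system|is_active|
# +---+-------------+---------+
# |  1|          erp|     true|
# |  2|          erp|     true|
# +---+-------------+---------+
```

Unlike the id creators above, this transformer performs no streaming check, so it can also be applied to streaming DataFrames.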