lakehouse_engine.transformers.column_reshapers
Module with column reshaping transformers.
1"""Module with column reshaping transformers.""" 2 3from collections import OrderedDict 4from typing import Any, Callable, Dict, List, Optional 5 6import pyspark.sql.types as spark_types 7from pyspark.sql import DataFrame 8from pyspark.sql.avro.functions import from_avro 9from pyspark.sql.functions import ( 10 col, 11 explode_outer, 12 expr, 13 from_json, 14 map_entries, 15 struct, 16 to_json, 17) 18 19from lakehouse_engine.transformers.exceptions import WrongArgumentsException 20from lakehouse_engine.utils.logging_handler import LoggingHandler 21from lakehouse_engine.utils.schema_utils import SchemaUtils 22 23 24class ColumnReshapers(object): 25 """Class containing column reshaping transformers.""" 26 27 _logger = LoggingHandler(__name__).get_logger() 28 29 @classmethod 30 def cast(cls, cols: Dict[str, str]) -> Callable: 31 """Cast specific columns into the designated type. 32 33 Args: 34 cols: dict with columns and respective target types. 35 Target types need to have the exact name of spark types: 36 https://spark.apache.org/docs/latest/sql-ref-datatypes.html 37 38 Returns: 39 A function to be called in .transform() spark function. 40 """ 41 42 def inner(df: DataFrame) -> DataFrame: 43 cast_df = df 44 for c, t in cols.items(): 45 cast_df = cast_df.withColumn(c, col(c).cast(getattr(spark_types, t)())) 46 47 return cast_df 48 49 return inner 50 51 @classmethod 52 def column_selector(cls, cols: OrderedDict) -> Callable: 53 """Select specific columns with specific output aliases. 54 55 Args: 56 cols: dict with columns to select and respective aliases. 57 58 Returns: 59 A function to be called in .transform() spark function. 60 """ 61 62 def inner(df: DataFrame) -> DataFrame: 63 return df.select(*[col(c).alias(a) for c, a in cols.items()]) 64 65 return inner 66 67 @classmethod 68 def flatten_schema( 69 cls, 70 max_level: int = None, 71 shorten_names: bool = False, 72 alias: bool = True, 73 num_chars: int = 7, 74 ignore_cols: List = None, 75 ) -> Callable: 76 """Flatten the schema of the dataframe. 77 78 Args: 79 max_level: level until which you want to flatten the schema. 80 Default: None. 81 shorten_names: whether to shorten the names of the prefixes 82 of the fields being flattened or not. Default: False. 83 alias: whether to define alias for the columns being flattened 84 or not. Default: True. 85 num_chars: number of characters to consider when shortening 86 the names of the fields. Default: 7. 87 ignore_cols: columns which you don't want to flatten. 88 Default: None. 89 90 Returns: 91 A function to be called in .transform() spark function. 92 """ 93 94 def inner(df: DataFrame) -> DataFrame: 95 return df.select( 96 SchemaUtils.schema_flattener( 97 schema=df.schema, 98 max_level=max_level, 99 shorten_names=shorten_names, 100 alias=alias, 101 num_chars=num_chars, 102 ignore_cols=ignore_cols, 103 ) 104 ) 105 106 return inner 107 108 @classmethod 109 def explode_columns( 110 cls, 111 explode_arrays: bool = False, 112 array_cols_to_explode: List[str] = None, 113 explode_maps: bool = False, 114 map_cols_to_explode: List[str] = None, 115 ) -> Callable: 116 """Explode columns with types like ArrayType and MapType. 117 118 After it can be applied the flatten_schema transformation, 119 if we desired for example to explode the map (as we explode a StructType) 120 or to explode a StructType inside the array. 121 We recommend you to specify always the columns desired to explode 122 and not explode all columns. 
123 124 Args: 125 explode_arrays: whether you want to explode array columns (True) 126 or not (False). Default: False. 127 array_cols_to_explode: array columns which you want to explode. 128 If you don't specify it will get all array columns and explode them. 129 Default: None. 130 explode_maps: whether you want to explode map columns (True) 131 or not (False). Default: False. 132 map_cols_to_explode: map columns which you want to explode. 133 If you don't specify it will get all map columns and explode them. 134 Default: None. 135 136 Returns: 137 A function to be called in .transform() spark function. 138 """ 139 140 def inner(df: DataFrame) -> DataFrame: 141 if explode_arrays or (array_cols_to_explode is not None): 142 df = cls._explode_arrays(df, array_cols_to_explode) 143 144 if explode_maps or (map_cols_to_explode is not None): 145 df = cls._explode_maps(df, map_cols_to_explode) 146 147 return df 148 149 return inner 150 151 @classmethod 152 def _get_columns( 153 cls, 154 df: DataFrame, 155 data_type: Any, 156 ) -> List: 157 """Get a list of columns from the dataframe of the data types specified. 158 159 Args: 160 df: input dataframe. 161 data_type: data type specified. 162 163 Returns: 164 List of columns with the datatype specified. 165 """ 166 cols = [] 167 for field in df.schema.fields: 168 if isinstance(field.dataType, data_type): 169 cols.append(field.name) 170 return cols 171 172 @classmethod 173 def with_expressions(cls, cols_and_exprs: Dict[str, str]) -> Callable: 174 """Execute Spark SQL expressions to create the specified columns. 175 176 This function uses the Spark expr function. [Check here]( 177 https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.expr.html). 178 179 Args: 180 cols_and_exprs: dict with columns and respective expressions to compute 181 (Spark SQL expressions). 182 183 Returns: 184 A function to be called in .transform() spark function. 185 """ 186 187 def inner(df: DataFrame) -> DataFrame: 188 enriched_df = df 189 for c, e in cols_and_exprs.items(): 190 enriched_df = enriched_df.withColumn(c, expr(e)) 191 192 return enriched_df 193 194 return inner 195 196 @classmethod 197 def rename(cls, cols: Dict[str, str], escape_col_names: bool = True) -> Callable: 198 """Rename specific columns into the designated name. 199 200 Args: 201 cols: dict with columns and respective target names. 202 escape_col_names: whether to escape column names (e.g. `/BIC/COL1`) or not. 203 If True it creates a column with the new name and drop the old one. 204 If False, uses the native withColumnRenamed Spark function. 205 Default: True. 206 207 Returns: 208 Function to be called in .transform() spark function. 209 """ 210 211 def inner(df: DataFrame) -> DataFrame: 212 renamed_df = df 213 if escape_col_names: 214 for old_name, new_name in cols.items(): 215 renamed_df = renamed_df.withColumn(new_name, col(old_name)) 216 renamed_df = renamed_df.drop(old_name) 217 else: 218 for old_name, new_name in cols.items(): 219 renamed_df = df.withColumnRenamed(old_name, new_name) 220 221 return renamed_df 222 223 return inner 224 225 @classmethod 226 def from_avro( 227 cls, 228 schema: str = None, 229 key_col: str = "key", 230 value_col: str = "value", 231 options: dict = None, 232 expand_key: bool = False, 233 expand_value: bool = True, 234 ) -> Callable: 235 """Select all attributes from avro. 236 237 Args: 238 schema: the schema string. 239 key_col: the name of the key column. 240 value_col: the name of the value column. 
241 options: extra options (e.g., mode: "PERMISSIVE"). 242 expand_key: whether you want to expand the content inside the key 243 column or not. Default: false. 244 expand_value: whether you want to expand the content inside the value 245 column or not. Default: true. 246 247 Returns: 248 Function to be called in .transform() spark function. 249 """ 250 251 def inner(df: DataFrame) -> DataFrame: 252 cols_to_select = [ 253 column for column in df.columns if column not in [key_col, value_col] 254 ] 255 256 return df.select( 257 *cols_to_select, 258 key_col, 259 from_avro(col(value_col), schema, options if options else {}).alias( 260 value_col 261 ), 262 ).select( 263 *cols_to_select, 264 f"{key_col}.*" if expand_key else key_col, 265 f"{value_col}.*" if expand_value else value_col, 266 ) 267 268 return inner 269 270 @classmethod 271 def from_avro_with_registry( 272 cls, 273 schema_registry: str, 274 value_schema: str, 275 value_col: str = "value", 276 key_schema: str = None, 277 key_col: str = "key", 278 expand_key: bool = False, 279 expand_value: bool = True, 280 ) -> Callable: 281 """Select all attributes from avro using a schema registry. 282 283 Args: 284 schema_registry: the url to the schema registry. 285 value_schema: the name of the value schema entry in the schema registry. 286 value_col: the name of the value column. 287 key_schema: the name of the key schema entry in the schema 288 registry. Default: None. 289 key_col: the name of the key column. 290 expand_key: whether you want to expand the content inside the key 291 column or not. Default: false. 292 expand_value: whether you want to expand the content inside the value 293 column or not. Default: true. 294 295 Returns: 296 Function to be called in .transform() spark function. 297 """ 298 299 def inner(df: DataFrame) -> DataFrame: 300 cols_to_select = [ 301 column for column in df.columns if column not in [key_col, value_col] 302 ] 303 304 return df.select( # type: ignore 305 *cols_to_select, 306 ( 307 from_avro( 308 data=col(key_col), 309 subject=key_schema, 310 schemaRegistryAddress=schema_registry, # type: ignore 311 ).alias(key_col) 312 if key_schema 313 else key_col 314 ), 315 from_avro( 316 data=col(value_col), 317 subject=value_schema, 318 schemaRegistryAddress=schema_registry, # type: ignore 319 ).alias(value_col), 320 ).select( 321 *cols_to_select, 322 f"{key_col}.*" if expand_key else key_col, 323 f"{value_col}.*" if expand_value else value_col, 324 ) 325 326 return inner 327 328 @classmethod 329 def from_json( 330 cls, 331 input_col: str, 332 schema_path: Optional[str] = None, 333 schema: Optional[dict] = None, 334 json_options: Optional[dict] = None, 335 drop_all_cols: bool = False, 336 disable_dbfs_retry: bool = False, 337 ) -> Callable: 338 """Convert a json string into a json column (struct). 339 340 The new json column can be added to the existing columns (default) or it can 341 replace all the others, being the only one to output. The new column gets the 342 same name as the original one suffixed with '_json'. 343 344 Args: 345 input_col: dict with columns and respective target names. 346 schema_path: path to the StructType schema (spark schema). 347 schema: dict with the StructType schema (spark schema). 348 json_options: options to parse the json value. 349 drop_all_cols: whether to drop all the input columns or not. 350 Defaults to False. 351 disable_dbfs_retry: optional flag to disable file storage dbfs. 352 353 Returns: 354 A function to be called in .transform() spark function. 
355 """ 356 357 def inner(df: DataFrame) -> DataFrame: 358 if schema_path: 359 json_schema = SchemaUtils.from_file(schema_path, disable_dbfs_retry) 360 elif schema: 361 json_schema = SchemaUtils.from_dict(schema) 362 else: 363 raise WrongArgumentsException( 364 "A file or dict schema needs to be provided." 365 ) 366 367 if drop_all_cols: 368 df_with_json = df.select( 369 from_json( 370 col(input_col).cast("string").alias(f"{input_col}_json"), 371 json_schema, 372 json_options if json_options else {}, 373 ).alias(f"{input_col}_json") 374 ) 375 else: 376 df_with_json = df.select( 377 "*", 378 from_json( 379 col(input_col).cast("string").alias(f"{input_col}_json"), 380 json_schema, 381 json_options if json_options else {}, 382 ).alias(f"{input_col}_json"), 383 ) 384 385 return df_with_json 386 387 return inner 388 389 @classmethod 390 def to_json( 391 cls, in_cols: List[str], out_col: str, json_options: Optional[dict] = None 392 ) -> Callable: 393 """Convert dataframe columns into a json value. 394 395 Args: 396 in_cols: name(s) of the input column(s). 397 Example values: 398 "*" - all 399 columns; "my_col" - one column named "my_col"; 400 "my_col1, my_col2" - two columns. 401 out_col: name of the output column. 402 json_options: options to parse the json value. 403 404 Returns: 405 A function to be called in .transform() spark function. 406 """ 407 408 def inner(df: DataFrame) -> DataFrame: 409 return df.withColumn( 410 out_col, to_json(struct(*in_cols), json_options if json_options else {}) 411 ) 412 413 return inner 414 415 @classmethod 416 def _explode_arrays(cls, df: DataFrame, cols_to_explode: List[str]) -> DataFrame: 417 """Explode array columns from dataframe. 418 419 Args: 420 df: the dataframe to apply the explode operation. 421 cols_to_explode: list of array columns to perform explode. 422 423 Returns: 424 A dataframe with array columns exploded. 425 """ 426 if cols_to_explode is None: 427 cols_to_explode = cls._get_columns(df, spark_types.ArrayType) 428 429 for column in cols_to_explode: 430 df = df.withColumn(column, explode_outer(column)) 431 432 return df 433 434 @classmethod 435 def _explode_maps(cls, df: DataFrame, cols_to_explode: List[str]) -> DataFrame: 436 """Explode map columns from dataframe. 437 438 Args: 439 df: the dataframe to apply the explode operation. 440 cols_to_explode: list of map columns to perform explode. 441 442 Returns: 443 A dataframe with map columns exploded. 444 """ 445 if cols_to_explode is None: 446 cols_to_explode = cls._get_columns(df, spark_types.MapType) 447 448 for column in cols_to_explode: 449 df = df.withColumn(column, explode_outer(map_entries(col(column)))) 450 451 return df
class ColumnReshapers(object):
Class containing column reshaping transformers.
@classmethod
def cast(cls, cols: Dict[str, str]) -> Callable:
Cast specific columns into the designated type.
Arguments:
- cols: dict with columns and respective target types. Target types must match the exact names of Spark types: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
Returns:
A function to be called in the Spark .transform() function.
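For illustration, a minimal sketch of applying this transformer directly with .transform() (the column names and sample values below are made up for the example):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()

# Hypothetical input where both columns arrive as strings.
df = spark.createDataFrame([("10", "2.5"), ("20", "3.5")], ["quantity", "price"])

# Each dict entry casts the column to the Spark type with that exact name.
cast_df = df.transform(
    ColumnReshapers.cast({"quantity": "IntegerType", "price": "DoubleType"})
)
cast_df.printSchema()  # quantity: int, price: double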
@classmethod
def column_selector(cls, cols: OrderedDict) -> Callable:
Select specific columns with specific output aliases.
Arguments:
- cols: dict with columns to select and respective aliases.
Returns:
A function to be called in the Spark .transform() function.
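A hedged sketch of direct usage (column names and aliases are hypothetical); an OrderedDict preserves the desired output column order:

from collections import OrderedDict

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])

# Keep only the listed columns, renaming them to the given aliases.
selected_df = df.transform(
    ColumnReshapers.column_selector(OrderedDict([("id", "order_id"), ("val", "order_value")]))
)
selected_df.show()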
@classmethod
def flatten_schema(
    cls,
    max_level: int = None,
    shorten_names: bool = False,
    alias: bool = True,
    num_chars: int = 7,
    ignore_cols: List = None,
) -> Callable:
Flatten the schema of the dataframe.
Arguments:
- max_level: level up to which the schema should be flattened. Default: None.
- shorten_names: whether to shorten the prefixes of the field names being flattened. Default: False.
- alias: whether to define aliases for the columns being flattened. Default: True.
- num_chars: number of characters to consider when shortening the names of the fields. Default: 7.
- ignore_cols: columns which you don't want to flatten. Default: None.
Returns:
A function to be called in the Spark .transform() function.
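A minimal sketch of direct usage, assuming a nested struct column (the exact names of the flattened output columns depend on SchemaUtils.schema_flattener):

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()

# Hypothetical input with a nested struct column called "address".
df = spark.createDataFrame([(1, "street x", "city y")], ["id", "street", "city"]).select(
    "id", struct("street", "city").alias("address")
)

# Flatten nested struct fields into top-level columns.
flat_df = df.transform(ColumnReshapers.flatten_schema(max_level=2))
flat_df.printSchema()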
@classmethod
def explode_columns(
    cls,
    explode_arrays: bool = False,
    array_cols_to_explode: List[str] = None,
    explode_maps: bool = False,
    map_cols_to_explode: List[str] = None,
) -> Callable:
Explode columns with types like ArrayType and MapType.
Afterwards, the flatten_schema transformation can be applied, if we desire, for example, to explode the map (as we explode a StructType) or to explode a StructType inside the array. We recommend always specifying the columns you want to explode, rather than exploding all columns.
Arguments:
- explode_arrays: whether you want to explode array columns (True) or not (False). Default: False.
- array_cols_to_explode: array columns which you want to explode. If not specified, all array columns will be exploded. Default: None.
- explode_maps: whether you want to explode map columns (True) or not (False). Default: False.
- map_cols_to_explode: map columns which you want to explode. If not specified, all map columns will be exploded. Default: None.
Returns:
A function to be called in the Spark .transform() function.
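For illustration, a sketch with hypothetical array and map columns, explicitly naming the columns to explode (as recommended above):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()

# Hypothetical input with an array column ("tags") and a map column ("attributes").
df = spark.createDataFrame(
    [(1, ["a", "b"], {"k1": "v1", "k2": "v2"})],
    ["id", "tags", "attributes"],
)

exploded_df = df.transform(
    ColumnReshapers.explode_columns(
        explode_arrays=True,
        array_cols_to_explode=["tags"],
        explode_maps=True,
        map_cols_to_explode=["attributes"],
    )
)
exploded_df.show(truncate=False)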
@classmethod
def with_expressions(cls, cols_and_exprs: Dict[str, str]) -> Callable:
Execute Spark SQL expressions to create the specified columns.
This function uses the Spark expr function (see https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.expr.html).
Arguments:
- cols_and_exprs: dict with columns and respective expressions to compute (Spark SQL expressions).
Returns:
A function to be called in the Spark .transform() function.
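A minimal sketch of direct usage (the columns and expressions below are made-up Spark SQL expressions):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(10, 3), (20, 5)], ["amount", "quantity"])

# Each dict entry becomes withColumn(<name>, expr(<Spark SQL expression>)).
enriched_df = df.transform(
    ColumnReshapers.with_expressions(
        {
            "total": "amount * quantity",
            "amount_band": "case when amount > 15 then 'high' else 'low' end",
        }
    )
)
enriched_df.show()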
@classmethod
def rename(cls, cols: Dict[str, str], escape_col_names: bool = True) -> Callable:
Rename specific columns into the designated name.
Arguments:
- cols: dict with columns and respective target names.
- escape_col_names: whether to escape column names (e.g. /BIC/COL1) or not. If True, it creates a column with the new name and drops the old one. If False, it uses the native withColumnRenamed Spark function. Default: True.
Returns:
A function to be called in the Spark .transform() function.
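For illustration, a sketch using the default escape_col_names=True path, which also covers names that need escaping (the column names are hypothetical):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "x")], ["/BIC/COL1", "old_name"])

# Each entry recreates the column under the new name and drops the old one.
renamed_df = df.transform(
    ColumnReshapers.rename({"/BIC/COL1": "col1", "old_name": "new_name"})
)
renamed_df.show()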
@classmethod
def from_avro(
    cls,
    schema: str = None,
    key_col: str = "key",
    value_col: str = "value",
    options: dict = None,
    expand_key: bool = False,
    expand_value: bool = True,
) -> Callable:
Select all attributes from avro.
Arguments:
- schema: the schema string.
- key_col: the name of the key column.
- value_col: the name of the value column.
- options: extra options (e.g., mode: "PERMISSIVE").
- expand_key: whether you want to expand the content inside the key column or not. Default: False.
- expand_value: whether you want to expand the content inside the value column or not. Default: True.
Returns:
A function to be called in the Spark .transform() function.
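A hedged sketch of direct usage: it builds a tiny avro-encoded value column with to_avro just for the demo (in practice the key/value columns usually come from a Kafka read) and assumes the spark-avro package is on the classpath:

import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.functions import struct

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()

avro_schema = '{"type": "record", "name": "sample", "fields": [{"name": "field1", "type": "string"}]}'

# Encode a non-nullable field with the same schema we later use for decoding.
src_schema = T.StructType(
    [
        T.StructField("key", T.StringType(), False),
        T.StructField("field1", T.StringType(), False),
    ]
)
df = spark.createDataFrame([("k1", "a")], src_schema).select(
    "key", to_avro(struct("field1"), avro_schema).alias("value")
)

# Decode the value column with the schema string and expand it into its fields.
decoded_df = df.transform(ColumnReshapers.from_avro(schema=avro_schema))
decoded_df.show()  # columns: key, field1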
@classmethod
def from_avro_with_registry(
    cls,
    schema_registry: str,
    value_schema: str,
    value_col: str = "value",
    key_schema: str = None,
    key_col: str = "key",
    expand_key: bool = False,
    expand_value: bool = True,
) -> Callable:
Select all attributes from avro using a schema registry.
Arguments:
- schema_registry: the url to the schema registry.
- value_schema: the name of the value schema entry in the schema registry.
- value_col: the name of the value column.
- key_schema: the name of the key schema entry in the schema registry. Default: None.
- key_col: the name of the key column.
- expand_key: whether you want to expand the content inside the key column or not. Default: False.
- expand_value: whether you want to expand the content inside the value column or not. Default: True.
Returns:
A function to be called in the Spark .transform() function.
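This variant relies on a schema-registry-aware from_avro (the data/subject/schemaRegistryAddress signature, e.g. on Databricks), so the following is only a sketch of the call pattern; the broker, topic, registry url and subject name are placeholders:

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()

# Hypothetical Kafka source holding avro-encoded key and value columns.
kafka_df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "sales_orders")
    .load()
)

decoded_df = kafka_df.transform(
    ColumnReshapers.from_avro_with_registry(
        schema_registry="https://my-schema-registry:8081",
        value_schema="sales_orders-value",
        expand_value=True,
    )
)
decoded_df.printSchema()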
@classmethod
def from_json(
    cls,
    input_col: str,
    schema_path: Optional[str] = None,
    schema: Optional[dict] = None,
    json_options: Optional[dict] = None,
    drop_all_cols: bool = False,
    disable_dbfs_retry: bool = False,
) -> Callable:
Convert a json string into a json column (struct).
The new json column can be added to the existing columns (default) or it can replace all the others, becoming the only column in the output. The new column gets the same name as the original one suffixed with '_json'.
Arguments:
- input_col: name of the input column that holds the json string.
- schema_path: path to the StructType schema (spark schema).
- schema: dict with the StructType schema (spark schema).
- json_options: options to parse the json value.
- drop_all_cols: whether to drop all the input columns or not. Defaults to False.
- disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:
A function to be called in the Spark .transform() function.
Example:
{
  "function": "from_json",
  "args": {
    "input_col": "sample",
    "schema": {
      "type": "struct",
      "fields": [
        {
          "name": "field1",
          "type": "string",
          "nullable": true,
          "metadata": {}
        },
        {
          "name": "field2",
          "type": "string",
          "nullable": true,
          "metadata": {}
        },
        {
          "name": "field3",
          "type": "double",
          "nullable": true,
          "metadata": {}
        },
        {
          "name": "field4",
          "type": {
            "type": "struct",
            "fields": [
              {
                "name": "field1",
                "type": "string",
                "nullable": true,
                "metadata": {}
              },
              {
                "name": "field2",
                "type": "string",
                "nullable": true,
                "metadata": {}
              }
            ]
          },
          "nullable": true,
          "metadata": {}
        }
      ]
    }
  }
}
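For reference, a hedged sketch of the equivalent direct call on a DataFrame (the "sample" column name follows the config above; the schema is a smaller dict in the same format):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('{"field1": "a", "field2": "b"}',)], ["sample"])

json_schema = {
    "type": "struct",
    "fields": [
        {"name": "field1", "type": "string", "nullable": True, "metadata": {}},
        {"name": "field2", "type": "string", "nullable": True, "metadata": {}},
    ],
}

# Adds a struct column named sample_json next to the existing columns.
parsed_df = df.transform(ColumnReshapers.from_json(input_col="sample", schema=json_schema))
parsed_df.printSchema()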
@classmethod
def to_json(
    cls, in_cols: List[str], out_col: str, json_options: Optional[dict] = None
) -> Callable:
Convert dataframe columns into a json value.
Arguments:
- in_cols: name(s) of the input column(s). Example values: "*" - all columns; "my_col" - one column named "my_col"; "my_col1, my_col2" - two columns.
- out_col: name of the output column.
- json_options: options to parse the json value.
Returns:
A function to be called in the Spark .transform() function.
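To close, a minimal sketch of direct usage (the column names and the output column name are hypothetical):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])

# Packs the listed columns into a single json string column named "payload".
json_df = df.transform(ColumnReshapers.to_json(in_cols=["id", "val"], out_col="payload"))
json_df.show(truncate=False)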