lakehouse_engine.transformers.column_reshapers

Module with column reshaping transformers.

  1"""Module with column reshaping transformers."""
  2
  3from collections import OrderedDict
  4from typing import Any, Callable, Dict, List, Optional
  5
  6import pyspark.sql.types as spark_types
  7from pyspark.sql import DataFrame
  8from pyspark.sql.avro.functions import from_avro
  9from pyspark.sql.functions import (
 10    col,
 11    explode_outer,
 12    expr,
 13    from_json,
 14    map_entries,
 15    struct,
 16    to_json,
 17)
 18
 19from lakehouse_engine.transformers.exceptions import WrongArgumentsException
 20from lakehouse_engine.utils.logging_handler import LoggingHandler
 21from lakehouse_engine.utils.schema_utils import SchemaUtils
 22
 23
 24class ColumnReshapers(object):
 25    """Class containing column reshaping transformers."""
 26
 27    _logger = LoggingHandler(__name__).get_logger()
 28
 29    @classmethod
 30    def cast(cls, cols: Dict[str, str]) -> Callable:
 31        """Cast specific columns into the designated type.
 32
 33        Args:
 34            cols: dict with columns and respective target types.
 35                Target types need to have the exact name of spark types:
 36                https://spark.apache.org/docs/latest/sql-ref-datatypes.html
 37
 38        Returns:
 39            A function to be called in .transform() spark function.
 40        """
 41
 42        def inner(df: DataFrame) -> DataFrame:
 43            cast_df = df
 44            for c, t in cols.items():
 45                cast_df = cast_df.withColumn(c, col(c).cast(getattr(spark_types, t)()))
 46
 47            return cast_df
 48
 49        return inner
 50
 51    @classmethod
 52    def column_selector(cls, cols: OrderedDict) -> Callable:
 53        """Select specific columns with specific output aliases.
 54
 55        Args:
 56            cols: dict with columns to select and respective aliases.
 57
 58        Returns:
 59            A function to be called in .transform() spark function.
 60        """
 61
 62        def inner(df: DataFrame) -> DataFrame:
 63            return df.select(*[col(c).alias(a) for c, a in cols.items()])
 64
 65        return inner
 66
 67    @classmethod
 68    def flatten_schema(
 69        cls,
 70        max_level: int = None,
 71        shorten_names: bool = False,
 72        alias: bool = True,
 73        num_chars: int = 7,
 74        ignore_cols: List = None,
 75    ) -> Callable:
 76        """Flatten the schema of the dataframe.
 77
 78        Args:
 79            max_level: level until which you want to flatten the schema.
 80                Default: None.
 81            shorten_names: whether to shorten the names of the prefixes
 82                of the fields being flattened or not. Default: False.
 83            alias: whether to define alias for the columns being flattened
 84                or not. Default: True.
 85            num_chars: number of characters to consider when shortening
 86                the names of the fields. Default: 7.
 87            ignore_cols: columns which you don't want to flatten.
 88                Default: None.
 89
 90        Returns:
 91            A function to be called in .transform() spark function.
 92        """
 93
 94        def inner(df: DataFrame) -> DataFrame:
 95            return df.select(
 96                SchemaUtils.schema_flattener(
 97                    schema=df.schema,
 98                    max_level=max_level,
 99                    shorten_names=shorten_names,
100                    alias=alias,
101                    num_chars=num_chars,
102                    ignore_cols=ignore_cols,
103                )
104            )
105
106        return inner
107
108    @classmethod
109    def explode_columns(
110        cls,
111        explode_arrays: bool = False,
112        array_cols_to_explode: List[str] = None,
113        explode_maps: bool = False,
114        map_cols_to_explode: List[str] = None,
115    ) -> Callable:
116        """Explode columns with types like ArrayType and MapType.
117
118        After it can be applied the flatten_schema transformation,
119        if we desired for example to explode the map (as we explode a StructType)
120        or to explode a StructType inside the array.
121        We recommend you to specify always the columns desired to explode
122        and not explode all columns.
123
124        Args:
125            explode_arrays: whether you want to explode array columns (True)
126                or not (False). Default: False.
127            array_cols_to_explode: array columns which you want to explode.
128                If you don't specify it will get all array columns and explode them.
129                Default: None.
130            explode_maps: whether you want to explode map columns (True)
131                or not (False). Default: False.
132            map_cols_to_explode: map columns which you want to explode.
133                If you don't specify it will get all map columns and explode them.
134                Default: None.
135
136        Returns:
137            A function to be called in .transform() spark function.
138        """
139
140        def inner(df: DataFrame) -> DataFrame:
141            if explode_arrays or (array_cols_to_explode is not None):
142                df = cls._explode_arrays(df, array_cols_to_explode)
143
144            if explode_maps or (map_cols_to_explode is not None):
145                df = cls._explode_maps(df, map_cols_to_explode)
146
147            return df
148
149        return inner
150
151    @classmethod
152    def _get_columns(
153        cls,
154        df: DataFrame,
155        data_type: Any,
156    ) -> List:
157        """Get a list of columns from the dataframe of the data types specified.
158
159        Args:
160            df: input dataframe.
161            data_type: data type specified.
162
163        Returns:
164            List of columns with the datatype specified.
165        """
166        cols = []
167        for field in df.schema.fields:
168            if isinstance(field.dataType, data_type):
169                cols.append(field.name)
170        return cols
171
172    @classmethod
173    def with_expressions(cls, cols_and_exprs: Dict[str, str]) -> Callable:
174        """Execute Spark SQL expressions to create the specified columns.
175
176        This function uses the Spark expr function. [Check here](
177        https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.expr.html).
178
179        Args:
180            cols_and_exprs: dict with columns and respective expressions to compute
181                (Spark SQL expressions).
182
183        Returns:
184            A function to be called in .transform() spark function.
185        """
186
187        def inner(df: DataFrame) -> DataFrame:
188            enriched_df = df
189            for c, e in cols_and_exprs.items():
190                enriched_df = enriched_df.withColumn(c, expr(e))
191
192            return enriched_df
193
194        return inner
195
196    @classmethod
197    def rename(cls, cols: Dict[str, str], escape_col_names: bool = True) -> Callable:
198        """Rename specific columns into the designated name.
199
200        Args:
201            cols: dict with columns and respective target names.
202            escape_col_names: whether to escape column names (e.g. `/BIC/COL1`) or not.
203                If True, it creates a column with the new name and drops the old one.
204                If False, uses the native withColumnRenamed Spark function.
205                Default: True.
206
207        Returns:
208            Function to be called in .transform() spark function.
209        """
210
211        def inner(df: DataFrame) -> DataFrame:
212            renamed_df = df
213            if escape_col_names:
214                for old_name, new_name in cols.items():
215                    renamed_df = renamed_df.withColumn(new_name, col(old_name))
216                    renamed_df = renamed_df.drop(old_name)
217            else:
218                for old_name, new_name in cols.items():
219                    renamed_df = renamed_df.withColumnRenamed(old_name, new_name)
220
221            return renamed_df
222
223        return inner
224
225    @classmethod
226    def from_avro(
227        cls,
228        schema: str = None,
229        key_col: str = "key",
230        value_col: str = "value",
231        options: dict = None,
232        expand_key: bool = False,
233        expand_value: bool = True,
234    ) -> Callable:
235        """Select all attributes from avro.
236
237        Args:
238            schema: the schema string.
239            key_col: the name of the key column.
240            value_col: the name of the value column.
241            options: extra options (e.g., mode: "PERMISSIVE").
242            expand_key: whether you want to expand the content inside the key
243                column or not. Default: false.
244            expand_value: whether you want to expand the content inside the value
245                column or not. Default: true.
246
247        Returns:
248            Function to be called in .transform() spark function.
249        """
250
251        def inner(df: DataFrame) -> DataFrame:
252            cols_to_select = [
253                column for column in df.columns if column not in [key_col, value_col]
254            ]
255
256            return df.select(
257                *cols_to_select,
258                key_col,
259                from_avro(col(value_col), schema, options if options else {}).alias(
260                    value_col
261                ),
262            ).select(
263                *cols_to_select,
264                f"{key_col}.*" if expand_key else key_col,
265                f"{value_col}.*" if expand_value else value_col,
266            )
267
268        return inner
269
270    @classmethod
271    def from_avro_with_registry(
272        cls,
273        schema_registry: str,
274        value_schema: str,
275        value_col: str = "value",
276        key_schema: str = None,
277        key_col: str = "key",
278        expand_key: bool = False,
279        expand_value: bool = True,
280    ) -> Callable:
281        """Select all attributes from avro using a schema registry.
282
283        Args:
284            schema_registry: the url to the schema registry.
285            value_schema: the name of the value schema entry in the schema registry.
286            value_col: the name of the value column.
287            key_schema: the name of the key schema entry in the schema
288                registry. Default: None.
289            key_col: the name of the key column.
290            expand_key: whether you want to expand the content inside the key
291                column or not. Default: false.
292            expand_value: whether you want to expand the content inside the value
293                column or not. Default: true.
294
295        Returns:
296            Function to be called in .transform() spark function.
297        """
298
299        def inner(df: DataFrame) -> DataFrame:
300            cols_to_select = [
301                column for column in df.columns if column not in [key_col, value_col]
302            ]
303
304            return df.select(  # type: ignore
305                *cols_to_select,
306                (
307                    from_avro(
308                        data=col(key_col),
309                        subject=key_schema,
310                        schemaRegistryAddress=schema_registry,  # type: ignore
311                    ).alias(key_col)
312                    if key_schema
313                    else key_col
314                ),
315                from_avro(
316                    data=col(value_col),
317                    subject=value_schema,
318                    schemaRegistryAddress=schema_registry,  # type: ignore
319                ).alias(value_col),
320            ).select(
321                *cols_to_select,
322                f"{key_col}.*" if expand_key else key_col,
323                f"{value_col}.*" if expand_value else value_col,
324            )
325
326        return inner
327
328    @classmethod
329    def from_json(
330        cls,
331        input_col: str,
332        schema_path: Optional[str] = None,
333        schema: Optional[dict] = None,
334        json_options: Optional[dict] = None,
335        drop_all_cols: bool = False,
336        disable_dbfs_retry: bool = False,
337    ) -> Callable:
338        """Convert a json string into a json column (struct).
339
340        The new json column can be added to the existing columns (default) or it can
341        replace all the others, being the only one to output. The new column gets the
342        same name as the original one suffixed with '_json'.
343
344        Args:
345            input_col: dict with columns and respective target names.
346            schema_path: path to the StructType schema (spark schema).
347            schema: dict with the StructType schema (spark schema).
348            json_options: options to parse the json value.
349            drop_all_cols: whether to drop all the input columns or not.
350                Defaults to False.
351            disable_dbfs_retry: optional flag to disable file storage dbfs.
352
353        Returns:
354            A function to be called in .transform() spark function.
355        """
356
357        def inner(df: DataFrame) -> DataFrame:
358            if schema_path:
359                json_schema = SchemaUtils.from_file(schema_path, disable_dbfs_retry)
360            elif schema:
361                json_schema = SchemaUtils.from_dict(schema)
362            else:
363                raise WrongArgumentsException(
364                    "A file or dict schema needs to be provided."
365                )
366
367            if drop_all_cols:
368                df_with_json = df.select(
369                    from_json(
370                        col(input_col).cast("string").alias(f"{input_col}_json"),
371                        json_schema,
372                        json_options if json_options else {},
373                    ).alias(f"{input_col}_json")
374                )
375            else:
376                df_with_json = df.select(
377                    "*",
378                    from_json(
379                        col(input_col).cast("string").alias(f"{input_col}_json"),
380                        json_schema,
381                        json_options if json_options else {},
382                    ).alias(f"{input_col}_json"),
383                )
384
385            return df_with_json
386
387        return inner
388
389    @classmethod
390    def to_json(
391        cls, in_cols: List[str], out_col: str, json_options: Optional[dict] = None
392    ) -> Callable:
393        """Convert dataframe columns into a json value.
394
395        Args:
396            in_cols: name(s) of the input column(s).
397                Example values:
398                "*" - all
399                columns; "my_col" - one column named "my_col";
400                "my_col1, my_col2" - two columns.
401            out_col: name of the output column.
402            json_options: options to parse the json value.
403
404        Returns:
405            A function to be called in .transform() spark function.
406        """
407
408        def inner(df: DataFrame) -> DataFrame:
409            return df.withColumn(
410                out_col, to_json(struct(*in_cols), json_options if json_options else {})
411            )
412
413        return inner
414
415    @classmethod
416    def _explode_arrays(cls, df: DataFrame, cols_to_explode: List[str]) -> DataFrame:
417        """Explode array columns from dataframe.
418
419        Args:
420            df: the dataframe to apply the explode operation.
421            cols_to_explode: list of array columns to perform explode.
422
423        Returns:
424            A dataframe with array columns exploded.
425        """
426        if cols_to_explode is None:
427            cols_to_explode = cls._get_columns(df, spark_types.ArrayType)
428
429        for column in cols_to_explode:
430            df = df.withColumn(column, explode_outer(column))
431
432        return df
433
434    @classmethod
435    def _explode_maps(cls, df: DataFrame, cols_to_explode: List[str]) -> DataFrame:
436        """Explode map columns from dataframe.
437
438        Args:
439            df: the dataframe to apply the explode operation.
440            cols_to_explode: list of map columns to perform explode.
441
442        Returns:
443            A dataframe with map columns exploded.
444        """
445        if cols_to_explode is None:
446            cols_to_explode = cls._get_columns(df, spark_types.MapType)
447
448        for column in cols_to_explode:
449            df = df.withColumn(column, explode_outer(map_entries(col(column))))
450
451        return df
class ColumnReshapers:

Class containing column reshaping transformers.
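Every public method returns a function meant to be passed to DataFrame.transform, so transformers can be chained. A minimal usage sketch (the SparkSession setup and the DataFrame contents are illustrative assumptions, not part of the library):

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

spark = SparkSession.builder.getOrCreate()
# Hypothetical input data, just to show the chaining pattern.
df = spark.createDataFrame([("100", "shirt")], ["code", "ARTICLE"])

reshaped_df = (
    df.transform(ColumnReshapers.cast({"code": "IntegerType"}))
    .transform(ColumnReshapers.rename({"ARTICLE": "article"}))
)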

@classmethod
def cast(cls, cols: Dict[str, str]) -> Callable:

Cast specific columns into the designated type.

Arguments:
  • cols: dict with columns and respective target types. Target types need to have the exact name of spark types: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
Returns:

A function to be called in .transform() spark function.

Example:

{
    "function": "cast",
    "args": {
        "cols": {
            "code": "StringType"
        }
    }
}


@classmethod
def column_selector(cls, cols: collections.OrderedDict) -> Callable:

Select specific columns with specific output aliases.

Arguments:
  • cols: dict with columns to select and respective aliases.
Returns:

A function to be called in .transform() spark function.
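A minimal sketch of calling this transformer directly in Python; the column names "customer" and "customer_id" are illustrative assumptions:

from collections import OrderedDict

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

# df: any pyspark DataFrame that contains a "customer" column (assumed to exist).
selected_df = df.transform(
    ColumnReshapers.column_selector(OrderedDict([("customer", "customer_id")]))
)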

@classmethod
def flatten_schema( cls, max_level: int = None, shorten_names: bool = False, alias: bool = True, num_chars: int = 7, ignore_cols: List = None) -> Callable:

Flatten the schema of the dataframe.

Arguments:
  • max_level: level until which you want to flatten the schema. Default: None.
  • shorten_names: whether to shorten the names of the prefixes of the fields being flattened or not. Default: False.
  • alias: whether to define alias for the columns being flattened or not. Default: True.
  • num_chars: number of characters to consider when shortening the names of the fields. Default: 7.
  • ignore_cols: columns which you don't want to flatten. Default: None.
Returns:

A function to be called in .transform() spark function.

Example:

{
    "function": "flatten_schema",
    "args": {
        "max_level": 2
    }
}


@classmethod
def explode_columns( cls, explode_arrays: bool = False, array_cols_to_explode: List[str] = None, explode_maps: bool = False, map_cols_to_explode: List[str] = None) -> Callable:

Explode columns with types like ArrayType and MapType.

After this, the flatten_schema transformation can be applied, for example to explode the map (as we explode a StructType) or to explode a StructType inside the array. We recommend always specifying the columns you want to explode instead of exploding all columns.

Arguments:
  • explode_arrays: whether you want to explode array columns (True) or not (False). Default: False.
  • array_cols_to_explode: array columns which you want to explode. If you don't specify it will get all array columns and explode them. Default: None.
  • explode_maps: whether you want to explode map columns (True) or not (False). Default: False.
  • map_cols_to_explode: map columns which you want to explode. If you don't specify it will get all map columns and explode them. Default: None.
Returns:

A function to be called in .transform() spark function.

Example:

{
    "function": "explode_columns",
    "args": {
        "explode_arrays": true
    }
}
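To illustrate the explode-then-flatten workflow mentioned above, a sketch assuming df has an array-of-struct column named "items" (a hypothetical name):

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

exploded_df = (
    df.transform(
        ColumnReshapers.explode_columns(
            explode_arrays=True, array_cols_to_explode=["items"]
        )
    )
    # After exploding, each row holds a single struct, which flatten_schema expands.
    .transform(ColumnReshapers.flatten_schema(max_level=2))
)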


@classmethod
def with_expressions(cls, cols_and_exprs: Dict[str, str]) -> Callable:

Execute Spark SQL expressions to create the specified columns.

This function uses the Spark expr function. See https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.expr.html.

Arguments:
  • cols_and_exprs: dict with columns and respective expressions to compute (Spark SQL expressions).
Returns:

A function to be called in .transform() spark function.

Example:

{
    "function": "with_expressions",
    "args": {
        "cols_and_exprs": {
            "constant": "'just a constant'",
            "length_customer2": "length(customer2)"
        }
    }
}


@classmethod
def rename(cls, cols: Dict[str, str], escape_col_names: bool = True) -> Callable:

Rename specific columns into the designated name.

Arguments:
  • cols: dict with columns and respective target names.
  • escape_col_names: whether to escape column names (e.g. /BIC/COL1) or not. If True, it creates a column with the new name and drops the old one. If False, uses the native withColumnRenamed Spark function. Default: True.
Returns:

Function to be called in .transform() spark function.

Example:

{
    "function": "rename",
    "args": {
        "cols": {
            "ARTICLE": "article"
        }
    }
}


@classmethod
def from_avro( cls, schema: str = None, key_col: str = 'key', value_col: str = 'value', options: dict = None, expand_key: bool = False, expand_value: bool = True) -> Callable:

Select all attributes from avro.

Arguments:
  • schema: the schema string.
  • key_col: the name of the key column.
  • value_col: the name of the value column.
  • options: extra options (e.g., mode: "PERMISSIVE").
  • expand_key: whether you want to expand the content inside the key column or not. Default: false.
  • expand_value: whether you want to expand the content inside the value column or not. Default: true.
Returns:

Function to be called in .transform() spark function.
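A minimal sketch, assuming df was read from Kafka and therefore has binary "key" and "value" columns; the Avro schema below is a hypothetical schema for the value and requires the spark-avro package to be available:

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

# Hypothetical Avro schema (JSON format) describing the value column.
value_schema = """
{"type": "record", "name": "sample", "fields": [{"name": "id", "type": "string"}]}
"""

decoded_df = df.transform(
    ColumnReshapers.from_avro(schema=value_schema, options={"mode": "PERMISSIVE"})
)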

@classmethod
def from_avro_with_registry( cls, schema_registry: str, value_schema: str, value_col: str = 'value', key_schema: str = None, key_col: str = 'key', expand_key: bool = False, expand_value: bool = True) -> Callable:

Select all attributes from avro using a schema registry.

Arguments:
  • schema_registry: the url to the schema registry.
  • value_schema: the name of the value schema entry in the schema registry.
  • value_col: the name of the value column.
  • key_schema: the name of the key schema entry in the schema registry. Default: None.
  • key_col: the name of the key column.
  • expand_key: whether you want to expand the content inside the key column or not. Default: false.
  • expand_value: whether you want to expand the content inside the value column or not. Default: true.
Returns:

Function to be called in .transform() spark function.
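A minimal sketch, again assuming a Kafka-style DataFrame with "key" and "value" columns; the registry address and subject names are hypothetical, and this transformer only works on runtimes whose from_avro supports the schema registry arguments used in the source above:

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

decoded_df = df.transform(
    ColumnReshapers.from_avro_with_registry(
        schema_registry="https://my-schema-registry:8081",  # hypothetical address
        value_schema="my_topic-value",  # hypothetical subject names
        key_schema="my_topic-key",
        expand_key=True,
        expand_value=True,
    )
)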

@classmethod
def from_json( cls, input_col: str, schema_path: Optional[str] = None, schema: Optional[dict] = None, json_options: Optional[dict] = None, drop_all_cols: bool = False, disable_dbfs_retry: bool = False) -> Callable:

Convert a json string into a json column (struct).

The new json column can be added to the existing columns (default) or it can replace all the others, being the only one to output. The new column gets the same name as the original one suffixed with '_json'.

Arguments:
  • input_col: name of the input column containing the json string.
  • schema_path: path to the StructType schema (spark schema).
  • schema: dict with the StructType schema (spark schema).
  • json_options: options to parse the json value.
  • drop_all_cols: whether to drop all the input columns or not. Defaults to False.
  • disable_dbfs_retry: optional flag to disable file storage dbfs.
Returns:

A function to be called in .transform() spark function.

Example:

{
    "function": "from_json",
    "args": {
        "input_col": "sample",
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "name": "field1",
                    "type": "string",
                    "nullable": true,
                    "metadata": {}
                },
                {
                    "name": "field2",
                    "type": "string",
                    "nullable": true,
                    "metadata": {}
                },
                {
                    "name": "field3",
                    "type": "double",
                    "nullable": true,
                    "metadata": {}
                },
                {
                    "name": "field4",
                    "type": {
                        "type": "struct",
                        "fields": [
                            {
                                "name": "field1",
                                "type": "string",
                                "nullable": true,
                                "metadata": {}
                            },
                            {
                                "name": "field2",
                                "type": "string",
                                "nullable": true,
                                "metadata": {}
                            }
                        ]
                    },
                    "nullable": true,
                    "metadata": {}
                }
            ]
        }
    }
}
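The same transformer can also be called directly in Python with a schema file instead of an inline schema; the path below is a hypothetical location:

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

# df: any pyspark DataFrame with a "sample" column holding a json string (assumed).
parsed_df = df.transform(
    ColumnReshapers.from_json(
        input_col="sample",
        schema_path="s3://my-bucket/schemas/sample.json",  # hypothetical schema file
        drop_all_cols=True,
    )
)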


@classmethod
def to_json( cls, in_cols: List[str], out_col: str, json_options: Optional[dict] = None) -> Callable:

Convert dataframe columns into a json value.

Arguments:
  • in_cols: name(s) of the input column(s). Example values: "*" - all columns; "my_col" - one column named "my_col"; "my_col1, my_col2" - two columns.
  • out_col: name of the output column.
  • json_options: options to parse the json value.
Returns:

A function to be called in .transform() spark function.

Example:

{
    "function": "to_json",
    "args": {
        "in_cols": [
            "item",
            "amount"
        ],
        "out_col": "item_amount_json"
    }
}
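The same call expressed directly in Python (a minimal sketch; the column names match the example above):

from lakehouse_engine.transformers.column_reshapers import ColumnReshapers

json_df = df.transform(
    ColumnReshapers.to_json(in_cols=["item", "amount"], out_col="item_amount_json")
)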