lakehouse_engine.transformers.data_maskers

Module with data masking transformers.

 1"""Module with data masking transformers."""
 2
 3from typing import Callable, List
 4
 5from pyspark.sql import DataFrame
 6from pyspark.sql.functions import hash, sha2  # noqa: A004
 7
 8from lakehouse_engine.transformers.exceptions import WrongArgumentsException
 9from lakehouse_engine.utils.logging_handler import LoggingHandler
10
11
class DataMaskers(object):
    """Collection of transformers that mask or remove sensitive columns."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def hash_masker(
        cls,
        cols: List[str],
        approach: str = "SHA",
        num_bits: int = 256,
        suffix: str = "_hash",
    ) -> Callable:
        """Mask specific columns using a hashing approach.

        Args:
            cols: list of column names to mask.
            approach: hashing approach. Defaults to 'SHA'. There's "MURMUR3" as well.
            num_bits: number of bits of the SHA approach. Only applies to SHA approach.
            suffix: suffix to apply to new column name. Defaults to "_hash".
                Note: you can pass an empty suffix to have the original column replaced.

        Returns:
            A function to be called in .transform() spark function.
        """

        def inner(df: DataFrame) -> DataFrame:
            result = df
            for column in cols:
                # Each supported approach adds a new (or replacement) column
                # named `column + suffix`; anything else is rejected.
                if approach == "SHA":
                    result = result.withColumn(column + suffix, sha2(column, num_bits))
                elif approach == "MURMUR3":
                    result = result.withColumn(column + suffix, hash(column))
                else:
                    raise WrongArgumentsException("Hashing approach is not supported.")
            return result

        return inner

    @classmethod
    def column_dropper(cls, cols: List[str]) -> Callable:
        """Drop specific columns.

        Args:
            cols: list of column names to drop.

        Returns:
            A function to be called in .transform() spark function.
        """

        def inner(df: DataFrame) -> DataFrame:
            remaining = df
            for name in cols:
                remaining = remaining.drop(name)
            return remaining

        return inner
class DataMaskers:
13class DataMaskers(object):
14    """Class containing data masking transformers."""
15
16    _logger = LoggingHandler(__name__).get_logger()
17
18    @classmethod
19    def hash_masker(
20        cls,
21        cols: List[str],
22        approach: str = "SHA",
23        num_bits: int = 256,
24        suffix: str = "_hash",
25    ) -> Callable:
26        """Mask specific columns using an hashing approach.
27
28        Args:
29            cols: list of column names to mask.
30            approach: hashing approach. Defaults to 'SHA'. There's "MURMUR3" as well.
31            num_bits: number of bits of the SHA approach. Only applies to SHA approach.
32            suffix: suffix to apply to new column name. Defaults to "_hash".
33                Note: you can pass an empty suffix to have the original column replaced.
34
35        Returns:
36            A function to be called in .transform() spark function.
37        """
38
39        def inner(df: DataFrame) -> DataFrame:
40            masked_df = df
41            for col in cols:
42                if approach == "MURMUR3":
43                    masked_df = masked_df.withColumn(col + suffix, hash(col))
44                elif approach == "SHA":
45                    masked_df = masked_df.withColumn(col + suffix, sha2(col, num_bits))
46                else:
47                    raise WrongArgumentsException("Hashing approach is not supported.")
48
49            return masked_df
50
51        return inner
52
53    @classmethod
54    def column_dropper(cls, cols: List[str]) -> Callable:
55        """Drop specific columns.
56
57        Args:
58            cols: list of column names to drop.
59
60        Returns:
61            A function to be called in .transform() spark function.
62        """
63
64        def inner(df: DataFrame) -> DataFrame:
65            drop_df = df
66            for col in cols:
67                drop_df = drop_df.drop(col)
68
69            return drop_df
70
71        return inner

Class containing data masking transformers.

@classmethod
def hash_masker( cls, cols: List[str], approach: str = 'SHA', num_bits: int = 256, suffix: str = '_hash') -> Callable:
18    @classmethod
19    def hash_masker(
20        cls,
21        cols: List[str],
22        approach: str = "SHA",
23        num_bits: int = 256,
24        suffix: str = "_hash",
25    ) -> Callable:
26        """Mask specific columns using an hashing approach.
27
28        Args:
29            cols: list of column names to mask.
30            approach: hashing approach. Defaults to 'SHA'. There's "MURMUR3" as well.
31            num_bits: number of bits of the SHA approach. Only applies to SHA approach.
32            suffix: suffix to apply to new column name. Defaults to "_hash".
33                Note: you can pass an empty suffix to have the original column replaced.
34
35        Returns:
36            A function to be called in .transform() spark function.
37        """
38
39        def inner(df: DataFrame) -> DataFrame:
40            masked_df = df
41            for col in cols:
42                if approach == "MURMUR3":
43                    masked_df = masked_df.withColumn(col + suffix, hash(col))
44                elif approach == "SHA":
45                    masked_df = masked_df.withColumn(col + suffix, sha2(col, num_bits))
46                else:
47                    raise WrongArgumentsException("Hashing approach is not supported.")
48
49            return masked_df
50
51        return inner

Mask specific columns using a hashing approach.

Arguments:
  • cols: list of column names to mask.
  • approach: hashing approach. Defaults to 'SHA'. There's "MURMUR3" as well.
  • num_bits: number of bits of the SHA approach. Only applies to SHA approach.
  • suffix: suffix to apply to new column name. Defaults to "_hash". Note: you can pass an empty suffix to have the original column replaced.
Returns:

A function to be called in .transform() spark function.

View Example
{
    "function": "hash_masker",
    "args": {
        "cols": [
            "customer",
            "article"
        ]
    }
}
View Full Acon


@classmethod
def column_dropper(cls, cols: List[str]) -> Callable:
53    @classmethod
54    def column_dropper(cls, cols: List[str]) -> Callable:
55        """Drop specific columns.
56
57        Args:
58            cols: list of column names to drop.
59
60        Returns:
61            A function to be called in .transform() spark function.
62        """
63
64        def inner(df: DataFrame) -> DataFrame:
65            drop_df = df
66            for col in cols:
67                drop_df = drop_df.drop(col)
68
69            return drop_df
70
71        return inner

Drop specific columns.

Arguments:
  • cols: list of column names to drop.
Returns:

A function to be called in .transform() spark function.

View Example
{
    "function": "column_dropper",
    "args": {
        "cols": [
            "customer",
            "article"
        ]
    }
}
View Full Acon