"""Module with data masking transformers."""

from typing import Callable, List

from pyspark.sql import DataFrame
from pyspark.sql.functions import hash, sha2  # noqa: A004

from lakehouse_engine.transformers.exceptions import WrongArgumentsException
from lakehouse_engine.utils.logging_handler import LoggingHandler


class DataMaskers(object):
    """Class containing data masking transformers."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def hash_masker(
        cls,
        cols: List[str],
        approach: str = "SHA",
        num_bits: int = 256,
        suffix: str = "_hash",
    ) -> Callable:
        """Mask specific columns using a hashing approach.

        Args:
            cols: list of column names to mask.
            approach: hashing approach. Defaults to 'SHA'. There's "MURMUR3" as well.
            num_bits: number of bits of the SHA approach. Only applies to SHA approach.
            suffix: suffix to apply to new column name. Defaults to "_hash".
                Note: you can pass an empty suffix to have the original column replaced.

        Returns:
            A function to be called in .transform() spark function.

        Raises:
            WrongArgumentsException: if `approach` is neither "SHA" nor "MURMUR3"
                (raised lazily, on the first masked column, when the returned
                function is applied to a DataFrame).
        """

        def inner(df: DataFrame) -> DataFrame:
            masked_df = df
            for col in cols:
                # The check stays inside the loop on purpose: with an empty
                # `cols` list no validation (and no raise) happens, matching
                # the transformer's lazy, per-column behavior.
                if approach == "MURMUR3":
                    masked_df = masked_df.withColumn(col + suffix, hash(col))
                elif approach == "SHA":
                    masked_df = masked_df.withColumn(col + suffix, sha2(col, num_bits))
                else:
                    raise WrongArgumentsException("Hashing approach is not supported.")

            return masked_df

        return inner

    @classmethod
    def column_dropper(cls, cols: List[str]) -> Callable:
        """Drop specific columns.

        Args:
            cols: list of column names to drop.

        Returns:
            A function to be called in .transform() spark function.
        """

        def inner(df: DataFrame) -> DataFrame:
            # DataFrame.drop accepts multiple column names and silently
            # ignores columns that do not exist, so a single call is
            # equivalent to dropping them one by one in a loop.
            return df.drop(*cols)

        return inner