lakehouse_engine.transformers.date_transformers
Module containing date transformers.
1"""Module containing date transformers.""" 2 3from datetime import datetime 4from typing import Callable, List, Optional 5 6from pyspark.sql import DataFrame 7from pyspark.sql.functions import col, date_format, lit, to_date, to_timestamp 8 9from lakehouse_engine.utils.logging_handler import LoggingHandler 10 11 12class DateTransformers(object): 13 """Class with set of transformers to transform dates in several forms.""" 14 15 _logger = LoggingHandler(__name__).get_logger() 16 17 @staticmethod 18 def add_current_date(output_col: str) -> Callable: 19 """Add column with current date. 20 21 The current date comes from the driver as a constant, not from every executor. 22 23 Args: 24 output_col: name of the output column. 25 26 Returns: 27 A function to be executed in the .transform() spark function. 28 """ 29 30 def inner(df: DataFrame) -> DataFrame: 31 return df.withColumn(output_col, lit(datetime.now())) 32 33 return inner 34 35 @staticmethod 36 def convert_to_date( 37 cols: List[str], source_format: Optional[str] = None 38 ) -> Callable: 39 """Convert multiple string columns with a source format into dates. 40 41 Args: 42 cols: list of names of the string columns to convert. 43 source_format: dates source format (e.g., YYYY-MM-dd). [Check here]( 44 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 45 46 Returns: 47 A function to be executed in the .transform() spark function. 48 """ 49 50 def inner(df: DataFrame) -> DataFrame: 51 converted_df = df 52 for c in cols: 53 converted_df = converted_df.withColumn( 54 c, to_date(col(c), source_format) 55 ) 56 57 return converted_df 58 59 return inner 60 61 @staticmethod 62 def convert_to_timestamp( 63 cols: List[str], source_format: Optional[str] = None 64 ) -> Callable: 65 """Convert multiple string columns with a source format into timestamps. 66 67 Args: 68 cols: list of names of the string columns to convert. 69 source_format: dates source format (e.g., MM-dd-yyyy HH:mm:ss.SSS). 70 [Check here]( 71 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 72 73 Returns: 74 A function to be executed in the .transform() spark function. 75 """ 76 77 def inner(df: DataFrame) -> DataFrame: 78 converted_df = df 79 for c in cols: 80 converted_df = converted_df.withColumn( 81 c, to_timestamp(col(c), source_format) 82 ) 83 84 return converted_df 85 86 return inner 87 88 @staticmethod 89 def format_date(cols: List[str], target_format: Optional[str] = None) -> Callable: 90 """Convert multiple date/timestamp columns into strings with the target format. 91 92 Args: 93 cols: list of names of the string columns to convert. 94 target_format: strings target format (e.g., YYYY-MM-dd). [Check here]( 95 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 96 97 Returns: 98 A function to be executed in the .transform() spark function. 99 """ 100 101 def inner(df: DataFrame) -> DataFrame: 102 converted_df = df 103 for c in cols: 104 converted_df = converted_df.withColumn( 105 c, date_format(col(c), target_format) 106 ) 107 108 return converted_df 109 110 return inner 111 112 @staticmethod 113 def get_date_hierarchy(cols: List[str], formats: Optional[dict] = None) -> Callable: 114 """Create day/month/week/quarter/year hierarchy for the provided date columns. 115 116 Uses Spark's extract function. 117 118 Args: 119 cols: list of names of the date columns to create the hierarchy. 120 formats: dict with the correspondence between the hierarchy and the format 121 to apply. [Check here]( 122 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 123 Example: { 124 "year": "year", 125 "month": "month", 126 "day": "day", 127 "week": "week", 128 "quarter": "quarter" 129 } 130 131 Returns: 132 A function to be executed in the .transform() spark function. 133 """ 134 if not formats: 135 formats = { 136 "year": "year", 137 "month": "month", 138 "day": "day", 139 "week": "week", 140 "quarter": "quarter", 141 } 142 143 def inner(df: DataFrame) -> DataFrame: 144 transformer_df = df 145 for c in cols: 146 transformer_df = transformer_df.selectExpr( 147 "*", 148 f"extract({formats['day']} from {c}) as {c}_day", 149 f"extract({formats['month']} from {c}) as {c}_month", 150 f"extract({formats['week']} from {c}) as {c}_week", 151 f"extract({formats['quarter']} from {c}) as {c}_quarter", 152 f"extract({formats['year']} from {c}) as {c}_year", 153 ) 154 155 return transformer_df 156 157 return inner
13class DateTransformers(object): 14 """Class with set of transformers to transform dates in several forms.""" 15 16 _logger = LoggingHandler(__name__).get_logger() 17 18 @staticmethod 19 def add_current_date(output_col: str) -> Callable: 20 """Add column with current date. 21 22 The current date comes from the driver as a constant, not from every executor. 23 24 Args: 25 output_col: name of the output column. 26 27 Returns: 28 A function to be executed in the .transform() spark function. 29 """ 30 31 def inner(df: DataFrame) -> DataFrame: 32 return df.withColumn(output_col, lit(datetime.now())) 33 34 return inner 35 36 @staticmethod 37 def convert_to_date( 38 cols: List[str], source_format: Optional[str] = None 39 ) -> Callable: 40 """Convert multiple string columns with a source format into dates. 41 42 Args: 43 cols: list of names of the string columns to convert. 44 source_format: dates source format (e.g., YYYY-MM-dd). [Check here]( 45 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 46 47 Returns: 48 A function to be executed in the .transform() spark function. 49 """ 50 51 def inner(df: DataFrame) -> DataFrame: 52 converted_df = df 53 for c in cols: 54 converted_df = converted_df.withColumn( 55 c, to_date(col(c), source_format) 56 ) 57 58 return converted_df 59 60 return inner 61 62 @staticmethod 63 def convert_to_timestamp( 64 cols: List[str], source_format: Optional[str] = None 65 ) -> Callable: 66 """Convert multiple string columns with a source format into timestamps. 67 68 Args: 69 cols: list of names of the string columns to convert. 70 source_format: dates source format (e.g., MM-dd-yyyy HH:mm:ss.SSS). 71 [Check here]( 72 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 73 74 Returns: 75 A function to be executed in the .transform() spark function. 76 """ 77 78 def inner(df: DataFrame) -> DataFrame: 79 converted_df = df 80 for c in cols: 81 converted_df = converted_df.withColumn( 82 c, to_timestamp(col(c), source_format) 83 ) 84 85 return converted_df 86 87 return inner 88 89 @staticmethod 90 def format_date(cols: List[str], target_format: Optional[str] = None) -> Callable: 91 """Convert multiple date/timestamp columns into strings with the target format. 92 93 Args: 94 cols: list of names of the string columns to convert. 95 target_format: strings target format (e.g., YYYY-MM-dd). [Check here]( 96 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 97 98 Returns: 99 A function to be executed in the .transform() spark function. 100 """ 101 102 def inner(df: DataFrame) -> DataFrame: 103 converted_df = df 104 for c in cols: 105 converted_df = converted_df.withColumn( 106 c, date_format(col(c), target_format) 107 ) 108 109 return converted_df 110 111 return inner 112 113 @staticmethod 114 def get_date_hierarchy(cols: List[str], formats: Optional[dict] = None) -> Callable: 115 """Create day/month/week/quarter/year hierarchy for the provided date columns. 116 117 Uses Spark's extract function. 118 119 Args: 120 cols: list of names of the date columns to create the hierarchy. 121 formats: dict with the correspondence between the hierarchy and the format 122 to apply. [Check here]( 123 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 124 Example: { 125 "year": "year", 126 "month": "month", 127 "day": "day", 128 "week": "week", 129 "quarter": "quarter" 130 } 131 132 Returns: 133 A function to be executed in the .transform() spark function. 134 """ 135 if not formats: 136 formats = { 137 "year": "year", 138 "month": "month", 139 "day": "day", 140 "week": "week", 141 "quarter": "quarter", 142 } 143 144 def inner(df: DataFrame) -> DataFrame: 145 transformer_df = df 146 for c in cols: 147 transformer_df = transformer_df.selectExpr( 148 "*", 149 f"extract({formats['day']} from {c}) as {c}_day", 150 f"extract({formats['month']} from {c}) as {c}_month", 151 f"extract({formats['week']} from {c}) as {c}_week", 152 f"extract({formats['quarter']} from {c}) as {c}_quarter", 153 f"extract({formats['year']} from {c}) as {c}_year", 154 ) 155 156 return transformer_df 157 158 return inner
Class with set of transformers to transform dates in several forms.
18 @staticmethod 19 def add_current_date(output_col: str) -> Callable: 20 """Add column with current date. 21 22 The current date comes from the driver as a constant, not from every executor. 23 24 Args: 25 output_col: name of the output column. 26 27 Returns: 28 A function to be executed in the .transform() spark function. 29 """ 30 31 def inner(df: DataFrame) -> DataFrame: 32 return df.withColumn(output_col, lit(datetime.now())) 33 34 return inner
Add column with current date.
The current date comes from the driver as a constant, not from every executor.
Arguments:
- output_col: name of the output column.
Returns:
A function to be executed in the .transform() spark function.
View Example
36 @staticmethod 37 def convert_to_date( 38 cols: List[str], source_format: Optional[str] = None 39 ) -> Callable: 40 """Convert multiple string columns with a source format into dates. 41 42 Args: 43 cols: list of names of the string columns to convert. 44 source_format: dates source format (e.g., YYYY-MM-dd). [Check here]( 45 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 46 47 Returns: 48 A function to be executed in the .transform() spark function. 49 """ 50 51 def inner(df: DataFrame) -> DataFrame: 52 converted_df = df 53 for c in cols: 54 converted_df = converted_df.withColumn( 55 c, to_date(col(c), source_format) 56 ) 57 58 return converted_df 59 60 return inner
Convert multiple string columns with a source format into dates.
Arguments:
- cols: list of names of the string columns to convert.
- source_format: dates source format (e.g., YYYY-MM-dd). Check here.
Returns:
A function to be executed in the .transform() spark function.
View Example
62 @staticmethod 63 def convert_to_timestamp( 64 cols: List[str], source_format: Optional[str] = None 65 ) -> Callable: 66 """Convert multiple string columns with a source format into timestamps. 67 68 Args: 69 cols: list of names of the string columns to convert. 70 source_format: dates source format (e.g., MM-dd-yyyy HH:mm:ss.SSS). 71 [Check here]( 72 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 73 74 Returns: 75 A function to be executed in the .transform() spark function. 76 """ 77 78 def inner(df: DataFrame) -> DataFrame: 79 converted_df = df 80 for c in cols: 81 converted_df = converted_df.withColumn( 82 c, to_timestamp(col(c), source_format) 83 ) 84 85 return converted_df 86 87 return inner
Convert multiple string columns with a source format into timestamps.
Arguments:
- cols: list of names of the string columns to convert.
- source_format: dates source format (e.g., MM-dd-yyyy HH:mm:ss.SSS). Check here.
Returns:
A function to be executed in the .transform() spark function.
View Example
89 @staticmethod 90 def format_date(cols: List[str], target_format: Optional[str] = None) -> Callable: 91 """Convert multiple date/timestamp columns into strings with the target format. 92 93 Args: 94 cols: list of names of the string columns to convert. 95 target_format: strings target format (e.g., YYYY-MM-dd). [Check here]( 96 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 97 98 Returns: 99 A function to be executed in the .transform() spark function. 100 """ 101 102 def inner(df: DataFrame) -> DataFrame: 103 converted_df = df 104 for c in cols: 105 converted_df = converted_df.withColumn( 106 c, date_format(col(c), target_format) 107 ) 108 109 return converted_df 110 111 return inner
Convert multiple date/timestamp columns into strings with the target format.
Arguments:
- cols: list of names of the string columns to convert.
- target_format: strings target format (e.g., YYYY-MM-dd). Check here.
Returns:
A function to be executed in the .transform() spark function.
View Example
113 @staticmethod 114 def get_date_hierarchy(cols: List[str], formats: Optional[dict] = None) -> Callable: 115 """Create day/month/week/quarter/year hierarchy for the provided date columns. 116 117 Uses Spark's extract function. 118 119 Args: 120 cols: list of names of the date columns to create the hierarchy. 121 formats: dict with the correspondence between the hierarchy and the format 122 to apply. [Check here]( 123 https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html). 124 Example: { 125 "year": "year", 126 "month": "month", 127 "day": "day", 128 "week": "week", 129 "quarter": "quarter" 130 } 131 132 Returns: 133 A function to be executed in the .transform() spark function. 134 """ 135 if not formats: 136 formats = { 137 "year": "year", 138 "month": "month", 139 "day": "day", 140 "week": "week", 141 "quarter": "quarter", 142 } 143 144 def inner(df: DataFrame) -> DataFrame: 145 transformer_df = df 146 for c in cols: 147 transformer_df = transformer_df.selectExpr( 148 "*", 149 f"extract({formats['day']} from {c}) as {c}_day", 150 f"extract({formats['month']} from {c}) as {c}_month", 151 f"extract({formats['week']} from {c}) as {c}_week", 152 f"extract({formats['quarter']} from {c}) as {c}_quarter", 153 f"extract({formats['year']} from {c}) as {c}_year", 154 ) 155 156 return transformer_df 157 158 return inner
Create day/month/week/quarter/year hierarchy for the provided date columns.
Uses Spark's extract function.
Arguments:
- cols: list of names of the date columns to create the hierarchy.
- formats: dict with the correspondence between the hierarchy and the format to apply. Check here. Example: { "year": "year", "month": "month", "day": "day", "week": "week", "quarter": "quarter" }
Returns:
A function to be executed in the .transform() spark function.