lakehouse_engine.transformers.date_transformers

Module containing date transformers.

  1"""Module containing date transformers."""
  2
  3from datetime import datetime
  4from typing import Callable, List, Optional
  5
  6from pyspark.sql import DataFrame
  7from pyspark.sql.functions import col, date_format, lit, to_date, to_timestamp
  8
  9from lakehouse_engine.utils.logging_handler import LoggingHandler
 10
 11
 12class DateTransformers(object):
 13    """Class with set of transformers to transform dates in several forms."""
 14
 15    _logger = LoggingHandler(__name__).get_logger()
 16
 17    @staticmethod
 18    def add_current_date(output_col: str) -> Callable:
 19        """Add column with current date.
 20
 21        The current date comes from the driver as a constant, not from every executor.
 22
 23        Args:
 24            output_col: name of the output column.
 25
 26        Returns:
 27            A function to be executed in the .transform() spark function.
 28        """
 29
 30        def inner(df: DataFrame) -> DataFrame:
 31            return df.withColumn(output_col, lit(datetime.now()))
 32
 33        return inner
 34
 35    @staticmethod
 36    def convert_to_date(
 37        cols: List[str], source_format: Optional[str] = None
 38    ) -> Callable:
 39        """Convert multiple string columns with a source format into dates.
 40
 41        Args:
 42            cols: list of names of the string columns to convert.
 43            source_format: dates source format (e.g., YYYY-MM-dd). [Check here](
 44                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
 45
 46        Returns:
 47            A function to be executed in the .transform() spark function.
 48        """
 49
 50        def inner(df: DataFrame) -> DataFrame:
 51            converted_df = df
 52            for c in cols:
 53                converted_df = converted_df.withColumn(
 54                    c, to_date(col(c), source_format)
 55                )
 56
 57            return converted_df
 58
 59        return inner
 60
 61    @staticmethod
 62    def convert_to_timestamp(
 63        cols: List[str], source_format: Optional[str] = None
 64    ) -> Callable:
 65        """Convert multiple string columns with a source format into timestamps.
 66
 67        Args:
 68            cols: list of names of the string columns to convert.
 69            source_format: dates source format (e.g., MM-dd-yyyy HH:mm:ss.SSS).
 70                [Check here](
 71                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
 72
 73        Returns:
 74            A function to be executed in the .transform() spark function.
 75        """
 76
 77        def inner(df: DataFrame) -> DataFrame:
 78            converted_df = df
 79            for c in cols:
 80                converted_df = converted_df.withColumn(
 81                    c, to_timestamp(col(c), source_format)
 82                )
 83
 84            return converted_df
 85
 86        return inner
 87
 88    @staticmethod
 89    def format_date(cols: List[str], target_format: Optional[str] = None) -> Callable:
 90        """Convert multiple date/timestamp columns into strings with the target format.
 91
 92        Args:
 93            cols: list of names of the string columns to convert.
 94            target_format: strings target format (e.g., YYYY-MM-dd). [Check here](
 95                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
 96
 97        Returns:
 98            A function to be executed in the .transform() spark function.
 99        """
100
101        def inner(df: DataFrame) -> DataFrame:
102            converted_df = df
103            for c in cols:
104                converted_df = converted_df.withColumn(
105                    c, date_format(col(c), target_format)
106                )
107
108            return converted_df
109
110        return inner
111
112    @staticmethod
113    def get_date_hierarchy(cols: List[str], formats: Optional[dict] = None) -> Callable:
114        """Create day/month/week/quarter/year hierarchy for the provided date columns.
115
116        Uses Spark's extract function.
117
118        Args:
119            cols: list of names of the date columns to create the hierarchy.
120            formats: dict with the correspondence between the hierarchy and the format
121                to apply. [Check here](
122                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
123                Example: {
124                    "year": "year",
125                    "month": "month",
126                    "day": "day",
127                    "week": "week",
128                    "quarter": "quarter"
129                }
130
131        Returns:
132            A function to be executed in the .transform() spark function.
133        """
134        if not formats:
135            formats = {
136                "year": "year",
137                "month": "month",
138                "day": "day",
139                "week": "week",
140                "quarter": "quarter",
141            }
142
143        def inner(df: DataFrame) -> DataFrame:
144            transformer_df = df
145            for c in cols:
146                transformer_df = transformer_df.selectExpr(
147                    "*",
148                    f"extract({formats['day']} from {c}) as {c}_day",
149                    f"extract({formats['month']} from {c}) as {c}_month",
150                    f"extract({formats['week']} from {c}) as {c}_week",
151                    f"extract({formats['quarter']} from {c}) as {c}_quarter",
152                    f"extract({formats['year']} from {c}) as {c}_year",
153                )
154
155            return transformer_df
156
157        return inner
class DateTransformers:
 13class DateTransformers(object):
 14    """Class with set of transformers to transform dates in several forms."""
 15
 16    _logger = LoggingHandler(__name__).get_logger()
 17
 18    @staticmethod
 19    def add_current_date(output_col: str) -> Callable:
 20        """Add column with current date.
 21
 22        The current date comes from the driver as a constant, not from every executor.
 23
 24        Args:
 25            output_col: name of the output column.
 26
 27        Returns:
 28            A function to be executed in the .transform() spark function.
 29        """
 30
 31        def inner(df: DataFrame) -> DataFrame:
 32            return df.withColumn(output_col, lit(datetime.now()))
 33
 34        return inner
 35
 36    @staticmethod
 37    def convert_to_date(
 38        cols: List[str], source_format: Optional[str] = None
 39    ) -> Callable:
 40        """Convert multiple string columns with a source format into dates.
 41
 42        Args:
 43            cols: list of names of the string columns to convert.
 44            source_format: dates source format (e.g., YYYY-MM-dd). [Check here](
 45                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
 46
 47        Returns:
 48            A function to be executed in the .transform() spark function.
 49        """
 50
 51        def inner(df: DataFrame) -> DataFrame:
 52            converted_df = df
 53            for c in cols:
 54                converted_df = converted_df.withColumn(
 55                    c, to_date(col(c), source_format)
 56                )
 57
 58            return converted_df
 59
 60        return inner
 61
 62    @staticmethod
 63    def convert_to_timestamp(
 64        cols: List[str], source_format: Optional[str] = None
 65    ) -> Callable:
 66        """Convert multiple string columns with a source format into timestamps.
 67
 68        Args:
 69            cols: list of names of the string columns to convert.
 70            source_format: dates source format (e.g., MM-dd-yyyy HH:mm:ss.SSS).
 71                [Check here](
 72                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
 73
 74        Returns:
 75            A function to be executed in the .transform() spark function.
 76        """
 77
 78        def inner(df: DataFrame) -> DataFrame:
 79            converted_df = df
 80            for c in cols:
 81                converted_df = converted_df.withColumn(
 82                    c, to_timestamp(col(c), source_format)
 83                )
 84
 85            return converted_df
 86
 87        return inner
 88
 89    @staticmethod
 90    def format_date(cols: List[str], target_format: Optional[str] = None) -> Callable:
 91        """Convert multiple date/timestamp columns into strings with the target format.
 92
 93        Args:
 94            cols: list of names of the string columns to convert.
 95            target_format: strings target format (e.g., YYYY-MM-dd). [Check here](
 96                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
 97
 98        Returns:
 99            A function to be executed in the .transform() spark function.
100        """
101
102        def inner(df: DataFrame) -> DataFrame:
103            converted_df = df
104            for c in cols:
105                converted_df = converted_df.withColumn(
106                    c, date_format(col(c), target_format)
107                )
108
109            return converted_df
110
111        return inner
112
113    @staticmethod
114    def get_date_hierarchy(cols: List[str], formats: Optional[dict] = None) -> Callable:
115        """Create day/month/week/quarter/year hierarchy for the provided date columns.
116
117        Uses Spark's extract function.
118
119        Args:
120            cols: list of names of the date columns to create the hierarchy.
121            formats: dict with the correspondence between the hierarchy and the format
122                to apply. [Check here](
123                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
124                Example: {
125                    "year": "year",
126                    "month": "month",
127                    "day": "day",
128                    "week": "week",
129                    "quarter": "quarter"
130                }
131
132        Returns:
133            A function to be executed in the .transform() spark function.
134        """
135        if not formats:
136            formats = {
137                "year": "year",
138                "month": "month",
139                "day": "day",
140                "week": "week",
141                "quarter": "quarter",
142            }
143
144        def inner(df: DataFrame) -> DataFrame:
145            transformer_df = df
146            for c in cols:
147                transformer_df = transformer_df.selectExpr(
148                    "*",
149                    f"extract({formats['day']} from {c}) as {c}_day",
150                    f"extract({formats['month']} from {c}) as {c}_month",
151                    f"extract({formats['week']} from {c}) as {c}_week",
152                    f"extract({formats['quarter']} from {c}) as {c}_quarter",
153                    f"extract({formats['year']} from {c}) as {c}_year",
154                )
155
156            return transformer_df
157
158        return inner

Class with set of transformers to transform dates in several forms.

@staticmethod
def add_current_date(output_col: str) -> Callable:
18    @staticmethod
19    def add_current_date(output_col: str) -> Callable:
20        """Add column with current date.
21
22        The current date comes from the driver as a constant, not from every executor.
23
24        Args:
25            output_col: name of the output column.
26
27        Returns:
28            A function to be executed in the .transform() spark function.
29        """
30
31        def inner(df: DataFrame) -> DataFrame:
32            return df.withColumn(output_col, lit(datetime.now()))
33
34        return inner

Add column with current date.

The current date comes from the driver as a constant, not from every executor.

Arguments:
  • output_col: name of the output column.
Returns:

A function to be executed in the .transform() spark function.

View Example
21{
22    "function": "add_current_date",
23    "args": {
24        "output_col": "curr_date"
25    }
26}
View Full Acon


@staticmethod
def convert_to_date(cols: List[str], source_format: Optional[str] = None) -> Callable:
36    @staticmethod
37    def convert_to_date(
38        cols: List[str], source_format: Optional[str] = None
39    ) -> Callable:
40        """Convert multiple string columns with a source format into dates.
41
42        Args:
43            cols: list of names of the string columns to convert.
44            source_format: dates source format (e.g., YYYY-MM-dd). [Check here](
45                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
46
47        Returns:
48            A function to be executed in the .transform() spark function.
49        """
50
51        def inner(df: DataFrame) -> DataFrame:
52            converted_df = df
53            for c in cols:
54                converted_df = converted_df.withColumn(
55                    c, to_date(col(c), source_format)
56                )
57
58            return converted_df
59
60        return inner

Convert multiple string columns with a source format into dates.

Arguments:
  • cols: list of names of the string columns to convert.
  • source_format: dates source format (e.g., YYYY-MM-dd). Check here.
Returns:

A function to be executed in the .transform() spark function.

View Example
27{
28    "function": "convert_to_date",
29    "args": {
30        "cols": [
31            "order_date2"
32        ],
33        "source_format": "dd-MM-yyyy"
34    }
35}
View Full Acon


@staticmethod
def convert_to_timestamp(cols: List[str], source_format: Optional[str] = None) -> Callable:
62    @staticmethod
63    def convert_to_timestamp(
64        cols: List[str], source_format: Optional[str] = None
65    ) -> Callable:
66        """Convert multiple string columns with a source format into timestamps.
67
68        Args:
69            cols: list of names of the string columns to convert.
70            source_format: dates source format (e.g., MM-dd-yyyy HH:mm:ss.SSS).
71                [Check here](
72                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
73
74        Returns:
75            A function to be executed in the .transform() spark function.
76        """
77
78        def inner(df: DataFrame) -> DataFrame:
79            converted_df = df
80            for c in cols:
81                converted_df = converted_df.withColumn(
82                    c, to_timestamp(col(c), source_format)
83                )
84
85            return converted_df
86
87        return inner

Convert multiple string columns with a source format into timestamps.

Arguments:
  • cols: list of names of the string columns to convert.
  • source_format: dates source format (e.g., MM-dd-yyyy HH:mm:ss.SSS). Check here.
Returns:

A function to be executed in the .transform() spark function.

View Example
41{
42    "function": "convert_to_timestamp",
43    "args": {
44        "cols": [
45            "ship_date"
46        ],
47        "source_format": "yyyy-dd-MM HH:mm:ss"
48    }
49}
View Full Acon


@staticmethod
def format_date(cols: List[str], target_format: Optional[str] = None) -> Callable:
 89    @staticmethod
 90    def format_date(cols: List[str], target_format: Optional[str] = None) -> Callable:
 91        """Convert multiple date/timestamp columns into strings with the target format.
 92
 93        Args:
 94            cols: list of names of the string columns to convert.
 95            target_format: strings target format (e.g., YYYY-MM-dd). [Check here](
 96                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
 97
 98        Returns:
 99            A function to be executed in the .transform() spark function.
100        """
101
102        def inner(df: DataFrame) -> DataFrame:
103            converted_df = df
104            for c in cols:
105                converted_df = converted_df.withColumn(
106                    c, date_format(col(c), target_format)
107                )
108
109            return converted_df
110
111        return inner

Convert multiple date/timestamp columns into strings with the target format.

Arguments:
  • cols: list of names of the string columns to convert.
  • target_format: strings target format (e.g., YYYY-MM-dd). Check here.
Returns:

A function to be executed in the .transform() spark function.

View Example
48{
49    "function": "format_date",
50    "args": {
51        "cols": [
52            "order_date3",
53            "ship_date"
54        ],
55        "target_format": "yy-d-M"
56    }
57}
View Full Acon


@staticmethod
def get_date_hierarchy(cols: List[str], formats: Optional[dict] = None) -> Callable:
113    @staticmethod
114    def get_date_hierarchy(cols: List[str], formats: Optional[dict] = None) -> Callable:
115        """Create day/month/week/quarter/year hierarchy for the provided date columns.
116
117        Uses Spark's extract function.
118
119        Args:
120            cols: list of names of the date columns to create the hierarchy.
121            formats: dict with the correspondence between the hierarchy and the format
122                to apply. [Check here](
123                https://docs.oracle.com/javase/10/docs/api/java/time/format/DateTimeFormatter.html).
124                Example: {
125                    "year": "year",
126                    "month": "month",
127                    "day": "day",
128                    "week": "week",
129                    "quarter": "quarter"
130                }
131
132        Returns:
133            A function to be executed in the .transform() spark function.
134        """
135        if not formats:
136            formats = {
137                "year": "year",
138                "month": "month",
139                "day": "day",
140                "week": "week",
141                "quarter": "quarter",
142            }
143
144        def inner(df: DataFrame) -> DataFrame:
145            transformer_df = df
146            for c in cols:
147                transformer_df = transformer_df.selectExpr(
148                    "*",
149                    f"extract({formats['day']} from {c}) as {c}_day",
150                    f"extract({formats['month']} from {c}) as {c}_month",
151                    f"extract({formats['week']} from {c}) as {c}_week",
152                    f"extract({formats['quarter']} from {c}) as {c}_quarter",
153                    f"extract({formats['year']} from {c}) as {c}_year",
154                )
155
156            return transformer_df
157
158        return inner

Create day/month/week/quarter/year hierarchy for the provided date columns.

Uses Spark's extract function.

Arguments:
  • cols: list of names of the date columns to create the hierarchy.
  • formats: dict with the correspondence between the hierarchy and the format to apply. Check here. Example: { "year": "year", "month": "month", "day": "day", "week": "week", "quarter": "quarter" }
Returns:

A function to be executed in the .transform() spark function.

View Example
55{
56    "function": "get_date_hierarchy",
57    "args": {
58        "cols": [
59            "order_date2",
60            "ship_date2"
61        ]
62    }
63}
View Full Acon