lakehouse_engine.transformers.optimizers

Optimizers module.

 1"""Optimizers module."""
 2
 3from typing import Callable
 4
 5from pyspark.sql import DataFrame
 6from pyspark.storagelevel import StorageLevel
 7
 8from lakehouse_engine.utils.logging_handler import LoggingHandler
 9
10
11class Optimizers(object):
12    """Class containing all the functions that can provide optimizations."""
13
14    _logger = LoggingHandler(__name__).get_logger()
15
16    @classmethod
17    def cache(cls) -> Callable:
18        """Caches the current dataframe.
19
20        The default storage level used is MEMORY_AND_DISK.
21
22        Returns:
23            A function to be called in .transform() spark function.
24        """
25
26        def inner(df: DataFrame) -> DataFrame:
27            return df.cache()
28
29        return inner
30
31    @classmethod
32    def persist(cls, storage_level: str = None) -> Callable:
33        """Caches the current dataframe with a specific StorageLevel.
34
35        Args:
36            storage_level: the type of StorageLevel, as default MEMORY_AND_DISK_DESER.
37                [More options here](
38                https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html).
39
40        Returns:
41            A function to be called in .transform() spark function.
42        """
43
44        def inner(df: DataFrame) -> DataFrame:
45            level = getattr(
46                StorageLevel, storage_level, StorageLevel.MEMORY_AND_DISK_DESER
47            )
48
49            return df.persist(level)
50
51        return inner
52
53    @classmethod
54    def unpersist(cls, blocking: bool = False) -> Callable:
55        """Removes the dataframe from the disk and memory.
56
57        Args:
58            blocking: whether to block until all the data blocks are
59                removed from disk/memory or run asynchronously.
60
61        Returns:
62            A function to be called in .transform() spark function.
63        """
64
65        def inner(df: DataFrame) -> DataFrame:
66            return df.unpersist(blocking)
67
68        return inner
class Optimizers:

Class containing all the functions that can provide optimizations.

@classmethod
def cache(cls) -> Callable:

Caches the current dataframe.

The default storage level used is MEMORY_AND_DISK.

Returns:

A function to be called by the Spark .transform() function.
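
Since cache() returns a DataFrame -> DataFrame closure, it plugs straight into PySpark's DataFrame.transform. A minimal sketch (the local SparkSession and sample data below are illustrative assumptions, not part of the library):

    from pyspark.sql import SparkSession

    from lakehouse_engine.transformers.optimizers import Optimizers

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.range(100)

    # Equivalent to df.cache(), but composable with other transformers.
    cached = df.transform(Optimizers.cache())
    cached.count()  # the first action materializes the cached data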

@classmethod
def persist(cls, storage_level: str = None) -> Callable:

Caches the current dataframe with a specific StorageLevel.

Arguments:
  • storage_level: the StorageLevel to use; defaults to MEMORY_AND_DISK_DESER. More options here: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html
Returns:

A function to be called by the Spark .transform() function.
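
Because the storage level is resolved with getattr on pyspark.StorageLevel, any attribute name of that class is accepted, and an unrecognized name falls back to MEMORY_AND_DISK_DESER rather than raising. A sketch under those assumptions, reusing the df from the cache example above:

    from lakehouse_engine.transformers.optimizers import Optimizers

    # Resolves to StorageLevel.MEMORY_ONLY.
    mem_only = df.transform(Optimizers.persist("MEMORY_ONLY"))

    # An unknown name silently falls back to MEMORY_AND_DISK_DESER.
    fallback = df.transform(Optimizers.persist("NOT_A_REAL_LEVEL"))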

@classmethod
def unpersist(cls, blocking: bool = False) -> Callable:

Removes the dataframe from disk and memory.

Arguments:
  • blocking: whether to block until all the data blocks are removed from disk/memory, or to run asynchronously.
Returns:

A function to be called by the Spark .transform() function.
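
unpersist() is the counterpart for releasing the blocks once downstream steps no longer need them; with blocking=True the call waits until Spark has actually removed them. A sketch continuing the examples above:

    from lakehouse_engine.transformers.optimizers import Optimizers

    persisted = df.transform(Optimizers.persist("MEMORY_ONLY"))
    persisted.count()  # action that materializes the persisted blocks

    # Release memory/disk and wait until the blocks are removed.
    released = persisted.transform(Optimizers.unpersist(blocking=True))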