lakehouse_engine.transformers.optimizers
Optimizers module.
1"""Optimizers module.""" 2 3from typing import Callable 4 5from pyspark.sql import DataFrame 6from pyspark.storagelevel import StorageLevel 7 8from lakehouse_engine.utils.logging_handler import LoggingHandler 9 10 11class Optimizers(object): 12 """Class containing all the functions that can provide optimizations.""" 13 14 _logger = LoggingHandler(__name__).get_logger() 15 16 @classmethod 17 def cache(cls) -> Callable: 18 """Caches the current dataframe. 19 20 The default storage level used is MEMORY_AND_DISK. 21 22 Returns: 23 A function to be called in .transform() spark function. 24 """ 25 26 def inner(df: DataFrame) -> DataFrame: 27 return df.cache() 28 29 return inner 30 31 @classmethod 32 def persist(cls, storage_level: str = None) -> Callable: 33 """Caches the current dataframe with a specific StorageLevel. 34 35 Args: 36 storage_level: the type of StorageLevel, as default MEMORY_AND_DISK_DESER. 37 [More options here]( 38 https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html). 39 40 Returns: 41 A function to be called in .transform() spark function. 42 """ 43 44 def inner(df: DataFrame) -> DataFrame: 45 level = getattr( 46 StorageLevel, storage_level, StorageLevel.MEMORY_AND_DISK_DESER 47 ) 48 49 return df.persist(level) 50 51 return inner 52 53 @classmethod 54 def unpersist(cls, blocking: bool = False) -> Callable: 55 """Removes the dataframe from the disk and memory. 56 57 Args: 58 blocking: whether to block until all the data blocks are 59 removed from disk/memory or run asynchronously. 60 61 Returns: 62 A function to be called in .transform() spark function. 63 """ 64 65 def inner(df: DataFrame) -> DataFrame: 66 return df.unpersist(blocking) 67 68 return inner
class Optimizers:
Class containing all the functions that can provide optimizations.
@classmethod
def cache(cls) -> Callable:
Caches the current dataframe.
The default storage level used is MEMORY_AND_DISK.
Returns:
A function to be called in the Spark .transform() function.
@classmethod
def persist(cls, storage_level: str = None) -> Callable:
Caches the current dataframe with a specific StorageLevel.
Arguments:
- storage_level: the name of the StorageLevel to use; defaults to MEMORY_AND_DISK_DESER. More options here: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html
Returns:
A function to be called in the Spark .transform() function.
@classmethod
def unpersist(cls, blocking: bool = False) -> Callable:
Removes the dataframe from disk and memory.
Arguments:
- blocking: whether to block until all the data blocks are removed from disk and memory, or to unpersist asynchronously.
Returns:
A function to be called in the Spark .transform() function.