lakehouse_engine.transformers.repartitioners
Module with repartitioners transformers.
1"""Module with repartitioners transformers.""" 2 3from typing import Callable, List, Optional 4 5from pyspark.sql import DataFrame 6 7from lakehouse_engine.transformers.exceptions import WrongArgumentsException 8from lakehouse_engine.utils.logging_handler import LoggingHandler 9 10 11class Repartitioners(object): 12 """Class containing repartitioners transformers.""" 13 14 _logger = LoggingHandler(__name__).get_logger() 15 16 @classmethod 17 def coalesce(cls, num_partitions: int) -> Callable: 18 """Coalesce a dataframe into n partitions. 19 20 Args: 21 num_partitions: num of partitions to coalesce. 22 23 Returns: 24 A function to be called in .transform() spark function. 25 """ 26 27 def inner(df: DataFrame) -> DataFrame: 28 return df.coalesce(num_partitions) 29 30 return inner 31 32 @classmethod 33 def repartition( 34 cls, num_partitions: Optional[int] = None, cols: Optional[List[str]] = None 35 ) -> Callable: 36 """Repartition a dataframe into n partitions. 37 38 If num_partitions is provided repartitioning happens based on the provided 39 number, otherwise it happens based on the values of the provided cols (columns). 40 41 Args: 42 num_partitions: num of partitions to repartition. 43 cols: list of columns to use for repartitioning. 44 45 Returns: 46 A function to be called in .transform() spark function. 47 """ 48 49 def inner(df: DataFrame) -> DataFrame: 50 if cols: 51 return df.repartition(num_partitions, *cols) 52 elif num_partitions: 53 return df.repartition(num_partitions) 54 else: 55 raise WrongArgumentsException( 56 "num_partitions or cols should be specified" 57 ) 58 59 return inner
class Repartitioners:
Class containing repartitioners transformers.
@classmethod
def coalesce(cls, num_partitions: int) -> Callable:
Coalesce a dataframe into n partitions.
Arguments:
- num_partitions: number of partitions to coalesce the dataframe into.
Returns:
A function to be passed to the Spark DataFrame .transform() method.
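
A minimal usage sketch follows, assuming an active SparkSession; spark, df and the partition count below are illustrative placeholders, not part of the library.

from pyspark.sql import SparkSession

from lakehouse_engine.transformers.repartitioners import Repartitioners

# Illustrative setup only: any existing DataFrame works the same way.
spark = SparkSession.builder.getOrCreate()
df = spark.range(1000)

# Reduce the number of partitions without a full shuffle.
coalesced_df = df.transform(Repartitioners.coalesce(num_partitions=4))

# At most 4, since coalesce never increases the number of partitions.
print(coalesced_df.rdd.getNumPartitions())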
@classmethod
def repartition(cls, num_partitions: Optional[int] = None, cols: Optional[List[str]] = None) -> Callable:
Repartition a dataframe into n partitions.
If num_partitions is provided, the dataframe is repartitioned into that number of partitions; otherwise, it is repartitioned based on the values of the provided cols (columns).
Arguments:
- num_partitions: number of partitions to repartition the dataframe into.
- cols: list of columns to use for repartitioning.
Returns:
A function to be passed to the Spark DataFrame .transform() method.
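
A minimal sketch of both modes is shown below, assuming an active SparkSession; orders_df, the region column and the partition counts are illustrative placeholders. Note that the inner function forwards num_partitions together with cols to df.repartition, so this sketch passes both when repartitioning by column values.

from pyspark.sql import SparkSession, functions as F

from lakehouse_engine.transformers.repartitioners import Repartitioners

# Illustrative DataFrame with a column to partition by.
spark = SparkSession.builder.getOrCreate()
orders_df = spark.range(1000).withColumn("region", (F.col("id") % 3).cast("string"))

# Mode 1: repartition into an explicit number of partitions (full shuffle).
by_number = orders_df.transform(Repartitioners.repartition(num_partitions=10))

# Mode 2: repartition by column values; num_partitions is passed as well because
# the inner function forwards it as the first argument of df.repartition.
by_columns = orders_df.transform(
    Repartitioners.repartition(num_partitions=10, cols=["region"])
)

# Calling the transformer with neither argument raises WrongArgumentsException.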