lakehouse_engine.transformers.repartitioners

Module with repartitioners transformers.

 1"""Module with repartitioners transformers."""
 2
 3from typing import Callable, List, Optional
 4
 5from pyspark.sql import DataFrame
 6
 7from lakehouse_engine.transformers.exceptions import WrongArgumentsException
 8from lakehouse_engine.utils.logging_handler import LoggingHandler
 9
10
class Repartitioners(object):
    """Class containing repartitioners transformers.

    Every public method is a factory: it captures its arguments and returns a
    function that takes a dataframe and returns the transformed dataframe.
    """

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def coalesce(cls, num_partitions: int) -> Callable:
        """Coalesce a dataframe into n partitions.

        Args:
            num_partitions: num of partitions to coalesce.

        Returns:
            A function to be called in .transform() spark function.
        """

        def inner(df: DataFrame) -> DataFrame:
            return df.coalesce(num_partitions)

        return inner

    @classmethod
    def repartition(
        cls, num_partitions: Optional[int] = None, cols: Optional[List[str]] = None
    ) -> Callable:
        """Repartition a dataframe by number of partitions and/or by columns.

        If cols is provided, the dataframe is repartitioned by the values of
        those columns, using num_partitions as the number of partitions when it
        is also provided. If only num_partitions is provided, repartitioning
        happens based on that number alone.

        Args:
            num_partitions: num of partitions to repartition.
            cols: list of columns to use for repartitioning.

        Returns:
            A function to be called in .transform() spark function.

        Raises:
            WrongArgumentsException: raised by the returned function (not by
                this factory) when neither num_partitions nor cols is specified.
        """

        def inner(df: DataFrame) -> DataFrame:
            if cols:
                if num_partitions is None:
                    # Forwarding None as the first positional argument is
                    # rejected by DataFrame.repartition, so partition by the
                    # columns only.
                    return df.repartition(*cols)
                return df.repartition(num_partitions, *cols)
            if num_partitions:
                return df.repartition(num_partitions)
            raise WrongArgumentsException(
                "num_partitions or cols should be specified"
            )

        return inner
class Repartitioners:
class Repartitioners(object):
    """Class containing repartitioners transformers.

    Every public method is a factory: it captures its arguments and returns a
    function that takes a dataframe and returns the transformed dataframe.
    """

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def coalesce(cls, num_partitions: int) -> Callable:
        """Coalesce a dataframe into n partitions.

        Args:
            num_partitions: num of partitions to coalesce.

        Returns:
            A function to be called in .transform() spark function.
        """

        def inner(df: DataFrame) -> DataFrame:
            return df.coalesce(num_partitions)

        return inner

    @classmethod
    def repartition(
        cls, num_partitions: Optional[int] = None, cols: Optional[List[str]] = None
    ) -> Callable:
        """Repartition a dataframe by number of partitions and/or by columns.

        If cols is provided, the dataframe is repartitioned by the values of
        those columns, using num_partitions as the number of partitions when it
        is also provided. If only num_partitions is provided, repartitioning
        happens based on that number alone.

        Args:
            num_partitions: num of partitions to repartition.
            cols: list of columns to use for repartitioning.

        Returns:
            A function to be called in .transform() spark function.

        Raises:
            WrongArgumentsException: raised by the returned function (not by
                this factory) when neither num_partitions nor cols is specified.
        """

        def inner(df: DataFrame) -> DataFrame:
            if cols:
                if num_partitions is None:
                    # Forwarding None as the first positional argument is
                    # rejected by DataFrame.repartition, so partition by the
                    # columns only.
                    return df.repartition(*cols)
                return df.repartition(num_partitions, *cols)
            if num_partitions:
                return df.repartition(num_partitions)
            raise WrongArgumentsException(
                "num_partitions or cols should be specified"
            )

        return inner

Class containing repartitioners transformers.

@classmethod
def coalesce(cls, num_partitions: int) -> Callable:
17    @classmethod
18    def coalesce(cls, num_partitions: int) -> Callable:
19        """Coalesce a dataframe into n partitions.
20
21        Args:
22            num_partitions: num of partitions to coalesce.
23
24        Returns:
25            A function to be called in .transform() spark function.
26        """
27
28        def inner(df: DataFrame) -> DataFrame:
29            return df.coalesce(num_partitions)
30
31        return inner

Coalesce a dataframe into n partitions.

Arguments:
  • num_partitions: num of partitions to coalesce.
Returns:

A function to be called in .transform() spark function.

View Example
39{
40    "function": "coalesce",
41    "args": {
42        "num_partitions": 1
43    }
44}
View Full Acon


@classmethod
def repartition(cls, num_partitions: Optional[int] = None, cols: Optional[List[str]] = None) -> Callable:
33    @classmethod
34    def repartition(
35        cls, num_partitions: Optional[int] = None, cols: Optional[List[str]] = None
36    ) -> Callable:
37        """Repartition a dataframe into n partitions.
38
39        If num_partitions is provided repartitioning happens based on the provided
40        number, otherwise it happens based on the values of the provided cols (columns).
41
42        Args:
43            num_partitions: num of partitions to repartition.
44            cols: list of columns to use for repartitioning.
45
46        Returns:
47            A function to be called in .transform() spark function.
48        """
49
50        def inner(df: DataFrame) -> DataFrame:
51            if cols:
52                return df.repartition(num_partitions, *cols)
53            elif num_partitions:
54                return df.repartition(num_partitions)
55            else:
56                raise WrongArgumentsException(
57                    "num_partitions or cols should be specified"
58                )
59
60        return inner

Repartition a dataframe into n partitions.

If num_partitions is provided, repartitioning happens based on the provided number; otherwise, it happens based on the values of the provided cols (columns).

Arguments:
  • num_partitions: num of partitions to repartition.
  • cols: list of columns to use for repartitioning.
Returns:

A function to be called in .transform() spark function.

View Example
48{
49    "function": "repartition",
50    "args": {
51        "num_partitions": 1
52    }
53}
View Full Acon