lakehouse_engine.terminators.dataset_optimizer

Module with dataset optimizer terminator.

  1"""Module with dataset optimizer terminator."""
  2
  3from typing import List, Optional
  4
  5from pyspark.sql.utils import AnalysisException, ParseException
  6
  7from lakehouse_engine.core.table_manager import TableManager
  8from lakehouse_engine.transformers.exceptions import WrongArgumentsException
  9from lakehouse_engine.utils.logging_handler import LoggingHandler
 10
 11
class DatasetOptimizer:
    """Class with dataset optimizer terminator."""

    _logger = LoggingHandler(__name__).get_logger()

    @classmethod
    def optimize_dataset(
        cls,
        db_table: Optional[str] = None,
        location: Optional[str] = None,
        compute_table_stats: bool = True,
        vacuum: bool = True,
        vacuum_hours: int = 720,
        optimize: bool = True,
        optimize_where: Optional[str] = None,
        optimize_zorder_col_list: Optional[List[str]] = None,
        debug: bool = False,
    ) -> None:
        """Optimize a dataset based on a set of pre-conceived optimizations.

        Most of the time the dataset is a table, but it can also be a purely
        file-based dataset.

        Args:
            db_table: `database_name.table_name`.
            location: dataset/table filesystem location.
            compute_table_stats: whether to compute table statistics or not.
            vacuum: (delta lake tables only) whether to vacuum the delta lake
                table or not.
            vacuum_hours: (delta lake tables only) number of hours to consider
                in the vacuum operation.
            optimize: (delta lake tables only) whether to optimize the table or
                not. Custom optimize parameters can be supplied through ExecEnv
                (Spark) configs.
            optimize_where: expression to use in the optimize function.
            optimize_zorder_col_list: (delta lake tables only) list of columns
                to consider in the zorder optimization process. Custom optimize
                parameters can be supplied through ExecEnv (Spark) configs.
            debug: whether the function is being debugged in local tests, in
                which case ParseException and AnalysisException are swallowed
                so the tests can still perform their assertions.
        """
        if optimize:
            if debug:
                try:
                    cls._optimize(
                        db_table, location, optimize_where, optimize_zorder_col_list
                    )
                except ParseException:
                    pass
            else:
                cls._optimize(
                    db_table, location, optimize_where, optimize_zorder_col_list
                )

        if vacuum:
            cls._vacuum(db_table, location, vacuum_hours)

        if compute_table_stats:
            if debug:
                try:
                    cls._compute_table_stats(db_table)
                except AnalysisException:
                    pass
            else:
                cls._compute_table_stats(db_table)

    @classmethod
    def _compute_table_stats(cls, db_table: Optional[str]) -> None:
        """Compute table statistics.

        Args:
            db_table: `<db>.<table>` string.
        """
        if not db_table:
            raise WrongArgumentsException("A table needs to be provided.")

        config = {"function": "compute_table_statistics", "table_or_view": db_table}
        cls._logger.info(f"Computing table statistics for {db_table}...")
        TableManager(config).compute_table_statistics()

    @classmethod
    def _vacuum(
        cls, db_table: Optional[str], location: Optional[str], hours: int
    ) -> None:
        """Vacuum a delta table.

        Args:
            db_table: `<db>.<table>` string. Takes precedence over location.
            location: location of the delta table.
            hours: number of hours to consider in the vacuum operation.
        """
        if not db_table and not location:
            raise WrongArgumentsException("A table or location needs to be provided.")

        table_or_location = db_table if db_table else f"delta.`{location}`"

        config = {
            "function": "vacuum",
            "table_or_view": table_or_location,
            "vacuum_hours": hours,
        }
        cls._logger.info(f"Vacuuming table {table_or_location}...")
        TableManager(config).vacuum()

    @classmethod
    def _optimize(
        cls,
        db_table: Optional[str],
        location: Optional[str],
        where: Optional[str],
        zorder_cols: Optional[List[str]],
    ) -> None:
        """Optimize a delta table.

        Args:
            db_table: `<db>.<table>` string. Takes precedence over location.
            location: location of the delta table.
            where: expression to use in the optimize function.
            zorder_cols: list of columns to consider in the zorder
                optimization process.
        """
        if not db_table and not location:
            raise WrongArgumentsException("A table or location needs to be provided.")

        table_or_location = db_table if db_table else f"delta.`{location}`"

        config = {
            "function": "optimize",
            "table_or_view": table_or_location,
            "optimize_where": where,
            "optimize_zorder_col_list": ",".join(zorder_cols if zorder_cols else []),
        }
        cls._logger.info(f"Optimizing table {table_or_location}...")
        TableManager(config).optimize()
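
For orientation, the three private helpers above delegate to TableManager, and conceptually they correspond to Delta Lake / Spark SQL statements along the following lines. This is a sketch only: the table, partition, and column names are hypothetical, and the exact statements are built inside TableManager, not in this module.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# _optimize: compact small files, optionally restricting the scope with a
# predicate and co-locating data on the zorder columns (names are examples).
spark.sql("OPTIMIZE my_db.my_table WHERE year >= 2023 ZORDER BY (customer_id)")

# _vacuum: delete files no longer referenced by table versions within the
# retention window (720 hours is optimize_dataset's default).
spark.sql("VACUUM my_db.my_table RETAIN 720 HOURS")

# _compute_table_stats: refresh the statistics used by Spark's query optimizer.
spark.sql("ANALYZE TABLE my_db.my_table COMPUTE STATISTICS")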
class DatasetOptimizer:

Class with dataset optimizer terminator.
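
A minimal usage sketch (the table and column names are hypothetical):

from lakehouse_engine.terminators.dataset_optimizer import DatasetOptimizer

# Optimize with zorder on customer_id, vacuum history older than 720 hours
# (30 days), and recompute table statistics afterwards.
DatasetOptimizer.optimize_dataset(
    db_table="my_db.my_sales",
    optimize_zorder_col_list=["customer_id"],
    vacuum_hours=720,
)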

@classmethod
def optimize_dataset(
    cls,
    db_table: Optional[str] = None,
    location: Optional[str] = None,
    compute_table_stats: bool = True,
    vacuum: bool = True,
    vacuum_hours: int = 720,
    optimize: bool = True,
    optimize_where: Optional[str] = None,
    optimize_zorder_col_list: Optional[List[str]] = None,
    debug: bool = False,
) -> None:

Optimize a dataset based on a set of pre-conceived optimizations.

Most of the time the dataset is a table, but it can also be a purely file-based dataset.

Arguments:
  • db_table: `database_name.table_name`.
  • location: dataset/table filesystem location.
  • compute_table_stats: whether to compute table statistics or not.
  • vacuum: (delta lake tables only) whether to vacuum the delta lake table or not.
  • vacuum_hours: (delta lake tables only) number of hours to consider in the vacuum operation.
  • optimize: (delta lake tables only) whether to optimize the table or not. Custom optimize parameters can be supplied through ExecEnv (Spark) configs.
  • optimize_where: expression to use in the optimize function.
  • optimize_zorder_col_list: (delta lake tables only) list of columns to consider in the zorder optimization process. Custom optimize parameters can be supplied through ExecEnv (Spark) configs.
  • debug: whether the function is being debugged in local tests, in which case ParseException and AnalysisException are swallowed so the tests can still perform their assertions.
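
For a purely file-based delta dataset, pass location instead of db_table (db_table takes precedence when both are given). In that case compute_table_stats should be disabled, because computing statistics requires a catalog table rather than a path. A sketch with a hypothetical path and predicate:

DatasetOptimizer.optimize_dataset(
    location="s3://my-bucket/silver/sales",  # hypothetical path
    compute_table_stats=False,  # statistics require a db_table, not a path
    optimize_where="year >= 2023",  # hypothetical partition predicate
    vacuum_hours=168,  # retain only the last 7 days of history
)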