lakehouse_engine.terminators.dataset_optimizer
Module with dataset optimizer terminator.
1"""Module with dataset optimizer terminator.""" 2 3from typing import List, Optional 4 5from pyspark.sql.utils import AnalysisException, ParseException 6 7from lakehouse_engine.core.table_manager import TableManager 8from lakehouse_engine.transformers.exceptions import WrongArgumentsException 9from lakehouse_engine.utils.logging_handler import LoggingHandler 10 11 12class DatasetOptimizer(object): 13 """Class with dataset optimizer terminator.""" 14 15 _logger = LoggingHandler(__name__).get_logger() 16 17 @classmethod 18 def optimize_dataset( 19 cls, 20 db_table: Optional[str] = None, 21 location: Optional[str] = None, 22 compute_table_stats: bool = True, 23 vacuum: bool = True, 24 vacuum_hours: int = 720, 25 optimize: bool = True, 26 optimize_where: Optional[str] = None, 27 optimize_zorder_col_list: Optional[List[str]] = None, 28 debug: bool = False, 29 ) -> None: 30 """Optimize a dataset based on a set of pre-conceived optimizations. 31 32 Most of the time the dataset is a table, but it can be a file-based one only. 33 34 Args: 35 db_table: `database_name.table_name`. 36 location: dataset/table filesystem location. 37 compute_table_stats: to compute table statistics or not. 38 vacuum: (delta lake tables only) whether to vacuum the delta lake 39 table or not. 40 vacuum_hours: (delta lake tables only) number of hours to consider 41 in vacuum operation. 42 optimize: (delta lake tables only) whether to optimize the table or 43 not. Custom optimize parameters can be supplied through ExecEnv (Spark) 44 configs 45 optimize_where: expression to use in the optimize function. 46 optimize_zorder_col_list: (delta lake tables only) list of 47 columns to consider in the zorder optimization process. Custom optimize 48 parameters can be supplied through ExecEnv (Spark) configs. 49 debug: flag indicating if we are just debugging this for local 50 tests and therefore pass through all the exceptions to perform some 51 assertions in local tests. 52 """ 53 if optimize: 54 if debug: 55 try: 56 cls._optimize( 57 db_table, location, optimize_where, optimize_zorder_col_list 58 ) 59 except ParseException: 60 pass 61 else: 62 cls._optimize( 63 db_table, location, optimize_where, optimize_zorder_col_list 64 ) 65 66 if vacuum: 67 cls._vacuum(db_table, location, vacuum_hours) 68 69 if compute_table_stats: 70 if debug: 71 try: 72 cls._compute_table_stats(db_table) 73 except AnalysisException: 74 pass 75 else: 76 cls._compute_table_stats(db_table) 77 78 @classmethod 79 def _compute_table_stats(cls, db_table: str) -> None: 80 """Compute table statistics. 81 82 Args: 83 db_table: `<db>.<table>` string. 84 """ 85 if not db_table: 86 raise WrongArgumentsException("A table needs to be provided.") 87 88 config = {"function": "compute_table_statistics", "table_or_view": db_table} 89 cls._logger.info(f"Computing table statistics for {db_table}...") 90 TableManager(config).compute_table_statistics() 91 92 @classmethod 93 def _vacuum(cls, db_table: str, location: str, hours: int) -> None: 94 """Vacuum a delta table. 95 96 Args: 97 db_table: `<db>.<table>` string. Takes precedence over location. 98 location: location of the delta table. 99 hours: number of hours to consider in vacuum operation. 
100 """ 101 if not db_table and not location: 102 raise WrongArgumentsException("A table or location need to be provided.") 103 104 table_or_location = db_table if db_table else f"delta.`{location}`" 105 106 config = { 107 "function": "compute_table_statistics", 108 "table_or_view": table_or_location, 109 "vacuum_hours": hours, 110 } 111 cls._logger.info(f"Vacuuming table {table_or_location}...") 112 TableManager(config).vacuum() 113 114 @classmethod 115 def _optimize( 116 cls, db_table: str, location: str, where: str, zorder_cols: List[str] 117 ) -> None: 118 """Optimize a delta table. 119 120 Args: 121 db_table: `<db>.<table>` string. Takes precedence over location. 122 location: location of the delta table. 123 where: expression to use in the optimize function. 124 zorder_cols: list of columns to consider in the zorder optimization process. 125 """ 126 if not db_table and not location: 127 raise WrongArgumentsException("A table or location needs to be provided.") 128 129 table_or_location = db_table if db_table else f"delta.`{location}`" 130 131 config = { 132 "function": "compute_table_statistics", 133 "table_or_view": table_or_location, 134 "optimize_where": where, 135 "optimize_zorder_col_list": ",".join(zorder_cols if zorder_cols else []), 136 } 137 cls._logger.info(f"Optimizing table {table_or_location}...") 138 TableManager(config).optimize()
class DatasetOptimizer:
Class with dataset optimizer terminator.
@classmethod
def optimize_dataset(
    cls,
    db_table: Optional[str] = None,
    location: Optional[str] = None,
    compute_table_stats: bool = True,
    vacuum: bool = True,
    vacuum_hours: int = 720,
    optimize: bool = True,
    optimize_where: Optional[str] = None,
    optimize_zorder_col_list: Optional[List[str]] = None,
    debug: bool = False,
) -> None:
Optimize a dataset based on a set of pre-conceived optimizations.
Most of the time the dataset is a table, but it can also be a purely file-based dataset, identified only by its location. A usage sketch follows the argument list below.
Arguments:
- db_table: `database_name.table_name`.
- location: dataset/table filesystem location.
- compute_table_stats: whether to compute table statistics or not.
- vacuum: (delta lake tables only) whether to vacuum the delta lake table or not.
- vacuum_hours: (delta lake tables only) number of hours of retention to consider in the vacuum operation (default 720, i.e., 30 days).
- optimize: (delta lake tables only) whether to optimize the table or not. Custom optimize parameters can be supplied through ExecEnv (Spark) configs.
- optimize_where: expression to use in the optimize function.
- optimize_zorder_col_list: (delta lake tables only) list of columns to consider in the zorder optimization process. Custom optimize parameters can be supplied through ExecEnv (Spark) configs.
- debug: flag indicating whether this is being run just for debugging in local tests; when enabled, parse/analysis exceptions raised by the optimize and statistics steps are caught and ignored so that local tests can still perform their assertions.
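A minimal usage sketch, typically run as a terminator step at the end of a data load (the table name, location, predicate, and z-order column below are hypothetical):

from lakehouse_engine.terminators.dataset_optimizer import DatasetOptimizer

# Optimize, vacuum (30 days of retention) and compute statistics for a delta table.
DatasetOptimizer.optimize_dataset(
    db_table="my_db.my_table",                 # hypothetical table name
    vacuum_hours=720,                          # keep 30 days of history (the default)
    optimize_where="year >= 2023",             # hypothetical predicate
    optimize_zorder_col_list=["customer_id"],  # hypothetical z-order column
)

# For a purely file-based delta dataset, pass a location instead. Table statistics
# require a catalog table (db_table), so disable them in that case to avoid a
# WrongArgumentsException.
DatasetOptimizer.optimize_dataset(
    location="s3://my-bucket/my-dataset",      # hypothetical path
    compute_table_stats=False,
)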