lakehouse_engine.core.table_manager
Table manager module.
"""Table manager module."""

from typing import List

from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql.functions import translate

from lakehouse_engine.core.definitions import SQLDefinitions
from lakehouse_engine.core.exec_env import ExecEnv
from lakehouse_engine.utils.configs.config_utils import ConfigUtils
from lakehouse_engine.utils.logging_handler import LoggingHandler
from lakehouse_engine.utils.sql_parser_utils import SQLParserUtils


class TableManager(object):
    """Set of actions to manipulate tables/views in several ways."""

    def __init__(self, configs: dict):
        """Construct TableManager algorithm instances.

        Args:
            configs: configurations for the TableManager algorithm.
        """
        self._logger = LoggingHandler(__name__).get_logger()
        self.configs = configs
        self.function = self.configs["function"]

    def get_function(self) -> None:
        """Get a specific function to execute.

        Raises:
            NotImplementedError: if the configured function name is unknown.
        """
        available_functions = {
            "compute_table_statistics": self.compute_table_statistics,
            "create_table": self.create,
            "create_tables": self.create_many,
            "create_view": self.create,
            "drop_table": self.drop_table,
            "drop_view": self.drop_view,
            "execute_sql": self.execute_sql,
            "truncate": self.truncate,
            "vacuum": self.vacuum,
            "describe": self.describe,
            "optimize": self.optimize,
            "show_tbl_properties": self.show_tbl_properties,
            "get_tbl_pk": self.get_tbl_pk,
            "repair_table": self.repair_table,
            "delete_where": self.delete_where,
        }

        self._logger.info("Function being executed: {}".format(self.function))

        if self.function in available_functions:
            func = available_functions[self.function]
            func()
        else:
            raise NotImplementedError(
                f"The requested function {self.function} is not implemented."
            )

    def _execute_sql_string(self, sql: str) -> None:
        """Split a raw SQL string into commands and execute each non-empty one.

        Args:
            sql: raw SQL text possibly containing several delimited commands.
        """
        sql_commands = SQLParserUtils().split_sql_commands(
            sql_commands=sql,
            delimiter=self.configs.get("delimiter", ";"),
            advanced_parser=self.configs.get("advanced_parser", False),
        )
        for command in sql_commands:
            if command.strip():
                self._logger.info(f"sql command: {command}")
                ExecEnv.SESSION.sql(command)

    def create(self) -> None:
        """Create a new table or view on metastore."""
        disable_dbfs_retry = self.configs.get("disable_dbfs_retry", False)
        sql = ConfigUtils.read_sql(self.configs["path"], disable_dbfs_retry)
        try:
            self._execute_sql_string(sql)
            self._logger.info(f"{self.function} successfully executed!")
        except Exception as e:
            self._logger.error(e)
            raise

    def create_many(self) -> None:
        """Create multiple tables or views on metastore.

        In this function the path to the ddl files can be separated by comma.
        """
        self.execute_multiple_sql_files()

    def compute_table_statistics(self) -> None:
        """Compute table statistics."""
        sql = SQLDefinitions.compute_table_stats.value.format(
            self.configs["table_or_view"]
        )
        try:
            self._logger.info(f"sql command: {sql}")
            ExecEnv.SESSION.sql(sql)
            self._logger.info(f"{self.function} successfully executed!")
        except Exception as e:
            self._logger.error(e)
            raise

    def drop_table(self) -> None:
        """Delete table function deletes table from metastore and erases all data."""
        drop_stmt = "{} {}".format(
            SQLDefinitions.drop_table_stmt.value,
            self.configs["table_or_view"],
        )

        self._logger.info(f"sql command: {drop_stmt}")
        ExecEnv.SESSION.sql(drop_stmt)
        self._logger.info("Table successfully dropped!")

    def drop_view(self) -> None:
        """Delete view function deletes view from metastore and erases all data."""
        drop_stmt = "{} {}".format(
            SQLDefinitions.drop_view_stmt.value,
            self.configs["table_or_view"],
        )

        self._logger.info(f"sql command: {drop_stmt}")
        ExecEnv.SESSION.sql(drop_stmt)
        self._logger.info("View successfully dropped!")

    def truncate(self) -> None:
        """Truncate function erases all data but keeps metadata."""
        truncate_stmt = "{} {}".format(
            SQLDefinitions.truncate_stmt.value,
            self.configs["table_or_view"],
        )

        self._logger.info(f"sql command: {truncate_stmt}")
        ExecEnv.SESSION.sql(truncate_stmt)
        self._logger.info("Table successfully truncated!")

    def vacuum(self) -> None:
        """Vacuum function erases older versions from Delta Lake tables or locations."""
        if not self.configs.get("table_or_view", None):
            delta_table = DeltaTable.forPath(ExecEnv.SESSION, self.configs["path"])

            self._logger.info(f"Vacuuming location: {self.configs['path']}")
        else:
            delta_table = DeltaTable.forName(
                ExecEnv.SESSION, self.configs["table_or_view"]
            )

            self._logger.info(f"Vacuuming table: {self.configs['table_or_view']}")
        # Default retention period is 168 hours (7 days), the Delta Lake default.
        delta_table.vacuum(self.configs.get("vacuum_hours", 168))

    def describe(self) -> None:
        """Describe function describes metadata from some table or view."""
        describe_stmt = "{} {}".format(
            SQLDefinitions.describe_stmt.value,
            self.configs["table_or_view"],
        )

        self._logger.info(f"sql command: {describe_stmt}")
        output = ExecEnv.SESSION.sql(describe_stmt)
        self._logger.info(output)

    def optimize(self) -> None:
        """Optimize function optimizes the layout of Delta Lake data."""
        if self.configs.get("where_clause", None):
            where_exp = "WHERE {}".format(self.configs["where_clause"].strip())
        else:
            where_exp = ""

        if self.configs.get("optimize_zorder_col_list", None):
            zorder_exp = "ZORDER BY ({})".format(
                self.configs["optimize_zorder_col_list"].strip()
            )
        else:
            zorder_exp = ""

        optimize_stmt = "{} {} {} {}".format(
            SQLDefinitions.optimize_stmt.value,
            (
                # Fall back to an explicit delta path when no table name is given.
                f"delta.`{self.configs.get('path', None)}`"
                if not self.configs.get("table_or_view", None)
                else self.configs.get("table_or_view", None)
            ),
            where_exp,
            zorder_exp,
        )

        self._logger.info(f"sql command: {optimize_stmt}")
        output = ExecEnv.SESSION.sql(optimize_stmt)
        self._logger.info(output)

    def execute_multiple_sql_files(self) -> None:
        """Execute multiple statements in multiple sql files.

        In this function the path to the files is separated by comma.
        """
        # Loop-invariant: read the flag once instead of once per file.
        disable_dbfs_retry = self.configs.get("disable_dbfs_retry", False)
        for table_metadata_file in self.configs["path"].split(","):
            sql = ConfigUtils.read_sql(table_metadata_file.strip(), disable_dbfs_retry)
            self._execute_sql_string(sql)
            self._logger.info("sql file successfully executed!")

    def execute_sql(self) -> None:
        """Execute sql commands separated by semicolon (;)."""
        self._execute_sql_string(self.configs.get("sql"))
        self._logger.info("sql successfully executed!")

    def show_tbl_properties(self) -> DataFrame:
        """Show Table Properties.

        Returns:
            A dataframe with the table properties.
        """
        show_tbl_props_stmt = "{} {}".format(
            SQLDefinitions.show_tbl_props_stmt.value,
            self.configs["table_or_view"],
        )

        self._logger.info(f"sql command: {show_tbl_props_stmt}")
        output = ExecEnv.SESSION.sql(show_tbl_props_stmt)
        self._logger.info(output)
        return output

    def get_tbl_pk(self) -> List[str]:
        """Get the primary key of a particular table.

        Returns:
            The list of columns that are part of the primary key.
        """
        output: List[str] = (
            self.show_tbl_properties()
            # Strip spaces and backticks so column names come back clean.
            .filter("key == 'lakehouse.primary_key'")
            .select("value")
            .withColumn("value", translate("value", " `", ""))
            .first()[0]
            .split(",")
        )
        self._logger.info(output)

        return output

    def repair_table(self) -> None:
        """Run the repair table command."""
        table_name = self.configs["table_or_view"]
        sync_metadata = self.configs["sync_metadata"]

        repair_stmt = (
            f"MSCK REPAIR TABLE {table_name} "
            f"{'SYNC METADATA' if sync_metadata else ''}"
        )

        self._logger.info(f"sql command: {repair_stmt}")
        output = ExecEnv.SESSION.sql(repair_stmt)
        self._logger.info(output)

    def delete_where(self) -> None:
        """Run the delete where command."""
        table_name = self.configs["table_or_view"]
        delete_where = self.configs["where_clause"].strip()

        delete_stmt = SQLDefinitions.delete_where_stmt.value.format(
            table_name, delete_where
        )

        self._logger.info(f"sql command: {delete_stmt}")
        output = ExecEnv.SESSION.sql(delete_stmt)
        self._logger.info(output)
class TableManager(object):
    """Collection of operations for managing metastore tables and views."""

    def __init__(self, configs: dict):
        """Initialise a TableManager with its configuration.

        Args:
            configs: configurations for the TableManager algorithm.
        """
        self._logger = LoggingHandler(__name__).get_logger()
        self.configs = configs
        self.function = configs["function"]

    def get_function(self) -> None:
        """Resolve the configured function name and invoke it."""
        dispatch = {
            "compute_table_statistics": self.compute_table_statistics,
            "create_table": self.create,
            "create_tables": self.create_many,
            "create_view": self.create,
            "drop_table": self.drop_table,
            "drop_view": self.drop_view,
            "execute_sql": self.execute_sql,
            "truncate": self.truncate,
            "vacuum": self.vacuum,
            "describe": self.describe,
            "optimize": self.optimize,
            "show_tbl_properties": self.show_tbl_properties,
            "get_tbl_pk": self.get_tbl_pk,
            "repair_table": self.repair_table,
            "delete_where": self.delete_where,
        }

        self._logger.info(f"Function being executed: {self.function}")

        selected = dispatch.get(self.function)
        if selected is None:
            raise NotImplementedError(
                f"The requested function {self.function} is not implemented."
            )
        selected()

    def create(self) -> None:
        """Create a new table or view on metastore."""
        retry_disabled = self.configs.get("disable_dbfs_retry", False)
        ddl_text = ConfigUtils.read_sql(self.configs["path"], retry_disabled)
        try:
            statements = SQLParserUtils().split_sql_commands(
                sql_commands=ddl_text,
                delimiter=self.configs.get("delimiter", ";"),
                advanced_parser=self.configs.get("advanced_parser", False),
            )
            for statement in statements:
                if not statement.strip():
                    continue
                self._logger.info(f"sql command: {statement}")
                ExecEnv.SESSION.sql(statement)
            self._logger.info(f"{self.function} successfully executed!")
        except Exception as err:
            self._logger.error(err)
            raise

    def create_many(self) -> None:
        """Create several tables or views on metastore.

        The configured path may hold multiple ddl file paths separated by comma.
        """
        self.execute_multiple_sql_files()

    def compute_table_statistics(self) -> None:
        """Compute statistics for the configured table."""
        stats_sql = SQLDefinitions.compute_table_stats.value.format(
            self.configs["table_or_view"]
        )
        try:
            self._logger.info(f"sql command: {stats_sql}")
            ExecEnv.SESSION.sql(stats_sql)
            self._logger.info(f"{self.function} successfully executed!")
        except Exception as err:
            self._logger.error(err)
            raise

    def drop_table(self) -> None:
        """Drop the configured table from metastore, erasing all its data."""
        statement = (
            f"{SQLDefinitions.drop_table_stmt.value} {self.configs['table_or_view']}"
        )

        self._logger.info(f"sql command: {statement}")
        ExecEnv.SESSION.sql(statement)
        self._logger.info("Table successfully dropped!")

    def drop_view(self) -> None:
        """Drop the configured view from metastore, erasing all its data."""
        statement = (
            f"{SQLDefinitions.drop_view_stmt.value} {self.configs['table_or_view']}"
        )

        self._logger.info(f"sql command: {statement}")
        ExecEnv.SESSION.sql(statement)
        self._logger.info("View successfully dropped!")

    def truncate(self) -> None:
        """Erase all data from the configured table while keeping its metadata."""
        statement = (
            f"{SQLDefinitions.truncate_stmt.value} {self.configs['table_or_view']}"
        )

        self._logger.info(f"sql command: {statement}")
        ExecEnv.SESSION.sql(statement)
        self._logger.info("Table successfully truncated!")

    def vacuum(self) -> None:
        """Remove old versions from a Delta Lake table or location."""
        retention_hours = self.configs.get("vacuum_hours", 168)
        target_table = self.configs.get("table_or_view", None)
        if target_table:
            delta_table = DeltaTable.forName(ExecEnv.SESSION, target_table)

            self._logger.info(f"Vacuuming table: {target_table}")
            delta_table.vacuum(retention_hours)
        else:
            delta_table = DeltaTable.forPath(ExecEnv.SESSION, self.configs["path"])

            self._logger.info(f"Vacuuming location: {self.configs['path']}")
            delta_table.vacuum(retention_hours)

    def describe(self) -> None:
        """Describe the metadata of the configured table or view."""
        statement = (
            f"{SQLDefinitions.describe_stmt.value} {self.configs['table_or_view']}"
        )

        self._logger.info(f"sql command: {statement}")
        result = ExecEnv.SESSION.sql(statement)
        self._logger.info(result)

    def optimize(self) -> None:
        """Optimize the layout of Delta Lake data for the configured target."""
        where_exp = ""
        if self.configs.get("where_clause", None):
            where_exp = f"WHERE {self.configs['where_clause'].strip()}"

        zorder_exp = ""
        if self.configs.get("optimize_zorder_col_list", None):
            zorder_exp = (
                f"ZORDER BY ({self.configs['optimize_zorder_col_list'].strip()})"
            )

        target = self.configs.get("table_or_view", None)
        if not target:
            target = f"delta.`{self.configs.get('path', None)}`"

        statement = (
            f"{SQLDefinitions.optimize_stmt.value} {target} {where_exp} {zorder_exp}"
        )

        self._logger.info(f"sql command: {statement}")
        result = ExecEnv.SESSION.sql(statement)
        self._logger.info(result)

    def execute_multiple_sql_files(self) -> None:
        """Execute the statements found in several sql files.

        The configured path holds file paths separated by comma.
        """
        for sql_file in self.configs["path"].split(","):
            retry_disabled = self.configs.get("disable_dbfs_retry", False)
            file_sql = ConfigUtils.read_sql(sql_file.strip(), retry_disabled)
            statements = SQLParserUtils().split_sql_commands(
                sql_commands=file_sql,
                delimiter=self.configs.get("delimiter", ";"),
                advanced_parser=self.configs.get("advanced_parser", False),
            )
            for statement in statements:
                if not statement.strip():
                    continue
                self._logger.info(f"sql command: {statement}")
                ExecEnv.SESSION.sql(statement)
            self._logger.info("sql file successfully executed!")

    def execute_sql(self) -> None:
        """Execute the configured sql commands separated by semicolon (;)."""
        statements = SQLParserUtils().split_sql_commands(
            sql_commands=self.configs.get("sql"),
            delimiter=self.configs.get("delimiter", ";"),
            advanced_parser=self.configs.get("advanced_parser", False),
        )
        for statement in statements:
            if not statement.strip():
                continue
            self._logger.info(f"sql command: {statement}")
            ExecEnv.SESSION.sql(statement)
        self._logger.info("sql successfully executed!")

    def show_tbl_properties(self) -> DataFrame:
        """Show the properties of the configured table.

        Returns:
            A dataframe with the table properties.
        """
        statement = (
            f"{SQLDefinitions.show_tbl_props_stmt.value} "
            f"{self.configs['table_or_view']}"
        )

        self._logger.info(f"sql command: {statement}")
        result = ExecEnv.SESSION.sql(statement)
        self._logger.info(result)
        return result

    def get_tbl_pk(self) -> List[str]:
        """Fetch the primary key columns of the configured table.

        Returns:
            The list of columns that are part of the primary key.
        """
        props_df = self.show_tbl_properties()
        pk_value = (
            props_df.filter("key == 'lakehouse.primary_key'")
            .select("value")
            .withColumn("value", translate("value", " `", ""))
            .first()[0]
        )
        pk_columns: List[str] = pk_value.split(",")
        self._logger.info(pk_columns)

        return pk_columns

    def repair_table(self) -> None:
        """Run the repair table command on the configured table."""
        sync_suffix = "SYNC METADATA" if self.configs["sync_metadata"] else ""
        statement = (
            f"MSCK REPAIR TABLE {self.configs['table_or_view']} {sync_suffix}"
        )

        self._logger.info(f"sql command: {statement}")
        result = ExecEnv.SESSION.sql(statement)
        self._logger.info(result)

    def delete_where(self) -> None:
        """Run a delete-where command against the configured table."""
        condition = self.configs["where_clause"].strip()
        statement = SQLDefinitions.delete_where_stmt.value.format(
            self.configs["table_or_view"], condition
        )

        self._logger.info(f"sql command: {statement}")
        result = ExecEnv.SESSION.sql(statement)
        self._logger.info(result)
Set of actions to manipulate tables/views in several ways.
20 def __init__(self, configs: dict): 21 """Construct TableManager algorithm instances. 22 23 Args: 24 configs: configurations for the TableManager algorithm. 25 """ 26 self._logger = LoggingHandler(__name__).get_logger() 27 self.configs = configs 28 self.function = self.configs["function"]
Construct TableManager algorithm instances.
Arguments:
- configs: configurations for the TableManager algorithm.
30 def get_function(self) -> None: 31 """Get a specific function to execute.""" 32 available_functions = { 33 "compute_table_statistics": self.compute_table_statistics, 34 "create_table": self.create, 35 "create_tables": self.create_many, 36 "create_view": self.create, 37 "drop_table": self.drop_table, 38 "drop_view": self.drop_view, 39 "execute_sql": self.execute_sql, 40 "truncate": self.truncate, 41 "vacuum": self.vacuum, 42 "describe": self.describe, 43 "optimize": self.optimize, 44 "show_tbl_properties": self.show_tbl_properties, 45 "get_tbl_pk": self.get_tbl_pk, 46 "repair_table": self.repair_table, 47 "delete_where": self.delete_where, 48 } 49 50 self._logger.info("Function being executed: {}".format(self.function)) 51 52 if self.function in available_functions.keys(): 53 func = available_functions[self.function] 54 func() 55 else: 56 raise NotImplementedError( 57 f"The requested function {self.function} is not implemented." 58 )
Get a specific function to execute.
60 def create(self) -> None: 61 """Create a new table or view on metastore.""" 62 disable_dbfs_retry = ( 63 self.configs["disable_dbfs_retry"] 64 if "disable_dbfs_retry" in self.configs.keys() 65 else False 66 ) 67 sql = ConfigUtils.read_sql(self.configs["path"], disable_dbfs_retry) 68 try: 69 sql_commands = SQLParserUtils().split_sql_commands( 70 sql_commands=sql, 71 delimiter=self.configs.get("delimiter", ";"), 72 advanced_parser=self.configs.get("advanced_parser", False), 73 ) 74 for command in sql_commands: 75 if command.strip(): 76 self._logger.info(f"sql command: {command}") 77 ExecEnv.SESSION.sql(command) 78 self._logger.info(f"{self.function} successfully executed!") 79 except Exception as e: 80 self._logger.error(e) 81 raise
Create a new table or view on metastore.
83 def create_many(self) -> None: 84 """Create multiple tables or views on metastore. 85 86 In this function the path to the ddl files can be separated by comma. 87 """ 88 self.execute_multiple_sql_files()
Create multiple tables or views on metastore.
In this function the path to the ddl files can be separated by comma.
90 def compute_table_statistics(self) -> None: 91 """Compute table statistics.""" 92 sql = SQLDefinitions.compute_table_stats.value.format( 93 self.configs["table_or_view"] 94 ) 95 try: 96 self._logger.info(f"sql command: {sql}") 97 ExecEnv.SESSION.sql(sql) 98 self._logger.info(f"{self.function} successfully executed!") 99 except Exception as e: 100 self._logger.error(e) 101 raise
Compute table statistics.
103 def drop_table(self) -> None: 104 """Delete table function deletes table from metastore and erases all data.""" 105 drop_stmt = "{} {}".format( 106 SQLDefinitions.drop_table_stmt.value, 107 self.configs["table_or_view"], 108 ) 109 110 self._logger.info(f"sql command: {drop_stmt}") 111 ExecEnv.SESSION.sql(drop_stmt) 112 self._logger.info("Table successfully dropped!")
Delete table function deletes table from metastore and erases all data.
114 def drop_view(self) -> None: 115 """Delete view function deletes view from metastore and erases all data.""" 116 drop_stmt = "{} {}".format( 117 SQLDefinitions.drop_view_stmt.value, 118 self.configs["table_or_view"], 119 ) 120 121 self._logger.info(f"sql command: {drop_stmt}") 122 ExecEnv.SESSION.sql(drop_stmt) 123 self._logger.info("View successfully dropped!")
Delete view function deletes view from metastore and erases all data.
125 def truncate(self) -> None: 126 """Truncate function erases all data but keeps metadata.""" 127 truncate_stmt = "{} {}".format( 128 SQLDefinitions.truncate_stmt.value, 129 self.configs["table_or_view"], 130 ) 131 132 self._logger.info(f"sql command: {truncate_stmt}") 133 ExecEnv.SESSION.sql(truncate_stmt) 134 self._logger.info("Table successfully truncated!")
Truncate function erases all data but keeps metadata.
136 def vacuum(self) -> None: 137 """Vacuum function erases older versions from Delta Lake tables or locations.""" 138 if not self.configs.get("table_or_view", None): 139 delta_table = DeltaTable.forPath(ExecEnv.SESSION, self.configs["path"]) 140 141 self._logger.info(f"Vacuuming location: {self.configs['path']}") 142 delta_table.vacuum(self.configs.get("vacuum_hours", 168)) 143 else: 144 delta_table = DeltaTable.forName( 145 ExecEnv.SESSION, self.configs["table_or_view"] 146 ) 147 148 self._logger.info(f"Vacuuming table: {self.configs['table_or_view']}") 149 delta_table.vacuum(self.configs.get("vacuum_hours", 168))
Vacuum function erases older versions from Delta Lake tables or locations.
151 def describe(self) -> None: 152 """Describe function describes metadata from some table or view.""" 153 describe_stmt = "{} {}".format( 154 SQLDefinitions.describe_stmt.value, 155 self.configs["table_or_view"], 156 ) 157 158 self._logger.info(f"sql command: {describe_stmt}") 159 output = ExecEnv.SESSION.sql(describe_stmt) 160 self._logger.info(output)
Describe function describes metadata from some table or view.
162 def optimize(self) -> None: 163 """Optimize function optimizes the layout of Delta Lake data.""" 164 if self.configs.get("where_clause", None): 165 where_exp = "WHERE {}".format(self.configs["where_clause"].strip()) 166 else: 167 where_exp = "" 168 169 if self.configs.get("optimize_zorder_col_list", None): 170 zorder_exp = "ZORDER BY ({})".format( 171 self.configs["optimize_zorder_col_list"].strip() 172 ) 173 else: 174 zorder_exp = "" 175 176 optimize_stmt = "{} {} {} {}".format( 177 SQLDefinitions.optimize_stmt.value, 178 ( 179 f"delta.`{self.configs.get('path', None)}`" 180 if not self.configs.get("table_or_view", None) 181 else self.configs.get("table_or_view", None) 182 ), 183 where_exp, 184 zorder_exp, 185 ) 186 187 self._logger.info(f"sql command: {optimize_stmt}") 188 output = ExecEnv.SESSION.sql(optimize_stmt) 189 self._logger.info(output)
Optimize function optimizes the layout of Delta Lake data.
191 def execute_multiple_sql_files(self) -> None: 192 """Execute multiple statements in multiple sql files. 193 194 In this function the path to the files is separated by comma. 195 """ 196 for table_metadata_file in self.configs["path"].split(","): 197 disable_dbfs_retry = ( 198 self.configs["disable_dbfs_retry"] 199 if "disable_dbfs_retry" in self.configs.keys() 200 else False 201 ) 202 sql = ConfigUtils.read_sql(table_metadata_file.strip(), disable_dbfs_retry) 203 sql_commands = SQLParserUtils().split_sql_commands( 204 sql_commands=sql, 205 delimiter=self.configs.get("delimiter", ";"), 206 advanced_parser=self.configs.get("advanced_parser", False), 207 ) 208 for command in sql_commands: 209 if command.strip(): 210 self._logger.info(f"sql command: {command}") 211 ExecEnv.SESSION.sql(command) 212 self._logger.info("sql file successfully executed!")
Execute multiple statements in multiple sql files.
In this function the path to the files is separated by comma.
214 def execute_sql(self) -> None: 215 """Execute sql commands separated by semicolon (;).""" 216 sql_commands = SQLParserUtils().split_sql_commands( 217 sql_commands=self.configs.get("sql"), 218 delimiter=self.configs.get("delimiter", ";"), 219 advanced_parser=self.configs.get("advanced_parser", False), 220 ) 221 for command in sql_commands: 222 if command.strip(): 223 self._logger.info(f"sql command: {command}") 224 ExecEnv.SESSION.sql(command) 225 self._logger.info("sql successfully executed!")
Execute sql commands separated by semicolon (;).
227 def show_tbl_properties(self) -> DataFrame: 228 """Show Table Properties. 229 230 Returns: 231 A dataframe with the table properties. 232 """ 233 show_tbl_props_stmt = "{} {}".format( 234 SQLDefinitions.show_tbl_props_stmt.value, 235 self.configs["table_or_view"], 236 ) 237 238 self._logger.info(f"sql command: {show_tbl_props_stmt}") 239 output = ExecEnv.SESSION.sql(show_tbl_props_stmt) 240 self._logger.info(output) 241 return output
Show Table Properties.
Returns:
A dataframe with the table properties.
243 def get_tbl_pk(self) -> List[str]: 244 """Get the primary key of a particular table. 245 246 Returns: 247 The list of columns that are part of the primary key. 248 """ 249 output: List[str] = ( 250 self.show_tbl_properties() 251 .filter("key == 'lakehouse.primary_key'") 252 .select("value") 253 .withColumn("value", translate("value", " `", "")) 254 .first()[0] 255 .split(",") 256 ) 257 self._logger.info(output) 258 259 return output
Get the primary key of a particular table.
Returns:
The list of columns that are part of the primary key.
261 def repair_table(self) -> None: 262 """Run the repair table command.""" 263 table_name = self.configs["table_or_view"] 264 sync_metadata = self.configs["sync_metadata"] 265 266 repair_stmt = ( 267 f"MSCK REPAIR TABLE {table_name} " 268 f"{'SYNC METADATA' if sync_metadata else ''}" 269 ) 270 271 self._logger.info(f"sql command: {repair_stmt}") 272 output = ExecEnv.SESSION.sql(repair_stmt) 273 self._logger.info(output)
Run the repair table command.
275 def delete_where(self) -> None: 276 """Run the delete where command.""" 277 table_name = self.configs["table_or_view"] 278 delete_where = self.configs["where_clause"].strip() 279 280 delete_stmt = SQLDefinitions.delete_where_stmt.value.format( 281 table_name, delete_where 282 ) 283 284 self._logger.info(f"sql command: {delete_stmt}") 285 output = ExecEnv.SESSION.sql(delete_stmt) 286 self._logger.info(output)
Run the delete where command.