lakehouse_engine.core.table_manager

Table manager module.

  1"""Table manager module."""
  2
  3from typing import List
  4
  5from delta.tables import DeltaTable
  6from pyspark.sql import DataFrame
  7from pyspark.sql.functions import translate
  8
  9from lakehouse_engine.core.definitions import SQLDefinitions
 10from lakehouse_engine.core.exec_env import ExecEnv
 11from lakehouse_engine.utils.configs.config_utils import ConfigUtils
 12from lakehouse_engine.utils.logging_handler import LoggingHandler
 13from lakehouse_engine.utils.sql_parser_utils import SQLParserUtils
 14
 15
 16class TableManager(object):
 17    """Set of actions to manipulate tables/views in several ways."""
 18
 19    def __init__(self, configs: dict):
 20        """Construct TableManager algorithm instances.
 21
 22        Args:
 23            configs: configurations for the TableManager algorithm.
 24        """
 25        self._logger = LoggingHandler(__name__).get_logger()
 26        self.configs = configs
 27        self.function = self.configs["function"]
 28
 29    def get_function(self) -> None:
 30        """Get a specific function to execute."""
 31        available_functions = {
 32            "compute_table_statistics": self.compute_table_statistics,
 33            "create_table": self.create,
 34            "create_tables": self.create_many,
 35            "create_view": self.create,
 36            "drop_table": self.drop_table,
 37            "drop_view": self.drop_view,
 38            "execute_sql": self.execute_sql,
 39            "truncate": self.truncate,
 40            "vacuum": self.vacuum,
 41            "describe": self.describe,
 42            "optimize": self.optimize,
 43            "show_tbl_properties": self.show_tbl_properties,
 44            "get_tbl_pk": self.get_tbl_pk,
 45            "repair_table": self.repair_table,
 46            "delete_where": self.delete_where,
 47        }
 48
 49        self._logger.info("Function being executed: {}".format(self.function))
 50
 51        if self.function in available_functions.keys():
 52            func = available_functions[self.function]
 53            func()
 54        else:
 55            raise NotImplementedError(
 56                f"The requested function {self.function} is not implemented."
 57            )
 58
 59    def create(self) -> None:
 60        """Create a new table or view on metastore."""
 61        disable_dbfs_retry = (
 62            self.configs["disable_dbfs_retry"]
 63            if "disable_dbfs_retry" in self.configs.keys()
 64            else False
 65        )
 66        sql = ConfigUtils.read_sql(self.configs["path"], disable_dbfs_retry)
 67        try:
 68            sql_commands = SQLParserUtils().split_sql_commands(
 69                sql_commands=sql,
 70                delimiter=self.configs.get("delimiter", ";"),
 71                advanced_parser=self.configs.get("advanced_parser", False),
 72            )
 73            for command in sql_commands:
 74                if command.strip():
 75                    self._logger.info(f"sql command: {command}")
 76                    ExecEnv.SESSION.sql(command)
 77            self._logger.info(f"{self.function} successfully executed!")
 78        except Exception as e:
 79            self._logger.error(e)
 80            raise
 81
 82    def create_many(self) -> None:
 83        """Create multiple tables or views on metastore.
 84
 85        In this function the path to the ddl files can be separated by comma.
 86        """
 87        self.execute_multiple_sql_files()
 88
 89    def compute_table_statistics(self) -> None:
 90        """Compute table statistics."""
 91        sql = SQLDefinitions.compute_table_stats.value.format(
 92            self.configs["table_or_view"]
 93        )
 94        try:
 95            self._logger.info(f"sql command: {sql}")
 96            ExecEnv.SESSION.sql(sql)
 97            self._logger.info(f"{self.function} successfully executed!")
 98        except Exception as e:
 99            self._logger.error(e)
100            raise
101
102    def drop_table(self) -> None:
103        """Delete table function deletes table from metastore and erases all data."""
104        drop_stmt = "{} {}".format(
105            SQLDefinitions.drop_table_stmt.value,
106            self.configs["table_or_view"],
107        )
108
109        self._logger.info(f"sql command: {drop_stmt}")
110        ExecEnv.SESSION.sql(drop_stmt)
111        self._logger.info("Table successfully dropped!")
112
113    def drop_view(self) -> None:
114        """Delete view function deletes view from metastore and erases all data."""
115        drop_stmt = "{} {}".format(
116            SQLDefinitions.drop_view_stmt.value,
117            self.configs["table_or_view"],
118        )
119
120        self._logger.info(f"sql command: {drop_stmt}")
121        ExecEnv.SESSION.sql(drop_stmt)
122        self._logger.info("View successfully dropped!")
123
124    def truncate(self) -> None:
125        """Truncate function erases all data but keeps metadata."""
126        truncate_stmt = "{} {}".format(
127            SQLDefinitions.truncate_stmt.value,
128            self.configs["table_or_view"],
129        )
130
131        self._logger.info(f"sql command: {truncate_stmt}")
132        ExecEnv.SESSION.sql(truncate_stmt)
133        self._logger.info("Table successfully truncated!")
134
135    def vacuum(self) -> None:
136        """Vacuum function erases older versions from Delta Lake tables or locations."""
137        if not self.configs.get("table_or_view", None):
138            delta_table = DeltaTable.forPath(ExecEnv.SESSION, self.configs["path"])
139
140            self._logger.info(f"Vacuuming location: {self.configs['path']}")
141            delta_table.vacuum(self.configs.get("vacuum_hours", 168))
142        else:
143            delta_table = DeltaTable.forName(
144                ExecEnv.SESSION, self.configs["table_or_view"]
145            )
146
147            self._logger.info(f"Vacuuming table: {self.configs['table_or_view']}")
148            delta_table.vacuum(self.configs.get("vacuum_hours", 168))
149
150    def describe(self) -> None:
151        """Describe function describes metadata from some table or view."""
152        describe_stmt = "{} {}".format(
153            SQLDefinitions.describe_stmt.value,
154            self.configs["table_or_view"],
155        )
156
157        self._logger.info(f"sql command: {describe_stmt}")
158        output = ExecEnv.SESSION.sql(describe_stmt)
159        self._logger.info(output)
160
161    def optimize(self) -> None:
162        """Optimize function optimizes the layout of Delta Lake data."""
163        if self.configs.get("where_clause", None):
164            where_exp = "WHERE {}".format(self.configs["where_clause"].strip())
165        else:
166            where_exp = ""
167
168        if self.configs.get("optimize_zorder_col_list", None):
169            zorder_exp = "ZORDER BY ({})".format(
170                self.configs["optimize_zorder_col_list"].strip()
171            )
172        else:
173            zorder_exp = ""
174
175        optimize_stmt = "{} {} {} {}".format(
176            SQLDefinitions.optimize_stmt.value,
177            (
178                f"delta.`{self.configs.get('path', None)}`"
179                if not self.configs.get("table_or_view", None)
180                else self.configs.get("table_or_view", None)
181            ),
182            where_exp,
183            zorder_exp,
184        )
185
186        self._logger.info(f"sql command: {optimize_stmt}")
187        output = ExecEnv.SESSION.sql(optimize_stmt)
188        self._logger.info(output)
189
190    def execute_multiple_sql_files(self) -> None:
191        """Execute multiple statements in multiple sql files.
192
193        In this function the path to the files is separated by comma.
194        """
195        for table_metadata_file in self.configs["path"].split(","):
196            disable_dbfs_retry = (
197                self.configs["disable_dbfs_retry"]
198                if "disable_dbfs_retry" in self.configs.keys()
199                else False
200            )
201            sql = ConfigUtils.read_sql(table_metadata_file.strip(), disable_dbfs_retry)
202            sql_commands = SQLParserUtils().split_sql_commands(
203                sql_commands=sql,
204                delimiter=self.configs.get("delimiter", ";"),
205                advanced_parser=self.configs.get("advanced_parser", False),
206            )
207            for command in sql_commands:
208                if command.strip():
209                    self._logger.info(f"sql command: {command}")
210                    ExecEnv.SESSION.sql(command)
211            self._logger.info("sql file successfully executed!")
212
213    def execute_sql(self) -> None:
214        """Execute sql commands separated by semicolon (;)."""
215        sql_commands = SQLParserUtils().split_sql_commands(
216            sql_commands=self.configs.get("sql"),
217            delimiter=self.configs.get("delimiter", ";"),
218            advanced_parser=self.configs.get("advanced_parser", False),
219        )
220        for command in sql_commands:
221            if command.strip():
222                self._logger.info(f"sql command: {command}")
223                ExecEnv.SESSION.sql(command)
224        self._logger.info("sql successfully executed!")
225
226    def show_tbl_properties(self) -> DataFrame:
227        """Show Table Properties.
228
229        Returns:
230            A dataframe with the table properties.
231        """
232        show_tbl_props_stmt = "{} {}".format(
233            SQLDefinitions.show_tbl_props_stmt.value,
234            self.configs["table_or_view"],
235        )
236
237        self._logger.info(f"sql command: {show_tbl_props_stmt}")
238        output = ExecEnv.SESSION.sql(show_tbl_props_stmt)
239        self._logger.info(output)
240        return output
241
242    def get_tbl_pk(self) -> List[str]:
243        """Get the primary key of a particular table.
244
245        Returns:
246            The list of columns that are part of the primary key.
247        """
248        output: List[str] = (
249            self.show_tbl_properties()
250            .filter("key == 'lakehouse.primary_key'")
251            .select("value")
252            .withColumn("value", translate("value", " `", ""))
253            .first()[0]
254            .split(",")
255        )
256        self._logger.info(output)
257
258        return output
259
260    def repair_table(self) -> None:
261        """Run the repair table command."""
262        table_name = self.configs["table_or_view"]
263        sync_metadata = self.configs["sync_metadata"]
264
265        repair_stmt = (
266            f"MSCK REPAIR TABLE {table_name} "
267            f"{'SYNC METADATA' if sync_metadata else ''}"
268        )
269
270        self._logger.info(f"sql command: {repair_stmt}")
271        output = ExecEnv.SESSION.sql(repair_stmt)
272        self._logger.info(output)
273
274    def delete_where(self) -> None:
275        """Run the delete where command."""
276        table_name = self.configs["table_or_view"]
277        delete_where = self.configs["where_clause"].strip()
278
279        delete_stmt = SQLDefinitions.delete_where_stmt.value.format(
280            table_name, delete_where
281        )
282
283        self._logger.info(f"sql command: {delete_stmt}")
284        output = ExecEnv.SESSION.sql(delete_stmt)
285        self._logger.info(output)
class TableManager:

Set of actions to manipulate tables/views in several ways.

TableManager(configs: dict)

Construct TableManager algorithm instances.

Arguments:
  • configs: configurations for the TableManager algorithm.
Instance attributes: configs, function.
def get_function(self) -> None:

Get the function requested in the configs and execute it.
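
A minimal dispatch sketch (the table name is illustrative, and ExecEnv.SESSION is assumed to already hold an active Spark session):

    from lakehouse_engine.core.table_manager import TableManager

    # "function" selects the entry in available_functions; the remaining keys
    # feed the selected method (describe only needs "table_or_view").
    manager = TableManager({"function": "describe", "table_or_view": "my_db.my_table"})
    manager.get_function()  # dispatches to TableManager.describe()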

def create(self) -> None:

Create a new table or view in the metastore.
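
A hedged example configuration for creating a table from a DDL file (the path is illustrative; the optional keys are shown with their defaults):

    configs = {
        "function": "create_table",
        "path": "s3://my-bucket/ddl/create_sales_orders.sql",  # illustrative DDL file
        "delimiter": ";",             # optional, default ";"
        "advanced_parser": False,     # optional, default False
        "disable_dbfs_retry": False,  # optional, default False
    }
    TableManager(configs).create()

The file is read with ConfigUtils.read_sql, split into individual commands, and each non-empty command is executed through ExecEnv.SESSION.sql.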

def create_many(self) -> None:

Create multiple tables or views in the metastore.

The paths to the DDL files can be provided as a comma-separated list.
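
For example (paths illustrative), a comma-separated "path" creates both tables in one call:

    configs = {
        "function": "create_tables",
        "path": "s3://my-bucket/ddl/table_a.sql, s3://my-bucket/ddl/table_b.sql",
    }
    TableManager(configs).create_many()  # delegates to execute_multiple_sql_files()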

def compute_table_statistics(self) -> None:

Compute table statistics.
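
A minimal sketch (table name illustrative):

    TableManager(
        {"function": "compute_table_statistics", "table_or_view": "my_db.sales_orders"}
    ).compute_table_statistics()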

def drop_table(self) -> None:

Drop the table from the metastore and erase all of its data.
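
For example (table name illustrative); drop_view works the same way with "function": "drop_view" and a view name:

    TableManager(
        {"function": "drop_table", "table_or_view": "my_db.sales_orders"}
    ).drop_table()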

def drop_view(self) -> None:

Drop the view from the metastore.

def truncate(self) -> None:

Truncate the table: erase all of its data while keeping its metadata.
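
A minimal sketch (table name illustrative); the table definition survives, only the rows are removed:

    TableManager(
        {"function": "truncate", "table_or_view": "my_db.sales_orders"}
    ).truncate()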

def vacuum(self) -> None:

Vacuum a Delta Lake table or location, removing files from versions older than the retention period.
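
Two hedged variants (names and paths illustrative): vacuum by table name, or by Delta location when no "table_or_view" is given. "vacuum_hours" defaults to 168 (7 days):

    # By table name, keeping 240 hours of history:
    TableManager(
        {"function": "vacuum", "table_or_view": "my_db.sales_orders", "vacuum_hours": 240}
    ).vacuum()

    # By location (no "table_or_view" key in the configs):
    TableManager(
        {"function": "vacuum", "path": "s3://my-bucket/data/sales_orders"}
    ).vacuum()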

def describe(self) -> None:

Describe the metadata of a table or view.

def optimize(self) -> None:

Optimize the layout of a Delta Lake table's data, optionally restricted by a WHERE clause and Z-ordered by the given columns.
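
A hedged example (table and column names illustrative; the "where_clause" is assumed to target partition columns) combining the optional WHERE filter and Z-ORDER columns:

    configs = {
        "function": "optimize",
        "table_or_view": "my_db.sales_orders",
        "where_clause": "order_date >= '2024-01-01'",         # optional filter
        "optimize_zorder_col_list": "customer_id, order_id",  # optional ZORDER BY columns
    }
    TableManager(configs).optimize()
    # Builds roughly: OPTIMIZE my_db.sales_orders
    #                 WHERE order_date >= '2024-01-01' ZORDER BY (customer_id, order_id)

When "table_or_view" is absent, the statement targets the Delta location given in "path" instead.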

def execute_multiple_sql_files(self) -> None:

Execute multiple statements from multiple SQL files.

The file paths are provided as a comma-separated list.

def execute_sql(self) -> None:

Execute SQL commands separated by the configured delimiter (semicolon by default).
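
A sketch with inline SQL (the statements are illustrative); "delimiter" defaults to ";":

    configs = {
        "function": "execute_sql",
        "sql": "DROP TABLE IF EXISTS my_db.tmp_orders; "
               "CREATE TABLE my_db.tmp_orders (id INT)",
        "delimiter": ";",
    }
    TableManager(configs).execute_sql()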

def show_tbl_properties(self) -> pyspark.sql.dataframe.DataFrame:

Show Table Properties.

Returns:

A dataframe with the table properties.
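
Because this method returns a DataFrame, the result can be inspected directly (table name illustrative):

    props_df = TableManager(
        {"function": "show_tbl_properties", "table_or_view": "my_db.sales_orders"}
    ).show_tbl_properties()
    props_df.show(truncate=False)  # two columns: key and value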

def get_tbl_pk(self) -> List[str]:

Get the primary key of a particular table.

Returns:

The list of columns that are part of the primary key.
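
This relies on the table carrying a lakehouse.primary_key table property; a sketch (names illustrative):

    # Assumes the table was created with
    # TBLPROPERTIES ('lakehouse.primary_key' = 'customer_id, order_id').
    pk_cols = TableManager(
        {"function": "get_tbl_pk", "table_or_view": "my_db.sales_orders"}
    ).get_tbl_pk()
    # pk_cols == ["customer_id", "order_id"]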

def repair_table(self) -> None:

Run the repair table command.
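
A sketch (table name illustrative); "sync_metadata" is a required key and toggles the SYNC METADATA suffix:

    TableManager(
        {
            "function": "repair_table",
            "table_or_view": "my_db.sales_orders",
            "sync_metadata": True,
        }
    ).repair_table()
    # Runs: MSCK REPAIR TABLE my_db.sales_orders SYNC METADATA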

def delete_where(self) -> None:

Run the delete where command.
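
A sketch (table and predicate illustrative):

    TableManager(
        {
            "function": "delete_where",
            "table_or_view": "my_db.sales_orders",
            "where_clause": "order_date < '2020-01-01'",
        }
    ).delete_where()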