lakehouse_engine.engine
Contract of the lakehouse engine with all the available functions to be executed.
1"""Contract of the lakehouse engine with all the available functions to be executed.""" 2 3from typing import List, Optional, OrderedDict 4 5from lakehouse_engine.algorithms.data_loader import DataLoader 6from lakehouse_engine.algorithms.gab import GAB 7from lakehouse_engine.algorithms.reconciliator import Reconciliator 8from lakehouse_engine.algorithms.sensor import Sensor, SensorStatus 9from lakehouse_engine.core.definitions import ( 10 CollectEngineUsage, 11 DQDefaults, 12 SAPLogchain, 13 TerminatorSpec, 14) 15from lakehouse_engine.core.exec_env import ExecEnv 16from lakehouse_engine.core.file_manager import FileManagerFactory 17from lakehouse_engine.core.sensor_manager import SensorUpstreamManager 18from lakehouse_engine.core.table_manager import TableManager 19from lakehouse_engine.terminators.notifier_factory import NotifierFactory 20from lakehouse_engine.terminators.sensor_terminator import SensorTerminator 21from lakehouse_engine.utils.acon_utils import validate_and_resolve_acon 22from lakehouse_engine.utils.configs.config_utils import ConfigUtils 23from lakehouse_engine.utils.engine_usage_stats import EngineUsageStats 24 25 26def load_data( 27 acon_path: Optional[str] = None, 28 acon: Optional[dict] = None, 29 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 30 spark_confs: dict = None, 31) -> Optional[OrderedDict]: 32 """Load data using the DataLoader algorithm. 33 34 Args: 35 acon_path: path of the acon (algorithm configuration) file. 36 acon: acon provided directly through python code (e.g., notebooks or other 37 apps). 38 collect_engine_usage: Lakehouse usage statistics collection strategy. 39 spark_confs: optional dictionary with the spark confs to be used when collecting 40 the engine usage. 41 """ 42 try: 43 acon = ConfigUtils.get_acon(acon_path, acon) 44 ExecEnv.get_or_create(app_name="data_loader", config=acon.get("exec_env", None)) 45 acon = validate_and_resolve_acon(acon, "in_motion") 46 finally: 47 EngineUsageStats.store_engine_usage( 48 acon, load_data.__name__, collect_engine_usage, spark_confs 49 ) 50 return DataLoader(acon).execute() 51 52 53def execute_reconciliation( 54 acon_path: Optional[str] = None, 55 acon: Optional[dict] = None, 56 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 57 spark_confs: dict = None, 58) -> None: 59 """Execute the Reconciliator algorithm. 60 61 Args: 62 acon_path: path of the acon (algorithm configuration) file. 63 acon: acon provided directly through python code (e.g., notebooks or other 64 apps). 65 collect_engine_usage: Lakehouse usage statistics collection strategy. 66 spark_confs: optional dictionary with the spark confs to be used when collecting 67 the engine usage. 68 """ 69 try: 70 acon = ConfigUtils.get_acon(acon_path, acon) 71 ExecEnv.get_or_create( 72 app_name="reconciliator", config=acon.get("exec_env", None) 73 ) 74 acon = validate_and_resolve_acon(acon) 75 finally: 76 EngineUsageStats.store_engine_usage( 77 acon, execute_reconciliation.__name__, collect_engine_usage, spark_confs 78 ) 79 Reconciliator(acon).execute() 80 81 82def execute_dq_validation( 83 acon_path: Optional[str] = None, 84 acon: Optional[dict] = None, 85 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 86 spark_confs: dict = None, 87) -> None: 88 """Execute the DQValidator algorithm. 89 90 Args: 91 acon_path: path of the acon (algorithm configuration) file. 92 acon: acon provided directly through python code (e.g., notebooks or other 93 apps). 
94 collect_engine_usage: Lakehouse usage statistics collection strategy. 95 spark_confs: optional dictionary with the spark confs to be used when collecting 96 the engine usage. 97 """ 98 from lakehouse_engine.algorithms.dq_validator import DQValidator 99 100 try: 101 acon = ConfigUtils.get_acon(acon_path, acon) 102 ExecEnv.get_or_create( 103 app_name="dq_validator", config=acon.get("exec_env", None) 104 ) 105 acon = validate_and_resolve_acon(acon, "at_rest") 106 finally: 107 EngineUsageStats.store_engine_usage( 108 acon, execute_dq_validation.__name__, collect_engine_usage, spark_confs 109 ) 110 DQValidator(acon).execute() 111 112 113def manage_table( 114 acon_path: Optional[str] = None, 115 acon: Optional[dict] = None, 116 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 117 spark_confs: dict = None, 118) -> None: 119 """Manipulate tables/views using Table Manager algorithm. 120 121 Args: 122 acon_path: path of the acon (algorithm configuration) file. 123 acon: acon provided directly through python code (e.g., notebooks 124 or other apps). 125 collect_engine_usage: Lakehouse usage statistics collection strategy. 126 spark_confs: optional dictionary with the spark confs to be used when collecting 127 the engine usage. 128 """ 129 acon = ConfigUtils.get_acon(acon_path, acon) 130 ExecEnv.get_or_create(app_name="manage_table", config=acon.get("exec_env", None)) 131 EngineUsageStats.store_engine_usage( 132 acon, manage_table.__name__, collect_engine_usage, spark_confs 133 ) 134 TableManager(acon).get_function() 135 136 137def manage_files( 138 acon_path: Optional[str] = None, 139 acon: Optional[dict] = None, 140 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 141 spark_confs: dict = None, 142) -> None: 143 """Manipulate s3 files using File Manager algorithm. 144 145 Args: 146 acon_path: path of the acon (algorithm configuration) file. 147 acon: acon provided directly through python code (e.g., notebooks 148 or other apps). 149 collect_engine_usage: Lakehouse usage statistics collection strategy. 150 spark_confs: optional dictionary with the spark confs to be used when collecting 151 the engine usage. 152 """ 153 acon = ConfigUtils.get_acon(acon_path, acon) 154 ExecEnv.get_or_create(app_name="manage_files", config=acon.get("exec_env", None)) 155 EngineUsageStats.store_engine_usage( 156 acon, manage_files.__name__, collect_engine_usage, spark_confs 157 ) 158 FileManagerFactory.execute_function(configs=acon) 159 160 161def execute_sensor( 162 acon_path: Optional[str] = None, 163 acon: Optional[dict] = None, 164 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 165 spark_confs: dict = None, 166) -> bool: 167 """Execute a sensor based on a Sensor Algorithm Configuration. 168 169 A sensor is useful to check if an upstream system has new data. 170 171 Args: 172 acon_path: path of the acon (algorithm configuration) file. 173 acon: acon provided directly through python code (e.g., notebooks 174 or other apps). 175 collect_engine_usage: Lakehouse usage statistics collection strategy. 176 spark_confs: optional dictionary with the spark confs to be used when collecting 177 the engine usage. 
178 """ 179 acon = ConfigUtils.get_acon(acon_path, acon) 180 ExecEnv.get_or_create(app_name="execute_sensor", config=acon.get("exec_env", None)) 181 EngineUsageStats.store_engine_usage( 182 acon, execute_sensor.__name__, collect_engine_usage, spark_confs 183 ) 184 return Sensor(acon).execute() 185 186 187def update_sensor_status( 188 sensor_id: str, 189 control_db_table_name: str, 190 status: str = SensorStatus.PROCESSED_NEW_DATA.value, 191 assets: List[str] = None, 192) -> None: 193 """Update internal sensor status. 194 195 Update the sensor status in the control table, 196 it should be used to tell the system 197 that the sensor has processed all new data that was previously identified, 198 hence updating the shifted sensor status. 199 Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to 200 `SensorStatus.PROCESSED_NEW_DATA`, 201 but there might be scenarios - still to identify - 202 where we can update the sensor status from/to different statuses. 203 204 Args: 205 sensor_id: sensor id. 206 control_db_table_name: `db.table` to store sensor checkpoints. 207 status: status of the sensor. 208 assets: a list of assets that are considered as available to 209 consume downstream after this sensor has status 210 PROCESSED_NEW_DATA. 211 """ 212 ExecEnv.get_or_create(app_name="update_sensor_status") 213 SensorTerminator.update_sensor_status( 214 sensor_id=sensor_id, 215 control_db_table_name=control_db_table_name, 216 status=status, 217 assets=assets, 218 ) 219 220 221def generate_sensor_query( 222 sensor_id: str, 223 filter_exp: str = None, 224 control_db_table_name: str = None, 225 upstream_key: str = None, 226 upstream_value: str = None, 227 upstream_table_name: str = None, 228) -> str: 229 """Generates a preprocess query to be used in a sensor configuration. 230 231 Args: 232 sensor_id: sensor id. 233 filter_exp: expression to filter incoming new data. 234 You can use the placeholder ?default_upstream_key and 235 ?default_upstream_value, so that it can be replaced by the 236 respective values in the control_db_table_name for this specific 237 sensor_id. 238 control_db_table_name: `db.table` to retrieve the last status change 239 timestamp. This is only relevant for the jdbc sensor. 240 upstream_key: the key of custom sensor information to control how to 241 identify new data from the upstream (e.g., a time column in the 242 upstream). 243 upstream_value: the upstream value 244 to identify new data from the upstream (e.g., the value of a time 245 present in the upstream). 246 upstream_table_name: value for custom sensor 247 to query new data from the upstream 248 If none we will set the default value, 249 our `sensor_new_data` view. 250 251 Return: 252 The query string. 253 """ 254 ExecEnv.get_or_create(app_name="generate_sensor_preprocess_query") 255 if filter_exp: 256 return SensorUpstreamManager.generate_filter_exp_query( 257 sensor_id=sensor_id, 258 filter_exp=filter_exp, 259 control_db_table_name=control_db_table_name, 260 upstream_key=upstream_key, 261 upstream_value=upstream_value, 262 upstream_table_name=upstream_table_name, 263 ) 264 else: 265 return SensorUpstreamManager.generate_sensor_table_preprocess_query( 266 sensor_id=sensor_id 267 ) 268 269 270def generate_sensor_sap_logchain_query( 271 chain_id: str, 272 dbtable: str = SAPLogchain.DBTABLE.value, 273 status: str = SAPLogchain.GREEN_STATUS.value, 274 engine_table_name: str = SAPLogchain.ENGINE_TABLE.value, 275) -> str: 276 """Generates a sensor query based in the SAP Logchain table. 
277 278 Args: 279 chain_id: chain id to query the status on SAP. 280 dbtable: `db.table` to retrieve the data to 281 check if the sap chain is already finished. 282 status: `db.table` to retrieve the last status change 283 timestamp. 284 engine_table_name: table name exposed with the SAP LOGCHAIN data. 285 This table will be used in the jdbc query. 286 287 Return: 288 The query string. 289 """ 290 ExecEnv.get_or_create(app_name="generate_sensor_sap_logchain_query") 291 return SensorUpstreamManager.generate_sensor_sap_logchain_query( 292 chain_id=chain_id, 293 dbtable=dbtable, 294 status=status, 295 engine_table_name=engine_table_name, 296 ) 297 298 299def send_notification(args: dict) -> None: 300 """Send a notification using a notifier. 301 302 Args: 303 args: arguments for the notifier. 304 """ 305 notifier = NotifierFactory.get_notifier( 306 spec=TerminatorSpec(function="notify", args=args) 307 ) 308 309 notifier.create_notification() 310 notifier.send_notification() 311 312 313def build_data_docs( 314 store_backend: str = DQDefaults.STORE_BACKEND.value, 315 local_fs_root_dir: str = None, 316 data_docs_local_fs: str = None, 317 data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value, 318 bucket: str = None, 319 data_docs_bucket: str = None, 320 expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value, 321 validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value, 322 checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value, 323) -> None: 324 """Build Data Docs for the project. 325 326 This function does a full build of data docs based on all the great expectations 327 checkpoints in the specified location, getting all history of run/validations 328 executed and results. 329 330 Args: 331 store_backend: which store_backend to use (e.g. s3 or file_system). 332 local_fs_root_dir: path of the root directory. Note: only applicable 333 for store_backend file_system 334 data_docs_local_fs: path of the root directory. Note: only applicable 335 for store_backend file_system. 336 data_docs_prefix: prefix where to store data_docs' data. 337 bucket: the bucket name to consider for the store_backend 338 (store DQ artefacts). Note: only applicable for store_backend s3. 339 data_docs_bucket: the bucket name for data docs only. When defined, 340 it will supersede bucket parameter. 341 Note: only applicable for store_backend s3. 342 expectations_store_prefix: prefix where to store expectations' data. 343 Note: only applicable for store_backend s3. 344 validations_store_prefix: prefix where to store validations' data. 345 Note: only applicable for store_backend s3. 346 checkpoint_store_prefix: prefix where to store checkpoints' data. 347 Note: only applicable for store_backend s3. 348 """ 349 from lakehouse_engine.dq_processors.dq_factory import DQFactory 350 351 DQFactory.build_data_docs( 352 store_backend=store_backend, 353 local_fs_root_dir=local_fs_root_dir, 354 data_docs_local_fs=data_docs_local_fs, 355 data_docs_prefix=data_docs_prefix, 356 bucket=bucket, 357 data_docs_bucket=data_docs_bucket, 358 expectations_store_prefix=expectations_store_prefix, 359 validations_store_prefix=validations_store_prefix, 360 checkpoint_store_prefix=checkpoint_store_prefix, 361 ) 362 363 364def execute_gab( 365 acon_path: Optional[str] = None, 366 acon: Optional[dict] = None, 367 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 368 spark_confs: dict = None, 369) -> None: 370 """Execute the gold asset builder based on a GAB Algorithm Configuration. 
371 372 GaB is useful to build your gold assets with predefined functions for recurrent 373 periods. 374 375 Args: 376 acon_path: path of the acon (algorithm configuration) file. 377 acon: acon provided directly through python code (e.g., notebooks 378 or other apps). 379 collect_engine_usage: Lakehouse usage statistics collection strategy. 380 spark_confs: optional dictionary with the spark confs to be used when collecting 381 the engine usage. 382 """ 383 acon = ConfigUtils.get_acon(acon_path, acon) 384 ExecEnv.get_or_create(app_name="execute_gab", config=acon.get("exec_env", None)) 385 EngineUsageStats.store_engine_usage( 386 acon, execute_gab.__name__, collect_engine_usage, spark_confs 387 ) 388 GAB(acon).execute()
def load_data(
    acon_path: Optional[str] = None,
    acon: Optional[dict] = None,
    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
    spark_confs: dict = None,
) -> Optional[OrderedDict]:
    """Load data using the DataLoader algorithm.

    Args:
        acon_path: path of the acon (algorithm configuration) file.
        acon: acon provided directly through python code (e.g., notebooks or other
            apps).
        collect_engine_usage: Lakehouse usage statistics collection strategy.
        spark_confs: optional dictionary with the spark confs to be used when collecting
            the engine usage.
    """
    try:
        acon = ConfigUtils.get_acon(acon_path, acon)
        ExecEnv.get_or_create(app_name="data_loader", config=acon.get("exec_env", None))
        acon = validate_and_resolve_acon(acon, "in_motion")
    finally:
        EngineUsageStats.store_engine_usage(
            acon, load_data.__name__, collect_engine_usage, spark_confs
        )
    return DataLoader(acon).execute()
Load data using the DataLoader algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
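For reference, a minimal sketch of how load_data could be called from a notebook is shown below. The ACON keys and spec fields (input_specs, output_specs, spec_id, read_type, data_format, location) are illustrative assumptions for a typical batch load, not a complete or authoritative schema, and the bucket paths are placeholders.

from lakehouse_engine.engine import load_data

# Illustrative ACON for a simple batch load; adjust the specs to your sources/targets.
acon = {
    "input_specs": [
        {
            "spec_id": "sales_source",
            "read_type": "batch",
            "data_format": "csv",
            "location": "s3://my-bucket/bronze/sales/",  # placeholder path
        }
    ],
    "output_specs": [
        {
            "spec_id": "sales_target",
            "input_id": "sales_source",
            "write_type": "overwrite",
            "data_format": "delta",
            "location": "s3://my-bucket/silver/sales/",  # placeholder path
        }
    ],
}

load_data(acon=acon)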
def execute_reconciliation(
    acon_path: Optional[str] = None,
    acon: Optional[dict] = None,
    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
    spark_confs: dict = None,
) -> None:
    """Execute the Reconciliator algorithm.

    Args:
        acon_path: path of the acon (algorithm configuration) file.
        acon: acon provided directly through python code (e.g., notebooks or other
            apps).
        collect_engine_usage: Lakehouse usage statistics collection strategy.
        spark_confs: optional dictionary with the spark confs to be used when collecting
            the engine usage.
    """
    try:
        acon = ConfigUtils.get_acon(acon_path, acon)
        ExecEnv.get_or_create(
            app_name="reconciliator", config=acon.get("exec_env", None)
        )
        acon = validate_and_resolve_acon(acon)
    finally:
        EngineUsageStats.store_engine_usage(
            acon, execute_reconciliation.__name__, collect_engine_usage, spark_confs
        )
    Reconciliator(acon).execute()
Execute the Reconciliator algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
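As with load_data, the reconciliation ACON can also be read from a file. The path below is hypothetical; the ACON it points to would describe the current and truth datasets to compare.

from lakehouse_engine.engine import execute_reconciliation

# Hypothetical location of a reconciliation ACON file.
execute_reconciliation(acon_path="s3://my-bucket/acons/reconcile_sales.json")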
def execute_dq_validation(
    acon_path: Optional[str] = None,
    acon: Optional[dict] = None,
    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
    spark_confs: dict = None,
) -> None:
    """Execute the DQValidator algorithm.

    Args:
        acon_path: path of the acon (algorithm configuration) file.
        acon: acon provided directly through python code (e.g., notebooks or other
            apps).
        collect_engine_usage: Lakehouse usage statistics collection strategy.
        spark_confs: optional dictionary with the spark confs to be used when collecting
            the engine usage.
    """
    from lakehouse_engine.algorithms.dq_validator import DQValidator

    try:
        acon = ConfigUtils.get_acon(acon_path, acon)
        ExecEnv.get_or_create(
            app_name="dq_validator", config=acon.get("exec_env", None)
        )
        acon = validate_and_resolve_acon(acon, "at_rest")
    finally:
        EngineUsageStats.store_engine_usage(
            acon, execute_dq_validation.__name__, collect_engine_usage, spark_confs
        )
    DQValidator(acon).execute()
Execute the DQValidator algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
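Unlike load_data, which resolves the ACON for data in motion, this algorithm validates data at rest (see the "at_rest" argument in the source above). A minimal invocation sketch with a hypothetical ACON file path:

from lakehouse_engine.engine import execute_dq_validation

# Hypothetical path to a DQValidator ACON file.
execute_dq_validation(acon_path="s3://my-bucket/acons/dq_validation_sales.json")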
def manage_table(
    acon_path: Optional[str] = None,
    acon: Optional[dict] = None,
    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
    spark_confs: dict = None,
) -> None:
    """Manipulate tables/views using Table Manager algorithm.

    Args:
        acon_path: path of the acon (algorithm configuration) file.
        acon: acon provided directly through python code (e.g., notebooks
            or other apps).
        collect_engine_usage: Lakehouse usage statistics collection strategy.
        spark_confs: optional dictionary with the spark confs to be used when collecting
            the engine usage.
    """
    acon = ConfigUtils.get_acon(acon_path, acon)
    ExecEnv.get_or_create(app_name="manage_table", config=acon.get("exec_env", None))
    EngineUsageStats.store_engine_usage(
        acon, manage_table.__name__, collect_engine_usage, spark_confs
    )
    TableManager(acon).get_function()
Manipulate tables/views using the Table Manager algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
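A sketch of a call, inferred from the TableManager(acon).get_function() dispatch shown above: the ACON is assumed to carry a "function" key naming the table operation. Both key names and values below are placeholders, and the remaining keys depend on the chosen operation.

from lakehouse_engine.engine import manage_table

# Placeholder operation and table identifier; other ACON keys depend on the operation.
manage_table(acon={"function": "describe", "table_or_view": "my_db.my_table"})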
def manage_files(
    acon_path: Optional[str] = None,
    acon: Optional[dict] = None,
    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
    spark_confs: dict = None,
) -> None:
    """Manipulate s3 files using File Manager algorithm.

    Args:
        acon_path: path of the acon (algorithm configuration) file.
        acon: acon provided directly through python code (e.g., notebooks
            or other apps).
        collect_engine_usage: Lakehouse usage statistics collection strategy.
        spark_confs: optional dictionary with the spark confs to be used when collecting
            the engine usage.
    """
    acon = ConfigUtils.get_acon(acon_path, acon)
    ExecEnv.get_or_create(app_name="manage_files", config=acon.get("exec_env", None))
    EngineUsageStats.store_engine_usage(
        acon, manage_files.__name__, collect_engine_usage, spark_confs
    )
    FileManagerFactory.execute_function(configs=acon)
Manipulate S3 files using the File Manager algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
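A sketch along the same lines, assuming the ACON names the file manager function to execute plus its arguments; the function name and keys shown are illustrative assumptions, not a documented schema.

from lakehouse_engine.engine import manage_files

# Illustrative ACON: operation name and keys are assumptions for this sketch.
manage_files(
    acon={
        "function": "delete_objects",
        "bucket": "my-bucket",
        "object_paths": ["tmp/old_exports/"],
        "dry_run": True,
    }
)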
def execute_sensor(
    acon_path: Optional[str] = None,
    acon: Optional[dict] = None,
    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
    spark_confs: dict = None,
) -> bool:
    """Execute a sensor based on a Sensor Algorithm Configuration.

    A sensor is useful to check if an upstream system has new data.

    Args:
        acon_path: path of the acon (algorithm configuration) file.
        acon: acon provided directly through python code (e.g., notebooks
            or other apps).
        collect_engine_usage: Lakehouse usage statistics collection strategy.
        spark_confs: optional dictionary with the spark confs to be used when collecting
            the engine usage.
    """
    acon = ConfigUtils.get_acon(acon_path, acon)
    ExecEnv.get_or_create(app_name="execute_sensor", config=acon.get("exec_env", None))
    EngineUsageStats.store_engine_usage(
        acon, execute_sensor.__name__, collect_engine_usage, spark_confs
    )
    return Sensor(acon).execute()
Execute a sensor based on a Sensor Algorithm Configuration.
A sensor is useful to check if an upstream system has new data.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
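Because execute_sensor returns a boolean, it can gate downstream work directly. The sketch below leaves the Sensor ACON as a placeholder (its schema is not reproduced here).

from lakehouse_engine.engine import execute_sensor

sensor_acon = {
    # assumed: a Sensor ACON (sensor id, control table, input spec, ...) built
    # according to the sensor documentation.
}

if execute_sensor(acon=sensor_acon):
    print("New upstream data acquired; trigger the downstream load.")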
def update_sensor_status(
    sensor_id: str,
    control_db_table_name: str,
    status: str = SensorStatus.PROCESSED_NEW_DATA.value,
    assets: List[str] = None,
) -> None:
    """Update internal sensor status.

    Update the sensor status in the control table,
    it should be used to tell the system
    that the sensor has processed all new data that was previously identified,
    hence updating the shifted sensor status.
    Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to
    `SensorStatus.PROCESSED_NEW_DATA`,
    but there might be scenarios - still to identify -
    where we can update the sensor status from/to different statuses.

    Args:
        sensor_id: sensor id.
        control_db_table_name: `db.table` to store sensor checkpoints.
        status: status of the sensor.
        assets: a list of assets that are considered as available to
            consume downstream after this sensor has status
            PROCESSED_NEW_DATA.
    """
    ExecEnv.get_or_create(app_name="update_sensor_status")
    SensorTerminator.update_sensor_status(
        sensor_id=sensor_id,
        control_db_table_name=control_db_table_name,
        status=status,
        assets=assets,
    )
Update internal sensor status.

Update the sensor status in the control table. It should be used to tell the system that the sensor has processed all the new data that was previously identified, hence updating the shifted sensor status. It is usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to `SensorStatus.PROCESSED_NEW_DATA`, but there might be scenarios - still to identify - where the sensor status can be updated from/to different statuses.

Arguments:
- sensor_id: sensor id.
- control_db_table_name: `db.table` to store sensor checkpoints.
- status: status of the sensor.
- assets: a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA.
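For example, once the downstream load of the acquired data has finished, the default status (PROCESSED_NEW_DATA) can be written back to the control table; the sensor id, control table and asset names below are placeholders.

from lakehouse_engine.engine import update_sensor_status

# Placeholders: use your own sensor id, control table and asset names.
update_sensor_status(
    sensor_id="sales_orders_sensor",
    control_db_table_name="my_db.lakehouse_engine_sensors",
    assets=["sales_orders"],
)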
def generate_sensor_query(
    sensor_id: str,
    filter_exp: str = None,
    control_db_table_name: str = None,
    upstream_key: str = None,
    upstream_value: str = None,
    upstream_table_name: str = None,
) -> str:
    """Generates a preprocess query to be used in a sensor configuration.

    Args:
        sensor_id: sensor id.
        filter_exp: expression to filter incoming new data.
            You can use the placeholder ?default_upstream_key and
            ?default_upstream_value, so that it can be replaced by the
            respective values in the control_db_table_name for this specific
            sensor_id.
        control_db_table_name: `db.table` to retrieve the last status change
            timestamp. This is only relevant for the jdbc sensor.
        upstream_key: the key of custom sensor information to control how to
            identify new data from the upstream (e.g., a time column in the
            upstream).
        upstream_value: the upstream value
            to identify new data from the upstream (e.g., the value of a time
            present in the upstream).
        upstream_table_name: value for custom sensor
            to query new data from the upstream
            If none we will set the default value,
            our `sensor_new_data` view.

    Return:
        The query string.
    """
    ExecEnv.get_or_create(app_name="generate_sensor_preprocess_query")
    if filter_exp:
        return SensorUpstreamManager.generate_filter_exp_query(
            sensor_id=sensor_id,
            filter_exp=filter_exp,
            control_db_table_name=control_db_table_name,
            upstream_key=upstream_key,
            upstream_value=upstream_value,
            upstream_table_name=upstream_table_name,
        )
    else:
        return SensorUpstreamManager.generate_sensor_table_preprocess_query(
            sensor_id=sensor_id
        )
Generates a preprocess query to be used in a sensor configuration.

Arguments:
- sensor_id: sensor id.
- filter_exp: expression to filter incoming new data. You can use the placeholders ?default_upstream_key and ?default_upstream_value, so that they are replaced by the respective values in the control_db_table_name for this specific sensor_id.
- control_db_table_name: `db.table` to retrieve the last status change timestamp. This is only relevant for the jdbc sensor.
- upstream_key: the key of custom sensor information to control how to identify new data from the upstream (e.g., a time column in the upstream).
- upstream_value: the upstream value to identify new data from the upstream (e.g., the value of a time present in the upstream).
- upstream_table_name: table name for the custom sensor to query new data from the upstream. If None, the default value, our `sensor_new_data` view, is used.

Return:
The query string.
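A sketch of both branches described above: with a filter_exp the placeholders are resolved from the control table, and without one the default preprocess query over the sensor data view is generated. All identifiers below are placeholders.

from lakehouse_engine.engine import generate_sensor_query

# With a filter expression (placeholders resolved from the control table).
incremental_query = generate_sensor_query(
    sensor_id="sales_orders_sensor",
    filter_exp="?default_upstream_key > '?default_upstream_value'",
    control_db_table_name="my_db.lakehouse_engine_sensors",
    upstream_key="load_date",
)

# Without a filter expression (default preprocess query for the sensor).
default_query = generate_sensor_query(sensor_id="sales_orders_sensor")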
def generate_sensor_sap_logchain_query(
    chain_id: str,
    dbtable: str = SAPLogchain.DBTABLE.value,
    status: str = SAPLogchain.GREEN_STATUS.value,
    engine_table_name: str = SAPLogchain.ENGINE_TABLE.value,
) -> str:
    """Generates a sensor query based on the SAP Logchain table.

    Args:
        chain_id: chain id to query the status on SAP.
        dbtable: `db.table` to retrieve the data to
            check if the sap chain is already finished.
        status: SAP chain status to check for, indicating the chain
            has finished (defaults to the green status).
        engine_table_name: table name exposed with the SAP LOGCHAIN data.
            This table will be used in the jdbc query.

    Return:
        The query string.
    """
    ExecEnv.get_or_create(app_name="generate_sensor_sap_logchain_query")
    return SensorUpstreamManager.generate_sensor_sap_logchain_query(
        chain_id=chain_id,
        dbtable=dbtable,
        status=status,
        engine_table_name=engine_table_name,
    )
Generates a sensor query based on the SAP Logchain table.

Arguments:
- chain_id: chain id to query the status on SAP.
- dbtable: `db.table` to retrieve the data to check if the SAP chain is already finished.
- status: SAP chain status to check for, indicating the chain has finished (defaults to the green status).
- engine_table_name: table name exposed with the SAP LOGCHAIN data. This table will be used in the jdbc query.

Return:
The query string.
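Only chain_id is mandatory; the remaining arguments fall back to the SAPLogchain defaults shown in the signature. The chain id below is a placeholder.

from lakehouse_engine.engine import generate_sensor_sap_logchain_query

# Placeholder chain id; dbtable, status and engine_table_name keep the SAPLogchain defaults.
query = generate_sensor_sap_logchain_query(chain_id="Z_MY_PROCESS_CHAIN")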
def send_notification(args: dict) -> None:
    """Send a notification using a notifier.

    Args:
        args: arguments for the notifier.
    """
    notifier = NotifierFactory.get_notifier(
        spec=TerminatorSpec(function="notify", args=args)
    )

    notifier.create_notification()
    notifier.send_notification()
Send a notification using a notifier.
Arguments:
- args: arguments for the notifier.
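A hedged sketch of a call: the keys accepted inside args depend on the notifier configured for your project (an email notifier is assumed here), so treat the dictionary below as illustrative rather than a documented contract.

from lakehouse_engine.engine import send_notification

# Illustrative arguments for an assumed email notifier; keys depend on the notifier type.
send_notification(
    args={
        "type": "email",
        "to": ["data-team@example.com"],
        "subject": "Lakehouse engine job finished",
        "message": "The daily sales load completed successfully.",
    }
)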
def build_data_docs(
    store_backend: str = DQDefaults.STORE_BACKEND.value,
    local_fs_root_dir: str = None,
    data_docs_local_fs: str = None,
    data_docs_prefix: str = DQDefaults.DATA_DOCS_PREFIX.value,
    bucket: str = None,
    data_docs_bucket: str = None,
    expectations_store_prefix: str = DQDefaults.EXPECTATIONS_STORE_PREFIX.value,
    validations_store_prefix: str = DQDefaults.VALIDATIONS_STORE_PREFIX.value,
    checkpoint_store_prefix: str = DQDefaults.CHECKPOINT_STORE_PREFIX.value,
) -> None:
    """Build Data Docs for the project.

    This function does a full build of data docs based on all the great expectations
    checkpoints in the specified location, getting all history of run/validations
    executed and results.

    Args:
        store_backend: which store_backend to use (e.g. s3 or file_system).
        local_fs_root_dir: path of the root directory. Note: only applicable
            for store_backend file_system
        data_docs_local_fs: path of the root directory. Note: only applicable
            for store_backend file_system.
        data_docs_prefix: prefix where to store data_docs' data.
        bucket: the bucket name to consider for the store_backend
            (store DQ artefacts). Note: only applicable for store_backend s3.
        data_docs_bucket: the bucket name for data docs only. When defined,
            it will supersede bucket parameter.
            Note: only applicable for store_backend s3.
        expectations_store_prefix: prefix where to store expectations' data.
            Note: only applicable for store_backend s3.
        validations_store_prefix: prefix where to store validations' data.
            Note: only applicable for store_backend s3.
        checkpoint_store_prefix: prefix where to store checkpoints' data.
            Note: only applicable for store_backend s3.
    """
    from lakehouse_engine.dq_processors.dq_factory import DQFactory

    DQFactory.build_data_docs(
        store_backend=store_backend,
        local_fs_root_dir=local_fs_root_dir,
        data_docs_local_fs=data_docs_local_fs,
        data_docs_prefix=data_docs_prefix,
        bucket=bucket,
        data_docs_bucket=data_docs_bucket,
        expectations_store_prefix=expectations_store_prefix,
        validations_store_prefix=validations_store_prefix,
        checkpoint_store_prefix=checkpoint_store_prefix,
    )
Build Data Docs for the project.
This function does a full build of the data docs based on all the Great Expectations checkpoints in the specified location, retrieving the full history of runs/validations executed and their results.
Arguments:
- store_backend: which store_backend to use (e.g. s3 or file_system).
- local_fs_root_dir: path of the root directory. Note: only applicable for store_backend file_system.
- data_docs_local_fs: path where the data docs are stored within the local file system. Note: only applicable for store_backend file_system.
- data_docs_prefix: prefix where to store data_docs' data.
- bucket: the bucket name to consider for the store_backend (store DQ artefacts). Note: only applicable for store_backend s3.
- data_docs_bucket: the bucket name for data docs only. When defined, it will supersede the bucket parameter. Note: only applicable for store_backend s3.
- expectations_store_prefix: prefix where to store expectations' data. Note: only applicable for store_backend s3.
- validations_store_prefix: prefix where to store validations' data. Note: only applicable for store_backend s3.
- checkpoint_store_prefix: prefix where to store checkpoints' data. Note: only applicable for store_backend s3.
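A sketch for an s3 store_backend (one of the values mentioned above); the bucket names and prefix are placeholders, and the remaining prefixes keep their DQDefaults values.

from lakehouse_engine.engine import build_data_docs

# Placeholder bucket names and prefix; other arguments keep their defaults.
build_data_docs(
    store_backend="s3",
    bucket="my-dq-artefacts-bucket",
    data_docs_bucket="my-dq-data-docs-bucket",
    data_docs_prefix="dq/sales/data_docs/site/",
)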
def execute_gab(
    acon_path: Optional[str] = None,
    acon: Optional[dict] = None,
    collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value,
    spark_confs: dict = None,
) -> None:
    """Execute the gold asset builder based on a GAB Algorithm Configuration.

    GaB is useful to build your gold assets with predefined functions for recurrent
    periods.

    Args:
        acon_path: path of the acon (algorithm configuration) file.
        acon: acon provided directly through python code (e.g., notebooks
            or other apps).
        collect_engine_usage: Lakehouse usage statistics collection strategy.
        spark_confs: optional dictionary with the spark confs to be used when collecting
            the engine usage.
    """
    acon = ConfigUtils.get_acon(acon_path, acon)
    ExecEnv.get_or_create(app_name="execute_gab", config=acon.get("exec_env", None))
    EngineUsageStats.store_engine_usage(
        acon, execute_gab.__name__, collect_engine_usage, spark_confs
    )
    GAB(acon).execute()
Execute the gold asset builder based on a GAB Algorithm Configuration.
GaB is useful to build your gold assets with predefined functions for recurrent periods.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
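A minimal invocation sketch; the path is hypothetical and the GAB ACON it points to would define the gold asset computations and their recurrence periods.

from lakehouse_engine.engine import execute_gab

# Hypothetical location of a GAB ACON file.
execute_gab(acon_path="s3://my-bucket/acons/gab_sales_gold.json")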