lakehouse_engine.engine
Contract of the lakehouse engine with all the available functions to be executed.
1"""Contract of the lakehouse engine with all the available functions to be executed.""" 2from typing import List, Optional, OrderedDict 3 4from lakehouse_engine.algorithms.data_loader import DataLoader 5from lakehouse_engine.algorithms.dq_validator import DQValidator 6from lakehouse_engine.algorithms.reconciliator import Reconciliator 7from lakehouse_engine.algorithms.sensor import Sensor, SensorStatus 8from lakehouse_engine.core.definitions import ( 9 CollectEngineUsage, 10 SAPLogchain, 11 TerminatorSpec, 12) 13from lakehouse_engine.core.exec_env import ExecEnv 14from lakehouse_engine.core.file_manager import FileManagerFactory 15from lakehouse_engine.core.sensor_manager import SensorUpstreamManager 16from lakehouse_engine.core.table_manager import TableManager 17from lakehouse_engine.terminators.notifier_factory import NotifierFactory 18from lakehouse_engine.terminators.sensor_terminator import SensorTerminator 19from lakehouse_engine.utils.configs.config_utils import ConfigUtils 20from lakehouse_engine.utils.engine_usage_stats import EngineUsageStats 21 22 23def load_data( 24 acon_path: Optional[str] = None, 25 acon: Optional[dict] = None, 26 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 27 spark_confs: dict = None, 28) -> Optional[OrderedDict]: 29 """Load data using the DataLoader algorithm. 30 31 Args: 32 acon_path: path of the acon (algorithm configuration) file. 33 acon: acon provided directly through python code (e.g., notebooks or other 34 apps). 35 collect_engine_usage: Lakehouse usage statistics collection strategy. 36 spark_confs: optional dictionary with the spark confs to be used when collecting 37 the engine usage. 38 """ 39 acon = ConfigUtils.get_acon(acon_path, acon) 40 ExecEnv.get_or_create(app_name="data_loader", config=acon.get("exec_env", None)) 41 EngineUsageStats.store_engine_usage( 42 acon, load_data.__name__, collect_engine_usage, spark_confs 43 ) 44 return DataLoader(acon).execute() 45 46 47def execute_reconciliation( 48 acon_path: Optional[str] = None, 49 acon: Optional[dict] = None, 50 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 51 spark_confs: dict = None, 52) -> None: 53 """Execute the Reconciliator algorithm. 54 55 Args: 56 acon_path: path of the acon (algorithm configuration) file. 57 acon: acon provided directly through python code (e.g., notebooks or other 58 apps). 59 collect_engine_usage: Lakehouse usage statistics collection strategy. 60 spark_confs: optional dictionary with the spark confs to be used when collecting 61 the engine usage. 62 """ 63 acon = ConfigUtils.get_acon(acon_path, acon) 64 ExecEnv.get_or_create(app_name="reconciliator", config=acon.get("exec_env", None)) 65 EngineUsageStats.store_engine_usage( 66 acon, execute_reconciliation.__name__, collect_engine_usage, spark_confs 67 ) 68 Reconciliator(acon).execute() 69 70 71def execute_dq_validation( 72 acon_path: Optional[str] = None, 73 acon: Optional[dict] = None, 74 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 75 spark_confs: dict = None, 76) -> None: 77 """Execute the DQValidator algorithm. 78 79 Args: 80 acon_path: path of the acon (algorithm configuration) file. 81 acon: acon provided directly through python code (e.g., notebooks or other 82 apps). 83 collect_engine_usage: Lakehouse usage statistics collection strategy. 84 spark_confs: optional dictionary with the spark confs to be used when collecting 85 the engine usage. 86 """ 87 acon = ConfigUtils.get_acon(acon_path, acon) 88 ExecEnv.get_or_create(app_name="dq_validator", config=acon.get("exec_env", None)) 89 EngineUsageStats.store_engine_usage( 90 acon, execute_dq_validation.__name__, collect_engine_usage, spark_confs 91 ) 92 DQValidator(acon).execute() 93 94 95def manage_table( 96 acon_path: Optional[str] = None, 97 acon: Optional[dict] = None, 98 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 99 spark_confs: dict = None, 100) -> None: 101 """Manipulate tables/views using Table Manager algorithm. 102 103 Args: 104 acon_path: path of the acon (algorithm configuration) file. 105 acon: acon provided directly through python code (e.g., notebooks 106 or other apps). 107 collect_engine_usage: Lakehouse usage statistics collection strategy. 108 spark_confs: optional dictionary with the spark confs to be used when collecting 109 the engine usage. 110 """ 111 acon = ConfigUtils.get_acon(acon_path, acon) 112 ExecEnv.get_or_create(app_name="manage_table", config=acon.get("exec_env", None)) 113 EngineUsageStats.store_engine_usage( 114 acon, manage_table.__name__, collect_engine_usage, spark_confs 115 ) 116 TableManager(acon).get_function() 117 118 119def manage_files( 120 acon_path: Optional[str] = None, 121 acon: Optional[dict] = None, 122 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 123 spark_confs: dict = None, 124) -> None: 125 """Manipulate s3 files using File Manager algorithm. 126 127 Args: 128 acon_path: path of the acon (algorithm configuration) file. 129 acon: acon provided directly through python code (e.g., notebooks 130 or other apps). 131 collect_engine_usage: Lakehouse usage statistics collection strategy. 132 spark_confs: optional dictionary with the spark confs to be used when collecting 133 the engine usage. 134 """ 135 acon = ConfigUtils.get_acon(acon_path, acon) 136 ExecEnv.get_or_create(app_name="manage_files", config=acon.get("exec_env", None)) 137 EngineUsageStats.store_engine_usage( 138 acon, manage_files.__name__, collect_engine_usage, spark_confs 139 ) 140 FileManagerFactory.execute_function(configs=acon) 141 142 143def execute_sensor( 144 acon_path: Optional[str] = None, 145 acon: Optional[dict] = None, 146 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 147 spark_confs: dict = None, 148) -> bool: 149 """Execute a sensor based on a Sensor Algorithm Configuration. 150 151 A sensor is useful to check if an upstream system has new data. 152 153 Args: 154 acon_path: path of the acon (algorithm configuration) file. 155 acon: acon provided directly through python code (e.g., notebooks 156 or other apps). 157 collect_engine_usage: Lakehouse usage statistics collection strategy. 158 spark_confs: optional dictionary with the spark confs to be used when collecting 159 the engine usage. 160 """ 161 acon = ConfigUtils.get_acon(acon_path, acon) 162 ExecEnv.get_or_create(app_name="execute_sensor", config=acon.get("exec_env", None)) 163 EngineUsageStats.store_engine_usage( 164 acon, execute_sensor.__name__, collect_engine_usage, spark_confs 165 ) 166 return Sensor(acon).execute() 167 168 169def update_sensor_status( 170 sensor_id: str, 171 control_db_table_name: str, 172 status: str = SensorStatus.PROCESSED_NEW_DATA.value, 173 assets: List[str] = None, 174) -> None: 175 """Update internal sensor status. 176 177 Update the sensor status in the control table, 178 it should be used to tell the system 179 that the sensor has processed all new data that was previously identified, 180 hence updating the shifted sensor status. 181 Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to 182 `SensorStatus.PROCESSED_NEW_DATA`, 183 but there might be scenarios - still to identify - 184 where we can update the sensor status from/to different statuses. 185 186 Args: 187 sensor_id: sensor id. 188 control_db_table_name: `db.table` to store sensor checkpoints. 189 status: status of the sensor. 190 assets: a list of assets that are considered as available to 191 consume downstream after this sensor has status 192 PROCESSED_NEW_DATA. 193 """ 194 ExecEnv.get_or_create(app_name="update_sensor_status") 195 SensorTerminator.update_sensor_status( 196 sensor_id=sensor_id, 197 control_db_table_name=control_db_table_name, 198 status=status, 199 assets=assets, 200 ) 201 202 203def generate_sensor_query( 204 sensor_id: str, 205 filter_exp: str = None, 206 control_db_table_name: str = None, 207 upstream_key: str = None, 208 upstream_value: str = None, 209 upstream_table_name: str = None, 210) -> str: 211 """Generates a preprocess query to be used in a sensor configuration. 212 213 Args: 214 sensor_id: sensor id. 215 filter_exp: expression to filter incoming new data. 216 You can use the placeholder ?default_upstream_key and 217 ?default_upstream_value, so that it can be replaced by the 218 respective values in the control_db_table_name for this specific 219 sensor_id. 220 control_db_table_name: `db.table` to retrieve the last status change 221 timestamp. This is only relevant for the jdbc sensor. 222 upstream_key: the key of custom sensor information to control how to 223 identify new data from the upstream (e.g., a time column in the 224 upstream). 225 upstream_value: the upstream value 226 to identify new data from the upstream (e.g., the value of a time 227 present in the upstream). 228 upstream_table_name: value for custom sensor 229 to query new data from the upstream 230 If none we will set the default value, 231 our `sensor_new_data` view. 232 233 Return: 234 The query string. 235 """ 236 ExecEnv.get_or_create(app_name="generate_sensor_preprocess_query") 237 if filter_exp: 238 return SensorUpstreamManager.generate_filter_exp_query( 239 sensor_id=sensor_id, 240 filter_exp=filter_exp, 241 control_db_table_name=control_db_table_name, 242 upstream_key=upstream_key, 243 upstream_value=upstream_value, 244 upstream_table_name=upstream_table_name, 245 ) 246 else: 247 return SensorUpstreamManager.generate_sensor_table_preprocess_query( 248 sensor_id=sensor_id 249 ) 250 251 252def generate_sensor_sap_logchain_query( 253 chain_id: str, 254 dbtable: str = SAPLogchain.DBTABLE.value, 255 status: str = SAPLogchain.GREEN_STATUS.value, 256 engine_table_name: str = SAPLogchain.ENGINE_TABLE.value, 257) -> str: 258 """Generates a sensor query based in the SAP Logchain table. 259 260 Args: 261 chain_id: chain id to query the status on SAP. 262 dbtable: `db.table` to retrieve the data to 263 check if the sap chain is already finished. 264 status: `db.table` to retrieve the last status change 265 timestamp. 266 engine_table_name: table name exposed with the SAP LOGCHAIN data. 267 This table will be used in the jdbc query. 268 269 Return: 270 The query string. 271 """ 272 ExecEnv.get_or_create(app_name="generate_sensor_sap_logchain_query") 273 return SensorUpstreamManager.generate_sensor_sap_logchain_query( 274 chain_id=chain_id, 275 dbtable=dbtable, 276 status=status, 277 engine_table_name=engine_table_name, 278 ) 279 280 281def send_notification(args: dict) -> None: 282 """Send a notification using a notifier. 283 284 Args: 285 args: arguments for the notifier. 286 """ 287 notifier = NotifierFactory.get_notifier( 288 spec=TerminatorSpec(function="notify", args=args) 289 ) 290 291 notifier.create_notification() 292 notifier.send_notification()
24def load_data( 25 acon_path: Optional[str] = None, 26 acon: Optional[dict] = None, 27 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 28 spark_confs: dict = None, 29) -> Optional[OrderedDict]: 30 """Load data using the DataLoader algorithm. 31 32 Args: 33 acon_path: path of the acon (algorithm configuration) file. 34 acon: acon provided directly through python code (e.g., notebooks or other 35 apps). 36 collect_engine_usage: Lakehouse usage statistics collection strategy. 37 spark_confs: optional dictionary with the spark confs to be used when collecting 38 the engine usage. 39 """ 40 acon = ConfigUtils.get_acon(acon_path, acon) 41 ExecEnv.get_or_create(app_name="data_loader", config=acon.get("exec_env", None)) 42 EngineUsageStats.store_engine_usage( 43 acon, load_data.__name__, collect_engine_usage, spark_confs 44 ) 45 return DataLoader(acon).execute()
Load data using the DataLoader algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
48def execute_reconciliation( 49 acon_path: Optional[str] = None, 50 acon: Optional[dict] = None, 51 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 52 spark_confs: dict = None, 53) -> None: 54 """Execute the Reconciliator algorithm. 55 56 Args: 57 acon_path: path of the acon (algorithm configuration) file. 58 acon: acon provided directly through python code (e.g., notebooks or other 59 apps). 60 collect_engine_usage: Lakehouse usage statistics collection strategy. 61 spark_confs: optional dictionary with the spark confs to be used when collecting 62 the engine usage. 63 """ 64 acon = ConfigUtils.get_acon(acon_path, acon) 65 ExecEnv.get_or_create(app_name="reconciliator", config=acon.get("exec_env", None)) 66 EngineUsageStats.store_engine_usage( 67 acon, execute_reconciliation.__name__, collect_engine_usage, spark_confs 68 ) 69 Reconciliator(acon).execute()
Execute the Reconciliator algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
72def execute_dq_validation( 73 acon_path: Optional[str] = None, 74 acon: Optional[dict] = None, 75 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 76 spark_confs: dict = None, 77) -> None: 78 """Execute the DQValidator algorithm. 79 80 Args: 81 acon_path: path of the acon (algorithm configuration) file. 82 acon: acon provided directly through python code (e.g., notebooks or other 83 apps). 84 collect_engine_usage: Lakehouse usage statistics collection strategy. 85 spark_confs: optional dictionary with the spark confs to be used when collecting 86 the engine usage. 87 """ 88 acon = ConfigUtils.get_acon(acon_path, acon) 89 ExecEnv.get_or_create(app_name="dq_validator", config=acon.get("exec_env", None)) 90 EngineUsageStats.store_engine_usage( 91 acon, execute_dq_validation.__name__, collect_engine_usage, spark_confs 92 ) 93 DQValidator(acon).execute()
Execute the DQValidator algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
96def manage_table( 97 acon_path: Optional[str] = None, 98 acon: Optional[dict] = None, 99 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 100 spark_confs: dict = None, 101) -> None: 102 """Manipulate tables/views using Table Manager algorithm. 103 104 Args: 105 acon_path: path of the acon (algorithm configuration) file. 106 acon: acon provided directly through python code (e.g., notebooks 107 or other apps). 108 collect_engine_usage: Lakehouse usage statistics collection strategy. 109 spark_confs: optional dictionary with the spark confs to be used when collecting 110 the engine usage. 111 """ 112 acon = ConfigUtils.get_acon(acon_path, acon) 113 ExecEnv.get_or_create(app_name="manage_table", config=acon.get("exec_env", None)) 114 EngineUsageStats.store_engine_usage( 115 acon, manage_table.__name__, collect_engine_usage, spark_confs 116 ) 117 TableManager(acon).get_function()
Manipulate tables/views using Table Manager algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
120def manage_files( 121 acon_path: Optional[str] = None, 122 acon: Optional[dict] = None, 123 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 124 spark_confs: dict = None, 125) -> None: 126 """Manipulate s3 files using File Manager algorithm. 127 128 Args: 129 acon_path: path of the acon (algorithm configuration) file. 130 acon: acon provided directly through python code (e.g., notebooks 131 or other apps). 132 collect_engine_usage: Lakehouse usage statistics collection strategy. 133 spark_confs: optional dictionary with the spark confs to be used when collecting 134 the engine usage. 135 """ 136 acon = ConfigUtils.get_acon(acon_path, acon) 137 ExecEnv.get_or_create(app_name="manage_files", config=acon.get("exec_env", None)) 138 EngineUsageStats.store_engine_usage( 139 acon, manage_files.__name__, collect_engine_usage, spark_confs 140 ) 141 FileManagerFactory.execute_function(configs=acon)
Manipulate s3 files using File Manager algorithm.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
144def execute_sensor( 145 acon_path: Optional[str] = None, 146 acon: Optional[dict] = None, 147 collect_engine_usage: str = CollectEngineUsage.PROD_ONLY.value, 148 spark_confs: dict = None, 149) -> bool: 150 """Execute a sensor based on a Sensor Algorithm Configuration. 151 152 A sensor is useful to check if an upstream system has new data. 153 154 Args: 155 acon_path: path of the acon (algorithm configuration) file. 156 acon: acon provided directly through python code (e.g., notebooks 157 or other apps). 158 collect_engine_usage: Lakehouse usage statistics collection strategy. 159 spark_confs: optional dictionary with the spark confs to be used when collecting 160 the engine usage. 161 """ 162 acon = ConfigUtils.get_acon(acon_path, acon) 163 ExecEnv.get_or_create(app_name="execute_sensor", config=acon.get("exec_env", None)) 164 EngineUsageStats.store_engine_usage( 165 acon, execute_sensor.__name__, collect_engine_usage, spark_confs 166 ) 167 return Sensor(acon).execute()
Execute a sensor based on a Sensor Algorithm Configuration.
A sensor is useful to check if an upstream system has new data.
Arguments:
- acon_path: path of the acon (algorithm configuration) file.
- acon: acon provided directly through python code (e.g., notebooks or other apps).
- collect_engine_usage: Lakehouse usage statistics collection strategy.
- spark_confs: optional dictionary with the spark confs to be used when collecting the engine usage.
170def update_sensor_status( 171 sensor_id: str, 172 control_db_table_name: str, 173 status: str = SensorStatus.PROCESSED_NEW_DATA.value, 174 assets: List[str] = None, 175) -> None: 176 """Update internal sensor status. 177 178 Update the sensor status in the control table, 179 it should be used to tell the system 180 that the sensor has processed all new data that was previously identified, 181 hence updating the shifted sensor status. 182 Usually used to move from `SensorStatus.ACQUIRED_NEW_DATA` to 183 `SensorStatus.PROCESSED_NEW_DATA`, 184 but there might be scenarios - still to identify - 185 where we can update the sensor status from/to different statuses. 186 187 Args: 188 sensor_id: sensor id. 189 control_db_table_name: `db.table` to store sensor checkpoints. 190 status: status of the sensor. 191 assets: a list of assets that are considered as available to 192 consume downstream after this sensor has status 193 PROCESSED_NEW_DATA. 194 """ 195 ExecEnv.get_or_create(app_name="update_sensor_status") 196 SensorTerminator.update_sensor_status( 197 sensor_id=sensor_id, 198 control_db_table_name=control_db_table_name, 199 status=status, 200 assets=assets, 201 )
Update internal sensor status.
Update the sensor status in the control table,
it should be used to tell the system
that the sensor has processed all new data that was previously identified,
hence updating the shifted sensor status.
Usually used to move from SensorStatus.ACQUIRED_NEW_DATA
to
SensorStatus.PROCESSED_NEW_DATA
,
but there might be scenarios - still to identify -
where we can update the sensor status from/to different statuses.
Arguments:
- sensor_id: sensor id.
- control_db_table_name:
db.table
to store sensor checkpoints. - status: status of the sensor.
- assets: a list of assets that are considered as available to consume downstream after this sensor has status PROCESSED_NEW_DATA.
204def generate_sensor_query( 205 sensor_id: str, 206 filter_exp: str = None, 207 control_db_table_name: str = None, 208 upstream_key: str = None, 209 upstream_value: str = None, 210 upstream_table_name: str = None, 211) -> str: 212 """Generates a preprocess query to be used in a sensor configuration. 213 214 Args: 215 sensor_id: sensor id. 216 filter_exp: expression to filter incoming new data. 217 You can use the placeholder ?default_upstream_key and 218 ?default_upstream_value, so that it can be replaced by the 219 respective values in the control_db_table_name for this specific 220 sensor_id. 221 control_db_table_name: `db.table` to retrieve the last status change 222 timestamp. This is only relevant for the jdbc sensor. 223 upstream_key: the key of custom sensor information to control how to 224 identify new data from the upstream (e.g., a time column in the 225 upstream). 226 upstream_value: the upstream value 227 to identify new data from the upstream (e.g., the value of a time 228 present in the upstream). 229 upstream_table_name: value for custom sensor 230 to query new data from the upstream 231 If none we will set the default value, 232 our `sensor_new_data` view. 233 234 Return: 235 The query string. 236 """ 237 ExecEnv.get_or_create(app_name="generate_sensor_preprocess_query") 238 if filter_exp: 239 return SensorUpstreamManager.generate_filter_exp_query( 240 sensor_id=sensor_id, 241 filter_exp=filter_exp, 242 control_db_table_name=control_db_table_name, 243 upstream_key=upstream_key, 244 upstream_value=upstream_value, 245 upstream_table_name=upstream_table_name, 246 ) 247 else: 248 return SensorUpstreamManager.generate_sensor_table_preprocess_query( 249 sensor_id=sensor_id 250 )
Generates a preprocess query to be used in a sensor configuration.
Arguments:
- sensor_id: sensor id.
- filter_exp: expression to filter incoming new data. You can use the placeholder ?default_upstream_key and ?default_upstream_value, so that it can be replaced by the respective values in the control_db_table_name for this specific sensor_id.
- control_db_table_name:
db.table
to retrieve the last status change timestamp. This is only relevant for the jdbc sensor. - upstream_key: the key of custom sensor information to control how to identify new data from the upstream (e.g., a time column in the upstream).
- upstream_value: the upstream value to identify new data from the upstream (e.g., the value of a time present in the upstream).
- upstream_table_name: value for custom sensor
to query new data from the upstream
If none we will set the default value,
our
sensor_new_data
view.
Return:
The query string.
253def generate_sensor_sap_logchain_query( 254 chain_id: str, 255 dbtable: str = SAPLogchain.DBTABLE.value, 256 status: str = SAPLogchain.GREEN_STATUS.value, 257 engine_table_name: str = SAPLogchain.ENGINE_TABLE.value, 258) -> str: 259 """Generates a sensor query based in the SAP Logchain table. 260 261 Args: 262 chain_id: chain id to query the status on SAP. 263 dbtable: `db.table` to retrieve the data to 264 check if the sap chain is already finished. 265 status: `db.table` to retrieve the last status change 266 timestamp. 267 engine_table_name: table name exposed with the SAP LOGCHAIN data. 268 This table will be used in the jdbc query. 269 270 Return: 271 The query string. 272 """ 273 ExecEnv.get_or_create(app_name="generate_sensor_sap_logchain_query") 274 return SensorUpstreamManager.generate_sensor_sap_logchain_query( 275 chain_id=chain_id, 276 dbtable=dbtable, 277 status=status, 278 engine_table_name=engine_table_name, 279 )
Generates a sensor query based in the SAP Logchain table.
Arguments:
- chain_id: chain id to query the status on SAP.
- dbtable:
db.table
to retrieve the data to check if the sap chain is already finished. - status:
db.table
to retrieve the last status change timestamp. - engine_table_name: table name exposed with the SAP LOGCHAIN data. This table will be used in the jdbc query.
Return:
The query string.
282def send_notification(args: dict) -> None: 283 """Send a notification using a notifier. 284 285 Args: 286 args: arguments for the notifier. 287 """ 288 notifier = NotifierFactory.get_notifier( 289 spec=TerminatorSpec(function="notify", args=args) 290 ) 291 292 notifier.create_notification() 293 notifier.send_notification()
Send a notification using a notifier.
Arguments:
- args: arguments for the notifier.