lakehouse_engine.algorithms.data_loader
Module to define DataLoader class.
1"""Module to define DataLoader class.""" 2 3from collections import OrderedDict 4from copy import deepcopy 5from logging import Logger 6from typing import List, Optional 7 8from lakehouse_engine.algorithms.algorithm import Algorithm 9from lakehouse_engine.core.definitions import ( 10 DQFunctionSpec, 11 DQSpec, 12 DQType, 13 InputSpec, 14 MergeOptions, 15 OutputFormat, 16 OutputSpec, 17 ReadType, 18 TerminatorSpec, 19 TransformerSpec, 20 TransformSpec, 21) 22from lakehouse_engine.io.reader_factory import ReaderFactory 23from lakehouse_engine.io.writer_factory import WriterFactory 24from lakehouse_engine.terminators.notifier_factory import NotifierFactory 25from lakehouse_engine.terminators.terminator_factory import TerminatorFactory 26from lakehouse_engine.transformers.transformer_factory import TransformerFactory 27from lakehouse_engine.utils.logging_handler import LoggingHandler 28 29 30class DataLoader(Algorithm): 31 """Load data using an algorithm configuration (ACON represented as dict). 32 33 This algorithm focuses on the cases where users will be specifying all the algorithm 34 steps and configurations through a dict based configuration, which we name ACON 35 in our framework. 36 37 Since an ACON is a dict you can pass a custom transformer through a python function 38 and, therefore, the DataLoader can also be used to load data with custom 39 transformations not provided in our transformers package. 40 41 As the algorithm base class of the lakehouse-engine framework is based on the 42 concept of ACON, this DataLoader algorithm simply inherits from Algorithm, 43 without overriding anything. We designed the codebase like this to avoid 44 instantiating the Algorithm class directly, which was always meant to be an 45 abstraction for any specific algorithm included in the lakehouse-engine framework. 46 """ 47 48 def __init__(self, acon: dict): 49 """Construct DataLoader algorithm instances. 50 51 A data loader needs several specifications to work properly, 52 but some of them might be optional. The available specifications are: 53 54 - input specifications (mandatory): specify how to read data. 55 - transform specifications (optional): specify how to transform data. 56 - data quality specifications (optional): specify how to execute the data 57 quality process. 58 - output specifications (mandatory): specify how to write data to the 59 target. 60 - terminate specifications (optional): specify what to do after writing into 61 the target (e.g., optimizing target table, vacuum, compute stats, etc). 62 63 Args: 64 acon: algorithm configuration. 65 """ 66 self._logger: Logger = LoggingHandler(self.__class__.__name__).get_logger() 67 super().__init__(acon) 68 self.input_specs: List[InputSpec] = self._get_input_specs() 69 # the streaming transformers plan is needed to future change the 70 # execution specification to accommodate streaming mode limitations in invoking 71 # certain functions (e.g., sort, window, generate row ids/auto increments, ...). 72 self._streaming_micro_batch_transformers_plan: dict = {} 73 self.transform_specs: List[TransformSpec] = self._get_transform_specs() 74 # our data quality process is not compatible with streaming mode, hence we 75 # have to run it in micro batches, similar to what happens to certain 76 # transformation functions not supported in streaming mode. 
77 self._streaming_micro_batch_dq_plan: dict = {} 78 self.dq_specs: List[DQSpec] = self._get_dq_specs() 79 self.output_specs: List[OutputSpec] = self._get_output_specs() 80 self.terminate_specs: List[TerminatorSpec] = self._get_terminate_specs() 81 82 def read(self) -> OrderedDict: 83 """Read data from an input location into a distributed dataframe. 84 85 Returns: 86 An ordered dict with all the dataframes that were read. 87 """ 88 read_dfs: OrderedDict = OrderedDict({}) 89 for spec in self.input_specs: 90 self._logger.info(f"Found input specification: {spec}") 91 read_dfs[spec.spec_id] = ReaderFactory.get_data(spec) 92 return read_dfs 93 94 def transform(self, data: OrderedDict) -> OrderedDict: 95 """Transform (optionally) the data that was read. 96 97 If there isn't a transformation specification this step will be skipped, and the 98 original dataframes that were read will be returned. 99 Transformations can have dependency from another transformation result, however 100 we need to keep in mind if we are using streaming source and for some reason we 101 need to enable micro batch processing, this result cannot be used as input to 102 another transformation. Micro batch processing in pyspark streaming is only 103 available in .write(), which means this transformation with micro batch needs 104 to be the end of the process. 105 106 Args: 107 data: input dataframes in an ordered dict. 108 109 Returns: 110 Another ordered dict with the transformed dataframes, according to the 111 transformation specification. 112 """ 113 if not self.transform_specs: 114 return data 115 else: 116 transformed_dfs = OrderedDict(data) 117 for spec in self.transform_specs: 118 self._logger.info(f"Found transform specification: {spec}") 119 transformed_df = transformed_dfs[spec.input_id] 120 for transformer in spec.transformers: 121 transformed_df = transformed_df.transform( 122 TransformerFactory.get_transformer(transformer, transformed_dfs) 123 ) 124 transformed_dfs[spec.spec_id] = transformed_df 125 return transformed_dfs 126 127 def process_dq(self, data: OrderedDict) -> OrderedDict: 128 """Process the data quality tasks for the data that was read and/or transformed. 129 130 It supports multiple input dataframes. Although just one is advisable. 131 132 It is possible to use data quality validators/expectations that will validate 133 your data and fail the process in case the expectations are not met. The DQ 134 process also generates and keeps updating a site containing the results of the 135 expectations that were done on your data. The location of the site is 136 configurable and can either be on file system or S3. If you define it to be 137 stored on S3, you can even configure your S3 bucket to serve the site so that 138 people can easily check the quality of your data. Moreover, it is also 139 possible to store the result of the DQ process into a defined result sink. 140 141 Args: 142 data: dataframes from previous steps of the algorithm that we which to 143 run the DQ process on. 144 145 Returns: 146 Another ordered dict with the validated dataframes. 
147 """ 148 if not self.dq_specs: 149 return data 150 else: 151 from lakehouse_engine.dq_processors.dq_factory import DQFactory 152 153 dq_processed_dfs = OrderedDict(data) 154 for spec in self.dq_specs: 155 df_processed_df = dq_processed_dfs[spec.input_id] 156 self._logger.info(f"Found data quality specification: {spec}") 157 if ( 158 spec.dq_type == DQType.PRISMA.value or spec.dq_functions 159 ) and spec.spec_id not in self._streaming_micro_batch_dq_plan: 160 if spec.cache_df: 161 df_processed_df.cache() 162 dq_processed_dfs[spec.spec_id] = DQFactory.run_dq_process( 163 spec, df_processed_df 164 ) 165 else: 166 dq_processed_dfs[spec.spec_id] = df_processed_df 167 return dq_processed_dfs 168 169 def write(self, data: OrderedDict) -> OrderedDict: 170 """Write the data that was read and transformed (if applicable). 171 172 It supports writing multiple datasets. However, we only recommend to write one 173 dataframe. This recommendation is based on easy debugging and reproducibility, 174 since if we start mixing several datasets being fueled by the same algorithm, it 175 would unleash an infinite sea of reproducibility issues plus tight coupling and 176 dependencies between datasets. Having said that, there may be cases where 177 writing multiple datasets is desirable according to the use case requirements. 178 Use it accordingly. 179 180 Args: 181 data: dataframes that were read and transformed (if applicable). 182 183 Returns: 184 Dataframes that were written. 185 """ 186 written_dfs: OrderedDict = OrderedDict({}) 187 for spec in self.output_specs: 188 self._logger.info(f"Found output specification: {spec}") 189 190 written_output = WriterFactory.get_writer( 191 spec, data[spec.input_id], data 192 ).write() 193 if written_output: 194 written_dfs.update(written_output) 195 else: 196 written_dfs[spec.spec_id] = data[spec.input_id] 197 198 return written_dfs 199 200 def terminate(self, data: OrderedDict) -> None: 201 """Terminate the algorithm. 202 203 Args: 204 data: dataframes that were written. 205 """ 206 if self.terminate_specs: 207 for spec in self.terminate_specs: 208 self._logger.info(f"Found terminate specification: {spec}") 209 TerminatorFactory.execute_terminator( 210 spec, data[spec.input_id] if spec.input_id else None 211 ) 212 213 def execute(self) -> Optional[OrderedDict]: 214 """Define the algorithm execution behaviour.""" 215 try: 216 self._logger.info("Starting read stage...") 217 read_dfs = self.read() 218 self._logger.info("Starting transform stage...") 219 transformed_dfs = self.transform(read_dfs) 220 self._logger.info("Starting data quality stage...") 221 validated_dfs = self.process_dq(transformed_dfs) 222 self._logger.info("Starting write stage...") 223 written_dfs = self.write(validated_dfs) 224 self._logger.info("Starting terminate stage...") 225 self.terminate(written_dfs) 226 self._logger.info("Execution of the algorithm has finished!") 227 except Exception as e: 228 NotifierFactory.generate_failure_notification(self.terminate_specs, e) 229 raise e 230 231 return written_dfs 232 233 def _get_input_specs(self) -> List[InputSpec]: 234 """Get the input specifications from an acon. 235 236 Returns: 237 List of input specifications. 238 """ 239 return [InputSpec(**spec) for spec in self.acon["input_specs"]] 240 241 def _get_transform_specs(self) -> List[TransformSpec]: 242 """Get the transformation specifications from an acon. 
243 244 If we are executing the algorithm in streaming mode and if the 245 transformer function is not supported in streaming mode, it is 246 important to note that ONLY those unsupported operations will 247 go into the streaming_micro_batch_transformers (see if in the function code), 248 in the same order that they appear in the list of transformations. This means 249 that other supported transformations that appear after an 250 unsupported one continue to stay one the normal execution plan, 251 i.e., outside the foreachBatch function. Therefore, this may 252 make your algorithm to execute a different logic than the one you 253 originally intended. For this reason: 254 1) ALWAYS PLACE UNSUPPORTED STREAMING TRANSFORMATIONS AT LAST; 255 2) USE force_streaming_foreach_batch_processing option in transform_spec 256 section. 257 3) USE THE CUSTOM_TRANSFORMATION AND WRITE ALL YOUR TRANSFORMATION LOGIC 258 THERE. 259 260 Check list of unsupported spark streaming operations here: 261 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations 262 263 Returns: 264 List of transformation specifications. 265 """ 266 input_read_types = self._get_input_read_types(self.acon["input_specs"]) 267 transform_input_ids = self._get_transform_input_ids( 268 self.acon.get("transform_specs", []) 269 ) 270 prev_spec_read_types = self._get_previous_spec_read_types( 271 input_read_types, transform_input_ids 272 ) 273 transform_specs = [] 274 for spec in self.acon.get("transform_specs", []): 275 transform_spec = TransformSpec( 276 spec_id=spec["spec_id"], 277 input_id=spec["input_id"], 278 transformers=[], 279 force_streaming_foreach_batch_processing=spec.get( 280 "force_streaming_foreach_batch_processing", False 281 ), 282 ) 283 284 for s in spec["transformers"]: 285 transformer_spec = TransformerSpec( 286 function=s["function"], args=s.get("args", {}) 287 ) 288 if ( 289 prev_spec_read_types[transform_spec.input_id] 290 == ReadType.STREAMING.value 291 and s["function"] 292 in TransformerFactory.UNSUPPORTED_STREAMING_TRANSFORMERS 293 ) or ( 294 prev_spec_read_types[transform_spec.input_id] 295 == ReadType.STREAMING.value 296 and transform_spec.force_streaming_foreach_batch_processing 297 ): 298 self._move_to_streaming_micro_batch_transformers( 299 transform_spec, transformer_spec 300 ) 301 else: 302 transform_spec.transformers.append(transformer_spec) 303 304 transform_specs.append(transform_spec) 305 306 return transform_specs 307 308 def _get_dq_specs(self) -> List[DQSpec]: 309 """Get list of data quality specification objects from acon. 310 311 In streaming mode, we automatically convert the data quality specification in 312 the streaming_micro_batch_dq_processors list for the respective output spec. 313 This is needed because our dq process cannot be executed using native streaming 314 functions. 315 316 Returns: 317 List of data quality spec objects. 
318 """ 319 input_read_types = self._get_input_read_types(self.acon["input_specs"]) 320 transform_input_ids = self._get_transform_input_ids( 321 self.acon.get("transform_specs", []) 322 ) 323 prev_spec_read_types = self._get_previous_spec_read_types( 324 input_read_types, transform_input_ids 325 ) 326 327 dq_specs = [] 328 for spec in self.acon.get("dq_specs", []): 329 330 dq_spec, dq_functions, critical_functions = Algorithm.get_dq_spec(spec) 331 332 if prev_spec_read_types[dq_spec.input_id] == ReadType.STREAMING.value: 333 # we need to use deepcopy to explicitly create a copy of the dict 334 # otherwise python only create binding for dicts, and we would be 335 # modifying the original dict, which we don't want to. 336 self._move_to_streaming_micro_batch_dq_processors( 337 deepcopy(dq_spec), dq_functions, critical_functions 338 ) 339 else: 340 dq_spec.dq_functions = dq_functions 341 dq_spec.critical_functions = critical_functions 342 343 self._logger.info( 344 f"Streaming Micro Batch DQ Plan: " 345 f"{str(self._streaming_micro_batch_dq_plan)}" 346 ) 347 dq_specs.append(dq_spec) 348 349 return dq_specs 350 351 def _get_output_specs(self) -> List[OutputSpec]: 352 """Get the output specifications from an acon. 353 354 Returns: 355 List of output specifications. 356 """ 357 return [ 358 OutputSpec( 359 spec_id=spec["spec_id"], 360 input_id=spec["input_id"], 361 write_type=spec.get("write_type", None), 362 data_format=spec.get("data_format", OutputFormat.DELTAFILES.value), 363 db_table=spec.get("db_table", None), 364 location=spec.get("location", None), 365 merge_opts=( 366 MergeOptions(**spec["merge_opts"]) 367 if spec.get("merge_opts") 368 else None 369 ), 370 partitions=spec.get("partitions", []), 371 streaming_micro_batch_transformers=self._get_streaming_transformer_plan( 372 spec["input_id"], self.dq_specs 373 ), 374 streaming_once=spec.get("streaming_once", None), 375 streaming_processing_time=spec.get("streaming_processing_time", None), 376 streaming_available_now=spec.get( 377 "streaming_available_now", 378 ( 379 False 380 if ( 381 spec.get("streaming_once", None) 382 or spec.get("streaming_processing_time", None) 383 or spec.get("streaming_continuous", None) 384 ) 385 else True 386 ), 387 ), 388 streaming_continuous=spec.get("streaming_continuous", None), 389 streaming_await_termination=spec.get( 390 "streaming_await_termination", True 391 ), 392 streaming_await_termination_timeout=spec.get( 393 "streaming_await_termination_timeout", None 394 ), 395 with_batch_id=spec.get("with_batch_id", False), 396 options=spec.get("options", None), 397 streaming_micro_batch_dq_processors=( 398 self._streaming_micro_batch_dq_plan.get(spec["input_id"], []) 399 ), 400 ) 401 for spec in self.acon["output_specs"] 402 ] 403 404 def _get_streaming_transformer_plan( 405 self, input_id: str, dq_specs: Optional[List[DQSpec]] 406 ) -> List[TransformerSpec]: 407 """Gets the plan for transformations to be applied on streaming micro batches. 408 409 When running both DQ processes and transformations in streaming micro batches, 410 the _streaming_micro_batch_transformers_plan to consider is the one associated 411 with the transformer spec_id and not with the dq spec_id. Thus, on those cases, 412 this method maps the input id of the output_spec (which is the spec_id of a 413 dq_spec) with the dependent transformer spec_id. 414 415 Args: 416 input_id: id of the corresponding input specification. 417 dq_specs: data quality specifications. 
418 419 Returns: 420 a list of TransformerSpec, representing the transformations plan. 421 """ 422 transformer_id = ( 423 [dq_spec.input_id for dq_spec in dq_specs if dq_spec.spec_id == input_id][0] 424 if self._streaming_micro_batch_dq_plan.get(input_id) 425 and self._streaming_micro_batch_transformers_plan 426 else input_id 427 ) 428 429 streaming_micro_batch_transformers_plan: list[TransformerSpec] = ( 430 self._streaming_micro_batch_transformers_plan.get(transformer_id, []) 431 ) 432 433 return streaming_micro_batch_transformers_plan 434 435 def _get_terminate_specs(self) -> List[TerminatorSpec]: 436 """Get the terminate specifications from an acon. 437 438 Returns: 439 List of terminate specifications. 440 """ 441 return [TerminatorSpec(**spec) for spec in self.acon.get("terminate_specs", [])] 442 443 def _move_to_streaming_micro_batch_transformers( 444 self, transform_spec: TransformSpec, transformer_spec: TransformerSpec 445 ) -> None: 446 """Move the transformer to the list of streaming micro batch transformations. 447 448 If the transform specs contain functions that cannot be executed in streaming 449 mode, this function sends those functions to the output specs 450 streaming_micro_batch_transformers, where they will be executed inside the 451 stream foreachBatch function. 452 453 To accomplish that we use an instance variable that associates the 454 streaming_micro_batch_transformers to each output spec, in order to do reverse 455 lookup when creating the OutputSpec. 456 457 Args: 458 transform_spec: transform specification (overall 459 transformation specification - a transformation may contain multiple 460 transformers). 461 transformer_spec: the specific transformer function and arguments. 462 """ 463 if transform_spec.spec_id not in self._streaming_micro_batch_transformers_plan: 464 self._streaming_micro_batch_transformers_plan[transform_spec.spec_id] = [] 465 466 self._streaming_micro_batch_transformers_plan[transform_spec.spec_id].append( 467 transformer_spec 468 ) 469 470 def _move_to_streaming_micro_batch_dq_processors( 471 self, 472 dq_spec: DQSpec, 473 dq_functions: List[DQFunctionSpec], 474 critical_functions: List[DQFunctionSpec], 475 ) -> None: 476 """Move the dq function to the list of streaming micro batch transformations. 477 478 If the dq specs contain functions that cannot be executed in streaming mode, 479 this function sends those functions to the output specs 480 streaming_micro_batch_dq_processors, where they will be executed inside the 481 stream foreachBatch function. 482 483 To accomplish that we use an instance variable that associates the 484 streaming_micro_batch_dq_processors to each output spec, in order to do reverse 485 lookup when creating the OutputSpec. 486 487 Args: 488 dq_spec: dq specification (overall dq process specification). 489 dq_functions: the list of dq functions to be considered. 490 critical_functions: list of critical functions to be considered. 491 """ 492 if dq_spec.spec_id not in self._streaming_micro_batch_dq_plan: 493 self._streaming_micro_batch_dq_plan[dq_spec.spec_id] = [] 494 495 dq_spec.dq_functions = dq_functions 496 dq_spec.critical_functions = critical_functions 497 self._streaming_micro_batch_dq_plan[dq_spec.spec_id].append(dq_spec) 498 499 @staticmethod 500 def _get_input_read_types(list_of_specs: List) -> dict: 501 """Get a dict of spec ids and read types from a list of input specs. 502 503 Args: 504 list_of_specs: list of input specs ([{k:v}]). 505 506 Returns: 507 Dict of {input_spec_id: read_type}. 
508 """ 509 return {item["spec_id"]: item["read_type"] for item in list_of_specs} 510 511 @staticmethod 512 def _get_transform_input_ids(list_of_specs: List) -> dict: 513 """Get a dict of transform spec ids and input ids from list of transform specs. 514 515 Args: 516 list_of_specs: list of transform specs ([{k:v}]). 517 518 Returns: 519 Dict of {transform_spec_id: input_id}. 520 """ 521 return {item["spec_id"]: item["input_id"] for item in list_of_specs} 522 523 @staticmethod 524 def _get_previous_spec_read_types( 525 input_read_types: dict, transform_input_ids: dict 526 ) -> dict: 527 """Get the read types of the previous specification: input and/or transform. 528 529 For the chaining transformations and for DQ process to work seamlessly in batch 530 and streaming mode, we have to figure out if the previous spec to the transform 531 or dq spec(e.g., input spec or transform spec) refers to a batch read type or 532 a streaming read type. 533 534 Args: 535 input_read_types: dict of {input_spec_id: read_type}. 536 transform_input_ids: dict of {transform_spec_id: input_id}. 537 538 Returns: 539 Dict of {input_spec_id or transform_spec_id: read_type} 540 """ 541 combined_read_types = input_read_types 542 for spec_id, input_id in transform_input_ids.items(): 543 combined_read_types[spec_id] = combined_read_types[input_id] 544 545 return combined_read_types
class DataLoader(Algorithm):
Load data using an algorithm configuration (ACON represented as dict).
This algorithm focuses on the cases where users specify all the algorithm steps and configurations through a dict-based configuration, which we name ACON in our framework.
Since an ACON is a dict, you can pass a custom transformer through a Python function; therefore, the DataLoader can also be used to load data with custom transformations not provided in our transformers package.
As the algorithm base class of the lakehouse-engine framework is based on the concept of ACON, this DataLoader algorithm simply inherits from Algorithm, without overriding anything. We designed the codebase like this to avoid instantiating the Algorithm class directly, which was always meant to be an abstraction for any specific algorithm included in the lakehouse-engine framework.
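A minimal, hypothetical usage sketch follows. The top-level ACON keys match the ones this module reads; the spec ids, paths, read/write type strings, and data formats are illustrative placeholders, and the exact fields accepted by each spec are defined in lakehouse_engine.core.definitions, not here:

from lakehouse_engine.algorithms.data_loader import DataLoader

# Hypothetical ACON: every concrete value below is a placeholder.
acon = {
    "input_specs": [
        {
            "spec_id": "sales_bronze",                    # illustrative id
            "read_type": "batch",                         # assumption: batch read type value
            "data_format": "csv",                         # illustrative format
            "location": "s3://my-bucket/bronze/sales/",   # illustrative path
        }
    ],
    "output_specs": [
        {
            "spec_id": "sales_silver",
            "input_id": "sales_bronze",
            "write_type": "overwrite",                    # assumption: a supported write type
            "location": "s3://my-bucket/silver/sales/",   # illustrative path
        }
    ],
}

DataLoader(acon).execute()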
def __init__(self, acon: dict):
Construct DataLoader algorithm instances.
A data loader needs several specifications to work properly; some of them are optional. The available specifications are listed below (a skeleton ACON with these sections is sketched after this list):
- input specifications (mandatory): specify how to read data.
- transform specifications (optional): specify how to transform data.
- data quality specifications (optional): specify how to execute the data quality process.
- output specifications (mandatory): specify how to write data to the target.
- terminate specifications (optional): specify what to do after writing into the target (e.g., optimizing the target table, vacuuming, or computing statistics).
Arguments:
- acon: algorithm configuration.
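To make the five sections concrete, this is a skeleton of the top-level ACON keys the constructor reads. The key names come from the source above; the list contents are omitted because their fields are defined by the corresponding spec classes in lakehouse_engine.core.definitions:

acon = {
    "input_specs": [],      # mandatory: dicts turned into InputSpec objects
    "transform_specs": [],  # optional: dicts turned into TransformSpec objects
    "dq_specs": [],         # optional: dicts turned into DQSpec objects
    "output_specs": [],     # mandatory: dicts turned into OutputSpec objects
    "terminate_specs": [],  # optional: dicts turned into TerminatorSpec objects
}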
def read(self) -> OrderedDict:
Read data from an input location into a distributed dataframe.
Returns:
An ordered dict with all the dataframes that were read.
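In other words, the returned ordered dict is keyed by the spec_id of each input specification; the spec id below is hypothetical:

loader = DataLoader(acon)               # an acon like the one sketched earlier
read_dfs = loader.read()
sales_df = read_dfs["sales_bronze"]     # hypothetical spec_id from input_specs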
def transform(self, data: OrderedDict) -> OrderedDict:
Transform (optionally) the data that was read.
If there isn't a transformation specification, this step will be skipped and the original dataframes that were read will be returned. Transformations can depend on the result of another transformation. However, keep in mind that if we are using a streaming source and, for some reason, need to enable micro batch processing, that result cannot be used as input to another transformation: micro batch processing in PySpark streaming is only available in the write step (foreachBatch), which means a transformation executed in micro batches has to be the end of the process.
Arguments:
- data: input dataframes in an ordered dict.
Returns:
Another ordered dict with the transformed dataframes, according to the transformation specification.
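A hedged sketch of a transform_specs entry, using the keys parsed by _get_transform_specs; the transformer function names and their arguments are illustrative, since the available transformers live in the transformers package and are not listed in this module:

transform_spec = {
    "spec_id": "sales_transformed",   # becomes the key of the resulting dataframe
    "input_id": "sales_bronze",       # spec_id of an input spec or of a previous transform spec
    "transformers": [
        # each entry maps to a TransformerSpec(function=..., args=...)
        {"function": "rename", "args": {"cols": {"qty": "quantity"}}},  # illustrative
        {"function": "repartition", "args": {"num_partitions": 10}},    # illustrative
    ],
    # optional flag read by _get_transform_specs: when the source is streaming,
    # force all transformers of this spec into the foreachBatch micro batch plan.
    "force_streaming_foreach_batch_processing": False,
}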
def process_dq(self, data: OrderedDict) -> OrderedDict:
Process the data quality tasks for the data that was read and/or transformed.
It supports multiple input dataframes, although just one is advisable.
It is possible to use data quality validators/expectations that will validate your data and fail the process in case the expectations are not met. The DQ process also generates and keeps updating a site containing the results of the expectations that were run on your data. The location of the site is configurable and can be either on the file system or on S3. If you define it to be stored on S3, you can even configure your S3 bucket to serve the site so that people can easily check the quality of your data. Moreover, it is also possible to store the result of the DQ process into a defined result sink.
Arguments:
- data: dataframes from previous steps of the algorithm that we wish to run the DQ process on.
Returns:
Another ordered dict with the validated dataframes.
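A hedged sketch of a dq_specs entry. The dq spec is parsed by Algorithm.get_dq_spec, which is not shown here, so the key names below are inferred from the DQSpec and DQFunctionSpec fields used in this module and should be verified against lakehouse_engine.core.definitions; the dq_type value and the expectation name are illustrative:

dq_spec = {
    "spec_id": "sales_dq",
    "input_id": "sales_transformed",
    "dq_type": "validator",        # assumption: a DQType value other than PRISMA
    "cache_df": True,              # cache the input dataframe before running the checks
    "dq_functions": [
        # mirrors DQFunctionSpec(function=..., args=...); the name is illustrative
        {
            "function": "expect_column_values_to_not_be_null",
            "args": {"column": "order_id"},
        },
    ],
}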
def write(self, data: OrderedDict) -> OrderedDict:
Write the data that was read and transformed (if applicable).
It supports writing multiple datasets. However, we only recommend writing one dataframe, because that keeps debugging and reproducibility simple: mixing several datasets fed by the same algorithm quickly leads to reproducibility issues, tight coupling, and dependencies between datasets. Having said that, there may be cases where writing multiple datasets is desirable according to the use case requirements. Use it accordingly.
Arguments:
- data: dataframes that were read and transformed (if applicable).
Returns:
Dataframes that were written.
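A hedged sketch of an output_specs entry, restricted to keys that _get_output_specs reads from the ACON; the write_type and data_format strings are assumptions (when data_format is omitted, the source above defaults it to OutputFormat.DELTAFILES):

output_spec = {
    "spec_id": "sales_silver",
    "input_id": "sales_dq",              # which dataframe to write (a previous spec_id)
    "write_type": "append",              # assumption: a supported write type value
    "data_format": "delta",              # assumption: string behind OutputFormat.DELTAFILES
    "location": "s3://my-bucket/silver/sales/",  # illustrative path
    "partitions": ["order_date"],        # illustrative partition columns
    "options": {"mergeSchema": "true"},  # illustrative writer options
}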
def terminate(self, data: OrderedDict) -> None:
Terminate the algorithm.
Arguments:
- data: dataframes that were written.
def execute(self) -> Optional[OrderedDict]:
Define the algorithm execution behaviour.
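The stages can also be run individually; the sketch below mirrors what execute() does internally, minus the failure notification, assuming an acon like the ones sketched above:

loader = DataLoader(acon)
read_dfs = loader.read()                            # {input spec_id: dataframe}
transformed_dfs = loader.transform(read_dfs)        # adds {transform spec_id: dataframe}
validated_dfs = loader.process_dq(transformed_dfs)  # adds {dq spec_id: dataframe}
written_dfs = loader.write(validated_dfs)
loader.terminate(written_dfs)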