lakehouse_engine.algorithms.data_loader

Module to define DataLoader class.

  1"""Module to define DataLoader class."""
  2
  3from collections import OrderedDict
  4from copy import deepcopy
  5from logging import Logger
  6from typing import List, Optional
  7
  8from lakehouse_engine.algorithms.algorithm import Algorithm
  9from lakehouse_engine.core.definitions import (
 10    DQFunctionSpec,
 11    DQSpec,
 12    DQType,
 13    InputSpec,
 14    MergeOptions,
 15    OutputFormat,
 16    OutputSpec,
 17    ReadType,
 18    TerminatorSpec,
 19    TransformerSpec,
 20    TransformSpec,
 21)
 22from lakehouse_engine.io.reader_factory import ReaderFactory
 23from lakehouse_engine.io.writer_factory import WriterFactory
 24from lakehouse_engine.terminators.notifier_factory import NotifierFactory
 25from lakehouse_engine.terminators.terminator_factory import TerminatorFactory
 26from lakehouse_engine.transformers.transformer_factory import TransformerFactory
 27from lakehouse_engine.utils.logging_handler import LoggingHandler
 28
 29
 30class DataLoader(Algorithm):
 31    """Load data using an algorithm configuration (ACON represented as dict).
 32
 33    This algorithm focuses on the cases where users specify all the algorithm
 34    steps and configurations through a dict-based configuration, which we name ACON
 35    in our framework.
 36
 37    Since an ACON is a dict, you can pass a custom transformer through a Python function
 38    and, therefore, the DataLoader can also be used to load data with custom
 39    transformations not provided in our transformers package.
 40
 41    As the algorithm base class of the lakehouse-engine framework is based on the
 42    concept of ACON, this DataLoader algorithm simply inherits from Algorithm,
 43    without overriding anything. We designed the codebase like this to avoid
 44    instantiating the Algorithm class directly, which was always meant to be an
 45    abstraction for any specific algorithm included in the lakehouse-engine framework.
 46    """
 47
 48    def __init__(self, acon: dict):
 49        """Construct DataLoader algorithm instances.
 50
 51        A data loader needs several specifications to work properly,
 52        but some of them might be optional. The available specifications are:
 53
 54        - input specifications (mandatory): specify how to read data.
 55        - transform specifications (optional): specify how to transform data.
 56        - data quality specifications (optional): specify how to execute the data
 57            quality process.
 58        - output specifications (mandatory): specify how to write data to the
 59            target.
 60        - terminate specifications (optional): specify what to do after writing into
 61            the target (e.g., optimizing target table, vacuum, compute stats, etc).
 62
 63        Args:
 64            acon: algorithm configuration.
 65        """
 66        self._logger: Logger = LoggingHandler(self.__class__.__name__).get_logger()
 67        super().__init__(acon)
 68        self.input_specs: List[InputSpec] = self._get_input_specs()
 69        # the streaming transformers plan is needed to later change the
 70        # execution specification to accommodate streaming mode limitations in invoking
 71        # certain functions (e.g., sort, window, generate row ids/auto increments, ...).
 72        self._streaming_micro_batch_transformers_plan: dict = {}
 73        self.transform_specs: List[TransformSpec] = self._get_transform_specs()
 74        # our data quality process is not compatible with streaming mode, hence we
 75        # have to run it in micro batches, similar to what happens to certain
 76        # transformation functions not supported in streaming mode.
 77        self._streaming_micro_batch_dq_plan: dict = {}
 78        self.dq_specs: List[DQSpec] = self._get_dq_specs()
 79        self.output_specs: List[OutputSpec] = self._get_output_specs()
 80        self.terminate_specs: List[TerminatorSpec] = self._get_terminate_specs()
 81
 82    def read(self) -> OrderedDict:
 83        """Read data from an input location into a distributed dataframe.
 84
 85        Returns:
 86             An ordered dict with all the dataframes that were read.
 87        """
 88        read_dfs: OrderedDict = OrderedDict({})
 89        for spec in self.input_specs:
 90            self._logger.info(f"Found input specification: {spec}")
 91            read_dfs[spec.spec_id] = ReaderFactory.get_data(spec)
 92        return read_dfs
 93
 94    def transform(self, data: OrderedDict) -> OrderedDict:
 95        """Transform (optionally) the data that was read.
 96
 97        If there isn't a transformation specification, this step will be skipped, and
 98        the original dataframes that were read will be returned.
 99        Transformations can depend on the result of another transformation. However,
100        keep in mind that if we are using a streaming source and, for some reason, need
101        to enable micro batch processing, that result cannot be used as input to
102        another transformation. Micro batch processing in PySpark streaming is only
103        available in .write(), which means a transformation running in micro batch mode
104        needs to be the end of the process.
105
106        Args:
107            data: input dataframes in an ordered dict.
108
109        Returns:
110            Another ordered dict with the transformed dataframes, according to the
111            transformation specification.
112        """
113        if not self.transform_specs:
114            return data
115        else:
116            transformed_dfs = OrderedDict(data)
117            for spec in self.transform_specs:
118                self._logger.info(f"Found transform specification: {spec}")
119                transformed_df = transformed_dfs[spec.input_id]
120                for transformer in spec.transformers:
121                    transformed_df = transformed_df.transform(
122                        TransformerFactory.get_transformer(transformer, transformed_dfs)
123                    )
124                transformed_dfs[spec.spec_id] = transformed_df
125            return transformed_dfs
126
127    def process_dq(self, data: OrderedDict) -> OrderedDict:
128        """Process the data quality tasks for the data that was read and/or transformed.
129
 131        It supports multiple input dataframes, although just one is advisable.
131
132        It is possible to use data quality validators/expectations that will validate
133        your data and fail the process in case the expectations are not met. The DQ
134        process also generates and keeps updating a site containing the results of the
 136        expectations that were run on your data. The location of the site is
 137        configurable and can be either the file system or S3. If you define it to be
137        stored on S3, you can even configure your S3 bucket to serve the site so that
138        people can easily check the quality of your data. Moreover, it is also
139        possible to store the result of the DQ process into a defined result sink.
140
141        Args:
 142            data: dataframes from previous steps of the algorithm that we wish to
143                run the DQ process on.
144
145        Returns:
146            Another ordered dict with the validated dataframes.
147        """
148        if not self.dq_specs:
149            return data
150        else:
151            from lakehouse_engine.dq_processors.dq_factory import DQFactory
152
153            dq_processed_dfs = OrderedDict(data)
154            for spec in self.dq_specs:
155                df_processed_df = dq_processed_dfs[spec.input_id]
156                self._logger.info(f"Found data quality specification: {spec}")
157                if (
158                    spec.dq_type == DQType.PRISMA.value or spec.dq_functions
159                ) and spec.spec_id not in self._streaming_micro_batch_dq_plan:
160                    if spec.cache_df:
161                        df_processed_df.cache()
162                    dq_processed_dfs[spec.spec_id] = DQFactory.run_dq_process(
163                        spec, df_processed_df
164                    )
165                else:
166                    dq_processed_dfs[spec.spec_id] = df_processed_df
167            return dq_processed_dfs
168
169    def write(self, data: OrderedDict) -> OrderedDict:
170        """Write the data that was read and transformed (if applicable).
171
 172        It supports writing multiple datasets. However, we only recommend writing one
 173        dataframe. This recommendation is based on easier debugging and reproducibility,
 174        since mixing several datasets fed by the same algorithm creates reproducibility
 175        issues, as well as tight coupling and dependencies between datasets.
 176        Having said that, there may be cases where writing multiple datasets is
 177        desirable according to the use case requirements.
 178        Use it accordingly.
179
180        Args:
181            data: dataframes that were read and transformed (if applicable).
182
183        Returns:
184            Dataframes that were written.
185        """
186        written_dfs: OrderedDict = OrderedDict({})
187        for spec in self.output_specs:
188            self._logger.info(f"Found output specification: {spec}")
189
190            written_output = WriterFactory.get_writer(
191                spec, data[spec.input_id], data
192            ).write()
193            if written_output:
194                written_dfs.update(written_output)
195            else:
196                written_dfs[spec.spec_id] = data[spec.input_id]
197
198        return written_dfs
199
200    def terminate(self, data: OrderedDict) -> None:
201        """Terminate the algorithm.
202
203        Args:
204            data: dataframes that were written.
205        """
206        if self.terminate_specs:
207            for spec in self.terminate_specs:
208                self._logger.info(f"Found terminate specification: {spec}")
209                TerminatorFactory.execute_terminator(
210                    spec, data[spec.input_id] if spec.input_id else None
211                )
212
213    def execute(self) -> Optional[OrderedDict]:
214        """Define the algorithm execution behaviour."""
215        try:
216            self._logger.info("Starting read stage...")
217            read_dfs = self.read()
218            self._logger.info("Starting transform stage...")
219            transformed_dfs = self.transform(read_dfs)
220            self._logger.info("Starting data quality stage...")
221            validated_dfs = self.process_dq(transformed_dfs)
222            self._logger.info("Starting write stage...")
223            written_dfs = self.write(validated_dfs)
224            self._logger.info("Starting terminate stage...")
225            self.terminate(written_dfs)
226            self._logger.info("Execution of the algorithm has finished!")
227        except Exception as e:
228            NotifierFactory.generate_failure_notification(self.terminate_specs, e)
229            raise e
230
231        return written_dfs
232
233    def _get_input_specs(self) -> List[InputSpec]:
234        """Get the input specifications from an acon.
235
236        Returns:
237            List of input specifications.
238        """
239        return [InputSpec(**spec) for spec in self.acon["input_specs"]]
240
241    def _get_transform_specs(self) -> List[TransformSpec]:
242        """Get the transformation specifications from an acon.
243
 244        If we are executing the algorithm in streaming mode and a
 245        transformer function is not supported in streaming mode, it is
 246        important to note that ONLY those unsupported operations will
 247        go into the streaming_micro_batch_transformers (see the if in the function code),
 248        in the same order that they appear in the list of transformations. This means
 249        that other supported transformations that appear after an
 250        unsupported one remain on the normal execution plan,
 251        i.e., outside the foreachBatch function. Therefore, this may
 252        make your algorithm execute different logic than the one you
 253        originally intended. For this reason, either:
 254            1) ALWAYS PLACE UNSUPPORTED STREAMING TRANSFORMATIONS LAST;
 255            2) USE THE force_streaming_foreach_batch_processing OPTION IN THE
 256            transform_spec SECTION; OR
 257            3) USE THE CUSTOM_TRANSFORMATION AND WRITE ALL YOUR TRANSFORMATION LOGIC
 258            THERE.
259
260        Check list of unsupported spark streaming operations here:
261        https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations
262
263        Returns:
264            List of transformation specifications.
265        """
266        input_read_types = self._get_input_read_types(self.acon["input_specs"])
267        transform_input_ids = self._get_transform_input_ids(
268            self.acon.get("transform_specs", [])
269        )
270        prev_spec_read_types = self._get_previous_spec_read_types(
271            input_read_types, transform_input_ids
272        )
273        transform_specs = []
274        for spec in self.acon.get("transform_specs", []):
275            transform_spec = TransformSpec(
276                spec_id=spec["spec_id"],
277                input_id=spec["input_id"],
278                transformers=[],
279                force_streaming_foreach_batch_processing=spec.get(
280                    "force_streaming_foreach_batch_processing", False
281                ),
282            )
283
284            for s in spec["transformers"]:
285                transformer_spec = TransformerSpec(
286                    function=s["function"], args=s.get("args", {})
287                )
288                if (
289                    prev_spec_read_types[transform_spec.input_id]
290                    == ReadType.STREAMING.value
291                    and s["function"]
292                    in TransformerFactory.UNSUPPORTED_STREAMING_TRANSFORMERS
293                ) or (
294                    prev_spec_read_types[transform_spec.input_id]
295                    == ReadType.STREAMING.value
296                    and transform_spec.force_streaming_foreach_batch_processing
297                ):
298                    self._move_to_streaming_micro_batch_transformers(
299                        transform_spec, transformer_spec
300                    )
301                else:
302                    transform_spec.transformers.append(transformer_spec)
303
304            transform_specs.append(transform_spec)
305
306        return transform_specs
307
308    def _get_dq_specs(self) -> List[DQSpec]:
309        """Get list of data quality specification objects from acon.
310
 311        In streaming mode, we automatically move the data quality specification into
 312        the streaming_micro_batch_dq_processors list of the respective output spec.
313        This is needed because our dq process cannot be executed using native streaming
314        functions.
315
316        Returns:
317            List of data quality spec objects.
318        """
319        input_read_types = self._get_input_read_types(self.acon["input_specs"])
320        transform_input_ids = self._get_transform_input_ids(
321            self.acon.get("transform_specs", [])
322        )
323        prev_spec_read_types = self._get_previous_spec_read_types(
324            input_read_types, transform_input_ids
325        )
326
327        dq_specs = []
328        for spec in self.acon.get("dq_specs", []):
329
330            dq_spec, dq_functions, critical_functions = Algorithm.get_dq_spec(spec)
331
332            if prev_spec_read_types[dq_spec.input_id] == ReadType.STREAMING.value:
 333                # we need to use deepcopy to explicitly create a copy of the dict,
 334                # otherwise python only creates a binding to the dict, and we would be
 335                # modifying the original dict, which we don't want.
336                self._move_to_streaming_micro_batch_dq_processors(
337                    deepcopy(dq_spec), dq_functions, critical_functions
338                )
339            else:
340                dq_spec.dq_functions = dq_functions
341                dq_spec.critical_functions = critical_functions
342
343            self._logger.info(
344                f"Streaming Micro Batch DQ Plan: "
345                f"{str(self._streaming_micro_batch_dq_plan)}"
346            )
347            dq_specs.append(dq_spec)
348
349        return dq_specs
350
351    def _get_output_specs(self) -> List[OutputSpec]:
352        """Get the output specifications from an acon.
353
354        Returns:
355            List of output specifications.
356        """
357        return [
358            OutputSpec(
359                spec_id=spec["spec_id"],
360                input_id=spec["input_id"],
361                write_type=spec.get("write_type", None),
362                data_format=spec.get("data_format", OutputFormat.DELTAFILES.value),
363                db_table=spec.get("db_table", None),
364                location=spec.get("location", None),
365                merge_opts=(
366                    MergeOptions(**spec["merge_opts"])
367                    if spec.get("merge_opts")
368                    else None
369                ),
370                partitions=spec.get("partitions", []),
371                streaming_micro_batch_transformers=self._get_streaming_transformer_plan(
372                    spec["input_id"], self.dq_specs
373                ),
374                streaming_once=spec.get("streaming_once", None),
375                streaming_processing_time=spec.get("streaming_processing_time", None),
376                streaming_available_now=spec.get(
377                    "streaming_available_now",
378                    (
379                        False
380                        if (
381                            spec.get("streaming_once", None)
382                            or spec.get("streaming_processing_time", None)
383                            or spec.get("streaming_continuous", None)
384                        )
385                        else True
386                    ),
387                ),
388                streaming_continuous=spec.get("streaming_continuous", None),
389                streaming_await_termination=spec.get(
390                    "streaming_await_termination", True
391                ),
392                streaming_await_termination_timeout=spec.get(
393                    "streaming_await_termination_timeout", None
394                ),
395                with_batch_id=spec.get("with_batch_id", False),
396                options=spec.get("options", None),
397                streaming_micro_batch_dq_processors=(
398                    self._streaming_micro_batch_dq_plan.get(spec["input_id"], [])
399                ),
400            )
401            for spec in self.acon["output_specs"]
402        ]
403
404    def _get_streaming_transformer_plan(
405        self, input_id: str, dq_specs: Optional[List[DQSpec]]
406    ) -> List[TransformerSpec]:
407        """Gets the plan for transformations to be applied on streaming micro batches.
408
409        When running both DQ processes and transformations in streaming micro batches,
410        the _streaming_micro_batch_transformers_plan to consider is the one associated
 411        with the transformer spec_id and not with the dq spec_id. Thus, in those cases,
 412        this method maps the input id of the output_spec (which is the spec_id of a
 413        dq_spec) to the dependent transformer spec_id.
414
415        Args:
416            input_id: id of the corresponding input specification.
417            dq_specs: data quality specifications.
418
419        Returns:
420            a list of TransformerSpec, representing the transformations plan.
421        """
422        transformer_id = (
423            [dq_spec.input_id for dq_spec in dq_specs if dq_spec.spec_id == input_id][0]
424            if self._streaming_micro_batch_dq_plan.get(input_id)
425            and self._streaming_micro_batch_transformers_plan
426            else input_id
427        )
428
429        streaming_micro_batch_transformers_plan: list[TransformerSpec] = (
430            self._streaming_micro_batch_transformers_plan.get(transformer_id, [])
431        )
432
433        return streaming_micro_batch_transformers_plan
434
435    def _get_terminate_specs(self) -> List[TerminatorSpec]:
436        """Get the terminate specifications from an acon.
437
438        Returns:
439            List of terminate specifications.
440        """
441        return [TerminatorSpec(**spec) for spec in self.acon.get("terminate_specs", [])]
442
443    def _move_to_streaming_micro_batch_transformers(
444        self, transform_spec: TransformSpec, transformer_spec: TransformerSpec
445    ) -> None:
446        """Move the transformer to the list of streaming micro batch transformations.
447
448        If the transform specs contain functions that cannot be executed in streaming
449        mode, this function sends those functions to the output specs
450        streaming_micro_batch_transformers, where they will be executed inside the
451        stream foreachBatch function.
452
453        To accomplish that we use an instance variable that associates the
454        streaming_micro_batch_transformers to each output spec, in order to do reverse
455        lookup when creating the OutputSpec.
456
457        Args:
458            transform_spec: transform specification (overall
459                transformation specification - a transformation may contain multiple
460                transformers).
461            transformer_spec: the specific transformer function and arguments.
462        """
463        if transform_spec.spec_id not in self._streaming_micro_batch_transformers_plan:
464            self._streaming_micro_batch_transformers_plan[transform_spec.spec_id] = []
465
466        self._streaming_micro_batch_transformers_plan[transform_spec.spec_id].append(
467            transformer_spec
468        )
469
470    def _move_to_streaming_micro_batch_dq_processors(
471        self,
472        dq_spec: DQSpec,
473        dq_functions: List[DQFunctionSpec],
474        critical_functions: List[DQFunctionSpec],
475    ) -> None:
476        """Move the dq function to the list of streaming micro batch transformations.
477
478        If the dq specs contain functions that cannot be executed in streaming mode,
479        this function sends those functions to the output specs
480        streaming_micro_batch_dq_processors, where they will be executed inside the
481        stream foreachBatch function.
482
483        To accomplish that we use an instance variable that associates the
484        streaming_micro_batch_dq_processors to each output spec, in order to do reverse
485        lookup when creating the OutputSpec.
486
487        Args:
488            dq_spec: dq specification (overall dq process specification).
489            dq_functions: the list of dq functions to be considered.
490            critical_functions: list of critical functions to be considered.
491        """
492        if dq_spec.spec_id not in self._streaming_micro_batch_dq_plan:
493            self._streaming_micro_batch_dq_plan[dq_spec.spec_id] = []
494
495        dq_spec.dq_functions = dq_functions
496        dq_spec.critical_functions = critical_functions
497        self._streaming_micro_batch_dq_plan[dq_spec.spec_id].append(dq_spec)
498
499    @staticmethod
500    def _get_input_read_types(list_of_specs: List) -> dict:
501        """Get a dict of spec ids and read types from a list of input specs.
502
503        Args:
504            list_of_specs: list of input specs ([{k:v}]).
505
506        Returns:
507            Dict of {input_spec_id: read_type}.
508        """
509        return {item["spec_id"]: item["read_type"] for item in list_of_specs}
510
511    @staticmethod
512    def _get_transform_input_ids(list_of_specs: List) -> dict:
513        """Get a dict of transform spec ids and input ids from list of transform specs.
514
515        Args:
516            list_of_specs: list of transform specs ([{k:v}]).
517
518        Returns:
519            Dict of {transform_spec_id: input_id}.
520        """
521        return {item["spec_id"]: item["input_id"] for item in list_of_specs}
522
523    @staticmethod
524    def _get_previous_spec_read_types(
525        input_read_types: dict, transform_input_ids: dict
526    ) -> dict:
527        """Get the read types of the previous specification: input and/or transform.
528
 529        For chaining transformations and for the DQ process to work seamlessly in batch
 530        and streaming mode, we have to figure out whether the spec previous to the
 531        transform or dq spec (i.e., an input spec or a transform spec) refers to a batch
 532        read type or a streaming read type.
533
534        Args:
535            input_read_types: dict of {input_spec_id: read_type}.
536            transform_input_ids: dict of {transform_spec_id: input_id}.
537
538        Returns:
539            Dict of {input_spec_id or transform_spec_id: read_type}
540        """
541        combined_read_types = input_read_types
542        for spec_id, input_id in transform_input_ids.items():
543            combined_read_types[spec_id] = combined_read_types[input_id]
544
545        return combined_read_types
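
The execute() method in the listing above runs the stages in a fixed order (read, transform, data quality, write, terminate) and wraps the flow in failure notification handling. As a hedged illustration, the equivalent stage-by-stage invocation looks like the sketch below, assuming loader was built from a valid ACON; running the stages manually like this can help when debugging a single stage.

loader = DataLoader(acon)                           # acon: a valid algorithm configuration dict
read_dfs = loader.read()                            # OrderedDict of dataframes keyed by input spec_id
transformed_dfs = loader.transform(read_dfs)        # returns the input unchanged if there are no transform_specs
validated_dfs = loader.process_dq(transformed_dfs)  # returns the input unchanged if there are no dq_specs
written_dfs = loader.write(validated_dfs)           # OrderedDict of the written dataframes
loader.terminate(written_dfs)                       # no-op if there are no terminate_specs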

Load data using an algorithm configuration (ACON represented as dict).

This algorithm focuses on the cases where users specify all the algorithm steps and configurations through a dict-based configuration, which we name ACON in our framework.

Since an ACON is a dict, you can pass a custom transformer through a Python function and, therefore, the DataLoader can also be used to load data with custom transformations not provided in our transformers package.

As the algorithm base class of the lakehouse-engine framework is based on the concept of ACON, this DataLoader algorithm simply inherits from Algorithm, without overriding anything. We designed the codebase like this to avoid instantiating the Algorithm class directly, which was always meant to be an abstraction for any specific algorithm included in the lakehouse-engine framework.
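
As a minimal sketch of the ACON concept, the dict below wires one batch input spec into one output spec and hands it to the DataLoader. The top-level keys and the spec_id/input_id/read_type wiring come from the code above; the input spec fields beyond spec_id and read_type, the "batch" and "overwrite" values, and the formats and paths are illustrative assumptions only.

from lakehouse_engine.algorithms.data_loader import DataLoader

acon = {
    "input_specs": [
        {
            "spec_id": "sales_source",
            "read_type": "batch",                      # assumed ReadType value
            "data_format": "csv",                      # illustrative field/value
            "location": "s3://my-bucket/in/sales/",    # illustrative field/value
        }
    ],
    "output_specs": [
        {
            "spec_id": "sales_bronze",
            "input_id": "sales_source",                # chains to the input spec above
            "write_type": "overwrite",                 # illustrative value
            "data_format": "delta",
            "location": "s3://my-bucket/bronze/sales/",
        }
    ],
}

DataLoader(acon).execute()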

DataLoader(acon: dict)
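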

Construct DataLoader algorithm instances.

A data loader needs several specifications to work properly, but some of them might be optional. The available specifications are:

  • input specifications (mandatory): specify how to read data.
  • transform specifications (optional): specify how to transform data.
  • data quality specifications (optional): specify how to execute the data quality process.
  • output specifications (mandatory): specify how to write data to the target.
  • terminate specifications (optional): specify what to do after writing into the target (e.g., optimizing target table, vacuum, compute stats, etc).
Arguments:
  • acon: algorithm configuration.
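
A short sketch of how the five ACON keys end up on the instance after construction (the attribute names come from the constructor above; the results shown assume the illustrative ACON from the class description, which only defines input_specs and output_specs):

loader = DataLoader(acon)
loader.input_specs       # [InputSpec(...)]   parsed from acon["input_specs"] (mandatory)
loader.transform_specs   # []                 acon["transform_specs"] is optional
loader.dq_specs          # []                 acon["dq_specs"] is optional
loader.output_specs      # [OutputSpec(...)]  parsed from acon["output_specs"] (mandatory)
loader.terminate_specs   # []                 acon["terminate_specs"] is optional
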
def read(self) -> collections.OrderedDict:

Read data from an input location into a distributed dataframe.

Returns:

An ordered dict with all the dataframes that were read.
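
Continuing the illustrative ACON above, read() returns one dataframe per input spec, keyed by its spec_id (the dataframe contents here are of course hypothetical):

read_dfs = loader.read()
# OrderedDict([("sales_source", <Spark DataFrame>)])
sales_df = read_dfs["sales_source"]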

def transform(self, data: collections.OrderedDict) -> collections.OrderedDict:

Transform (optionally) the data that was read.

If there isn't a transformation specification, this step will be skipped, and the original dataframes that were read will be returned. Transformations can depend on the result of another transformation. However, keep in mind that if we are using a streaming source and, for some reason, need to enable micro batch processing, that result cannot be used as input to another transformation. Micro batch processing in PySpark streaming is only available in .write(), which means a transformation running in micro batch mode needs to be the end of the process.

Arguments:
  • data: input dataframes in an ordered dict.
Returns:
  Another ordered dict with the transformed dataframes, according to the transformation specification.
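The dependency between transformations is expressed through input_id: a transform spec can point at the spec_id of a previous transform spec instead of an input spec. A hedged configuration sketch follows (the transformer function names and args below are hypothetical):

    # the second spec consumes the result of the first one via input_id
    transform_specs = [
        {
            "spec_id": "sales_cleaned",
            "input_id": "sales_source",       # dataframe produced by the input spec
            "transformers": [
                {"function": "rename", "args": {"cols": {"qty": "quantity"}}},
            ],
        },
        {
            "spec_id": "sales_enriched",
            "input_id": "sales_cleaned",      # depends on the previous transformation result
            "transformers": [
                {"function": "with_literals", "args": {"literals": {"source": "pos"}}},
            ],
        },
    ]
    # caveat from the docstring: if "sales_cleaned" required micro batch processing on a
    # streaming source, it could not be used as input to "sales_enriched"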

def process_dq(self, data: collections.OrderedDict) -> collections.OrderedDict:
    def process_dq(self, data: OrderedDict) -> OrderedDict:
        """Process the data quality tasks for the data that was read and/or transformed.

        It supports multiple input dataframes, although just one is advisable.

        It is possible to use data quality validators/expectations that will validate
        your data and fail the process in case the expectations are not met. The DQ
        process also generates and keeps updating a site containing the results of the
        expectations that were run on your data. The location of the site is
        configurable and can either be on the file system or S3. If you define it to be
        stored on S3, you can even configure your S3 bucket to serve the site so that
        people can easily check the quality of your data. Moreover, it is also
        possible to store the result of the DQ process into a defined result sink.

        Args:
            data: dataframes from previous steps of the algorithm that we wish to
                run the DQ process on.

        Returns:
            Another ordered dict with the validated dataframes.
        """
        if not self.dq_specs:
            return data
        else:
            from lakehouse_engine.dq_processors.dq_factory import DQFactory

            dq_processed_dfs = OrderedDict(data)
            for spec in self.dq_specs:
                df_processed_df = dq_processed_dfs[spec.input_id]
                self._logger.info(f"Found data quality specification: {spec}")
                if (
                    spec.dq_type == DQType.PRISMA.value or spec.dq_functions
                ) and spec.spec_id not in self._streaming_micro_batch_dq_plan:
                    if spec.cache_df:
                        df_processed_df.cache()
                    dq_processed_dfs[spec.spec_id] = DQFactory.run_dq_process(
                        spec, df_processed_df
                    )
                else:
                    dq_processed_dfs[spec.spec_id] = df_processed_df
            return dq_processed_dfs

Process the data quality tasks for the data that was read and/or transformed.

It supports multiple input dataframes, although just one is advisable.

It is possible to use data quality validators/expectations that will validate your data and fail the process in case the expectations are not met. The DQ process also generates and keeps updating a site containing the results of the expectations that were run on your data. The location of the site is configurable and can either be on the file system or S3. If you define it to be stored on S3, you can even configure your S3 bucket to serve the site so that people can easily check the quality of your data. Moreover, it is also possible to store the result of the DQ process into a defined result sink.

Arguments:
  • data: dataframes from previous steps of the algorithm that we wish to run the DQ process on.
Returns:
  Another ordered dict with the validated dataframes.
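A hedged sketch of a DQ spec with a couple of expectations and a result sink. spec_id, input_id, dq_type, dq_functions and cache_df appear in the code above; the dq_type value and expectation names follow common Great Expectations conventions, and the result sink field name is an assumption:

    dq_specs = [
        {
            "spec_id": "sales_quality",
            "input_id": "sales_cleaned",
            "dq_type": "validator",   # illustrative DQType value
            "cache_df": True,         # cache the input dataframe before running the expectations
            "dq_functions": [
                {"function": "expect_column_values_to_not_be_null",
                 "args": {"column": "order_id"}},
                {"function": "expect_table_row_count_to_be_between",
                 "args": {"min_value": 1}},
            ],
            # hypothetical field name for the result sink mentioned in the docstring
            "result_sink_location": "s3://my-bucket/dq/sales/",
        }
    ]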

def write(self, data: collections.OrderedDict) -> collections.OrderedDict:
    def write(self, data: OrderedDict) -> OrderedDict:
        """Write the data that was read and transformed (if applicable).

        It supports writing multiple datasets, but we only recommend writing one
        dataframe. This recommendation is based on easier debugging and
        reproducibility: mixing several datasets fueled by the same algorithm quickly
        leads to reproducibility issues, tight coupling and dependencies between
        datasets. Having said that, there may be cases where writing multiple datasets
        is desirable according to the use case requirements. Use it accordingly.

        Args:
            data: dataframes that were read and transformed (if applicable).

        Returns:
            Dataframes that were written.
        """
        written_dfs: OrderedDict = OrderedDict({})
        for spec in self.output_specs:
            self._logger.info(f"Found output specification: {spec}")

            written_output = WriterFactory.get_writer(
                spec, data[spec.input_id], data
            ).write()
            if written_output:
                written_dfs.update(written_output)
            else:
                written_dfs[spec.spec_id] = data[spec.input_id]

        return written_dfs

Write the data that was read and transformed (if applicable).

It supports writing multiple datasets, but we only recommend writing one dataframe. This recommendation is based on easier debugging and reproducibility: mixing several datasets fueled by the same algorithm quickly leads to reproducibility issues, tight coupling and dependencies between datasets. Having said that, there may be cases where writing multiple datasets is desirable according to the use case requirements. Use it accordingly.

Arguments:
  • data: dataframes that were read and transformed (if applicable).
Returns:
  Dataframes that were written.
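A hedged output spec sketch performing a merge into a Delta table; merge_opts mirrors the MergeOptions import at the top of the module, but the exact field names are assumptions:

    output_specs = [
        {
            "spec_id": "sales_silver",
            "input_id": "sales_quality",      # write the DQ-validated dataframe
            "write_type": "merge",
            "data_format": "delta",
            "location": "s3://my-bucket/silver/sales/",
            "merge_opts": {
                # hypothetical field: predicate matching target and incoming rows
                "merge_predicate": "current.order_id = new.order_id",
            },
        }
    ]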

def terminate(self, data: collections.OrderedDict) -> None:
    def terminate(self, data: OrderedDict) -> None:
        """Terminate the algorithm.

        Args:
            data: dataframes that were written.
        """
        if self.terminate_specs:
            for spec in self.terminate_specs:
                self._logger.info(f"Found terminate specification: {spec}")
                TerminatorFactory.execute_terminator(
                    spec, data[spec.input_id] if spec.input_id else None
                )

Terminate the algorithm.

Arguments:
  • data: dataframes that were written.
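Terminator specs run in order after the write stage. A hedged sketch follows, where the function names are hypothetical examples of the post-write housekeeping mentioned in the terminate specification description above (optimizing the target table, vacuum, compute stats):

    terminate_specs = [
        {"function": "optimize_dataset", "args": {"db_table": "silver.sales"}},
        {"function": "vacuum_dataset", "args": {"location": "s3://my-bucket/silver/sales/"}},
    ]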
def execute(self) -> Optional[collections.OrderedDict]:
    def execute(self) -> Optional[OrderedDict]:
        """Define the algorithm execution behaviour."""
        try:
            self._logger.info("Starting read stage...")
            read_dfs = self.read()
            self._logger.info("Starting transform stage...")
            transformed_dfs = self.transform(read_dfs)
            self._logger.info("Starting data quality stage...")
            validated_dfs = self.process_dq(transformed_dfs)
            self._logger.info("Starting write stage...")
            written_dfs = self.write(validated_dfs)
            self._logger.info("Starting terminate stage...")
            self.terminate(written_dfs)
            self._logger.info("Execution of the algorithm has finished!")
        except Exception as e:
            NotifierFactory.generate_failure_notification(self.terminate_specs, e)
            raise e

        return written_dfs

Define the algorithm execution behaviour.
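Putting it together, a minimal end-to-end run (a sketch assuming the acon built in the previous sections) constructs the loader and calls execute, which chains the stages above and returns the ordered dict produced by write:

    from lakehouse_engine.algorithms.data_loader import DataLoader

    written_dfs = DataLoader(acon).execute()
    if written_dfs is not None:
        for spec_id, df in written_dfs.items():
            print(f"written output '{spec_id}' with {len(df.columns)} columns")
    # on failure, a notification is generated from the terminate specs and the
    # exception is re-raised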