Data loader

Module to define DataLoader class.

DataLoader

Bases: Algorithm

Load data using an algorithm configuration (ACON represented as dict).

This algorithm focuses on the cases where users specify all the algorithm steps and configurations through a dict-based configuration, which we name ACON in our framework.

Since an ACON is a dict, you can pass a custom transformer through a Python function and, therefore, the DataLoader can also be used to load data with custom transformations not provided in our transformers package.

As the algorithm base class of the lakehouse-engine framework is based on the concept of ACON, this DataLoader algorithm simply inherits from Algorithm, without overriding anything. We designed the codebase like this to avoid instantiating the Algorithm class directly, which was always meant to be an abstraction for any specific algorithm included in the lakehouse-engine framework.
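
For illustration only, here is a minimal ACON sketch. The top-level keys (input_specs, transform_specs, output_specs; the optional dq_specs and terminate_specs are omitted) are the ones this algorithm reads. The import path, read/write type literals, data formats, locations and the transformer name below are assumptions/placeholders that need to be adapted to the readers, transformers and writers available in your deployment.

from lakehouse_engine.algorithms.data_loader import DataLoader  # import path assumed

acon = {
    "input_specs": [
        {
            "spec_id": "orders_bronze",                   # hypothetical id
            "read_type": "batch",                         # "streaming" is the other option (exact literals assumed)
            "data_format": "csv",                         # assumed InputSpec field/value
            "location": "s3://my-bucket/bronze/orders/",  # assumed InputSpec field/value
        }
    ],
    "transform_specs": [  # optional
        {
            "spec_id": "orders_transformed",
            "input_id": "orders_bronze",
            "transformers": [
                # the function must map to a transformer from the transformers
                # package (or to a custom transformation); name/args are placeholders.
                {"function": "some_transformer", "args": {}},
            ],
        }
    ],
    "output_specs": [
        {
            "spec_id": "orders_silver",
            "input_id": "orders_transformed",
            "write_type": "append",                       # value assumed
            "data_format": "delta",                       # value assumed
            "location": "s3://my-bucket/silver/orders/",
        }
    ],
}

DataLoader(acon).execute()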

Source code in mkdocs/lakehouse_engine/packages/algorithms/data_loader.py
class DataLoader(Algorithm):
    """Load data using an algorithm configuration (ACON represented as dict).

    This algorithm focuses on the cases where users will be specifying all the algorithm
    steps and configurations through a dict based configuration, which we name ACON
    in our framework.

    Since an ACON is a dict you can pass a custom transformer through a python function
    and, therefore, the DataLoader can also be used to load data with custom
    transformations not provided in our transformers package.

    As the algorithm base class of the lakehouse-engine framework is based on the
    concept of ACON, this DataLoader algorithm simply inherits from Algorithm,
    without overriding anything. We designed the codebase like this to avoid
    instantiating the Algorithm class directly, which was always meant to be an
    abstraction for any specific algorithm included in the lakehouse-engine framework.
    """

    def __init__(self, acon: dict):
        """Construct DataLoader algorithm instances.

        A data loader needs several specifications to work properly,
        but some of them might be optional. The available specifications are:

        - input specifications (mandatory): specify how to read data.
        - transform specifications (optional): specify how to transform data.
        - data quality specifications (optional): specify how to execute the data
            quality process.
        - output specifications (mandatory): specify how to write data to the
            target.
        - terminate specifications (optional): specify what to do after writing into
            the target (e.g., optimizing target table, vacuum, compute stats, etc).

        Args:
            acon: algorithm configuration.
        """
        self._logger: Logger = LoggingHandler(self.__class__.__name__).get_logger()
        super().__init__(acon)
        self.input_specs: List[InputSpec] = self._get_input_specs()
        # the streaming transformers plan is needed to later change the
        # execution specification to accommodate streaming mode limitations in invoking
        # certain functions (e.g., sort, window, generate row ids/auto increments, ...).
        self._streaming_micro_batch_transformers_plan: dict = {}
        self.transform_specs: List[TransformSpec] = self._get_transform_specs()
        # our data quality process is not compatible with streaming mode, hence we
        # have to run it in micro batches, similar to what happens to certain
        # transformation functions not supported in streaming mode.
        self._streaming_micro_batch_dq_plan: dict = {}
        self.dq_specs: List[DQSpec] = self._get_dq_specs()
        self.output_specs: List[OutputSpec] = self._get_output_specs()
        self.terminate_specs: List[TerminatorSpec] = self._get_terminate_specs()

    def read(self) -> OrderedDict:
        """Read data from an input location into a distributed dataframe.

        Returns:
             An ordered dict with all the dataframes that were read.
        """
        read_dfs: OrderedDict = OrderedDict({})
        for spec in self.input_specs:
            self._logger.info(f"Found input specification: {spec}")
            read_dfs[spec.spec_id] = ReaderFactory.get_data(spec)
        return read_dfs

    def transform(self, data: OrderedDict) -> OrderedDict:
        """Transform (optionally) the data that was read.

        If there isn't a transformation specification, this step will be skipped, and
        the original dataframes that were read will be returned.
        Transformations can depend on the result of another transformation; however,
        keep in mind that if we are using a streaming source and for some reason need
        to enable micro batch processing, that result cannot be used as input to
        another transformation. Micro batch processing in pyspark streaming is only
        available in .write(), which means the transformation with micro batch needs
        to be the end of the process.

        Args:
            data: input dataframes in an ordered dict.

        Returns:
            Another ordered dict with the transformed dataframes, according to the
            transformation specification.
        """
        if not self.transform_specs:
            return data
        else:
            transformed_dfs = OrderedDict(data)
            for spec in self.transform_specs:
                self._logger.info(f"Found transform specification: {spec}")
                transformed_df = transformed_dfs[spec.input_id]
                for transformer in spec.transformers:
                    transformed_df = transformed_df.transform(
                        TransformerFactory.get_transformer(transformer, transformed_dfs)
                    )
                transformed_dfs[spec.spec_id] = transformed_df
            return transformed_dfs

    def process_dq(
        self, data: OrderedDict
    ) -> tuple[OrderedDict, Optional[dict[str, str]]]:
        """Process the data quality tasks for the data that was read and/or transformed.

        It supports multiple input dataframes, although just one is advisable.

        It is possible to use data quality validators/expectations that will validate
        your data and fail the process in case the expectations are not met. The DQ
        process also generates and keeps updating a site containing the results of the
        expectations that were done on your data. The location of the site is
        configurable and can either be on file system or S3. If you define it to be
        stored on S3, you can even configure your S3 bucket to serve the site so that
        people can easily check the quality of your data. Moreover, it is also
        possible to store the result of the DQ process into a defined result sink.

        Args:
            data: dataframes from previous steps of the algorithm that we wish to
                run the DQ process on.

        Returns:
            Another ordered dict with the validated dataframes and
            a dictionary with the errors if they exist, or None.
        """
        if not self.dq_specs:
            return data, None

        dq_processed_dfs, error = self._verify_dq_rule_id_uniqueness(
            data, self.dq_specs
        )
        if error:
            return dq_processed_dfs, error
        else:
            from lakehouse_engine.dq_processors.dq_factory import DQFactory

            dq_processed_dfs = OrderedDict(data)
            for spec in self.dq_specs:
                df_processed_df = dq_processed_dfs[spec.input_id]
                self._logger.info(f"Found data quality specification: {spec}")
                if (
                    spec.dq_type == DQType.PRISMA.value or spec.dq_functions
                ) and spec.spec_id not in self._streaming_micro_batch_dq_plan:

                    if spec.cache_df:
                        df_processed_df.cache()
                    dq_processed_dfs[spec.spec_id] = DQFactory.run_dq_process(
                        spec, df_processed_df
                    )
                else:
                    dq_processed_dfs[spec.spec_id] = df_processed_df

            return dq_processed_dfs, None

    def write(self, data: OrderedDict) -> OrderedDict:
        """Write the data that was read and transformed (if applicable).

        It supports writing multiple datasets. However, we only recommend writing one
        dataframe. This recommendation is based on easy debugging and reproducibility,
        since if we start mixing several datasets being fueled by the same algorithm, it
        would unleash an infinite sea of reproducibility issues plus tight coupling and
        dependencies between datasets. Having said that, there may be cases where
        writing multiple datasets is desirable according to the use case requirements.
        Use it accordingly.

        Args:
            data: dataframes that were read and transformed (if applicable).

        Returns:
            Dataframes that were written.
        """
        written_dfs: OrderedDict = OrderedDict({})
        for spec in self.output_specs:
            self._logger.info(f"Found output specification: {spec}")

            written_output = WriterFactory.get_writer(
                spec, data[spec.input_id], data
            ).write()
            if written_output:
                written_dfs.update(written_output)
            else:
                written_dfs[spec.spec_id] = data[spec.input_id]

        return written_dfs

    def terminate(self, data: OrderedDict) -> None:
        """Terminate the algorithm.

        Args:
            data: dataframes that were written.
        """
        if self.terminate_specs:
            for spec in self.terminate_specs:
                self._logger.info(f"Found terminate specification: {spec}")
                TerminatorFactory.execute_terminator(
                    spec, data[spec.input_id] if spec.input_id else None
                )

    def execute(self) -> Optional[OrderedDict]:
        """Define the algorithm execution behaviour."""
        try:
            self._logger.info("Starting read stage...")
            read_dfs = self.read()
            self._logger.info("Starting transform stage...")
            transformed_dfs = self.transform(read_dfs)
            self._logger.info("Starting data quality stage...")
            validated_dfs, errors = self.process_dq(transformed_dfs)
            self._logger.info("Starting write stage...")
            written_dfs = self.write(validated_dfs)
            self._logger.info("Starting terminate stage...")
            self.terminate(written_dfs)
            self._logger.info("Execution of the algorithm has finished!")
        except Exception as e:
            NotifierFactory.generate_failure_notification(self.terminate_specs, e)
            raise e

        if errors:
            raise DQDuplicateRuleIdException(
                "Data Written Successfully, but DQ Process Encountered an Issue.\n"
                "We detected a duplicate dq_rule_id in the dq_spec definition. "
                "As a result, none of the Data Quality (DQ) processes (dq_spec) "
                "were executed.\n"
                "Please review and verify the following dq_rules:\n"
                f"{errors}"
            )

        return written_dfs

    def _get_input_specs(self) -> List[InputSpec]:
        """Get the input specifications from an acon.

        Returns:
            List of input specifications.
        """
        return [InputSpec(**spec) for spec in self.acon["input_specs"]]

    def _get_transform_specs(self) -> List[TransformSpec]:
        """Get the transformation specifications from an acon.

        If we are executing the algorithm in streaming mode and the
        transformer function is not supported in streaming mode, it is
        important to note that ONLY those unsupported operations will
        go into the streaming_micro_batch_transformers (see the if in the function code),
        in the same order that they appear in the list of transformations. This means
        that other supported transformations that appear after an
        unsupported one continue to stay on the normal execution plan,
        i.e., outside the foreachBatch function. Therefore, this may
        make your algorithm execute different logic than the one you
        originally intended. For this reason:
            1) ALWAYS PLACE UNSUPPORTED STREAMING TRANSFORMATIONS LAST;
            2) USE THE force_streaming_foreach_batch_processing OPTION IN THE
            transform_spec SECTION;
            3) USE THE CUSTOM_TRANSFORMATION AND WRITE ALL YOUR TRANSFORMATION LOGIC
            THERE.

        Check the list of unsupported spark streaming operations here:
        https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations

        Returns:
            List of transformation specifications.
        """
        input_read_types = self._get_input_read_types(self.acon["input_specs"])
        transform_input_ids = self._get_transform_input_ids(
            self.acon.get("transform_specs", [])
        )
        prev_spec_read_types = self._get_previous_spec_read_types(
            input_read_types, transform_input_ids
        )
        transform_specs = []
        for spec in self.acon.get("transform_specs", []):
            transform_spec = TransformSpec(
                spec_id=spec["spec_id"],
                input_id=spec["input_id"],
                transformers=[],
                force_streaming_foreach_batch_processing=spec.get(
                    "force_streaming_foreach_batch_processing", False
                ),
            )

            for s in spec["transformers"]:
                transformer_spec = TransformerSpec(
                    function=s["function"], args=s.get("args", {})
                )
                if (
                    prev_spec_read_types[transform_spec.input_id]
                    == ReadType.STREAMING.value
                    and s["function"]
                    in TransformerFactory.UNSUPPORTED_STREAMING_TRANSFORMERS
                ) or (
                    prev_spec_read_types[transform_spec.input_id]
                    == ReadType.STREAMING.value
                    and transform_spec.force_streaming_foreach_batch_processing
                ):
                    self._move_to_streaming_micro_batch_transformers(
                        transform_spec, transformer_spec
                    )
                else:
                    transform_spec.transformers.append(transformer_spec)

            transform_specs.append(transform_spec)

        return transform_specs

    def _get_dq_specs(self) -> List[DQSpec]:
        """Get list of data quality specification objects from acon.

        In streaming mode, we automatically convert the data quality specification into
        the streaming_micro_batch_dq_processors list for the respective output spec.
        This is needed because our dq process cannot be executed using native streaming
        functions.

        Returns:
            List of data quality spec objects.
        """
        input_read_types = self._get_input_read_types(self.acon["input_specs"])
        transform_input_ids = self._get_transform_input_ids(
            self.acon.get("transform_specs", [])
        )
        prev_spec_read_types = self._get_previous_spec_read_types(
            input_read_types, transform_input_ids
        )

        dq_specs = []
        for spec in self.acon.get("dq_specs", []):

            dq_spec, dq_functions, critical_functions = Algorithm.get_dq_spec(spec)

            if prev_spec_read_types[dq_spec.input_id] == ReadType.STREAMING.value:
                # we need to use deepcopy to explicitly create a copy of the dict;
                # otherwise python only creates a binding for the dict, and we would be
                # modifying the original dict, which we don't want.
                self._move_to_streaming_micro_batch_dq_processors(
                    deepcopy(dq_spec), dq_functions, critical_functions
                )
            else:
                dq_spec.dq_functions = dq_functions
                dq_spec.critical_functions = critical_functions

            self._logger.info(
                f"Streaming Micro Batch DQ Plan: "
                f"{str(self._streaming_micro_batch_dq_plan)}"
            )
            dq_specs.append(dq_spec)

        return dq_specs

    def _get_output_specs(self) -> List[OutputSpec]:
        """Get the output specifications from an acon.

        Returns:
            List of output specifications.
        """
        return [
            OutputSpec(
                spec_id=spec["spec_id"],
                input_id=spec["input_id"],
                write_type=spec.get("write_type", None),
                data_format=spec.get("data_format", OutputFormat.DELTAFILES.value),
                db_table=spec.get("db_table", None),
                location=spec.get("location", None),
                merge_opts=(
                    MergeOptions(**spec["merge_opts"])
                    if spec.get("merge_opts")
                    else None
                ),
                sharepoint_opts=(
                    SharepointOptions(**spec["sharepoint_opts"])
                    if spec.get("sharepoint_opts")
                    else None
                ),
                partitions=spec.get("partitions", []),
                streaming_micro_batch_transformers=self._get_streaming_transformer_plan(
                    spec["input_id"], self.dq_specs
                ),
                streaming_once=spec.get("streaming_once", None),
                streaming_processing_time=spec.get("streaming_processing_time", None),
                streaming_available_now=spec.get(
                    "streaming_available_now",
                    (
                        False
                        if (
                            spec.get("streaming_once", None)
                            or spec.get("streaming_processing_time", None)
                            or spec.get("streaming_continuous", None)
                        )
                        else True
                    ),
                ),
                streaming_continuous=spec.get("streaming_continuous", None),
                streaming_await_termination=spec.get(
                    "streaming_await_termination", True
                ),
                streaming_await_termination_timeout=spec.get(
                    "streaming_await_termination_timeout", None
                ),
                with_batch_id=spec.get("with_batch_id", False),
                options=spec.get("options", None),
                streaming_micro_batch_dq_processors=(
                    self._streaming_micro_batch_dq_plan.get(spec["input_id"], [])
                ),
            )
            for spec in self.acon["output_specs"]
        ]

    def _get_streaming_transformer_plan(
        self, input_id: str, dq_specs: Optional[List[DQSpec]]
    ) -> List[TransformerSpec]:
        """Gets the plan for transformations to be applied on streaming micro batches.

        When running both DQ processes and transformations in streaming micro batches,
        the _streaming_micro_batch_transformers_plan to consider is the one associated
        with the transformer spec_id and not with the dq spec_id. Thus, in those cases,
        this method maps the input id of the output_spec (which is the spec_id of a
        dq_spec) with the dependent transformer spec_id.

        Args:
            input_id: id of the corresponding input specification.
            dq_specs: data quality specifications.

        Returns:
            a list of TransformerSpec, representing the transformations plan.
        """
        transformer_id = (
            [dq_spec.input_id for dq_spec in dq_specs if dq_spec.spec_id == input_id][0]
            if self._streaming_micro_batch_dq_plan.get(input_id)
            and self._streaming_micro_batch_transformers_plan
            else input_id
        )

        streaming_micro_batch_transformers_plan: list[TransformerSpec] = (
            self._streaming_micro_batch_transformers_plan.get(transformer_id, [])
        )

        return streaming_micro_batch_transformers_plan

    def _get_terminate_specs(self) -> List[TerminatorSpec]:
        """Get the terminate specifications from an acon.

        Returns:
            List of terminate specifications.
        """
        return [TerminatorSpec(**spec) for spec in self.acon.get("terminate_specs", [])]

    def _move_to_streaming_micro_batch_transformers(
        self, transform_spec: TransformSpec, transformer_spec: TransformerSpec
    ) -> None:
        """Move the transformer to the list of streaming micro batch transformations.

        If the transform specs contain functions that cannot be executed in streaming
        mode, this function sends those functions to the output specs
        streaming_micro_batch_transformers, where they will be executed inside the
        stream foreachBatch function.

        To accomplish that we use an instance variable that associates the
        streaming_micro_batch_transformers to each output spec, in order to do reverse
        lookup when creating the OutputSpec.

        Args:
            transform_spec: transform specification (overall
                transformation specification - a transformation may contain multiple
                transformers).
            transformer_spec: the specific transformer function and arguments.
        """
        if transform_spec.spec_id not in self._streaming_micro_batch_transformers_plan:
            self._streaming_micro_batch_transformers_plan[transform_spec.spec_id] = []

        self._streaming_micro_batch_transformers_plan[transform_spec.spec_id].append(
            transformer_spec
        )

    def _move_to_streaming_micro_batch_dq_processors(
        self,
        dq_spec: DQSpec,
        dq_functions: List[DQFunctionSpec],
        critical_functions: List[DQFunctionSpec],
    ) -> None:
        """Move the dq function to the list of streaming micro batch transformations.

        If the dq specs contain functions that cannot be executed in streaming mode,
        this function sends those functions to the output specs
        streaming_micro_batch_dq_processors, where they will be executed inside the
        stream foreachBatch function.

        To accomplish that we use an instance variable that associates the
        streaming_micro_batch_dq_processors to each output spec, in order to do reverse
        lookup when creating the OutputSpec.

        Args:
            dq_spec: dq specification (overall dq process specification).
            dq_functions: the list of dq functions to be considered.
            critical_functions: list of critical functions to be considered.
        """
        if dq_spec.spec_id not in self._streaming_micro_batch_dq_plan:
            self._streaming_micro_batch_dq_plan[dq_spec.spec_id] = []

        dq_spec.dq_functions = dq_functions
        dq_spec.critical_functions = critical_functions
        self._streaming_micro_batch_dq_plan[dq_spec.spec_id].append(dq_spec)

    @staticmethod
    def _get_input_read_types(list_of_specs: List) -> dict:
        """Get a dict of spec ids and read types from a list of input specs.

        Args:
            list_of_specs: list of input specs ([{k:v}]).

        Returns:
            Dict of {input_spec_id: read_type}.
        """
        return {item["spec_id"]: item["read_type"] for item in list_of_specs}

    @staticmethod
    def _get_transform_input_ids(list_of_specs: List) -> dict:
        """Get a dict of transform spec ids and input ids from list of transform specs.

        Args:
            list_of_specs: list of transform specs ([{k:v}]).

        Returns:
            Dict of {transform_spec_id: input_id}.
        """
        return {item["spec_id"]: item["input_id"] for item in list_of_specs}

    @staticmethod
    def _get_previous_spec_read_types(
        input_read_types: dict, transform_input_ids: dict
    ) -> dict:
        """Get the read types of the previous specification: input and/or transform.

        For chaining transformations and for the DQ process to work seamlessly in batch
        and streaming mode, we have to figure out if the spec previous to the transform
        or dq spec (e.g., input spec or transform spec) refers to a batch read type or
        a streaming read type.

        Args:
            input_read_types: dict of {input_spec_id: read_type}.
            transform_input_ids: dict of {transform_spec_id: input_id}.

        Returns:
            Dict of {input_spec_id or transform_spec_id: read_type}
        """
        combined_read_types = input_read_types
        for spec_id, input_id in transform_input_ids.items():
            combined_read_types[spec_id] = combined_read_types[input_id]

        return combined_read_types

    @staticmethod
    def _verify_dq_rule_id_uniqueness(
        data: OrderedDict, dq_specs: list[DQSpec]
    ) -> tuple[OrderedDict, dict[str, str]]:
        """Verify the uniqueness of dq_rule_id.

        Verify the existence of duplicate dq_rule_id values
        and prepare the DataFrame for the next stage.

        Args:
            data: dataframes.
            dq_specs: a list of DQSpec to be validated.

        Returns:
             Processed dataframes and the error dict, if any.
        """
        error_dict = PrismaUtils.validate_rule_id_duplication(dq_specs)
        dq_processed_dfs = OrderedDict(data)
        for spec in dq_specs:
            df_processed_df = dq_processed_dfs[spec.input_id]
            dq_processed_dfs[spec.spec_id] = df_processed_df
        return dq_processed_dfs, error_dict

__init__(acon)

Construct DataLoader algorithm instances.

A data loader needs several specifications to work properly, but some of them might be optional. The available specifications are:

  • input specifications (mandatory): specify how to read data.
  • transform specifications (optional): specify how to transform data.
  • data quality specifications (optional): specify how to execute the data quality process.
  • output specifications (mandatory): specify how to write data to the target.
  • terminate specifications (optional): specify what to do after writing into the target (e.g., optimizing target table, vacuum, compute stats, etc).
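
These five groups map directly to the spec lists parsed in the constructor shown below; a brief sketch of inspecting them (acon being any valid configuration, such as the one sketched in the introduction):

loader = DataLoader(acon)
loader.input_specs       # List[InputSpec]
loader.transform_specs   # List[TransformSpec]
loader.dq_specs          # List[DQSpec]
loader.output_specs      # List[OutputSpec]
loader.terminate_specs   # List[TerminatorSpec]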

Parameters:

    acon (dict): algorithm configuration. [required]
Source code in mkdocs/lakehouse_engine/packages/algorithms/data_loader.py
def __init__(self, acon: dict):
    """Construct DataLoader algorithm instances.

    A data loader needs several specifications to work properly,
    but some of them might be optional. The available specifications are:

    - input specifications (mandatory): specify how to read data.
    - transform specifications (optional): specify how to transform data.
    - data quality specifications (optional): specify how to execute the data
        quality process.
    - output specifications (mandatory): specify how to write data to the
        target.
    - terminate specifications (optional): specify what to do after writing into
        the target (e.g., optimizing target table, vacuum, compute stats, etc).

    Args:
        acon: algorithm configuration.
    """
    self._logger: Logger = LoggingHandler(self.__class__.__name__).get_logger()
    super().__init__(acon)
    self.input_specs: List[InputSpec] = self._get_input_specs()
    # the streaming transformers plan is needed to later change the
    # execution specification to accommodate streaming mode limitations in invoking
    # certain functions (e.g., sort, window, generate row ids/auto increments, ...).
    self._streaming_micro_batch_transformers_plan: dict = {}
    self.transform_specs: List[TransformSpec] = self._get_transform_specs()
    # our data quality process is not compatible with streaming mode, hence we
    # have to run it in micro batches, similar to what happens to certain
    # transformation functions not supported in streaming mode.
    self._streaming_micro_batch_dq_plan: dict = {}
    self.dq_specs: List[DQSpec] = self._get_dq_specs()
    self.output_specs: List[OutputSpec] = self._get_output_specs()
    self.terminate_specs: List[TerminatorSpec] = self._get_terminate_specs()

execute()

Define the algorithm execution behaviour.
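
A minimal run sketch (acon as in the introduction). Note that a duplicate dq_rule_id only surfaces after the write stage, as the code below shows:

loader = DataLoader(acon)
written_dfs = loader.execute()   # OrderedDict of written dataframes; raises
                                 # DQDuplicateRuleIdException after a successful write
                                 # if duplicate dq_rule_ids were detected in the dq_specs.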

Source code in mkdocs/lakehouse_engine/packages/algorithms/data_loader.py
def execute(self) -> Optional[OrderedDict]:
    """Define the algorithm execution behaviour."""
    try:
        self._logger.info("Starting read stage...")
        read_dfs = self.read()
        self._logger.info("Starting transform stage...")
        transformed_dfs = self.transform(read_dfs)
        self._logger.info("Starting data quality stage...")
        validated_dfs, errors = self.process_dq(transformed_dfs)
        self._logger.info("Starting write stage...")
        written_dfs = self.write(validated_dfs)
        self._logger.info("Starting terminate stage...")
        self.terminate(written_dfs)
        self._logger.info("Execution of the algorithm has finished!")
    except Exception as e:
        NotifierFactory.generate_failure_notification(self.terminate_specs, e)
        raise e

    if errors:
        raise DQDuplicateRuleIdException(
            "Data Written Successfully, but DQ Process Encountered an Issue.\n"
            "We detected a duplicate dq_rule_id in the dq_spec definition. "
            "As a result, none of the Data Quality (DQ) processes (dq_spec) "
            "were executed.\n"
            "Please review and verify the following dq_rules:\n"
            f"{errors}"
        )

    return written_dfs

process_dq(data)

Process the data quality tasks for the data that was read and/or transformed.

It supports multiple input dataframes, although just one is advisable.

It is possible to use data quality validators/expectations that will validate your data and fail the process in case the expectations are not met. The DQ process also generates and keeps updating a site containing the results of the expectations that were done on your data. The location of the site is configurable and can either be on file system or S3. If you define it to be stored on S3, you can even configure your S3 bucket to serve the site so that people can easily check the quality of your data. Moreover, it is also possible to store the result of the DQ process into a defined result sink.
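
The method can also be called on its own; a brief sketch (the dict keys are the spec_ids produced by read()/transform(), and the error dict signals duplicate dq_rule_id values):

dfs = loader.transform(loader.read())
validated_dfs, errors = loader.process_dq(dfs)
if errors:
    # duplicate dq_rule_ids were detected, so no dq_spec was executed
    print(errors)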

Parameters:

    data (OrderedDict): dataframes from previous steps of the algorithm that we wish to run the DQ process on. [required]

Returns:

    OrderedDict: another ordered dict with the validated dataframes.
    Optional[dict[str, str]]: a dictionary with the errors if they exist, or None.

Source code in mkdocs/lakehouse_engine/packages/algorithms/data_loader.py
def process_dq(
    self, data: OrderedDict
) -> tuple[OrderedDict, Optional[dict[str, str]]]:
    """Process the data quality tasks for the data that was read and/or transformed.

    It supports multiple input dataframes, although just one is advisable.

    It is possible to use data quality validators/expectations that will validate
    your data and fail the process in case the expectations are not met. The DQ
    process also generates and keeps updating a site containing the results of the
    expectations that were done on your data. The location of the site is
    configurable and can either be on file system or S3. If you define it to be
    stored on S3, you can even configure your S3 bucket to serve the site so that
    people can easily check the quality of your data. Moreover, it is also
    possible to store the result of the DQ process into a defined result sink.

    Args:
        data: dataframes from previous steps of the algorithm that we wish to
            run the DQ process on.

    Returns:
        Another ordered dict with the validated dataframes and
        a dictionary with the errors if they exist, or None.
    """
    if not self.dq_specs:
        return data, None

    dq_processed_dfs, error = self._verify_dq_rule_id_uniqueness(
        data, self.dq_specs
    )
    if error:
        return dq_processed_dfs, error
    else:
        from lakehouse_engine.dq_processors.dq_factory import DQFactory

        dq_processed_dfs = OrderedDict(data)
        for spec in self.dq_specs:
            df_processed_df = dq_processed_dfs[spec.input_id]
            self._logger.info(f"Found data quality specification: {spec}")
            if (
                spec.dq_type == DQType.PRISMA.value or spec.dq_functions
            ) and spec.spec_id not in self._streaming_micro_batch_dq_plan:

                if spec.cache_df:
                    df_processed_df.cache()
                dq_processed_dfs[spec.spec_id] = DQFactory.run_dq_process(
                    spec, df_processed_df
                )
            else:
                dq_processed_dfs[spec.spec_id] = df_processed_df

        return dq_processed_dfs, None

read()

Read data from an input location into a distributed dataframe.

Returns:

    OrderedDict: an ordered dict with all the dataframes that were read.
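
A tiny usage sketch; the returned dict is keyed by each input spec's spec_id ("orders_bronze" reuses the hypothetical id from the introduction):

read_dfs = loader.read()
orders_df = read_dfs["orders_bronze"]   # one Spark dataframe per input spec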

Source code in mkdocs/lakehouse_engine/packages/algorithms/data_loader.py
def read(self) -> OrderedDict:
    """Read data from an input location into a distributed dataframe.

    Returns:
         An ordered dict with all the dataframes that were read.
    """
    read_dfs: OrderedDict = OrderedDict({})
    for spec in self.input_specs:
        self._logger.info(f"Found input specification: {spec}")
        read_dfs[spec.spec_id] = ReaderFactory.get_data(spec)
    return read_dfs

terminate(data)

Terminate the algorithm.

Parameters:

    data (OrderedDict): dataframes that were written. [required]
Source code in mkdocs/lakehouse_engine/packages/algorithms/data_loader.py
def terminate(self, data: OrderedDict) -> None:
    """Terminate the algorithm.

    Args:
        data: dataframes that were written.
    """
    if self.terminate_specs:
        for spec in self.terminate_specs:
            self._logger.info(f"Found terminate specification: {spec}")
            TerminatorFactory.execute_terminator(
                spec, data[spec.input_id] if spec.input_id else None
            )

transform(data)

Transform (optionally) the data that was read.

If there isn't a transformation specification, this step will be skipped and the original dataframes that were read will be returned. Transformations can depend on the result of another transformation; however, keep in mind that if we are using a streaming source and for some reason need to enable micro batch processing, that result cannot be used as input to another transformation. Micro batch processing in pyspark streaming is only available in .write(), which means the transformation with micro batch needs to be the end of the process.
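
A hedged sketch of a single transform_specs entry. The field names follow the parsing code in _get_transform_specs; the function names and arguments are placeholders that must map to transformers from the transformers package (or to a custom transformation):

transform_spec = {  # goes into acon["transform_specs"]
    "spec_id": "orders_transformed",
    "input_id": "orders_bronze",
    "force_streaming_foreach_batch_processing": False,  # optional, defaults to False
    "transformers": [
        {"function": "some_transformer", "args": {}},   # placeholder name
        # with a streaming input, transformers unsupported in streaming mode are
        # moved to the output spec's streaming micro batch plan, so place them last.
        {"function": "some_streaming_unsupported_transformer", "args": {}},  # placeholder name
    ],
}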

Parameters:

    data (OrderedDict): input dataframes in an ordered dict. [required]

Returns:

    OrderedDict: another ordered dict with the transformed dataframes, according to the transformation specification.

Source code in mkdocs/lakehouse_engine/packages/algorithms/data_loader.py
def transform(self, data: OrderedDict) -> OrderedDict:
    """Transform (optionally) the data that was read.

    If there isn't a transformation specification, this step will be skipped, and
    the original dataframes that were read will be returned.
    Transformations can depend on the result of another transformation; however,
    keep in mind that if we are using a streaming source and for some reason need
    to enable micro batch processing, that result cannot be used as input to
    another transformation. Micro batch processing in pyspark streaming is only
    available in .write(), which means the transformation with micro batch needs
    to be the end of the process.

    Args:
        data: input dataframes in an ordered dict.

    Returns:
        Another ordered dict with the transformed dataframes, according to the
        transformation specification.
    """
    if not self.transform_specs:
        return data
    else:
        transformed_dfs = OrderedDict(data)
        for spec in self.transform_specs:
            self._logger.info(f"Found transform specification: {spec}")
            transformed_df = transformed_dfs[spec.input_id]
            for transformer in spec.transformers:
                transformed_df = transformed_df.transform(
                    TransformerFactory.get_transformer(transformer, transformed_dfs)
                )
            transformed_dfs[spec.spec_id] = transformed_df
        return transformed_dfs

write(data)

Write the data that was read and transformed (if applicable).

It supports writing multiple datasets. However, we only recommend writing one dataframe. This recommendation is based on easy debugging and reproducibility, since if we start mixing several datasets being fueled by the same algorithm, it would unleash an infinite sea of reproducibility issues plus tight coupling and dependencies between datasets. Having said that, there may be cases where writing multiple datasets is desirable according to the use case requirements. Use it accordingly.
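
A hedged sketch of a single output_specs entry. Field names match what _get_output_specs parses; the values (write type, format, location, partitions, options) are placeholders:

output_spec = {  # goes into acon["output_specs"]
    "spec_id": "orders_silver",
    "input_id": "orders_transformed",
    "write_type": "append",                  # value assumed
    "data_format": "delta",                  # value assumed; delta files are the default when omitted
    "location": "s3://my-bucket/silver/orders/",
    "partitions": ["order_date"],            # optional; hypothetical column
    "options": {"mergeSchema": "true"},      # optional writer options (example values)
}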

Parameters:

    data (OrderedDict): dataframes that were read and transformed (if applicable). [required]

Returns:

    OrderedDict: dataframes that were written.

Source code in mkdocs/lakehouse_engine/packages/algorithms/data_loader.py
def write(self, data: OrderedDict) -> OrderedDict:
    """Write the data that was read and transformed (if applicable).

    It supports writing multiple datasets. However, we only recommend writing one
    dataframe. This recommendation is based on easy debugging and reproducibility,
    since if we start mixing several datasets being fueled by the same algorithm, it
    would unleash an infinite sea of reproducibility issues plus tight coupling and
    dependencies between datasets. Having said that, there may be cases where
    writing multiple datasets is desirable according to the use case requirements.
    Use it accordingly.

    Args:
        data: dataframes that were read and transformed (if applicable).

    Returns:
        Dataframes that were written.
    """
    written_dfs: OrderedDict = OrderedDict({})
    for spec in self.output_specs:
        self._logger.info(f"Found output specification: {spec}")

        written_output = WriterFactory.get_writer(
            spec, data[spec.input_id], data
        ).write()
        if written_output:
            written_dfs.update(written_output)
        else:
            written_dfs[spec.spec_id] = data[spec.input_id]

    return written_dfs