Skip to content

Sharepoint utils

Utilities for Sharepoint API operations.

SharepointUtils

Bases: object

Class with methods to connect and extract data from Sharepoint.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
class SharepointUtils(object):
    """Class with methods to connect and extract data from Sharepoint."""

    def __init__(
        self,
        client_id: str,
        tenant_id: str,
        local_path: str,
        api_version: str,
        site_name: str,
        drive_name: str,
        file_name: str,
        secret: str,
        folder_relative_path: str = None,
        chunk_size: int = 5 * 1024 * 1024,  # 5 MB
        local_options: dict = None,
        conflict_behaviour: str = "replace",
        file_pattern: str = None,
        file_type: str = None,
    ):
        """Instantiate objects of the SharepointUtils class.

        Args:
            client_id: application (client) ID of your Azure AD app.
            tenant_id: tenant ID (directory ID) from Azure AD for authentication.
            local_path: local directory path (Volume) where the files are
                temporarily stored.
            api_version: Graph API version to use.
            site_name: name of the Sharepoint site where the files are stored.
            drive_name: name of the document library or drive in Sharepoint.
            file_name: name of the file to be stored in sharepoint.
            secret: client secret for authentication.
            folder_relative_path: optional; relative path within the
                drive (drive_name) where the file will be stored.
            chunk_size: Optional; size of file chunks to be uploaded/downloaded
                in bytes (default is 5 MB).
            local_options: Optional; additional options for customizing write
                action to local path.
            conflict_behaviour: Optional; defines how conflicts in file uploads
                are handled ('replace', 'fail', etc.).
            file_pattern: Optional; pattern to match files in Sharepoint (e.g.,
                'data_*').
            file_type: Optional; type of the file to be stored in Sharepoint
                (e.g., 'csv').

        Returns:
            A SharepointUtils object.
        """
        self.client_id = client_id
        self.tenant_id = tenant_id
        self.local_path = local_path
        self.api_version = api_version
        self.site_name = site_name
        self.drive_name = drive_name
        self.file_name = file_name
        self.secret = secret
        self.folder_relative_path = folder_relative_path
        self.chunk_size = chunk_size
        self.local_options = local_options
        self.conflict_behaviour = conflict_behaviour
        # Lazily-populated caches: filled on first use by _get_site_id,
        # _get_drive_id and _get_token respectively.
        self.site_id = None
        self.drive_id = None
        self.token = None
        self.file_pattern = file_pattern
        self.file_type = file_type

        # Build the MSAL client application and acquire an initial token.
        self._create_app()

    def _get_token(self) -> None:
        """Acquire a client-credentials token and cache it on the instance.

        Failures are logged rather than raised; callers are expected to
        validate ``self.token`` before using it.
        """
        scope = f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/.default"
        try:
            self.token = self.app.acquire_token_for_client(scopes=[scope])
        except Exception as err:
            _logger.error(f"Token acquisition error: {err}")

    def _create_app(self) -> None:
        """Build the MSAL confidential-client application and fetch a token.

        MSAL (Microsoft Authentication Library) handles authentication and
        authorization against Azure AD.
        """
        import msal

        authority_url = (
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_authority}/{self.tenant_id}"
        )
        self.app = msal.ConfidentialClientApplication(
            client_id=self.client_id,
            authority=authority_url,
            client_credential=self.secret,
        )
        self._get_token()

    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=30, min=30, max=150),
        retry=retry_if_exception_type(
            (RequestException, SharePointAPIError)
        ),  # Retry on these exceptions
    )
    def _make_request(
        self,
        endpoint: str,
        method: str = "GET",
        headers: dict = None,
        json_options: dict = None,
        data: object = None,
        stream: bool = False,
    ) -> requests.Response:
        """Execute API requests to Microsoft Graph API.

        !!! note
            If you try to upload large files sequentially, you may encounter
            a 503 "serviceNotAvailable" error. To mitigate this, consider using
            coalesce in the Acon transform specification. However, be aware that
            increasing the number of partitions also increases the likelihood of
            server throttling.

        Args:
            endpoint: The API endpoint to call.
            method: The HTTP method to use ('GET', 'POST', 'PUT', etc.).
            headers: Optional; headers to use INSTEAD of the default bearer-token
                header (e.g., Content-Range headers for pre-authenticated
                upload URLs).
            json_options: Optional; JSON data to include in the request body.
            data: Optional; additional data (e.g., file content) on request body.
            stream: Optional; whether to stream the response body.

        Returns:
            A Response object from the request library.

        Raises:
            SharePointAPIError: If there is an issue with the Sharepoint
                API request or the token is missing/invalid.
        """
        self._get_token()

        # Fail fast when authentication did not yield a usable token.
        # Required to avoid cicd issue.
        if not self.token or "access_token" not in self.token:
            raise SharePointAPIError("Authentication token is missing or invalid.")

        # Explicit headers fully replace the default auth header: chunk uploads
        # go to a pre-authenticated uploadUrl that must not carry a bearer token.
        request_headers = headers or {
            "Authorization": "Bearer " + self.token["access_token"]
        }

        try:
            # Guaranteed to return (the previous version had a redundant inner
            # token check that could fall through and implicitly return None).
            return requests.request(
                method=method,
                url=endpoint,
                headers=request_headers,
                json=json_options,
                data=data,
                stream=stream,
            )
        except RequestException as error:
            # Chain the original error so the root cause is preserved.
            raise SharePointAPIError(f"{error}") from error

    def _parse_json(self, response: requests.Response, context: str) -> Dict[str, Any]:
        """Validate an HTTP response and return its JSON body as a dict.

        Args:
            response: HTTP response object.
            context: Operation context for error logging.

        Returns:
            Parsed JSON as a dictionary.

        Raises:
            HTTPError: If the request fails.
            ValueError: If the response is not valid JSON or not a dict.
        """
        try:
            response.raise_for_status()
        except requests.HTTPError as e:
            _logger.error(
                "HTTP error while %s: %s | body: %s", context, e, response.text[:200]
            )
            raise

        try:
            payload = response.json()
        except requests.JSONDecodeError:
            _logger.error(
                "Non-JSON or wrong type while %s. Body preview: %s",
                context,
                response.text[:200],
            )
            raise

        if isinstance(payload, dict):
            return payload

        # Valid JSON, but not the object shape callers expect.
        _logger.error(
            "Non-JSON or wrong type while %s. Body preview: %s",
            context,
            response.text[:200],
        )
        raise ValueError(f"Expected dict JSON while {context}")

    def _get_site_id(self) -> str:
        """Get site ID from site name, with caching.

        Returns:
            Site ID as a string.

        Raises:
            SharePointAPIError: If the request fails.
            RuntimeError: For unexpected errors or missing site ID.
        """
        # The site id is stable for the lifetime of this object; reuse it.
        if self.site_id is not None:
            return self.site_id

        endpoint = (
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}"
            f"/sites/{ExecEnv.ENGINE_CONFIG.sharepoint_company_domain}:/"
            f"sites/{self.site_name}"
        )
        try:
            response = self._make_request(endpoint=endpoint)
            response_data = self._parse_json(
                response, f"getting site id for site '{self.site_name}'"
            )

            self.site_id = response_data.get("id")

            if not self.site_id:
                raise ValueError(
                    f"Site ID not found for site '{self.site_name}' in the API "
                    f"response: {response_data}"
                )

            return self.site_id

        except RequestException as error:
            # Chain the cause (PEP 3134) so the underlying failure is visible.
            raise SharePointAPIError(f"{error}") from error
        except Exception as e:
            # Also wraps the ValueError above, per the documented contract.
            raise RuntimeError(
                f"Unexpected error while reading site ID for site '{self.site_name}':"
                f"{e}"
            ) from e

    def _get_drive_id(self) -> str:
        """Resolve and cache the drive ID matching ``drive_name``.

        Returns:
            Drive ID as a string.

        Raises:
            SharePointAPIError: If the request fails.
            ValueError: If no drive is found.
        """
        if self.drive_id is not None:
            return str(self.drive_id)

        site_id = self._get_site_id()
        endpoint = (
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/"
            f"{self.api_version}/sites/{site_id}/drives"
        )

        try:
            payload = self._parse_json(
                self._make_request(endpoint=endpoint), "listing drives for site"
            )
        except RequestException as error:
            raise SharePointAPIError(f"Request error: {error}")

        available_drives = payload.get("value", [])
        if not available_drives:
            raise ValueError(f"No drives found for site '{self.site_id}'.")

        # Case- and whitespace-insensitive match on the drive name.
        wanted = self.drive_name.strip().lower()
        found = next(
            (d for d in available_drives if d["name"].strip().lower() == wanted),
            None,
        )
        if found is None:
            raise ValueError(
                f"Drive '{self.drive_name}' could not be found in site '{site_id}'."
            )

        self.drive_id = found["id"]
        return str(self.drive_id)

    def check_if_endpoint_exists(
        self, folder_root_path: str = None, raise_error: bool = True
    ) -> bool:
        """Check if a Sharepoint drive or folder exists.

        Args:
            folder_root_path: Optional folder path to check.
            raise_error: Raise error if the folder doesn't exist.

        Returns:
            True if the endpoint exists, False otherwise.

        Raises:
            SharepointAPIError: If the endpoint doesn't exist and raise_error is True.
        """
        try:
            # Resolving the ids already validates site and drive existence.
            site_id = self._get_site_id()
            drive_id = self._get_drive_id()

            # No folder requested: drive-level existence is enough.
            if not folder_root_path:
                return True

            folder_endpoint = (
                f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/"
                f"{self.api_version}/sites/{site_id}/drives/{drive_id}"
                f"/root:/{folder_root_path}"
            )
            self._make_request(endpoint=folder_endpoint).raise_for_status()
            return True

        except requests.HTTPError as error:
            # Only a 404 means "does not exist"; anything else is propagated.
            if error.response.status_code != 404:
                raise
            _logger.warning(f"Sharepoint path doesn't exist: {folder_root_path}")
            if raise_error:
                raise SharePointAPIError(
                    f"Path '{folder_root_path}' doesn't exist!"
                )
            return False

    def check_if_local_path_exists(self, local_path: str) -> None:
        """Verify that a local path exists.

        Args:
            local_path: Local folder where files are temporarily stored.

        Raises:
            SharePointAPIError: If the path cannot be read.
        """
        try:
            # Listing the directory checks both existence and readability.
            os.listdir(local_path)
        except IOError as error:
            raise SharePointAPIError(f"{error}")

    def save_to_staging_area(self, sp_file: SharepointFile) -> str:
        """Persist a Sharepoint file to the local staging path.

        Files under 500 MB whose content is already loaded in memory are
        written directly; everything else is streamed in chunks to avoid
        memory overload.

        Args:
            sp_file: File metadata and content.

        Returns:
            Local file path.

        Raises:
            SharePointAPIError: On download or write failure.
        """
        direct_write_limit = 500 * 1024 * 1024  # 500 MB
        try:
            use_direct_write = bool(sp_file.content) and (
                sp_file.content_size < direct_write_limit
            )
            if use_direct_write:
                _logger.info(
                    f"Writing '{sp_file.file_name}' via direct write (under 500MB)."
                )
                return self.write_bytes_to_local_file(sp_file)

            _logger.info(
                f"Writing '{sp_file.file_name}' via streaming (500MB+ or content not"
                f" loaded)."
            )
            return self.download_file_streaming(sp_file)

        except Exception as e:
            raise SharePointAPIError(f"Failed to write '{sp_file.file_name}': {e}")

    def download_file_streaming(self, sp_file: SharepointFile) -> str:
        """Stream a Sharepoint file to the local path in fixed-size chunks.

        Downloading with the configured chunk size avoids loading large files
        fully into memory.

        Args:
            sp_file: File with remote path and name.

        Returns:
            Local file path.

        Raises:
            SharePointAPIError: If the download fails.
        """
        try:
            content_url = (
                f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                f"sites/{self._get_site_id()}/drives/{self._get_drive_id()}"
                f"/root:/{sp_file.file_path}:/content"
            )

            target = Path(self.local_path) / sp_file.file_name
            target.parent.mkdir(parents=True, exist_ok=True)

            with self._make_request(endpoint=content_url, stream=True) as response:
                response.raise_for_status()
                with open(target, "wb") as sink:
                    for block in response.iter_content(chunk_size=self.chunk_size):
                        # Skip keep-alive chunks, which arrive as empty bytes.
                        if block:
                            sink.write(block)

            return str(target)

        except requests.RequestException as error:
            raise SharePointAPIError(f"Failed to stream download: {error}")

    def write_bytes_to_local_file(self, sp_file: SharepointFile) -> str:
        """Write in-memory Sharepoint file content (bytes) to the local path.

        Args:
            sp_file: File with content and metadata.

        Returns:
            Local file path.

        Raises:
            ValueError: If content is missing.
            RuntimeError: If writing to disk fails.
        """
        if not sp_file.content:
            raise ValueError(
                f"Cannot write file '{sp_file.file_name}': Content is empty."
            )

        try:
            # Local base path (e.g., Unity Volumes, DBFS, or other mounted storage)
            destination_dir = Path(self.local_path)
            destination_dir.mkdir(parents=True, exist_ok=True)
            destination = destination_dir / sp_file.file_name
            destination.write_bytes(sp_file.content)
            return str(destination)
        except Exception as e:
            raise RuntimeError(
                f"Failed to write file '{sp_file.file_name}' to Unity Volume: {e}"
            )

    def write_to_local_path(self, df: DataFrame) -> None:
        """Write a Spark DataFrame to a local path (Volume) in CSV format.

        This method writes the provided Spark DataFrame to a specified local directory,
        saving it in CSV format. The method renames the output file from its default
        "part-*" naming convention to a specified file name.
        The dictionary local_options enables the customisation of the write action.
        The customizable options can be found here:
        https://spark.apache.org/docs/3.5.1/sql-data-sources-csv.html.

        Args:
            df: The Spark DataFrame to write to the local file system.

        Returns:
            None.

        Raises:
            SharePointAPIError: If there is an issue during the file writing
                process (the underlying IOError is wrapped).
        """
        try:
            # coalesce(1) produces a single 'part-*' output file so it can be
            # renamed to self.file_name afterwards.
            df.coalesce(1).write.mode("overwrite").save(
                path=self.local_path,
                format="csv",
                **self.local_options if self.local_options else {},
            )
            self._rename_local_file(self.local_path, self.file_name)
        except IOError as error:
            raise SharePointAPIError(f"{error}")

    def _rename_local_file(self, local_path: str, file_name: str) -> None:
        """Rename a local file that starts with 'part-' to the desired file name.

        Args:
            local_path: The directory where the file is located.
            file_name: The new file name for the local file.

        Raises:
            SharePointAPIError: If no 'part-*' file is present in the directory
                or the rename operation fails.
        """
        # Spark writes its single coalesced partition as a 'part-*' file;
        # locate it explicitly instead of indexing (a bare [0] would raise an
        # opaque IndexError when the write produced no part file).
        part_file = next(
            (f for f in os.listdir(local_path) if f.startswith("part-")), None
        )
        if part_file is None:
            raise SharePointAPIError(
                f"No 'part-*' file found in '{local_path}' to rename."
            )

        try:
            os.rename(
                os.path.join(local_path, part_file), os.path.join(local_path, file_name)
            )
        except IOError as error:
            raise SharePointAPIError(f"{error}") from error

    def write_to_sharepoint(self) -> None:
        """Upload a local file to Sharepoint in chunks using the Microsoft Graph API.

        This method creates an upload session and uploads the local file named
        `file_name` (stored as `<file_name>.csv`, optionally under
        `folder_relative_path`) to a Sharepoint document library. The file is
        divided into chunks (based on the `chunk_size` specified) to handle
        large file uploads, sent sequentially to the upload URL returned by
        the Graph API.

        Returns:
            None.

        Raises:
            SharePointAPIError: If the local file shrinks during upload.
            requests.HTTPError: If session creation or a chunk upload fails.
        """
        drive_id = self._get_drive_id()

        # Build the target item path once; with/without a folder prefix the
        # endpoint only differs by that prefix.
        item_path = f"{self.file_name}.csv"
        if self.folder_relative_path:
            item_path = f"{self.folder_relative_path}/{item_path}"

        endpoint = (
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}"
            f"/{self.api_version}/drives/{drive_id}/items/root:"
            f"/{item_path}:/createUploadSession"
        )

        # NOTE(review): self.conflict_behaviour is never sent with the upload
        # session request — confirm whether it should be included in the body.
        response = self._make_request(method="POST", endpoint=endpoint)
        response.raise_for_status()
        upload_url = response.json()["uploadUrl"]

        upload_file = str(Path(self.local_path) / self.file_name)
        size = os.stat(upload_file).st_size

        with open(upload_file, "rb") as data:
            start = 0
            while start < size:
                chunk = data.read(self.chunk_size)
                if not chunk:
                    # Defensive guard: if the file shrank while uploading we
                    # would otherwise loop forever sending empty ranges.
                    raise SharePointAPIError(
                        f"Unexpected end of file while uploading '{upload_file}'."
                    )
                bytes_read = len(chunk)
                headers = {
                    "Content-Length": str(bytes_read),
                    "Content-Range": f"bytes {start}-{start + bytes_read - 1}/{size}",
                }
                self._make_request(
                    method="PUT", endpoint=upload_url, headers=headers, data=chunk
                ).raise_for_status()
                start += bytes_read

    def delete_local_path(self) -> None:
        """Delete and recreate the local path used for temporary storage.

        Raises:
            SharePointAPIError: If deletion or recreation fails.
        """
        try:
            staging_dir = Path(self.local_path)
            # Wipe leftovers from any previous run, then start fresh.
            if staging_dir.exists():
                shutil.rmtree(staging_dir)
            staging_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            raise SharePointAPIError(f"Failed to clear or recreate local path: {e}")

    @contextmanager
    def staging_area(self) -> Generator[str, None, None]:
        """Provide a clean local staging folder for Sharepoint files.

        Yield the local path after ensuring it's empty. Cleans up after use.
        Cleanup failures on exit are logged but never raised, so they cannot
        mask an exception from the managed block.

        Yield:
            Path to the staging folder as a string.
        """
        # Ensure the staging folder is empty before handing it to the caller.
        self.delete_local_path()
        try:
            yield self.local_path
        finally:
            # Best-effort cleanup: runs whether the body succeeded or raised.
            try:
                self.delete_local_path()
            except Exception as e:
                _logger.warning(f"Failed to clean up local path: {e}")

    def list_items_in_path(self, path: str) -> list[Any]:
        """List items (files/folders) at a Sharepoint path.

        The path is walked component by component from the drive root. If the
        final component is a folder, its children are returned; if it is a
        file, a single-element list with that file's metadata is returned.

        Args:
            path: Relative folder or file path.

        Returns:
            List of items; files include @microsoft.graph.downloadUrl.

        Raises:
            ValueError: If the path is invalid or not found.
        """
        site_id = self._get_site_id()
        drive_id = self._get_drive_id()

        path = path.strip("/")
        if not path:
            # Empty path: list the drive root's children directly.
            resp = self._make_request(
                f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                f"sites/{site_id}/drives/{drive_id}/root/children"
            )
            data = self._parse_json(resp, "listing root children")
            return cast(List[dict[str, Any]], data.get("value", []))

        path_parts = path.split("/")

        # start from root children
        resp = self._make_request(
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/sites/"
            f"{site_id}/drives/{drive_id}/root/children"
        )
        data = self._parse_json(resp, "listing root children")
        items = cast(List[dict[str, Any]], data.get("value", []))

        for component in path_parts:
            current_item = next(
                (item for item in items if item.get("name") == component), None
            )

            if not current_item:
                raise ValueError(f"Path component '{component}' not found in '{path}'.")

            if "folder" in current_item:
                # descend into folder
                resp = self._make_request(
                    f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                    f"sites/{site_id}/drives/{drive_id}/items/"
                    f"{current_item['id']}/children"
                )
                data = self._parse_json(resp, f"listing children for '{component}'")
                items = cast(List[dict[str, Any]], data.get("value", []))
            else:
                # it's a file; ensure we have downloadUrl
                if "@microsoft.graph.downloadUrl" not in current_item:
                    resp = self._make_request(
                        f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/"
                        f"{self.api_version}/sites/{site_id}/drives/{drive_id}/"
                        f"items/{current_item['id']}"
                    )
                    current_item = self._parse_json(
                        resp, f"fetching file metadata for item id {current_item['id']}"
                    )
                # NOTE(review): if a file is matched before the last path
                # component, any remaining components are silently ignored and
                # the file is returned — confirm this is intended.
                return [current_item]

        # All components were folders: return the last folder's children.
        return items

    def get_file_metadata(self, file_path: str) -> SharepointFile:
        """Fetch file metadata and content from Sharepoint.

        Args:
            file_path: Full Sharepoint path (e.g., 'folder/file.csv').

        Returns:
            SharepointFile with metadata and bytes content.

        Raises:
            ValueError: If required metadata is missing or path is invalid.
            requests.HTTPError: On HTTP errors during retrieval.
        """
        # Validate the path shape up front so an invalid path fails before any
        # network I/O (previously the file content was fully downloaded first).
        if "/" not in file_path:
            raise ValueError(
                f"Invalid file path: '{file_path}'. Expected a folder/file structure."
            )
        folder = file_path.rsplit("/", 1)[0]

        site_id = self._get_site_id()
        drive_id = self._get_drive_id()

        file_metadata_url = (
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/"
            f"{self.api_version}/sites/{site_id}/drives/{drive_id}/root:/{file_path}"
        )

        # Get metadata
        metadata_response = self._make_request(endpoint=file_metadata_url, method="GET")
        metadata = self._parse_json(
            metadata_response,
            f"fetching metadata for '{file_path}'",
        )

        file_name = metadata.get("name")
        download_url = metadata.get("@microsoft.graph.downloadUrl")

        if not file_name or not download_url:
            raise ValueError(
                f"Missing required metadata for '{file_path}': "
                f"name={file_name!r}, "
                f"downloadUrl={'present' if download_url else 'absent'}"
            )

        # Download file content (bytes) via the pre-authenticated URL.
        content_response = self._make_request(endpoint=download_url, method="GET")
        content_response.raise_for_status()

        return SharepointFile(
            file_name=file_name,
            time_created=metadata.get("createdDateTime", ""),
            time_modified=metadata.get("lastModifiedDateTime", ""),
            content=content_response.content,
            _folder=folder,
        )

    def archive_sharepoint_file(
        self, sp_file: SharepointFile, to_path: str | None, *, move_enabled: bool = True
    ) -> None:
        """Rename (timestamp) and optionally move a Sharepoint file.

        The operation is idempotent: `sp_file.skip_rename` prevents a second
        timestamp prefix and `sp_file._already_archived` prevents a repeated
        move, so retries are safe.

        Args:
            sp_file: File to archive.
            to_path: Destination folder (if moving).
            move_enabled: Whether to move after rename.

        Raises:
            SharePointAPIError: If the request fails.
        """
        # If already archived (renamed+moved before), don't repeat
        if getattr(sp_file, "_already_archived", False) and move_enabled and to_path:
            _logger.info(
                "Skipping archive: file already archived -> %s", sp_file.file_name
            )
            return

        try:
            # Step 1: prefix the file name with a timestamp (at most once).
            if not getattr(sp_file, "skip_rename", False):
                new_file_name = self._rename_sharepoint_file(sp_file)
                sp_file.file_name = new_file_name
                sp_file.skip_rename = True

            # Step 2: move only when both a flag and a destination are given.
            if not move_enabled or not to_path:
                _logger.info(
                    """Archiving disabled or no target folder;
                     Renamed only and left in place: '%s'.""",
                    sp_file.file_path,
                )
                return

            self._move_file_in_sharepoint(sp_file, to_path)
            # Mark so a retry of this call does not move the file again.
            sp_file._already_archived = True
            _logger.info("Archived '%s' to '%s'.", sp_file.file_name, to_path)

        except requests.RequestException as e:
            _logger.error(
                "Request failed while archiving '%s': %s", sp_file.file_name, e
            )
            raise SharePointAPIError(f"Request failed: {e}")

    def _rename_sharepoint_file(self, sp_file: SharepointFile) -> str:
        """Prefix file name with a timestamp (skip if already renamed).

        Resolves the item's ID via its path, then PATCHes the item with the
        new `{yyyymmddHHMMSS}_{original}` name. Also updates
        `sp_file.file_name` in place.

        Args:
            sp_file: File to rename.

        Returns:
            New file name.

        Raises:
            SharePointAPIError: If the rename request fails.
            ValueError: If the file ID cannot be resolved from the path.
        """
        try:
            # Idempotence guard: a retried archive must not double-prefix.
            if getattr(sp_file, "skip_rename", False):
                _logger.info(
                    f"Skipping rename for already-prefixed file: {sp_file.file_name}"
                )
                return sp_file.file_name

            _logger.info(f"Renaming file at '{sp_file.file_path}'.")

            site_id = self._get_site_id()
            drive_id = self._get_drive_id()
            current_date_formatted = datetime.now().strftime("%Y%m%d%H%M%S")
            new_file_name = f"{current_date_formatted}_{sp_file.file_name}"

            # Step 1: resolve the item id from its path.
            url_get_file = (
                f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                f"sites/{site_id}/drives/{drive_id}/root:/{sp_file.file_path}"
            )
            resp = self._make_request(endpoint=url_get_file, method="GET")
            file_info = self._parse_json(
                resp, f"fetching file info at '{sp_file.file_path}'"
            )
            file_id = file_info.get("id")
            if not file_id:
                raise ValueError(
                    f"File '{sp_file.file_name}' not found in '{sp_file.file_path}'."
                )

            # Step 2: PATCH the item with the timestamped name.
            url_rename_file = (
                f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                f"sites/{site_id}/drives/{drive_id}/items/{file_id}"
            )
            rename_payload = {"name": new_file_name}
            rename_resp = self._make_request(
                endpoint=url_rename_file, method="PATCH", json_options=rename_payload
            )
            rename_resp.raise_for_status()

            _logger.info(f"File '{sp_file.file_name}' renamed to '{new_file_name}'.")
            sp_file.file_name = new_file_name
            return new_file_name

        except requests.RequestException as e:
            _logger.error(
                f"Request failed while renaming file '{sp_file.file_name}': {e}"
            )
            raise SharePointAPIError(f"Request failed: {e}")

    def _move_file_in_sharepoint(self, sp_file: SharepointFile, to_path: str) -> None:
        """Move a file to another folder in Sharepoint.

        Args:
            sp_file: File to move.
            to_path: Destination path.

        Raises:
            ValueError: If the file ID cannot be resolved.
            SharePointAPIError: If the move request fails.
        """
        try:
            _logger.info(
                f"Moving file '{sp_file.file_name}' from '{sp_file.file_path}' to "
                f"'{to_path}'."
            )

            site_id = self._get_site_id()
            drive_id = self._get_drive_id()

            # Ensure the destination folder exists. raise_error=False turns a
            # missing folder into a False return instead of an exception.
            if not self.check_if_endpoint_exists(
                folder_root_path=to_path, raise_error=False
            ):
                self._create_folder_in_sharepoint(to_path)
                _logger.info(f"Created archive folder: {to_path}")

            # Resolve the item id from its drive-relative path.
            url_get_file = (
                f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                f"sites/{site_id}/drives/{drive_id}/root:/{sp_file.file_path}"
            )

            response = self._make_request(endpoint=url_get_file, method="GET")
            file_info = self._parse_json(
                response,
                f"getting file id for move '{sp_file.file_path}'",
            )

            file_id = file_info.get("id")

            if not file_id:
                raise ValueError(
                    f"File '{sp_file.file_name}' not found in '{sp_file.file_path}'."
                )

            # Moving an item = PATCHing its parentReference to the new folder.
            url_move_file = (
                f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                f"sites/{site_id}/drives/{drive_id}/items/{file_id}"
            )

            new_parent_reference = {
                "parentReference": {"path": f"/drive/root:/{to_path}"},
                "name": sp_file.file_name,
            }

            response = self._make_request(
                endpoint=url_move_file,
                method="PATCH",
                json_options=new_parent_reference,
            )
            response.raise_for_status()

            _logger.info(
                f"File '{sp_file.file_name}' successfully moved to '{to_path}'."
            )

        except requests.RequestException as e:
            _logger.error(
                f"Request failed while moving file '{sp_file.file_name}': {e}"
            )
            # Chain the cause so the original HTTP context is preserved.
            raise SharePointAPIError(f"Request failed: {e}") from e

    def _create_folder_in_sharepoint(self, folder_path: str) -> None:
        """Create the final folder in a Sharepoint path.

        Only the last path segment is created; the parent path is assumed to
        already exist in the drive.

        Args:
            folder_path: Full folder path to create.

        Raises:
            SharePointAPIError: If folder creation fails.
        """
        try:
            site_id = self._get_site_id()
            drive_id = self._get_drive_id()

            # Split "a/b/c" into parent "a/b" and leaf "c"; a bare name has no
            # parent and is created directly under the drive root.
            parent_path, folder_name = (
                folder_path.rsplit("/", 1) if "/" in folder_path else ("", folder_path)
            )
            parent_path = parent_path.strip("/")  # Clean path just in case

            _logger.info(
                f"Creating folder '{folder_name}' inside '{parent_path or 'root'}'"
            )

            if parent_path:
                endpoint = (
                    f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                    f"sites/{site_id}/drives/{drive_id}/root:/{parent_path}:/children"
                )
            else:
                endpoint = (
                    f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                    f"sites/{site_id}/drives/{drive_id}/root/children"
                )

            # The empty "folder" facet tells the Graph API to create a folder.
            folder_metadata = {"name": folder_name, "folder": {}}

            response = self._make_request(
                endpoint=endpoint, method="POST", json_options=folder_metadata
            )
            response.raise_for_status()

            _logger.info(f"Folder '{folder_path}' created successfully.")

        except requests.RequestException as e:
            _logger.error(f"Failed to create folder '{folder_path}': {e}")
            # Chain the cause so the original HTTP context is preserved.
            raise SharePointAPIError(f"Error creating folder '{folder_path}': {e}") from e

__init__(client_id, tenant_id, local_path, api_version, site_name, drive_name, file_name, secret, folder_relative_path=None, chunk_size=5 * 1024 * 1024, local_options=None, conflict_behaviour='replace', file_pattern=None, file_type=None)

Instantiate objects of the SharepointUtils class.

Parameters:

Name Type Description Default
client_id str

application (client) ID of your Azure AD app.

required
tenant_id str

tenant ID (directory ID) from Azure AD for authentication.

required
local_path str

local directory path (Volume) where the files are temporarily stored.

required
api_version str

Graph API version to use.

required
site_name str

name of the Sharepoint site where the files are stored.

required
drive_name str

name of the document library or drive in Sharepoint.

required
file_name str

name of the file to be stored in sharepoint.

required
secret str

client secret for authentication.

required
folder_relative_path str

optional; relative path within the drive (drive_name) where the file will be stored.

None
chunk_size int

Optional; size of file chunks to be uploaded/downloaded

5 * 1024 * 1024
local_options dict

Optional; additional options for customizing write

None
conflict_behaviour str

Optional; defines how conflicts in file uploads are

'replace'
file_pattern str

Optional; pattern to match files in Sharepoint (e.g.,

None
file_type str

Optional; type of the file to be stored in Sharepoint (e.g.,

None

Returns:

Type Description

A SharepointUtils object.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def __init__(
    self,
    client_id: str,
    tenant_id: str,
    local_path: str,
    api_version: str,
    site_name: str,
    drive_name: str,
    file_name: str,
    secret: str,
    folder_relative_path: str | None = None,
    chunk_size: int = 5 * 1024 * 1024,  # 5 MB
    local_options: dict | None = None,
    conflict_behaviour: str = "replace",
    file_pattern: str | None = None,
    file_type: str | None = None,
) -> None:
    """Instantiate objects of the SharepointUtils class.

    Args:
        client_id: application (client) ID of your Azure AD app.
        tenant_id: tenant ID (directory ID) from Azure AD for authentication.
        local_path: local directory path (Volume) where the files are temporarily
        stored.
        api_version: Graph API version to use.
        site_name: name of the Sharepoint site where the files are stored.
        drive_name: name of the document library or drive in Sharepoint.
        file_name: name of the file to be stored in sharepoint.
        secret: client secret for authentication.
        folder_relative_path: optional; relative path within the
        drive(drive_name) where the file will be stored.
        chunk_size: Optional; size of file chunks to be uploaded/downloaded
        in bytes (default is 5 MB).
        local_options: Optional; additional options for customizing write
        action to local path.
        conflict_behaviour: Optional; defines how conflicts in file uploads are
        handled('replace', 'fail', etc.).
        file_pattern: Optional; pattern to match files in Sharepoint (e.g.,
        'data_*').
        file_type: Optional; type of the file to be stored in Sharepoint (e.g.,
        'csv').

    Returns:
        A SharepointUtils object.
    """
    self.client_id = client_id
    self.tenant_id = tenant_id
    self.local_path = local_path
    self.api_version = api_version
    self.site_name = site_name
    self.drive_name = drive_name
    self.file_name = file_name
    self.secret = secret
    self.folder_relative_path = folder_relative_path
    self.chunk_size = chunk_size
    self.local_options = local_options
    self.conflict_behaviour = conflict_behaviour
    # Resolved later via _get_site_id/_get_drive_id — presumably cached
    # after first lookup; confirm in those helpers.
    self.site_id = None
    self.drive_id = None
    self.token = None
    self.file_pattern = file_pattern
    self.file_type = file_type

    # NOTE(review): looks like this initializes the Graph/auth application
    # client eagerly at construction time — confirm in _create_app.
    self._create_app()

archive_sharepoint_file(sp_file, to_path, *, move_enabled=True)

Rename (timestamp) and optionally move a Sharepoint file.

Parameters:

Name Type Description Default
sp_file SharepointFile

File to archive.

required
to_path str | None

Destination folder (if moving).

required
move_enabled bool

Whether to move after rename.

True

Raises:

Type Description
SharePointAPIError

If the request fails.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def archive_sharepoint_file(
    self, sp_file: SharepointFile, to_path: str | None, *, move_enabled: bool = True
) -> None:
    """Rename (timestamp) and optionally move a Sharepoint file.

    Args:
        sp_file: File to archive.
        to_path: Destination folder (if moving).
        move_enabled: Whether to move after rename.

    Raises:
        SharePointAPIError: If the request fails.
    """
    # If already archived (renamed+moved before), don't repeat
    if getattr(sp_file, "_already_archived", False) and move_enabled and to_path:
        _logger.info(
            "Skipping archive: file already archived -> %s", sp_file.file_name
        )
        return

    try:
        # Rename once; mark the file so retries don't double-prefix it.
        if not getattr(sp_file, "skip_rename", False):
            new_file_name = self._rename_sharepoint_file(sp_file)
            sp_file.file_name = new_file_name
            sp_file.skip_rename = True

        if not move_enabled or not to_path:
            # Single-line message: the original triple-quoted literal leaked
            # a newline and indentation into the log output.
            _logger.info(
                "Archiving disabled or no target folder; "
                "renamed only and left in place: '%s'.",
                sp_file.file_path,
            )
            return

        self._move_file_in_sharepoint(sp_file, to_path)
        sp_file._already_archived = True
        _logger.info("Archived '%s' to '%s'.", sp_file.file_name, to_path)

    except requests.RequestException as e:
        _logger.error(
            "Request failed while archiving '%s': %s", sp_file.file_name, e
        )
        # Chain the cause so the original HTTP context is preserved.
        raise SharePointAPIError(f"Request failed: {e}") from e

check_if_endpoint_exists(folder_root_path=None, raise_error=True)

Check if a Sharepoint drive or folder exists.

Parameters:

Name Type Description Default
folder_root_path str

Optional folder path to check.

None
raise_error bool

Raise error if the folder doesn't exist.

True

Returns:

Type Description
bool

True if the endpoint exists, False otherwise.

Raises:

Type Description
SharePointAPIError

If the endpoint doesn't exist and raise_error is True.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def check_if_endpoint_exists(
    self, folder_root_path: str | None = None, raise_error: bool = True
) -> bool:
    """Check if a Sharepoint drive or folder exists.

    Args:
        folder_root_path: Optional folder path to check.
        raise_error: Raise error if the folder doesn't exist.

    Returns:
        True if the endpoint exists, False otherwise.

    Raises:
        SharePointAPIError: If the endpoint doesn't exist and raise_error is True.
    """
    try:
        site_id = self._get_site_id()
        drive_id = self._get_drive_id()

        # No folder given: successfully resolving site/drive ids is enough.
        if not folder_root_path:
            return True

        endpoint = (
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/"
            f"{self.api_version}/sites/{site_id}/drives/{drive_id}"
            f"/root:/{folder_root_path}"
        )

        response = self._make_request(endpoint=endpoint)
        response.raise_for_status()
        return True

    except requests.HTTPError as error:
        # Only a 404 means "path missing"; any other HTTP error is re-raised.
        # Guard response for None: HTTPError can be raised without one.
        if error.response is not None and error.response.status_code == 404:
            _logger.warning(f"Sharepoint path doesn't exist: {folder_root_path}")
            if raise_error:
                raise SharePointAPIError(
                    f"Path '{folder_root_path}' doesn't exist!"
                ) from error
            return False
        raise

check_if_local_path_exists(local_path)

Verify that a local path exists.

Parameters:

Name Type Description Default
local_path str

Local folder where files are temporarily stored.

required

Raises:

Type Description
SharePointAPIError

If the path cannot be read.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def check_if_local_path_exists(self, local_path: str) -> None:
    """Verify that a local path exists.

    Args:
        local_path: Local folder where files are temporarily stored.

    Raises:
        SharePointAPIError: If the path cannot be read.
    """
    try:
        # Listing the directory checks both existence and readability.
        os.listdir(local_path)
    except OSError as error:
        # IOError is an alias of OSError since Python 3.3; chain the cause.
        raise SharePointAPIError(f"{error}") from error

delete_local_path()

Delete and recreate the local path used for temporary storage.

Raises:

Type Description
SharePointAPIError

If deletion or recreation fails.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def delete_local_path(self) -> None:
    """Delete and recreate the local path used for temporary storage.

    Raises:
        SharePointAPIError: If deletion or recreation fails.
    """
    try:
        local_path = Path(self.local_path)
        # Remove the whole tree (if present), then recreate it empty.
        if local_path.exists():
            shutil.rmtree(local_path)
        local_path.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        # Chain the cause so the original filesystem error is preserved.
        raise SharePointAPIError(f"Failed to clear or recreate local path: {e}") from e

download_file_streaming(sp_file)

Download a large file from Sharepoint in chunks to a local path.

Uses the configured chunk size to avoid memory overload with large files.

Parameters:

Name Type Description Default
sp_file SharepointFile

File with remote path and name.

required

Returns:

Type Description
str

Local file path.

Raises:

Type Description
SharePointAPIError

If the download fails.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def download_file_streaming(self, sp_file: SharepointFile) -> str:
    """Download a large file from Sharepoint in chunks to a local path.

    Uses the configured chunk size to avoid memory overload with large files.

    Args:
        sp_file: File with remote path and name.

    Returns:
        Local file path.

    Raises:
        SharePointAPIError: If the download fails.
    """
    try:
        site_id = self._get_site_id()
        drive_id = self._get_drive_id()
        url = (
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
            f"sites/{site_id}/drives/{drive_id}/root:/{sp_file.file_path}:/content"
        )

        local_file_path = Path(self.local_path) / sp_file.file_name
        local_file_path.parent.mkdir(parents=True, exist_ok=True)

        # stream=True keeps the body off-memory; iterate in chunk_size slices.
        with self._make_request(endpoint=url, stream=True) as response:
            response.raise_for_status()
            with open(local_file_path, "wb") as file:
                for chunk in response.iter_content(chunk_size=self.chunk_size):
                    # Skip keep-alive chunks, which arrive as empty bytes.
                    if chunk:
                        file.write(chunk)

        return str(local_file_path)

    except requests.RequestException as error:
        # Chain the cause so the original HTTP context is preserved.
        raise SharePointAPIError(f"Failed to stream download: {error}") from error

get_file_metadata(file_path)

Fetch file metadata and content from Sharepoint.

Parameters:

Name Type Description Default
file_path str

Full Sharepoint path (e.g., 'folder/file.csv').

required

Returns:

Type Description
SharepointFile

SharepointFile with metadata and bytes content.

Raises:

Type Description
ValueError

If required metadata is missing or path is invalid.

HTTPError

On HTTP errors during retrieval.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def get_file_metadata(self, file_path: str) -> SharepointFile:
    """Fetch file metadata and content from Sharepoint.

    Args:
        file_path: Full Sharepoint path (e.g., 'folder/file.csv').

    Returns:
        SharepointFile with metadata and bytes content.

    Raises:
        ValueError: If required metadata is missing or path is invalid.
        requests.HTTPError: On HTTP errors during retrieval.
    """
    # Fail fast on a malformed path before any network round-trips (the
    # original only validated after downloading the full file content).
    if "/" not in file_path:
        raise ValueError(
            f"Invalid file path: '{file_path}'. Expected a folder/file structure."
        )
    folder = file_path.rsplit("/", 1)[0]

    site_id = self._get_site_id()
    drive_id = self._get_drive_id()

    file_metadata_url = (
        f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/"
        f"{self.api_version}/sites/{site_id}/drives/{drive_id}/root:/{file_path}"
    )

    # Get metadata
    metadata_response = self._make_request(endpoint=file_metadata_url, method="GET")
    metadata = self._parse_json(
        metadata_response,
        f"fetching metadata for '{file_path}'",
    )

    file_name = metadata.get("name")
    time_created = metadata.get("createdDateTime", "")
    time_modified = metadata.get("lastModifiedDateTime", "")
    # Pre-authenticated, short-lived direct download URL returned by Graph.
    download_url = metadata.get("@microsoft.graph.downloadUrl")

    if not file_name or not download_url:
        raise ValueError(
            f"Missing required metadata for '{file_path}': "
            f"name={file_name!r}, "
            f"downloadUrl={'present' if download_url else 'absent'}"
        )

    # Download file content (bytes)
    content_response = self._make_request(endpoint=download_url, method="GET")
    content_response.raise_for_status()
    file_content = content_response.content

    return SharepointFile(
        file_name=file_name,
        time_created=time_created,
        time_modified=time_modified,
        content=file_content,
        _folder=folder,
    )

list_items_in_path(path)

List items (files/folders) at a Sharepoint path.

Parameters:

Name Type Description Default
path str

Relative folder or file path.

required

Returns:

Type Description
list[Any]

List of items; files include @microsoft.graph.downloadUrl.

Raises:

Type Description
ValueError

If the path is invalid or not found.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def list_items_in_path(self, path: str) -> list[Any]:
    """List items (files/folders) at a Sharepoint path.

    Walks the path one component at a time starting from the drive root,
    descending through each folder's children until the final component.

    Args:
        path: Relative folder or file path.

    Returns:
        List of items; files include @microsoft.graph.downloadUrl.

    Raises:
        ValueError: If the path is invalid or not found.
    """
    site_id = self._get_site_id()
    drive_id = self._get_drive_id()

    path = path.strip("/")
    if not path:
        # Empty path: list the drive root's immediate children directly.
        resp = self._make_request(
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
            f"sites/{site_id}/drives/{drive_id}/root/children"
        )
        data = self._parse_json(resp, "listing root children")
        return cast(List[dict[str, Any]], data.get("value", []))

    path_parts = path.split("/")

    # start from root children
    resp = self._make_request(
        f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/sites/"
        f"{site_id}/drives/{drive_id}/root/children"
    )
    data = self._parse_json(resp, "listing root children")
    items = cast(List[dict[str, Any]], data.get("value", []))

    for component in path_parts:
        # Find the current path component among the current level's items.
        current_item = next(
            (item for item in items if item.get("name") == component), None
        )

        if not current_item:
            raise ValueError(f"Path component '{component}' not found in '{path}'.")

        if "folder" in current_item:
            # descend into folder
            resp = self._make_request(
                f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/{self.api_version}/"
                f"sites/{site_id}/drives/{drive_id}/items/"
                f"{current_item['id']}/children"
            )
            data = self._parse_json(resp, f"listing children for '{component}'")
            items = cast(List[dict[str, Any]], data.get("value", []))
        else:
            # it's a file; ensure we have downloadUrl
            # NOTE(review): if a NON-final component is a file, the remaining
            # path components are silently ignored and that file is returned —
            # confirm this early return is intended.
            if "@microsoft.graph.downloadUrl" not in current_item:
                resp = self._make_request(
                    f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}/"
                    f"{self.api_version}/sites/{site_id}/drives/{drive_id}/"
                    f"items/{current_item['id']}"
                )
                current_item = self._parse_json(
                    resp, f"fetching file metadata for item id {current_item['id']}"
                )
            return [current_item]

    # All components were folders: return the final folder's children.
    return items

save_to_staging_area(sp_file)

Save a Sharepoint file locally (direct write or streaming).

If the file is under the threshold and already loaded in memory, write its content directly. Otherwise, download the file via streaming to avoid memory overload.

Parameters:

Name Type Description Default
sp_file SharepointFile

File metadata and content.

required

Returns:

Type Description
str

Local file path.

Raises:

Type Description
SharePointAPIError

On download or write failure.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def save_to_staging_area(self, sp_file: SharepointFile) -> str:
    """Save a Sharepoint file locally (direct write or streaming).

    If the file is under the threshold and already loaded in memory, write its
    content directly.
    Otherwise, download the file via streaming to avoid memory overload.

    Args:
        sp_file: File metadata and content.

    Returns:
        Local file path.

    Raises:
        SharePointAPIError: On download or write failure.
    """
    try:
        # In-memory content under 500MB can be written straight to disk.
        if sp_file.content and sp_file.content_size < (500 * 1024 * 1024):
            _logger.info(
                f"Writing '{sp_file.file_name}' via direct write (under 500MB)."
            )
            return self.write_bytes_to_local_file(sp_file)

        # Otherwise re-download in chunks to keep memory usage bounded.
        _logger.info(
            f"Writing '{sp_file.file_name}' via streaming (500MB+ or content not"
            f" loaded)."
        )
        return self.download_file_streaming(sp_file)

    except Exception as e:
        # Chain the cause so the original failure is preserved.
        raise SharePointAPIError(f"Failed to write '{sp_file.file_name}': {e}") from e

staging_area()

Provide a clean local staging folder for Sharepoint files.

Yield the local path after ensuring it's empty. Cleans up after use.

Yield

Path to the staging folder as a string.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
@contextmanager
def staging_area(self) -> Generator[str, None, None]:
    """Provide a clean local staging folder for Sharepoint files.

    Yield the local path after ensuring it's empty. Cleans up after use.

    Yield:
        Path to the staging folder as a string.
    """
    # Start from a clean slate: wipe and recreate the local folder.
    self.delete_local_path()
    try:
        yield self.local_path
    finally:
        # Best-effort cleanup: a failure here must not mask an exception
        # raised inside the with-block, so it is only logged.
        try:
            self.delete_local_path()
        except Exception as e:
            _logger.warning(f"Failed to clean up local path: {e}")

write_bytes_to_local_file(sp_file)

Write Sharepoint file content (bytes) to a local path.

Parameters:

Name Type Description Default
sp_file SharepointFile

File with content and metadata.

required

Returns:

Type Description
str

Local file path.

Raises:

Type Description
ValueError

If content is missing.

RuntimeError

If writing to disk fails.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def write_bytes_to_local_file(self, sp_file: SharepointFile) -> str:
    """Write Sharepoint file content (bytes) to a local path.

    Args:
        sp_file: File with content and metadata.

    Returns:
        Local file path.

    Raises:
        ValueError: If content is missing.
        RuntimeError: If writing to disk fails.
    """
    if not sp_file.content:
        raise ValueError(
            f"Cannot write file '{sp_file.file_name}': Content is empty."
        )

    try:
        # Local base path (e.g., Unity Volumes, DBFS, or other mounted storage)
        local_base_path = Path(self.local_path)
        local_base_path.mkdir(parents=True, exist_ok=True)
        file_path = local_base_path / sp_file.file_name
        file_path.write_bytes(sp_file.content)
        return str(file_path)
    except Exception as e:
        # Chain the cause so the original filesystem error is preserved.
        raise RuntimeError(
            f"Failed to write file '{sp_file.file_name}' to Unity Volume: {e}"
        ) from e

write_to_local_path(df)

Write a Spark DataFrame to a local path (Volume) in CSV format.

This method writes the provided Spark DataFrame to a specified local directory, saving it in CSV format. The method renames the output file from its default "part-*" naming convention to a specified file name. The dictionary local_options enables the customisation of the write action. The customizable options can be found here: https://spark.apache.org/docs/3.5.1/sql-data-sources-csv.html.

Parameters:

Name Type Description Default
df DataFrame

The Spark DataFrame to write to the local file system.

required

Returns:

Type Description
None

None.

Raises:

Type Description
IOError

If there is an issue during the file writing process.

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def write_to_local_path(self, df: DataFrame) -> None:
    """Write a Spark DataFrame to a local path (Volume) in CSV format.

    This method writes the provided Spark DataFrame to a specified local directory,
    saving it in CSV format. The method renames the output file from its default
    "part-*" naming convention to a specified file name.
    The dictionary local_options enables the customisation of the write action.
    The customizable options can be found here:
    https://spark.apache.org/docs/3.5.1/sql-data-sources-csv.html.

    Args:
        df: The Spark DataFrame to write to the local file system.

    Returns:
        None.

    Raises:
        IOError: If there is an issue during the file writing process.
    """
    try:
        # Coalesce to one partition so exactly one CSV part file is written,
        # then rename it from "part-*" to the configured file name.
        df.coalesce(1).write.mode("overwrite").save(
            path=self.local_path,
            format="csv",
            # Parenthesized for clarity; same semantics as the conditional.
            **(self.local_options or {}),
        )
        self._rename_local_file(self.local_path, self.file_name)
    except IOError as error:
        # Chain the cause so the original I/O error is preserved.
        raise SharePointAPIError(f"{error}") from error

write_to_sharepoint()

Upload a local file to Sharepoint in chunks using the Microsoft Graph API.

This method creates an upload session and uploads a local CSV file to a Sharepoint document library. The file is divided into chunks (based on the chunk_size specified) to handle large file uploads and send sequentially using the upload URL returned from the Graph API.

The method uses instance attributes such as api_domain, api_version, site_name, drive_name, folder_relative_path, and file_name to construct the necessary API calls and upload the file to the specified location in Sharepoint.

Returns:

Type Description
None

None.

Raises:

Type Description
APIError

If an error occurs during any stage of the upload

Source code in mkdocs/lakehouse_engine/packages/utils/sharepoint_utils.py
def write_to_sharepoint(self) -> None:
    """Upload a local file to Sharepoint in chunks using the Microsoft Graph API.

    This method creates an upload session and uploads a local CSV file to a
    Sharepoint document library.
    The file is divided into chunks (based on the `chunk_size` specified)
    to handle large file uploads and send sequentially using the upload URL
    returned from the Graph API.

    The method uses instance attributes such as `api_domain`, `api_version`,
    `site_name`, `drive_name`, `folder_relative_path`, and `file_name` to
    construct the necessary API calls and upload the file to the specified
    location in Sharepoint.

    Returns:
        None.

    Raises:
        APIError: If an error occurs during any stage of the upload
        (e.g., failure to create upload session,issues during chunk upload).
    """
    drive_id = self._get_drive_id()

    # Build the createUploadSession endpoint; ".csv" is always appended to
    # the configured file name, with or without a folder prefix.
    if self.folder_relative_path:
        endpoint = (
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}"
            f"/{self.api_version}/drives/{drive_id}/items/root:"
            f"/{self.folder_relative_path}/{self.file_name}.csv:"
            f"/createUploadSession"
        )
    else:
        endpoint = (
            f"{ExecEnv.ENGINE_CONFIG.sharepoint_api_domain}"
            f"/{self.api_version}/drives/{drive_id}/items/root:"
            f"/{self.file_name}.csv:/createUploadSession"
        )

    # The session response carries a pre-authorized URL to PUT chunks to.
    response = self._make_request(method="POST", endpoint=endpoint)
    response.raise_for_status()
    upload_session = response.json()
    upload_url = upload_session["uploadUrl"]

    upload_file = str(Path(self.local_path) / self.file_name)
    stat = os.stat(upload_file)
    size = stat.st_size

    with open(upload_file, "rb") as data:
        start = 0
        while start < size:
            chunk = data.read(self.chunk_size)
            bytes_read = len(chunk)
            # Content-Range tells Graph which byte span this chunk covers.
            # NOTE(review): if read() ever returns b"" before `start` reaches
            # `size` (e.g. file truncated mid-upload), this loops forever —
            # confirm whether a guard is needed.
            upload_range = f"bytes {start}-{start + bytes_read - 1}/{size}"
            headers = {
                "Content-Length": str(bytes_read),
                "Content-Range": upload_range,
            }
            response = self._make_request(
                method="PUT", endpoint=upload_url, headers=headers, data=chunk
            )
            response.raise_for_status()
            start += bytes_read