Bases: Writer
Class to write data to SharePoint.

This writer is designed specifically for uploading a single file
to SharePoint. It first writes the data locally and then uploads
it to the specified SharePoint location. Since it handles only
a single file at a time, any logic for writing multiple files
must be implemented on the notebook side.
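For orientation, here is a minimal, hypothetical usage sketch. The `SharepointOptions` name, the import paths, and the exact `OutputSpec` fields are assumptions inferred from the attributes this writer reads (`sharepoint_opts.client_id`, `.site_name`, and so on); consult the package for the real signatures.

```python
# Hypothetical usage sketch -- option/field names and import paths are
# assumptions inferred from the attributes referenced by this writer.
from collections import OrderedDict

# from lakehouse_engine... import OutputSpec, SharepointWriter  # paths assumed

output_spec = OutputSpec(  # assumed constructor fields
    sharepoint_opts=SharepointOptions(  # hypothetical options class
        client_id="<app-client-id>",
        tenant_id="<tenant-id>",
        secret="<secret-reference>",
        site_name="my-site",
        drive_name="Documents",
        file_name="report.csv",
        folder_relative_path="reports/2024",
        local_path="/tmp/sharepoint_staging/report.csv",
    ),
)

# df must be a batch (non-streaming) Spark DataFrame. The writer handles
# exactly one file; to ship several files, build one writer per file on
# the notebook side.
SharepointWriter(output_spec=output_spec, df=df, data=OrderedDict()).write()
```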
Source code in mkdocs/lakehouse_engine/packages/io/writers/sharepoint_writer.py
```python
class SharepointWriter(Writer):
    """Class to write data to Sharepoint.

    This writer is designed specifically for uploading a single file
    to Sharepoint. It first writes the data locally before uploading
    it to the specified Sharepoint location. Since it handles only
    a single file at a time, any logic for writing multiple files
    must be implemented on the notebook-side.
    """

    def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
        """Construct SharepointWriter instances.

        Args:
            output_spec: output specification.
            df: dataframe to be written.
            data: list of all dfs generated on previous steps before writer.
        """
        super().__init__(output_spec, df, data)
        self.sharepoint_utils = self._get_sharepoint_utils()
        self._logger = LoggingHandler(__name__).get_logger()

    def write(self) -> None:
        """Upload data to Sharepoint."""
        if self._df.isStreaming:
            raise NotSupportedException("Sharepoint writer doesn't support streaming!")

        self._output_spec.sharepoint_opts.validate_for_writer()

        if not self.sharepoint_utils.check_if_endpoint_exists(
            folder_root_path=self._output_spec.sharepoint_opts.folder_relative_path
        ):
            raise EndpointNotFoundException("The provided endpoint does not exist!")

        self._write_to_sharepoint_in_batch_mode(self._df)

    def _get_sharepoint_utils(self) -> SharepointUtils:
        sharepoint_utils = SharepointUtils(
            client_id=self._output_spec.sharepoint_opts.client_id,
            tenant_id=self._output_spec.sharepoint_opts.tenant_id,
            local_path=self._output_spec.sharepoint_opts.local_path,
            api_version=self._output_spec.sharepoint_opts.api_version,
            site_name=self._output_spec.sharepoint_opts.site_name,
            drive_name=self._output_spec.sharepoint_opts.drive_name,
            file_name=self._output_spec.sharepoint_opts.file_name,
            folder_relative_path=self._output_spec.sharepoint_opts.folder_relative_path,
            chunk_size=self._output_spec.sharepoint_opts.chunk_size,
            local_options=self._output_spec.sharepoint_opts.local_options,
            secret=self._output_spec.sharepoint_opts.secret,
            conflict_behaviour=self._output_spec.sharepoint_opts.conflict_behaviour,
        )
        return sharepoint_utils

    def _write_to_sharepoint_in_batch_mode(self, df: DataFrame) -> None:
        """Write to Sharepoint in batch mode.

        This method first writes the provided DataFrame to a local file using the
        SharePointUtils `write_to_local_path` method. If the local file is
        successfully written, it then uploads the file to Sharepoint using the
        `write_to_sharepoint` method, logging the process and outcome.

        Args:
            df: The DataFrame to write to a local file and subsequently
                upload to Sharepoint.
        """
        local_path = self._output_spec.sharepoint_opts.local_path
        file_name = self._output_spec.sharepoint_opts.file_name

        self._logger.info(
            f"Starting to write the data to the local path: {local_path}"
        )
        try:
            self.sharepoint_utils.write_to_local_path(df)
        except IOError as err:
            self.sharepoint_utils.delete_local_path()
            self._logger.info(f"Deleted the local folder: {local_path}")
            raise WriteToLocalException(
                f"The data was not written on the local path: {local_path}"
            ) from err
        self._logger.info(f"The data was written to the local path: {local_path}")

        file_size = os.path.getsize(local_path)
        self._logger.info(
            f"Uploading the {file_name} ({file_size} bytes) to Sharepoint."
        )
        self.sharepoint_utils.write_to_sharepoint()
        self._logger.info(f"The {file_name} was uploaded to Sharepoint with success!")

        self.sharepoint_utils.delete_local_path()
        self._logger.info(f"Deleted the local folder: {local_path}")
```
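Because the data is staged locally before the upload, a failed local write surfaces as `WriteToLocalException` after the writer has already deleted the staging folder. A hedged notebook-side sketch of reacting to that case (the `writer` and `logger` variables are illustrative, and the exception's import path is an assumption):

```python
# Illustrative sketch: handling a failed local staging write.
# WriteToLocalException is raised by the source above; its import path
# and the writer/logger variables here are assumptions.
try:
    writer.write()
except WriteToLocalException as err:
    # The DataFrame could not be written locally; the writer has already
    # removed the staging folder, so nothing reached SharePoint.
    logger.error(f"Staging failed, nothing was uploaded: {err}")
    raise
```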
__init__(output_spec, df, data)
Construct SharepointWriter instances.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_spec` | `OutputSpec` | output specification | *required* |
| `df` | `DataFrame` | dataframe to be written | *required* |
| `data` | `OrderedDict` | list of all dfs generated on previous steps before writer | *required* |
Source code in mkdocs/lakehouse_engine/packages/io/writers/sharepoint_writer.py
```python
def __init__(self, output_spec: OutputSpec, df: DataFrame, data: OrderedDict):
    """Construct SharepointWriter instances.

    Args:
        output_spec: output specification.
        df: dataframe to be written.
        data: list of all dfs generated on previous steps before writer.
    """
    super().__init__(output_spec, df, data)
    self.sharepoint_utils = self._get_sharepoint_utils()
    self._logger = LoggingHandler(__name__).get_logger()
```
write()
Upload data to SharePoint. Raises `NotSupportedException` if the DataFrame is streaming, and `EndpointNotFoundException` if the target folder does not exist.
Source code in mkdocs/lakehouse_engine/packages/io/writers/sharepoint_writer.py
```python
def write(self) -> None:
    """Upload data to Sharepoint."""
    if self._df.isStreaming:
        raise NotSupportedException("Sharepoint writer doesn't support streaming!")

    self._output_spec.sharepoint_opts.validate_for_writer()

    if not self.sharepoint_utils.check_if_endpoint_exists(
        folder_root_path=self._output_spec.sharepoint_opts.folder_relative_path
    ):
        raise EndpointNotFoundException("The provided endpoint does not exist!")

    self._write_to_sharepoint_in_batch_mode(self._df)
```
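As the source shows, `write()` fails fast on two pre-flight checks: a streaming DataFrame raises `NotSupportedException`, and a missing target folder raises `EndpointNotFoundException`. A hedged notebook-side sketch (exception import paths and the surrounding `output_spec`/`df` variables are assumptions):

```python
# Illustrative sketch of the two pre-flight failures write() can raise.
# Exception names come from the source above; import paths and the
# output_spec/df variables are assumptions.
from collections import OrderedDict

try:
    SharepointWriter(output_spec=output_spec, df=df, data=OrderedDict()).write()
except NotSupportedException:
    # df is a streaming DataFrame; convert it to a batch DataFrame first.
    raise
except EndpointNotFoundException:
    # folder_relative_path does not exist on the target drive; create the
    # folder in SharePoint (or fix the path) and retry.
    raise
```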