lakehouse_engine.utils.storage.s3_storage
Module to represent a s3 file storage system.
1"""Module to represent a s3 file storage system.""" 2 3from typing import Any 4from urllib.parse import ParseResult 5 6import boto3 7 8from lakehouse_engine.utils.logging_handler import LoggingHandler 9from lakehouse_engine.utils.storage.file_storage import FileStorage 10 11 12class S3Storage(FileStorage): 13 """Class to represent a s3 file storage system.""" 14 15 _LOGGER = LoggingHandler(__name__).get_logger() 16 17 @classmethod 18 def get_file_payload(cls, url: ParseResult) -> Any: 19 """Get the payload of a config file. 20 21 Args: 22 url: url of the file. 23 24 Returns: 25 File payload/content. 26 """ 27 s3 = boto3.resource("s3") 28 obj = s3.Object(url.netloc, url.path.lstrip("/")) 29 cls._LOGGER.info( 30 f"Trying with s3_storage: " 31 f"Reading from file: {url.scheme}://{url.netloc}{url.path}" 32 ) 33 return obj.get()["Body"] 34 35 @classmethod 36 def write_payload_to_file(cls, url: ParseResult, content: str) -> None: 37 """Write payload into a file. 38 39 Args: 40 url: url of the file. 41 content: content to write into the file. 42 """ 43 s3 = boto3.resource("s3") 44 obj = s3.Object(url.netloc, url.path.lstrip("/")) 45 cls._LOGGER.info( 46 f"Trying with s3_storage: " 47 f"Writing into file: {url.scheme}://{url.netloc}{url.path}" 48 ) 49 obj.put(Body=content)
13class S3Storage(FileStorage): 14 """Class to represent a s3 file storage system.""" 15 16 _LOGGER = LoggingHandler(__name__).get_logger() 17 18 @classmethod 19 def get_file_payload(cls, url: ParseResult) -> Any: 20 """Get the payload of a config file. 21 22 Args: 23 url: url of the file. 24 25 Returns: 26 File payload/content. 27 """ 28 s3 = boto3.resource("s3") 29 obj = s3.Object(url.netloc, url.path.lstrip("/")) 30 cls._LOGGER.info( 31 f"Trying with s3_storage: " 32 f"Reading from file: {url.scheme}://{url.netloc}{url.path}" 33 ) 34 return obj.get()["Body"] 35 36 @classmethod 37 def write_payload_to_file(cls, url: ParseResult, content: str) -> None: 38 """Write payload into a file. 39 40 Args: 41 url: url of the file. 42 content: content to write into the file. 43 """ 44 s3 = boto3.resource("s3") 45 obj = s3.Object(url.netloc, url.path.lstrip("/")) 46 cls._LOGGER.info( 47 f"Trying with s3_storage: " 48 f"Writing into file: {url.scheme}://{url.netloc}{url.path}" 49 ) 50 obj.put(Body=content)
Class to represent a s3 file storage system.
@classmethod
def
get_file_payload(cls, url: urllib.parse.ParseResult) -> Any:
18 @classmethod 19 def get_file_payload(cls, url: ParseResult) -> Any: 20 """Get the payload of a config file. 21 22 Args: 23 url: url of the file. 24 25 Returns: 26 File payload/content. 27 """ 28 s3 = boto3.resource("s3") 29 obj = s3.Object(url.netloc, url.path.lstrip("/")) 30 cls._LOGGER.info( 31 f"Trying with s3_storage: " 32 f"Reading from file: {url.scheme}://{url.netloc}{url.path}" 33 ) 34 return obj.get()["Body"]
Get the payload of a config file.
Arguments:
- url: url of the file.
Returns:
File payload/content.
@classmethod
def
write_payload_to_file(cls, url: urllib.parse.ParseResult, content: str) -> None:
36 @classmethod 37 def write_payload_to_file(cls, url: ParseResult, content: str) -> None: 38 """Write payload into a file. 39 40 Args: 41 url: url of the file. 42 content: content to write into the file. 43 """ 44 s3 = boto3.resource("s3") 45 obj = s3.Object(url.netloc, url.path.lstrip("/")) 46 cls._LOGGER.info( 47 f"Trying with s3_storage: " 48 f"Writing into file: {url.scheme}://{url.netloc}{url.path}" 49 ) 50 obj.put(Body=content)
Write payload into a file.
Arguments:
- url: url of the file.
- content: content to write into the file.