Skip to content

Commit

Permalink
refactor: update load_stream method to directly yield file chunks (la…
Browse files Browse the repository at this point in the history
  • Loading branch information
hwzhuhao authored and JunXu01 committed Nov 9, 2024
1 parent 11ae746 commit 77005f3
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 83 deletions.
9 changes: 3 additions & 6 deletions api/extensions/storage/aliyun_oss_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ def load_once(self, filename: str) -> bytes:
return data

def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator:
obj = self.client.get_object(self.__wrapper_folder_filename(filename))
while chunk := obj.read(4096):
yield chunk

return generate()
obj = self.client.get_object(self.__wrapper_folder_filename(filename))
while chunk := obj.read(4096):
yield chunk

def download(self, filename, target_filepath):
self.client.get_object_to_file(self.__wrapper_folder_filename(filename), target_filepath)
Expand Down
19 changes: 8 additions & 11 deletions api/extensions/storage/aws_s3_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,14 @@ def load_once(self, filename: str) -> bytes:
return data

def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator:
try:
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
yield from response["Body"].iter_chunks()
except ClientError as ex:
if ex.response["Error"]["Code"] == "NoSuchKey":
raise FileNotFoundError("File not found")
else:
raise

return generate()
try:
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
yield from response["Body"].iter_chunks()
except ClientError as ex:
if ex.response["Error"]["Code"] == "NoSuchKey":
raise FileNotFoundError("File not found")
else:
raise

def download(self, filename, target_filepath):
self.client.download_file(self.bucket_name, filename, target_filepath)
Expand Down
10 changes: 3 additions & 7 deletions api/extensions/storage/azure_blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,9 @@ def load_once(self, filename: str) -> bytes:

def load_stream(self, filename: str) -> Generator:
client = self._sync_client()

def generate(filename: str = filename) -> Generator:
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
blob_data = blob.download_blob()
yield from blob_data.chunks()

return generate(filename)
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
blob_data = blob.download_blob()
yield from blob_data.chunks()

def download(self, filename, target_filepath):
client = self._sync_client()
Expand Down
9 changes: 3 additions & 6 deletions api/extensions/storage/baidu_obs_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,9 @@ def load_once(self, filename: str) -> bytes:
return response.data.read()

def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator:
response = self.client.get_object(bucket_name=self.bucket_name, key=filename).data
while chunk := response.read(4096):
yield chunk

return generate()
response = self.client.get_object(bucket_name=self.bucket_name, key=filename).data
while chunk := response.read(4096):
yield chunk

def download(self, filename, target_filepath):
self.client.get_object_to_file(bucket_name=self.bucket_name, key=filename, file_name=target_filepath)
Expand Down
13 changes: 5 additions & 8 deletions api/extensions/storage/google_cloud_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,11 @@ def load_once(self, filename: str) -> bytes:
return data

def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator:
bucket = self.client.get_bucket(self.bucket_name)
blob = bucket.get_blob(filename)
with blob.open(mode="rb") as blob_stream:
while chunk := blob_stream.read(4096):
yield chunk

return generate()
bucket = self.client.get_bucket(self.bucket_name)
blob = bucket.get_blob(filename)
with blob.open(mode="rb") as blob_stream:
while chunk := blob_stream.read(4096):
yield chunk

def download(self, filename, target_filepath):
bucket = self.client.get_bucket(self.bucket_name)
Expand Down
9 changes: 3 additions & 6 deletions api/extensions/storage/huawei_obs_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@ def load_once(self, filename: str) -> bytes:
return data

def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator:
response = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response
while chunk := response.read(4096):
yield chunk

return generate()
response = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response
while chunk := response.read(4096):
yield chunk

def download(self, filename, target_filepath):
self.client.getObject(bucketName=self.bucket_name, objectKey=filename, downloadPath=target_filepath)
Expand Down
14 changes: 5 additions & 9 deletions api/extensions/storage/local_fs_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,11 @@ def load_once(self, filename: str) -> bytes:

def load_stream(self, filename: str) -> Generator:
filepath = self._build_filepath(filename)

def generate() -> Generator:
if not os.path.exists(filepath):
raise FileNotFoundError("File not found")
with open(filepath, "rb") as f:
while chunk := f.read(4096): # Read in chunks of 4KB
yield chunk

return generate()
if not os.path.exists(filepath):
raise FileNotFoundError("File not found")
with open(filepath, "rb") as f:
while chunk := f.read(4096): # Read in chunks of 4KB
yield chunk

def download(self, filename, target_filepath):
filepath = self._build_filepath(filename)
Expand Down
19 changes: 8 additions & 11 deletions api/extensions/storage/oracle_oci_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,14 @@ def load_once(self, filename: str) -> bytes:
return data

def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator:
try:
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
yield from response["Body"].iter_chunks()
except ClientError as ex:
if ex.response["Error"]["Code"] == "NoSuchKey":
raise FileNotFoundError("File not found")
else:
raise

return generate()
try:
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
yield from response["Body"].iter_chunks()
except ClientError as ex:
if ex.response["Error"]["Code"] == "NoSuchKey":
raise FileNotFoundError("File not found")
else:
raise

def download(self, filename, target_filepath):
self.client.download_file(self.bucket_name, filename, target_filepath)
Expand Down
13 changes: 5 additions & 8 deletions api/extensions/storage/supabase_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,14 @@ def load_once(self, filename: str) -> bytes:
return content

def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator:
result = self.client.storage.from_(self.bucket_name).download(filename)
byte_stream = io.BytesIO(result)
while chunk := byte_stream.read(4096): # Read in chunks of 4KB
yield chunk

return generate()
result = self.client.storage.from_(self.bucket_name).download(filename)
byte_stream = io.BytesIO(result)
while chunk := byte_stream.read(4096): # Read in chunks of 4KB
yield chunk

def download(self, filename, target_filepath):
result = self.client.storage.from_(self.bucket_name).download(filename)
Path(result).write_bytes(result)
Path(target_filepath).write_bytes(result)

def exists(self, filename):
result = self.client.storage.from_(self.bucket_name).list(filename)
Expand Down
7 changes: 2 additions & 5 deletions api/extensions/storage/tencent_cos_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ def load_once(self, filename: str) -> bytes:
return data

def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator:
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
yield from response["Body"].get_stream(chunk_size=4096)

return generate()
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
yield from response["Body"].get_stream(chunk_size=4096)

def download(self, filename, target_filepath):
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
Expand Down
9 changes: 3 additions & 6 deletions api/extensions/storage/volcengine_tos_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@ def load_once(self, filename: str) -> bytes:
return data

def load_stream(self, filename: str) -> Generator:
def generate(filename: str = filename) -> Generator:
response = self.client.get_object(bucket=self.bucket_name, key=filename)
while chunk := response.read(4096):
yield chunk

return generate()
response = self.client.get_object(bucket=self.bucket_name, key=filename)
while chunk := response.read(4096):
yield chunk

def download(self, filename, target_filepath):
self.client.get_object_to_file(bucket=self.bucket_name, key=filename, file_path=target_filepath)
Expand Down

0 comments on commit 77005f3

Please sign in to comment.