
Streaming upload/download for blobs #2871

Closed
cristicbz opened this issue Dec 15, 2016 · 16 comments

@cristicbz

We're in the process of moving from S3 to GCS and we've hit a snag: with boto S3 one can stream a download using the read method on a key, and stream an upload using multiple 'multipart' calls: initiate_multipart_upload, upload_part_from* and complete_upload.

It's imperfect, but this allows simple streaming uploads and downloads of very large files. Furthermore, stick some buffering in there and you can easily wrap S3 reads and writes into file-like objects (without seeking), like smart_open does.

I think that the API supports all the pieces necessary to implement this in the client (which would be greatly appreciated!), but in the meantime can anyone suggest a workaround which does not require a temporary file?

Thanks!
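
For later readers looking for a download-side workaround that avoids a temporary file: ranged reads can be chained into a simple generator. A minimal sketch, assuming a recent google-cloud-storage release whose Blob.download_as_bytes accepts start/end offsets; the bucket and object names are placeholders:

from google.cloud import storage

def stream_blob(bucket_name, blob_name, chunk_size=10 * 1024 * 1024):
    """Yield a blob's contents in fixed-size chunks via ranged reads."""
    client = storage.Client()
    blob = client.bucket(bucket_name).get_blob(blob_name)  # loads size metadata
    start = 0
    while start < blob.size:
        end = min(start + chunk_size, blob.size) - 1  # 'end' is an inclusive offset
        yield blob.download_as_bytes(start=start, end=end)
        start = end + 1

# Usage: process a large object without holding it all in memory.
total = 0
for chunk in stream_blob('my-bucket', 'very-large-file.bin'):
    total += len(chunk)  # replace with real per-chunk processing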

@daspecster daspecster added the api: storage Issues related to the Cloud Storage API. label Dec 15, 2016
@danoscarmike danoscarmike added priority: p2 Moderately-important priority. Fix may not be included in next release. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. Status: Acknowledged labels Feb 28, 2017
@linar-jether
Contributor

linar-jether commented Jun 27, 2017

At least for uploads, there is support: see #1914.

Could be documented better...

@linar-jether
Contributor

Simple example for a streaming upload of a large dataframe

import os
import threading

r, w = os.pipe()
r, w = os.fdopen(r, 'rb'), os.fdopen(w, 'wb')

df = _large_pandas_df

path = 'gs://test_bucket/testfile.pickle'

# Important to set a chunk size when creating the blob `bucket.blob(blob_path, chunk_size=10 << 20)`
b = get_blob(path)

# Perform the upload in a separate thread
t = threading.Thread(target=lambda b, r: b.upload_from_file(r), name='upload-blob:%s' % path, args=(b, r))

# Since the blob upload is reading from a pipe, the operation is blocking until the pipe closes
t.start()

# This starts writing to the pipe
df.to_pickle(w)

# Make sure to close the pipe so the upload completes
w.close()

# Wait for the upload to complete
t.join()

@teoguso

teoguso commented Jul 14, 2017

@linar-jether that's a great snippet. I have a question though: what does your get_blob() function do exactly? Is it the same as the .blob() method of the bucket class?

@linar-jether
Contributor

Just a util function that gets a Blob object from a path ('gs://bucket_name/obj.name')
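
For context, a minimal sketch of such a helper, assuming default client credentials; the optional chunk_size mirrors the comment in the snippet above:

from google.cloud import storage

def get_blob(gs_path, chunk_size=None):
    # Turn a 'gs://bucket_name/obj.name' path into a Blob object.
    bucket_name, _, blob_path = gs_path[len('gs://'):].partition('/')
    client = storage.Client()
    return client.bucket(bucket_name).blob(blob_path, chunk_size=chunk_size)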

@teoguso

teoguso commented Jul 14, 2017

Thanks! Do you have suggestions for the recommended chunk size?

@linar-jether
Contributor

linar-jether commented Jul 14, 2017 via email

@teoguso

teoguso commented Jul 14, 2017

Thanks a lot, and I'm sorry, I am quite new to this. Could I use the same procedure to stream up JSON strings directly instead of bytes?

@linar-jether
Contributor

linar-jether commented Jul 14, 2017 via email
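
For anyone with the same question: text has to be encoded to bytes before it goes into the binary pipe. A minimal sketch, reusing the write end w of the pipe from the snippet above (records is a placeholder iterable of dicts):

import io
import json

# Wrap the binary write end of the pipe so plain str can be written to it;
# the wrapper handles the UTF-8 encoding.
text_w = io.TextIOWrapper(w, encoding='utf-8')

for record in records:  # records: placeholder iterable of dicts
    text_w.write(json.dumps(record))
    text_w.write('\n')

# Flushes and closes the underlying pipe so the upload thread can finish.
text_w.close()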

@lukesneeringer lukesneeringer removed the priority: p2 Moderately-important priority. Fix may not be included in next release. label Aug 9, 2017
@lukesneeringer
Contributor

lukesneeringer commented Aug 11, 2017

This seems to be resolved, as best as I can tell. Closing.

@cristicbz / @teoguso, poke if I got it wrong, and I will reopen.

@eap

eap commented Aug 30, 2017

Can I request that this issue be reopened? We use GCS extensively at Verily, and we frequently deal with large indexed files since we use a variety of industry-standard genomics file formats (VCF, BAM, SAM, etc.). Being able to read a file header and then seek to a specific position is a MAJOR speedup for some processes, and we've had to hack around the lack of this by manually modifying the Content-Range headers a few times.

It seems to me that implementing Python's io.IOBase interface would be an ideal solution to this problem.

There is prior work on this task (see gcs-client) and it's even under a compatible license, but I'm guessing that some amount of work would be needed to clean this code up and adapt it to follow the conventions and style here.
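
As a postscript for later readers: recent google-cloud-storage releases (roughly 1.36 and later) expose a file-like, seekable interface via Blob.open(), which covers the read-the-header-then-seek pattern described above. A minimal sketch with placeholder bucket and object names:

from google.cloud import storage

client = storage.Client()
blob = client.bucket('my-genomics-bucket').blob('sample.bam')

# 'rb' returns a seekable, buffered, read-only file object backed by ranged reads.
with blob.open('rb') as f:
    header = f.read(4096)         # read the file header
    f.seek(1_000_000_000)         # jump to an indexed offset without downloading the rest
    record_bytes = f.read(64 * 1024)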

@dhermes
Contributor

dhermes commented Aug 30, 2017

@eap This functionality exists right now in google-resumable-media.

I'm fairly certain the thing you're asking for (i.e. partial reads) is not the same as what this issue was originally about.

Just in case I misunderstand you, could you provide a code snippet (real or "fictional") of what you'd like to do and how it fails with the current code?
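
For reference, a minimal sketch of a chunked streaming upload through google-resumable-media directly, assuming default application credentials; the bucket and object names are placeholders, and the chunk size must be a multiple of 256 KB:

import io

import google.auth
from google.auth.transport.requests import AuthorizedSession
from google.resumable_media.requests import ResumableUpload

bucket = 'my-bucket'
blob_name = 'streamed-object.bin'
chunk_size = 1024 * 1024  # 1 MB, a multiple of 256 KB

credentials, _ = google.auth.default()
transport = AuthorizedSession(credentials)

upload_url = (
    'https://www.googleapis.com/upload/storage/v1/b/'
    '{bucket}/o?uploadType=resumable'.format(bucket=bucket)
)

stream = io.BytesIO(b'x' * (3 * 1024 * 1024))  # stand-in for the real data source
upload = ResumableUpload(upload_url, chunk_size)
upload.initiate(transport, stream, {'name': blob_name}, 'application/octet-stream')

# Send the data one chunk per request instead of in a single request.
while not upload.finished:
    upload.transmit_next_chunk(transport)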

@eap

eap commented Aug 30, 2017

First, I want to thank you for pointing me to google-resumable-media; it will certainly cover some of our uses. That said, it's not a replacement for a standard file object. One example of this: Biopython can accept file handles as arguments. The following code can stream a FASTQ (genomic read quality file) if you have access to a file object.

from Bio import SeqIO
with open("example.fastq", "rU") as fileobj:
    for rec in SeqIO.parse(fileobj, "fastq"):
        print(rec.seq)
        print('  name=%s\n  annotations=%r' % (rec.name, rec.annotations))

Should this be opened as a new issue? I was looking for an existing issue covering this request and couldn't find one.

@gastlich

gastlich commented Mar 21, 2018

@linar-jether do you know if there is any other way nowadays to achieve what you did in your code snippet? It's not working anymore because of changes in resumable-media, which now expects to invoke seek on the stream object.

@linar-jether
Contributor

@gastlich Yes, the snippet seems to be broken with the current version...

@dhermes, this looks like a regression in recent versions.

Haven't had the chance to use the google-resumable-media library, but it might provide the functionality to do streaming uploads.

I don't know the reason, but our use case for streaming uploads still works (uploading from a streaming HTTP request); it might be a different code path for different stream objects:

import threading
import requests

# Fetch the source file as a streaming HTTP response
res = requests.get('http://speedtest.ftp.otenet.gr/files/test10Mb.db', stream=True)
r = res.raw

path = 'gs://test_bucket/testfile.pickle'

# Important to set a chunk size when creating the blob `bucket.blob(blob_path, chunk_size=10 << 20)`
b = get_blob(path, 1 << 20)

# Perform the upload in a separate thread
t = threading.Thread(target=lambda b, r: b.upload_from_file(r), name='upload-blob:%s' % path, args=(b, r))

# Since the blob upload reads from the response stream, the call blocks until the stream is exhausted
t.start()

# Wait for the upload to complete
t.join()

@gastlich

@linar-jether thanks for the quick reply.

I checked your code and it's working fine with the requests response.raw object. I think what's happening here is that response.raw doesn't move its cursor position, so upload_from_file can start from the beginning; otherwise it complains with:

if stream.tell() != 0:
    raise ValueError(u'Stream must be at beginning.')

I still haven't solved my issue, because I'm writing into an in-memory io.BytesIO buffer, and every write moves the cursor to the end of the written bytes...

with BytesIO() as file:
    destination = f'postgres/{table_name}/{org_id}.csv'
    blob = bucket.blob(destination, chunk_size=10 << 20)

    thread = threading.Thread(
        target=lambda b, r: blob.upload_from_file(r),
        name='upload-blob:%s' % destination,
        args=(blob, file))

    query = f'SELECT * FROM {table_name} WHERE organisation_id={org_id} LIMIT 10000'
    cursor.execute(query)

    # writes into memory, but moves cursor all the time
    export_query = f'COPY ({query}) TO STDOUT WITH CSV HEADER'
    cursor.copy_expert(export_query, file)

    # Seek to the beginning
    file.seek(0)
    # I cannot start the thread before `cursor.copy_expert` because resumable_media expects the stream to start from the beginning
    thread.start()

    # Wait for the upload to complete
    thread.join()

As you can see, in my example I'm able to get rid of threading and do everything in a blocking/synchronous way. But my problem is that the data set might be very big, and Python will consume all of the allowed RAM. I partially solved this by using a temporary file on disk; at least it doesn't consume memory, but it's slower, and I'm still hoping to do this properly with an in-memory approach.
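
For later readers with the same constraint: recent google-cloud-storage releases (roughly 1.36 and later) provide Blob.open('wb'), which hands copy_expert a writable file object that uploads chunk by chunk instead of accumulating everything in a BytesIO. A minimal sketch with placeholder connection and table names:

import psycopg2
from google.cloud import storage

client = storage.Client()
bucket = client.bucket('test_bucket')
blob = bucket.blob('postgres/my_table/export.csv', chunk_size=10 << 20)

conn = psycopg2.connect('dbname=mydb')  # placeholder connection string
with conn.cursor() as cursor:
    # The writer buffers roughly one chunk at a time and uploads it as it fills,
    # so memory use stays bounded regardless of the result size.
    with blob.open('wb') as gcs_file:
        cursor.copy_expert('COPY my_table TO STDOUT WITH CSV HEADER', gcs_file)

copy_expert writes bytes to the file object, which matches the binary 'wb' mode.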

@Edorka

Edorka commented Jul 22, 2021

I don't get why stream.tell() != 0 should raise an exception instead of a warning.
I was using the example from #2871 (comment); we were able to continue by mocking the tell method, since calling tell on an open pipe raises an OS exception.

However, when the program continues, we get a strange error from the API:

google.api_core.exceptions.BadRequest: 400 PUT https://storage.googleapis.com/upload/storage/v1/b/proteus-dev-eov-delete_me/o?uploadType=resumable&upload_id=ADPycdvFahNsxwUYj0gixPSamAP8N07ru_yTNRef49xdXbAt89IKNiAyY1tp5CDxXzt3U0M0Jo4KRd-awc8tyHYmu48: Client sent query request with a non-empty body.: ('Request failed with status code', 400, 'Expected one of', <HTTPStatus.OK: 200>, 308)

This error is caused in this line:

venv/lib/python3.8/site-packages/google/cloud/storage/blob.py", line 2348, in upload_from_file
      created_json = self._do_upload(
