GCSHook only perform list if delimiter specified #36130
Conversation
Good catch!
I agree with @shahar1 we need tests to avoid regression.
Can you please add unit tests?
```python
if delimiter:
    list(blobs)
```
IMHO, there is a problem with the original PR (#34919) that added this statement: we create the generators to avoid loading all the data into memory, but by applying `list` on the blobs generator, you are forcing the generator to loop over all the pages and load them into memory.
Instead, the solution should be based on a loop over the generator elements, similar to what we do here:
airflow/airflow/providers/google/cloud/hooks/gcs.py, lines 941 to 945 in 5d74ffb:

```python
ids.extend(
    blob.name
    for blob in blobs
    if timespan_start <= blob.updated.replace(tzinfo=timezone.utc) < timespan_end
)
```
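The difference can be sketched outside Airflow with a stand-in generator (`fake_blob_names` is hypothetical, not part of the hook): `list(blobs)` would materialize every name at once, while feeding a generator expression straight into `extend` streams them one at a time.

```python
def fake_blob_names(n):
    """Hypothetical stand-in for the paged blobs iterator."""
    for i in range(n):
        yield f"data/file_{i}.csv"

ids = []
# Streams names one by one; no intermediate list of all blobs is built,
# unlike list(blobs), which would load everything into memory first.
ids.extend(name for name in fake_blob_names(3) if name.endswith(".csv"))
print(ids)  # ['data/file_0.csv', 'data/file_1.csv', 'data/file_2.csv']
```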
When a delimiter is used, the iterator must be started so that `blobs.prefixes` is populated, which is how the blob names are then captured. Whereas, when `match_glob` is used, `blobs.prefixes` is never populated and instead you must iterate over blobs and access each `blob.name`.
I'm not sure why this is the case. The original version of the code started the iterator by using a for loop over blobs, whether or not a delimiter had been passed in.
airflow/airflow/providers/google/cloud/hooks/gcs.py, lines 828 to 838 in 9079093:

```python
blob_names = []
for blob in blobs:
    blob_names.append(blob.name)
prefixes = blobs.prefixes
if prefixes:
    ids += list(prefixes)
else:
    ids += blob_names
```
When delimiter is used, if you modify the code to be the following:

```python
for blob in blobs:
    print("here")
    blob_names.append(blob.name)
```

nothing is printed. Simply by starting the iterator, `blobs.prefixes` is populated.
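That behaviour can be mimicked with a toy class (a simulation under my own assumptions, not the real google-cloud-storage iterator): `prefixes` stays empty until iteration begins, at which point fetching the first page fills it in as a side effect.

```python
class FakeBlobIterator:
    """Toy model of an iterator whose `prefixes` attribute is only
    populated as a side effect of starting iteration."""

    def __init__(self, server_prefixes):
        self._server_prefixes = set(server_prefixes)
        self.prefixes = set()  # empty until the first page is fetched

    def __iter__(self):
        self.prefixes = self._server_prefixes  # side effect of iterating
        return iter([])  # delimiter case: prefixes only, no blob objects

blobs = FakeBlobIterator({"a/", "b/"})
print(blobs.prefixes)    # set() -- nothing before iteration starts
next(iter(blobs), None)  # start the iterator
print(blobs.prefixes)    # {'a/', 'b/'} (set order may vary)
```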
The code has been modified so that rather than using `list(`, it uses `next(`. I don't think it will make any difference, since blobs appears to be an empty list.
The code has been tested against a bucket with over 100k files; the same number of files is returned whether `delimiter` or `match_glob` is passed in.
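A quick way to see the difference between the two calls (a generic Python sketch, nothing GCS-specific): `next(it, None)` pulls a single element, whereas `list(it)` drains the whole iterator.

```python
pulled = []

def counting_iter(n):
    """Generator that records every element it actually produces."""
    for i in range(n):
        pulled.append(i)
        yield i

it = counting_iter(100_000)
next(it, None)      # "starts" the iterator: only one element is produced
print(len(pulled))  # 1

it2 = counting_iter(5)
list(it2)           # drains everything
print(len(pulled))  # 6
```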
You also make an interesting point in referencing the code in `list_by_timespan`; I don't believe that this method would ever have filtered the files based on timespan when a delimiter has been passed in:

airflow/airflow/providers/google/cloud/hooks/gcs.py, lines 938 to 945 in 5d74ffb:

```python
if blobs.prefixes:
    ids.extend(blobs.prefixes)
else:
    ids.extend(
        blob.name
        for blob in blobs
        if timespan_start <= blob.updated.replace(tzinfo=timezone.utc) < timespan_end
    )
```

It would simply add all the prefixes without doing any filtering.
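A toy demonstration of that point (hypothetical prefix strings, not real GCS data): whenever `blobs.prefixes` is non-empty, the `else` branch carrying the timespan filter is skipped entirely.

```python
collected = []
# Pretend blobs.prefixes as returned by a delimiter listing:
prefixes = {"logs/2019-01-01/", "logs/2023-01-01/"}

if prefixes:
    collected.extend(prefixes)  # added as-is: the timespan check below never runs
else:
    collected.extend([])  # timespan-filtered blob names would be collected here

print(sorted(collected))  # ['logs/2019-01-01/', 'logs/2023-01-01/']
```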
@hussein-awala I agree, I think that would be better. I found an issue where this would fail if a prefix is passed in which does not exist: it would throw the "iterator has already started" error.

```python
blobs = bucket.list_blobs(
    max_results=max_results,
    page_token=page_token,
    prefix=prefix,
    delimiter=delimiter,
    versions=versions,
)
if delimiter:
    blobs_iter = next(blobs, None)
    if blobs.prefixes:
        ids.extend(blobs.prefixes)
elif not delimiter:
    ids.extend(blob.name for blob in blobs)
page_token = blobs.next_page_token
if page_token is None:
    # empty next page token
    break
return ids
```

I don't understand why the Google storage client behaves differently when a delimiter is used and results exist; the issue seems to stem from that. The above snippet, however, is able to list objects when a delimiter is passed in, handles prefixes with no objects, and avoids using `list(`.

Using the above, however, it doesn't seem to page. Let's say we have:

```python
GCSHook().list(
    bucket_name='a-testbucket',
    prefix='a/prefix/in/the/bucket/',
    delimiter='.csv',
    max_results=10
)
```

When running this on a bucket with thousands of files, everything is captured. However, if called with:

```python
GCSHook().list(
    bucket_name='a-testbucket',
    prefix='a/prefix/in/the/bucket/',
    match_glob='**/*/*.csv',
    max_results=10
)
```

I'm not sure if this is expected behaviour of the GCS client. It's possible that this behaviour may change in the future, and I'm not sure how the above code would handle that.
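The paging contract the hook relies on can be sketched with a fake client (hedged: `fake_list_blobs`, `FakePage`, and the token values are all made up for illustration): the loop keeps requesting pages until `next_page_token` is `None`, so if the client stops setting the token after partial iteration, listing silently ends early.

```python
class FakePage:
    """Made-up stand-in for one list_blobs response page."""

    def __init__(self, names, next_page_token):
        self._names = names
        self.next_page_token = next_page_token

    def __iter__(self):
        return iter(self._names)

def fake_list_blobs(page_token=None):
    """Pretend server: two pages, linked by a token."""
    pages = {
        None: FakePage(["a.csv", "b.csv"], "token-1"),
        "token-1": FakePage(["c.csv"], None),
    }
    return pages[page_token]

listed, page_token = [], None
while True:
    blobs = fake_list_blobs(page_token=page_token)
    listed.extend(blob for blob in blobs)
    page_token = blobs.next_page_token
    if page_token is None:  # empty next page token: stop paging
        break

print(listed)  # ['a.csv', 'b.csv', 'c.csv']
```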
The old code, before this file was refactored, seems to work fine: airflow/airflow/providers/google/cloud/hooks/gcs.py, lines 828 to 838 in 9079093.
Force-pushed 70352af to 2f9d483:

> When delimiter is passed into list_blobs, prefixes is not populated; loading is forced with next(blobs, None). Do not attempt to iterate again over blobs if blobs.prefixes is empty and delimiter has been passed in. This prevents the "iterator has already started" error message.

Force-pushed 2f9d483 to 0728cfa.
I think we should close this in favour of #36202
This fixes the issue raised here against #34919.
When testing locally, the list only appears to be required when a delimiter is passed in.
A sample test can be performed with:
With this fix, the code no longer fails with:
To confirm that this block:
is still required: when it is removed and this is run, no results are returned. Everything works as expected, however, once the block is in place.