Merge pull request #20666 from MushuEE/kettle/bad_request
[Kettle] Decrease Max chunk size and catch Over Size error
k8s-ci-robot authored Jan 29, 2021
2 parents e8db635 + 8762f21 commit 8d83b22
Showing 1 changed file with 5 additions and 4 deletions.
kettle/stream.py: 9 changes (5 additions & 4 deletions)
@@ -29,6 +29,7 @@
 import multiprocessing.pool
 
 try:
+    from google.api_core import exceptions as api_exceptions
     from google.cloud import bigquery
     from google.cloud import pubsub_v1
     import google.cloud.exceptions
@@ -40,7 +41,7 @@
 import make_db
 import make_json
 
-MAX_ROW_UPLOAD = 25
+MAX_ROW_UPLOAD = 10 # See https://github.com/googleapis/google-cloud-go/issues/2855
 
 def process_changes(results):
     """Split GCS change events into trivial ack_ids and builds to further process."""
@@ -94,7 +95,7 @@ def retry(func, *args, **kwargs):
     for attempt in range(20):
         try:
             return func(*args, **kwargs)
-        except (socket.error, google.cloud.exceptions.ServerError):
+        except (socket.error, google.cloud.exceptions.ServerError, api_exceptions.BadRequest):
             # retry with exponential backoff
             traceback.print_exc()
             time.sleep(1.4 ** attempt)
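For context, retry() wraps a callable in up to 20 attempts with exponential backoff, and this change simply adds api_exceptions.BadRequest to the tuple of retried errors. A self-contained sketch of the pattern, with OSError standing in for the Google Cloud exceptions so it runs without those libraries, and the trailing final attempt being an assumption rather than code taken from stream.py:

import time
import traceback

def retry(func, *args, **kwargs):
    # Try func up to 20 times, sleeping 1.4 ** attempt seconds after each
    # failure: about a second at first, roughly ten minutes by attempt 19.
    for attempt in range(20):
        try:
            return func(*args, **kwargs)
        except OSError:  # stream.py retries socket.error, ServerError, BadRequest
            traceback.print_exc()
            time.sleep(1.4 ** attempt)
    return func(*args, **kwargs)  # assumed: one last try, letting the error propagate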
@@ -131,9 +132,9 @@ def divide_chunks(l, bin_size=MAX_ROW_UPLOAD):
         # Insert rows with row_ids into table, retrying as necessary.
         errors = retry(bq_client.insert_rows, table, chunk, skip_invalid_rows=True)
         if not errors:
-            print('Loaded {} builds into {}'.format(len(chunk), table.full_table_id))
+            print(f'Loaded {len(chunk)} builds into {table.full_table_id}')
         else:
-            print('Errors:')
+            print(f'Errors on Chunk: {chunk}')
             pprint.pprint(errors)
             pprint.pprint(table.schema)
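Why retry on BadRequest at all? When a streaming-insert request body exceeds BigQuery's per-request size limit, the API rejects the whole call with HTTP 400 (e.g. "Request payload size exceeds the limit"), which google-cloud-python raises as google.api_core.exceptions.BadRequest instead of returning per-row errors. A hedged sketch of catching it directly; try_insert is a hypothetical helper, and bq_client and table are assumed to be a bigquery.Client and a fetched table:

import traceback

from google.api_core import exceptions as api_exceptions

def try_insert(bq_client, table, chunk):
    # Returns insert_rows' per-row error list (empty on success), or None
    # when the entire request is rejected as too large.
    try:
        return bq_client.insert_rows(table, chunk, skip_invalid_rows=True)
    except api_exceptions.BadRequest:
        traceback.print_exc()
        return None

In stream.py the same exception is handled inside retry() instead, so an oversize chunk is retried with backoff; dropping MAX_ROW_UPLOAD from 25 to 10 makes hitting the limit unlikely in the first place.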
