diff --git a/kettle/stream.py b/kettle/stream.py
index 9b4fe0cdbe82..6c0f4c6c3cb0 100755
--- a/kettle/stream.py
+++ b/kettle/stream.py
@@ -29,6 +29,7 @@ import multiprocessing.pool

 try:
+    from google.api_core import exceptions as api_exceptions
     from google.cloud import bigquery
     from google.cloud import pubsub_v1
     import google.cloud.exceptions
@@ -40,7 +41,7 @@ import make_db
 import make_json


-MAX_ROW_UPLOAD = 25
+MAX_ROW_UPLOAD = 10  # See https://github.com/googleapis/google-cloud-go/issues/2855


 def process_changes(results):
     """Split GCS change events into trivial ack_ids and builds to further process."""
@@ -94,7 +95,7 @@ def retry(func, *args, **kwargs):
     for attempt in range(20):
         try:
             return func(*args, **kwargs)
-        except (socket.error, google.cloud.exceptions.ServerError):
+        except (socket.error, google.cloud.exceptions.ServerError, api_exceptions.BadRequest):
             # retry with exponential backoff
             traceback.print_exc()
             time.sleep(1.4 ** attempt)
@@ -131,9 +132,9 @@ def divide_chunks(l, bin_size=MAX_ROW_UPLOAD):
         # Insert rows with row_ids into table, retrying as necessary.
         errors = retry(bq_client.insert_rows, table, chunk, skip_invalid_rows=True)
         if not errors:
-            print('Loaded {} builds into {}'.format(len(chunk), table.full_table_id))
+            print(f'Loaded {len(chunk)} builds into {table.full_table_id}')
         else:
-            print('Errors:')
+            print(f'Errors on Chunk: {chunk}')
             pprint.pprint(errors)
             pprint.pprint(table.schema)
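
For context, below is a minimal, self-contained sketch of the chunked streaming-insert pattern this patch adjusts: rows are split into `MAX_ROW_UPLOAD`-sized chunks, and each chunk's `insert_rows` call is wrapped in a retry with exponential backoff. The `bq_client`, `table`, and `rows` names and the `insert_data` wrapper are hypothetical stand-ins for illustration, not the full kettle/stream.py.

```python
# Sketch of the chunked upload-with-retry pattern (assumptions: `bq_client`,
# `table`, and `rows` are placeholders, not the real kettle pipeline objects).
import socket
import time
import traceback

from google.api_core import exceptions as api_exceptions
import google.cloud.exceptions

# Streaming-insert batches larger than ~10 rows can fail spuriously with 400s,
# hence the reduced chunk size (see the issue linked in the diff).
MAX_ROW_UPLOAD = 10


def retry(func, *args, **kwargs):
    """Call func, retrying transient errors with exponential backoff."""
    for attempt in range(20):
        try:
            return func(*args, **kwargs)
        except (socket.error, google.cloud.exceptions.ServerError,
                api_exceptions.BadRequest):
            traceback.print_exc()
            time.sleep(1.4 ** attempt)
    return func(*args, **kwargs)  # final attempt: let the exception propagate


def divide_chunks(l, bin_size=MAX_ROW_UPLOAD):
    """Yield successive bin_size-sized slices of l."""
    for i in range(0, len(l), bin_size):
        yield l[i:i + bin_size]


def insert_data(bq_client, table, rows):
    """Upload rows into table one chunk at a time, as in the patched loop."""
    for chunk in divide_chunks(rows):
        errors = retry(bq_client.insert_rows, table, chunk, skip_invalid_rows=True)
        if errors:
            print(f'Errors on Chunk: {chunk}')
        else:
            print(f'Loaded {len(chunk)} builds into {table.full_table_id}')
```

Retrying `BadRequest` (a 4xx) alongside server errors is unusual, but per the google-cloud-go issue referenced in the diff, the BigQuery streaming-insert backend can return spurious 400s on larger batches, so the patch treats them as transient rather than fatal.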