Commit 1246da8

feat: make it easier to disable best-effort deduplication with streaming inserts (#734)

* feat: make it easier to disable row insert IDs

* Also accept any iterables for row_ids
plamut authored Jul 1, 2021
1 parent 38b3ef9 commit 1246da8
Showing 4 changed files with 195 additions and 14 deletions.
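
Before the per-file diffs, a quick illustration of the new surface this commit introduces. This is a hedged sketch, not code from the commit: the project, table ID, and row payloads are made up, and it assumes a google-cloud-bigquery release that ships this change (2.21.0, per the docstring directives below).

from google.cloud import bigquery
from google.cloud.bigquery import AutoRowIDs

client = bigquery.Client()
rows = [{"name": "Ada"}, {"name": "Grace"}]  # illustrative row payloads

# AutoRowIDs.DISABLED means no usable insertId is attached to any row,
# so BigQuery skips best-effort deduplication for these inserts.
errors = client.insert_rows_json(
    "my-project.my_dataset.my_table",  # hypothetical table ID
    rows,
    row_ids=AutoRowIDs.DISABLED,
)
assert not errors  # an empty list means every row was accepted

Disabling insert IDs opts the rows out of BigQuery's best-effort deduplication, which is exactly the behavior the commit title refers to.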
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
@@ -37,6 +37,7 @@
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery import enums
+from google.cloud.bigquery.enums import AutoRowIDs
from google.cloud.bigquery.enums import KeyResultStatementKind
from google.cloud.bigquery.enums import SqlTypeNames
from google.cloud.bigquery.enums import StandardSqlDataTypes
@@ -144,6 +145,7 @@
    "DEFAULT_RETRY",
    # Enum Constants
    "enums",
+    "AutoRowIDs",
    "Compression",
    "CreateDisposition",
    "DestinationFormat",
47 changes: 41 additions & 6 deletions google/cloud/bigquery/client.py
@@ -68,6 +68,7 @@
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
+from google.cloud.bigquery.enums import AutoRowIDs
from google.cloud.bigquery.exceptions import LegacyBigQueryStorageError
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
@@ -3349,7 +3350,7 @@ def insert_rows_json(
        self,
        table: Union[Table, TableReference, str],
        json_rows: Sequence[Dict],
-        row_ids: Sequence[str] = None,
+        row_ids: Union[Iterable[str], AutoRowIDs, None] = AutoRowIDs.GENERATE_UUID,
        skip_invalid_rows: bool = None,
        ignore_unknown_values: bool = None,
        template_suffix: str = None,
@@ -3371,11 +3372,20 @@
            json_rows (Sequence[Dict]):
                Row data to be inserted. Keys must match the table schema fields
                and values must be JSON-compatible representations.
-            row_ids (Optional[Sequence[Optional[str]]]):
+            row_ids (Union[Iterable[str], AutoRowIDs, None]):
                Unique IDs, one per row being inserted. An ID can also be
                ``None``, indicating that an explicit insert ID should **not**
                be used for that row. If the argument is omitted altogether,
                unique IDs are created automatically.
+
+                .. versionchanged:: 2.21.0
+                    Can also be an iterable, not just a sequence, or an
+                    :class:`AutoRowIDs` enum member.
+
+                .. deprecated:: 2.21.0
+                    Passing ``None`` to explicitly request autogenerating insert
+                    IDs is deprecated, use :attr:`AutoRowIDs.GENERATE_UUID`
+                    instead.
            skip_invalid_rows (Optional[bool]):
                Insert all valid rows of a request, even if invalid rows exist.
                The default value is ``False``, which causes the entire request
@@ -3415,12 +3425,37 @@
        rows_info = []
        data = {"rows": rows_info}

-        for index, row in enumerate(json_rows):
+        if row_ids is None:
+            warnings.warn(
+                "Passing None for row_ids is deprecated. To explicitly request "
+                "autogenerated insert IDs, use AutoRowIDs.GENERATE_UUID instead",
+                category=DeprecationWarning,
+            )
+            row_ids = AutoRowIDs.GENERATE_UUID

+        if not isinstance(row_ids, AutoRowIDs):
+            try:
+                row_ids_iter = iter(row_ids)
+            except TypeError:
+                msg = "row_ids is neither an iterable nor an AutoRowIDs enum member"
+                raise TypeError(msg)

+        for i, row in enumerate(json_rows):
            info = {"json": row}
-            if row_ids is not None:
-                info["insertId"] = row_ids[index]
-            else:

+            if row_ids is AutoRowIDs.GENERATE_UUID:
                info["insertId"] = str(uuid.uuid4())
+            elif row_ids is AutoRowIDs.DISABLED:
+                info["insertId"] = None
+            else:
+                try:
+                    insert_id = next(row_ids_iter)
+                except StopIteration:
+                    msg = f"row_ids did not generate enough IDs, error at index {i}"
+                    raise ValueError(msg)
+                else:
+                    info["insertId"] = insert_id

            rows_info.append(info)

        if skip_invalid_rows is not None:
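As the updated docstring documents, row_ids may now also be any iterable, not just a sequence: IDs are pulled with next(), one per row, a non-iterable value raises TypeError, and an iterator that runs out early raises ValueError. A minimal sketch under the same assumptions as the example above (the ID scheme here is hypothetical):

import itertools

from google.cloud import bigquery

client = bigquery.Client()
rows = [{"col1": "a"}, {"col1": "b"}, {"col1": "c"}]

# A generator works too; here it derives stable insert IDs from a
# made-up batch number, so retrying the same batch reuses the same IDs.
row_ids = (f"batch-7-{n}" for n in itertools.count())
errors = client.insert_rows_json("proj.dset.tbl", rows, row_ids=row_ids)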
7 changes: 7 additions & 0 deletions google/cloud/bigquery/enums.py
@@ -21,6 +21,13 @@
from google.cloud.bigquery.query import ScalarQueryParameterType


+class AutoRowIDs(enum.Enum):
+    """How to handle automatic insert IDs when inserting rows as a stream."""
+
+    DISABLED = enum.auto()
+    GENERATE_UUID = enum.auto()


class Compression(object):
    """The compression type to use for exported files. The default value is
    :attr:`NONE`.
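A small illustration of the enum's contract: client.py above compares row_ids against these members by identity, and enum.auto() keeps the underlying values opaque, so the members themselves, not their values, are the API.

from google.cloud.bigquery.enums import AutoRowIDs

# Members are singletons, so the identity checks in client.py are safe.
assert AutoRowIDs.DISABLED is AutoRowIDs["DISABLED"]
assert AutoRowIDs.DISABLED is not AutoRowIDs.GENERATE_UUID
assert isinstance(AutoRowIDs.GENERATE_UUID, AutoRowIDs)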
153 changes: 145 additions & 8 deletions tests/unit/test_client.py
@@ -5434,7 +5434,7 @@ def test_insert_rows_from_dataframe_w_explicit_none_insert_ids(self):
            method="POST", path=API_PATH, data=EXPECTED_SENT_DATA, timeout=None
        )

-    def test_insert_rows_json(self):
+    def test_insert_rows_json_default_behavior(self):
        from google.cloud.bigquery.dataset import DatasetReference
        from google.cloud.bigquery.schema import SchemaField
        from google.cloud.bigquery.table import Table
@@ -5481,29 +5481,127 @@ def test_insert_rows_json(self):
            method="POST", path="/%s" % PATH, data=SENT, timeout=7.5,
        )

-    def test_insert_rows_json_with_string_id(self):
-        rows = [{"col1": "val1"}]
+    def test_insert_rows_json_w_explicitly_requested_autogenerated_insert_ids(self):
+        from google.cloud.bigquery import AutoRowIDs

+        rows = [{"col1": "val1"}, {"col2": "val2"}]
        creds = _make_credentials()
        http = object()
        client = self._make_one(
            project="default-project", credentials=creds, _http=http
        )
        conn = client._connection = make_connection({})

-        with mock.patch("uuid.uuid4", side_effect=map(str, range(len(rows)))):
-            errors = client.insert_rows_json("proj.dset.tbl", rows)
+        uuid_patcher = mock.patch("uuid.uuid4", side_effect=map(str, range(len(rows))))
+        with uuid_patcher:
+            errors = client.insert_rows_json(
+                "proj.dset.tbl", rows, row_ids=AutoRowIDs.GENERATE_UUID
+            )

        self.assertEqual(len(errors), 0)
-        expected = {
-            "rows": [{"json": row, "insertId": str(i)} for i, row in enumerate(rows)]

+        # Check row data sent to the backend.
+        expected_row_data = {
+            "rows": [
+                {"json": {"col1": "val1"}, "insertId": "0"},
+                {"json": {"col2": "val2"}, "insertId": "1"},
+            ]
        }
        conn.api_request.assert_called_once_with(
            method="POST",
            path="/projects/proj/datasets/dset/tables/tbl/insertAll",
-            data=expected,
+            data=expected_row_data,
            timeout=None,
        )

+    def test_insert_rows_json_w_explicitly_disabled_insert_ids(self):
+        from google.cloud.bigquery import AutoRowIDs

+        rows = [{"col1": "val1"}, {"col2": "val2"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        conn = client._connection = make_connection({})

+        errors = client.insert_rows_json(
+            "proj.dset.tbl", rows, row_ids=AutoRowIDs.DISABLED,
+        )

+        self.assertEqual(len(errors), 0)

+        expected_row_data = {
+            "rows": [
+                {"json": {"col1": "val1"}, "insertId": None},
+                {"json": {"col2": "val2"}, "insertId": None},
+            ]
+        }
+        conn.api_request.assert_called_once_with(
+            method="POST",
+            path="/projects/proj/datasets/dset/tables/tbl/insertAll",
+            data=expected_row_data,
+            timeout=None,
+        )

+    def test_insert_rows_json_with_iterator_row_ids(self):
+        rows = [{"col1": "val1"}, {"col2": "val2"}, {"col3": "val3"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        conn = client._connection = make_connection({})

+        row_ids_iter = map(str, itertools.count(42))
+        errors = client.insert_rows_json("proj.dset.tbl", rows, row_ids=row_ids_iter)

+        self.assertEqual(len(errors), 0)
+        expected_row_data = {
+            "rows": [
+                {"json": {"col1": "val1"}, "insertId": "42"},
+                {"json": {"col2": "val2"}, "insertId": "43"},
+                {"json": {"col3": "val3"}, "insertId": "44"},
+            ]
+        }
+        conn.api_request.assert_called_once_with(
+            method="POST",
+            path="/projects/proj/datasets/dset/tables/tbl/insertAll",
+            data=expected_row_data,
+            timeout=None,
+        )

+    def test_insert_rows_json_with_non_iterable_row_ids(self):
+        rows = [{"col1": "val1"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        client._connection = make_connection({})

+        with self.assertRaises(TypeError) as exc:
+            client.insert_rows_json("proj.dset.tbl", rows, row_ids=object())

+        err_msg = str(exc.exception)
+        self.assertIn("row_ids", err_msg)
+        self.assertIn("iterable", err_msg)

+    def test_insert_rows_json_with_too_few_row_ids(self):
+        rows = [{"col1": "val1"}, {"col2": "val2"}, {"col3": "val3"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        client._connection = make_connection({})

+        insert_ids = ["10", "20"]

+        error_msg_pattern = "row_ids did not generate enough IDs.*index 2"
+        with self.assertRaisesRegex(ValueError, error_msg_pattern):
+            client.insert_rows_json("proj.dset.tbl", rows, row_ids=insert_ids)

    def test_insert_rows_json_w_explicit_none_insert_ids(self):
        rows = [{"col1": "val1"}, {"col2": "val2"}]
        creds = _make_credentials()
@@ -5526,6 +5624,45 @@ def test_insert_rows_json_w_explicit_none_insert_ids(self):
            timeout=None,
        )

+    def test_insert_rows_json_w_none_insert_ids_sequence(self):
+        rows = [{"col1": "val1"}, {"col2": "val2"}]
+        creds = _make_credentials()
+        http = object()
+        client = self._make_one(
+            project="default-project", credentials=creds, _http=http
+        )
+        conn = client._connection = make_connection({})

+        uuid_patcher = mock.patch("uuid.uuid4", side_effect=map(str, range(len(rows))))
+        with warnings.catch_warnings(record=True) as warned, uuid_patcher:
+            errors = client.insert_rows_json("proj.dset.tbl", rows, row_ids=None)

+        self.assertEqual(len(errors), 0)

+        # Passing row_ids=None should have resulted in a deprecation warning.
+        matches = [
+            warning
+            for warning in warned
+            if issubclass(warning.category, DeprecationWarning)
+            and "row_ids" in str(warning)
+            and "AutoRowIDs.GENERATE_UUID" in str(warning)
+        ]
+        assert matches, "The expected deprecation warning was not raised."

+        # Check row data sent to the backend.
+        expected_row_data = {
+            "rows": [
+                {"json": {"col1": "val1"}, "insertId": "0"},
+                {"json": {"col2": "val2"}, "insertId": "1"},
+            ]
+        }
+        conn.api_request.assert_called_once_with(
+            method="POST",
+            path="/projects/proj/datasets/dset/tables/tbl/insertAll",
+            data=expected_row_data,
+            timeout=None,
+        )

    def test_insert_rows_w_wrong_arg(self):
        from google.cloud.bigquery.dataset import DatasetReference
        from google.cloud.bigquery.schema import SchemaField
