Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigquery): preserve job config passed to Client methods #9735

Merged
merged 1 commit into the base branch on Nov 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def __init__(

self._connection = Connection(self, **kw_args)
self._location = location
self._default_query_job_config = default_query_job_config
self._default_query_job_config = copy.deepcopy(default_query_job_config)

@property
def location(self):
Expand Down Expand Up @@ -1381,6 +1381,7 @@ def load_table_from_uri(
destination = _table_arg_to_table_ref(destination, default_project=self.project)

if job_config:
job_config = copy.deepcopy(job_config)
_verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)

load_job = job.LoadJob(job_ref, source_uris, destination, self, job_config)
Expand Down Expand Up @@ -1465,6 +1466,7 @@ def load_table_from_file(
destination = _table_arg_to_table_ref(destination, default_project=self.project)
job_ref = job._JobReference(job_id, project=project, location=location)
if job_config:
job_config = copy.deepcopy(job_config)
_verify_job_config_type(job_config, google.cloud.bigquery.job.LoadJobConfig)
load_job = job.LoadJob(job_ref, None, destination, self, job_config)
job_resource = load_job.to_api_repr()
Expand Down Expand Up @@ -1969,6 +1971,8 @@ def copy_table(

if job_config:
_verify_job_config_type(job_config, google.cloud.bigquery.job.CopyJobConfig)
job_config = copy.deepcopy(job_config)

copy_job = job.CopyJob(
job_ref, sources, destination, client=self, job_config=job_config
)
Expand Down Expand Up @@ -2049,6 +2053,8 @@ def extract_table(
_verify_job_config_type(
job_config, google.cloud.bigquery.job.ExtractJobConfig
)
job_config = copy.deepcopy(job_config)

extract_job = job.ExtractJob(
job_ref, source, destination_uris, client=self, job_config=job_config
)
Expand Down Expand Up @@ -2112,6 +2118,8 @@ def query(
if location is None:
location = self.location

job_config = copy.deepcopy(job_config)

if self._default_query_job_config:
if job_config:
_verify_job_config_type(
Expand All @@ -2129,7 +2137,7 @@ def query(
self._default_query_job_config,
google.cloud.bigquery.job.QueryJobConfig,
)
job_config = self._default_query_job_config
job_config = copy.deepcopy(self._default_query_job_config)

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
Expand Down
135 changes: 129 additions & 6 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2997,6 +2997,8 @@ def test_load_table_from_uri(self):
creds = _make_credentials()
http = object()
job_config = LoadJobConfig()
original_config_copy = copy.deepcopy(job_config)

client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
conn = client._connection = make_connection(RESOURCE)
destination = client.dataset(self.DS_ID).table(DESTINATION)
Expand All @@ -3010,6 +3012,9 @@ def test_load_table_from_uri(self):
method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE
)

# the original config object should not have been modified
self.assertEqual(job_config.to_api_repr(), original_config_copy.to_api_repr())

self.assertIsInstance(job, LoadJob)
self.assertIsInstance(job._configuration, LoadJobConfig)
self.assertIs(job._client, client)
Expand Down Expand Up @@ -3496,19 +3501,24 @@ def test_copy_table_w_valid_job_config(self):
creds = _make_credentials()
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
job_config = CopyJobConfig()
conn = client._connection = make_connection(RESOURCE)
dataset = client.dataset(self.DS_ID)
source = dataset.table(SOURCE)
destination = dataset.table(DESTINATION)

job_config = CopyJobConfig()
original_config_copy = copy.deepcopy(job_config)
job = client.copy_table(source, destination, job_id=JOB, job_config=job_config)

# Check that copy_table actually starts the job.
conn.api_request.assert_called_once_with(
method="POST", path="/projects/%s/jobs" % self.PROJECT, data=RESOURCE
)
self.assertIsInstance(job._configuration, CopyJobConfig)

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_extract_table(self):
from google.cloud.bigquery.job import ExtractJob

Expand Down Expand Up @@ -3679,6 +3689,7 @@ def test_extract_table_generated_job_id(self):
source = dataset.table(SOURCE)
job_config = ExtractJobConfig()
job_config.destination_format = DestinationFormat.NEWLINE_DELIMITED_JSON
original_config_copy = copy.deepcopy(job_config)

job = client.extract_table(source, DESTINATION, job_config=job_config)

Expand All @@ -3695,6 +3706,9 @@ def test_extract_table_generated_job_id(self):
self.assertEqual(job.source, source)
self.assertEqual(list(job.destination_uris), [DESTINATION])

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_extract_table_w_destination_uris(self):
from google.cloud.bigquery.job import ExtractJob

Expand Down Expand Up @@ -3840,6 +3854,7 @@ def test_query_w_explicit_job_config(self):
job_config = QueryJobConfig()
job_config.use_query_cache = True
job_config.maximum_bytes_billed = 2000
original_config_copy = copy.deepcopy(job_config)

client.query(
query, job_id=job_id, location=self.LOCATION, job_config=job_config
Expand All @@ -3850,6 +3865,105 @@ def test_query_w_explicit_job_config(self):
method="POST", path="/projects/PROJECT/jobs", data=resource
)

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_query_preserving_explicit_job_config(self):
    """Client.query must not mutate a job_config passed by the caller."""
    job_id = "some-job-id"
    query = "select count(*) from persons"

    # The exact request body the client is expected to POST.
    expected_resource = {
        "jobReference": {
            "jobId": job_id,
            "projectId": self.PROJECT,
            "location": self.LOCATION,
        },
        "configuration": {
            "query": {
                "query": query,
                "useLegacySql": False,
                "useQueryCache": True,
                "maximumBytesBilled": "2000",
            }
        },
    }

    from google.cloud.bigquery import QueryJobConfig

    creds = _make_credentials()
    http = object()
    client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
    conn = client._connection = make_connection(expected_resource)

    job_config = QueryJobConfig()
    job_config.use_query_cache = True
    job_config.maximum_bytes_billed = 2000
    # Snapshot the config before the call so mutation can be detected.
    config_before = copy.deepcopy(job_config)

    client.query(
        query, job_id=job_id, location=self.LOCATION, job_config=job_config
    )

    # The job was actually started with the expected payload ...
    conn.api_request.assert_called_once_with(
        method="POST", path="/projects/PROJECT/jobs", data=expected_resource
    )

    # ... and the caller's config object was left untouched.
    assert job_config.to_api_repr() == config_before.to_api_repr()

def test_query_preserving_explicit_default_job_config(self):
    """Client.query must not mutate the client-level default job config."""
    job_id = "some-job-id"
    query = "select count(*) from persons"

    # Request body expected when only the client's default config applies.
    expected_resource = {
        "jobReference": {
            "jobId": job_id,
            "projectId": self.PROJECT,
            "location": self.LOCATION,
        },
        "configuration": {
            "query": {
                "query": query,
                "defaultDataset": {
                    "projectId": self.PROJECT,
                    "datasetId": "some-dataset",
                },
                "useLegacySql": False,
                "maximumBytesBilled": "1000",
            }
        },
    }

    from google.cloud.bigquery import QueryJobConfig, DatasetReference

    default_config = QueryJobConfig()
    default_config.default_dataset = DatasetReference(
        self.PROJECT, "some-dataset"
    )
    default_config.maximum_bytes_billed = 1000
    # Snapshot the default config before the call so mutation can be detected.
    config_before = copy.deepcopy(default_config)

    creds = _make_credentials()
    http = object()
    client = self._make_one(
        project=self.PROJECT,
        credentials=creds,
        _http=http,
        default_query_job_config=default_config,
    )
    conn = client._connection = make_connection(expected_resource)

    client.query(query, job_id=job_id, location=self.LOCATION, job_config=None)

    # The job was started with the payload derived from the default config ...
    conn.api_request.assert_called_once_with(
        method="POST", path="/projects/PROJECT/jobs", data=expected_resource
    )

    # ... and the client's default config object was not mutated.
    assert default_config.to_api_repr() == config_before.to_api_repr()

def test_query_w_invalid_job_config(self):
from google.cloud.bigquery import QueryJobConfig, DatasetReference
from google.cloud.bigquery import job
Expand Down Expand Up @@ -5429,22 +5543,24 @@ def test_load_table_from_file_resumable(self):

client = self._make_client()
file_obj = self._make_file_obj()
job_config = self._make_config()
original_config_copy = copy.deepcopy(job_config)

do_upload_patch = self._make_do_upload_patch(
client, "_do_resumable_upload", self.EXPECTED_CONFIGURATION
)
with do_upload_patch as do_upload:
client.load_table_from_file(
file_obj,
self.TABLE_REF,
job_id="job_id",
job_config=self._make_config(),
file_obj, self.TABLE_REF, job_id="job_id", job_config=job_config,
)

do_upload.assert_called_once_with(
file_obj, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES
)

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_load_table_from_file_w_explicit_project(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES

Expand Down Expand Up @@ -5790,6 +5906,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
job_config = job.LoadJobConfig(
write_disposition=job.WriteDisposition.WRITE_TRUNCATE
)
original_config_copy = copy.deepcopy(job_config)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
Expand Down Expand Up @@ -5826,6 +5943,9 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
assert sent_config.source_format == job.SourceFormat.PARQUET
assert sent_config.write_disposition == job.WriteDisposition.WRITE_TRUNCATE

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_automatic_schema(self):
Expand Down Expand Up @@ -6466,6 +6586,7 @@ def test_load_table_from_json_non_default_args(self):
]
job_config = job.LoadJobConfig(schema=schema)
job_config._properties["load"]["unknown_field"] = "foobar"
original_config_copy = copy.deepcopy(job_config)

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
Expand Down Expand Up @@ -6493,13 +6614,15 @@ def test_load_table_from_json_non_default_args(self):
)

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert job_config.source_format is None # the original was not modified
assert sent_config.source_format == job.SourceFormat.NEWLINE_DELIMITED_JSON
assert sent_config.schema == schema
assert not sent_config.autodetect
# all properties should have been cloned and sent to the backend
assert sent_config._properties.get("load", {}).get("unknown_field") == "foobar"

# the original config object should not have been modified
assert job_config.to_api_repr() == original_config_copy.to_api_repr()

def test_load_table_from_json_w_invalid_job_config(self):
from google.cloud.bigquery import job

Expand Down