diff --git a/digdag-docs/src/operators/bq.md b/digdag-docs/src/operators/bq.md index fa898cf276..3d80738ade 100644 --- a/digdag-docs/src/operators/bq.md +++ b/digdag-docs/src/operators/bq.md @@ -46,6 +46,16 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. bq>: queries/step1.sql ``` +* **location**: LOCATION + + The location where the query job should run. See [locations](https://cloud.google.com/bigquery/docs/locations) for a list of available locations. + + Examples: + + ``` + location: asia-northeast1 + ``` + * **dataset**: NAME Specifies the default dataset to use in the query and in the `destination_table` parameter. @@ -164,4 +174,3 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. The id of the BigQuery job that executed this query. Note: `bq.last_jobid` parameter is kept only for backward compatibility but you must not use it because it will be removed removed in a near future release. - diff --git a/digdag-docs/src/operators/bq_extract.md b/digdag-docs/src/operators/bq_extract.md index d7e1ca6dd0..649457467e 100644 --- a/digdag-docs/src/operators/bq_extract.md +++ b/digdag-docs/src/operators/bq_extract.md @@ -57,6 +57,15 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. - gs://my_bucket/my_export_2.csv ``` +* **location**: LOCATION + The location where the job should run. The table and the destination must be in this location. See [BigQuery locations](https://cloud.google.com/bigquery/docs/locations) for a list of available locations. + + Examples: + + ``` + location: asia-northeast1 + ``` + * **print_header**: BOOLEAN Whether to print out a header row in the results. *Default*: `true`. @@ -106,4 +115,3 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. The id of the BigQuery job that performed this export. Note: `bq.last_jobid` parameter is kept only for backward compatibility but you must not use it because it will be removed removed in a near future release. - diff --git a/digdag-docs/src/operators/bq_load.md b/digdag-docs/src/operators/bq_load.md index 5236bee108..96b2c85307 100644 --- a/digdag-docs/src/operators/bq_load.md +++ b/digdag-docs/src/operators/bq_load.md @@ -80,6 +80,16 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. destination_table: some_dataset.some_partitioned_table$20160101 ``` +* **location**: LOCATION + + The location where the job should run. The source GCS bucket and the table must be in this location. See [BigQuery locations](https://cloud.google.com/bigquery/docs/locations) for a list of available locations. + + Examples: + + ``` + location: asia-northeast1 + ``` + * **project**: NAME The project that the table is located in or should be created in. Can also be specified directly in the table reference or the dataset parameter. @@ -280,4 +290,3 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag. The id of the BigQuery job that performed this import. Note: `bq.last_jobid` parameter is kept only for backward compatibility but you must not use it because it will be removed removed in a near future release. - diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java index a802c860c5..5aebeac220 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java @@ -2,6 +2,7 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; +import com.google.common.base.Optional; import io.digdag.client.config.Config; import io.digdag.client.config.ConfigFactory; import io.digdag.client.config.ConfigKey; @@ -22,7 +23,8 @@ abstract class BaseBqJobOperator protected TaskResult run(BqClient bq, String projectId) { BqJobRunner jobRunner = new BqJobRunner(request, bq, projectId); - Job completed = jobRunner.runJob(jobConfiguration(projectId)); + Optional location = params.getOptional("location", String.class); + Job completed = jobRunner.runJob(jobConfiguration(projectId), location); return result(completed); } diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqClient.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqClient.java index f246b24286..e66544e3ec 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqClient.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqClient.java @@ -170,12 +170,21 @@ void submitJob(String projectId, Job job) .execute(); } - Job jobStatus(String projectId, String jobId) + Job jobStatus(String projectId, String jobId, Optional location) throws IOException { - return client.jobs() - .get(projectId, jobId) - .execute(); + if (location.isPresent()) { + return client.jobs() + .get(projectId, jobId) + // newer version of google-api-services-bigquery-v2 has setLocation() + .set("location", location.get()) + .execute(); + } + else { + return client.jobs() + .get(projectId, jobId) + .execute(); + } } static class Factory diff --git a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqJobRunner.java b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqJobRunner.java index e675bd06e7..4dccb936ce 100644 --- a/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqJobRunner.java +++ b/digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqJobRunner.java @@ -49,7 +49,7 @@ class BqJobRunner this.projectId = Objects.requireNonNull(projectId, "projectId"); } - Job runJob(JobConfiguration config) + Job runJob(JobConfiguration config, Optional location) { // Generate job id Optional jobId = state.params().getOptional(JOB_ID, String.class); @@ -97,7 +97,7 @@ Job runJob(JobConfiguration config) .withErrorMessage("BigQuery job status check failed: %s", canonicalJobId) .run(s -> { logger.info("Checking BigQuery job status: {}", canonicalJobId); - return bq.jobStatus(projectId, jobId.get()); + return bq.jobStatus(projectId, jobId.get(), location); }); // Done yet? diff --git a/digdag-tests/src/test/java/acceptance/td/BigQueryIT.java b/digdag-tests/src/test/java/acceptance/td/BigQueryIT.java index 6c0e121e76..a946284108 100644 --- a/digdag-tests/src/test/java/acceptance/td/BigQueryIT.java +++ b/digdag-tests/src/test/java/acceptance/td/BigQueryIT.java @@ -52,9 +52,11 @@ import static acceptance.td.GcpUtil.GCP_CREDENTIAL; import static acceptance.td.GcpUtil.GCS_PREFIX; import static acceptance.td.GcpUtil.GCS_TEST_BUCKET; +import static acceptance.td.GcpUtil.GCS_TEST_BUCKET_ASIA; import static acceptance.td.GcpUtil.createDataset; import static acceptance.td.GcpUtil.createTable; import static acceptance.td.GcpUtil.datasetExists; +import static acceptance.td.GcpUtil.getDatasetLocation; import static acceptance.td.GcpUtil.listTables; import static acceptance.td.GcpUtil.tableExists; import static java.nio.charset.StandardCharsets.UTF_8; @@ -220,8 +222,36 @@ public static class QueryIT public void testQuery() throws Exception { + String testQueryTokyoDataset1 = BQ_TAG + "_query_tokyo_dataset_1"; + String testQueryTable1 = "test_query_table1"; + + createDataset(bq, gcpProjectId, testQueryTokyoDataset1, "asia-northeast1"); + createTable(bq, gcpProjectId, testQueryTokyoDataset1, new Table() + .setTableReference(new TableReference() + .setProjectId(gcpProjectId) + .setDatasetId(testQueryTokyoDataset1) + .setTableId(testQueryTable1)) + .setSchema(new TableSchema() + .setFields(ImmutableList.of( + new TableFieldSchema().setName("f1").setType("STRING"), + new TableFieldSchema().setName("f2").setType("STRING") + )))); + + retryExecutor.run(() -> + bq.tabledata().insertAll(gcpProjectId, testQueryTokyoDataset1, testQueryTable1, new TableDataInsertAllRequest() + .setRows(ImmutableList.of( + new TableDataInsertAllRequest.Rows().setJson(ImmutableMap.of("f1", "v1a", "f2", "v2a")), + new TableDataInsertAllRequest.Rows().setJson(ImmutableMap.of("f1", "v1b", "f2", "v2b")) + ) + ) + ) + ); + addWorkflow(projectDir, "acceptance/bigquery/query.dig"); - Id attemptId = pushAndStart(server.endpoint(), projectDir, "query", ImmutableMap.of("outfile", outfile.toString())); + Id attemptId = pushAndStart(server.endpoint(), projectDir, "query", ImmutableMap.of( + "outfile", outfile.toString(), + "test_query_tokyo_dataset_1", testQueryTokyoDataset1, + "test_query_table_1", testQueryTable1)); expect(Duration.ofMinutes(5), attemptSuccess(server.endpoint(), attemptId)); assertThat(Files.exists(outfile), is(true)); } @@ -249,11 +279,7 @@ public void testLoad() // Create output dataset String datasetId = BQ_TAG + "_load_test"; - Dataset dataset = new Dataset().setDatasetReference(new DatasetReference() - .setProjectId(gcpProjectId) - .setDatasetId(datasetId)); - retryExecutor.run(() -> bq.datasets().insert(gcpProjectId, dataset) - .execute()); + createDataset(bq, gcpProjectId, datasetId); // Run load String tableId = "data"; @@ -271,6 +297,43 @@ public void testLoad() Table destinationTable = retryExecutor.run(() -> bq.tables().get(gcpProjectId, datasetId, tableId).execute()); assertThat(destinationTable.getTableReference().getTableId(), is(tableId)); } + + @Test + public void testLoadAsia() + throws Exception + { + assertThat(GCS_TEST_BUCKET_ASIA, not(isEmptyOrNullString())); + + // Create source data object + String objectName = GCS_PREFIX + "test.csv"; + byte[] data = Joiner.on('\n').join("a,b", "c,d").getBytes(UTF_8); + InputStreamContent content = new InputStreamContent("text/csv", new ByteArrayInputStream(data)) + .setLength(data.length); + StorageObject metadata = new StorageObject().setName(objectName); + retryExecutor.run(() -> gcs.objects() + .insert(GCS_TEST_BUCKET_ASIA, metadata, content) + .execute()); + + // Create output dataset + String datasetId = BQ_TAG + "_load_test_asia"; + createDataset(bq, gcpProjectId, datasetId, "asia-northeast1"); + + // Run load + String tableId = "data"; + addWorkflow(projectDir, "acceptance/bigquery/load-asia.dig"); + Id attemptId = pushAndStart(server.endpoint(), projectDir, "load-asia", ImmutableMap.of( + "source_bucket", GCS_TEST_BUCKET_ASIA, + "source_object", objectName, + "target_dataset", datasetId, + "target_table", tableId, + "outfile", outfile.toString())); + expect(Duration.ofMinutes(5), attemptSuccess(server.endpoint(), attemptId)); + assertThat(Files.exists(outfile), is(true)); + + // Check that destination table was created + Table destinationTable = retryExecutor.run(() -> bq.tables().get(gcpProjectId, datasetId, tableId).execute()); + assertThat(destinationTable.getTableReference().getTableId(), is(tableId)); + } } public static class ExtractIT @@ -285,11 +348,7 @@ public void testExtract() // Create source table String tableId = "data"; String datasetId = BQ_TAG + "_extract_test"; - Dataset dataset = new Dataset().setDatasetReference(new DatasetReference() - .setProjectId(gcpProjectId) - .setDatasetId(datasetId)); - retryExecutor.run(() -> bq.datasets().insert(gcpProjectId, dataset) - .execute()); + createDataset(bq, gcpProjectId, datasetId); Table table = new Table().setTableReference(new TableReference() .setProjectId(gcpProjectId) .setTableId(tableId)) @@ -340,6 +399,67 @@ public void testExtract() } }); } + + @Test + public void testExtractAsia() + throws Exception + { + assertThat(GCS_TEST_BUCKET_ASIA, not(isEmptyOrNullString())); + + // Create source table + String tableId = "data"; + String datasetId = BQ_TAG + "_extract_test_asia"; + createDataset(bq, gcpProjectId, datasetId, "asia-northeast1"); + Table table = new Table().setTableReference(new TableReference() + .setProjectId(gcpProjectId) + .setTableId(tableId)) + .setSchema(new TableSchema() + .setFields(ImmutableList.of( + new TableFieldSchema().setName("foo").setType("STRING"), + new TableFieldSchema().setName("bar").setType("STRING") + ))); + retryExecutor.run(() -> bq.tables().insert(gcpProjectId, datasetId, table) + .execute()); + + // Populate source table + TableDataInsertAllRequest content = new TableDataInsertAllRequest() + .setRows(ImmutableList.of( + new TableDataInsertAllRequest.Rows().setJson(ImmutableMap.of( + "foo", "a", + "bar", "b")), + new TableDataInsertAllRequest.Rows().setJson(ImmutableMap.of( + "foo", "c", + "bar", "d")))); + retryExecutor.run(() -> bq.tabledata().insertAll(gcpProjectId, datasetId, tableId, content) + .execute()); + + // Run extract + String objectName = GCS_PREFIX + "test.csv"; + addWorkflow(projectDir, "acceptance/bigquery/extract-asia.dig"); + Id attemptId = pushAndStart(server.endpoint(), projectDir, "extract-asia", ImmutableMap.of( + "src_dataset", datasetId, + "src_table", tableId, + "dst_bucket", GCS_TEST_BUCKET_ASIA, + "dst_object", objectName, + "outfile", outfile.toString())); + expect(Duration.ofMinutes(5), attemptSuccess(server.endpoint(), attemptId)); + assertThat(Files.exists(outfile), is(true)); + + // Check that destination file was created + StorageObject metadata = retryExecutor.run(() -> gcs.objects().get(GCS_TEST_BUCKET_ASIA, objectName) + .execute()); + assertThat(metadata.getName(), is(objectName)); + ByteArrayOutputStream data = new ByteArrayOutputStream(); + retryExecutor.run(() -> { + try { + gcs.objects().get(GCS_TEST_BUCKET_ASIA, objectName) + .executeMediaAndDownloadTo(data); + } + catch (IOException e) { + throw ThrowablesUtil.propagate(e); + } + }); + } } public static class DdlIT @@ -363,11 +483,13 @@ public void testDdl() String testCreateTable2EmptyDataset = BQ_TAG + "_create_table_2_empty_dataset"; String testCreateTable3CreateDataset = BQ_TAG + "_create_table_3_create_dataset"; String testCreateTable4ExistingDataset = BQ_TAG + "_create_table_4_existing_dataset"; + String testCreateTableAtTokyo1CreateDataset = BQ_TAG + "_create_table_at_tokyo_1_create_dataset"; String testCreateTable1 = "test_create_table_1"; String testCreateTable2 = "test_create_table_2"; String testCreateTable3 = "test_create_table_3"; String testCreateTable4 = "test_create_table_4"; String testCreateTable5 = "test_create_table_5"; + String testCreateTableAtTokyo1 = "test_create_table_at_tokyo_1"; String testDeleteTable1 = "test_delete_table_1"; String testDeleteTable2 = "test_delete_table_2"; String testEmptyTable2EmptyDataset = BQ_TAG + "_empty_table_2_empty_dataset"; @@ -426,9 +548,11 @@ public void testDdl() .put("test_create_table_3", testCreateTable3) .put("test_create_table_4", testCreateTable4) .put("test_create_table_5", testCreateTable5) + .put("test_create_table_at_tokyo_1", testCreateTableAtTokyo1) .put("test_create_table_2_empty_dataset", testCreateTable2EmptyDataset) .put("test_create_table_3_create_dataset", testCreateTable3CreateDataset) .put("test_create_table_4_existing_dataset", testCreateTable4ExistingDataset) + .put("test_create_table_at_tokyo_1_create_dataset", testCreateTableAtTokyo1CreateDataset) .put("test_delete_table_2_dataset", testDeleteTable2ExistingDataset) .put("test_delete_table_1", testDeleteTable1) .put("test_delete_table_2", testDeleteTable2) @@ -453,6 +577,7 @@ public void testDdl() assertThat(datasetExists(bq, gcpProjectId, testEmptyDataset2), is(true)); assertThat(datasetExists(bq, gcpProjectId, testDeleteDataset1), is(false)); assertThat(datasetExists(bq, gcpProjectId, testDeleteDataset2), is(false)); + assertThat(getDatasetLocation(bq, gcpProjectId, testCreateTableAtTokyo1CreateDataset), is(Optional.of("asia-northeast1"))); assertThat(listTables(bq, gcpProjectId, testEmptyDataset1), is(empty())); assertThat(listTables(bq, gcpProjectId, testEmptyDataset2), is(empty())); @@ -462,6 +587,7 @@ public void testDdl() assertThat(tableExists(bq, gcpProjectId, testCreateTable3CreateDataset, testCreateTable3), is(true)); assertThat(tableExists(bq, gcpProjectId, testCreateTable4ExistingDataset, testCreateTable4), is(true)); assertThat(tableExists(bq, gcpProjectId, testDefaultDataset, testCreateTable5), is(true)); + assertThat(tableExists(bq, gcpProjectId, testCreateTableAtTokyo1CreateDataset, testCreateTableAtTokyo1), is(true)); assertThat(tableExists(bq, gcpProjectId, testDefaultDataset, testDeleteTable1), is(false)); assertThat(tableExists(bq, gcpProjectId, testDeleteTable2ExistingDataset, testDeleteTable2), is(false)); diff --git a/digdag-tests/src/test/java/acceptance/td/GcpUtil.java b/digdag-tests/src/test/java/acceptance/td/GcpUtil.java index 4ab10c443a..7119954df8 100644 --- a/digdag-tests/src/test/java/acceptance/td/GcpUtil.java +++ b/digdag-tests/src/test/java/acceptance/td/GcpUtil.java @@ -13,6 +13,7 @@ import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.Objects; import com.google.api.services.storage.model.StorageObject; +import com.google.common.base.Optional; import io.digdag.client.DigdagClient; import io.digdag.commons.ThrowablesUtil; import io.digdag.util.RetryExecutor; @@ -35,6 +36,7 @@ public class GcpUtil static final String GCP_CREDENTIAL = System.getenv().getOrDefault("GCP_CREDENTIAL", ""); static final String GCS_TEST_BUCKET = System.getenv().getOrDefault("GCS_TEST_BUCKET", ""); + static final String GCS_TEST_BUCKET_ASIA = System.getenv().getOrDefault("GCS_TEST_BUCKET_ASIA", ""); static final String GCP_PROJECT_ID; @@ -138,6 +140,18 @@ static Table createTable(Bigquery bq, String projectId, String datasetId, Table return bq.tables().insert(projectId, datasetId, table).execute(); } + static Dataset createDataset(Bigquery bq, String projectId, String datasetId, String location) + throws IOException, RetryExecutor.RetryGiveupException + { + Dataset dataset = new Dataset() + .setDatasetReference(new DatasetReference() + .setDatasetId(datasetId)) + .setLocation(location); + Dataset created = createDataset(bq, projectId, dataset); + assertThat(datasetExists(bq, projectId, datasetId), is(true)); + return created; + } + static Dataset createDataset(Bigquery bq, String projectId, String datasetId) throws IOException, RetryExecutor.RetryGiveupException { @@ -170,6 +184,13 @@ static boolean datasetExists(Bigquery bq, String projectId, String datasetId) } } + static Optional getDatasetLocation(Bigquery bq, String projectId, String datasetId) + throws IOException + { + Dataset dataset = bq.datasets().get(projectId, datasetId).execute(); + return Optional.fromNullable(dataset.getLocation()); + } + static List listTables(Bigquery bq, String projectId, String datasetId) throws IOException { diff --git a/digdag-tests/src/test/resources/acceptance/bigquery/ddl.dig b/digdag-tests/src/test/resources/acceptance/bigquery/ddl.dig index 0ec801cf64..aa64585656 100644 --- a/digdag-tests/src/test/resources/acceptance/bigquery/ddl.dig +++ b/digdag-tests/src/test/resources/acceptance/bigquery/ddl.dig @@ -15,6 +15,8 @@ timezone: UTC l2: v2 - ${test_create_table_3_create_dataset} - ${test_empty_table_3_create_dataset} + - id: ${test_create_table_at_tokyo_1_create_dataset} + location: asia-northeast1 delete_datasets: - ${test_delete_dataset_1} @@ -43,6 +45,7 @@ timezone: UTC fields: - {name: f1, type: STRING} - {name: f2, type: STRING} + - ${test_create_table_at_tokyo_1_create_dataset}.${test_create_table_at_tokyo_1} delete_tables: - ${test_delete_table_1} diff --git a/digdag-tests/src/test/resources/acceptance/bigquery/extract-asia.dig b/digdag-tests/src/test/resources/acceptance/bigquery/extract-asia.dig new file mode 100644 index 0000000000..28627ddb54 --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/bigquery/extract-asia.dig @@ -0,0 +1,9 @@ +timezone: UTC + ++run: + bq_extract>: ${src_dataset}.${src_table} + destination: gs://${dst_bucket}/${dst_object} + location: asia-northeast1 + ++post: + sh>: touch ${outfile} diff --git a/digdag-tests/src/test/resources/acceptance/bigquery/load-asia.dig b/digdag-tests/src/test/resources/acceptance/bigquery/load-asia.dig new file mode 100644 index 0000000000..9a2b56ebac --- /dev/null +++ b/digdag-tests/src/test/resources/acceptance/bigquery/load-asia.dig @@ -0,0 +1,15 @@ +timezone: UTC + ++run: + bq_load>: gs://${source_bucket}/${source_object} + schema: + fields: + - name: foo + type: STRING + - name: bar + type: STRING + destination_table: ${target_dataset}.${target_table} + location: asia-northeast1 + ++post: + sh>: touch ${outfile} diff --git a/digdag-tests/src/test/resources/acceptance/bigquery/query.dig b/digdag-tests/src/test/resources/acceptance/bigquery/query.dig index 1162129e8a..0994eced72 100644 --- a/digdag-tests/src/test/resources/acceptance/bigquery/query.dig +++ b/digdag-tests/src/test/resources/acceptance/bigquery/query.dig @@ -10,5 +10,11 @@ timezone: UTC ORDER BY weight_pounds DESC LIMIT 10; dataset: publicdata:samples ++run_tokyo1: + bq>: + data: SELECT f1, f2 FROM ${test_query_table_1}; + dataset: ${test_query_tokyo_dataset_1} + location: asia-northeast1 + +post: sh>: touch ${outfile}