Support location in bq operator (#1767)

Support location in BigQuery operators
tkawachi authored Oct 17, 2022
1 parent e6e5ce6 commit f328ed9
Showing 12 changed files with 238 additions and 21 deletions.
11 changes: 10 additions & 1 deletion digdag-docs/src/operators/bq.md
@@ -46,6 +46,16 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
bq>: queries/step1.sql
```

* **location**: LOCATION

The location where the query job should run. See [locations](https://cloud.google.com/bigquery/docs/locations) for a list of available locations.

Examples:

```
location: asia-northeast1
```

* **dataset**: NAME

Specifies the default dataset to use in the query and in the `destination_table` parameter.
@@ -164,4 +174,3 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
The id of the BigQuery job that executed this query.

Note: the `bq.last_jobid` parameter is kept only for backward compatibility, but you must not use it because it will be removed in a near future release.
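For context, the new parameter composes with the operator's existing parameters like this (an illustrative sketch; the dataset name and query file are hypothetical):

```
+step1:
  bq>: queries/step1.sql
  location: asia-northeast1
  dataset: my_dataset
```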

10 changes: 9 additions & 1 deletion digdag-docs/src/operators/bq_extract.md
@@ -57,6 +57,15 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
- gs://my_bucket/my_export_2.csv
```

* **location**: LOCATION
The location where the job should run. The table and the destination must be in this location. See [BigQuery locations](https://cloud.google.com/bigquery/docs/locations) for a list of available locations.

Examples:

```
location: asia-northeast1
```

* **print_header**: BOOLEAN
Whether to print out a header row in the results. *Default*: `true`.

@@ -106,4 +115,3 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
The id of the BigQuery job that performed this export.

Note: the `bq.last_jobid` parameter is kept only for backward compatibility, but you must not use it because it will be removed in a near future release.
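A complete `bq_extract>` task using the new parameter might look like the following sketch (table and bucket names are hypothetical; the source table and the destination bucket must share the location):

```
+export:
  bq_extract>: my_dataset.my_table
  destination: gs://my_asia_bucket/my_export.csv
  location: asia-northeast1
```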

11 changes: 10 additions & 1 deletion digdag-docs/src/operators/bq_load.md
@@ -80,6 +80,16 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
destination_table: some_dataset.some_partitioned_table$20160101
```

* **location**: LOCATION

The location where the job should run. The source GCS bucket and the table must be in this location. See [BigQuery locations](https://cloud.google.com/bigquery/docs/locations) for a list of available locations.

Examples:

```
location: asia-northeast1
```

* **project**: NAME

The project that the table is located in or should be created in. Can also be specified directly in the table reference or the dataset parameter.
@@ -280,4 +290,3 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
The id of the BigQuery job that performed this import.

Note: the `bq.last_jobid` parameter is kept only for backward compatibility, but you must not use it because it will be removed in a near future release.
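Analogously, a `bq_load>` sketch with hypothetical bucket and table names (the source GCS bucket and the destination table must share the location):

```
+load:
  bq_load>: gs://my_asia_bucket/data.csv
  destination_table: my_dataset.my_table
  location: asia-northeast1
```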

digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BaseBqJobOperator.java
@@ -2,6 +2,7 @@

import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.common.base.Optional;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigFactory;
import io.digdag.client.config.ConfigKey;
@@ -22,7 +23,8 @@ abstract class BaseBqJobOperator
protected TaskResult run(BqClient bq, String projectId)
{
BqJobRunner jobRunner = new BqJobRunner(request, bq, projectId);
Job completed = jobRunner.runJob(jobConfiguration(projectId));
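// Read the optional "location" task parameter; it is threaded through to runJob() so that status polling targets the same location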
Optional<String> location = params.getOptional("location", String.class);
Job completed = jobRunner.runJob(jobConfiguration(projectId), location);
return result(completed);
}

digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqClient.java
@@ -170,12 +170,21 @@ void submitJob(String projectId, Job job)
.execute();
}

Job jobStatus(String projectId, String jobId)
Job jobStatus(String projectId, String jobId, Optional<String> location)
throws IOException
{
return client.jobs()
.get(projectId, jobId)
.execute();
if (location.isPresent()) {
return client.jobs()
.get(projectId, jobId)
// newer version of google-api-services-bigquery-v2 has setLocation()
.set("location", location.get())
.execute();
}
else {
return client.jobs()
.get(projectId, jobId)
.execute();
}
}

static class Factory
digdag-standards/src/main/java/io/digdag/standards/operator/gcp/BqJobRunner.java
@@ -49,7 +49,7 @@ class BqJobRunner
this.projectId = Objects.requireNonNull(projectId, "projectId");
}

Job runJob(JobConfiguration config)
Job runJob(JobConfiguration config, Optional<String> location)
{
// Generate job id
Optional<String> jobId = state.params().getOptional(JOB_ID, String.class);
@@ -97,7 +97,7 @@ Job runJob(JobConfiguration config)
.withErrorMessage("BigQuery job status check failed: %s", canonicalJobId)
.run(s -> {
logger.info("Checking BigQuery job status: {}", canonicalJobId);
return bq.jobStatus(projectId, jobId.get());
return bq.jobStatus(projectId, jobId.get(), location);
});

// Done yet?
148 changes: 137 additions & 11 deletions digdag-tests/src/test/java/acceptance/td/BigQueryIT.java
@@ -52,9 +52,11 @@
import static acceptance.td.GcpUtil.GCP_CREDENTIAL;
import static acceptance.td.GcpUtil.GCS_PREFIX;
import static acceptance.td.GcpUtil.GCS_TEST_BUCKET;
import static acceptance.td.GcpUtil.GCS_TEST_BUCKET_ASIA;
import static acceptance.td.GcpUtil.createDataset;
import static acceptance.td.GcpUtil.createTable;
import static acceptance.td.GcpUtil.datasetExists;
import static acceptance.td.GcpUtil.getDatasetLocation;
import static acceptance.td.GcpUtil.listTables;
import static acceptance.td.GcpUtil.tableExists;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -220,8 +222,36 @@ public static class QueryIT
public void testQuery()
throws Exception
{
String testQueryTokyoDataset1 = BQ_TAG + "_query_tokyo_dataset_1";
String testQueryTable1 = "test_query_table1";

createDataset(bq, gcpProjectId, testQueryTokyoDataset1, "asia-northeast1");
createTable(bq, gcpProjectId, testQueryTokyoDataset1, new Table()
.setTableReference(new TableReference()
.setProjectId(gcpProjectId)
.setDatasetId(testQueryTokyoDataset1)
.setTableId(testQueryTable1))
.setSchema(new TableSchema()
.setFields(ImmutableList.of(
new TableFieldSchema().setName("f1").setType("STRING"),
new TableFieldSchema().setName("f2").setType("STRING")
))));

retryExecutor.run(() ->
bq.tabledata().insertAll(gcpProjectId, testQueryTokyoDataset1, testQueryTable1, new TableDataInsertAllRequest()
.setRows(ImmutableList.of(
new TableDataInsertAllRequest.Rows().setJson(ImmutableMap.of("f1", "v1a", "f2", "v2a")),
new TableDataInsertAllRequest.Rows().setJson(ImmutableMap.of("f1", "v1b", "f2", "v2b"))
)
)
)
);

addWorkflow(projectDir, "acceptance/bigquery/query.dig");
Id attemptId = pushAndStart(server.endpoint(), projectDir, "query", ImmutableMap.of("outfile", outfile.toString()));
Id attemptId = pushAndStart(server.endpoint(), projectDir, "query", ImmutableMap.of(
"outfile", outfile.toString(),
"test_query_tokyo_dataset_1", testQueryTokyoDataset1,
"test_query_table_1", testQueryTable1));
expect(Duration.ofMinutes(5), attemptSuccess(server.endpoint(), attemptId));
assertThat(Files.exists(outfile), is(true));
}
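The `query.dig` workflow this test pushes is among the changed files not rendered in this view. Based on the parameters the test passes, it plausibly resembles the following sketch (assumed; task and file names are made up):

```
# the referenced .sql file can interpolate
# ${test_query_tokyo_dataset_1}.${test_query_table_1}
+query_tokyo:
  bq>: queries/tokyo_query.sql
  location: asia-northeast1

+touch:
  sh>: touch ${outfile}
```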
@@ -249,11 +279,7 @@ public void testLoad()

// Create output dataset
String datasetId = BQ_TAG + "_load_test";
Dataset dataset = new Dataset().setDatasetReference(new DatasetReference()
.setProjectId(gcpProjectId)
.setDatasetId(datasetId));
retryExecutor.run(() -> bq.datasets().insert(gcpProjectId, dataset)
.execute());
createDataset(bq, gcpProjectId, datasetId);

// Run load
String tableId = "data";
Expand All @@ -271,6 +297,43 @@ public void testLoad()
Table destinationTable = retryExecutor.run(() -> bq.tables().get(gcpProjectId, datasetId, tableId).execute());
assertThat(destinationTable.getTableReference().getTableId(), is(tableId));
}

@Test
public void testLoadAsia()
throws Exception
{
assertThat(GCS_TEST_BUCKET_ASIA, not(isEmptyOrNullString()));

// Create source data object
String objectName = GCS_PREFIX + "test.csv";
byte[] data = Joiner.on('\n').join("a,b", "c,d").getBytes(UTF_8);
InputStreamContent content = new InputStreamContent("text/csv", new ByteArrayInputStream(data))
.setLength(data.length);
StorageObject metadata = new StorageObject().setName(objectName);
retryExecutor.run(() -> gcs.objects()
.insert(GCS_TEST_BUCKET_ASIA, metadata, content)
.execute());

// Create output dataset
String datasetId = BQ_TAG + "_load_test_asia";
createDataset(bq, gcpProjectId, datasetId, "asia-northeast1");

// Run load
String tableId = "data";
addWorkflow(projectDir, "acceptance/bigquery/load-asia.dig");
Id attemptId = pushAndStart(server.endpoint(), projectDir, "load-asia", ImmutableMap.of(
"source_bucket", GCS_TEST_BUCKET_ASIA,
"source_object", objectName,
"target_dataset", datasetId,
"target_table", tableId,
"outfile", outfile.toString()));
expect(Duration.ofMinutes(5), attemptSuccess(server.endpoint(), attemptId));
assertThat(Files.exists(outfile), is(true));

// Check that destination table was created
Table destinationTable = retryExecutor.run(() -> bq.tables().get(gcpProjectId, datasetId, tableId).execute());
assertThat(destinationTable.getTableReference().getTableId(), is(tableId));
}
}
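`load-asia.dig` is likewise not rendered above; from the parameters passed by `testLoadAsia`, a plausible sketch (assumed, not shown in the visible diff):

```
+load:
  bq_load>: gs://${source_bucket}/${source_object}
  destination_table: ${target_dataset}.${target_table}
  location: asia-northeast1

+touch:
  sh>: touch ${outfile}
```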

public static class ExtractIT
@@ -285,11 +348,7 @@
// Create source table
String tableId = "data";
String datasetId = BQ_TAG + "_extract_test";
Dataset dataset = new Dataset().setDatasetReference(new DatasetReference()
.setProjectId(gcpProjectId)
.setDatasetId(datasetId));
retryExecutor.run(() -> bq.datasets().insert(gcpProjectId, dataset)
.execute());
createDataset(bq, gcpProjectId, datasetId);
Table table = new Table().setTableReference(new TableReference()
.setProjectId(gcpProjectId)
.setTableId(tableId))
@@ -340,6 +399,67 @@ public void testExtract()
}
});
}

@Test
public void testExtractAsia()
throws Exception
{
assertThat(GCS_TEST_BUCKET_ASIA, not(isEmptyOrNullString()));

// Create source table
String tableId = "data";
String datasetId = BQ_TAG + "_extract_test_asia";
createDataset(bq, gcpProjectId, datasetId, "asia-northeast1");
Table table = new Table().setTableReference(new TableReference()
.setProjectId(gcpProjectId)
.setTableId(tableId))
.setSchema(new TableSchema()
.setFields(ImmutableList.of(
new TableFieldSchema().setName("foo").setType("STRING"),
new TableFieldSchema().setName("bar").setType("STRING")
)));
retryExecutor.run(() -> bq.tables().insert(gcpProjectId, datasetId, table)
.execute());

// Populate source table
TableDataInsertAllRequest content = new TableDataInsertAllRequest()
.setRows(ImmutableList.of(
new TableDataInsertAllRequest.Rows().setJson(ImmutableMap.of(
"foo", "a",
"bar", "b")),
new TableDataInsertAllRequest.Rows().setJson(ImmutableMap.of(
"foo", "c",
"bar", "d"))));
retryExecutor.run(() -> bq.tabledata().insertAll(gcpProjectId, datasetId, tableId, content)
.execute());

// Run extract
String objectName = GCS_PREFIX + "test.csv";
addWorkflow(projectDir, "acceptance/bigquery/extract-asia.dig");
Id attemptId = pushAndStart(server.endpoint(), projectDir, "extract-asia", ImmutableMap.of(
"src_dataset", datasetId,
"src_table", tableId,
"dst_bucket", GCS_TEST_BUCKET_ASIA,
"dst_object", objectName,
"outfile", outfile.toString()));
expect(Duration.ofMinutes(5), attemptSuccess(server.endpoint(), attemptId));
assertThat(Files.exists(outfile), is(true));

// Check that destination file was created
StorageObject metadata = retryExecutor.run(() -> gcs.objects().get(GCS_TEST_BUCKET_ASIA, objectName)
.execute());
assertThat(metadata.getName(), is(objectName));
ByteArrayOutputStream data = new ByteArrayOutputStream();
retryExecutor.run(() -> {
try {
gcs.objects().get(GCS_TEST_BUCKET_ASIA, objectName)
.executeMediaAndDownloadTo(data);
}
catch (IOException e) {
throw ThrowablesUtil.propagate(e);
}
});
}
}
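`extract-asia.dig` is also among the unrendered files; given the parameters passed by `testExtractAsia`, it plausibly looks like this sketch (assumed):

```
+extract:
  bq_extract>: ${src_dataset}.${src_table}
  destination: gs://${dst_bucket}/${dst_object}
  location: asia-northeast1

+touch:
  sh>: touch ${outfile}
```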

public static class DdlIT
@@ -363,11 +483,13 @@ public void testDdl()
String testCreateTable2EmptyDataset = BQ_TAG + "_create_table_2_empty_dataset";
String testCreateTable3CreateDataset = BQ_TAG + "_create_table_3_create_dataset";
String testCreateTable4ExistingDataset = BQ_TAG + "_create_table_4_existing_dataset";
String testCreateTableAtTokyo1CreateDataset = BQ_TAG + "_create_table_at_tokyo_1_create_dataset";
String testCreateTable1 = "test_create_table_1";
String testCreateTable2 = "test_create_table_2";
String testCreateTable3 = "test_create_table_3";
String testCreateTable4 = "test_create_table_4";
String testCreateTable5 = "test_create_table_5";
String testCreateTableAtTokyo1 = "test_create_table_at_tokyo_1";
String testDeleteTable1 = "test_delete_table_1";
String testDeleteTable2 = "test_delete_table_2";
String testEmptyTable2EmptyDataset = BQ_TAG + "_empty_table_2_empty_dataset";
@@ -426,9 +548,11 @@ public void testDdl()
.put("test_create_table_3", testCreateTable3)
.put("test_create_table_4", testCreateTable4)
.put("test_create_table_5", testCreateTable5)
.put("test_create_table_at_tokyo_1", testCreateTableAtTokyo1)
.put("test_create_table_2_empty_dataset", testCreateTable2EmptyDataset)
.put("test_create_table_3_create_dataset", testCreateTable3CreateDataset)
.put("test_create_table_4_existing_dataset", testCreateTable4ExistingDataset)
.put("test_create_table_at_tokyo_1_create_dataset", testCreateTableAtTokyo1CreateDataset)
.put("test_delete_table_2_dataset", testDeleteTable2ExistingDataset)
.put("test_delete_table_1", testDeleteTable1)
.put("test_delete_table_2", testDeleteTable2)
@@ -453,6 +577,7 @@
assertThat(datasetExists(bq, gcpProjectId, testEmptyDataset2), is(true));
assertThat(datasetExists(bq, gcpProjectId, testDeleteDataset1), is(false));
assertThat(datasetExists(bq, gcpProjectId, testDeleteDataset2), is(false));
assertThat(getDatasetLocation(bq, gcpProjectId, testCreateTableAtTokyo1CreateDataset), is(Optional.of("asia-northeast1")));

assertThat(listTables(bq, gcpProjectId, testEmptyDataset1), is(empty()));
assertThat(listTables(bq, gcpProjectId, testEmptyDataset2), is(empty()));
@@ -462,6 +587,7 @@
assertThat(tableExists(bq, gcpProjectId, testCreateTable3CreateDataset, testCreateTable3), is(true));
assertThat(tableExists(bq, gcpProjectId, testCreateTable4ExistingDataset, testCreateTable4), is(true));
assertThat(tableExists(bq, gcpProjectId, testDefaultDataset, testCreateTable5), is(true));
assertThat(tableExists(bq, gcpProjectId, testCreateTableAtTokyo1CreateDataset, testCreateTableAtTokyo1), is(true));

assertThat(tableExists(bq, gcpProjectId, testDefaultDataset, testDeleteTable1), is(false));
assertThat(tableExists(bq, gcpProjectId, testDeleteTable2ExistingDataset, testDeleteTable2), is(false));