Skip to content

Commit

Permalink
Add whenDone method and CompletionCallback to BigQuery Job
Browse files Browse the repository at this point in the history
  • Loading branch information
mziccard committed May 19, 2016
1 parent d5cf726 commit f3cd884
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 64 deletions.
22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ Complete source code can be found at
```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
Expand All @@ -155,6 +156,8 @@ import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import java.util.List;
BigQuery bigquery = BigQueryOptions.defaultInstance().service();
TableId tableId = TableId.of("dataset", "table");
Table table = bigquery.getTable(tableId);
Expand All @@ -166,14 +169,17 @@ if (table == null) {
}
System.out.println("Loading data into table " + tableId);
Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
while (!loadJob.isDone()) {
Thread.sleep(1000L);
}
if (loadJob.status().error() != null) {
System.out.println("Job completed with errors");
} else {
System.out.println("Job succeeded");
}
loadJob.whenDone(new Job.CompletionCallback() {
@Override
public void success(Job job) {
System.out.println("Job succeeded");
}
@Override
public void error(BigQueryError error, List<BigQueryError> executionErrors) {
System.out.println("Job completed with errors");
}
});
```
Google Cloud Compute (Alpha)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.List;
import java.util.Objects;

/**
Expand All @@ -37,6 +38,24 @@ public class Job extends JobInfo {
private final BigQueryOptions options;
private transient BigQuery bigquery;

/**
* A callback for job completion.
*/
public interface CompletionCallback {
/**
* The method called when the job completes successfully.
*/
void success(Job job);

/**
* The method called when the job completes with errors. {@code error} is the final error that
* caused the job to fail (see {@link JobStatus#error()}). {@code executionErrors} are all the
* errors (possibly not fatal) encountered by the job during its execution (see
* {@link JobStatus#executionErrors()}).
*/
void error(BigQueryError error, List<BigQueryError> executionErrors);
}

/**
* A builder for {@code Job} objects.
*/
Expand Down Expand Up @@ -143,6 +162,43 @@ public boolean isDone() {
return job == null || job.status().state() == JobStatus.State.DONE;
}

/**
* Waits until this job completes its execution, either failing or succeeding. If the job does not
* exist, this method returns without executing any method of the provided callback. If the job
* completed successfully the {@link CompletionCallback#success(Job)} method is called. If the job
* completed with errors the {@link CompletionCallback#error(BigQueryError, List)} method is
* called.
* <pre> {@code
* job.whenDone(new CompletionCallback() {
* void success(Job job) {
* // completed successfully
* }
*
* void error(BigQueryError error, List<BigQueryError> executionErrors) {
* // handle error
* }
* });}</pre>
*
* @throws BigQueryException upon failure
* @throws InterruptedException if the current thread gets interrupted while waiting for the job
* to complete
*/
public void whenDone(CompletionCallback callback) throws InterruptedException {
while (!isDone()) {
Thread.sleep(500L);
}
Job updatedJob = reload();
if (updatedJob == null) {
return;
}
BigQueryError error = updatedJob.status().error();
if (error != null) {
callback.error(error, updatedJob.status().executionErrors());
} else {
callback.success(updatedJob);
}
}

/**
* Fetches current job's latest information. Returns {@code null} if the job does not exist.
*
Expand All @@ -151,7 +207,7 @@ public boolean isDone() {
* @throws BigQueryException upon failure
*/
public Job reload(BigQuery.JobOption... options) {
return bigquery.getJob(jobId().job(), options);
return bigquery.getJob(jobId(), options);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@
* }
* System.out.println("Loading data into table " + tableId);
* Job loadJob = table.load(FormatOptions.csv(), "gs://bucket/path");
* while (!loadJob.isDone()) {
* Thread.sleep(1000L);
* }
* if (loadJob.status().error() != null) {
* System.out.println("Job completed with errors");
* } else {
* System.out.println("Job succeeded");
* }}</pre>
* loadJob.whenDone(new Job.CompletionCallback() {
* public void success(Job job) {
* System.out.println("Job succeeded");
* }
*
* public void error(BigQueryError error, List<BigQueryError> executionErrors) {
* System.out.println("Job completed with errors");
* }
* });}</pre>
*
* @see <a href="https://cloud.google.com/bigquery/">Google Cloud BigQuery</a>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
import static org.junit.Assert.assertTrue;

import com.google.cloud.bigquery.JobStatistics.CopyStatistics;
import com.google.common.collect.ImmutableList;

import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Test;

import java.util.List;

public class JobTest {

private static final JobId JOB_ID = JobId.of("project", "job");
Expand Down Expand Up @@ -177,13 +181,70 @@ public void testIsDone_NotExists() throws Exception {
assertTrue(job.isDone());
}

@Test
public void testWhenDone_Success() throws InterruptedException {
initializeExpectedJob(2);
Job.CompletionCallback callback = EasyMock.mock(Job.CompletionCallback.class);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(status.error()).andReturn(null);
expect(bigquery.options()).andReturn(mockOptions);
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
callback.success(completedJob);
EasyMock.expectLastCall();
replay(status, bigquery, callback);
initializeJob();
job.whenDone(callback);
verify(status, callback);
}

@Test
public void testWhenDone_Error() throws InterruptedException {
initializeExpectedJob(2);
Job.CompletionCallback callback = EasyMock.mock(Job.CompletionCallback.class);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
BigQueryError error = new BigQueryError("reason", "location", "message");
List<BigQueryError> executionErrors = ImmutableList.of(error);
JobStatus status = createStrictMock(JobStatus.class);
expect(status.state()).andReturn(JobStatus.State.DONE);
expect(status.error()).andReturn(error);
expect(status.executionErrors()).andReturn(executionErrors);
expect(bigquery.options()).andReturn(mockOptions);
Job completedJob = expectedJob.toBuilder().status(status).build();
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(completedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(completedJob);
callback.error(error, executionErrors);
EasyMock.expectLastCall();
replay(status, bigquery, callback);
initializeJob();
job.whenDone(callback);
verify(status, callback);
}

@Test
public void testWhenDone_Null() throws InterruptedException {
initializeExpectedJob(1);
Job.CompletionCallback callback = EasyMock.mock(Job.CompletionCallback.class);
BigQuery.JobOption[] expectedOptions = {BigQuery.JobOption.fields(BigQuery.JobField.STATUS)};
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId(), expectedOptions)).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery, callback);
initializeJob();
job.whenDone(callback);
verify(callback);
}

@Test
public void testReload() throws Exception {
initializeExpectedJob(4);
JobInfo updatedInfo = JOB_INFO.toBuilder().etag("etag").build();
Job expectedJob = new Job(serviceMockReturnsOptions, new JobInfo.BuilderImpl(updatedInfo));
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId().job())).andReturn(expectedJob);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(expectedJob);
replay(bigquery);
initializeJob();
Job updatedJob = job.reload();
Expand All @@ -194,7 +255,7 @@ public void testReload() throws Exception {
public void testReloadNull() throws Exception {
initializeExpectedJob(1);
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId().job())).andReturn(null);
expect(bigquery.getJob(JOB_INFO.jobId())).andReturn(null);
replay(bigquery);
initializeJob();
assertNull(job.reload());
Expand All @@ -206,7 +267,7 @@ public void testReloadWithOptions() throws Exception {
JobInfo updatedInfo = JOB_INFO.toBuilder().etag("etag").build();
Job expectedJob = new Job(serviceMockReturnsOptions, new JobInfo.BuilderImpl(updatedInfo));
expect(bigquery.options()).andReturn(mockOptions);
expect(bigquery.getJob(JOB_INFO.jobId().job(), BigQuery.JobOption.fields()))
expect(bigquery.getJob(JOB_INFO.jobId(), BigQuery.JobOption.fields()))
.andReturn(expectedJob);
replay(bigquery);
initializeJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import org.easymock.EasyMock;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
Expand Down Expand Up @@ -188,10 +188,11 @@ public static void beforeClass() throws InterruptedException {
.schema(TABLE_SCHEMA)
.build();
Job job = bigquery.create(JobInfo.of(configuration));
while (!job.isDone()) {
Thread.sleep(1000);
}
assertNull(job.status().error());
Job.CompletionCallback callback = EasyMock.createStrictMock(Job.CompletionCallback.class);
callback.success(EasyMock.<Job>anyObject());
EasyMock.replay(callback);
job.whenDone(callback);
EasyMock.verify(callback);
}

@AfterClass
Expand Down Expand Up @@ -799,10 +800,11 @@ public void testCopyJob() throws InterruptedException {
TableId destinationTable = TableId.of(DATASET, destinationTableName);
CopyJobConfiguration configuration = CopyJobConfiguration.of(destinationTable, sourceTable);
Job remoteJob = bigquery.create(JobInfo.of(configuration));
while (!remoteJob.isDone()) {
Thread.sleep(1000);
}
assertNull(remoteJob.status().error());
Job.CompletionCallback callback = EasyMock.createStrictMock(Job.CompletionCallback.class);
callback.success(EasyMock.<Job>anyObject());
EasyMock.replay(callback);
remoteJob.whenDone(callback);
EasyMock.verify(callback);
Table remoteTable = bigquery.getTable(DATASET, destinationTableName);
assertNotNull(remoteTable);
assertEquals(destinationTable.dataset(), remoteTable.tableId().dataset());
Expand All @@ -825,10 +827,11 @@ public void testQueryJob() throws InterruptedException {
.destinationTable(destinationTable)
.build();
Job remoteJob = bigquery.create(JobInfo.of(configuration));
while (!remoteJob.isDone()) {
Thread.sleep(1000);
}
assertNull(remoteJob.status().error());
Job.CompletionCallback callback = EasyMock.createStrictMock(Job.CompletionCallback.class);
callback.success(EasyMock.<Job>anyObject());
EasyMock.replay(callback);
remoteJob.whenDone(callback);
EasyMock.reset(callback);

QueryResponse response = bigquery.getQueryResults(remoteJob.jobId());
while (!response.jobCompleted()) {
Expand Down Expand Up @@ -866,20 +869,21 @@ public void testExtractJob() throws InterruptedException {
.schema(SIMPLE_SCHEMA)
.build();
Job remoteLoadJob = bigquery.create(JobInfo.of(configuration));
while (!remoteLoadJob.isDone()) {
Thread.sleep(1000);
}
assertNull(remoteLoadJob.status().error());
Job.CompletionCallback callback = EasyMock.createStrictMock(Job.CompletionCallback.class);
callback.success(EasyMock.<Job>anyObject());
EasyMock.replay(callback);
remoteLoadJob.whenDone(callback);
EasyMock.reset(callback);

ExtractJobConfiguration extractConfiguration =
ExtractJobConfiguration.builder(destinationTable, "gs://" + BUCKET + "/" + EXTRACT_FILE)
.printHeader(false)
.build();
Job remoteExtractJob = bigquery.create(JobInfo.of(extractConfiguration));
while (!remoteExtractJob.isDone()) {
Thread.sleep(1000);
}
assertNull(remoteExtractJob.status().error());
callback.success(EasyMock.<Job>anyObject());
EasyMock.replay(callback);
remoteExtractJob.whenDone(callback);
EasyMock.verify(callback);
assertEquals(CSV_CONTENT,
new String(storage.readAllBytes(BUCKET, EXTRACT_FILE), StandardCharsets.UTF_8));
assertTrue(bigquery.delete(DATASET, tableName));
Expand All @@ -896,10 +900,11 @@ public void testCancelJob() throws InterruptedException {
.build();
Job remoteJob = bigquery.create(JobInfo.of(configuration));
assertTrue(remoteJob.cancel());
while (!remoteJob.isDone()) {
Thread.sleep(1000);
}
assertNull(remoteJob.status().error());
Job.CompletionCallback callback = EasyMock.createStrictMock(Job.CompletionCallback.class);
callback.success(EasyMock.<Job>anyObject());
EasyMock.replay(callback);
remoteJob.whenDone(callback);
EasyMock.verify(callback);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,17 +523,19 @@ private abstract static class JobRunAction extends BigQueryAction<JobInfo> {
@Override
void run(BigQuery bigquery, JobInfo job) throws Exception {
System.out.println("Creating job");
Job startedJob = bigquery.create(job);
while (!startedJob.isDone()) {
System.out.println("Waiting for job " + startedJob.jobId().job() + " to complete");
Thread.sleep(1000L);
}
if (startedJob.status().error() == null) {
System.out.println("Job " + startedJob.jobId().job() + " succeeded");
} else {
System.out.println("Job " + startedJob.jobId().job() + " failed");
System.out.println("Error: " + startedJob.status().error());
}
final Job startedJob = bigquery.create(job);
startedJob.whenDone(new Job.CompletionCallback() {
@Override
public void success(Job updatedJob) {
System.out.println("Job " + updatedJob.jobId().job() + " succeeded");
}

@Override
public void error(BigQueryError error, List<BigQueryError> executionErrors) {
System.out.println("Job " + startedJob.jobId().job() + " failed");
System.out.println("Error: " + startedJob.status().error());
}
});
}
}

Expand Down
Loading

0 comments on commit f3cd884

Please sign in to comment.