Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove JobInfo hierarchy, add JobConfiguration hierarchy #584

Merged
merged 6 commits into from
Jan 29, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ public static JobListOption startPageToken(String pageToken) {
* is not provided all job's fields are returned. {@code JobOption.fields()} can be used to
* specify only the fields of interest. {@link JobInfo#jobId()}, {@link JobStatus#state()},
* {@link JobStatus#error()} as well as type-specific configuration (e.g.
* {@link QueryJobInfo#query()} for Query Jobs) are always returned, even if not specified.
* {@link JobField#SELF_LINK} and {@link JobField#ETAG} can not be selected when listing jobs.
* {@link QueryJobConfiguration#query()} for Query Jobs) are always returned, even if not
* specified. {@link JobField#SELF_LINK} and {@link JobField#ETAG} can not be selected when
* listing jobs.
*/
public static JobListOption fields(JobField... fields) {
String selector = JobField.selector(fields);
Expand All @@ -397,8 +398,8 @@ private JobOption(BigQueryRpc.Option option, Object value) {
* Returns an option to specify the job's fields to be returned by the RPC call. If this option
* is not provided all job's fields are returned. {@code JobOption.fields()} can be used to
* specify only the fields of interest. {@link JobInfo#jobId()} as well as type-specific
* configuration (e.g. {@link QueryJobInfo#query()} for Query Jobs) are always returned, even if
* not specified.
* configuration (e.g. {@link QueryJobConfiguration#query()} for Query Jobs) are always
* returned, even if not specified.
*/
public static JobOption fields(JobField... fields) {
return new JobOption(BigQueryRpc.Option.FIELDS, JobField.selector(fields));
Expand Down Expand Up @@ -470,7 +471,7 @@ public static QueryResultsOption maxWaitTime(long maxWaitTime) {
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T create(T job, JobOption... options) throws BigQueryException;
JobInfo create(JobInfo job, JobOption... options) throws BigQueryException;

/**
* Returns the requested dataset or {@code null} if not found.
Expand Down Expand Up @@ -611,14 +612,14 @@ Page<List<FieldValue>> listTableData(TableId tableId, TableDataListOption... opt
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T getJob(String jobId, JobOption... options) throws BigQueryException;
JobInfo getJob(String jobId, JobOption... options) throws BigQueryException;

/**
* Returns the requested job or {@code null} if not found.
*
* @throws BigQueryException upon failure
*/
<T extends JobInfo> T getJob(JobId jobId, JobOption... options) throws BigQueryException;
JobInfo getJob(JobId jobId, JobOption... options) throws BigQueryException;

/**
* Lists the jobs.
Expand Down Expand Up @@ -665,9 +666,9 @@ Page<List<FieldValue>> listTableData(TableId tableId, TableDataListOption... opt

/**
* Returns a channel to write data to be inserted into a BigQuery table. Data format and other
* options can be configured using the {@link LoadConfiguration} parameter.
* options can be configured using the {@link WriteChannelConfiguration} parameter.
*
* @throws BigQueryException upon failure
*/
TableDataWriteChannel writer(LoadConfiguration loadConfiguration);
TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public Table call() {
}

@Override
public <T extends JobInfo> T create(T job, JobOption... options) throws BigQueryException {
public JobInfo create(JobInfo job, JobOption... options) throws BigQueryException {
final Job jobPb = setProjectId(job).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
Expand Down Expand Up @@ -442,12 +442,12 @@ public List<FieldValue> apply(TableRow rowPb) {
}

@Override
public <T extends JobInfo> T getJob(String jobId, JobOption... options) throws BigQueryException {
public JobInfo getJob(String jobId, JobOption... options) throws BigQueryException {
return getJob(JobId.of(jobId), options);
}

@Override
public <T extends JobInfo> T getJob(final JobId jobId, JobOption... options)
public JobInfo getJob(final JobId jobId, JobOption... options)
throws BigQueryException {
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
try {
Expand All @@ -457,7 +457,7 @@ public Job call() {
return bigQueryRpc.getJob(jobId.job(), optionsMap);
}
}, options().retryParams(), EXCEPTION_HANDLER);
return answer == null ? null : JobInfo.<T>fromPb(answer);
return answer == null ? null : JobInfo.fromPb(answer);
} catch (RetryHelper.RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down Expand Up @@ -596,8 +596,8 @@ private static QueryResult.Builder transformQueryResults(JobId jobId, List<Table
.results(transformTableData(rowsPb));
}

public TableDataWriteChannel writer(LoadConfiguration loadConfiguration) {
return new TableDataWriteChannel(options(), setProjectId(loadConfiguration));
public TableDataWriteChannel writer(WriteChannelConfiguration writeChannelConfiguration) {
return new TableDataWriteChannel(options(), setProjectId(writeChannelConfiguration));
}

private Map<BigQueryRpc.Option, ?> optionMap(Option... options) {
Expand Down Expand Up @@ -646,42 +646,48 @@ private TableId setProjectId(TableId table) {
}

private JobInfo setProjectId(JobInfo job) {
if (job instanceof CopyJobInfo) {
CopyJobInfo copyJob = (CopyJobInfo) job;
CopyJobInfo.Builder copyBuilder = copyJob.toBuilder();
copyBuilder.destinationTable(setProjectId(copyJob.destinationTable()));
copyBuilder.sourceTables(
Lists.transform(copyJob.sourceTables(), new Function<TableId, TableId>() {
@Override
public TableId apply(TableId tableId) {
return setProjectId(tableId);
}
}));
return copyBuilder.build();
}
if (job instanceof QueryJobInfo) {
QueryJobInfo queryJob = (QueryJobInfo) job;
QueryJobInfo.Builder queryBuilder = queryJob.toBuilder();
if (queryJob.destinationTable() != null) {
queryBuilder.destinationTable(setProjectId(queryJob.destinationTable()));
}
if (queryJob.defaultDataset() != null) {
queryBuilder.defaultDataset(setProjectId(queryJob.defaultDataset()));
}
return queryBuilder.build();
}
if (job instanceof ExtractJobInfo) {
ExtractJobInfo extractJob = (ExtractJobInfo) job;
ExtractJobInfo.Builder extractBuilder = extractJob.toBuilder();
extractBuilder.sourceTable(setProjectId(extractJob.sourceTable()));
return extractBuilder.build();
}
if (job instanceof LoadJobInfo) {
LoadJobInfo loadJob = (LoadJobInfo) job;
LoadJobInfo.Builder loadBuilder = loadJob.toBuilder();
return loadBuilder.configuration(setProjectId(loadJob.configuration())).build();
}
return job;
JobConfiguration configuration = job.configuration();
JobInfo.Builder jobBuilder = job.toBuilder();
switch (configuration.type()) {

This comment was marked as spam.

case COPY:
CopyJobConfiguration copyConfiguration = (CopyJobConfiguration) configuration;
CopyJobConfiguration.Builder copyBuilder = copyConfiguration.toBuilder();
copyBuilder.sourceTables(
Lists.transform(copyConfiguration.sourceTables(), new Function<TableId, TableId>() {
@Override
public TableId apply(TableId tableId) {
return setProjectId(tableId);
}
}));
copyBuilder.destinationTable(setProjectId(copyConfiguration.destinationTable()));
jobBuilder.configuration(copyBuilder.build());
break;
case QUERY:
QueryJobConfiguration queryConfiguration = (QueryJobConfiguration) configuration;
QueryJobConfiguration.Builder queryBuilder = queryConfiguration.toBuilder();
if (queryConfiguration.destinationTable() != null) {
queryBuilder.destinationTable(setProjectId(queryConfiguration.destinationTable()));
}
if (queryConfiguration.defaultDataset() != null) {
queryBuilder.defaultDataset(setProjectId(queryConfiguration.defaultDataset()));
}
jobBuilder.configuration(queryBuilder.build());
break;
case EXTRACT:
ExtractJobConfiguration extractConfiguration = (ExtractJobConfiguration) configuration;
ExtractJobConfiguration.Builder extractBuilder = extractConfiguration.toBuilder();
extractBuilder.sourceTable(setProjectId(extractConfiguration.sourceTable()));
jobBuilder.configuration(extractBuilder.build());
break;
case LOAD:
LoadJobConfiguration loadConfiguration = (LoadJobConfiguration) configuration;
jobBuilder.configuration(setProjectId(loadConfiguration));
break;
default:
// never reached
throw new IllegalArgumentException("Job configuration is not supported");
}
return jobBuilder.build();
}

private QueryRequest setProjectId(QueryRequest request) {
Expand All @@ -692,9 +698,10 @@ private QueryRequest setProjectId(QueryRequest request) {
return builder.build();
}

private LoadConfiguration setProjectId(LoadConfiguration configuration) {
@SuppressWarnings("unchecked")
private <T extends LoadConfiguration> T setProjectId(T configuration) {
LoadConfiguration.Builder builder = configuration.toBuilder();
builder.destinationTable(setProjectId(configuration.destinationTable()));
return builder.build();
return (T) builder.build();
}
}
Loading