Merge pull request #46 from hafenkran/add_jobs_funcs
Add jobs funcs
hafenkran authored Oct 26, 2024
2 parents bb66f6e + a32a54f commit 850b429
Showing 13 changed files with 553 additions and 29 deletions.
31 changes: 31 additions & 0 deletions README.md
@@ -179,6 +179,37 @@ D CALL bigquery_execute('bq', '
└─────────┴──────────────────────────────────┴─────────────────┴──────────┴────────────┴───────────────────────┴───────────────────────┘
```

### `bigquery_jobs` Function

The `bigquery_jobs` function retrieves a list of jobs within the specified project. It returns job metadata such as
job state, start and end time, configuration, and statistics. More details on the returned job fields can be found [here](https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/list).

```sql
D ATTACH 'project=my_gcp_project' as bq (TYPE bigquery);
D SELECT * FROM bigquery_jobs('bq', maxResults=2);
┌───────────┬──────────────────────┬───────────┬───┬──────────────────────┬──────────────────┐
│ state │ job_id │ project │ … │ configuration │ status │
│  varchar  │       varchar        │  varchar  │   │         json         │       json       │
├───────────┼──────────────────────┼───────────┼───┼──────────────────────┼──────────────────┤
│ Completed │ job_zAAv42SdMT51qk… │ my_gcp_p… │ … │ {"query":{"query":… │ {"state":"DONE"} │
│ Completed │ job_ro2WURJlGlkXCC… │ my_gcp_p… │ … │ {"query":{"query":… │ {"state":"DONE"} │
├───────────┴──────────────────────┴───────────┴───┴──────────────────────┴──────────────────┤
│ 2 rows                                                                16 columns (5 shown) │
└────────────────────────────────────────────────────────────────────────────────────────────┘
```

The operation supports the following additional named parameters as query arguments:

| Parameter | Type | Description |
| ----------------- | ----------- | ------------------------------------------------------------------------------------------------ |
| `jobId` | `VARCHAR` | Filters results by job ID. Returns only the matching job, ignoring all other arguments. |
| `allUsers` | `BOOLEAN` | If true, returns jobs for all users in the project. Default is false (only current user's jobs). |
| `maxResults`      | `INTEGER`   | Limits the number of jobs returned (default is 1000).                                             |
| `minCreationTime` | `TIMESTAMP` | Filters jobs created after the specified time (converted to milliseconds since the epoch).        |
| `maxCreationTime` | `TIMESTAMP` | Filters jobs created before the specified time (converted to milliseconds since the epoch).       |
| `stateFilter`     | `VARCHAR`   | Filters jobs by state (e.g., `PENDING`, `RUNNING`, `DONE`).                                        |
| `parentJobId` | `VARCHAR` | Filters results to only include child jobs of the specified parent job ID. |
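
For example, several of these parameters can be combined in a single call. The following query is only a sketch that reuses the `bq` attachment from above; the timestamp value is illustrative:

```sql
D SELECT job_id, state FROM bigquery_jobs('bq', stateFilter='DONE', minCreationTime=TIMESTAMP '2024-10-01 00:00:00', maxResults=10);
```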

### `bigquery_clear_cache` Function

DuckDB caches schema metadata, such as datasets and table structures, to avoid repeated fetches from BigQuery. If the schema changes externally, use `bigquery_clear_cache` to update the cache and retrieve the latest schema information.
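
A minimal sketch of the call, assuming it is invoked via `CALL` with no arguments:

```sql
D CALL bigquery_clear_cache();
```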
1 change: 1 addition & 0 deletions src/CMakeLists.txt
@@ -13,4 +13,5 @@ set(EXTENSION_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/bigquery_storage.cpp
${CMAKE_CURRENT_SOURCE_DIR}/bigquery_utils.cpp
${CMAKE_CURRENT_SOURCE_DIR}/bigquery_sql.cpp
${CMAKE_CURRENT_SOURCE_DIR}/bigquery_jobs.cpp
PARENT_SCOPE)
123 changes: 114 additions & 9 deletions src/bigquery_client.cpp
@@ -91,7 +91,7 @@ google::cloud::Options BigqueryClient::OptionsGRPC() {
auto options = google::cloud::Options{};
if (!config.grpc_endpoint.empty()) {
options.set<google::cloud::EndpointOption>(config.grpc_endpoint);
if (config.is_dev_env()) {
if (config.is_dev_env()) {
options.set<google::cloud::GrpcCredentialOption>(grpc::InsecureChannelCredentials());
}
}
@@ -118,7 +118,7 @@ vector<BigqueryDatasetRef> BigqueryClient::GetDatasets() {
}

google::cloud::bigquery::v2::ListFormatDataset dataset_val = dataset.value();
auto dataset_ref = dataset_val.dataset_reference();
const auto &dataset_ref = dataset_val.dataset_reference();

BigqueryDatasetRef info;
info.project_id = dataset_ref.project_id();
@@ -150,7 +150,7 @@ vector<BigqueryTableRef> BigqueryClient::GetTables(const string &dataset_id) {
}

google::cloud::bigquery::v2::ListFormatTable table_val = table.value();
auto table_ref = table_val.table_reference();
const auto &table_ref = table_val.table_reference();

BigqueryTableRef info;
info.project_id = table_ref.project_id();
@@ -175,7 +175,7 @@ BigqueryDatasetRef BigqueryClient::GetDataset(const string &dataset_id) {
}

auto dataset = response.value();
auto dataset_ref = dataset.dataset_reference();
const auto &dataset_ref = dataset.dataset_reference();

BigqueryDatasetRef info;
info.project_id = dataset_ref.project_id();
@@ -184,6 +184,111 @@ BigqueryDatasetRef BigqueryClient::GetDataset(const string &dataset_id) {
return info;
}

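// Lists jobs in the configured project, applying the optional filters from ListJobsParams.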
vector<google::cloud::bigquery::v2::ListFormatJob> BigqueryClient::ListJobs(const ListJobsParams &params) {
auto client = google::cloud::bigquerycontrol_v2::JobServiceClient(
google::cloud::bigquerycontrol_v2::MakeJobServiceConnectionRest(OptionsAPI()));

auto request = google::cloud::bigquery::v2::ListJobsRequest();
request.set_project_id(config.project_id);

// Default is 1000
std::int32_t max_results = 1000;
if (params.max_results.has_value()) {
max_results = params.max_results.value();
}
request.mutable_max_results()->set_value(max_results);

if (params.all_users.has_value()) {
auto all_users = params.all_users.value();
request.set_all_users(all_users);
}
if (params.min_creation_time.has_value()) {
auto min_creation_time = params.min_creation_time.value();
auto timestamp_ms = Timestamp::GetEpochMs(min_creation_time);
request.set_min_creation_time(timestamp_ms);
}
if (params.max_creation_time.has_value()) {
auto max_creation_time = params.max_creation_time.value();
auto timestamp_ms = Timestamp::GetEpochMs(max_creation_time);
request.mutable_max_creation_time()->set_value(timestamp_ms);
}
if (params.projection.has_value()) {
auto projection = params.projection.value();
if (projection == "full") {
auto mapped = google::cloud::bigquery::v2::ListJobsRequest_Projection::ListJobsRequest_Projection_FULL;
request.set_projection(mapped);
} else if (projection == "minimal") {
auto mapped = google::cloud::bigquery::v2::ListJobsRequest_Projection::ListJobsRequest_Projection_MINIMAL;
request.set_projection(mapped);
} else {
throw BinderException("Invalid projection value: %s", projection);
}
}
if (params.state_filter.has_value()) {
auto state_filter = params.state_filter.value();
std::transform(state_filter.begin(), state_filter.end(), state_filter.begin(), ::tolower);

if (state_filter == "done") {
request.add_state_filter(
google::cloud::bigquery::v2::ListJobsRequest_StateFilter::ListJobsRequest_StateFilter_DONE);
} else if (state_filter == "pending") {
request.add_state_filter(
google::cloud::bigquery::v2::ListJobsRequest_StateFilter::ListJobsRequest_StateFilter_PENDING);
} else if (state_filter == "running") {
request.add_state_filter(
google::cloud::bigquery::v2::ListJobsRequest_StateFilter::ListJobsRequest_StateFilter_RUNNING);
} else {
throw BinderException("Invalid state filter value: %s", state_filter);
}
}
if (params.parent_job_id.has_value()) {
auto parent_job_id = params.parent_job_id.value();
request.set_parent_job_id(parent_job_id);
}

vector<google::cloud::bigquery::v2::ListFormatJob> result;
google::cloud::v2_27::StreamRange<google::cloud::bigquery::v2::ListFormatJob> response = client.ListJobs(request);

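// The result stream may span multiple pages; stop once max_results jobs have been collected.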
int num_results = 0;
for (const auto &job : response) {
if (!job.ok()) {
throw BinderException(job.status().message());
}
auto job_val = job.value();
result.push_back(job_val);

num_results++;
if (num_results >= max_results) {
break;
}
}
return result;
}

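// Fetches a single job by ID (and optional location) from the project configured for this client.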
google::cloud::bigquery::v2::Job BigqueryClient::GetJob(const string &job_id, const string &location) {
if (job_id.empty()) {
throw BinderException("Job ID cannot be empty");
}

auto client = google::cloud::bigquerycontrol_v2::JobServiceClient(
google::cloud::bigquerycontrol_v2::MakeJobServiceConnectionRest(OptionsAPI()));

auto request = google::cloud::bigquery::v2::GetJobRequest();
request.set_project_id(config.project_id);
request.set_job_id(job_id);
if (!location.empty()) {
request.set_location(location);
}

auto response = client.GetJob(request);
if (!response.ok()) {
throw BinderException(response.status().message());
}

auto job = response.value();
return job;
}

BigqueryTableRef BigqueryClient::GetTable(const string &dataset_id, const string &table_id) {
auto client = make_shared_ptr<google::cloud::bigquerycontrol_v2::TableServiceClient>(
google::cloud::bigquerycontrol_v2::MakeTableServiceConnectionRest(OptionsAPI()));
@@ -198,8 +303,8 @@ BigqueryTableRef BigqueryClient::GetTable(const string &dataset_id, const string
throw InternalException(response.status().message());
}

auto tablae = response.value();
auto table_ref = tablae.table_reference();
auto table = response.value();
const auto &table_ref = table.table_reference();

BigqueryTableRef info;
info.project_id = table_ref.project_id();
@@ -315,8 +420,8 @@ void BigqueryClient::GetTableInfo(const string &dataset_id,
ColumnDefinition column(field.name(), std::move(column_type));
// column.SetComment(std::move(field.description));

auto default_value_expr = field.default_value_expression();
auto default_value = default_value_expr.value();
const auto &default_value_expr = field.default_value_expression();
const auto &default_value = default_value_expr.value();
if (!default_value.empty() && default_value != "\"\"") {
auto expressions = Parser::ParseExpressionList(default_value);
if (expressions.empty()) {
Expand All @@ -329,7 +434,7 @@ void BigqueryClient::GetTableInfo(const string &dataset_id,

// The field mode. Possible values include NULLABLE, REQUIRED and REPEATED.
// The default value is NULLABLE.
auto mode = field.mode();
const auto &mode = field.mode();
if (mode == "REQUIRED") {
auto field_name = field.name();
auto field_index = res_columns.GetColumnIndex(field_name);
4 changes: 4 additions & 0 deletions src/bigquery_extension.cpp
@@ -19,6 +19,7 @@
#include "bigquery_extension.hpp"
#include "bigquery_scan.hpp"
#include "bigquery_storage.hpp"
#include "bigquery_jobs.hpp"

namespace duckdb {

@@ -36,6 +37,9 @@ static void LoadInternal(DatabaseInstance &instance) {
bigquery::BigQueryExecuteFunction bigquery_execute_function;
ExtensionUtil::RegisterFunction(instance, bigquery_execute_function);

bigquery::BigQueryListJobsFunction bigquery_list_jobs_function;
ExtensionUtil::RegisterFunction(instance, bigquery_list_jobs_function);

auto &config = DBConfig::GetConfig(instance);
config.storage_extensions["bigquery"] = make_uniq<bigquery::BigqueryStorageExtension>();

