Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
BigQuery plugin failed to read credential (#230)
Browse files Browse the repository at this point in the history
* BigQuery failed to get credential

Signed-off-by: Kevin Su <[email protected]>

* Fixed test errors

Signed-off-by: Kevin Su <[email protected]>

* Fixed test errors

Signed-off-by: Kevin Su <[email protected]>

* Add log

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Jan 11, 2022
1 parent 973a51a commit 4362382
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go/tasks/plugins/webapi/bigquery/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ func TestEndToEnd(t *testing.T) {
t.Run("SELECT 1", func(t *testing.T) {
queryJobConfig := QueryJobConfig{
ProjectID: "flyte",
Query: "SELECT 1",
}

inputs, _ := coreutils.MakeLiteralMap(map[string]interface{}{"x": 1})
custom, _ := pluginUtils.MarshalObjToStruct(queryJobConfig)
template := flyteIdlCore.TaskTemplate{
Type: bigqueryQueryJobTask,
Custom: custom,
Target: &flyteIdlCore.TaskTemplate_Sql{Sql: &flyteIdlCore.Sql{Statement: "SELECT 1", Dialect: flyteIdlCore.Sql_ANSI}},
}

phase := tests.RunPluginEndToEndTest(t, plugin, &template, inputs, nil, nil, iter)
Expand Down
10 changes: 7 additions & 3 deletions go/tasks/plugins/webapi/bigquery/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"net/http"
"time"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"

"golang.org/x/oauth2"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/google"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/api/bigquery/v2"
Expand Down Expand Up @@ -105,6 +105,7 @@ func (p Plugin) createImpl(ctx context.Context, taskCtx webapi.TaskExecutionCont
return nil, nil, err
}

job.Configuration.Query.Query = taskTemplate.GetSql().Statement
job.Configuration.Labels = taskCtx.TaskExecutionMetadata().GetLabels()

resp, err := client.Jobs.Insert(job.JobReference.ProjectId, job).Do()
Expand Down Expand Up @@ -456,14 +457,17 @@ func (p Plugin) newBigQueryClient(ctx context.Context, identity google.Identity)
options = append(options,
option.WithEndpoint(p.cfg.bigQueryEndpoint),
option.WithTokenSource(oauth2.StaticTokenSource(&oauth2.Token{})))
} else {
} else if p.cfg.GoogleTokenSource.Type != "default" {

tokenSource, err := p.googleTokenSource.GetTokenSource(ctx, identity)

if err != nil {
return nil, pluginErrors.Wrapf(pluginErrors.RuntimeFailure, err, "unable to get token source")
}

options = append(options, option.WithTokenSource(tokenSource))
} else {
logger.Infof(ctx, "BigQuery client read $GOOGLE_APPLICATION_CREDENTIALS by default")
}

return bigquery.NewService(ctx, options...)
Expand Down
5 changes: 4 additions & 1 deletion go/tasks/plugins/webapi/bigquery/query_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func getJobConfigurationQuery(custom *QueryJobConfig, inputs *flyteIdlCore.Liter
return nil, pluginErrors.Errorf(pluginErrors.BadTaskSpecification, "unable build query parameters [%v]", err.Error())
}

// BigQuery supports query parameters to help prevent SQL injection when queries are constructed using user input.
// This feature is only available with standard SQL syntax. For more detail: https://cloud.google.com/bigquery/docs/parameterized-queries
useLegacySQL := false
return &bigquery.JobConfigurationQuery{
AllowLargeResults: custom.AllowLargeResults,
Clustering: custom.Clustering,
Expand All @@ -178,7 +181,7 @@ func getJobConfigurationQuery(custom *QueryJobConfig, inputs *flyteIdlCore.Liter
SchemaUpdateOptions: custom.SchemaUpdateOptions,
TableDefinitions: custom.TableDefinitions,
TimePartitioning: custom.TimePartitioning,
UseLegacySql: custom.UseLegacySQL,
UseLegacySql: &useLegacySQL,
UseQueryCache: custom.UseQueryCache,
UserDefinedFunctionResources: custom.UserDefinedFunctionResources,
WriteDisposition: custom.WriteDisposition,
Expand Down
3 changes: 2 additions & 1 deletion go/tasks/plugins/webapi/bigquery/query_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ func TestGetJobConfigurationQuery(t *testing.T) {
})

jobConfigurationQuery, err := getJobConfigurationQuery(&config, inputs)
useLegacySQL := false

assert.NoError(t, err)
assert.Equal(t, "NAMED", jobConfigurationQuery.ParameterMode)

assert.Equal(t, &useLegacySQL, jobConfigurationQuery.UseLegacySql)
assert.Equal(t, 1, len(jobConfigurationQuery.QueryParameters))
assert.Equal(t, bigquery.QueryParameter{
Name: "integer",
Expand Down

0 comments on commit 4362382

Please sign in to comment.