Skip to content

Commit

Permalink
Add BigQuery plugin (flyteorg#161)
Browse files Browse the repository at this point in the history
Signed-off-by: Gleb Kanterov <[email protected]>
  • Loading branch information
kanterov authored Jun 4, 2021
1 parent b7bb6d1 commit 632c508
Show file tree
Hide file tree
Showing 17 changed files with 1,714 additions and 2 deletions.
4 changes: 3 additions & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ require (
go.uber.org/zap v1.16.0 // indirect
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 // indirect
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93 // indirect
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04 // indirect
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
google.golang.org/api v0.40.0
google.golang.org/grpc v1.35.0
google.golang.org/protobuf v1.25.0
gotest.tools v2.2.0+incompatible
k8s.io/api v0.20.2
k8s.io/apimachinery v0.20.2
Expand Down
4 changes: 4 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 h1:cQyO5JQ2iuHnEcF3v24kdDMsgh04RjyFPDtuvD6PCE0=
Expand Down Expand Up @@ -216,6 +217,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607 h1:cTavhURetDkezJCvxFggiyLeP40Mrk/TtVg2+ycw1Es=
github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607/go.mod h1:Cg4fM0vhYWOZdgM7RIOSTRNIc8/VT7CXClC3Ni86lu4=
github.com/evanphx/json-patch v0.0.0-20200808040245-162e5629780b/go.mod h1:NAJj0yf/KaRKURN6nyi7A9IZydMivZEm9oQLWNjfKDc=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
Expand All @@ -225,6 +227,7 @@ github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLi
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.18.48 h1:WYTat8kFS0mDxLoTEQai2/uy4YO/cavsvh1t3/EKQCw=
github.com/flyteorg/flyteidl v0.18.48/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
Expand Down Expand Up @@ -1212,6 +1215,7 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4 h1:UoveltGrhghAA7ePc+e+QYDHXrBps2PqFZiHkGR/xK8=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210217171935-8e2decd92398/go.mod h1:60tmSUpHxGPFerNHbo/ayI2lKxvtrhbxFyXuEIWJd78=
k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78=
Expand Down
6 changes: 6 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo {
return pi
}

func PhaseInfoQueuedWithTaskInfo(version uint32, reason string, info *TaskInfo) PhaseInfo {
pi := phaseInfo(PhaseQueued, version, nil, info)
pi.reason = reason
return pi
}

func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {

pi := phaseInfo(PhaseInitializing, version, nil, info)
Expand Down
19 changes: 19 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/google/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package google

type TokenSourceFactoryType = string

const (
TokenSourceTypeDefault = "default"
)

type TokenSourceFactoryConfig struct {
// Type is type of TokenSourceFactory, possible values are 'default' or 'gke'.
// - 'default' uses default credentials, see https://cloud.google.com/iam/docs/service-accounts#default
Type TokenSourceFactoryType `json:"type" pflag:",Defines type of TokenSourceFactory, possible values are 'default'"`
}

func GetDefaultConfig() TokenSourceFactoryConfig {
return TokenSourceFactoryConfig{
Type: "default",
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package google

import (
"context"

"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
)

type defaultTokenSource struct{}

func (m *defaultTokenSource) GetTokenSource(ctx context.Context, identity Identity) (oauth2.TokenSource, error) {
return google.DefaultTokenSource(ctx)
}

func NewDefaultTokenSourceFactory() (TokenSourceFactory, error) {
return &defaultTokenSource{}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package google

import (
"context"

"github.com/pkg/errors"
"golang.org/x/oauth2"
)

type Identity struct {
K8sNamespace string
K8sServiceAccount string
}

type TokenSourceFactory interface {
GetTokenSource(ctx context.Context, identity Identity) (oauth2.TokenSource, error)
}

func NewTokenSourceFactory(config TokenSourceFactoryConfig) (TokenSourceFactory, error) {
if config.Type == TokenSourceTypeDefault {
return NewDefaultTokenSourceFactory()
}

return nil, errors.Errorf("unknown token source type [%v], possible values are: 'default'", config.Type)
}
4 changes: 4 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (p *taskPluginRegistry) RegisterRemotePlugin(info webapi.PluginEntry) {
p.corePlugin = append(p.corePlugin, internalRemote.CreateRemotePlugin(info))
}

func CreateRemotePlugin(pluginEntry webapi.PluginEntry) core.PluginEntry {
return internalRemote.CreateRemotePlugin(pluginEntry)
}

// Use this method to register Kubernetes Plugins
func (p *taskPluginRegistry) RegisterK8sPlugin(info k8s.PluginEntry) {
if info.ID == "" {
Expand Down
74 changes: 74 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/bigquery/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Package bigquery implements WebAPI plugin for Google BigQuery
package bigquery

import (
"time"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/google"

pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/webapi"
"github.com/flyteorg/flytestdlib/config"
)

//go:generate pflags Config --default-var=defaultConfig

var (
defaultConfig = Config{
WebAPI: webapi.PluginConfig{
ResourceQuotas: map[core.ResourceNamespace]int{
"default": 1000,
},
ReadRateLimiter: webapi.RateLimiterConfig{
Burst: 100,
QPS: 10,
},
WriteRateLimiter: webapi.RateLimiterConfig{
Burst: 100,
QPS: 10,
},
Caching: webapi.CachingConfig{
Size: 500000,
ResyncInterval: config.Duration{Duration: 30 * time.Second},
Workers: 10,
MaxSystemFailures: 5,
},
ResourceMeta: nil,
},
ResourceConstraints: core.ResourceConstraintsSpec{
ProjectScopeResourceConstraint: &core.ResourceConstraint{
Value: 100,
},
NamespaceScopeResourceConstraint: &core.ResourceConstraint{
Value: 50,
},
},
GoogleTokenSource: google.GetDefaultConfig(),
}

configSection = pluginsConfig.MustRegisterSubSection("bigquery", &defaultConfig)
)

// Config is config for 'bigquery' plugin
type Config struct {
// WebAPI defines config for the base WebAPI plugin
WebAPI webapi.PluginConfig `json:"webApi" pflag:",Defines config for the base WebAPI plugin."`

// ResourceConstraints defines resource constraints on how many executions to be created per project/overall at any given time
ResourceConstraints core.ResourceConstraintsSpec `json:"resourceConstraints" pflag:"-,Defines resource constraints on how many executions to be created per project/overall at any given time."`

// GoogleTokenSource configures token source for BigQuery client
GoogleTokenSource google.TokenSourceFactoryConfig `json:"googleTokenSource" pflag:",Defines Google token source"`

// bigQueryEndpoint overrides BigQuery client endpoint, only for testing
bigQueryEndpoint string
}

func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func SetConfig(cfg *Config) error {
return configSection.SetConfig(cfg)
}
55 changes: 55 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/bigquery/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 632c508

Please sign in to comment.