Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit for BigQuery backend #30

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/planet-federator/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"planet-exporter/federator"
"planet-exporter/federator/bigquery"
"planet-exporter/prometheus"

cron "github.com/robfig/cron/v3"
Expand Down Expand Up @@ -54,6 +55,10 @@ type Config struct {
InfluxdbBucket string
InfluxdbBatchSize int

BigqueryProjectID string
BigqueryTrafficTable bigquery.TableMetadata
BigqueryDependencyTable bigquery.TableMetadata

PrometheusAddr string
}

Expand Down
59 changes: 44 additions & 15 deletions cmd/planet-federator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (

"planet-exporter/cmd/planet-federator/internal"
federator "planet-exporter/federator"
bigqueryFederator "planet-exporter/federator/bigquery"
influxdbFederator "planet-exporter/federator/influxdb"
"planet-exporter/prometheus"

"cloud.google.com/go/bigquery"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2domain "github.com/influxdata/influxdb-client-go/v2/domain"
promapi "github.com/prometheus/client_golang/api"
Expand Down Expand Up @@ -62,15 +64,29 @@ func main() {
flag.BoolVar(&config.LogDisableColors, "log-disable-colors", false, "Disable colors on logger")
flag.BoolVar(&showVersionAndExit, "version", false, "Show version and exit")

// Influxdb
// Prometheus
flag.StringVar(&config.PrometheusAddr, "prometheus-addr", "http://127.0.0.1:9090/", "Prometheus address containing planet-exporter metrics")

//
// Federator backends
//

// Backend: Influxdb (default)
flag.StringVar(&config.InfluxdbAddr, "influxdb-addr", "http://127.0.0.1:8086", "Target Influxdb HTTP Address to store pre-processed planet-exporter data")
flag.StringVar(&config.InfluxdbToken, "influxdb-token", "", "Target Influxdb token")
flag.StringVar(&config.InfluxdbOrg, "influxdb-org", "mothership", "Influxdb organization")
flag.StringVar(&config.InfluxdbBucket, "influxdb-bucket", "mothership", "Influxdb bucket")
flag.IntVar(&config.InfluxdbBatchSize, "influxdb-batch-size", defaultInfluxBatchSize, "Influxdb batch size")

// Prometheus
flag.StringVar(&config.PrometheusAddr, "prometheus-addr", "http://127.0.0.1:9090/", "Prometheus address containing planet-exporter metrics")
// Backend: BigQuery
// If bq-project-id is set, Federator will use Bigquery backend over Influxdb.
// TODO: Support a more flexible approach to multiple Federator backends.
flag.StringVar(&config.BigqueryProjectID, "bq-project-id", "", "BQ Project ID for target dataset")
// We assume the datasets/tables live in the same GCP Project
flag.StringVar(&config.BigqueryTrafficTable.DatasetID, "bq-traffic-dataset-id", "", "BQ Dataset ID for traffic table")
flag.StringVar(&config.BigqueryTrafficTable.TableID, "bq-traffic-table-id", "", "BQ Table ID for traffic table")
flag.StringVar(&config.BigqueryDependencyTable.DatasetID, "bq-dependency-dataset-id", "", "BQ Dataset ID for dependency table")
flag.StringVar(&config.BigqueryDependencyTable.TableID, "bq-dependency-table-id", "", "BQ Table ID for dependency table")

flag.Parse()

Expand Down Expand Up @@ -109,22 +125,35 @@ func main() {
log.Fatalf("Error initializing Prometheus client for addr %v: %v", config.PrometheusAddr, err)
}

log.Info("Initialize Influxdb client")
influxdbClient := influxdb2.NewClient(config.InfluxdbAddr, config.InfluxdbToken)
influxdbHealth, err := influxdbClient.Health(ctx)
if err != nil {
log.Fatalf("Target Influxdb (%v) health-check error: %v", config.InfluxdbAddr, err)
}
if influxdbHealth.Status != influxdb2domain.HealthCheckStatusPass {
log.Fatalf("Target Influxdb (%v) is unhealthy: %v", config.InfluxdbAddr, err)
}
defer influxdbClient.Close()

log.Info("Initialize Prometheus service")
prometheusSvc := prometheus.New(promapiClient)

log.Info("Initialize Federator service")
federatorBackend := influxdbFederator.New(influxdbClient, config.InfluxdbOrg, config.InfluxdbBucket)
var federatorBackend federator.Backend
// Prefer BigQuery backend whenever it's set
if config.BigqueryProjectID != "" {
log.Info("Initialize Bigquery client")
bqClient, err := bigquery.NewClient(ctx, config.BigqueryProjectID)
if err != nil {
log.Fatalf("Error initializing BigQuery client for GCP Project %v: %v", config.BigqueryProjectID, err)
}

federatorBackend = bigqueryFederator.New(bqClient, config.BigqueryTrafficTable, config.BigqueryDependencyTable)

} else { // Default Influxdb backend
log.Info("Initialize Influxdb client")
influxdbClient := influxdb2.NewClient(config.InfluxdbAddr, config.InfluxdbToken)
influxdbHealth, err := influxdbClient.Health(ctx)
if err != nil {
log.Fatalf("Target Influxdb (%v) health-check error: %v", config.InfluxdbAddr, err)
}
if influxdbHealth.Status != influxdb2domain.HealthCheckStatusPass {
log.Fatalf("Target Influxdb (%v) is unhealthy: %v", config.InfluxdbAddr, err)
}
defer influxdbClient.Close()

federatorBackend = influxdbFederator.New(influxdbClient, config.InfluxdbOrg, config.InfluxdbBucket)
}
federatorSvc := federator.New(federatorBackend)

log.Info("Initialize main service")
Expand Down
Loading