diff --git a/plugins/outputs/bigquery/README.md b/plugins/outputs/bigquery/README.md
index 0274336516112..544e75f8d8eab 100644
--- a/plugins/outputs/bigquery/README.md
+++ b/plugins/outputs/bigquery/README.md
@@ -36,6 +36,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
 
   ## Character to replace hyphens on Metric name
   # replace_hyphen_to = "_"
+
+  ## Write all metrics in a single compact table
+  # compact_table = ""
 ```
 
 Leaving `project` empty indicates the plugin will try to retrieve the project
@@ -54,6 +57,36 @@ table on BigQuery:
 * Should contain the metric's fields with the same name and the column type
   should match the field type.
 
+## Compact table
+
+When `compact_table` is set, all metrics are inserted into the given table
+using the following schema:
+
+```json
+[
+  {
+    "mode": "REQUIRED",
+    "name": "timestamp",
+    "type": "TIMESTAMP"
+  },
+  {
+    "mode": "REQUIRED",
+    "name": "name",
+    "type": "STRING"
+  },
+  {
+    "mode": "REQUIRED",
+    "name": "tags",
+    "type": "JSON"
+  },
+  {
+    "mode": "REQUIRED",
+    "name": "fields",
+    "type": "JSON"
+  }
+]
+```
+
 ## Restrictions
 
 Avoid hyphens on BigQuery tables, underlying SDK cannot handle streaming inserts
diff --git a/plugins/outputs/bigquery/bigquery.go b/plugins/outputs/bigquery/bigquery.go
index 6190ca44cd17d..21b2fbc26cd29 100644
--- a/plugins/outputs/bigquery/bigquery.go
+++ b/plugins/outputs/bigquery/bigquery.go
@@ -4,6 +4,7 @@ package bigquery
 import (
 	"context"
 	_ "embed"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"reflect"
@@ -34,6 +35,7 @@ type BigQuery struct {
 
 	Timeout         config.Duration `toml:"timeout"`
 	ReplaceHyphenTo string          `toml:"replace_hyphen_to"`
+	CompactTable    string          `toml:"compact_table"`
 
 	Log telegraf.Logger `toml:"-"`
 
@@ -62,9 +64,22 @@
 
 func (s *BigQuery) Connect() error {
 	if s.client == nil {
-		return s.setUpDefaultClient()
+		if err := s.setUpDefaultClient(); err != nil {
+			return err
+		}
 	}
 
+	if s.CompactTable != "" {
+		ctx := context.Background()
+		ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
+		defer cancel()
+
+		// Check if the compact table exists
+		_, err := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Metadata(ctx)
+		if err != nil {
+			return fmt.Errorf("compact table: %w", err)
+		}
+	}
 	return nil
 }
 
@@ -81,7 +96,7 @@ func (s *BigQuery) setUpDefaultClient() error {
 		creds, err := google.FindDefaultCredentials(ctx)
 		if err != nil {
 			return fmt.Errorf(
-				"unable to find Google Cloud Platform Application Default Credentials: %v. "+
+				"unable to find Google Cloud Platform Application Default Credentials: %w. "+
 					"Either set ADC or provide CredentialsFile config", err)
 		}
 		credentialsOption = option.WithCredentials(creds)
@@ -94,6 +109,10 @@
 
 // Write the metrics to Google Cloud BigQuery.
 func (s *BigQuery) Write(metrics []telegraf.Metric) error {
+	if s.CompactTable != "" {
+		return s.writeCompact(metrics)
+	}
+
 	groupedMetrics := s.groupByMetricName(metrics)
 
 	var wg sync.WaitGroup
@@ -111,6 +130,26 @@ func (s *BigQuery) Write(metrics []telegraf.Metric) error {
 	return nil
 }
 
+func (s *BigQuery) writeCompact(metrics []telegraf.Metric) error {
+	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
+	defer cancel()
+
+	// Always returns an instance, even if the table doesn't exist (anymore).
+	inserter := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Inserter()
+
+	var compactValues []*bigquery.ValuesSaver
+	for _, m := range metrics {
+		valueSaver, err := s.newCompactValuesSaver(m)
+		if err != nil {
+			s.Log.Warnf("could not prepare metric as compact value: %v", err)
+		} else {
+			compactValues = append(compactValues, valueSaver)
+		}
+	}
+	return inserter.Put(ctx, compactValues)
+}
+
 func (s *BigQuery) groupByMetricName(metrics []telegraf.Metric) map[string][]bigquery.ValueSaver {
 	groupedMetrics := make(map[string][]bigquery.ValueSaver)
 
@@ -138,6 +177,33 @@ func newValuesSaver(m telegraf.Metric) *bigquery.ValuesSaver {
 	}
 }
 
+func (s *BigQuery) newCompactValuesSaver(m telegraf.Metric) (*bigquery.ValuesSaver, error) {
+	tags, err := json.Marshal(m.Tags())
+	if err != nil {
+		return nil, fmt.Errorf("serializing tags: %w", err)
+	}
+
+	fields, err := json.Marshal(m.Fields())
+	if err != nil {
+		return nil, fmt.Errorf("serializing fields: %w", err)
+	}
+
+	return &bigquery.ValuesSaver{
+		Schema: bigquery.Schema{
+			timeStampFieldSchema(),
+			newStringFieldSchema("name"),
+			newJSONFieldSchema("tags"),
+			newJSONFieldSchema("fields"),
+		},
+		Row: []bigquery.Value{
+			m.Time(),
+			m.Name(),
+			string(tags),
+			string(fields),
+		},
+	}, nil
+}
+
 func timeStampFieldSchema() *bigquery.FieldSchema {
 	return &bigquery.FieldSchema{
 		Name: timeStampFieldName,
@@ -145,22 +211,29 @@ func timeStampFieldSchema() *bigquery.FieldSchema {
 	}
 }
 
+func newStringFieldSchema(name string) *bigquery.FieldSchema {
+	return &bigquery.FieldSchema{
+		Name: name,
+		Type: bigquery.StringFieldType,
+	}
+}
+
+func newJSONFieldSchema(name string) *bigquery.FieldSchema {
+	return &bigquery.FieldSchema{
+		Name: name,
+		Type: bigquery.JSONFieldType,
+	}
+}
+
 func tagsSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
 	for _, t := range m.TagList() {
-		s = append(s, tagFieldSchema(t))
+		s = append(s, newStringFieldSchema(t.Key))
 		r = append(r, t.Value)
 	}
 
 	return s, r
 }
 
-func tagFieldSchema(t *telegraf.Tag) *bigquery.FieldSchema {
-	return &bigquery.FieldSchema{
-		Name: t.Key,
-		Type: bigquery.StringFieldType,
-	}
-}
-
 func valuesSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
 	for _, f := range m.FieldList() {
 		s = append(s, valuesSchema(f))
diff --git a/plugins/outputs/bigquery/bigquery_test.go b/plugins/outputs/bigquery/bigquery_test.go
index 57f668c0eaf59..20b8f149c45a8 100644
--- a/plugins/outputs/bigquery/bigquery_test.go
+++ b/plugins/outputs/bigquery/bigquery_test.go
@@ -107,15 +107,42 @@ func TestConnect(t *testing.T) {
 	srv := localBigQueryServer(t)
 	defer srv.Close()
 
-	b := &BigQuery{
-		Project: "test-project",
-		Dataset: "test-dataset",
-		Timeout: defaultTimeout,
+	tests := []struct {
+		name         string
+		compactTable string
+		errorString  string
+	}{
+		{name: "normal"},
+		{
+			name:         "compact table existing",
+			compactTable: "test-metrics",
+		},
+		{
+			name:         "compact table not existing",
+			compactTable: "foobar",
+			errorString:  "compact table: googleapi: got HTTP response code 404",
+		},
 	}
 
-	require.NoError(t, b.Init())
-	require.NoError(t, b.setUpTestClient(srv.URL))
-	require.NoError(t, b.Connect())
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			b := &BigQuery{
+				Project:      "test-project",
+				Dataset:      "test-dataset",
+				Timeout:      defaultTimeout,
+				CompactTable: tt.compactTable,
+			}
+
+			require.NoError(t, b.Init())
+			require.NoError(t, b.setUpTestClient(srv.URL))
+
+			if tt.errorString != "" {
+				require.ErrorContains(t, b.Connect(), tt.errorString)
+			} else {
+				require.NoError(t, b.Connect())
+			}
+		})
+	}
 }
 
 func TestWrite(t *testing.T) {
@@ -148,6 +175,42 @@ func TestWrite(t *testing.T) {
 	require.Equal(t, mockMetrics[0].Fields()["value"], row.Value)
 }
 
+func TestWriteCompact(t *testing.T) {
+	srv := localBigQueryServer(t)
+	defer srv.Close()
+
+	b := &BigQuery{
+		Project:      "test-project",
+		Dataset:      "test-dataset",
+		Timeout:      defaultTimeout,
+		CompactTable: "test-metrics",
+	}
+
+	mockMetrics := testutil.MockMetrics()
+
+	require.NoError(t, b.Init())
+	require.NoError(t, b.setUpTestClient(srv.URL))
+	require.NoError(t, b.Connect())
+
+	require.NoError(t, b.Write(mockMetrics))
+
+	var rows []map[string]json.RawMessage
+	require.NoError(t, json.Unmarshal(receivedBody["rows"], &rows))
+	require.Len(t, rows, 1)
+	require.Contains(t, rows[0], "json")
+
+	var row interface{}
+	require.NoError(t, json.Unmarshal(rows[0]["json"], &row))
+	require.Equal(t, map[string]interface{}{
+		"timestamp": "2009-11-10T23:00:00Z",
+		"name":      "test1",
+		"tags":      `{"tag1":"value1"}`,
+		"fields":    `{"value":1}`,
+	}, row)
+
+	require.NoError(t, b.Close())
+}
+
 func (b *BigQuery) setUpTestClient(endpointURL string) error {
 	noAuth := option.WithoutAuthentication()
 	endpoint := option.WithEndpoint(endpointURL)
@@ -170,15 +233,22 @@ func localBigQueryServer(t *testing.T) *httptest.Server {
 
 	srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		switch r.URL.Path {
-		case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll":
+		case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll",
+			"/projects/test-project/datasets/test-dataset/tables/test-metrics/insertAll":
 			decoder := json.NewDecoder(r.Body)
 			require.NoError(t, decoder.Decode(&receivedBody))
 
 			w.WriteHeader(http.StatusOK)
 			_, err := w.Write([]byte(successfulResponse))
 			require.NoError(t, err)
+		case "/projects/test-project/datasets/test-dataset/tables/test-metrics":
+			w.WriteHeader(http.StatusOK)
+			_, err := w.Write([]byte("{}"))
+			require.NoError(t, err)
 		default:
 			w.WriteHeader(http.StatusNotFound)
+			_, err := w.Write([]byte(r.URL.String()))
+			require.NoError(t, err)
 		}
 	})
 
diff --git a/plugins/outputs/bigquery/sample.conf b/plugins/outputs/bigquery/sample.conf
index b556195f92edd..bd51051f643d5 100644
--- a/plugins/outputs/bigquery/sample.conf
+++ b/plugins/outputs/bigquery/sample.conf
@@ -14,3 +14,6 @@
 
   ## Character to replace hyphens on Metric name
   # replace_hyphen_to = "_"
+
+  ## Write all metrics in a single compact table
+  # compact_table = ""
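
Note on usage (not part of the patch above): `Connect()` only checks via `Metadata()` that the configured compact table already exists, so the table has to be created ahead of time with the schema documented in the README. Below is a minimal sketch of creating such a table with the same `cloud.google.com/go/bigquery` client library the plugin uses; the project, dataset, and table names (`my-project`, `my-dataset`, `telegraf_metrics`) are placeholders, not values taken from this change.

```go
package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Placeholder project ID; use the same value as the `project` option
	// in the [[outputs.bigquery]] section of telegraf.conf.
	client, err := bigquery.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatalf("creating BigQuery client: %v", err)
	}
	defer client.Close()

	// Schema mirroring the compact layout documented in the README:
	// timestamp, name, tags and fields, all REQUIRED.
	schema := bigquery.Schema{
		{Name: "timestamp", Type: bigquery.TimestampFieldType, Required: true},
		{Name: "name", Type: bigquery.StringFieldType, Required: true},
		{Name: "tags", Type: bigquery.JSONFieldType, Required: true},
		{Name: "fields", Type: bigquery.JSONFieldType, Required: true},
	}

	// Placeholder dataset and table names; the table name is what
	// `compact_table` would be set to.
	table := client.Dataset("my-dataset").Table("telegraf_metrics")
	if err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
		log.Fatalf("creating compact table: %v", err)
	}
}
```

With the table in place, setting `compact_table = "telegraf_metrics"` should pass the existence check in `Connect()` and route all writes through `writeCompact()`.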