feat(outputs.bigquery): Allow to add metrics in one compact table #14342

Merged
33 changes: 33 additions & 0 deletions plugins/outputs/bigquery/README.md
@@ -36,6 +36,9 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

## Character to replace hyphens on Metric name
# replace_hyphen_to = "_"

## Write all metrics in a single compact table
# compact_table = ""
```

Leaving `project` empty indicates the plugin will try to retrieve the project
@@ -54,6 +57,36 @@ table on BigQuery:
* Should contain the metric's fields with the same name and the column type
should match the field type.

## Compact table

When the compact table is enabled, all metrics are inserted into the given
table, which must already exist and use the following schema:

```json
[
{
"mode": "REQUIRED",
"name": "timestamp",
"type": "TIMESTAMP"
},
{
"mode": "REQUIRED",
"name": "name",
"type": "STRING"
},
{
"mode": "REQUIRED",
"name": "tags",
"type": "JSON"
},
{
"mode": "REQUIRED",
"name": "fields",
"type": "JSON"
}
]
```
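
The plugin does not create the compact table; `Connect` fails if the table
cannot be found. The sketch below pre-creates a matching table with the
BigQuery Go client; the `my-project`, `telegraf`, and `metrics` names are
placeholders, not values used by the plugin.

```go
// Sketch: create the compact table with the schema expected by the plugin.
package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()

	// Placeholder project ID; adjust to your environment.
	client, err := bigquery.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	schema := bigquery.Schema{
		{Name: "timestamp", Type: bigquery.TimestampFieldType, Required: true},
		{Name: "name", Type: bigquery.StringFieldType, Required: true},
		{Name: "tags", Type: bigquery.JSONFieldType, Required: true},
		{Name: "fields", Type: bigquery.JSONFieldType, Required: true},
	}

	// Placeholder dataset and table names.
	table := client.Dataset("telegraf").Table("metrics")
	if err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil {
		log.Fatal(err)
	}
}
```

At query time, the `tags` and `fields` columns can be unpacked with BigQuery's
JSON functions such as `JSON_VALUE`.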

## Restrictions

Avoid hyphens in BigQuery table names, as the underlying SDK cannot handle streaming inserts
93 changes: 83 additions & 10 deletions plugins/outputs/bigquery/bigquery.go
@@ -4,6 +4,7 @@ package bigquery
import (
"context"
_ "embed"
"encoding/json"
"errors"
"fmt"
"reflect"
@@ -34,6 +35,7 @@ type BigQuery struct {

Timeout config.Duration `toml:"timeout"`
ReplaceHyphenTo string `toml:"replace_hyphen_to"`
CompactTable string `toml:"compact_table"`

Log telegraf.Logger `toml:"-"`

@@ -62,9 +64,22 @@ func (s *BigQuery) Init() error {

func (s *BigQuery) Connect() error {
if s.client == nil {
return s.setUpDefaultClient()
if err := s.setUpDefaultClient(); err != nil {
return err
}
}

if s.CompactTable != "" {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
defer cancel()

// Check if the compact table exists
_, err := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Metadata(ctx)
if err != nil {
return fmt.Errorf("compact table: %w", err)
}
}
return nil
}

@@ -81,7 +96,7 @@ func (s *BigQuery) setUpDefaultClient() error {
creds, err := google.FindDefaultCredentials(ctx)
if err != nil {
return fmt.Errorf(
"unable to find Google Cloud Platform Application Default Credentials: %v. "+
"unable to find Google Cloud Platform Application Default Credentials: %w. "+
"Either set ADC or provide CredentialsFile config", err)
}
credentialsOption = option.WithCredentials(creds)
@@ -94,6 +109,10 @@

// Write the metrics to Google Cloud BigQuery.
func (s *BigQuery) Write(metrics []telegraf.Metric) error {
if s.CompactTable != "" {
return s.writeCompact(metrics)
}

groupedMetrics := s.groupByMetricName(metrics)

var wg sync.WaitGroup
@@ -111,6 +130,26 @@ func (s *BigQuery) Write(metrics []telegraf.Metric) error {
return nil
}

func (s *BigQuery) writeCompact(metrics []telegraf.Metric) error {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
defer cancel()

// Always returns an instance, even if table doesn't exist (anymore).
inserter := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTable).Inserter()

var compactValues []*bigquery.ValuesSaver
for _, m := range metrics {
valueSaver, err := s.newCompactValuesSaver(m)
if err != nil {
s.Log.Warnf("could not prepare metric as compact value: %v", err)
} else {
compactValues = append(compactValues, valueSaver)
}
}
return inserter.Put(ctx, compactValues)
}

func (s *BigQuery) groupByMetricName(metrics []telegraf.Metric) map[string][]bigquery.ValueSaver {
groupedMetrics := make(map[string][]bigquery.ValueSaver)

@@ -138,29 +177,63 @@ func newValuesSaver(m telegraf.Metric) *bigquery.ValuesSaver {
}
}

func (s *BigQuery) newCompactValuesSaver(m telegraf.Metric) (*bigquery.ValuesSaver, error) {
tags, err := json.Marshal(m.Tags())
if err != nil {
return nil, fmt.Errorf("serializing tags: %w", err)
}

fields, err := json.Marshal(m.Fields())
if err != nil {
return nil, fmt.Errorf("serializing fields: %w", err)
}

return &bigquery.ValuesSaver{
Schema: bigquery.Schema{
timeStampFieldSchema(),
newStringFieldSchema("name"),
newJSONFieldSchema("tags"),
newJSONFieldSchema("fields"),
},
Row: []bigquery.Value{
m.Time(),
m.Name(),
string(tags),
string(fields),
},
}, nil
}

func timeStampFieldSchema() *bigquery.FieldSchema {
return &bigquery.FieldSchema{
Name: timeStampFieldName,
Type: bigquery.TimestampFieldType,
}
}

func newStringFieldSchema(name string) *bigquery.FieldSchema {
return &bigquery.FieldSchema{
Name: name,
Type: bigquery.StringFieldType,
}
}

func newJSONFieldSchema(name string) *bigquery.FieldSchema {
return &bigquery.FieldSchema{
Name: name,
Type: bigquery.JSONFieldType,
}
}

func tagsSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
for _, t := range m.TagList() {
s = append(s, tagFieldSchema(t))
s = append(s, newStringFieldSchema(t.Key))
r = append(r, t.Value)
}

return s, r
}

func tagFieldSchema(t *telegraf.Tag) *bigquery.FieldSchema {
return &bigquery.FieldSchema{
Name: t.Key,
Type: bigquery.StringFieldType,
}
}

func valuesSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
for _, f := range m.FieldList() {
s = append(s, valuesSchema(f))
86 changes: 78 additions & 8 deletions plugins/outputs/bigquery/bigquery_test.go
@@ -107,15 +107,42 @@ func TestConnect(t *testing.T) {
srv := localBigQueryServer(t)
defer srv.Close()

b := &BigQuery{
Project: "test-project",
Dataset: "test-dataset",
Timeout: defaultTimeout,
tests := []struct {
name string
compactTable string
errorString string
}{
{name: "normal"},
{
name: "compact table existing",
compactTable: "test-metrics",
},
{
name: "compact table not existing",
compactTable: "foobar",
errorString: "compact table: googleapi: got HTTP response code 404",
},
}

require.NoError(t, b.Init())
require.NoError(t, b.setUpTestClient(srv.URL))
require.NoError(t, b.Connect())
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := &BigQuery{
Project: "test-project",
Dataset: "test-dataset",
Timeout: defaultTimeout,
CompactTable: tt.compactTable,
}

require.NoError(t, b.Init())
require.NoError(t, b.setUpTestClient(srv.URL))

if tt.errorString != "" {
require.ErrorContains(t, b.Connect(), tt.errorString)
} else {
require.NoError(t, b.Connect())
}
})
}
}

func TestWrite(t *testing.T) {
@@ -148,6 +175,42 @@ func TestWrite(t *testing.T) {
require.Equal(t, mockMetrics[0].Fields()["value"], row.Value)
}

func TestWriteCompact(t *testing.T) {
srv := localBigQueryServer(t)
defer srv.Close()

b := &BigQuery{
Project: "test-project",
Dataset: "test-dataset",
Timeout: defaultTimeout,
CompactTable: "test-metrics",
}

mockMetrics := testutil.MockMetrics()

require.NoError(t, b.Init())
require.NoError(t, b.setUpTestClient(srv.URL))
require.NoError(t, b.Connect())

require.NoError(t, b.Write(mockMetrics))

var rows []map[string]json.RawMessage
require.NoError(t, json.Unmarshal(receivedBody["rows"], &rows))
require.Len(t, rows, 1)
require.Contains(t, rows[0], "json")

var row interface{}
require.NoError(t, json.Unmarshal(rows[0]["json"], &row))
require.Equal(t, map[string]interface{}{
"timestamp": "2009-11-10T23:00:00Z",
"name": "test1",
"tags": `{"tag1":"value1"}`,
"fields": `{"value":1}`,
}, row)

require.NoError(t, b.Close())
}

func (b *BigQuery) setUpTestClient(endpointURL string) error {
noAuth := option.WithoutAuthentication()
endpoint := option.WithEndpoint(endpointURL)
@@ -170,15 +233,22 @@ func localBigQueryServer(t *testing.T) *httptest.Server {

srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll":
case "/projects/test-project/datasets/test-dataset/tables/test1/insertAll",
"/projects/test-project/datasets/test-dataset/tables/test-metrics/insertAll":
decoder := json.NewDecoder(r.Body)
require.NoError(t, decoder.Decode(&receivedBody))

w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(successfulResponse))
require.NoError(t, err)
case "/projects/test-project/datasets/test-dataset/tables/test-metrics":
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{}"))
require.NoError(t, err)
default:
w.WriteHeader(http.StatusNotFound)
_, err := w.Write([]byte(r.URL.String()))
require.NoError(t, err)
}
})

3 changes: 3 additions & 0 deletions plugins/outputs/bigquery/sample.conf
@@ -14,3 +14,6 @@

## Character to replace hyphens on Metric name
# replace_hyphen_to = "_"

## Write all metrics in a single compact table
# compact_table = ""
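
As an illustration, a configuration that writes every metric into a single
compact table could look like the sketch below; the project, dataset, and
table names are placeholders.

```toml
[[outputs.bigquery]]
  ## GCP project to write to; leaving it empty makes the plugin try to
  ## retrieve the project itself (placeholder value shown here).
  project = "my-project"

  ## Dataset that contains the compact table.
  dataset = "telegraf"

  ## Route all metrics into this one table instead of one table per metric name.
  compact_table = "metrics"
```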