Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(outputs.bigquery): Allow to add metrics in one compact table #14342

Merged
34 changes: 34 additions & 0 deletions plugins/outputs/bigquery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.

## Character to replace hyphens on Metric name
# replace_hyphen_to = "_"

## Write all metrics in a single compact table
# compact_table = false
# compact_table_name = ""
```

Leaving `project` empty indicates the plugin will try to retrieve the project
Expand All @@ -54,6 +58,36 @@ table on BigQuery:
* Should contain the metric's fields with the same name and the column type
should match the field type.

## Compact table

When the compact table is enabled, all metrics are inserted into the table
given by `compact_table_name`. The table must already exist and use the
following schema:

```json
[
{
"mode": "REQUIRED",
"name": "timestamp",
"type": "TIMESTAMP"
},
{
"mode": "REQUIRED",
"name": "name",
"type": "STRING"
},
{
"mode": "REQUIRED",
"name": "tags",
"type": "JSON"
},
{
"mode": "REQUIRED",
"name": "fields",
"type": "JSON"
}
]
```

## Restrictions

Avoid hyphens in BigQuery table names; the underlying SDK cannot handle streaming inserts
Expand Down
101 changes: 87 additions & 14 deletions plugins/outputs/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers/json"
)

//go:embed sample.conf
Expand All @@ -32,12 +33,15 @@ type BigQuery struct {
Project string `toml:"project"`
Dataset string `toml:"dataset"`

Timeout config.Duration `toml:"timeout"`
ReplaceHyphenTo string `toml:"replace_hyphen_to"`
Timeout config.Duration `toml:"timeout"`
ReplaceHyphenTo string `toml:"replace_hyphen_to"`
CompactTable bool `toml:"compact_table"`
CompactTableName string `toml:"compact_table_name"`

Log telegraf.Logger `toml:"-"`

client *bigquery.Client
client *bigquery.Client
serializer json.Serializer

warnedOnHyphens map[string]bool
}
Expand All @@ -55,16 +59,31 @@ func (s *BigQuery) Init() error {
return errors.New(`"dataset" is required`)
}

if s.CompactTable && s.CompactTableName == "" {
return errors.New(`"compact_table_name" is required`)
}

s.warnedOnHyphens = make(map[string]bool)

return nil
return s.serializer.Init()
}

func (s *BigQuery) Connect() error {
if s.client == nil {
return s.setUpDefaultClient()
if err := s.setUpDefaultClient(); err != nil {
return err
}
}

ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
defer cancel()

// Check if the compact table exists
_, err := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTableName).Metadata(ctx)
if s.CompactTable && err != nil {
return fmt.Errorf("compact table: %w", err)
}
Hipska marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

Expand All @@ -81,7 +100,7 @@ func (s *BigQuery) setUpDefaultClient() error {
creds, err := google.FindDefaultCredentials(ctx)
if err != nil {
return fmt.Errorf(
"unable to find Google Cloud Platform Application Default Credentials: %v. "+
"unable to find Google Cloud Platform Application Default Credentials: %w. "+
"Either set ADC or provide CredentialsFile config", err)
}
credentialsOption = option.WithCredentials(creds)
Expand All @@ -94,6 +113,10 @@ func (s *BigQuery) setUpDefaultClient() error {

// Write the metrics to Google Cloud BigQuery.
func (s *BigQuery) Write(metrics []telegraf.Metric) error {
if s.CompactTable {
return s.writeCompact(metrics)
}

groupedMetrics := s.groupByMetricName(metrics)

var wg sync.WaitGroup
Expand All @@ -111,6 +134,20 @@ func (s *BigQuery) Write(metrics []telegraf.Metric) error {
return nil
}

func (s *BigQuery) writeCompact(metrics []telegraf.Metric) error {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Duration(s.Timeout))
defer cancel()

inserter := s.client.DatasetInProject(s.Project, s.Dataset).Table(s.CompactTableName).Inserter()
Hipska marked this conversation as resolved.
Show resolved Hide resolved

compactValues := make([]*bigquery.ValuesSaver, len(metrics))
for i, m := range metrics {
compactValues[i] = s.newCompactValuesSaver(m)
}
return inserter.Put(ctx, compactValues)
}

func (s *BigQuery) groupByMetricName(metrics []telegraf.Metric) map[string][]bigquery.ValueSaver {
groupedMetrics := make(map[string][]bigquery.ValueSaver)

Expand Down Expand Up @@ -138,29 +175,65 @@ func newValuesSaver(m telegraf.Metric) *bigquery.ValuesSaver {
}
}

func (s *BigQuery) newCompactValuesSaver(m telegraf.Metric) *bigquery.ValuesSaver {
s.serializer.Transformation = "tags"
tags, err := s.serializer.Serialize(m)
if err != nil {
s.Log.Warnf("serializing tags: %v", err)
}

s.serializer.Transformation = "fields"
fields, err := s.serializer.Serialize(m)
if err != nil {
s.Log.Warnf("serializing fields: %v", err)
}
Hipska marked this conversation as resolved.
Show resolved Hide resolved

return &bigquery.ValuesSaver{
Schema: bigquery.Schema{
timeStampFieldSchema(),
newStringFieldSchema("name"),
newJSONFieldSchema("tags"),
newJSONFieldSchema("fields"),
},
Row: []bigquery.Value{
m.Time(),
m.Name(),
string(tags),
string(fields),
},
}
}

// timeStampFieldSchema returns the schema of the TIMESTAMP column, named
// after timeStampFieldName.
func timeStampFieldSchema() *bigquery.FieldSchema {
	schema := new(bigquery.FieldSchema)
	schema.Name = timeStampFieldName
	schema.Type = bigquery.TimestampFieldType
	return schema
}

// newStringFieldSchema returns the schema of a STRING column with the given
// name.
func newStringFieldSchema(name string) *bigquery.FieldSchema {
	schema := new(bigquery.FieldSchema)
	schema.Name = name
	schema.Type = bigquery.StringFieldType
	return schema
}

// newJSONFieldSchema returns the schema of a JSON column with the given name.
func newJSONFieldSchema(name string) *bigquery.FieldSchema {
	schema := new(bigquery.FieldSchema)
	schema.Name = name
	schema.Type = bigquery.JSONFieldType
	return schema
}

// tagsSchemaAndValues appends one STRING column to schema s and the matching
// tag value to row r for every tag of m, and returns the extended schema and
// row.
//
// Exactly one schema entry is added per value so that schema and row stay the
// same length and index-aligned.
func tagsSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
	for _, t := range m.TagList() {
		s = append(s, newStringFieldSchema(t.Key))
		r = append(r, t.Value)
	}

	return s, r
}

// tagFieldSchema returns the schema of the STRING column that holds tag t;
// the column is named after the tag key.
func tagFieldSchema(t *telegraf.Tag) *bigquery.FieldSchema {
	schema := new(bigquery.FieldSchema)
	schema.Name = t.Key
	schema.Type = bigquery.StringFieldType
	return schema
}

func valuesSchemaAndValues(m telegraf.Metric, s bigquery.Schema, r []bigquery.Value) ([]*bigquery.FieldSchema, []bigquery.Value) {
for _, f := range m.FieldList() {
s = append(s, valuesSchema(f))
Expand Down
57 changes: 57 additions & 0 deletions plugins/outputs/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func TestInit(t *testing.T) {
errorString: `"dataset" is required`,
plugin: &BigQuery{},
},
{
name: "compact table is not set",
plugin: &BigQuery{
Dataset: "test-dataset",
CompactTable: true,
},
errorString: `"compact_table_name" is required`,
},
{
name: "valid config",
plugin: &BigQuery{
Expand Down Expand Up @@ -148,6 +156,42 @@ func TestWrite(t *testing.T) {
require.Equal(t, mockMetrics[0].Fields()["value"], row.Value)
}

// TestWriteCompact writes the mock metrics through a plugin configured for
// compact-table mode against the local fake BigQuery server, and checks the
// first row captured from the insert request.
func TestWriteCompact(t *testing.T) {
	srv := localBigQueryServer(t)
	defer srv.Close()

	metrics := testutil.MockMetrics()

	plugin := &BigQuery{
		Project:          "test-project",
		Dataset:          "test-dataset",
		Timeout:          defaultTimeout,
		CompactTable:     true,
		CompactTableName: "test-metrics",
	}

	// Bring the plugin up against the fake server, then push the batch.
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.setUpTestClient(srv.URL))
	require.NoError(t, plugin.Connect())
	require.NoError(t, plugin.Write(metrics))

	// Decode the first row of the request body captured by the fake server.
	var rows []map[string]json.RawMessage
	require.NoError(t, json.Unmarshal(receivedBody["rows"], &rows))

	var actual interface{}
	require.NoError(t, json.Unmarshal(rows[0]["json"], &actual))

	// Tags and fields are expected as strings, including the trailing newline.
	expected := map[string]interface{}{
		"timestamp": "2009-11-10T23:00:00Z",
		"name":      "test1",
		"tags":      "{\"tag1\":\"value1\"}\n",
		"fields":    "{\"value\":1}\n",
	}
	require.Equal(t, expected, actual)

	require.NoError(t, plugin.Close())
}

func (b *BigQuery) setUpTestClient(endpointURL string) error {
noAuth := option.WithoutAuthentication()
endpoint := option.WithEndpoint(endpointURL)
Expand All @@ -174,11 +218,24 @@ func localBigQueryServer(t *testing.T) *httptest.Server {
decoder := json.NewDecoder(r.Body)
require.NoError(t, decoder.Decode(&receivedBody))

w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(successfulResponse))
require.NoError(t, err)
case "/projects/test-project/datasets/test-dataset/tables/test-metrics":
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte("{}"))
require.NoError(t, err)
case "/projects/test-project/datasets/test-dataset/tables/test-metrics/insertAll":
decoder := json.NewDecoder(r.Body)
require.NoError(t, decoder.Decode(&receivedBody))

w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(successfulResponse))
require.NoError(t, err)
default:
w.WriteHeader(http.StatusNotFound)
_, err := w.Write([]byte(r.URL.String()))
require.NoError(t, err)
}
})

Expand Down
4 changes: 4 additions & 0 deletions plugins/outputs/bigquery/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@

## Character to replace hyphens on Metric name
# replace_hyphen_to = "_"

## Write all metrics in a single compact table
# compact_table = false
# compact_table_name = ""
Hipska marked this conversation as resolved.
Show resolved Hide resolved
Loading