Skip to content

Commit

Permalink
Merge branch 'main' into er/fix/processor/sample_types
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard authored Sep 18, 2023
2 parents 5634904 + cd25ada commit 9374049
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 273 deletions.
27 changes: 27 additions & 0 deletions .chloggen/mysqlreceicer-checkapi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: mysqlreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Do not export the function `Query`

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26304]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
1 change: 0 additions & 1 deletion cmd/checkapi/allowlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ receiver/journaldreceiver
receiver/kafkareceiver
receiver/mongodbatlasreceiver
receiver/mongodbreceiver
receiver/mysqlreceiver
receiver/podmanreceiver
receiver/pulsarreceiver
receiver/windowseventlogreceiver
45 changes: 45 additions & 0 deletions exporter/influxdbexporter/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package influxdbexporter

import (
"context"
"io"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
Expand All @@ -15,6 +17,8 @@ import (
"github.com/influxdata/line-protocol/v2/lineprotocol"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
)

func Test_influxHTTPWriterBatch_optimizeTags(t *testing.T) {
Expand Down Expand Up @@ -141,3 +145,44 @@ func Test_influxHTTPWriterBatch_maxPayload(t *testing.T) {
})
}
}

func Test_influxHTTPWriterBatch_EnqueuePoint_emptyTagValue(t *testing.T) {
var recordedRequest *http.Request
var recordedRequestBody []byte
noopHTTPServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if assert.Nil(t, recordedRequest) {
recordedRequest = r
recordedRequestBody, _ = io.ReadAll(r.Body)
}
}))
t.Cleanup(noopHTTPServer.Close)

nowTime := time.Unix(1000, 2000)

influxWriter, err := newInfluxHTTPWriter(
new(common.NoopLogger),
&Config{
HTTPClientSettings: confighttp.HTTPClientSettings{
Endpoint: noopHTTPServer.URL,
},
},
component.TelemetrySettings{})
require.NoError(t, err)
influxWriter.httpClient = noopHTTPServer.Client()
influxWriterBatch := influxWriter.NewBatch()

err = influxWriterBatch.EnqueuePoint(
context.Background(),
"m",
map[string]string{"k": "v", "empty": ""},
map[string]interface{}{"f": int64(1)},
nowTime,
common.InfluxMetricValueTypeUntyped)
require.NoError(t, err)
err = influxWriterBatch.WriteBatch(context.Background())
require.NoError(t, err)

if assert.NotNil(t, recordedRequest) {
assert.Equal(t, "m,k=v f=1i 1000000002000", strings.TrimSpace(string(recordedRequestBody)))
}
}
23 changes: 11 additions & 12 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"go.opentelemetry.io/collector/featuregate"
"go.uber.org/zap"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
Expand Down Expand Up @@ -101,12 +102,18 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager,
return nil, err
}

// Ensure that splitter is buildable
factory := splitter.NewSplitFuncFactory(c.SplitConfig, enc, int(c.MaxLogSize), c.TrimConfig.Func(), c.FlushPeriod)
if _, err := factory.SplitFunc(); err != nil {
splitFunc, err := c.SplitConfig.Func(enc, false, int(c.MaxLogSize))
if err != nil {
return nil, err
}

trimFunc := trim.Nop
if enc != encoding.Nop {
trimFunc = c.TrimConfig.Func()
}

// Ensure that splitter is buildable
factory := splitter.NewFactory(splitFunc, trimFunc, c.FlushPeriod)
return c.buildManager(logger, emit, factory)
}

Expand All @@ -116,16 +123,8 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback
return nil, err
}

if splitFunc == nil {
return nil, fmt.Errorf("must provide split function")
}

// Ensure that splitter is buildable
factory := splitter.NewCustomFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod)
if _, err := factory.SplitFunc(); err != nil {
return nil, err
}

factory := splitter.NewFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod)
return c.buildManager(logger, emit, factory)
}

Expand Down
33 changes: 0 additions & 33 deletions pkg/stanza/fileconsumer/internal/splitter/custom.go

This file was deleted.

27 changes: 26 additions & 1 deletion pkg/stanza/fileconsumer/internal/splitter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,33 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"bufio"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

type Factory interface {
SplitFunc() (bufio.SplitFunc, error)
SplitFunc() bufio.SplitFunc
}

type factory struct {
splitFunc bufio.SplitFunc
trimFunc trim.Func
flushPeriod time.Duration
}

var _ Factory = (*factory)(nil)

func NewFactory(splitFunc bufio.SplitFunc, trimFunc trim.Func, flushPeriod time.Duration) Factory {
return &factory{
splitFunc: splitFunc,
trimFunc: trimFunc,
flushPeriod: flushPeriod,
}
}

// SplitFunc builds a bufio.SplitFunc based on the configuration
func (f *factory) SplitFunc() bufio.SplitFunc {
return trim.WithFunc(flush.WithPeriod(f.splitFunc, f.flushPeriod), f.trimFunc)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func TestCustom(t *testing.T) {
factory := NewCustomFactory(bufio.ScanLines, trim.Nop, 0)
splitFunc, err := factory.SplitFunc()
assert.NoError(t, err)
func TestFactory(t *testing.T) {
factory := NewFactory(bufio.ScanLines, trim.Nop, 0)
splitFunc := factory.SplitFunc()
assert.NotNil(t, splitFunc)

input := []byte(" hello \n world \n extra ")
Expand All @@ -38,9 +37,8 @@ func TestCustom(t *testing.T) {
}

func TestCustomWithTrim(t *testing.T) {
factory := NewCustomFactory(bufio.ScanLines, trim.Whitespace, 0)
splitFunc, err := factory.SplitFunc()
assert.NoError(t, err)
factory := NewFactory(bufio.ScanLines, trim.Whitespace, 0)
splitFunc := factory.SplitFunc()
assert.NotNil(t, splitFunc)

input := []byte(" hello \n world \n extra ")
Expand All @@ -63,9 +61,8 @@ func TestCustomWithTrim(t *testing.T) {

func TestCustomWithFlush(t *testing.T) {
flushPeriod := 100 * time.Millisecond
factory := NewCustomFactory(bufio.ScanLines, trim.Nop, flushPeriod)
splitFunc, err := factory.SplitFunc()
assert.NoError(t, err)
factory := NewFactory(bufio.ScanLines, trim.Nop, flushPeriod)
splitFunc := factory.SplitFunc()
assert.NotNil(t, splitFunc)

input := []byte(" hello \n world \n extra ")
Expand Down Expand Up @@ -95,9 +92,8 @@ func TestCustomWithFlush(t *testing.T) {

func TestCustomWithFlushTrim(t *testing.T) {
flushPeriod := 100 * time.Millisecond
factory := NewCustomFactory(bufio.ScanLines, trim.Whitespace, flushPeriod)
splitFunc, err := factory.SplitFunc()
assert.NoError(t, err)
factory := NewFactory(bufio.ScanLines, trim.Whitespace, flushPeriod)
splitFunc := factory.SplitFunc()
assert.NotNil(t, splitFunc)

input := []byte(" hello \n world \n extra ")
Expand Down
55 changes: 0 additions & 55 deletions pkg/stanza/fileconsumer/internal/splitter/multiline.go

This file was deleted.

Loading

0 comments on commit 9374049

Please sign in to comment.