diff --git a/.chloggen/mysqlreceicer-checkapi.yaml b/.chloggen/mysqlreceicer-checkapi.yaml new file mode 100644 index 000000000000..cbe565ccc073 --- /dev/null +++ b/.chloggen/mysqlreceicer-checkapi.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: mysqlreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Do not export the function `Query` + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26304] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] \ No newline at end of file diff --git a/cmd/checkapi/allowlist.txt b/cmd/checkapi/allowlist.txt index 516f9cbc08e2..0f815ddd8428 100644 --- a/cmd/checkapi/allowlist.txt +++ b/cmd/checkapi/allowlist.txt @@ -27,7 +27,6 @@ receiver/journaldreceiver receiver/kafkareceiver receiver/mongodbatlasreceiver receiver/mongodbreceiver -receiver/mysqlreceiver receiver/podmanreceiver receiver/pulsarreceiver receiver/windowseventlogreceiver diff --git a/exporter/influxdbexporter/writer_test.go b/exporter/influxdbexporter/writer_test.go index f59145422891..ff64a462d5a3 100644 --- a/exporter/influxdbexporter/writer_test.go +++ b/exporter/influxdbexporter/writer_test.go @@ -5,8 +5,10 @@ package influxdbexporter import ( "context" + "io" "net/http" "net/http/httptest" + "strings" "sync" "testing" "time" @@ -15,6 +17,8 @@ import ( "github.com/influxdata/line-protocol/v2/lineprotocol" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/confighttp" ) func Test_influxHTTPWriterBatch_optimizeTags(t *testing.T) { @@ -141,3 +145,44 @@ func Test_influxHTTPWriterBatch_maxPayload(t *testing.T) { }) } } + +func Test_influxHTTPWriterBatch_EnqueuePoint_emptyTagValue(t *testing.T) { + var recordedRequest *http.Request + var recordedRequestBody []byte + noopHTTPServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if assert.Nil(t, recordedRequest) { + recordedRequest = r + recordedRequestBody, _ = io.ReadAll(r.Body) + } + })) + t.Cleanup(noopHTTPServer.Close) + + nowTime := time.Unix(1000, 2000) + + influxWriter, err := newInfluxHTTPWriter( + new(common.NoopLogger), + &Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: noopHTTPServer.URL, + }, + }, + component.TelemetrySettings{}) + require.NoError(t, err) + influxWriter.httpClient = noopHTTPServer.Client() + influxWriterBatch := influxWriter.NewBatch() + + err = influxWriterBatch.EnqueuePoint( + context.Background(), + "m", + map[string]string{"k": "v", "empty": ""}, + map[string]interface{}{"f": int64(1)}, + nowTime, + common.InfluxMetricValueTypeUntyped) + require.NoError(t, err) + err = influxWriterBatch.WriteBatch(context.Background()) + require.NoError(t, err) + + if assert.NotNil(t, recordedRequest) { + assert.Equal(t, "m,k=v f=1i 1000000002000", strings.TrimSpace(string(recordedRequestBody))) + } +} diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index b2ced7b76666..bf9c054ab4f6 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/featuregate" "go.uber.org/zap" + "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" @@ -101,12 +102,18 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager, return nil, err } - // Ensure that splitter is buildable - factory := splitter.NewSplitFuncFactory(c.SplitConfig, enc, int(c.MaxLogSize), c.TrimConfig.Func(), c.FlushPeriod) - if _, err := factory.SplitFunc(); err != nil { + splitFunc, err := c.SplitConfig.Func(enc, false, int(c.MaxLogSize)) + if err != nil { return nil, err } + trimFunc := trim.Nop + if enc != encoding.Nop { + trimFunc = c.TrimConfig.Func() + } + + // Ensure that splitter is buildable + factory := splitter.NewFactory(splitFunc, trimFunc, c.FlushPeriod) return c.buildManager(logger, emit, factory) } @@ -116,16 +123,8 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback return nil, err } - if splitFunc == nil { - return nil, fmt.Errorf("must provide split function") - } - // Ensure that splitter is buildable - factory := splitter.NewCustomFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod) - if _, err := factory.SplitFunc(); err != nil { - return nil, err - } - + factory := splitter.NewFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod) return c.buildManager(logger, emit, factory) } diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom.go b/pkg/stanza/fileconsumer/internal/splitter/custom.go deleted file mode 100644 index 6cb8afced95f..000000000000 --- a/pkg/stanza/fileconsumer/internal/splitter/custom.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package splitter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter" - -import ( - "bufio" - "time" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" -) - -type customFactory struct { - splitFunc bufio.SplitFunc - trimFunc trim.Func - flushPeriod time.Duration -} - -var _ Factory = (*customFactory)(nil) - -func NewCustomFactory(splitFunc bufio.SplitFunc, trimFunc trim.Func, flushPeriod time.Duration) Factory { - return &customFactory{ - splitFunc: splitFunc, - trimFunc: trimFunc, - flushPeriod: flushPeriod, - } -} - -// SplitFunc builds a bufio.SplitFunc based on the configuration -func (f *customFactory) SplitFunc() (bufio.SplitFunc, error) { - return trim.WithFunc(flush.WithPeriod(f.splitFunc, f.flushPeriod), f.trimFunc), nil -} diff --git a/pkg/stanza/fileconsumer/internal/splitter/factory.go b/pkg/stanza/fileconsumer/internal/splitter/factory.go index ec1e9e79335a..cf1a76fac385 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/factory.go +++ b/pkg/stanza/fileconsumer/internal/splitter/factory.go @@ -5,8 +5,33 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co import ( "bufio" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) type Factory interface { - SplitFunc() (bufio.SplitFunc, error) + SplitFunc() bufio.SplitFunc +} + +type factory struct { + splitFunc bufio.SplitFunc + trimFunc trim.Func + flushPeriod time.Duration +} + +var _ Factory = (*factory)(nil) + +func NewFactory(splitFunc bufio.SplitFunc, trimFunc trim.Func, flushPeriod time.Duration) Factory { + return &factory{ + splitFunc: splitFunc, + trimFunc: trimFunc, + flushPeriod: flushPeriod, + } +} + +// SplitFunc builds a bufio.SplitFunc based on the configuration +func (f *factory) SplitFunc() bufio.SplitFunc { + return trim.WithFunc(flush.WithPeriod(f.splitFunc, f.flushPeriod), f.trimFunc) } diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go b/pkg/stanza/fileconsumer/internal/splitter/factory_test.go similarity index 83% rename from pkg/stanza/fileconsumer/internal/splitter/custom_test.go rename to pkg/stanza/fileconsumer/internal/splitter/factory_test.go index 1b65b08d0097..91f994b1f49d 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/factory_test.go @@ -13,10 +13,9 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) -func TestCustom(t *testing.T) { - factory := NewCustomFactory(bufio.ScanLines, trim.Nop, 0) - splitFunc, err := factory.SplitFunc() - assert.NoError(t, err) +func TestFactory(t *testing.T) { + factory := NewFactory(bufio.ScanLines, trim.Nop, 0) + splitFunc := factory.SplitFunc() assert.NotNil(t, splitFunc) input := []byte(" hello \n world \n extra ") @@ -38,9 +37,8 @@ func TestCustom(t *testing.T) { } func TestCustomWithTrim(t *testing.T) { - factory := NewCustomFactory(bufio.ScanLines, trim.Whitespace, 0) - splitFunc, err := factory.SplitFunc() - assert.NoError(t, err) + factory := NewFactory(bufio.ScanLines, trim.Whitespace, 0) + splitFunc := factory.SplitFunc() assert.NotNil(t, splitFunc) input := []byte(" hello \n world \n extra ") @@ -63,9 +61,8 @@ func TestCustomWithTrim(t *testing.T) { func TestCustomWithFlush(t *testing.T) { flushPeriod := 100 * time.Millisecond - factory := NewCustomFactory(bufio.ScanLines, trim.Nop, flushPeriod) - splitFunc, err := factory.SplitFunc() - assert.NoError(t, err) + factory := NewFactory(bufio.ScanLines, trim.Nop, flushPeriod) + splitFunc := factory.SplitFunc() assert.NotNil(t, splitFunc) input := []byte(" hello \n world \n extra ") @@ -95,9 +92,8 @@ func TestCustomWithFlush(t *testing.T) { func TestCustomWithFlushTrim(t *testing.T) { flushPeriod := 100 * time.Millisecond - factory := NewCustomFactory(bufio.ScanLines, trim.Whitespace, flushPeriod) - splitFunc, err := factory.SplitFunc() - assert.NoError(t, err) + factory := NewFactory(bufio.ScanLines, trim.Whitespace, flushPeriod) + splitFunc := factory.SplitFunc() assert.NotNil(t, splitFunc) input := []byte(" hello \n world \n extra ") diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline.go b/pkg/stanza/fileconsumer/internal/splitter/multiline.go deleted file mode 100644 index 917bf2aeddb7..000000000000 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package splitter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter" - -import ( - "bufio" - "time" - - "golang.org/x/text/encoding" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" -) - -type splitFuncFactory struct { - splitConfig split.Config - encoding encoding.Encoding - maxLogSize int - trimFunc trim.Func - flushPeriod time.Duration -} - -var _ Factory = (*splitFuncFactory)(nil) - -func NewSplitFuncFactory( - splitConfig split.Config, - encoding encoding.Encoding, - maxLogSize int, - trimFunc trim.Func, - flushPeriod time.Duration, -) Factory { - return &splitFuncFactory{ - splitConfig: splitConfig, - encoding: encoding, - maxLogSize: maxLogSize, - trimFunc: trimFunc, - flushPeriod: flushPeriod, - } -} - -// SplitFunc builds a bufio.SplitFunc based on the configuration -func (f *splitFuncFactory) SplitFunc() (bufio.SplitFunc, error) { - splitFunc, err := f.splitConfig.Func(f.encoding, false, f.maxLogSize) - if err != nil { - return nil, err - } - splitFunc = flush.WithPeriod(splitFunc, f.flushPeriod) - if f.encoding == encoding.Nop { - // Special case where we should never trim - return splitFunc, nil - } - return trim.WithFunc(splitFunc, f.trimFunc), nil -} diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go deleted file mode 100644 index 89b773ad802d..000000000000 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package splitter - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "golang.org/x/text/encoding/unicode" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" -) - -func TestSplitFuncError(t *testing.T) { - sCfg := split.Config{ - LineStartPattern: "START", - LineEndPattern: "END", - } - factory := NewSplitFuncFactory(sCfg, unicode.UTF8, 1024, trim.Nop, 0) - splitFunc, err := factory.SplitFunc() - assert.Error(t, err) - assert.Nil(t, splitFunc) -} - -func TestSplitFunc(t *testing.T) { - factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Nop, 0) - splitFunc, err := factory.SplitFunc() - assert.NoError(t, err) - assert.NotNil(t, splitFunc) - - input := []byte(" hello \n world \n extra ") - - advance, token, err := splitFunc(input, false) - assert.NoError(t, err) - assert.Equal(t, 8, advance) - assert.Equal(t, []byte(" hello "), token) - - advance, token, err = splitFunc(input[8:], false) - assert.NoError(t, err) - assert.Equal(t, 8, advance) - assert.Equal(t, []byte(" world "), token) - - advance, token, err = splitFunc(input[16:], false) - assert.NoError(t, err) - assert.Equal(t, 0, advance) - assert.Nil(t, token) -} - -func TestSplitFuncWithTrim(t *testing.T) { - factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Whitespace, 0) - splitFunc, err := factory.SplitFunc() - assert.NoError(t, err) - assert.NotNil(t, splitFunc) - - input := []byte(" hello \n world \n extra ") - - advance, token, err := splitFunc(input, false) - assert.NoError(t, err) - assert.Equal(t, 8, advance) - assert.Equal(t, []byte("hello"), token) - - advance, token, err = splitFunc(input[8:], false) - assert.NoError(t, err) - assert.Equal(t, 8, advance) - assert.Equal(t, []byte("world"), token) - - advance, token, err = splitFunc(input[16:], false) - assert.NoError(t, err) - assert.Equal(t, 0, advance) - assert.Nil(t, token) -} - -func TestSplitFuncWithFlush(t *testing.T) { - flushPeriod := 100 * time.Millisecond - factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Nop, flushPeriod) - splitFunc, err := factory.SplitFunc() - assert.NoError(t, err) - assert.NotNil(t, splitFunc) - - input := []byte(" hello \n world \n extra ") - - advance, token, err := splitFunc(input, false) - assert.NoError(t, err) - assert.Equal(t, 8, advance) - assert.Equal(t, []byte(" hello "), token) - - advance, token, err = splitFunc(input[8:], false) - assert.NoError(t, err) - assert.Equal(t, 8, advance) - assert.Equal(t, []byte(" world "), token) - - advance, token, err = splitFunc(input[16:], false) - assert.NoError(t, err) - assert.Equal(t, 0, advance) - assert.Nil(t, token) - - time.Sleep(2 * flushPeriod) - - advance, token, err = splitFunc(input[16:], false) - assert.NoError(t, err) - assert.Equal(t, 7, advance) - assert.Equal(t, []byte(" extra "), token) -} - -func TestSplitFuncWithFlushTrim(t *testing.T) { - flushPeriod := 100 * time.Millisecond - factory := NewSplitFuncFactory(split.Config{}, unicode.UTF8, 1024, trim.Whitespace, flushPeriod) - splitFunc, err := factory.SplitFunc() - assert.NoError(t, err) - assert.NotNil(t, splitFunc) - - input := []byte(" hello \n world \n extra ") - - advance, token, err := splitFunc(input, false) - assert.NoError(t, err) - assert.Equal(t, 8, advance) - assert.Equal(t, []byte("hello"), token) - - advance, token, err = splitFunc(input[8:], false) - assert.NoError(t, err) - assert.Equal(t, 8, advance) - assert.Equal(t, []byte("world"), token) - - advance, token, err = splitFunc(input[16:], false) - assert.NoError(t, err) - assert.Equal(t, 0, advance) - assert.Nil(t, token) - - time.Sleep(2 * flushPeriod) - - advance, token, err = splitFunc(input[16:], false) - assert.NoError(t, err) - assert.Equal(t, 7, advance) - assert.Equal(t, []byte("extra"), token) // Ensure trim applies to flushed token -} diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/reader_factory.go index e01144efdef1..6318af74291d 100644 --- a/pkg/stanza/fileconsumer/reader_factory.go +++ b/pkg/stanza/fileconsumer/reader_factory.go @@ -29,25 +29,17 @@ type readerFactory struct { } func (f *readerFactory) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) { - lineSplitFunc, err := f.splitterFactory.SplitFunc() - if err != nil { - return nil, err - } return f.build(file, &readerMetadata{ Fingerprint: fp, FileAttributes: map[string]any{}, - }, lineSplitFunc) + }, f.splitterFactory.SplitFunc()) } // copy creates a deep copy of a reader func (f *readerFactory) copy(old *reader, newFile *os.File) (*reader, error) { - var err error lineSplitFunc := old.lineSplitFunc if lineSplitFunc == nil { - lineSplitFunc, err = f.splitterFactory.SplitFunc() - if err != nil { - return nil, err - } + lineSplitFunc = f.splitterFactory.SplitFunc() } return f.build(newFile, &readerMetadata{ Fingerprint: old.Fingerprint.Copy(), diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index b8f3d0bc89e1..1e7fc82e81cf 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -227,6 +227,10 @@ func testReaderFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPer emitChan := make(chan *emitParams, 100) enc, err := decode.LookupEncoding(defaultEncoding) require.NoError(t, err) + + splitFunc, err := sCfg.Func(enc, false, maxLogSize) + require.NoError(t, err) + return &readerFactory{ SugaredLogger: testutil.Logger(t), readerConfig: &readerConfig{ @@ -235,7 +239,7 @@ func testReaderFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPer emit: testEmitFunc(emitChan), }, fromBeginning: true, - splitterFactory: splitter.NewSplitFuncFactory(sCfg, enc, maxLogSize, trim.Whitespace, flushPeriod), + splitterFactory: splitter.NewFactory(splitFunc, trim.Whitespace, flushPeriod), encoding: enc, }, emitChan } diff --git a/pkg/stanza/trim/trim.go b/pkg/stanza/trim/trim.go index 20e48e3fe3da..a3cdf02646c8 100644 --- a/pkg/stanza/trim/trim.go +++ b/pkg/stanza/trim/trim.go @@ -38,11 +38,11 @@ func (c Config) Func() Func { return Whitespace } -var Nop Func = func(token []byte) []byte { +func Nop(token []byte) []byte { return token } -var Leading Func = func(data []byte) []byte { +func Leading(data []byte) []byte { token := bytes.TrimLeft(data, "\r\n\t ") if token == nil { // TrimLeft sometimes overwrites something with nothing. @@ -52,10 +52,10 @@ var Leading Func = func(data []byte) []byte { return token } -var Trailing Func = func(data []byte) []byte { +func Trailing(data []byte) []byte { return bytes.TrimRight(data, "\r\n\t ") } -var Whitespace Func = func(data []byte) []byte { +func Whitespace(data []byte) []byte { return Leading(Trailing(data)) } diff --git a/receiver/mysqlreceiver/client.go b/receiver/mysqlreceiver/client.go index 53b72f44601a..4acd14bfa8d6 100644 --- a/receiver/mysqlreceiver/client.go +++ b/receiver/mysqlreceiver/client.go @@ -205,14 +205,14 @@ func (c *mySQLClient) getVersion() (string, error) { // getGlobalStats queries the db for global status metrics. func (c *mySQLClient) getGlobalStats() (map[string]string, error) { - query := "SHOW GLOBAL STATUS;" - return Query(*c, query) + q := "SHOW GLOBAL STATUS;" + return query(*c, q) } // getInnodbStats queries the db for innodb metrics. func (c *mySQLClient) getInnodbStats() (map[string]string, error) { - query := "SELECT name, count FROM information_schema.innodb_metrics WHERE name LIKE '%buffer_pool_size%';" - return Query(*c, query) + q := "SELECT name, count FROM information_schema.innodb_metrics WHERE name LIKE '%buffer_pool_size%';" + return query(*c, q) } // getTableIoWaitsStats queries the db for table_io_waits metrics. @@ -505,7 +505,7 @@ func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) { return stats, nil } -func Query(c mySQLClient, query string) (map[string]string, error) { +func query(c mySQLClient, query string) (map[string]string, error) { rows, err := c.client.Query(query) if err != nil { return nil, err