-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[chore][receiver/sqlquery] Move reusable logic to internal package (#…
…30709) **Description:** As noted in the related issues, a lot of the logic in the sqlquery receiver can be moved to a central package to be used by other receivers. I realize this appears to be a large change, but it's purely a re-organization PR, there's no functional changes. GitHub is not recognizing moved files, so please refer to new file names and their deleted counterparts. A lot of members are now public in the internal package that were private in the receiver which is likely what's causing GitHub to miss that they're just moved files. This doesn't require a changelog (in my mind) because all things moving were originally private to the receiver. They're now still purely internal, so they won't be published. I believe `internal` is the best destination for now as it allows us to solidify the usage and interface before moving it to `pkg` to be publicly available. There are still a lot of unknowns as far as how it can be used by other receivers, so I fully expect "breaking changes" to this. If we keep it internal we can update all usages with the changing interface, so the changes won't be breaking. **Link to tracking Issue:** #30297, #13546
- Loading branch information
Showing
33 changed files
with
1,122 additions
and
623 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package sqlquery // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery" | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/receiver/scraperhelper" | ||
"go.uber.org/multierr" | ||
) | ||
|
||
type Config struct { | ||
scraperhelper.ScraperControllerSettings `mapstructure:",squash"` | ||
Driver string `mapstructure:"driver"` | ||
DataSource string `mapstructure:"datasource"` | ||
Queries []Query `mapstructure:"queries"` | ||
StorageID *component.ID `mapstructure:"storage"` | ||
Telemetry TelemetryConfig `mapstructure:"telemetry"` | ||
} | ||
|
||
func (c Config) Validate() error { | ||
if c.Driver == "" { | ||
return errors.New("'driver' cannot be empty") | ||
} | ||
if c.DataSource == "" { | ||
return errors.New("'datasource' cannot be empty") | ||
} | ||
if len(c.Queries) == 0 { | ||
return errors.New("'queries' cannot be empty") | ||
} | ||
for _, query := range c.Queries { | ||
if err := query.Validate(); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
type Query struct { | ||
SQL string `mapstructure:"sql"` | ||
Metrics []MetricCfg `mapstructure:"metrics"` | ||
Logs []LogsCfg `mapstructure:"logs"` | ||
TrackingColumn string `mapstructure:"tracking_column"` | ||
TrackingStartValue string `mapstructure:"tracking_start_value"` | ||
} | ||
|
||
func (q Query) Validate() error { | ||
var errs error | ||
if q.SQL == "" { | ||
errs = multierr.Append(errs, errors.New("'query.sql' cannot be empty")) | ||
} | ||
if len(q.Logs) == 0 && len(q.Metrics) == 0 { | ||
errs = multierr.Append(errs, errors.New("at least one of 'query.logs' and 'query.metrics' must not be empty")) | ||
} | ||
for _, logs := range q.Logs { | ||
if err := logs.Validate(); err != nil { | ||
errs = multierr.Append(errs, err) | ||
} | ||
} | ||
for _, metric := range q.Metrics { | ||
if err := metric.Validate(); err != nil { | ||
errs = multierr.Append(errs, err) | ||
} | ||
} | ||
return errs | ||
} | ||
|
||
type LogsCfg struct { | ||
BodyColumn string `mapstructure:"body_column"` | ||
} | ||
|
||
func (config LogsCfg) Validate() error { | ||
var errs error | ||
if config.BodyColumn == "" { | ||
errs = multierr.Append(errs, errors.New("'body_column' must not be empty")) | ||
} | ||
return errs | ||
} | ||
|
||
type MetricCfg struct { | ||
MetricName string `mapstructure:"metric_name"` | ||
ValueColumn string `mapstructure:"value_column"` | ||
AttributeColumns []string `mapstructure:"attribute_columns"` | ||
Monotonic bool `mapstructure:"monotonic"` | ||
ValueType MetricValueType `mapstructure:"value_type"` | ||
DataType MetricType `mapstructure:"data_type"` | ||
Aggregation MetricAggregation `mapstructure:"aggregation"` | ||
Unit string `mapstructure:"unit"` | ||
Description string `mapstructure:"description"` | ||
StaticAttributes map[string]string `mapstructure:"static_attributes"` | ||
StartTsColumn string `mapstructure:"start_ts_column"` | ||
TsColumn string `mapstructure:"ts_column"` | ||
} | ||
|
||
func (c MetricCfg) Validate() error { | ||
var errs error | ||
if c.MetricName == "" { | ||
errs = multierr.Append(errs, errors.New("'metric_name' cannot be empty")) | ||
} | ||
if c.ValueColumn == "" { | ||
errs = multierr.Append(errs, errors.New("'value_column' cannot be empty")) | ||
} | ||
if err := c.ValueType.Validate(); err != nil { | ||
errs = multierr.Append(errs, err) | ||
} | ||
if err := c.DataType.Validate(); err != nil { | ||
errs = multierr.Append(errs, err) | ||
} | ||
if err := c.Aggregation.Validate(); err != nil { | ||
errs = multierr.Append(errs, err) | ||
} | ||
if c.DataType == MetricTypeGauge && c.Aggregation != "" { | ||
errs = multierr.Append(errs, fmt.Errorf("aggregation=%s but data_type=%s does not support aggregation", c.Aggregation, c.DataType)) | ||
} | ||
if errs != nil && c.MetricName != "" { | ||
errs = multierr.Append(fmt.Errorf("invalid metric config with metric_name '%s'", c.MetricName), errs) | ||
} | ||
return errs | ||
} | ||
|
||
type MetricType string | ||
|
||
const ( | ||
MetricTypeUnspecified MetricType = "" | ||
MetricTypeGauge MetricType = "gauge" | ||
MetricTypeSum MetricType = "sum" | ||
) | ||
|
||
func (t MetricType) Validate() error { | ||
switch t { | ||
case MetricTypeUnspecified, MetricTypeGauge, MetricTypeSum: | ||
return nil | ||
} | ||
return fmt.Errorf("metric config has unsupported data_type: '%s'", t) | ||
} | ||
|
||
type MetricValueType string | ||
|
||
const ( | ||
MetricValueTypeUnspecified MetricValueType = "" | ||
MetricValueTypeInt MetricValueType = "int" | ||
MetricValueTypeDouble MetricValueType = "double" | ||
) | ||
|
||
func (t MetricValueType) Validate() error { | ||
switch t { | ||
case MetricValueTypeUnspecified, MetricValueTypeInt, MetricValueTypeDouble: | ||
return nil | ||
} | ||
return fmt.Errorf("metric config has unsupported value_type: '%s'", t) | ||
} | ||
|
||
type MetricAggregation string | ||
|
||
const ( | ||
MetricAggregationUnspecified MetricAggregation = "" | ||
MetricAggregationCumulative MetricAggregation = "cumulative" | ||
MetricAggregationDelta MetricAggregation = "delta" | ||
) | ||
|
||
func (a MetricAggregation) Validate() error { | ||
switch a { | ||
case MetricAggregationUnspecified, MetricAggregationCumulative, MetricAggregationDelta: | ||
return nil | ||
} | ||
return fmt.Errorf("metric config has unsupported aggregation: '%s'", a) | ||
} | ||
|
||
type TelemetryConfig struct { | ||
Logs TelemetryLogsConfig `mapstructure:"logs"` | ||
} | ||
|
||
type TelemetryLogsConfig struct { | ||
Query bool `mapstructure:"query"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package sqlquery // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery" | ||
|
||
import ( | ||
"context" | ||
|
||
// register Db drivers | ||
_ "github.com/SAP/go-hdb/driver" | ||
_ "github.com/go-sql-driver/mysql" | ||
_ "github.com/lib/pq" | ||
_ "github.com/microsoft/go-mssqldb" | ||
_ "github.com/microsoft/go-mssqldb/integratedauth/krb5" | ||
_ "github.com/sijms/go-ora/v2" | ||
_ "github.com/snowflakedb/gosnowflake" | ||
"go.uber.org/multierr" | ||
"go.uber.org/zap" | ||
) | ||
|
||
type StringMap map[string]string | ||
|
||
type DbClient interface { | ||
QueryRows(ctx context.Context, args ...any) ([]StringMap, error) | ||
} | ||
|
||
type DbSQLClient struct { | ||
Db Db | ||
Logger *zap.Logger | ||
Telemetry TelemetryConfig | ||
SQL string | ||
} | ||
|
||
func NewDbClient(db Db, sql string, logger *zap.Logger, telemetry TelemetryConfig) DbClient { | ||
return DbSQLClient{ | ||
Db: db, | ||
SQL: sql, | ||
Logger: logger, | ||
Telemetry: telemetry, | ||
} | ||
} | ||
|
||
func (cl DbSQLClient) QueryRows(ctx context.Context, args ...any) ([]StringMap, error) { | ||
cl.Logger.Debug("Running query", cl.prepareQueryFields(cl.SQL, args)...) | ||
sqlRows, err := cl.Db.QueryContext(ctx, cl.SQL, args...) | ||
if err != nil { | ||
return nil, err | ||
} | ||
var out []StringMap | ||
colTypes, err := sqlRows.ColumnTypes() | ||
if err != nil { | ||
return nil, err | ||
} | ||
scanner := newRowScanner(colTypes) | ||
var warnings error | ||
for sqlRows.Next() { | ||
err = scanner.scan(sqlRows) | ||
if err != nil { | ||
return nil, err | ||
} | ||
sm, scanErr := scanner.toStringMap() | ||
if scanErr != nil { | ||
warnings = multierr.Append(warnings, scanErr) | ||
} | ||
out = append(out, sm) | ||
} | ||
return out, warnings | ||
} | ||
|
||
func (cl DbSQLClient) prepareQueryFields(sql string, args []any) []zap.Field { | ||
var logFields []zap.Field | ||
if cl.Telemetry.Logs.Query { | ||
logFields = append(logFields, zap.String("query", sql)) | ||
logFields = append(logFields, zap.Any("parameters", args)) | ||
} | ||
return logFields | ||
} | ||
|
||
// This is only used for testing, but need to be exposed to other packages. | ||
type FakeDBClient struct { | ||
RequestCounter int | ||
StringMaps [][]StringMap | ||
Err error | ||
} | ||
|
||
func (c *FakeDBClient) QueryRows(context.Context, ...any) ([]StringMap, error) { | ||
if c.Err != nil { | ||
return nil, c.Err | ||
} | ||
idx := c.RequestCounter | ||
c.RequestCounter++ | ||
return c.StringMaps[idx], nil | ||
} |
Oops, something went wrong.