Skip to content

Commit

Permalink
Merge pull request #15 from grafana/moreConfigFixes
Browse files Browse the repository at this point in the history
More config fixes
  • Loading branch information
mstoykov authored Sep 27, 2021
2 parents 914f21b + 6bcc0ff commit bff0f1e
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 43 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/influxdata/influxdb1-client v0.0.0-20190402204710-8ff2fc3824fc
github.com/kelseyhightower/envconfig v1.4.0
github.com/kubernetes/helm v2.9.0+incompatible
github.com/mitchellh/mapstructure v1.1.2
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
github.com/xdg/scram v1.0.3
Expand Down
74 changes: 44 additions & 30 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ package kafka
import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/Shopify/sarama"
"github.com/kelseyhightower/envconfig"
"github.com/kubernetes/helm/pkg/strvals"
"github.com/mitchellh/mapstructure"
"gopkg.in/guregu/null.v3"

"go.k6.io/k6/lib/types"
Expand All @@ -54,24 +54,6 @@ type Config struct {
InfluxDBConfig influxdbConfig `json:"influxdb"`
}

// config is a duplicate of ConfigFields as we can not mapstructure.Decode into
// null types so we duplicate the struct with primitive types to Decode into
type config struct {
Brokers []string `json:"brokers" mapstructure:"brokers" envconfig:"K6_KAFKA_BROKERS"`
Topic string `json:"topic" mapstructure:"topic" envconfig:"K6_KAFKA_TOPIC"`
Format string `json:"format" mapstructure:"format" envconfig:"K6_KAFKA_FORMAT"`
PushInterval string `json:"pushInterval" mapstructure:"pushInterval" envconfig:"K6_KAFKA_PUSH_INTERVAL"`
User string `json:"user" mapstructure:"user" envconfig:"K6_KAFKA_SASL_USER"`
Password string `json:"password" mapstructure:"password" envconfig:"K6_KAFKA_SASL_PASSWORD"`
AuthMechanism string `json:"authMechanism" mapstructure:"authMechanism" envconfig:"K6_KAFKA_AUTH_MECHANISM"`

InfluxDBConfig influxdbConfig `json:"influxdb" mapstructure:"influxdb"`
Version string `json:"version" mapstructure:"version"`
SSL bool `json:"ssl" mapstructure:"ssl"`
Insecure bool `json:"insecureSkipTLSVerify" mapstructure:"insecure"`
LogError bool `json:"logError" mapstructure:"logError"`
}

// NewConfig creates a new Config instance with default values for some fields.
func NewConfig() Config {
return Config{
Expand Down Expand Up @@ -135,10 +117,6 @@ func ParseArg(arg string) (Config, error) {
return c, err
}

if v, ok := params["brokers"].(string); ok {
params["brokers"] = []string{v}
}

if v, ok := params["influxdb"].(map[string]interface{}); ok {
influxConfig, err := influxdbParseMap(v)
if err != nil {
Expand All @@ -154,49 +132,85 @@ func ParseArg(arg string) (Config, error) {
if err != nil {
return c, err
}
delete(params, "pushInterval")
}

if v, ok := params["version"].(string); ok {
c.Version = null.StringFrom(v)
delete(params, "version")
}

if v, ok := params["ssl"].(bool); ok {
c.SSL = null.BoolFrom(v)
delete(params, "ssl")
}

if v, ok := params["insecureSkipTLSVerify"].(bool); ok {
c.InsecureSkipTLSVerify = null.BoolFrom(v)
delete(params, "insecureSkipTLSVerify")
}

if v, ok := params["logError"].(bool); ok {
c.LogError = null.BoolFrom(v)
delete(params, "logError")
}

if v, ok := params["authMechanism"].(string); ok {
c.AuthMechanism = null.StringFrom(v)
delete(params, "authMechanism")
}

if v, ok := params["user"].(string); ok {
c.User = null.StringFrom(v)
delete(params, "user")
}

if v, ok := params["password"].(string); ok {
c.Password = null.StringFrom(v)
delete(params, "password")
}
if v, ok := params["topic"].(string); ok {
c.Topic = null.StringFrom(v)
delete(params, "topic")
}
if v, ok := params["format"].(string); ok {
c.Format = null.StringFrom(v)

var cfg config
err = mapstructure.Decode(params, &cfg)
if err != nil {
return c, err
delete(params, "format")
}

c.Brokers = cfg.Brokers
c.Topic = null.StringFrom(cfg.Topic)
c.Format = null.StringFrom(cfg.Format)
if v, ok := params["brokers"].(string); ok {
c.Brokers = []string{v}

delete(params, "brokers")
}
if v, ok := params["brokers"].([]interface{}); ok {
c.Brokers = interfaceSliceToStringSlice(v)
delete(params, "brokers")
}

if len(params) > 0 {
return c, errors.New("Unknown or unparsed options '" + mapToString(params) + "'")
}
return c, nil
}

func mapToString(m map[string]interface{}) string {
var s string
for k, v := range m {
s += fmt.Sprintf("%s=%v,", k, v)
}
return s[:len(s)-1]
}

func interfaceSliceToStringSlice(input []interface{}) []string {
output := make([]string, len(input))
for i, v := range input {
output[i] = fmt.Sprintf("%v", v)
}
return output
}

// GetConsolidatedConfig combines {default config values + JSON config +
// environment vars + arg config values}, and returns the final result.
func GetConsolidatedConfig(jsonRawConf json.RawMessage, env map[string]string, arg string) (Config, error) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ func TestConfigParseArg(t *testing.T) {
assert.Equal(t, null.StringFrom("SASL_PLAINTEXT"), c.AuthMechanism)
assert.Equal(t, null.StringFrom("johndoe"), c.User)
assert.Equal(t, null.BoolFrom(false), c.LogError)

c, err = ParseArg("badOption=212")
assert.Error(t, err)
assert.Contains(t, err.Error(), `Unknown or unparsed options 'badOption=212'`)

c, err = ParseArg("ssl=nottrue")
assert.Error(t, err)
assert.Contains(t, err.Error(), `Unknown or unparsed options 'ssl=nottrue'`)

c, err = ParseArg("brokers={broker2,broker3:9092},topic=someTopic,format=influxdb,influxdb.tagsAsFields=fake,influxdb.something=else")
assert.Error(t, err)
assert.Contains(t, err.Error(), `Unknown or unparsed options 'something=else'`)
}

func TestConsolidatedConfig(t *testing.T) {
Expand Down Expand Up @@ -256,6 +268,25 @@ func TestConsolidatedConfig(t *testing.T) {
LogError: null.BoolFrom(false),
},
},
"arg_over_env_with_brokers": {
env: map[string]string{
"K6_KAFKA_AUTH_MECHANISM": "none",
"K6_KAFKA_LOG_ERROR": "false",
"K6_KAFKA_BROKERS": "something",
},
arg: "logError=true",
config: Config{
Format: null.StringFrom("json"),
PushInterval: types.NullDurationFrom(1 * time.Second),
InfluxDBConfig: newInfluxdbConfig(),
AuthMechanism: null.StringFrom("none"),
Version: null.StringFrom(sarama.DefaultVersion.String()),
SSL: null.BoolFrom(false),
InsecureSkipTLSVerify: null.BoolFrom(false),
LogError: null.BoolFrom(true),
Brokers: []string{"something"},
},
},
}

for name, testCase := range testCases {
Expand Down
22 changes: 10 additions & 12 deletions pkg/kafka/format_influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
package kafka

import (
"errors"
"fmt"
"strconv"
"strings"

client "github.com/influxdata/influxdb1-client/v2"
"github.com/mitchellh/mapstructure"
"github.com/sirupsen/logrus"
"go.k6.io/k6/lib/types"
"go.k6.io/k6/stats"
)

Expand Down Expand Up @@ -172,18 +171,17 @@ func (c influxdbConfig) Apply(cfg influxdbConfig) influxdbConfig {
func influxdbParseMap(m map[string]interface{}) (influxdbConfig, error) {
c := influxdbConfig{}
if v, ok := m["tagsAsFields"].(string); ok {
m["tagsAsFields"] = []string{v}
c.TagsAsFields = []string{v}
delete(m, "tagsAsFields")
}
dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: types.NullDecoder,
Result: &c,
})
if err != nil {
return c, err
if v, ok := m["tagsAsFields"].([]interface{}); ok {
c.TagsAsFields = interfaceSliceToStringSlice(v)
delete(m, "tagsAsFields")
}

err = dec.Decode(m)
return c, err
if len(m) > 0 {
return c, errors.New("Unknown or unparsed options '" + mapToString(m) + "'")
}
return c, nil
}

type influxdbConfig struct {
Expand Down

0 comments on commit bff0f1e

Please sign in to comment.