diff --git a/plugins/outputs/kinesis/README.md b/plugins/outputs/kinesis/README.md index 809bb77906d1c..12b6178fd9197 100644 --- a/plugins/outputs/kinesis/README.md +++ b/plugins/outputs/kinesis/README.md @@ -71,7 +71,7 @@ All metrics will be mapped to the same shard which may limit throughput. #### tag This will take the value of the specified tag from each metric as the paritionKey. -If the tag is not found an empty string will be used. +If the tag is not found the `default` value will be used or `telegraf` if unspecified #### measurement diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index 402f95156e0a7..93fc87a669987 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -36,8 +36,9 @@ type ( } Partition struct { - Method string `toml:"method"` - Key string `toml:"key"` + Method string `toml:"method"` + Key string `toml:"key"` + Default string `toml:"default"` } ) @@ -90,10 +91,11 @@ var sampleConfig = ` # method = "measurement" # ## Use the value of a tag for all writes, if the tag is not set the empty - ## string will be used: + ## default option will be used. When no default, defaults to "telegraf" # [outputs.kinesis.partition] # method = "tag" # key = "host" + # default = "mykey" ## Data format to output. @@ -187,10 +189,13 @@ func (k *KinesisOutput) getPartitionKey(metric telegraf.Metric) string { case "measurement": return metric.Name() case "tag": - if metric.HasTag(k.Partition.Key) { - return metric.Tags()[k.Partition.Key] + if t, ok := metric.GetTag(k.Partition.Key); ok { + return t + } else if len(k.Partition.Default) > 0 { + return k.Partition.Default } - log.Printf("E! kinesis : You have configured a Partition using tag %+v which does not exist.", k.Partition.Key) + // Default partition name if default is not set + return "telegraf" default: log.Printf("E! kinesis : You have configured a Partition method of %+v which is not supported", k.Partition.Method) } diff --git a/plugins/outputs/kinesis/kinesis_test.go b/plugins/outputs/kinesis/kinesis_test.go index 3c6321abdc132..627a459dbd582 100644 --- a/plugins/outputs/kinesis/kinesis_test.go +++ b/plugins/outputs/kinesis/kinesis_test.go @@ -29,13 +29,22 @@ func TestPartitionKey(t *testing.T) { } assert.Equal(testPoint.Tags()["tag1"], k.getPartitionKey(testPoint), "PartitionKey should be value of 'tag1'") + k = KinesisOutput{ + Partition: &Partition{ + Method: "tag", + Key: "doesnotexist", + Default: "somedefault", + }, + } + assert.Equal("somedefault", k.getPartitionKey(testPoint), "PartitionKey should use default") + k = KinesisOutput{ Partition: &Partition{ Method: "tag", Key: "doesnotexist", }, } - assert.Equal("", k.getPartitionKey(testPoint), "PartitionKey should be value of ''") + assert.Equal("telegraf", k.getPartitionKey(testPoint), "PartitionKey should be telegraf") k = KinesisOutput{ Partition: &Partition{