Skip to content

Commit

Permalink
feat(cmd/influx/write): allow to limit write rate
Browse files Browse the repository at this point in the history
  • Loading branch information
sranka committed Sep 30, 2020
1 parent bde1209 commit 2a27128
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ need to update any InfluxDB CLI config profiles with the new port number.
1. [19508](https://github.com/influxdata/influxdb/pull/19508): Add subset of InfluxQL coordinator options as flags
1. [19457](https://github.com/influxdata/influxdb/pull/19457): Add ability to export resources by name via the CLI
1. [19640](https://github.com/influxdata/influxdb/pull/19640): Turn on Community Templates
1. [19660](https://github.com/influxdata/influxdb/pull/19660): Add --rate-limit option to `influx write`.

### Bug Fixes

Expand Down
10 changes: 10 additions & 0 deletions cmd/influx/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"strings"

"github.com/fujiwara/shapeio"
platform "github.com/influxdata/influxdb/v2"
ihttp "github.com/influxdata/influxdb/v2/http"
"github.com/influxdata/influxdb/v2/kit/signals"
Expand Down Expand Up @@ -40,6 +41,7 @@ type writeFlagsType struct {
IgnoreDataTypeInColumnName bool
Encoding string
ErrorsFile string
RateLimit float64
}

var writeFlags writeFlagsType
Expand Down Expand Up @@ -89,6 +91,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows to")
cmd.PersistentFlags().Float64Var(&writeFlags.RateLimit, "rate-limit", 0.0, "How many megabytes per minute the write will allow. Defaults to zero, which disables throttling.")

cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false)
cmdDryRun.Args = cobra.MaximumNArgs(1)
Expand Down Expand Up @@ -240,6 +243,13 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
csvReader.RowSkipped = rowSkippedListener
r = csvReader
}
// throttle reader if requested
if writeFlags.RateLimit > 0.0 {
throttledReader := shapeio.NewReaderWithContext(r, ctx)
throttledReader.SetRateLimit(writeFlags.RateLimit * 1024 * 1024 / 60) // convert from MB/minute to bytes/sec
r = throttledReader
}

return r, csv2lp.MultiCloser(closers...), nil
}

Expand Down
12 changes: 11 additions & 1 deletion cmd/influx/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ func Test_writeFlags_createLineReader(t *testing.T) {
lines: strings.Split(fileContents, "\n"),
lpData: true,
},
{
name: "read data from CSV file + transform to line protocol + throttle read to 1MB/min",
flags: writeFlagsType{
Files: []string{csvFile1},
RateLimit: 1.0,
},
lines: []string{
"f1 b=f2,c=f3,d=f4",
},
},
}

for _, test := range tests {
Expand All @@ -244,7 +254,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
defer closer.Close()
require.Nil(t, err)
require.NotNil(t, reader)
if !test.lpData {
if !test.lpData && test.flags.RateLimit == 0.0 {
csvToLineReader, ok := reader.(*csv2lp.CsvToLineReader)
require.True(t, ok)
require.Equal(t, csvToLineReader.LineNumber, test.firstLineCorrection)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190819115812-1474bdeaf2a2
github.com/elazarl/go-bindata-assetfs v1.0.0
github.com/fatih/color v1.9.0
github.com/fujiwara/shapeio v0.0.0-20170602072123-c073257dd745
github.com/getkin/kin-openapi v0.2.0
github.com/ghodss/yaml v1.0.0
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fujiwara/shapeio v0.0.0-20170602072123-c073257dd745 h1:+tPNWeI7Uk5JKgSj4IYytZc0mdch+e3Yf8g1sGOg4hQ=
github.com/fujiwara/shapeio v0.0.0-20170602072123-c073257dd745/go.mod h1:/WpqsrSkjgwEG2Es2qnZXbXwHDVbawpdlXJIjJMmnZs=
github.com/getkin/kin-openapi v0.2.0 h1:PbHHtYZpjKwZtGlIyELgA2DploRrsaXztoNNx9HjwNY=
github.com/getkin/kin-openapi v0.2.0/go.mod h1:V1z9xl9oF5Wt7v32ne4FmiF1alpS4dM6mNzoywPOXlk=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down

0 comments on commit 2a27128

Please sign in to comment.