From a9c1e378757965a5e5bc81b7b5a3f09cbbdeec97 Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Thu, 25 Aug 2022 14:29:03 +0200 Subject: [PATCH] feat: add Flush() --- CHANGELOG.md | 2 +- api/writeAPIBlocking.go | 14 ++++++++++++++ api/writeAPIBlocking_test.go | 11 +++++++++++ 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0abe9231..2ad78518 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ - [#349](https://github.com/influxdata/influxdb-client-go/pull/349) Skip retrying on specific write errors (mostly partial write error). ### Breaking change -- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Interface `WriteAPIBlocking` is extend with `EnableBatching()`. +- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Interface `WriteAPIBlocking` is extend with `EnableBatching()` and `Flush()`. ## 2.9.2 [2022-07-29] ### Bug fixes diff --git a/api/writeAPIBlocking.go b/api/writeAPIBlocking.go index 033cdbf9..d348aa8f 100644 --- a/api/writeAPIBlocking.go +++ b/api/writeAPIBlocking.go @@ -21,6 +21,7 @@ import ( // to internal buffer. If length ot the buffer is equal to the batch-size (set in write.Options), the buffer is sent to the server // and the result of the operation is returned. // When a point is written to the buffer, nil error is always returned. +// Flush() can be used to trigger sending of batch when it doesn't have the batch-size. // // Synchronous writing is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches. @@ -42,6 +43,8 @@ type WriteAPIBlocking interface { // EnableBatching turns on implicit batching // Batch size is controlled via write.Options EnableBatching() + // Flush forces write of buffer if batching is enabled, even buffer doesn't have the batch-size. + Flush(ctx context.Context) error } // writeAPIBlocking implements WriteAPIBlocking interface @@ -108,3 +111,14 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point } return w.write(ctx, line) } + +func (w *writeAPIBlocking) Flush(ctx context.Context) error { + w.mu.Lock() + defer w.mu.Unlock() + if w.batching && len(w.batch) > 0 { + body := strings.Join(w.batch, "\n") + w.batch = w.batch[:0] + return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())) + } + return nil +} diff --git a/api/writeAPIBlocking_test.go b/api/writeAPIBlocking_test.go index 910a2506..c237241e 100644 --- a/api/writeAPIBlocking_test.go +++ b/api/writeAPIBlocking_test.go @@ -146,4 +146,15 @@ func TestWriteBatchIng(t *testing.T) { service.Close() } } + + for i := 0; i < 4; i++ { + err := writeAPI.WriteRecord(context.Background(), lines[i]) + require.Nil(t, err) + } + assert.Equal(t, 0, service.Requests()) + require.Len(t, service.Lines(), 0) + err := writeAPI.Flush(context.Background()) + require.Nil(t, err) + assert.Equal(t, 1, service.Requests()) + require.Len(t, service.Lines(), 4) }