Skip to content

Commit

Permalink
feat: add Flush()
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed Aug 25, 2022
1 parent 830a29a commit a9c1e37
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
- [#349](https://github.com/influxdata/influxdb-client-go/pull/349) Skip retrying on specific write errors (mostly partial write error).

### Breaking change
- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Interface `WriteAPIBlocking` is extend with `EnableBatching()`.
- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Interface `WriteAPIBlocking` is extend with `EnableBatching()` and `Flush()`.

## 2.9.2 [2022-07-29]
### Bug fixes
Expand Down
14 changes: 14 additions & 0 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// to internal buffer. If length ot the buffer is equal to the batch-size (set in write.Options), the buffer is sent to the server
// and the result of the operation is returned.
// When a point is written to the buffer, nil error is always returned.
// Flush() can be used to trigger sending of batch when it doesn't have the batch-size.
//
// Synchronous writing is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches.

Expand All @@ -42,6 +43,8 @@ type WriteAPIBlocking interface {
// EnableBatching turns on implicit batching
// Batch size is controlled via write.Options
EnableBatching()
// Flush forces write of buffer if batching is enabled, even buffer doesn't have the batch-size.
Flush(ctx context.Context) error
}

// writeAPIBlocking implements WriteAPIBlocking interface
Expand Down Expand Up @@ -108,3 +111,14 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point
}
return w.write(ctx, line)
}

func (w *writeAPIBlocking) Flush(ctx context.Context) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.batching && len(w.batch) > 0 {
body := strings.Join(w.batch, "\n")
w.batch = w.batch[:0]
return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
}
return nil
}
11 changes: 11 additions & 0 deletions api/writeAPIBlocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,15 @@ func TestWriteBatchIng(t *testing.T) {
service.Close()
}
}

for i := 0; i < 4; i++ {
err := writeAPI.WriteRecord(context.Background(), lines[i])
require.Nil(t, err)
}
assert.Equal(t, 0, service.Requests())
require.Len(t, service.Lines(), 0)
err := writeAPI.Flush(context.Background())
require.Nil(t, err)
assert.Equal(t, 1, service.Requests())
require.Len(t, service.Lines(), 4)
}

0 comments on commit a9c1e37

Please sign in to comment.