Skip to content

Commit

Permalink
feat: add implicit batching to writeAPIBlocking
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed Aug 25, 2022
1 parent 5b9008c commit 830a29a
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 37 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
## [unreleased]
### Features
- [#348](https://github.com/influxdata/influxdb-client-go/pull/348) Added `write.Options.Consitency` parameter to support InfluxDB Enterprise.
- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Added support for implicit batching to `WriteAPIBlocking`. It's off by default, enabled by `EnableBatching()`.

### Bug fixes
- [#349](https://github.com/influxdata/influxdb-client-go/pull/349) Skip retrying on specific write errors (mostly partial write error).

### Breaking change
- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Interface `WriteAPIBlocking` is extend with `EnableBatching()`.

## 2.9.2 [2022-07-29]
### Bug fixes
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ func main() {
```
### Blocking write client
Blocking write client writes given point(s) synchronously. It doesn't have implicit batching. Batch is created from given set of points.
Blocking write client writes given point(s) synchronously. It doesn't do implicit batching. Batch is created from given set of points.
Implicit batching can be enabled with `WriteAPIBlocking.EnableBatching()`.
```go
package main
Expand Down
84 changes: 49 additions & 35 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,73 +7,87 @@ package api
import (
"context"
"strings"
"sync"

http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
"github.com/influxdata/influxdb-client-go/v2/api/write"
iwrite "github.com/influxdata/influxdb-client-go/v2/internal/write"
)

// WriteAPIBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server.
// It doesn't implicitly create batches of points. It is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches.
// It doesn't implicitly create batches of points by default. Batches are created from array of points/records.
//
// WriteAPIBlocking can be used concurrently.
// When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines.
//
// To add implicit batching, use a wrapper, such as:
// type writer struct {
// batch []*write.Point
// writeAPI api.WriteAPIBlocking
// batchSize int
// }
//
// func (w *writer) CurrentBatch() []*write.Point {
// return w.batch
// }
// Implicit batching is enabled with EnableBatching(). In this mode, each call to WritePoint or WriteRecord adds a line
// to internal buffer. If length ot the buffer is equal to the batch-size (set in write.Options), the buffer is sent to the server
// and the result of the operation is returned.
// When a point is written to the buffer, nil error is always returned.
//
// func newWriter(writeAPI api.WriteAPIBlocking, batchSize int) *writer {
// return &writer{
// batch: make([]*write.Point, 0, batchSize),
// writeAPI: writeAPI,
// batchSize: batchSize,
// }
// }
// Synchronous writing is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches.

//
// func (w *writer) write(ctx context.Context, p *write.Point) error {
// w.batch = append(w.batch, p)
// if len(w.batch) == w.batchSize {
// err := w.writeAPI.WritePoint(ctx, w.batch...)
// if err != nil {
// return err
// }
// w.batch = w.batch[:0]
// }
// return nil
// }
// WriteAPIBlocking can be used concurrently.
// When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines.
type WriteAPIBlocking interface {
// WriteRecord writes line protocol record(s) into bucket.
// WriteRecord writes without implicit batching. Batch is created from given number of records.
// WriteRecord writes lines without implicit batching by default, batch is created from given number of records.
// Automatic batching can be enabled by EnableBatching()
// Individual arguments can also be batches (multiple records separated by newline).
// Non-blocking alternative is available in the WriteAPI interface
WriteRecord(ctx context.Context, line ...string) error
// WritePoint data point into bucket.
// WritePoint writes without implicit batching. Batch is created from given number of points
// WriteRecord writes points without implicit batching by default, batch is created from given number of points.
// Automatic batching can be enabled by EnableBatching().
// Non-blocking alternative is available in the WriteAPI interface
WritePoint(ctx context.Context, point ...*write.Point) error
// EnableBatching turns on implicit batching
// Batch size is controlled via write.Options
EnableBatching()
}

// writeAPIBlocking implements WriteAPIBlocking interface
type writeAPIBlocking struct {
service *iwrite.Service
writeOptions *write.Options
batching bool
batch []string
mu sync.Mutex
}

// NewWriteAPIBlocking creates new instance of blocking write client for writing data to bucket belonging to org
func NewWriteAPIBlocking(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking {
return &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions}
}

// NewWriteAPIBlockingWithBatching creates new instance of blocking write client for writing data to bucket belonging to org with batching enabled
func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking {
api := &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions}
api.EnableBatching()
return api
}

func (w *writeAPIBlocking) EnableBatching() {
w.mu.Lock()
defer w.mu.Unlock()
if !w.batching {
w.batching = true
w.batch = make([]string, 0, w.writeOptions.BatchSize())
}
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime()))
w.mu.Lock()
defer w.mu.Unlock()
body := line
if w.batching {
w.batch = append(w.batch, line)
if len(w.batch) == int(w.writeOptions.BatchSize()) {
body = strings.Join(w.batch, "\n")
w.batch = w.batch[:0]
} else {
return nil
}
}
err := w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
if err != nil {
return err
}
Expand Down
29 changes: 28 additions & 1 deletion api/writeAPIBlocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,20 @@ func TestWriteRecord(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := test.GenRecords(10)
for _, line := range lines {
err := writeAPI.WriteRecord(context.Background(), line)
require.Nil(t, err)
}
require.Len(t, service.Lines(), 10)
require.Equal(t, 10, service.Requests())
for i, l := range lines {
assert.Equal(t, l, service.Lines()[i])
}
service.Close()

err := writeAPI.WriteRecord(context.Background(), lines...)
require.Nil(t, err)
require.Len(t, service.Lines(), 10)
require.Equal(t, 1, service.Requests())
for i, l := range lines {
assert.Equal(t, l, service.Lines()[i])
}
Expand Down Expand Up @@ -120,3 +131,19 @@ func TestWriteErrors(t *testing.T) {
require.Equal(t, 10, errors)

}

func TestWriteBatchIng(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlockingWithBatching("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := test.GenRecords(10)
for i, line := range lines {
err := writeAPI.WriteRecord(context.Background(), line)
require.Nil(t, err)
if i == 4 || i == 9 {
assert.Equal(t, 1, service.Requests())
require.Len(t, service.Lines(), 5)

service.Close()
}
}
}
8 changes: 8 additions & 0 deletions internal/test/http_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type HTTPService struct {
requestHandler func(url string, body io.Reader) error
replyError *http2.Error
lock sync.Mutex
requests int
}

// WasGzip returns true of request was in GZip format
Expand Down Expand Up @@ -67,6 +68,11 @@ func (t *HTTPService) HTTPClient() *http.Client {
return nil
}

// Requests returns number of requests
func (t *HTTPService) Requests() int {
return t.requests
}

// Close clears instance
func (t *HTTPService) Close() {
t.lock.Lock()
Expand All @@ -76,6 +82,7 @@ func (t *HTTPService) Close() {
t.wasGzip = false
t.replyError = nil
t.requestHandler = nil
t.requests = 0
t.lock.Unlock()
}

Expand Down Expand Up @@ -116,6 +123,7 @@ func (t *HTTPService) DoHTTPRequestWithResponse(_ *http.Request, _ http2.Request
// DoPostRequest reads http request, validates URL and stores data in the request
func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reader, requestCallback http2.RequestCallback, _ http2.ResponseCallback) *http2.Error {
req, err := http.NewRequest("POST", url, nil)
t.requests++
if err != nil {
return http2.NewError(err)
}
Expand Down

0 comments on commit 830a29a

Please sign in to comment.