From ee840eaf41d32a1a3aa8ab345664232651cf8f1e Mon Sep 17 00:00:00 2001 From: ykadowak Date: Mon, 11 Sep 2023 09:02:35 +0000 Subject: [PATCH] Add bbolt kvs --- go.mod | 2 + go.sum | 2 + hack/go.mod.default | 1 + internal/db/kvs/bbolt/bbolt.go | 115 ++++++++++++++++++++++++++++ internal/db/kvs/bbolt/bbolt_test.go | 87 +++++++++++++++++++++ internal/errors/bbolt.go | 3 + 6 files changed, 210 insertions(+) create mode 100644 internal/db/kvs/bbolt/bbolt.go create mode 100644 internal/db/kvs/bbolt/bbolt_test.go create mode 100644 internal/errors/bbolt.go diff --git a/go.mod b/go.mod index d759a7989d..f4de80b3fa 100755 --- a/go.mod +++ b/go.mod @@ -270,6 +270,7 @@ replace ( github.com/xlab/treeprint => github.com/xlab/treeprint v1.2.0 github.com/zeebo/assert => github.com/zeebo/assert v1.3.1 github.com/zeebo/xxh3 => github.com/zeebo/xxh3 v1.0.2 + go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.7 go.opencensus.io => go.opencensus.io v0.24.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1 @@ -373,6 +374,7 @@ require ( github.com/scylladb/gocqlx v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.8.4 github.com/zeebo/xxh3 v1.0.2 + go.etcd.io/bbolt v1.3.6 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0 go.opentelemetry.io/otel v1.11.2 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.0.0-00010101000000-000000000000 diff --git a/go.sum b/go.sum index 04e935e515..f8054b5c99 100644 --- a/go.sum +++ b/go.sum @@ -572,6 +572,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A= github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= +go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 h1:+uFejS4DCfNH6d3xODVIGsdhzgzhh45p9gpbHQMbdZI= diff --git a/hack/go.mod.default b/hack/go.mod.default index ee8282afa5..3fd1767d19 100755 --- a/hack/go.mod.default +++ b/hack/go.mod.default @@ -271,6 +271,7 @@ replace ( github.com/zeebo/assert => github.com/zeebo/assert latest github.com/zeebo/xxh3 => github.com/zeebo/xxh3 latest go.opencensus.io => go.opencensus.io latest + go.etcd.io/bbolt => go.etcd.io/bbolt latest go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1 go.opentelemetry.io/otel/exporters/otlp/internal/retry => go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1 diff --git a/internal/db/kvs/bbolt/bbolt.go b/internal/db/kvs/bbolt/bbolt.go new file mode 100644 index 0000000000..88d38aa17a --- /dev/null +++ b/internal/db/kvs/bbolt/bbolt.go @@ -0,0 +1,115 @@ +package bbolt + +import ( + "fmt" + "os" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/sync/errgroup" + bolt "go.etcd.io/bbolt" +) + +type Bbolt struct { + db *bolt.DB + file string + bucket string +} + +const default_bucket = "vald-bbolt-bucket" + +// New returns a new Bbolt instance. +// If file does not exist, it creates a new file. If bucket is empty, it uses default_bucket. +// If opts is nil, it uses default options. +func New(file string, bucket string, opts *bolt.Options) (*Bbolt, error) { + db, err := bolt.Open(file, 0600, opts) + if err != nil { + return nil, err + } + + if bucket == "" { + bucket = default_bucket + } + db.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucket([]byte(bucket)) + if err != nil { + return fmt.Errorf("failed to create bucket: %w", err) + } + return nil + }) + return &Bbolt{ + db: db, + file: file, + bucket: bucket, + }, nil +} + +func (b *Bbolt) Set(key string, val []byte) error { + if err := b.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(b.bucket)) + err := b.Put([]byte(key), val) + return err + }); err != nil { + return err + } + + return nil +} + +func (b *Bbolt) Get(key string) ([]byte, bool, error) { + var val []byte + if err := b.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(b.bucket)) + ret := b.Get([]byte(key)) + if ret == nil { + // key not found. just return without copying anything to val + return nil + } + + // key found. copy the value to val because ret is only valid in this scope + val = make([]byte, len(ret)) + copy(val, ret) + return nil + }); err != nil { + return nil, false, err + } + + if val == nil { + return nil, false, nil + } + + return val, true, nil +} + +// AsyncSet sets the key and value asynchronously for better write performance. +// It accumulates the keys and values until the batch size is reached or the timeout comes, then +// writes them all at once. Wait for the errgroup to make sure all the batches finished if required. +func (b *Bbolt) AsyncSet(eg *errgroup.Group, key string, val []byte) error { + if eg == nil { + return errors.ErrNilErrGroup + } + (*eg).Go(func() error { + b.db.Batch(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(b.bucket)) + err := b.Put([]byte(key), val) + return err + }) + return nil + }) + + return nil +} + +// Close closes the database and removes the file if remove is true. +func (b *Bbolt) Close(remove bool) (err error) { + if cerr := b.db.Close(); cerr != nil { + err = cerr + } + + if remove { + if rerr := os.RemoveAll(b.file); rerr != nil { + err = errors.Wrap(rerr, err.Error()) + } + } + + return err +} diff --git a/internal/db/kvs/bbolt/bbolt_test.go b/internal/db/kvs/bbolt/bbolt_test.go new file mode 100644 index 0000000000..1e6664b112 --- /dev/null +++ b/internal/db/kvs/bbolt/bbolt_test.go @@ -0,0 +1,87 @@ +package bbolt_test + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + "github.com/vdaas/vald/internal/db/kvs/bbolt" + "github.com/vdaas/vald/internal/sync/errgroup" +) + +func TestGetSetClose(t *testing.T) { + t.Parallel() + + tempdir := t.TempDir() + tmpfile := filepath.Join(tempdir, "test.db") + b, err := bbolt.New(tmpfile, "", nil) + require.NoError(t, err) + + err = b.Set("key", []byte("value")) + require.NoError(t, err) + + val, ok, err := b.Get("key") + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, []byte("value"), val) + + val, ok, err = b.Get("no exist key") + require.NoError(t, err) + require.False(t, ok) + require.Nil(t, val) + + err = b.Close(false) + require.NoError(t, err) + + b, err = bbolt.New(tmpfile, "", nil) + require.NoError(t, err) + + // recover from the file + val, ok, err = b.Get("key") + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, []byte("value"), val) + + err = b.Close(true) + require.NoError(t, err) + + // now the file is deleted + _, err = os.Stat(tmpfile) + require.True(t, os.IsNotExist(err)) +} + +func TestAsyncSet(t *testing.T) { + t.Parallel() + + tempdir := t.TempDir() + tmpfile := filepath.Join(tempdir, "test.db") + b, err := bbolt.New(tmpfile, "", nil) + require.NoError(t, err) + + kv := map[string]string{ + "key1": "val1", + "key2": "val2", + "key3": "val3", + "key4": "val4", + "key5": "val5", + } + + eg, _ := errgroup.New(context.Background()) + for k, v := range kv { + b.AsyncSet(&eg, k, []byte(v)) + } + + // wait until all set is done + eg.Wait() + + for k := range kv { + _, ok, err := b.Get(k) + require.NoError(t, err) + require.True(t, ok) + } + + err = b.Close(true) + require.NoError(t, err) +} diff --git a/internal/errors/bbolt.go b/internal/errors/bbolt.go new file mode 100644 index 0000000000..a68504d92f --- /dev/null +++ b/internal/errors/bbolt.go @@ -0,0 +1,3 @@ +package errors + +var ErrNilErrGroup = New("the input errgroup is nil")