Skip to content

Commit

Permalink
Add bbolt kvs
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Sep 11, 2023
1 parent eca6108 commit ee840ea
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ replace (
github.com/xlab/treeprint => github.com/xlab/treeprint v1.2.0
github.com/zeebo/assert => github.com/zeebo/assert v1.3.1
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.7
go.opencensus.io => go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
Expand Down Expand Up @@ -373,6 +374,7 @@ require (
github.com/scylladb/gocqlx v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.4
github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.0.0-00010101000000-000000000000
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A=
github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ=
go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 h1:+uFejS4DCfNH6d3xODVIGsdhzgzhh45p9gpbHQMbdZI=
Expand Down
1 change: 1 addition & 0 deletions hack/go.mod.default
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ replace (
github.com/zeebo/assert => github.com/zeebo/assert latest
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 latest
go.opencensus.io => go.opencensus.io latest
go.etcd.io/bbolt => go.etcd.io/bbolt latest
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/exporters/otlp/internal/retry => go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1
Expand Down
115 changes: 115 additions & 0 deletions internal/db/kvs/bbolt/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package bbolt

import (
"fmt"
"os"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/sync/errgroup"
bolt "go.etcd.io/bbolt"
)

type Bbolt struct {
db *bolt.DB
file string
bucket string
}

const default_bucket = "vald-bbolt-bucket"

// New returns a new Bbolt instance.
// If file does not exist, it creates a new file. If bucket is empty, it uses default_bucket.
// If opts is nil, it uses default options.
func New(file string, bucket string, opts *bolt.Options) (*Bbolt, error) {
db, err := bolt.Open(file, 0600, opts)
if err != nil {
return nil, err
}

Check warning on line 27 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L26-L27

Added lines #L26 - L27 were not covered by tests

if bucket == "" {
bucket = default_bucket
}
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucket([]byte(bucket))
if err != nil {
return fmt.Errorf("failed to create bucket: %w", err)
}
return nil
})
return &Bbolt{
db: db,
file: file,
bucket: bucket,
}, nil
}

func (b *Bbolt) Set(key string, val []byte) error {
if err := b.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(b.bucket))
err := b.Put([]byte(key), val)
return err
}); err != nil {
return err
}

Check warning on line 53 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L52-L53

Added lines #L52 - L53 were not covered by tests

return nil
}

func (b *Bbolt) Get(key string) ([]byte, bool, error) {
var val []byte
if err := b.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(b.bucket))
ret := b.Get([]byte(key))
if ret == nil {
// key not found. just return without copying anything to val
return nil
}

// key found. copy the value to val because ret is only valid in this scope
val = make([]byte, len(ret))
copy(val, ret)
return nil
}); err != nil {
return nil, false, err
}

Check warning on line 74 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L72-L74

Added lines #L72 - L74 were not covered by tests

if val == nil {
return nil, false, nil
}

return val, true, nil
}

// AsyncSet sets the key and value asynchronously for better write performance.
// It accumulates the keys and values until the batch size is reached or the timeout comes, then
// writes them all at once. Wait for the errgroup to make sure all the batches finished if required.
func (b *Bbolt) AsyncSet(eg *errgroup.Group, key string, val []byte) error {
if eg == nil {
return errors.ErrNilErrGroup
}

Check warning on line 89 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L88-L89

Added lines #L88 - L89 were not covered by tests
(*eg).Go(func() error {
b.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(b.bucket))
err := b.Put([]byte(key), val)
return err
})
return nil
})

return nil
}

// Close closes the database and removes the file if remove is true.
func (b *Bbolt) Close(remove bool) (err error) {
if cerr := b.db.Close(); cerr != nil {
err = cerr
}

Check warning on line 106 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L105-L106

Added lines #L105 - L106 were not covered by tests

if remove {
if rerr := os.RemoveAll(b.file); rerr != nil {
err = errors.Wrap(rerr, err.Error())
}

Check warning on line 111 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L110-L111

Added lines #L110 - L111 were not covered by tests
}

return err
}
87 changes: 87 additions & 0 deletions internal/db/kvs/bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package bbolt_test

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"github.com/vdaas/vald/internal/db/kvs/bbolt"
"github.com/vdaas/vald/internal/sync/errgroup"
)

func TestGetSetClose(t *testing.T) {
t.Parallel()

tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")
b, err := bbolt.New(tmpfile, "", nil)
require.NoError(t, err)

err = b.Set("key", []byte("value"))
require.NoError(t, err)

val, ok, err := b.Get("key")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, []byte("value"), val)

val, ok, err = b.Get("no exist key")
require.NoError(t, err)
require.False(t, ok)
require.Nil(t, val)

err = b.Close(false)
require.NoError(t, err)

b, err = bbolt.New(tmpfile, "", nil)
require.NoError(t, err)

// recover from the file
val, ok, err = b.Get("key")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, []byte("value"), val)

err = b.Close(true)
require.NoError(t, err)

// now the file is deleted
_, err = os.Stat(tmpfile)
require.True(t, os.IsNotExist(err))
}

func TestAsyncSet(t *testing.T) {
t.Parallel()

tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")
b, err := bbolt.New(tmpfile, "", nil)
require.NoError(t, err)

kv := map[string]string{
"key1": "val1",
"key2": "val2",
"key3": "val3",
"key4": "val4",
"key5": "val5",
}

eg, _ := errgroup.New(context.Background())
for k, v := range kv {
b.AsyncSet(&eg, k, []byte(v))
}

// wait until all set is done
eg.Wait()

for k := range kv {
_, ok, err := b.Get(k)
require.NoError(t, err)
require.True(t, ok)
}

err = b.Close(true)
require.NoError(t, err)
}
3 changes: 3 additions & 0 deletions internal/errors/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package errors

var ErrNilErrGroup = New("the input errgroup is nil")

0 comments on commit ee840ea

Please sign in to comment.