Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bbolt as internal/db/kvs #2177

Merged
merged 23 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ replace (
github.com/xlab/treeprint => github.com/xlab/treeprint v1.2.0
github.com/zeebo/assert => github.com/zeebo/assert v1.3.1
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.7
go.opencensus.io => go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
Expand Down Expand Up @@ -373,6 +374,7 @@ require (
github.com/scylladb/gocqlx v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.4
github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.0.0-00010101000000-000000000000
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A=
github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ=
go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 h1:+uFejS4DCfNH6d3xODVIGsdhzgzhh45p9gpbHQMbdZI=
Expand Down
1 change: 1 addition & 0 deletions hack/go.mod.default
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ replace (
github.com/zeebo/assert => github.com/zeebo/assert latest
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 latest
go.opencensus.io => go.opencensus.io latest
go.etcd.io/bbolt => go.etcd.io/bbolt latest
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/exporters/otlp/internal/retry => go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1
Expand Down
115 changes: 115 additions & 0 deletions internal/db/kvs/bbolt/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package bbolt

import (
"fmt"
"os"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/sync/errgroup"
bolt "go.etcd.io/bbolt"
)

type Bbolt struct {
db *bolt.DB
file string
bucket string
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
}

const default_bucket = "vald-bbolt-bucket"
ykadowak marked this conversation as resolved.
Show resolved Hide resolved

// New returns a new Bbolt instance.
// If file does not exist, it creates a new file. If bucket is empty, it uses default_bucket.
// If opts is nil, it uses default options.
func New(file string, bucket string, opts *bolt.Options) (*Bbolt, error) {
db, err := bolt.Open(file, 0600, opts)
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

Check warning on line 27 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L26-L27

Added lines #L26 - L27 were not covered by tests

if bucket == "" {
bucket = default_bucket
}
db.Update(func(tx *bolt.Tx) error {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
_, err := tx.CreateBucket([]byte(bucket))
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to create bucket: %w", err)
}
return nil
})
return &Bbolt{
db: db,
file: file,
bucket: bucket,
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}

func (b *Bbolt) Set(key string, val []byte) error {
if err := b.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(b.bucket))
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
err := b.Put([]byte(key), val)
return err
}); err != nil {
return err
}

Check warning on line 53 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L52-L53

Added lines #L52 - L53 were not covered by tests

return nil
}

func (b *Bbolt) Get(key string) ([]byte, bool, error) {
var val []byte
if err := b.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(b.bucket))
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
ret := b.Get([]byte(key))
if ret == nil {
// key not found. just return without copying anything to val
return nil
}

// key found. copy the value to val because ret is only valid in this scope
val = make([]byte, len(ret))
copy(val, ret)
return nil
}); err != nil {
return nil, false, err
}

Check warning on line 74 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L72-L74

Added lines #L72 - L74 were not covered by tests

if val == nil {
return nil, false, nil
}

return val, true, nil
}

// AsyncSet sets the key and value asynchronously for better write performance.
// It accumulates the keys and values until the batch size is reached or the timeout comes, then
// writes them all at once. Wait for the errgroup to make sure all the batches finished if required.
func (b *Bbolt) AsyncSet(eg *errgroup.Group, key string, val []byte) error {
if eg == nil {
return errors.ErrNilErrGroup
}

Check warning on line 89 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L88-L89

Added lines #L88 - L89 were not covered by tests
(*eg).Go(func() error {
b.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(b.bucket))
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
err := b.Put([]byte(key), val)
return err
})
return nil
})

return nil
}

// Close closes the database and removes the file if remove is true.
func (b *Bbolt) Close(remove bool) (err error) {
if cerr := b.db.Close(); cerr != nil {
err = cerr
}

Check warning on line 106 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L105-L106

Added lines #L105 - L106 were not covered by tests

if remove {
if rerr := os.RemoveAll(b.file); rerr != nil {
err = errors.Wrap(rerr, err.Error())
}

Check warning on line 111 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L110-L111

Added lines #L110 - L111 were not covered by tests
}

return err
Copy link
Collaborator

@hlts2 hlts2 Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to use errors.Join than errors.Wrap because there are multiple errors? 🤔

Suggested change
if cerr := b.db.Close(); cerr != nil {
err = cerr
}
if remove {
if rerr := os.RemoveAll(b.file); rerr != nil {
err = errors.Wrap(rerr, err.Error())
}
}
return err
if !remove {
return b.db.Close()
}
return errors.Join(b.db.Close(), os.RemoveAll(b.file))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use errors.Join. But let me keep the coding style like this because I believe it's easier to understand the logic.

if cerr := b.db.Close(); cerr != nil {
err = cerr
}
if remove {
if rerr := os.RemoveAll(b.file); rerr != nil {
err = errors.Join(err, rerr)
}
}
return err

}
87 changes: 87 additions & 0 deletions internal/db/kvs/bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package bbolt_test

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"github.com/vdaas/vald/internal/db/kvs/bbolt"
"github.com/vdaas/vald/internal/sync/errgroup"
)

func TestGetSetClose(t *testing.T) {
t.Parallel()

tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")
b, err := bbolt.New(tmpfile, "", nil)
require.NoError(t, err)

err = b.Set("key", []byte("value"))
require.NoError(t, err)

val, ok, err := b.Get("key")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, []byte("value"), val)

val, ok, err = b.Get("no exist key")
require.NoError(t, err)
require.False(t, ok)
require.Nil(t, val)

err = b.Close(false)
require.NoError(t, err)

b, err = bbolt.New(tmpfile, "", nil)
require.NoError(t, err)

// recover from the file
val, ok, err = b.Get("key")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, []byte("value"), val)

err = b.Close(true)
require.NoError(t, err)

// now the file is deleted
_, err = os.Stat(tmpfile)
require.True(t, os.IsNotExist(err))
}

func TestAsyncSet(t *testing.T) {
t.Parallel()

tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")
b, err := bbolt.New(tmpfile, "", nil)
require.NoError(t, err)

kv := map[string]string{
"key1": "val1",
"key2": "val2",
"key3": "val3",
"key4": "val4",
"key5": "val5",
}

eg, _ := errgroup.New(context.Background())
for k, v := range kv {
b.AsyncSet(&eg, k, []byte(v))
}

// wait until all set is done
eg.Wait()

for k := range kv {
_, ok, err := b.Get(k)
require.NoError(t, err)
require.True(t, ok)
}

err = b.Close(true)
require.NoError(t, err)
}
3 changes: 3 additions & 0 deletions internal/errors/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package errors

var ErrNilErrGroup = New("the input errgroup is nil")
Loading