Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bbolt as internal/db/kvs #2177

Merged
merged 23 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ replace (
github.com/xlab/treeprint => github.com/xlab/treeprint v1.2.0
github.com/zeebo/assert => github.com/zeebo/assert v1.3.1
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.7
go.opencensus.io => go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
Expand Down Expand Up @@ -373,6 +374,7 @@ require (
github.com/scylladb/gocqlx v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.4
github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.0.0-00010101000000-000000000000
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A=
github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ=
go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 h1:+uFejS4DCfNH6d3xODVIGsdhzgzhh45p9gpbHQMbdZI=
Expand Down
1 change: 1 addition & 0 deletions hack/go.mod.default
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ replace (
github.com/zeebo/assert => github.com/zeebo/assert latest
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 latest
go.opencensus.io => go.opencensus.io latest
go.etcd.io/bbolt => go.etcd.io/bbolt latest
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/exporters/otlp/internal/retry => go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1
Expand Down
128 changes: 128 additions & 0 deletions internal/db/kvs/bbolt/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bbolt

import (
"fmt"
"os"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/sync/errgroup"
bolt "go.etcd.io/bbolt"
)

type Bbolt interface {
Set(key, val []byte) error
Get(key []byte) ([]byte, bool, error)
AsyncSet(eg errgroup.Group, key, val []byte) error
Close(remove bool) error
}

type bbolt struct {
db *bolt.DB
file string
bucket string
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
}

const defaultBucket = "vald-bbolt-bucket"
ykadowak marked this conversation as resolved.
Show resolved Hide resolved

// New returns a new Bbolt instance.
// If file does not exist, it creates a new file. If bucket is empty, it uses default_bucket.
// If opts is nil, it uses default options.
func New(file, bucket string, opts *bolt.Options) (Bbolt, error) {
db, err := bolt.Open(file, 0o600, opts)
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

Check warning on line 47 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L46-L47

Added lines #L46 - L47 were not covered by tests

if bucket == "" {
bucket = defaultBucket
}
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
db.Update(func(tx *bolt.Tx) error {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
_, err := tx.CreateBucket([]byte(bucket))
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to create bucket: %w", err)
}
return nil
})
return &bbolt{
db: db,
file: file,
bucket: bucket,
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
}, nil
}

func (b *bbolt) Set(key, val []byte) error {
return b.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(b.bucket))
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
err := b.Put(key, val)
return err
})
}

func (b *bbolt) Get(key []byte) ([]byte, bool, error) {
var val []byte
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
if err := b.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(b.bucket))
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
ret := b.Get(key)
if ret == nil {
// key not found. just return without copying anything to val
return nil
}

// key found. copy the value to val because ret is only valid in this scope
val = make([]byte, len(ret))
copy(val, ret)
return nil
}); err != nil {
return nil, false, err
}

Check warning on line 90 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L88-L90

Added lines #L88 - L90 were not covered by tests

if val == nil {
return nil, false, nil
}

return val, true, nil
}

// AsyncSet sets the key and value asynchronously for better write performance.
// It accumulates the keys and values until the batch size is reached or the timeout comes, then
// writes them all at once. Wait for the errgroup to make sure all the batches finished if required.
func (b *bbolt) AsyncSet(eg errgroup.Group, key, val []byte) error {
eg.Go(func() error {
b.db.Batch(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(b.bucket))
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
err := b.Put(key, val)
return err
})
return nil
})

return nil
}

// Close closes the database and removes the file if remove is true.
func (b *bbolt) Close(remove bool) (err error) {
if cerr := b.db.Close(); cerr != nil {
err = cerr
}

Check warning on line 119 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L118-L119

Added lines #L118 - L119 were not covered by tests

if remove {
if rerr := os.RemoveAll(b.file); rerr != nil {
err = errors.Wrap(rerr, err.Error())
}

Check warning on line 124 in internal/db/kvs/bbolt/bbolt.go

View check run for this annotation

Codecov / codecov/patch

internal/db/kvs/bbolt/bbolt.go#L123-L124

Added lines #L123 - L124 were not covered by tests
}

return err
Copy link
Collaborator

@hlts2 hlts2 Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to use errors.Join than errors.Wrap because there are multiple errors? 🤔

Suggested change
if cerr := b.db.Close(); cerr != nil {
err = cerr
}
if remove {
if rerr := os.RemoveAll(b.file); rerr != nil {
err = errors.Wrap(rerr, err.Error())
}
}
return err
if !remove {
return b.db.Close()
}
return errors.Join(b.db.Close(), os.RemoveAll(b.file))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to use errors.Join. But let me keep the coding style like this because I believe it's easier to understand the logic.

if cerr := b.db.Close(); cerr != nil {
err = cerr
}
if remove {
if rerr := os.RemoveAll(b.file); rerr != nil {
err = errors.Join(err, rerr)
}
}
return err

}
210 changes: 210 additions & 0 deletions internal/db/kvs/bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bbolt_test

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"github.com/vdaas/vald/internal/db/kvs/bbolt"
"github.com/vdaas/vald/internal/sync/errgroup"
)

func TestNew(t *testing.T) {
t.Parallel()

type test struct {
name string
testfunc func(t *testing.T)
}

tests := []test{
{
name: "New returns bbolt instance with new file when file does not exist",
testfunc: func(t *testing.T) {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")

b, err := bbolt.New(tmpfile, "", nil)
require.NoError(t, err)
require.NotNil(t, b)
},
},
{
name: "New returns bbolt instance with existing file",
testfunc: func(t *testing.T) {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")

// create a file
f, err := os.Create(tmpfile)
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)

b, err := bbolt.New(f.Name(), "", nil)
require.NoError(t, err)
require.NotNil(t, b)
},
},
{
name: "New returns bbolt with custom bucket name",
testfunc: func(t *testing.T) {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")

b, err := bbolt.New(tmpfile, "my bucket name", nil)
require.NoError(t, err)
require.NotNil(t, b)
},
},
}

for _, tc := range tests {
test := tc
t.Run(test.name, func(tt *testing.T) {
tt.Parallel()
test.testfunc(tt)
})
}
}

func Test_bbolt_GetSetClose(t *testing.T) {
t.Parallel()

type test struct {
name string
testfunc func(t *testing.T)
}

setup := func(t *testing.T) (b bbolt.Bbolt, file string) {
tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")
b, err := bbolt.New(tmpfile, "", nil)
require.NoError(t, err)

return b, tmpfile
}

tests := []test{
{
name: "Succeed to set and get with the key returns the value",
testfunc: func(t *testing.T) {
b, _ := setup(t)

k, v := []byte("key"), []byte("value")
err := b.Set(k, v)
require.NoError(t, err)

val, ok, err := b.Get(k)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, v, val)
},
},
{
name: "Get with non-existing key returns false",
testfunc: func(t *testing.T) {
b, _ := setup(t)
val, ok, err := b.Get([]byte("no exist key"))
require.NoError(t, err)
require.False(t, ok)
require.Nil(t, val)
},
},
{
name: "Successfully close without removing and recover from the db file",
testfunc: func(t *testing.T) {
b, file := setup(t)
k, v := []byte("key"), []byte("value")
err := b.Set(k, v)
require.NoError(t, err)

err = b.Close(false)
require.NoError(t, err)

// recover from the file
b, err = bbolt.New(file, "", nil)
require.NoError(t, err)

res, ok, err := b.Get(k)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, v, res)
},
},
{
name: "Successfully close with removing",
testfunc: func(t *testing.T) {
b, file := setup(t)
k, v := []byte("key"), []byte("value")
err := b.Set(k, v)
require.NoError(t, err)

// set remove flag to true
err = b.Close(true)
require.NoError(t, err)

require.NoFileExists(t, file)
},
},
}

for _, tc := range tests {
test := tc
t.Run(test.name, func(tt *testing.T) {
tt.Parallel()
test.testfunc(tt)
})
}
}

func Test_bbolt_AsyncSet(t *testing.T) {
t.Parallel()

tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")
b, err := bbolt.New(tmpfile, "", nil)
require.NoError(t, err)

kv := map[string]string{
"key1": "val1",
"key2": "val2",
"key3": "val3",
"key4": "val4",
"key5": "val5",
}

eg, _ := errgroup.New(context.Background())
for k, v := range kv {
b.AsyncSet(eg, []byte(k), []byte(v))
}

// wait until all set is done
eg.Wait()

for k := range kv {
_, ok, err := b.Get([]byte(k))
require.NoError(t, err)
require.True(t, ok)
}

err = b.Close(true)
require.NoError(t, err)
}

// NOT IMPLEMENTED BELOW
16 changes: 16 additions & 0 deletions internal/errors/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package errors

var ErrNilErrGroup = New("the input errgroup is nil")
Loading