Skip to content

Commit

Permalink
Add bbolt as internal/db/kvs (#2177)
Browse files Browse the repository at this point in the history
* Add bbolt kvs

* changed key type to []byte

* combine types

* add license

* add interface

* use test template

* use interface type as input parameter

* Split test cases

* Have bucket as []byte in the struct

* remove unused bbolt error

* refactor

* add filemode as func arg

* refactor

* fix recover from file

* apply format

* add functional option for bbolt
  • Loading branch information
ykadowak authored Sep 15, 2023
1 parent 2dbea5b commit a95676a
Show file tree
Hide file tree
Showing 7 changed files with 1,306 additions and 0 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ replace (
github.com/xlab/treeprint => github.com/xlab/treeprint v1.2.0
github.com/zeebo/assert => github.com/zeebo/assert v1.3.1
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.7
go.opencensus.io => go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
Expand Down Expand Up @@ -373,6 +374,7 @@ require (
github.com/scylladb/gocqlx v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.4
github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.0.0-00010101000000-000000000000
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A=
github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ=
go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 h1:+uFejS4DCfNH6d3xODVIGsdhzgzhh45p9gpbHQMbdZI=
Expand Down
1 change: 1 addition & 0 deletions hack/go.mod.default
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ replace (
github.com/zeebo/assert => github.com/zeebo/assert latest
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 latest
go.opencensus.io => go.opencensus.io latest
go.etcd.io/bbolt => go.etcd.io/bbolt latest
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/exporters/otlp/internal/retry => go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1
Expand Down
133 changes: 133 additions & 0 deletions internal/db/kvs/bbolt/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bbolt

import (
"fmt"
"io/fs"
"os"
"reflect"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/sync/errgroup"
bolt "go.etcd.io/bbolt"
)

// Bbolt is a key-value store backed by a single bucket of a bbolt database file.
type Bbolt interface {
// Set stores val under key in a read-write transaction.
Set(key, val []byte) error
// Get returns a copy of the value stored under key. The boolean result is
// false when the key is not present.
Get(key []byte) ([]byte, bool, error)
// AsyncSet schedules a write of val under key on eg. Call eg.Wait to learn
// whether the scheduled writes succeeded.
AsyncSet(eg errgroup.Group, key, val []byte)
// Close closes the underlying database. When remove is true the database
// file is deleted as well.
Close(remove bool) error
}

// bbolt is the Bbolt implementation returned by New.
type bbolt struct {
// db is the opened bbolt database handle.
db *bolt.DB
// file is the path passed to New; Close(true) removes it.
file string
// bucket is the name of the bucket every Get/Set/AsyncSet operates on.
bucket []byte
}

// defaultBucket is the bucket name New uses when it is given an empty bucket.
const defaultBucket = "vald-bbolt-bucket"

// New opens the bbolt database stored at file and returns a Bbolt bound to one bucket.
//
// file is opened through bolt.Open with the given mode; if the file does not exist yet,
// a new one is created. When bucket is empty, defaultBucket is used, and the bucket is
// created if it is missing. Every opt is applied, in order, to a zero-valued
// bolt.Options; passing no options keeps bbolt's defaults.
//
// An error is returned when an option fails, when the database cannot be opened, or
// when the bucket cannot be created. In the last case the already opened database is
// closed before returning so that the file handle and lock are not leaked.
func New(file, bucket string, mode fs.FileMode, opts ...Option) (Bbolt, error) {
	bopts := new(bolt.Options)
	for _, opt := range opts {
		if err := opt(bopts); err != nil {
			return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
		}
	}

	db, err := bolt.Open(file, mode, bopts)
	if err != nil {
		return nil, err
	}

	bk := []byte(defaultBucket)
	if bucket != "" {
		bk = []byte(bucket)
	}

	if err := db.Update(func(tx *bolt.Tx) error {
		if _, err := tx.CreateBucketIfNotExists(bk); err != nil {
			return fmt.Errorf("failed to create bucket: %w", err)
		}
		return nil
	}); err != nil {
		// The caller never receives db on this path, so it must be closed here;
		// otherwise the file stays open and locked for the life of the process.
		if cerr := db.Close(); cerr != nil {
			return nil, errors.Join(err, cerr)
		}
		return nil, err
	}

	return &bbolt{
		db:     db,
		file:   file,
		bucket: bk,
	}, nil
}

// Set writes val under key into the configured bucket inside a single
// read-write transaction and returns the transaction's error, if any.
func (b *bbolt) Set(key, val []byte) error {
	put := func(tx *bolt.Tx) error {
		bkt := tx.Bucket(b.bucket)
		return bkt.Put(key, val)
	}
	return b.db.Update(put)
}

// Get reads the value stored under key from the configured bucket.
//
// It returns (value, true, nil) when the key exists, (nil, false, nil) when it does
// not, and (nil, false, err) when the read transaction fails. The returned slice is a
// private copy, because the slice handed out by bbolt is only valid inside the
// transaction.
func (b *bbolt) Get(key []byte) (val []byte, ok bool, err error) {
	err = b.db.View(func(tx *bolt.Tx) error {
		// A nil result means the key is absent; val is left nil in that case.
		if found := tx.Bucket(b.bucket).Get(key); found != nil {
			val = append(make([]byte, 0, len(found)), found...)
		}
		return nil
	})
	if err != nil {
		return nil, false, err
	}

	return val, val != nil, nil
}

// AsyncSet schedules a write of val under key on eg instead of writing inline.
// The write goes through bolt's DB.Batch, which accumulates concurrent calls until the
// batch size is reached or the batch timeout fires and then commits them together.
// Call eg.Wait when the caller needs to know that all scheduled writes have finished
// and whether any of them failed.
func (b *bbolt) AsyncSet(eg errgroup.Group, key, val []byte) {
	put := func(tx *bolt.Tx) error {
		return tx.Bucket(b.bucket).Put(key, val)
	}
	eg.Go(func() error {
		return b.db.Batch(put)
	})
}

// Close shuts the database down and, when remove is true, also deletes the database
// file. The removal is attempted even if closing failed; both errors are joined into
// the returned error.
func (b *bbolt) Close(remove bool) (err error) {
	err = b.db.Close()
	if !remove {
		return err
	}

	if rerr := os.RemoveAll(b.file); rerr != nil {
		err = errors.Join(err, rerr)
	}
	return err
}
213 changes: 213 additions & 0 deletions internal/db/kvs/bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright (C) 2019-2023 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bbolt_test

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"github.com/vdaas/vald/internal/db/kvs/bbolt"
"github.com/vdaas/vald/internal/sync/errgroup"
)

const mode = os.FileMode(0o600)

// TestNew checks that New succeeds for a missing file, for an already existing file,
// and for a custom bucket name. Every database opened by a sub-test is closed when the
// sub-test finishes so that no file handle or lock outlives the test.
func TestNew(t *testing.T) {
	t.Parallel()

	type test struct {
		name     string
		testfunc func(t *testing.T)
	}

	// closeOnCleanup closes b when the test ends. It is registered after t.TempDir,
	// and cleanups run last-in-first-out, so the database is closed before the
	// temporary directory is removed.
	closeOnCleanup := func(t *testing.T, b bbolt.Bbolt) {
		t.Helper()
		t.Cleanup(func() {
			if err := b.Close(false); err != nil {
				t.Errorf("failed to close bbolt: %v", err)
			}
		})
	}

	tests := []test{
		{
			name: "New returns bbolt instance with new file when file does not exist",
			testfunc: func(t *testing.T) {
				tempdir := t.TempDir()
				tmpfile := filepath.Join(tempdir, "test.db")

				b, err := bbolt.New(tmpfile, "", mode)
				require.NoError(t, err)
				require.NotNil(t, b)
				closeOnCleanup(t, b)
			},
		},
		{
			name: "New returns bbolt instance with existing file",
			testfunc: func(t *testing.T) {
				tempdir := t.TempDir()
				tmpfile := filepath.Join(tempdir, "test.db")

				// create a file
				f, err := os.Create(tmpfile)
				require.NoError(t, err)
				err = f.Close()
				require.NoError(t, err)

				b, err := bbolt.New(f.Name(), "", mode)
				require.NoError(t, err)
				require.NotNil(t, b)
				closeOnCleanup(t, b)
			},
		},
		{
			name: "New returns bbolt with custom bucket name",
			testfunc: func(t *testing.T) {
				tempdir := t.TempDir()
				tmpfile := filepath.Join(tempdir, "test.db")

				b, err := bbolt.New(tmpfile, "my bucket name", mode)
				require.NoError(t, err)
				require.NotNil(t, b)
				closeOnCleanup(t, b)
			},
		},
	}

	for _, tc := range tests {
		test := tc
		t.Run(test.name, func(tt *testing.T) {
			tt.Parallel()
			test.testfunc(tt)
		})
	}
}

// Test_bbolt_GetSetClose covers the synchronous API: a Set/Get round trip, Get on a
// missing key, reopening a database file after Close(false), and file removal on
// Close(true). Sub-tests that do not close the database themselves register a cleanup
// so that no handle is left open.
func Test_bbolt_GetSetClose(t *testing.T) {
	t.Parallel()

	type test struct {
		name     string
		testfunc func(t *testing.T)
	}

	// setup opens a fresh database in a per-test temporary directory.
	setup := func(t *testing.T) (b bbolt.Bbolt, file string) {
		tempdir := t.TempDir()
		tmpfile := filepath.Join(tempdir, "test.db")
		b, err := bbolt.New(tmpfile, "", mode)
		require.NoError(t, err)

		return b, tmpfile
	}

	// closeOnCleanup closes b when the test ends. Cleanups run last-in-first-out, so
	// this runs before the temporary directory created in setup is removed.
	closeOnCleanup := func(t *testing.T, b bbolt.Bbolt) {
		t.Helper()
		t.Cleanup(func() {
			if err := b.Close(false); err != nil {
				t.Errorf("failed to close bbolt: %v", err)
			}
		})
	}

	tests := []test{
		{
			name: "Succeed to set and get with the key returns the value",
			testfunc: func(t *testing.T) {
				b, _ := setup(t)
				closeOnCleanup(t, b)

				k, v := []byte("key"), []byte("value")
				err := b.Set(k, v)
				require.NoError(t, err)

				val, ok, err := b.Get(k)
				require.NoError(t, err)
				require.True(t, ok)
				require.Equal(t, v, val)
			},
		},
		{
			name: "Get with non-existing key returns false",
			testfunc: func(t *testing.T) {
				b, _ := setup(t)
				closeOnCleanup(t, b)

				val, ok, err := b.Get([]byte("no exist key"))
				require.NoError(t, err)
				require.False(t, ok)
				require.Nil(t, val)
			},
		},
		{
			name: "Successfully close without removing and recover from the db file",
			testfunc: func(t *testing.T) {
				b, file := setup(t)
				k, v := []byte("key"), []byte("value")
				err := b.Set(k, v)
				require.NoError(t, err)

				err = b.Close(false)
				require.NoError(t, err)

				// recover from the file
				b, err = bbolt.New(file, "", mode)
				require.NoError(t, err)
				// only the reopened instance is still open at this point
				closeOnCleanup(t, b)

				res, ok, err := b.Get(k)
				require.NoError(t, err)
				require.True(t, ok)
				require.Equal(t, v, res)
			},
		},
		{
			name: "Successfully close with removing",
			testfunc: func(t *testing.T) {
				b, file := setup(t)
				k, v := []byte("key"), []byte("value")
				err := b.Set(k, v)
				require.NoError(t, err)

				// set remove flag to true
				err = b.Close(true)
				require.NoError(t, err)

				require.NoFileExists(t, file)
			},
		},
	}

	for _, tc := range tests {
		test := tc
		t.Run(test.name, func(tt *testing.T) {
			tt.Parallel()
			test.testfunc(tt)
		})
	}
}

// Test_bbolt_AsyncSet schedules several writes through AsyncSet, waits for the
// errgroup, and then verifies that every key is present and holds exactly the value
// that was written for it.
func Test_bbolt_AsyncSet(t *testing.T) {
	t.Parallel()

	tempdir := t.TempDir()
	tmpfile := filepath.Join(tempdir, "test.db")
	b, err := bbolt.New(tmpfile, "", mode)
	require.NoError(t, err)

	kv := map[string]string{
		"key1": "val1",
		"key2": "val2",
		"key3": "val3",
		"key4": "val4",
		"key5": "val5",
	}

	eg, _ := errgroup.New(context.Background())
	for k, v := range kv {
		b.AsyncSet(eg, []byte(k), []byte(v))
	}

	// wait until all set is done
	err = eg.Wait()
	require.NoError(t, err)

	for k, v := range kv {
		val, ok, err := b.Get([]byte(k))
		require.NoError(t, err)
		require.True(t, ok)
		// presence alone would not catch a write that stored the wrong value
		require.Equal(t, []byte(v), val)
	}

	err = b.Close(true)
	require.NoError(t, err)
}

// NOT IMPLEMENTED BELOW
Loading

0 comments on commit a95676a

Please sign in to comment.