Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add bbolt as internal/db/kvs #2177

Merged
merged 23 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ replace (
github.com/xlab/treeprint => github.com/xlab/treeprint v1.2.0
github.com/zeebo/assert => github.com/zeebo/assert v1.3.1
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.7
go.opencensus.io => go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
Expand Down Expand Up @@ -373,6 +374,7 @@ require (
github.com/scylladb/gocqlx v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.4
github.com/zeebo/xxh3 v1.0.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.35.0
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.0.0-00010101000000-000000000000
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,8 @@ github.com/zeebo/assert v1.3.1 h1:vukIABvugfNMZMQO1ABsyQDJDTVQbn+LWSMy1ol1h6A=
github.com/zeebo/assert v1.3.1/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ=
go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0 h1:+uFejS4DCfNH6d3xODVIGsdhzgzhh45p9gpbHQMbdZI=
Expand Down
1 change: 1 addition & 0 deletions hack/go.mod.default
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ replace (
github.com/zeebo/assert => github.com/zeebo/assert latest
github.com/zeebo/xxh3 => github.com/zeebo/xxh3 latest
go.opencensus.io => go.opencensus.io latest
go.etcd.io/bbolt => go.etcd.io/bbolt latest
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc => go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel => go.opentelemetry.io/otel v1.11.1
go.opentelemetry.io/otel/exporters/otlp/internal/retry => go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1
Expand Down
133 changes: 133 additions & 0 deletions internal/db/kvs/bbolt/bbolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright (C) 2019-2023 vdaas.org vald team <vald@vdaas.org>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bbolt

import (
"fmt"
"io/fs"
"os"
"reflect"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/sync/errgroup"
bolt "go.etcd.io/bbolt"
)

// Bbolt is a small key-value store API backed by a single bucket of a bbolt database.
type Bbolt interface {
// Set stores val under key.
Set(key, val []byte) error
// Get returns the value stored under key. The bool result reports whether the key was found.
Get(key []byte) ([]byte, bool, error)
// AsyncSet schedules a write of val under key on eg; the write's error is returned
// from the function handed to eg.
AsyncSet(eg errgroup.Group, key, val []byte)
// Close closes the database and, when remove is true, also deletes the database file.
Close(remove bool) error
}

// bbolt is the Bbolt implementation returned by New.
type bbolt struct {
// db is the database handle obtained from bolt.Open.
db *bolt.DB
// file is the database path given to New; Close removes it when asked to.
file string
// bucket is the name of the bucket that Set, Get and AsyncSet operate on.
bucket []byte
}

// defaultBucket is the bucket name New falls back to when the caller passes an empty bucket.
const defaultBucket = "vald-bbolt-bucket"

// New opens the bbolt database stored at file and returns a Bbolt bound to one bucket.
//
// file is handed to bolt.Open together with mode, which creates the file when it does
// not exist yet. bucket selects the bucket to use; when it is empty, defaultBucket is
// used. The bucket is created if it is missing. opts are applied in order to a
// zero-valued bolt.Options before the database is opened; the first failing option
// aborts New with an option error.
//
// If the bucket cannot be prepared, the database that was just opened is closed again
// before the error is returned, so a failed New does not leave the database open.
func New(file, bucket string, mode fs.FileMode, opts ...Option) (Bbolt, error) {
	bopts := new(bolt.Options)
	for _, opt := range opts {
		if err := opt(bopts); err != nil {
			return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
		}
	}

	db, err := bolt.Open(file, mode, bopts)
	if err != nil {
		return nil, err
	}

	bk := []byte(defaultBucket)
	if bucket != "" {
		bk = []byte(bucket)
	}

	if err := db.Update(func(tx *bolt.Tx) error {
		if _, err := tx.CreateBucketIfNotExists(bk); err != nil {
			return fmt.Errorf("failed to create bucket: %w", err)
		}
		return nil
	}); err != nil {
		// The database is already open at this point; release it so the caller,
		// who only receives an error, is not left with an unreachable open handle.
		if cerr := db.Close(); cerr != nil {
			err = errors.Join(err, cerr)
		}
		return nil, err
	}

	return &bbolt{
		db:     db,
		file:   file,
		bucket: bk,
	}, nil
}

// Set stores val under key in the configured bucket using one read-write transaction.
// It returns the error reported by the transaction, if any.
func (b *bbolt) Set(key, val []byte) error {
	put := func(tx *bolt.Tx) error {
		bkt := tx.Bucket(b.bucket)
		return bkt.Put(key, val)
	}
	return b.db.Update(put)
}

// Get looks up key in the configured bucket inside a read-only transaction.
//
// When the key exists, val is a freshly allocated copy of the stored bytes and ok is
// true. When the key does not exist, val is nil and ok is false. A transaction error
// is returned as err with val nil and ok false.
func (b *bbolt) Get(key []byte) (val []byte, ok bool, err error) {
	read := func(tx *bolt.Tx) error {
		stored := tx.Bucket(b.bucket).Get(key)
		if stored != nil {
			// The slice handed out by the bucket is only usable inside this
			// transaction, so copy it before the transaction ends.
			val = make([]byte, len(stored))
			copy(val, stored)
		}
		return nil
	}

	if verr := b.db.View(read); verr != nil {
		return nil, false, verr
	}

	// val stays nil only when the key was not found.
	return val, val != nil, nil
}

// AsyncSet writes val under key from a goroutine started on eg, using the database's
// Batch call so that concurrent writes can be committed together.
// The write's error is returned from the function given to eg.Go; wait on eg to be
// sure every scheduled write has finished.
// key and val are captured without copying, so callers should leave them unmodified
// until the write has completed.
func (b *bbolt) AsyncSet(eg errgroup.Group, key, val []byte) {
	put := func(tx *bolt.Tx) error {
		bkt := tx.Bucket(b.bucket)
		return bkt.Put(key, val)
	}
	eg.Go(func() error {
		return b.db.Batch(put)
	})
}

// Close closes the underlying database. When remove is true it additionally deletes
// the database file; the deletion is attempted even if closing failed, and a deletion
// error is joined with the close error.
func (b *bbolt) Close(remove bool) (err error) {
	err = b.db.Close()
	if !remove {
		return err
	}

	if rerr := os.RemoveAll(b.file); rerr != nil {
		err = errors.Join(err, rerr)
	}
	return err
}
213 changes: 213 additions & 0 deletions internal/db/kvs/bbolt/bbolt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright (C) 2019-2023 vdaas.org vald team <vald@vdaas.org>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bbolt_test

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
"github.com/vdaas/vald/internal/db/kvs/bbolt"
"github.com/vdaas/vald/internal/sync/errgroup"
)

// mode is the file mode passed to bbolt.New for every test database (0o600: owner read/write).
const mode = os.FileMode(0o600)

func TestNew(t *testing.T) {
t.Parallel()

type test struct {
name string
testfunc func(t *testing.T)
}

tests := []test{
{
name: "New returns bbolt instance with new file when file does not exist",
testfunc: func(t *testing.T) {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")

b, err := bbolt.New(tmpfile, "", mode)
require.NoError(t, err)
require.NotNil(t, b)
},
},
{
name: "New returns bbolt instance with existing file",
testfunc: func(t *testing.T) {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")

// create a file
f, err := os.Create(tmpfile)
require.NoError(t, err)
err = f.Close()
require.NoError(t, err)

b, err := bbolt.New(f.Name(), "", mode)
require.NoError(t, err)
require.NotNil(t, b)
},
},
{
name: "New returns bbolt with custom bucket name",
testfunc: func(t *testing.T) {
ykadowak marked this conversation as resolved.
Show resolved Hide resolved
tempdir := t.TempDir()
tmpfile := filepath.Join(tempdir, "test.db")

b, err := bbolt.New(tmpfile, "my bucket name", mode)
require.NoError(t, err)
require.NotNil(t, b)
},
},
}

for _, tc := range tests {
test := tc
t.Run(test.name, func(tt *testing.T) {
tt.Parallel()
test.testfunc(tt)
})
}
}

// Test_bbolt_GetSetClose covers Set followed by Get, Get on a missing key, reopening
// a database after Close(false), and file removal by Close(true).
func Test_bbolt_GetSetClose(t *testing.T) {
	t.Parallel()

	type test struct {
		name     string
		testfunc func(t *testing.T)
	}

	// setup opens a fresh database inside a per-test temporary directory.
	setup := func(t *testing.T) (b bbolt.Bbolt, file string) {
		tempdir := t.TempDir()
		tmpfile := filepath.Join(tempdir, "test.db")
		b, err := bbolt.New(tmpfile, "", mode)
		require.NoError(t, err)

		return b, tmpfile
	}

	// closeOnCleanup closes b when the subtest ends, for the cases that do not close
	// the database themselves. It runs before the temporary directory is removed.
	closeOnCleanup := func(t *testing.T, b bbolt.Bbolt) {
		t.Helper()
		t.Cleanup(func() {
			if err := b.Close(false); err != nil {
				t.Errorf("failed to close bbolt: %v", err)
			}
		})
	}

	tests := []test{
		{
			name: "Succeed to set and get with the key returns the value",
			testfunc: func(t *testing.T) {
				b, _ := setup(t)
				closeOnCleanup(t, b)

				k, v := []byte("key"), []byte("value")
				err := b.Set(k, v)
				require.NoError(t, err)

				val, ok, err := b.Get(k)
				require.NoError(t, err)
				require.True(t, ok)
				require.Equal(t, v, val)
			},
		},
		{
			name: "Get with non-existing key returns false",
			testfunc: func(t *testing.T) {
				b, _ := setup(t)
				closeOnCleanup(t, b)

				val, ok, err := b.Get([]byte("no exist key"))
				require.NoError(t, err)
				require.False(t, ok)
				require.Nil(t, val)
			},
		},
		{
			name: "Successfully close without removing and recover from the db file",
			testfunc: func(t *testing.T) {
				b, file := setup(t)
				k, v := []byte("key"), []byte("value")
				err := b.Set(k, v)
				require.NoError(t, err)

				err = b.Close(false)
				require.NoError(t, err)

				// recover from the file
				b, err = bbolt.New(file, "", mode)
				require.NoError(t, err)
				// only the reopened instance still needs closing
				closeOnCleanup(t, b)

				res, ok, err := b.Get(k)
				require.NoError(t, err)
				require.True(t, ok)
				require.Equal(t, v, res)
			},
		},
		{
			name: "Successfully close with removing",
			testfunc: func(t *testing.T) {
				b, file := setup(t)
				k, v := []byte("key"), []byte("value")
				err := b.Set(k, v)
				require.NoError(t, err)

				// set remove flag to true
				err = b.Close(true)
				require.NoError(t, err)

				require.NoFileExists(t, file)
			},
		},
	}

	for _, tc := range tests {
		test := tc
		t.Run(test.name, func(tt *testing.T) {
			tt.Parallel()
			test.testfunc(tt)
		})
	}
}

// Test_bbolt_AsyncSet schedules several writes through AsyncSet, waits for the
// errgroup, and then checks that every key is present with the value that was written.
func Test_bbolt_AsyncSet(t *testing.T) {
	t.Parallel()

	tempdir := t.TempDir()
	tmpfile := filepath.Join(tempdir, "test.db")
	b, err := bbolt.New(tmpfile, "", mode)
	require.NoError(t, err)

	kv := map[string]string{
		"key1": "val1",
		"key2": "val2",
		"key3": "val3",
		"key4": "val4",
		"key5": "val5",
	}

	eg, _ := errgroup.New(context.Background())
	for k, v := range kv {
		b.AsyncSet(eg, []byte(k), []byte(v))
	}

	// wait until all set is done
	err = eg.Wait()
	require.NoError(t, err)

	for k, v := range kv {
		val, ok, err := b.Get([]byte(k))
		require.NoError(t, err)
		require.True(t, ok)
		// presence alone would not catch a write that stored the wrong bytes
		require.Equal(t, []byte(v), val)
	}

	err = b.Close(true)
	require.NoError(t, err)
}

// NOT IMPLEMENTED BELOW
Loading
Loading