Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add internal KVS pogreb package #2302

Merged
merged 16 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ require (
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b // indirect
github.com/akrylysov/pogreb v0.10.2 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/campoy/embedmd v1.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b h1:slYM766cy2nI3BwyR
github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM=
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
github.com/akavel/rsrc v0.10.2/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
github.com/akrylysov/pogreb v0.10.2 h1:e6PxmeyEhWyi2AKOBIJzAEi4HkiC+lKyCocRGlnDi78=
github.com/akrylysov/pogreb v0.10.2/go.mod h1:pNs6QmpQ1UlTJKDezuRWmaqkgUE2TuU0YTWyqJZ7+lI=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/stroke v0.0.0-20221221101821-bd29b49d73f0/go.mod h1:ccdDYaY5+gO+cbnQdFxEXqfy0RkoV25H3jLXUDNM3wg=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
Expand Down
1 change: 1 addition & 0 deletions hack/go.mod.default
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ replace (
github.com/pkg/errors => github.com/pkg/errors upgrade
github.com/pkg/sftp => github.com/pkg/sftp upgrade
github.com/pmezard/go-difflib => github.com/pmezard/go-difflib upgrade
github.com/akrylysov/pogreb => github.com/akrylysov/pogreb upgrade
github.com/prashantv/gostub => github.com/prashantv/gostub upgrade
github.com/prometheus/client_golang => github.com/prometheus/client_golang upgrade
github.com/prometheus/client_model => github.com/prometheus/client_model upgrade
Expand Down
73 changes: 73 additions & 0 deletions internal/db/kvs/pogreb/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (C) 2019-2024 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pogreb

import (
"time"

"github.com/akrylysov/pogreb"
)

var deafultOpts = []Option{
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
WithPath("pogreb.db"),
}

// Option represents the functional option for database.
type Option func(*db) error

// WithPath returns the option to set path.
func WithPath(path string) Option {
return func(d *db) error {
if path != "" {
d.path = path
}
return nil
}
}

// WithBackgroundSyncInterval returns the option to sets the amount of time between background Sync() calls.
func WithBackgroundSyncInterval(s string) Option {
return func(d *db) error {
if s == "" {
return nil
}
dur, err := time.ParseDuration(s)
if err != nil {
return err
}
if d.opts == nil {
d.opts = new(pogreb.Options)
}
d.opts.BackgroundSyncInterval = dur
return nil
}
}

// WithBackgroundCompactionInterval returns the option to sets the amount of time between background Compact() calls.
func WithBackgroundCompactionInterval(s string) Option {
return func(d *db) error {
if s == "" {
return nil
}
dur, err := time.ParseDuration(s)
if err != nil {
return err
}
if d.opts == nil {
d.opts = new(pogreb.Options)
}
d.opts.BackgroundCompactionInterval = dur
return nil
}
}
123 changes: 123 additions & 0 deletions internal/db/kvs/pogreb/pogreb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (C) 2019-2024 vdaas.org vald team <[email protected]>
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pogreb

import (
"os"
"reflect"

"github.com/akrylysov/pogreb"

"github.com/vdaas/vald/internal/conv"
"github.com/vdaas/vald/internal/errors"
)

// Pogreb represents an interface for operating the pogreb database.
type Pogreb interface {
Set(key string, val []byte) error
Get(key string) ([]byte, bool, error)
Delete(key string) error
Range(f func(key string, val []byte) bool) error
Len() uint32
Close(remove bool) error
}

type db struct {
db *pogreb.DB
opts *pogreb.Options
path string
}

// New returns a new pogreb instance.
// If the directory path does not exist, it creates a directory for database.
// If opts is nil, it uses default options.
func New(opts ...Option) (_ Pogreb, err error) {
db := new(db)
for _, opt := range append(deafultOpts, opts...) {
if err := opt(db); err != nil {
return nil, errors.ErrOptionFailed(err, reflect.ValueOf(opt))
}
}

// If A is nil, an default value is used.
db.db, err = pogreb.Open(db.path, db.opts)
if err != nil {
return nil, err
}
return db, nil
}

// Set sets the value for a key.
func (d *db) Set(key string, val []byte) error {
return d.db.Put(conv.Atob(key), val)
}

// Get returns the value stored in the database for a key.
// The ok result indicates whether value was found in the database.
func (d *db) Get(key string) ([]byte, bool, error) {
val, err := d.db.Get(conv.Atob(key))
if err != nil {
return nil, false, err
}
// If val is nil, it means that there is no value associated with key, so false is returned.
if val == nil {
return val, false, nil
}
return val, true, nil
}

// Delete deletes the given key from the database.
func (d *db) Delete(key string) error {
return d.db.Delete(conv.Atob(key))
}

// Range calls f sequentially for each key and value present in the database.
// If f returns false, range stops the iteration.
func (d *db) Range(f func(key string, val []byte) bool) error {
it := d.db.Items()
for {
key, val, err := it.Next()
if err == pogreb.ErrIterationDone {
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
break
}
if err != nil {
return err
}
if !f(conv.Btoa(key), val) {
return nil
}
}
return nil
}

// Len returns the number of keys in the DB.
func (d *db) Len() uint32 {
return d.db.Count()
}

// Close closes the database and removes the file if remove is true.
func (d *db) Close(remove bool) (err error) {
if serr := d.db.Sync(); serr != nil {
err = serr
}
if cerr := d.db.Close(); cerr != nil {
err = errors.Join(err, cerr)
}
if remove {
if rerr := os.RemoveAll(d.path); rerr != nil {
err = errors.Join(err, rerr)
}
}
return err
}
Loading