Skip to content

Commit

Permalink
kv: track nodes reference count; fixes #617
Browse files Browse the repository at this point in the history
  • Loading branch information
dennwc committed Dec 22, 2017
1 parent 31b0f48 commit 36463db
Show file tree
Hide file tree
Showing 15 changed files with 1,040 additions and 335 deletions.
451 changes: 340 additions & 111 deletions graph/kv/indexing.go

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions graph/kv/kvtest/kvtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ type Config struct {

func (c Config) quadStore() *graphtest.Config {
return &graphtest.Config{
NoPrimitives: true,
SkipNodeDelAfterQuadDel: true,
SkipIntHorizon: true,
AlwaysRunIntegration: c.AlwaysRunIntegration,
NoPrimitives: true,
SkipIntHorizon: true,
AlwaysRunIntegration: c.AlwaysRunIntegration,
}
}

Expand Down
58 changes: 25 additions & 33 deletions graph/kv/quadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func Register(name string, r Registration) {
}

const (
latestDataVersion = 1
latestDataVersion = 2
nilDataVersion = 1
)

Expand All @@ -93,12 +93,6 @@ type QuadStore struct {
writer sync.Mutex
mapBucket map[string]map[string][]uint64

meta struct {
sync.RWMutex
size int64
horizon int64
}

exists struct {
sync.Mutex
buf []byte
Expand Down Expand Up @@ -155,35 +149,44 @@ func setVersion(kv BucketKV, version int64) error {
})
}

func (qs *QuadStore) getMetaInt(key string) (int64, error) {
var v int64
err := View(qs.db, func(tx BucketTx) error {
b := tx.Bucket(metaBucket)
var err error
vals, err := b.Get([][]byte{
[]byte(key),
})
if err != nil {
return err
} else if vals[0] == nil {
return ErrNoBucket
}
v, err = asInt64(vals[0], 0)
if err != nil {
return err
}
return nil
})
return v, err
}

func (qs *QuadStore) Size() int64 {
qs.meta.RLock()
sz := qs.meta.size
qs.meta.RUnlock()
sz, _ := qs.getMetaInt("size")
return sz
}

func (qs *QuadStore) Close() error {
err := Update(qs.db, func(tx BucketTx) error {
return qs.writeHorizonAndSize(tx, -1, -1)
})
if err != nil {
qs.db.Close()
return err
}
return qs.db.Close()
}

func (qs *QuadStore) getMetadata() (int64, error) {
qs.meta.Lock()
defer qs.meta.Unlock()
var vers int64
err := View(qs.db, func(tx BucketTx) error {
b := tx.Bucket(metaBucket)
var err error
vals, err := b.Get([][]byte{
[]byte("version"),
[]byte("size"),
[]byte("horizon"),
})
if err == ErrNotFound {
return ErrNoBucket
Expand All @@ -192,19 +195,10 @@ func (qs *QuadStore) getMetadata() (int64, error) {
} else if vals[0] == nil {
return ErrNoBucket
}

vers, err = asInt64(vals[0], nilDataVersion)
if err != nil {
return err
}
qs.meta.size, err = asInt64(vals[1], 0)
if err != nil {
return err
}
qs.meta.horizon, err = asInt64(vals[2], 0)
if err != nil {
return err
}
return nil
})
return vers, err
Expand All @@ -221,9 +215,7 @@ func asInt64(b []byte, empty int64) (int64, error) {
}

func (qs *QuadStore) horizon() int64 {
qs.meta.RLock()
h := qs.meta.horizon
qs.meta.RUnlock()
h, _ := qs.getMetaInt("horizon")
return h
}

Expand Down
Loading

0 comments on commit 36463db

Please sign in to comment.