Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

LocalStore #1032

Closed
wants to merge 81 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
d8acb12
swarm/storage/localstore: most basic database
janos Nov 29, 2018
9ec535a
swarm/storage/localstore: fix typos and comments
janos Dec 3, 2018
37205de
swarm/shed: add uint64 field Dec and DecInBatch methods
janos Dec 3, 2018
b1ded5a
swarm/storage/localstore: decrement size counter on ModeRemoval update
janos Dec 3, 2018
572f3cb
swarm/storage/localstore: unexport modeAccess and modeRemoval
janos Dec 3, 2018
cbb510b
swarm/storage/localstore: add WithRetrievalCompositeIndex
janos Dec 3, 2018
c7beb22
swarm/storage/localstore: add TestModeSyncing
janos Dec 3, 2018
391faa7
swarm/storage/localstore: fix test name
janos Dec 3, 2018
58f3f86
swarm/storage/localstore: add TestModeUpload
janos Dec 3, 2018
2d928bf
swarm/storage/localstore: add TestModeRequest
janos Dec 3, 2018
4d58a6f
swarm/storage/localstore: add TestModeSynced
janos Dec 4, 2018
af1b137
swarm/storage/localstore: add TestModeAccess
janos Dec 4, 2018
96409ff
swarm/storage/localstore: add TestModeRemoval
janos Dec 4, 2018
b782bfe
swarm/storage/localstore: add mock store option for chunk data
janos Dec 5, 2018
35376d8
swarm/storage/localstore: add TestDB_pullIndex
janos Dec 5, 2018
e6a7196
swarm/storage/localstore: add TestDB_gcIndex
janos Dec 6, 2018
58c7f11
swarm/storage/localstore: change how batches are written
janos Dec 6, 2018
6e8b2ad
swarm/storage/localstore: add updateOnAccess function
janos Dec 6, 2018
7b8510e
swarm/storage/localhost: add DB.gcSize
janos Dec 6, 2018
cf3ec30
swarm/storage/localstore: update comments
janos Dec 7, 2018
d58e1ee
swarm/storage/localstore: add BenchmarkNew
janos Dec 7, 2018
f2299f4
swarm/storage/localstore: add retrieval tests benchmarks
janos Dec 7, 2018
e6bdda7
swarm/storage/localstore: accessors redesign
janos Dec 13, 2018
b6f5b7a
swarm/storage/localstore: add semaphore for updateGC goroutine
janos Dec 13, 2018
5732c38
swarm/storage/localstore: implement basic garbage collection
janos Dec 13, 2018
ac9d153
swarm/storage/localstore: optimize collectGarbage
janos Dec 14, 2018
e6e29f5
swarm/storage/localstore: add more garbage collection tests cases
janos Dec 14, 2018
750268d
swarm/shed, swarm/storage/localstore: rename IndexItem to Item
janos Dec 17, 2018
a486a09
swarm/shed: add Index.CountFrom
janos Dec 18, 2018
e17fec2
swarm/storage/localstore: persist gcSize
janos Dec 18, 2018
5605323
swarm/storage/localstore: remove composite retrieval index
janos Dec 18, 2018
5ebbcb5
swarm/shed: IterateWithPrefix and IterateWithPrefixFrom Index functions
janos Dec 19, 2018
80c269b
swarm/storage/localstore: writeGCSize function with leveldb batch
janos Dec 19, 2018
bca85ef
swarm/storage/localstore: unexport modeSetRemove
janos Dec 19, 2018
cee4004
swarm/storage/localstore: update writeGCSizeWorker comment
janos Dec 19, 2018
b5a0fd2
swarm/storage/localstore: add triggerGarbageCollection function
janos Dec 19, 2018
4fca004
swarm/storage/localstore: call writeGCSize on DB Close
janos Dec 19, 2018
5ad0c8d
swarm/storage/localstore: additional comment in writeGCSizeWorker
janos Dec 19, 2018
cb8078e
Merge branch 'master' into localstore
janos Dec 19, 2018
c9f5130
swarm/storage/localstore: add MetricsPrefix option
janos Dec 19, 2018
da9bab4
swarm/storage/localstore: fix a typo
janos Dec 19, 2018
d54d7ae
swamr/shed: only one Index Iterate function
janos Dec 19, 2018
67473be
swarm/storage/localstore: use shed Iterate function
janos Dec 19, 2018
2299147
swarm/shed: pass a new byte slice copy to index decode functions
janos Dec 20, 2018
5488a2b
swarm/storage/localstore: implement feed subscriptions
janos Dec 20, 2018
38bdf7f
swarm/storage/localstore: add more subscriptions tests
janos Dec 21, 2018
a74eae2
swarm/storage/localsore: add parallel upload test
janos Dec 21, 2018
534009f
swarm/storage/localstore: use storage.MaxPO in subscription tests
janos Dec 21, 2018
5edd22d
swarm/storage/localstore: subscription of addresses instead chunks
janos Dec 21, 2018
95726d7
swarm/storage/localstore: lock item address in collectGarbage iterator
janos Dec 21, 2018
c5dcae3
swarm/storage/localstore: fix TestSubscribePull to include MaxPO
janos Dec 22, 2018
f3380ea
swarm/storage/localstore: improve subscriptions
janos Jan 7, 2019
1d2d470
swarm/storage/localstore: add TestDB_SubscribePull_sinceAndUntil test
janos Jan 7, 2019
c26c979
swarm/storage/localstore: adjust pull sync tests
janos Jan 7, 2019
c188281
Merge branch 'master' into localstore
janos Jan 7, 2019
c5e4c61
swarm/storage/localstore: remove writeGCSizeDelay and use literal
janos Jan 8, 2019
015d977
swarm/storage/localstore: adjust subscriptions tests delays and comments
janos Jan 8, 2019
abbb4a6
swarm/storage/localstore: add godoc package overview
janos Jan 10, 2019
fb0a822
swarm/storage/localstore: fix a typo
janos Jan 10, 2019
c423060
swarm/storage/localstore: update package overview
janos Jan 10, 2019
c5a6456
swarm/storage/localstore: remove repeated index change
janos Jan 11, 2019
33726a4
swarm/storage/localstore: rename ChunkInfo to ChunkDescriptor
janos Jan 11, 2019
25a068a
swarm/storage/localstore: add comment in collectGarbageWorker
janos Jan 11, 2019
ca1e24f
swarm/storage/localstore: replace atomics with mutexes for gcSize and…
janos Jan 11, 2019
c62fee3
Merge branch 'master' into localstore
janos Jan 14, 2019
5ddc75f
swarm/storage/localstore: protect addrs map in pull subs tests
janos Jan 14, 2019
87bbd61
swarm/storage/localstore: protect slices in push subs test
janos Jan 14, 2019
6ad67d7
swarm/storage/localstore: protect chunks in TestModePutUpload_parallel
janos Jan 14, 2019
a550388
swarm/storage/localstore: fix a race in TestDB_updateGCSem defers
janos Jan 14, 2019
1dae999
swarm/storage/localstore: remove parallel flag from tests
janos Jan 14, 2019
eda338a
swarm/storage/localstore: fix a race in testDB_collectGarbageWorker
janos Jan 14, 2019
ad5b329
swarm/storage/localstore: remove unused code
janos Jan 22, 2019
8d15e82
swarm/storage/localstore: add more context to pull sub log messages
janos Jan 25, 2019
3948044
swarm/storage/localstore: merge branch 'master' into localstore
janos Jan 25, 2019
6c8208a
swarm/storage/localstore: BenchmarkPutUpload and global lock option
janos Jan 26, 2019
6accc6b
swarm/storage/localstore: pre-generate chunks in BenchmarkPutUpload
janos Jan 28, 2019
85cd349
swarm/storage/localstore: correct useGlobalLock in collectGarbage
janos Jan 28, 2019
7fa1ba9
swarm/storage/localstore: fix typos and update comments
janos Jan 29, 2019
ebecd05
swarm/storage/localstore: update writeGCSize comment
janos Jan 29, 2019
f056e86
swarm/storage/localstore: remove global lock option
janos Feb 4, 2019
40432d9
swarm/storage/localstore: add description for gc size counting
janos Feb 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions swarm/shed/example_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func New(path string) (s *Store, err error) {
value = append(b, fields.Data...)
return value, nil
},
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
DecodeValue: func(keyItem shed.IndexItem, value []byte) (e shed.IndexItem, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.Data = value[8:]
return e, nil
Expand All @@ -108,7 +108,7 @@ func New(path string) (s *Store, err error) {
binary.BigEndian.PutUint64(b, uint64(fields.AccessTimestamp))
return b, nil
},
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
DecodeValue: func(keyItem shed.IndexItem, value []byte) (e shed.IndexItem, err error) {
e.AccessTimestamp = int64(binary.BigEndian.Uint64(value))
return e, nil
},
Expand All @@ -134,7 +134,7 @@ func New(path string) (s *Store, err error) {
EncodeValue: func(fields shed.IndexItem) (value []byte, err error) {
return nil, nil
},
DecodeValue: func(value []byte) (e shed.IndexItem, err error) {
DecodeValue: func(keyItem shed.IndexItem, value []byte) (e shed.IndexItem, err error) {
return e, nil
},
})
Expand Down
18 changes: 9 additions & 9 deletions swarm/shed/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Index struct {
encodeKeyFunc func(fields IndexItem) (key []byte, err error)
decodeKeyFunc func(key []byte) (e IndexItem, err error)
encodeValueFunc func(fields IndexItem) (value []byte, err error)
decodeValueFunc func(value []byte) (e IndexItem, err error)
decodeValueFunc func(keyFields IndexItem, value []byte) (e IndexItem, err error)
}

// IndexFuncs structure defines functions for encoding and decoding
Expand All @@ -86,7 +86,7 @@ type IndexFuncs struct {
EncodeKey func(fields IndexItem) (key []byte, err error)
DecodeKey func(key []byte) (e IndexItem, err error)
EncodeValue func(fields IndexItem) (value []byte, err error)
DecodeValue func(value []byte) (e IndexItem, err error)
DecodeValue func(keyFields IndexItem, value []byte) (e IndexItem, err error)
}

// NewIndex returns a new Index instance with defined name and
Expand Down Expand Up @@ -135,7 +135,7 @@ func (f Index) Get(keyFields IndexItem) (out IndexItem, err error) {
if err != nil {
return out, err
}
out, err = f.decodeValueFunc(value)
out, err = f.decodeValueFunc(keyFields, value)
if err != nil {
return out, err
}
Expand Down Expand Up @@ -210,15 +210,15 @@ func (f Index) IterateAll(fn IndexIterFunc) (err error) {
if key[0] != f.prefix[0] {
break
}
keyIndexItem, err := f.decodeKeyFunc(key)
keyItem, err := f.decodeKeyFunc(key)
if err != nil {
return err
}
valueIndexItem, err := f.decodeValueFunc(it.Value())
valueItem, err := f.decodeValueFunc(keyItem, it.Value())
if err != nil {
return err
}
stop, err := fn(keyIndexItem.Merge(valueIndexItem))
stop, err := fn(keyItem.Merge(valueItem))
if err != nil {
return err
}
Expand All @@ -244,15 +244,15 @@ func (f Index) IterateFrom(start IndexItem, fn IndexIterFunc) (err error) {
if key[0] != f.prefix[0] {
break
}
keyIndexItem, err := f.decodeKeyFunc(key)
keyItem, err := f.decodeKeyFunc(key)
if err != nil {
return err
}
valueIndexItem, err := f.decodeValueFunc(it.Value())
valueItem, err := f.decodeValueFunc(keyItem, it.Value())
if err != nil {
return err
}
stop, err := fn(keyIndexItem.Merge(valueIndexItem))
stop, err := fn(keyItem.Merge(valueItem))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion swarm/shed/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var retrievalIndexFuncs = IndexFuncs{
value = append(b, fields.Data...)
return value, nil
},
DecodeValue: func(value []byte) (e IndexItem, err error) {
DecodeValue: func(keyItem IndexItem, value []byte) (e IndexItem, err error) {
e.StoreTimestamp = int64(binary.BigEndian.Uint64(value[:8]))
e.Data = value[8:]
return e, nil
Expand Down
4 changes: 2 additions & 2 deletions swarm/storage/localstore/accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (db *DB) Accessor(mode Mode) *Accessor {
}

// Put uses the underlying DB for the specific mode of update to store the chunk.
func (u *Accessor) Put(ctx context.Context, ch storage.Chunk) error {
return u.db.update(ctx, u.mode, chunkToItem(ch))
func (u *Accessor) Put(_ context.Context, ch storage.Chunk) error {
return u.db.update(u.mode, chunkToItem(ch))
}

// Get uses the underlying DB for the specific mode of access to get the chunk.
Expand Down
110 changes: 86 additions & 24 deletions swarm/storage/localstore/accessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,74 @@ package localstore
import (
"bytes"
"context"
"io/ioutil"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
)

// TestAccessors tests most basic Put and Get functionalities
// for different accessors.
func TestAccessors(t *testing.T) {
db, cleanupFunc := newTestDB(t)
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()

testAccessors(t, db)
}

// TestAccessors_withRetrievalCompositeIndex tests most basic
// TestAccessors_useRetrievalCompositeIndex tests most basic
// Put and Get functionalities for different accessors
// by using retrieval composite index.
func TestAccessors_withRetrievalCompositeIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, WithRetrievalCompositeIndex(true))
func TestAccessors_useRetrievalCompositeIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t, &Options{UseRetrievalCompositeIndex: true})
defer cleanupFunc()

testAccessors(t, db)
}

// TestAccessors_mockStore tests most basic Put and Get
// functionalities for different accessors with the mock store
// as the storage for chunk data.
func TestAccessors_mockStore(t *testing.T) {
globalStore := mem.NewGlobalStore()

addr := common.BytesToAddress(make([]byte, 32))

db, cleanupFunc := newTestDB(t, &Options{
MockStore: globalStore.NewNodeStore(addr),
})
defer cleanupFunc()

testAccessors(t, db)

// testAccessors leaves 5 chunks in global store
checkGlobalStoreChunkCount(t, globalStore, 5)
}

// TestAccessors_mockStore_useRetrievalCompositeIndex tests
// most basic Put and Get functionalities for different accessors
// with the mock store as the storage for chunk data and by using
// retrieval composite index.
func TestAccessors_mockStore_useRetrievalCompositeIndex(t *testing.T) {
globalStore := mem.NewGlobalStore()

addr := common.BytesToAddress(make([]byte, 32))

db, cleanupFunc := newTestDB(t, &Options{
MockStore: globalStore.NewNodeStore(addr),
UseRetrievalCompositeIndex: true,
})
defer cleanupFunc()

testAccessors(t, db)

// testAccessors leaves 5 chunks in global store
checkGlobalStoreChunkCount(t, globalStore, 5)
}

// testAccessors tests most basic Put and Get functionalities
// for different accessors. This test validates that the chunk
// is retrievable from the database, not if all indexes are set
Expand All @@ -51,7 +95,6 @@ func testAccessors(t *testing.T, db *DB) {
for _, m := range []Mode{
ModeSyncing,
ModeUpload,
ModeRequest,
} {
t.Run(ModeName(m), func(t *testing.T) {
a := db.Accessor(m)
Expand Down Expand Up @@ -92,29 +135,34 @@ func testAccessors(t *testing.T, db *DB) {
}
})

// Access mode is a special as it does not store the chunk
// in the database.
t.Run(ModeName(modeAccess), func(t *testing.T) {
a := db.Accessor(ModeUpload)
// Request and access modes are special as they do not store
// chunks in the database.
for _, m := range []Mode{
ModeRequest,
modeAccess,
} {
t.Run(ModeName(m), func(t *testing.T) {
a := db.Accessor(ModeUpload)

want := generateRandomChunk()
want := generateRandomChunk()

// first put a random chunk to the database
err := a.Put(context.Background(), want)
if err != nil {
t.Fatal(err)
}
// first put a random chunk to the database
err := a.Put(context.Background(), want)
if err != nil {
t.Fatal(err)
}

a = db.Accessor(modeAccess)
a = db.Accessor(ModeRequest)

got, err := a.Get(context.Background(), want.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Data(), want.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), want.Data())
}
})
got, err := a.Get(context.Background(), want.Address())
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(got.Data(), want.Data()) {
t.Errorf("got chunk data %x, want %x", got.Data(), want.Data())
}
})
}

// Removal mode is a special case as it removes the chunk
// from the database.
Expand Down Expand Up @@ -153,3 +201,17 @@ func testAccessors(t *testing.T, db *DB) {
}
})
}

// checkGlobalStoreChunkCount counts the number of chunks
// in a global mock store to validate it against the expected value.
func checkGlobalStoreChunkCount(t *testing.T, s mock.ImportExporter, want int) {
t.Helper()

n, err := s.Export(ioutil.Discard)
if err != nil {
t.Fatal(err)
}
if n != want {
t.Errorf("got %v chunks, want %v", n, want)
}
}
Loading