This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

LocalStore #1032

Closed
wants to merge 81 commits
d8acb12
swarm/storage/localstore: most basic database
janos Nov 29, 2018
9ec535a
swarm/storage/localstore: fix typos and comments
janos Dec 3, 2018
37205de
swarm/shed: add uint64 field Dec and DecInBatch methods
janos Dec 3, 2018
b1ded5a
swarm/storage/localstore: decrement size counter on ModeRemoval update
janos Dec 3, 2018
572f3cb
swarm/storage/localstore: unexport modeAccess and modeRemoval
janos Dec 3, 2018
cbb510b
swarm/storage/localstore: add WithRetrievalCompositeIndex
janos Dec 3, 2018
c7beb22
swarm/storage/localstore: add TestModeSyncing
janos Dec 3, 2018
391faa7
swarm/storage/localstore: fix test name
janos Dec 3, 2018
58f3f86
swarm/storage/localstore: add TestModeUpload
janos Dec 3, 2018
2d928bf
swarm/storage/localstore: add TestModeRequest
janos Dec 3, 2018
4d58a6f
swarm/storage/localstore: add TestModeSynced
janos Dec 4, 2018
af1b137
swarm/storage/localstore: add TestModeAccess
janos Dec 4, 2018
96409ff
swarm/storage/localstore: add TestModeRemoval
janos Dec 4, 2018
b782bfe
swarm/storage/localstore: add mock store option for chunk data
janos Dec 5, 2018
35376d8
swarm/storage/localstore: add TestDB_pullIndex
janos Dec 5, 2018
e6a7196
swarm/storage/localstore: add TestDB_gcIndex
janos Dec 6, 2018
58c7f11
swarm/storage/localstore: change how batches are written
janos Dec 6, 2018
6e8b2ad
swarm/storage/localstore: add updateOnAccess function
janos Dec 6, 2018
7b8510e
swarm/storage/localhost: add DB.gcSize
janos Dec 6, 2018
cf3ec30
swarm/storage/localstore: update comments
janos Dec 7, 2018
d58e1ee
swarm/storage/localstore: add BenchmarkNew
janos Dec 7, 2018
f2299f4
swarm/storage/localstore: add retrieval tests benchmarks
janos Dec 7, 2018
e6bdda7
swarm/storage/localstore: accessors redesign
janos Dec 13, 2018
b6f5b7a
swarm/storage/localstore: add semaphore for updateGC goroutine
janos Dec 13, 2018
5732c38
swarm/storage/localstore: implement basic garbage collection
janos Dec 13, 2018
ac9d153
swarm/storage/localstore: optimize collectGarbage
janos Dec 14, 2018
e6e29f5
swarm/storage/localstore: add more garbage collection tests cases
janos Dec 14, 2018
750268d
swarm/shed, swarm/storage/localstore: rename IndexItem to Item
janos Dec 17, 2018
a486a09
swarm/shed: add Index.CountFrom
janos Dec 18, 2018
e17fec2
swarm/storage/localstore: persist gcSize
janos Dec 18, 2018
5605323
swarm/storage/localstore: remove composite retrieval index
janos Dec 18, 2018
5ebbcb5
swarm/shed: IterateWithPrefix and IterateWithPrefixFrom Index functions
janos Dec 19, 2018
80c269b
swarm/storage/localstore: writeGCSize function with leveldb batch
janos Dec 19, 2018
bca85ef
swarm/storage/localstore: unexport modeSetRemove
janos Dec 19, 2018
cee4004
swarm/storage/localstore: update writeGCSizeWorker comment
janos Dec 19, 2018
b5a0fd2
swarm/storage/localstore: add triggerGarbageCollection function
janos Dec 19, 2018
4fca004
swarm/storage/localstore: call writeGCSize on DB Close
janos Dec 19, 2018
5ad0c8d
swarm/storage/localstore: additional comment in writeGCSizeWorker
janos Dec 19, 2018
cb8078e
Merge branch 'master' into localstore
janos Dec 19, 2018
c9f5130
swarm/storage/localstore: add MetricsPrefix option
janos Dec 19, 2018
da9bab4
swarm/storage/localstore: fix a typo
janos Dec 19, 2018
d54d7ae
swamr/shed: only one Index Iterate function
janos Dec 19, 2018
67473be
swarm/storage/localstore: use shed Iterate function
janos Dec 19, 2018
2299147
swarm/shed: pass a new byte slice copy to index decode functions
janos Dec 20, 2018
5488a2b
swarm/storage/localstore: implement feed subscriptions
janos Dec 20, 2018
38bdf7f
swarm/storage/localstore: add more subscriptions tests
janos Dec 21, 2018
a74eae2
swarm/storage/localsore: add parallel upload test
janos Dec 21, 2018
534009f
swarm/storage/localstore: use storage.MaxPO in subscription tests
janos Dec 21, 2018
5edd22d
swarm/storage/localstore: subscription of addresses instead chunks
janos Dec 21, 2018
95726d7
swarm/storage/localstore: lock item address in collectGarbage iterator
janos Dec 21, 2018
c5dcae3
swarm/storage/localstore: fix TestSubscribePull to include MaxPO
janos Dec 22, 2018
f3380ea
swarm/storage/localstore: improve subscriptions
janos Jan 7, 2019
1d2d470
swarm/storage/localstore: add TestDB_SubscribePull_sinceAndUntil test
janos Jan 7, 2019
c26c979
swarm/storage/localstore: adjust pull sync tests
janos Jan 7, 2019
c188281
Merge branch 'master' into localstore
janos Jan 7, 2019
c5e4c61
swarm/storage/localstore: remove writeGCSizeDelay and use literal
janos Jan 8, 2019
015d977
swarm/storage/localstore: adjust subscriptions tests delays and comments
janos Jan 8, 2019
abbb4a6
swarm/storage/localstore: add godoc package overview
janos Jan 10, 2019
fb0a822
swarm/storage/localstore: fix a typo
janos Jan 10, 2019
c423060
swarm/storage/localstore: update package overview
janos Jan 10, 2019
c5a6456
swarm/storage/localstore: remove repeated index change
janos Jan 11, 2019
33726a4
swarm/storage/localstore: rename ChunkInfo to ChunkDescriptor
janos Jan 11, 2019
25a068a
swarm/storage/localstore: add comment in collectGarbageWorker
janos Jan 11, 2019
ca1e24f
swarm/storage/localstore: replace atomics with mutexes for gcSize and…
janos Jan 11, 2019
c62fee3
Merge branch 'master' into localstore
janos Jan 14, 2019
5ddc75f
swarm/storage/localstore: protect addrs map in pull subs tests
janos Jan 14, 2019
87bbd61
swarm/storage/localstore: protect slices in push subs test
janos Jan 14, 2019
6ad67d7
swarm/storage/localstore: protect chunks in TestModePutUpload_parallel
janos Jan 14, 2019
a550388
swarm/storage/localstore: fix a race in TestDB_updateGCSem defers
janos Jan 14, 2019
1dae999
swarm/storage/localstore: remove parallel flag from tests
janos Jan 14, 2019
eda338a
swarm/storage/localstore: fix a race in testDB_collectGarbageWorker
janos Jan 14, 2019
ad5b329
swarm/storage/localstore: remove unused code
janos Jan 22, 2019
8d15e82
swarm/storage/localstore: add more context to pull sub log messages
janos Jan 25, 2019
3948044
swarm/storage/localstore: merge branch 'master' into localstore
janos Jan 25, 2019
6c8208a
swarm/storage/localstore: BenchmarkPutUpload and global lock option
janos Jan 26, 2019
6accc6b
swarm/storage/localstore: pre-generate chunks in BenchmarkPutUpload
janos Jan 28, 2019
85cd349
swarm/storage/localstore: correct useGlobalLock in collectGarbage
janos Jan 28, 2019
7fa1ba9
swarm/storage/localstore: fix typos and update comments
janos Jan 29, 2019
ebecd05
swarm/storage/localstore: update writeGCSize comment
janos Jan 29, 2019
f056e86
swarm/storage/localstore: remove global lock option
janos Feb 4, 2019
40432d9
swarm/storage/localstore: add description for gc size counting
janos Feb 5, 2019
56 changes: 56 additions & 0 deletions swarm/storage/localstore/doc.go
@@ -0,0 +1,56 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

/*
Package localstore provides a disk storage layer for Swarm Chunk
persistence. It uses swarm/shed abstractions on top of the
github.com/syndtr/goleveldb LevelDB implementation.

The main type is DB which manages the storage by providing methods to
access and add Chunks and to manage their status.

Modes are abstractions that do specific changes to Chunks. There are three
mode types:

- ModeGet, for Chunk access
- ModePut, for adding Chunks to the database
- ModeSet, for changing Chunk statuses

Every mode type has a corresponding type (Getter, Putter and Setter)
that provides the appropriate method to perform the operation. That
type should be injected into localstore consumers instead of the whole
DB, giving clearer insight into which operations a consumer performs
on the database.

Getters, Putters and Setters accept different get, put and set modes
to perform different actions. For example, ModeGet has two different
variables, ModeGetRequest and ModeGetSync, and two different Getters
can be constructed with them. They are used when the chunk is requested
or when the chunk is synced, as these two events change the database
differently.

Subscription methods are implemented for the specific purpose of
continuous iteration over Chunks that should be provided to
Push and Pull syncing.

DB implements an internal garbage collector that removes only synced
Chunks from the database based on their most recent access time.

Internally, DB stores Chunk data and any required information, such as
store and access timestamps, in different shed indexes that can be
iterated on by the garbage collector or by subscriptions.
*/
package localstore
229 changes: 229 additions & 0 deletions swarm/storage/localstore/gc.go
@@ -0,0 +1,229 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package localstore

import (
	"time"

	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/swarm/shed"
	"github.com/syndtr/goleveldb/leveldb"
)

var (
	// gcTargetRatio defines the target number of items
	// in the garbage collection index that will not be removed
	// on garbage collection. The target number of items
	// is calculated by the gcTarget function. This value must be
	// in the range (0,1]. For example, with a 0.9 value,
	// garbage collection will leave 90% of the defined capacity
	// in the database after its run. This prevents frequent
	// garbage collection runs.
	gcTargetRatio = 0.9
	// gcBatchSize limits the number of chunks in a single
	// leveldb batch on garbage collection.
	gcBatchSize int64 = 1000
)

// collectGarbageWorker is a long running function that waits for
// the collectGarbageTrigger channel to signal a garbage collection
// run. A GC run iterates on gcIndex and removes older items
// from retrieval and other indexes.
func (db *DB) collectGarbageWorker() {
	for {
		select {
		case <-db.collectGarbageTrigger:
			// run a single collect garbage run and
			// if done is false, gcBatchSize is reached and
			// another collect garbage run is needed
			collectedCount, done, err := db.collectGarbage()
			if err != nil {
				log.Error("localstore collect garbage", "err", err)
			}
			// check if another gc run is needed
			if !done {
				db.triggerGarbageCollection()

Member: why is this worker needed and trigger needed? see my comment above

			}

			if testHookCollectGarbage != nil {
				testHookCollectGarbage(collectedCount)
			}
		case <-db.close:
			return
		}
	}
}

// collectGarbage removes chunks from retrieval and other
// indexes if the maximal number of chunks in the database is reached.
// This function returns the number of removed chunks. If done
// is false, another call to this function is needed to collect
// the rest of the garbage as the batch size limit is reached.
// This function is called in collectGarbageWorker.
func (db *DB) collectGarbage() (collectedCount int64, done bool, err error) {
	batch := new(leveldb.Batch)
	target := db.gcTarget()

	done = true
	err = db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
		// protect parallel updates
		unlock, err := db.lockAddr(item.Address)
		if err != nil {
			return false, err
		}
		defer unlock()

		gcSize := db.getGCSize()
		if gcSize-collectedCount <= target {

Contributor:
Is there not the danger of the following here:

  • We reach GC target
  • GC kicks in
  • Global Lock On
  • Run GC
  • In the meanwhile new syncing happens, new chunks arrive and wait
  • GC terminates
  • New chunks from above now have to be written again, then GC runs again. If the amount of new chunks exceeds the target, GC runs again...

"Loop" repeats.

What happens if we reach the target while new chunks wait for write? Is the write interrupted so that GC runs? What if the DB is at 90%, then new chunks arrive exceeding capacity (say 105%), will the write be interrupted?

Member Author:
I assume that you are referring only to a global lock.

You can see that every chunk is saved in its own batch, so this can not happen, as gc should kick in. There would be one difference with the global lock: the gc size counting would be much simpler and included in the chunk write batch. But this is only for the global lock, which is here only for BenchmarkPutUpload, until we decide whether we want to use it, or stick with the address lock.


			return true, nil
		}
		// delete from retrieve, pull, gc
		db.retrievalDataIndex.DeleteInBatch(batch, item)
		db.retrievalAccessIndex.DeleteInBatch(batch, item)
		db.pullIndex.DeleteInBatch(batch, item)
		db.gcIndex.DeleteInBatch(batch, item)

Contributor:
Can ANY of these DeleteInBatch produce an error?

Member Author:
No. You can check how LevelDB handles batches. All errors are handled in its Write(batch) method.


		collectedCount++
		if collectedCount >= gcBatchSize {
			// batch size limit reached,
			// another gc run is needed
			done = false
			return true, nil
		}
		return false, nil
	}, nil)
	if err != nil {
		return 0, false, err
	}

	err = db.shed.WriteBatch(batch)
	if err != nil {
		return 0, false, err
	}
	// batch is written, decrement gcSize
	db.incGCSize(-collectedCount)
	return collectedCount, done, nil

Contributor:
I don't understand this done parameter.

If done were false, as I understand, because another batch round is needed, and then the function returns, then the global lock is released. Is it possible that meanwhile db.incGCSize could be called by other routines, and can it cause any problem?

Member Author:
Would you suggest where the done parameter should be explained in more detail? There is a comment above in the code, but maybe it is badly written. The done parameter provides information on whether the garbage collection run reached the target. This can happen if the max gc batch size is reached. Do you have an idea how it can be implemented to be more clear?

This should not cause problems, as the gc size change is protected. If the global lock is released, we can allow more chunks to be saved, but they will not be collected as they will be on the top of the gc index, and gc iterates from the bottom of it.

}

// gcTarget returns the absolute value of the garbage collection
// target, calculated from db.capacity and gcTargetRatio.
func (db *DB) gcTarget() (target int64) {
	return int64(float64(db.capacity) * gcTargetRatio)
}

// incGCSize increments gcSize by the provided number.
// If count is negative, it will decrement gcSize.
func (db *DB) incGCSize(count int64) {
	if count == 0 {
		return
	}

	db.gcSizeMu.Lock()
	newSize := db.gcSize + count
	db.gcSize = newSize
	db.gcSizeMu.Unlock()

	select {
	case db.writeGCSizeTrigger <- struct{}{}:
	default:
	}
	if newSize >= db.capacity {
		db.triggerGarbageCollection()
	}
}

// getGCSize returns the gcSize value, guarded by
// the gcSizeMu mutex.
func (db *DB) getGCSize() (count int64) {
	db.gcSizeMu.RLock()
	count = db.gcSize
	db.gcSizeMu.RUnlock()
	return count
}

// triggerGarbageCollection signals collectGarbageWorker
// to call collectGarbage.
func (db *DB) triggerGarbageCollection() {
	select {
	case db.collectGarbageTrigger <- struct{}{}:
	case <-db.close:
	default:
	}
}

// writeGCSizeWorker writes gcSize on a trigger event
// and waits 10 seconds after each write. This linear
// backoff avoids very frequent database operations.
func (db *DB) writeGCSizeWorker() {
	for {
		select {
		case <-db.writeGCSizeTrigger:
			err := db.writeGCSize(db.getGCSize())
			if err != nil {
				log.Error("localstore write gc size", "err", err)
			}
			// Wait some time before writing gc size in the next
			// iteration. This prevents frequent I/O operations.
			select {
			case <-time.After(10 * time.Second):
			case <-db.close:
				return
			}
		case <-db.close:
			return
		}
	}
}

// writeGCSize stores the number of items in gcIndex.
// It removes all hashes from gcUncountedHashesIndex

Contributor:
What is gcUncountedHashesIndex? Is there also a gcCountedHashesIndex? More information please.

Member Author:
Will try to explain it better in the comment in the constructor. Do you think that there should be a gcCountedHashesIndex?


// so that they are not included on the next node start

Contributor:
Next database initialization? Is that when the node reboots? Or on the next GC run?

Member Author:
Yes, maybe initialization is not the best word here. I will update the comment. It is meant when the node starts again (reboots).


// when gcSize is counted.
func (db *DB) writeGCSize(gcSize int64) (err error) {

Contributor:
I believe there should be some higher level description of what the process of GC looks like.

What is writeGCSize()? Does it correspond to increasing the GC size because new chunks have been written to the DB? Why does it have only an int64 param? Why does it have to iterate gcUncountedHashesIndex? And why does it need to DeleteInBatch if we are actually writing?

I know you may have explained this during your walkthrough, but I already can't remember many details; also, I participated in the first walkthrough only, and there we didn't go into the details of GC. Also, if a new member comes on board it would be useful.

Member Author:
I will try to write documentation around gcSize persistence. I've added BenchmarkPutUpload to measure whether we should use a global lock and reduce the complexity around gcSize counting.


	const maxBatchSize = 1000

	batch := new(leveldb.Batch)
	db.storedGCSize.PutInBatch(batch, uint64(gcSize))
	batchSize := 1

	// use only one iterator as it acquires its snapshot
	// not to remove hashes from the index that are added
	// after the stored gc size is written
	err = db.gcUncountedHashesIndex.Iterate(func(item shed.Item) (stop bool, err error) {
		db.gcUncountedHashesIndex.DeleteInBatch(batch, item)
		batchSize++
		if batchSize >= maxBatchSize {
			err = db.shed.WriteBatch(batch)
			if err != nil {
				return false, err
			}
			batch.Reset()
			batchSize = 0
		}
		return false, nil
	}, nil)
	if err != nil {
		return err
	}
	return db.shed.WriteBatch(batch)
}

// testHookCollectGarbage is a hook that can provide
// information when a garbage collection run is done
// and how many items it removed.
var testHookCollectGarbage func(collectedCount int64)