diff --git a/api/api_test.go b/api/api_test.go
index b2dbebf8a6..d29b3af9ff 100644
--- a/api/api_test.go
+++ b/api/api_test.go
@@ -141,7 +141,7 @@ func TestApiPut(t *testing.T) {
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
tag := tags.All()[0]
- chunktesting.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
+ chunktesting.CheckTag(t, tag, 2, 2, 0, 0, 0, 2) //1 chunk data, 1 chunk manifest
})
}
@@ -150,7 +150,7 @@ func TestApiTagLarge(t *testing.T) {
const contentLength = 4096 * 4095
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))
- tag, err := api.Tags.Create("unnamed-tag", 0)
+ tag, err := api.Tags.Create(context.Background(), "unnamed-tag", 0)
if err != nil {
t.Fatal(err)
}
@@ -168,11 +168,11 @@ func TestApiTagLarge(t *testing.T) {
if toEncrypt {
tag := tags.All()[0]
expect := int64(4095 + 64 + 1)
- chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
+ chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
} else {
tag := tags.All()[0]
expect := int64(4095 + 32 + 1)
- chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
+ chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
}
})
}
@@ -549,18 +549,18 @@ func TestDetectContentType(t *testing.T) {
// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
- tag, err := a.Tags.Create("unnamed-tag", 0)
- log.Trace("created new tag", "uid", tag.Uid)
- cCtx := sctx.SetTag(ctx, tag.Uid)
- key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
+ tag, err := a.Tags.Create(ctx, "unnamed-tag", 0)
+ log.Trace("created new tag", "id", tag.Uid)
+ ctx = sctx.SetTag(ctx, tag.Uid)
+ key, waitContent, err := a.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
- key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
+ key, waitManifest, err := a.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
diff --git a/api/client/client_test.go b/api/client/client_test.go
index a7e63310a6..a69c4d103d 100644
--- a/api/client/client_test.go
+++ b/api/client/client_test.go
@@ -51,7 +51,7 @@ func TestClientUploadDownloadRaw(t *testing.T) {
// check the tag was created successfully
tag := srv.Tags.All()[0]
- chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
+ chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}
func TestClientUploadDownloadRawEncrypted(t *testing.T) {
@@ -69,7 +69,7 @@ func TestClientUploadDownloadRawEncrypted(t *testing.T) {
// check the tag was created successfully
tag := srv.Tags.All()[0]
- chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
+ chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}
func testClientUploadDownloadRaw(srv *swarmhttp.TestSwarmServer, toEncrypt bool, t *testing.T, data []byte, toPin bool) string {
@@ -228,7 +228,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) {
// check the tag was created successfully
tag := srv.Tags.All()[0]
- chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
+ chunktesting.CheckTag(t, tag, 8, 8, 0, 0, 0, 8)
// check we can download the individual files
checkDownloadFile := func(path string, expected []byte) {
@@ -372,7 +372,7 @@ func TestClientMultipartUpload(t *testing.T) {
// check the tag was created successfully
tag := srv.Tags.All()[0]
- chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
+ chunktesting.CheckTag(t, tag, 8, 8, 0, 0, 0, 8)
// check we can download the individual files
checkDownloadFile := func(path string) {
diff --git a/api/http/middleware.go b/api/http/middleware.go
index 9952247983..2cd2c0cbdf 100644
--- a/api/http/middleware.go
+++ b/api/http/middleware.go
@@ -1,6 +1,7 @@
package http
import (
+ "context"
"fmt"
"net/http"
"runtime/debug"
@@ -123,7 +124,7 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal)
- t, err := tags.Create(tagName, estimatedTotal)
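+ // the tag is created with context.Background() so its tracing span is not tied to the request context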
+ t, err := tags.Create(context.Background(), tagName, estimatedTotal)
if err != nil {
log.Error("error creating tag", "err", err, "tagName", tagName)
}
diff --git a/api/http/server.go b/api/http/server.go
index 516db8b419..ce66a06182 100644
--- a/api/http/server.go
+++ b/api/http/server.go
@@ -42,6 +42,7 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/sctx"
+ "github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
"github.com/ethersphere/swarm/storage/pin"
@@ -276,10 +277,10 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
ruid := GetRUID(r.Context())
log.Debug("handle.post.raw", "ruid", ruid)
- tagUid := sctx.GetTag(r.Context())
- tag, err := s.api.Tags.Get(tagUid)
+ tagUID := sctx.GetTag(r.Context())
+ tag, err := s.api.Tags.Get(tagUID)
if err != nil {
- log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
+ log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}
postRawCount.Inc(1)
@@ -334,7 +335,7 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Content-Type", "text/plain")
- w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
+ w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, addr)
}
@@ -349,6 +350,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("handle.post.files", "ruid", ruid)
postFilesCount.Inc(1)
+ tagUID := sctx.GetTag(r.Context())
+ tag, err := s.api.Tags.Get(tagUID)
+ if err != nil {
+ log.Error("handle post raw got an error retrieving tag", "tagUID", tagUID, "err", err)
+ }
+
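+ // start the http.post span as a child of the tag's tracing context so the whole upload is traced end to end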
+ ctx, sp := spancontext.StartSpan(tag.Context(), "http.post")
+ defer sp.Finish()
+
contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
postFilesFail.Inc(1)
@@ -375,7 +385,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
}
log.Debug("resolved key", "ruid", ruid, "key", addr)
} else {
- addr, err = s.api.NewManifest(r.Context(), toEncrypt)
+ addr, err = s.api.NewManifest(ctx, toEncrypt)
if err != nil {
postFilesFail.Inc(1)
respondError(w, r, err.Error(), http.StatusInternalServerError)
@@ -383,7 +393,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
}
log.Debug("new manifest", "ruid", ruid, "key", addr)
}
- newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error {
+ newAddr, err := s.api.UpdateManifest(ctx, addr, func(mw *api.ManifestWriter) error {
switch contentType {
case tarContentType:
_, err := s.handleTarUpload(r, mw)
@@ -405,10 +415,10 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
return
}
- tagUid := sctx.GetTag(r.Context())
- tag, err := s.api.Tags.Get(tagUid)
+ tagUID = sctx.GetTag(r.Context())
+ tag, err = s.api.Tags.Get(tagUID)
if err != nil {
- log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
+ log.Error("got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}
log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.TotalCounter())
@@ -427,7 +437,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("stored content", "ruid", ruid, "key", newAddr)
w.Header().Set("Content-Type", "text/plain")
- w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
+ w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, newAddr)
}
diff --git a/api/http/server_test.go b/api/http/server_test.go
index d9a94b3f0f..1e5d810ce9 100644
--- a/api/http/server_test.go
+++ b/api/http/server_test.go
@@ -945,7 +945,7 @@ func testBzzTar(encrypted bool, t *testing.T) {
// check that the tag was written correctly
tag := srv.Tags.All()[0]
- chunktesting.CheckTag(t, tag, 4, 4, 0, 4)
+ chunktesting.CheckTag(t, tag, 3, 3, 0, 0, 0, 3)
swarmHash, err := ioutil.ReadAll(resp2.Body)
resp2.Body.Close()
@@ -1081,7 +1081,7 @@ func TestBzzCorrectTagEstimate(t *testing.T) {
<-time.After(10 * time.Millisecond)
case 1:
tag := srv.Tags.All()[0]
- chunktesting.CheckTag(t, tag, 0, 0, 0, v.expChunks)
+ chunktesting.CheckTag(t, tag, 0, 0, 0, 0, 0, v.expChunks)
srv.Tags.Delete(tag.Uid)
done = true
}
diff --git a/api/inspector.go b/api/inspector.go
index 24a98cc9f2..e48ed6d6c0 100644
--- a/api/inspector.go
+++ b/api/inspector.go
@@ -24,6 +24,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
stream "github.com/ethersphere/swarm/network/stream/v2"
@@ -53,6 +54,20 @@ func (i *Inspector) KademliaInfo() network.KademliaInfo {
return i.hive.KademliaInfo()
}
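+// IsPushSynced returns true if the tag with the given name is found and all its chunks are done push syncing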
+func (i *Inspector) IsPushSynced(tagname string) bool {
+ tags := i.api.Tags.All()
+
+ for _, t := range tags {
+ if t.Name == tagname {
+ ds := t.Done(chunk.StateSynced)
+ log.Trace("found tag", "tagname", tagname, "done-syncing", ds)
+ return ds
+ }
+ }
+
+ return false
+}
+
func (i *Inspector) IsPullSyncing() bool {
t := i.stream.LastReceivedChunkTime()
diff --git a/api/pullsync/client.go b/api/pullsync/client.go
new file mode 100644
index 0000000000..922f1d98ae
--- /dev/null
+++ b/api/pullsync/client.go
@@ -0,0 +1,11 @@
+package pullsync
+
+import stream "github.com/ethersphere/swarm/network/stream/v2"
+
+// Client is the node-wide pullsync client
+type Client struct {
+ stream.Syncer // embed stream.Syncer
+ // the pull sync update logic goes here: listen to kademlia depth changes,
+ // call `Request` for the affected bins, and remember each request so that
+ // request.Cancel() can be called once it is no longer relevant
+}
diff --git a/api/pullsync/server.go b/api/pullsync/server.go
new file mode 100644
index 0000000000..17fe44e169
--- /dev/null
+++ b/api/pullsync/server.go
@@ -0,0 +1,10 @@
+package pullsync
+
+import stream "github.com/ethersphere/swarm/network/stream/v2"
+
+// Server implements stream.Provider; it is node-wide
+// and uses localstore's SubscribePull for the bins
+type Server struct {
+ // ...
+ *stream.LocalProvider
+}
diff --git a/chunk/chunk.go b/chunk/chunk.go
index bcce0eba39..50de9c1c56 100644
--- a/chunk/chunk.go
+++ b/chunk/chunk.go
@@ -40,12 +40,15 @@ type Chunk interface {
Data() []byte
PinCounter() uint64
WithPinCounter(p uint64) Chunk
+ TagID() uint32
+ WithTagID(t uint32) Chunk
}
type chunk struct {
addr Address
sdata []byte
pinCounter uint64
+ tagID uint32
}
func NewChunk(addr Address, data []byte) Chunk {
@@ -60,6 +63,11 @@ func (c *chunk) WithPinCounter(p uint64) Chunk {
return c
}
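+// WithTagID attaches the upload tag id to the chunk and returns the chunk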
+func (c *chunk) WithTagID(t uint32) Chunk {
+ c.tagID = t
+ return c
+}
+
func (c *chunk) Address() Address {
return c.addr
}
@@ -72,6 +80,10 @@ func (c *chunk) PinCounter() uint64 {
return c.pinCounter
}
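+// TagID returns the id of the upload tag the chunk belongs to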
+func (c *chunk) TagID() uint32 {
+ return c.tagID
+}
+
func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
}
diff --git a/chunk/tag.go b/chunk/tag.go
index 7f1c3a1e68..c1a96eeb3e 100644
--- a/chunk/tag.go
+++ b/chunk/tag.go
@@ -17,10 +17,14 @@
package chunk
import (
+ "context"
"encoding/binary"
"errors"
"sync/atomic"
"time"
+
+ "github.com/ethersphere/swarm/spancontext"
+ "github.com/opentracing/opentracing-go"
)
var (
@@ -53,22 +57,38 @@ type Tag struct {
Sent int64 // number of chunks sent for push syncing
Synced int64 // number of chunks synced with proof
StartedAt time.Time // tag started to calculate ETA
+
+ // end-to-end tag tracing
+ ctx context.Context // tracing context
+ span opentracing.Span // tracing root span
}
// New creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
-func NewTag(uid uint32, s string, total int64) *Tag {
+func NewTag(ctx context.Context, uid uint32, s string, total int64) *Tag {
t := &Tag{
Uid: uid,
Name: s,
StartedAt: time.Now(),
Total: total,
}
+
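+ // open the tag's root tracing span; it stays open until FinishRootSpan is called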
+ t.ctx, t.span = spancontext.StartSpan(ctx, "new.upload.tag")
return t
}
-// Inc increments the count for a state
-func (t *Tag) Inc(state State) {
+// Context returns the tracing context of the tag
+func (t *Tag) Context() context.Context {
+ return t.ctx
+}
+
+// FinishRootSpan closes the tag's root tracing span
+func (t *Tag) FinishRootSpan() {
+ t.span.Finish()
+}
+
+// IncN increments the count for a state
+func (t *Tag) IncN(state State, n int) {
var v *int64
switch state {
case StateSplit:
@@ -82,7 +102,12 @@ func (t *Tag) Inc(state State) {
case StateSynced:
v = &t.Synced
}
- atomic.AddInt64(v, 1)
+ atomic.AddInt64(v, int64(n))
+}
+
+// Inc increments the count for a state
+func (t *Tag) Inc(state State) {
+ t.IncN(state, 1)
}
// Get returns the count for a state on a tag
@@ -108,6 +133,32 @@ func (t *Tag) TotalCounter() int64 {
return atomic.LoadInt64(&t.Total)
}
+// WaitTillDone returns without error once the tag is complete
+// wrt the state given as argument
+// it returns an error if the context is done
+func (t *Tag) WaitTillDone(ctx context.Context, s State) error {
+ if t.Done(s) {
+ return nil
+ }
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ if t.Done(s) {
+ return nil
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+}
+
+// Done returns true if tag is complete wrt the state given as argument
+func (t *Tag) Done(s State) bool {
+ n, total, err := t.Status(s)
+ return err == nil && n == total
+}
+
// DoneSplit sets total count to SPLIT count and sets the associated swarm hash for this tag
// is meant to be called when splitter finishes for input streams of unknown size
func (t *Tag) DoneSplit(address Address) int64 {
@@ -168,9 +219,7 @@ func (tag *Tag) MarshalBinary() (data []byte, err error) {
n = binary.PutVarint(intBuffer, int64(len(tag.Address)))
buffer = append(buffer, intBuffer[:n]...)
-
- buffer = append(buffer, tag.Address...)
-
+ buffer = append(buffer, tag.Address[:]...)
buffer = append(buffer, []byte(tag.Name)...)
return buffer, nil
diff --git a/chunk/tag_test.go b/chunk/tag_test.go
index c57094a6b7..3e71bf3bba 100644
--- a/chunk/tag_test.go
+++ b/chunk/tag_test.go
@@ -18,6 +18,7 @@ package chunk
import (
"bytes"
+ "context"
"sync"
"testing"
"time"
@@ -137,12 +138,13 @@ func TestTagConcurrentIncrements(t *testing.T) {
// TestTagsMultipleConcurrentIncrements tests Inc calls concurrently
func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
ts := NewTags()
+ ctx := context.Background()
n := 100
wg := sync.WaitGroup{}
wg.Add(10 * 5 * n)
for i := 0; i < 10; i++ {
s := string([]byte{uint8(i)})
- tag, err := ts.Create(s, int64(n))
+ tag, err := ts.Create(ctx, s, int64(n))
if err != nil {
t.Fatal(err)
}
@@ -183,7 +185,7 @@ func TestTagsMultipleConcurrentIncrementsSyncMap(t *testing.T) {
// TestMarshallingWithAddr tests that marshalling and unmarshalling is done correctly when the
// tag Address (byte slice) contains some arbitrary value
func TestMarshallingWithAddr(t *testing.T) {
- tg := NewTag(111, "test/tag", 10)
+ tg := NewTag(context.Background(), 111, "test/tag", 10)
tg.Address = []byte{0, 1, 2, 3, 4, 5, 6}
for _, f := range allStates {
@@ -231,7 +233,7 @@ func TestMarshallingWithAddr(t *testing.T) {
// TestMarshallingNoAddress tests that marshalling and unmarshalling is done correctly
func TestMarshallingNoAddr(t *testing.T) {
- tg := NewTag(111, "test/tag", 10)
+ tg := NewTag(context.Background(), 111, "test/tag", 10)
for _, f := range allStates {
tg.Inc(f)
}
diff --git a/chunk/tags.go b/chunk/tags.go
index 6b216d2b5c..f965ad8aa0 100644
--- a/chunk/tags.go
+++ b/chunk/tags.go
@@ -25,27 +25,26 @@ import (
"time"
"github.com/ethersphere/swarm/sctx"
+ "github.com/ethersphere/swarm/spancontext"
)
// Tags hold tag information indexed by a unique random uint32
type Tags struct {
tags *sync.Map
- rng *rand.Rand
}
// NewTags creates a tags object
func NewTags() *Tags {
return &Tags{
tags: &sync.Map{},
- rng: rand.New(rand.NewSource(time.Now().Unix())),
}
}
// Create creates a new tag, stores it by the name and returns it
// it returns an error if the tag with this name already exists
-func (ts *Tags) Create(s string, total int64) (*Tag, error) {
+func (ts *Tags) Create(ctx context.Context, s string, total int64) (*Tag, error) {
t := &Tag{
- Uid: ts.rng.Uint32(),
+ Uid: rand.Uint32(),
Name: s,
StartedAt: time.Now(),
Total: total,
@@ -53,6 +52,8 @@ func (ts *Tags) Create(s string, total int64) (*Tag, error) {
if _, loaded := ts.tags.LoadOrStore(t.Uid, t); loaded {
return nil, errExists
}
+
+ t.ctx, t.span = spancontext.StartSpan(ctx, "new.upload.tag")
return t, nil
}
diff --git a/chunk/tags_test.go b/chunk/tags_test.go
index db9c16b85b..cd82787d74 100644
--- a/chunk/tags_test.go
+++ b/chunk/tags_test.go
@@ -16,13 +16,16 @@
package chunk
-import "testing"
+import (
+ "context"
+ "testing"
+)
func TestAll(t *testing.T) {
ts := NewTags()
-
- ts.Create("1", 1)
- ts.Create("2", 1)
+ ctx := context.Background()
+ ts.Create(ctx, "1", 1)
+ ts.Create(ctx, "2", 1)
all := ts.All()
@@ -38,7 +41,7 @@ func TestAll(t *testing.T) {
t.Fatalf("expected tag 1 Total to be 1 got %d", n)
}
- ts.Create("3", 1)
+ ts.Create(ctx, "3", 1)
all = ts.All()
if len(all) != 3 {
diff --git a/chunk/testing/tag.go b/chunk/testing/tag.go
index f785c51227..0dcfb20d87 100644
--- a/chunk/testing/tag.go
+++ b/chunk/testing/tag.go
@@ -23,12 +23,11 @@ import (
)
// CheckTag checks the first tag in the api struct to be in a certain state
-func CheckTag(t *testing.T, tag *chunk.Tag, split, stored, seen, total int64) {
+func CheckTag(t *testing.T, tag *chunk.Tag, split, stored, seen, sent, synced, total int64) {
t.Helper()
if tag == nil {
t.Fatal("no tag found")
}
-
tSplit := tag.Get(chunk.StateSplit)
if tSplit != split {
t.Fatalf("should have had split chunks, got %d want %d", tSplit, split)
@@ -44,6 +43,16 @@ func CheckTag(t *testing.T, tag *chunk.Tag, split, stored, seen, total int64) {
t.Fatalf("mismatch stored chunks, got %d want %d", tStored, stored)
}
+ tSent := tag.Get(chunk.StateSent)
+ if tSent != sent {
+ t.Fatalf("mismatch sent chunks, got %d want %d", tSent, sent)
+ }
+
+ tSynced := tag.Get(chunk.StateSynced)
+ if tSynced != synced {
+ t.Fatalf("mismatch synced chunks, got %d want %d", tSynced, synced)
+ }
+
tTotal := tag.TotalCounter()
if tTotal != total {
t.Fatalf("mismatch total chunks, got %d want %d", tTotal, total)
diff --git a/cmd/swarm-smoke/upload_and_sync.go b/cmd/swarm-smoke/upload_and_sync.go
index 5f0a045da6..d803e35d04 100644
--- a/cmd/swarm-smoke/upload_and_sync.go
+++ b/cmd/swarm-smoke/upload_and_sync.go
@@ -37,7 +37,6 @@ import (
"github.com/ethersphere/swarm/testutil"
"github.com/pborman/uuid"
"golang.org/x/sync/errgroup"
-
cli "gopkg.in/urfave/cli.v1"
)
@@ -208,6 +207,8 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin
}
}
+ log.Trace("sync mode", "sync mode", syncMode)
+
if syncMode == "pullsync" || syncMode == "both" {
for _, maxProxHost := range maxProxHosts {
if allHostChunks[maxProxHost][i] == '0' {
diff --git a/network/kademlia.go b/network/kademlia.go
index 6af58d6a74..94b2ce47c2 100644
--- a/network/kademlia.go
+++ b/network/kademlia.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/pot"
sv "github.com/ethersphere/swarm/version"
@@ -568,6 +569,33 @@ func (k *Kademlia) callable(e *entry) bool {
return true
}
+// IsClosestTo returns true if self is the closest peer to addr among filtered peers
+// i.e., it returns false iff there is a peer such that
+// - filter(bzzpeer) == true AND
+// - pot.DistanceCmp(addr, peeraddress, selfaddress) == 1
+func (k *Kademlia) IsClosestTo(addr []byte, filter func(*BzzPeer) bool) (closest bool) {
+ myPo := chunk.Proximity(addr, k.BaseAddr())
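+ // myPo is the proximity order between addr and our own base address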
+ // iterate connection in kademlia
+ closest = true
+ k.EachConn(addr, 255, func(p *Peer, po int) bool {
+ if !filter(p.BzzPeer) {
+ return true
+ }
+ if po != myPo {
+ closest = po < myPo
+ return false
+ }
+ // if the proximity order of the closest nodes equals our own,
+ // then use XOR-based DistanceCmp and return false if a peer is closer
+ if d, _ := pot.DistanceCmp(addr, p.Over(), k.BaseAddr()); d == 1 {
+ closest = false
+ return false
+ }
+ return true
+ })
+ return closest
+}
+
// BaseAddr return the kademlia base address
func (k *Kademlia) BaseAddr() []byte {
return k.base
diff --git a/network_test.go b/network_test.go
index 4001eb274f..91fbcb452f 100644
--- a/network_test.go
+++ b/network_test.go
@@ -29,9 +29,6 @@ import (
"testing"
"time"
- "github.com/ethersphere/swarm/sctx"
- "github.com/ethersphere/swarm/testutil"
-
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
@@ -39,7 +36,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethersphere/swarm/api"
"github.com/ethersphere/swarm/network/simulation"
+ "github.com/ethersphere/swarm/sctx"
"github.com/ethersphere/swarm/storage"
+ "github.com/ethersphere/swarm/testutil"
)
var (
@@ -483,18 +482,18 @@ func retrieveF(
// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *api.API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
- tag, err := a.Tags.Create("unnamed-tag", 0)
+ tag, err := a.Tags.Create(ctx, "unnamed-tag", 0)
log.Trace("created new tag", "uid", tag.Uid)
- cCtx := sctx.SetTag(ctx, tag.Uid)
- key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
+ ctx = sctx.SetTag(ctx, tag.Uid)
+ key, waitContent, err := a.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
- key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
+ key, waitManifest, err := a.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
diff --git a/pot/address.go b/pot/address.go
index d7bed6c46e..cc88c35d37 100644
--- a/pot/address.go
+++ b/pot/address.go
@@ -114,7 +114,7 @@ func DistanceCmp(a, x, y []byte) (int, error) {
if len(a) != len(x) || len(a) != len(y) {
return 0, errors.New("address length must match")
}
- return ProxCmp(a, x, y), nil
+ return ProxCmp(a, y, x), nil
}
// ProxCmp compares the distances x->a and y->a
diff --git a/pot/address_test.go b/pot/address_test.go
index 3c914fe84c..f33e374325 100644
--- a/pot/address_test.go
+++ b/pot/address_test.go
@@ -34,13 +34,13 @@ var (
x: hexutil.MustDecode("0x9100000000000000000000000000000000000000000000000000000000000000"),
y: hexutil.MustDecode("0x8200000000000000000000000000000000000000000000000000000000000000"),
z: hexutil.MustDecode("0x1200000000000000000000000000000000000000000000000000000000000000"),
- result: -1,
+ result: 1,
},
{
x: hexutil.MustDecode("0x9100000000000000000000000000000000000000000000000000000000000000"),
y: hexutil.MustDecode("0x1200000000000000000000000000000000000000000000000000000000000000"),
z: hexutil.MustDecode("0x8200000000000000000000000000000000000000000000000000000000000000"),
- result: 1,
+ result: -1,
},
{
x: hexutil.MustDecode("0x9100000000000000000000000000000000000000000000000000000000000000"),
diff --git a/pss/pss.go b/pss/pss.go
index fe04f56943..5e9b64a09e 100644
--- a/pss/pss.go
+++ b/pss/pss.go
@@ -37,7 +37,6 @@ import (
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pot"
"github.com/ethersphere/swarm/pss/crypto"
- "github.com/ethersphere/swarm/storage"
"golang.org/x/crypto/sha3"
)
@@ -269,7 +268,7 @@ func New(k *network.Kademlia, params *Params) (*Pss, error) {
ps.outbox = newOutbox(defaultOutboxCapacity, ps.quitC, ps.forward)
for i := 0; i < hasherCount; i++ {
- hashfunc := storage.MakeHashFunc(storage.DefaultHash)()
+ hashfunc := sha3.NewLegacyKeccak256()
ps.hashPool.Put(hashfunc)
}
@@ -486,7 +485,7 @@ func (p *Pss) handle(ctx context.Context, msg interface{}) error {
if pssmsg.isRaw() {
if capabilities, ok := p.getTopicHandlerCaps(psstopic); ok {
if !capabilities.raw {
- log.Debug("No handler for raw message", "topic", psstopic)
+ log.Warn("No handler for raw message", "topic", label(psstopic[:]))
return nil
}
}
@@ -569,7 +568,7 @@ func (p *Pss) executeHandlers(topic Topic, payload []byte, from PssAddress, raw
defer metrics.GetOrRegisterResettingTimer("pss.execute-handlers", nil).UpdateSince(time.Now())
handlers := p.getHandlers(topic)
- peer := p2p.NewPeer(enode.ID{}, fmt.Sprintf("%x", from), []p2p.Cap{})
+ peer := p2p.NewPeer(enode.ID{}, hex.EncodeToString(from), []p2p.Cap{})
for _, h := range handlers {
if !h.caps.raw && raw {
log.Warn("norawhandler")
@@ -593,7 +592,7 @@ func (p *Pss) isSelfRecipient(msg *PssMsg) bool {
// test match of leftmost bytes in given message to node's Kademlia address
func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
- local := p.Kademlia.BaseAddr()
+ local := p.BaseAddr()
// if a partial address matches we are possible recipient regardless of prox
// if not and prox is not set, we are surely not
@@ -604,7 +603,7 @@ func (p *Pss) isSelfPossibleRecipient(msg *PssMsg, prox bool) bool {
return false
}
- depth := p.Kademlia.NeighbourhoodDepth()
+ depth := p.NeighbourhoodDepth()
po, _ := network.Pof(p.Kademlia.BaseAddr(), msg.To, 0)
log.Trace("selfpossible", "po", po, "depth", depth)
@@ -734,7 +733,7 @@ func sendMsg(p *Pss, sp *network.Peer, msg *PssMsg) bool {
}
}
if !isPssEnabled {
- log.Error("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps)
+ log.Warn("peer doesn't have matching pss capabilities, skipping", "peer", info.Name, "caps", info.Caps, "peer", label(sp.BzzAddr.Address()))
return false
}
@@ -769,7 +768,7 @@ func (p *Pss) forward(msg *PssMsg) error {
sent := 0 // number of successful sends
to := make([]byte, addressLength)
copy(to[:len(msg.To)], msg.To)
- neighbourhoodDepth := p.Kademlia.NeighbourhoodDepth()
+ neighbourhoodDepth := p.NeighbourhoodDepth()
// luminosity is the opposite of darkness. the more bytes are removed from the address, the higher is darkness,
// but the luminosity is less. here luminosity equals the number of bits given in the destination address.
@@ -794,7 +793,7 @@ func (p *Pss) forward(msg *PssMsg) error {
onlySendOnce = true
}
- p.Kademlia.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool {
+ p.EachConn(to, addressLength*8, func(sp *network.Peer, po int) bool {
if po < broadcastThreshold && sent > 0 {
return false // stop iterating
}
@@ -838,7 +837,14 @@ func (p *Pss) cleanFwdCache() {
}
func label(b []byte) string {
- return fmt.Sprintf("%04x", b[:2])
+ if len(b) == 0 {
+ return "-"
+ }
+ l := 2
+ if len(b) == 1 {
+ l = 1
+ }
+ return fmt.Sprintf("%04x", b[:l])
}
// add a message to the cache
@@ -848,10 +854,11 @@ func (p *Pss) addFwdCache(msg *PssMsg) error {
var entry cacheEntry
var ok bool
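+ // compute the digest before taking the lock so the hashing happens outside the critical section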
+ digest := p.msgDigest(msg)
+
p.fwdCacheMu.Lock()
defer p.fwdCacheMu.Unlock()
- digest := p.msgDigest(msg)
if entry, ok = p.fwdCache[digest]; !ok {
entry = cacheEntry{}
}
@@ -862,14 +869,15 @@ func (p *Pss) addFwdCache(msg *PssMsg) error {
// check if message is in the cache
func (p *Pss) checkFwdCache(msg *PssMsg) bool {
+ digest := p.msgDigest(msg)
+
p.fwdCacheMu.Lock()
defer p.fwdCacheMu.Unlock()
- digest := p.msgDigest(msg)
entry, ok := p.fwdCache[digest]
if ok {
if entry.expiresAt.After(time.Now()) {
- log.Trace("unexpired cache", "digest", fmt.Sprintf("%x", digest))
+ log.Trace("unexpired cache", "self", label(p.BaseAddr()), "digest", label(digest[:]), "to", label(msg.To))
metrics.GetOrRegisterCounter("pss.checkfwdcache.unexpired", nil).Inc(1)
return true
}
diff --git a/pss/pubsub.go b/pss/pubsub.go
new file mode 100644
index 0000000000..decb4819f5
--- /dev/null
+++ b/pss/pubsub.go
@@ -0,0 +1,69 @@
+// Copyright 2019 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package pss
+
+import (
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethersphere/swarm/network"
+)
+
+// PubSub implements the pushsync.PubSub interface using pss
+type PubSub struct {
+ pss *Pss
+}
+
+// NewPubSub creates a new PubSub
+func NewPubSub(p *Pss) *PubSub {
+ return &PubSub{
+ pss: p,
+ }
+}
+
+// BaseAddr returns Kademlia base address
+func (p *PubSub) BaseAddr() []byte {
+ return p.pss.BaseAddr()
+}
+
+func isPssPeer(bp *network.BzzPeer) bool {
+ return bp.HasCap(protocolName)
+}
+
+// IsClosestTo returns true if self is the closest known node to addr
+// as uniquely defined by the MSB XOR distance
+// among pss capable peers
+func (p *PubSub) IsClosestTo(addr []byte) bool {
+ return p.pss.IsClosestTo(addr, isPssPeer)
+}
+
+// Register registers a handler
+func (p *PubSub) Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func() {
+ f := func(msg []byte, peer *p2p.Peer, _ bool, _ string) error {
+ return handler(msg, peer)
+ }
+ h := NewHandler(f).WithRaw()
+ if prox {
+ h = h.WithProxBin()
+ }
+ pt := BytesToTopic([]byte(topic))
+ return p.pss.Register(&pt, h)
+}
+
+// Send sends a message using pss SendRaw
+func (p *PubSub) Send(to []byte, topic string, msg []byte) error {
+ pt := BytesToTopic([]byte(topic))
+ return p.pss.SendRaw(PssAddress(to), pt, msg)
+}
diff --git a/pushsync/protocol.go b/pushsync/protocol.go
new file mode 100644
index 0000000000..1a1b700948
--- /dev/null
+++ b/pushsync/protocol.go
@@ -0,0 +1,90 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package pushsync
+
+import (
+ "crypto/rand"
+ "io"
+
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+ pssChunkTopic = "PUSHSYNC_CHUNKS" // pss topic for chunks
+ pssReceiptTopic = "PUSHSYNC_RECEIPTS" // pss topic for statement of custody receipts
+)
+
+// PubSub is a Postal Service interface needed to send/receive chunks and receipts for push syncing
+type PubSub interface {
+ Register(topic string, prox bool, handler func(msg []byte, p *p2p.Peer) error) func()
+ Send(to []byte, topic string, msg []byte) error
+ BaseAddr() []byte
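+ // IsClosestTo returns true if self is the closest node to the given address among relevant peers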
+ IsClosestTo([]byte) bool
+}
+
+// chunkMsg is the message construct to send chunks to their local neighbourhood
+type chunkMsg struct {
+ Addr []byte // chunk address
+ Data []byte // chunk data
+ Origin []byte // originator - need this for sending receipt back to origin
+ Nonce []byte // nonce to make multiple instances of send immune to deduplication cache
+}
+
+// receiptMsg is a statement of custody response to receiving a push-synced chunk
+// it is currently a notification only (contains no proof) sent to the originator
+// Nonce is there to make multiple responses immune to deduplication cache
+type receiptMsg struct {
+ Addr []byte // chunk address
+ Nonce []byte // nonce to make multiple instances of send immune to deduplication cache
+}
+
+func decodeChunkMsg(msg []byte) (*chunkMsg, error) {
+ var chmsg chunkMsg
+ err := rlp.DecodeBytes(msg, &chmsg)
+ if err != nil {
+ return nil, err
+ }
+ return &chmsg, nil
+}
+
+func decodeReceiptMsg(msg []byte) (*receiptMsg, error) {
+ var rmsg receiptMsg
+ err := rlp.DecodeBytes(msg, &rmsg)
+ if err != nil {
+ return nil, err
+ }
+ return &rmsg, nil
+}
+
+// newNonce creates a random nonce;
+// even without proof of custody it is important, otherwise a resent chunk would be deduplicated by pss
+func newNonce() []byte {
+ buf := make([]byte, 32)
+ io.ReadFull(rand.Reader, buf)
+ return buf
+}
+
+func label(b []byte) string {
+ l := len(b)
+ if l > 8 {
+ l = 8
+ }
+ return hexutil.Encode(b[:l])
+}
diff --git a/pushsync/protocol_test.go b/pushsync/protocol_test.go
new file mode 100644
index 0000000000..b51c23ffb6
--- /dev/null
+++ b/pushsync/protocol_test.go
@@ -0,0 +1,114 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package pushsync
+
+import (
+ "context"
+ "encoding/binary"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/log"
+)
+
+// TestProtocol tests the push sync protocol
+// the push syncer node communicates with storer nodes via a mock loopback PubSub
+func TestProtocol(t *testing.T) {
+ timeout := 10 * time.Second
+ chunkCnt := 1024
+ tagCnt := 4
+ storerCnt := 4
+
+ sent := &sync.Map{}
+ store := &sync.Map{}
+ // mock pubsub messenger
+ lb := newLoopBack()
+
+ // set up a number of storers
+ storers := make([]*Storer, storerCnt)
+ for i := 0; i < storerCnt; i++ {
+ // every chunk is closest to exactly one storer
+ j := i
+ isClosestTo := func(addr []byte) bool {
+ n := int(binary.BigEndian.Uint64(addr[:8]))
+ log.Debug("closest node?", "n", n, "n%storerCnt", n%storerCnt, "storer", j)
+ return n%storerCnt == j
+ }
+ storers[j] = NewStorer(&testStore{store}, &testPubSub{lb, isClosestTo})
+ }
+
+ tags, tagIDs := setupTags(chunkCnt, tagCnt)
+ // construct the mock push sync index iterator
+ tp := newTestPushSyncIndex(chunkCnt, tagIDs, tags, sent)
+ // the pusher's isClosestTo is mocked to always return false, so chunks are never short-circuited to self
+ isClosestTo := func([]byte) bool { return false }
+ // start push syncing in a go routine
+ p := NewPusher(tp, &testPubSub{lb, isClosestTo}, tags)
+ defer p.Close()
+
+ synced := make(map[int]int)
+ for {
+ select {
+ // receive indexes on synced channel when a chunk is set as synced
+ case idx := <-tp.synced:
+ n := synced[idx]
+ synced[idx] = n + 1
+ case <-time.After(timeout):
+ t.Fatalf("timeout waiting for all chunks to be synced")
+ }
+ // all chunks set as synced
+ if len(synced) == chunkCnt {
+ expTotal := int64(chunkCnt / tagCnt)
+ checkTags(t, expTotal, tagIDs[:tagCnt-1], tags)
+ for i := uint64(0); i < uint64(chunkCnt); i++ {
+ if n := synced[int(i)]; n != 1 {
+ t.Fatalf("expected to receive exactly 1 receipt for chunk %v, got %v", i, n)
+ }
+ v, ok := store.Load(i)
+ if !ok {
+ t.Fatalf("chunk %v not stored", i)
+ }
+ if cnt := *(v.(*uint32)); cnt < uint32(storerCnt) {
+ t.Fatalf("chunk %v expected to be saved at least %v times, got %v", i, storerCnt, cnt)
+ }
+ }
+ return
+ }
+ }
+}
+
+type testStore struct {
+ store *sync.Map
+}
+
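+// Put counts how many times each chunk is stored, keyed by the index encoded in the chunk address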
+func (t *testStore) Put(_ context.Context, _ chunk.ModePut, chs ...chunk.Chunk) ([]bool, error) {
+ exists := make([]bool, len(chs))
+ for i, ch := range chs {
+ idx := binary.BigEndian.Uint64(ch.Address()[:8])
+ var storedCnt uint32 = 1
+ if v, loaded := t.store.LoadOrStore(idx, &storedCnt); loaded {
+ atomic.AddUint32(v.(*uint32), 1)
+ exists[i] = loaded
+ }
+ log.Debug("testStore put", "idx", idx)
+ }
+ return exists, nil
+}
diff --git a/pushsync/pusher.go b/pushsync/pusher.go
new file mode 100644
index 0000000000..830590e0e3
--- /dev/null
+++ b/pushsync/pusher.go
@@ -0,0 +1,354 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package pushsync
+
+import (
+ "context"
+ "encoding/hex"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/spancontext"
+ "github.com/ethersphere/swarm/storage"
+ "github.com/opentracing/opentracing-go"
+ olog "github.com/opentracing/opentracing-go/log"
+)
+
+// DB interface implemented by localstore
+type DB interface {
+ // subscribe to chunk to be push synced - iterates from earliest to newest
+ SubscribePush(context.Context) (<-chan storage.Chunk, func())
+ // called to set a chunk as synced - and allow it to be garbage collected
+ // TODO this should take ... last argument to delete many in one batch
+ Set(context.Context, chunk.ModeSet, ...storage.Address) error
+}
+
+// Pusher takes care of the push syncing
+type Pusher struct {
+ store DB // localstore DB
+ tags *chunk.Tags // tags to update counts
+ quit chan struct{} // channel to signal quitting on all loops
+ closed chan struct{} // channel to signal sync loop terminated
+ pushed map[string]*pushedItem // cache of items push-synced
+ receipts chan []byte // channel to receive receipts
+ ps PubSub // PubSub interface to send chunks and receive receipts
+ logger log.Logger // custom logger
+ retryInterval time.Duration // dynamically adjusted time interval between retries
+}
+
+const maxMeasurements = 20000
+
+// pushedItem captures the info the pusher needs about a chunk during the
+// push sync / receipt roundtrip
+type pushedItem struct {
+ tag *chunk.Tag // tag for the chunk
+ shortcut bool // if the chunk receipt was sent by self
+ firstSentAt time.Time // first sent at time
+ lastSentAt time.Time // most recently sent at time
+ synced bool // set when chunk got synced
+ span opentracing.Span // roundtrip span
+}
+
+// NewPusher constructs a Pusher and starts up the push sync protocol
+// takes
+// - a DB interface to subscribe to push sync index to allow iterating over recently stored chunks
+// - a pubsub interface to send chunks and receive statements of custody
+// - tags that hold the tags
+func NewPusher(store DB, ps PubSub, tags *chunk.Tags) *Pusher {
+ p := &Pusher{
+ store: store,
+ tags: tags,
+ quit: make(chan struct{}),
+ closed: make(chan struct{}),
+ pushed: make(map[string]*pushedItem),
+ receipts: make(chan []byte),
+ ps: ps,
+ logger: log.New("self", label(ps.BaseAddr())),
+ retryInterval: 100 * time.Millisecond,
+ }
+ go p.sync()
+ return p
+}
+
+// Close closes the pusher
+func (p *Pusher) Close() {
+ close(p.quit)
+ select {
+ case <-p.closed:
+ case <-time.After(3 * time.Second):
+ log.Error("timeout closing pusher")
+ }
+}
+
+// sync starts a forever loop that pushes chunks to their neighbourhood
+// and receives receipts (statements of custody) for them.
+// chunks that are not acknowledged with a receipt are retried
+// not earlier than retryInterval after they were last pushed
+// the routine also updates counts of states on a tag in order
+// to monitor the proportion of saved, sent and synced chunks of
+// a file or collection
+func (p *Pusher) sync() {
+ var chunks <-chan chunk.Chunk
+ var unsubscribe func()
+ var syncedAddrs []storage.Address
+ defer close(p.closed)
+ // timer, initially set to 0 to fall through select case on timer.C for initialisation
+ timer := time.NewTimer(0)
+ defer timer.Stop()
+
+ // register handler for pssReceiptTopic on pss pubsub
+ deregister := p.ps.Register(pssReceiptTopic, false, func(msg []byte, _ *p2p.Peer) error {
+ return p.handleReceiptMsg(msg)
+ })
+ defer deregister()
+
+ chunksInBatch := -1
+ var batchStartTime time.Time
+ ctx := context.Background()
+
+ var average uint64 = 100000 // microseconds
+ var measurements uint64
+
+ for {
+ select {
+ // handle incoming chunks
+ case ch, more := <-chunks:
+ // if no more, set to nil, reset timer to 0 to finalise batch immediately
+ if !more {
+ chunks = nil
+ var dur time.Duration
+ if chunksInBatch == 0 {
+ dur = 500 * time.Millisecond
+ }
+ timer.Reset(dur)
+ break
+ }
+
+ chunksInBatch++
+ metrics.GetOrRegisterCounter("pusher.send-chunk", nil).Inc(1)
+ // if no need to sync this chunk then continue
+ if !p.needToSync(ch) {
+ break
+ }
+
+ metrics.GetOrRegisterCounter("pusher.send-chunk.send-to-sync", nil).Inc(1)
+ // send the chunk and ignore the error
+ go func(ch chunk.Chunk) {
+ if err := p.sendChunkMsg(ch); err != nil {
+ p.logger.Error("error sending chunk", "addr", ch.Address().Hex(), "err", err)
+ }
+ }(ch)
+
+ // handle incoming receipts
+ case addr := <-p.receipts:
+ hexaddr := hex.EncodeToString(addr)
+ p.logger.Trace("got receipt", "addr", hexaddr)
+ metrics.GetOrRegisterCounter("pusher.receipts.all", nil).Inc(1)
+ // ignore if already received receipt
+ item, found := p.pushed[hexaddr]
+ if !found {
+ metrics.GetOrRegisterCounter("pusher.receipts.not-found", nil).Inc(1)
+ p.logger.Trace("not wanted or already got... ignore", "addr", hexaddr)
+ break
+ }
+ if item.synced { // already got receipt in this same batch
+ metrics.GetOrRegisterCounter("pusher.receipts.already-synced", nil).Inc(1)
+ p.logger.Trace("just synced... ignore", "addr", hexaddr)
+ break
+ }
+ // increment synced count for the tag if exists
+ tag := item.tag
+ if tag != nil {
+ tag.Inc(chunk.StateSynced)
+ if tag.Done(chunk.StateSynced) {
+ p.logger.Debug("closing root span for tag", "taguid", tag.Uid, "tagname", tag.Name)
+ tag.FinishRootSpan()
+ }
+ // finish span for pushsync roundtrip, only have this span if we have a tag
+ item.span.Finish()
+ }
+
+ totalDuration := time.Since(item.firstSentAt)
+ metrics.GetOrRegisterResettingTimer("pusher.chunk.roundtrip", nil).Update(totalDuration)
+ metrics.GetOrRegisterCounter("pusher.receipts.synced", nil).Inc(1)
+
+ // calibrate retryInterval based on roundtrip times
+ measurements, average = p.updateRetryInterval(item, measurements, average)
+
+ // collect synced addresses and corresponding items to do subsequent batch operations
+ syncedAddrs = append(syncedAddrs, addr)
+ item.synced = true
+
+ // the retry interval timer triggers the start of a new batch
+ case <-timer.C:
+ // initially timer is set to go off as well as every time we hit the end of push index
+ // so no wait for retryInterval needed to set items synced
+ metrics.GetOrRegisterCounter("pusher.subscribe-push", nil).Inc(1)
+ // if subscribe was running, stop it
+ if unsubscribe != nil {
+ unsubscribe()
+ }
+
+ // delete from pushed items
+ for i := 0; i < len(syncedAddrs); i++ {
+ delete(p.pushed, syncedAddrs[i].Hex())
+ }
+ // set chunk status to synced, insert to db GC index
+ if err := p.store.Set(ctx, chunk.ModeSetSync, syncedAddrs...); err != nil {
+ log.Error("pushsync: error setting chunks to synced", "err", err)
+ }
+
+ // reset synced list
+ syncedAddrs = nil
+
+ // we don't want to record the first iteration
+ if chunksInBatch != -1 {
+ // this measurement is not a timer, but we want a histogram, so it fits the data structure
+ metrics.GetOrRegisterResettingTimer("pusher.subscribe-push.chunks-in-batch.hist", nil).Update(time.Duration(chunksInBatch))
+ metrics.GetOrRegisterResettingTimer("pusher.subscribe-push.chunks-in-batch.time", nil).UpdateSince(batchStartTime)
+ metrics.GetOrRegisterCounter("pusher.subscribe-push.chunks-in-batch", nil).Inc(int64(chunksInBatch))
+ }
+ chunksInBatch = 0
+ batchStartTime = time.Now()
+
+ // and start iterating on Push index from the beginning
+ chunks, unsubscribe = p.store.SubscribePush(ctx)
+ // reset timer to go off after retryInterval
+ timer.Reset(p.retryInterval)
+
+ case <-p.quit:
+ if unsubscribe != nil {
+ unsubscribe()
+ }
+ return
+ }
+ }
+}
+
+// handleReceiptMsg is a handler for pssReceiptTopic that
+// - deserialises receiptMsg and
+// - sends the receipted address on a channel
+func (p *Pusher) handleReceiptMsg(msg []byte) error {
+ receipt, err := decodeReceiptMsg(msg)
+ if err != nil {
+ return err
+ }
+ p.logger.Trace("Handler", "receipt", label(receipt.Addr))
+ go p.pushReceipt(receipt.Addr)
+ return nil
+}
+
+// pushReceipt just inserts the address into the channel
+func (p *Pusher) pushReceipt(addr []byte) {
+ select {
+ case p.receipts <- addr:
+ case <-p.quit:
+ }
+}
+
+// sendChunkMsg sends chunks to their destination
+// using the PubSub interface Send method (e.g., pss neighbourhood addressing)
+func (p *Pusher) sendChunkMsg(ch chunk.Chunk) error {
+ rlpTimer := time.Now()
+
+ cmsg := &chunkMsg{
+ Origin: p.ps.BaseAddr(),
+ Addr: ch.Address(),
+ Data: ch.Data(),
+ Nonce: newNonce(),
+ }
+ msg, err := rlp.EncodeToBytes(cmsg)
+ if err != nil {
+ return err
+ }
+ p.logger.Trace("send chunk", "addr", label(ch.Address()))
+
+ metrics.GetOrRegisterResettingTimer("pusher.send.chunk.rlp", nil).UpdateSince(rlpTimer)
+
+ defer metrics.GetOrRegisterResettingTimer("pusher.send.chunk.pss", nil).UpdateSince(time.Now())
+ return p.ps.Send(ch.Address()[:], pssChunkTopic, msg)
+}
+
+// needToSync checks if a chunk needs to be push-synced:
+// * if not sent yet OR
+// * if sent but more than retryInterval ago, so need resend OR
+// * if self is closest node to chunk TODO: and not light node
+// in this case send receipt to self to trigger synced state on chunk
+func (p *Pusher) needToSync(ch chunk.Chunk) bool {
+ item, found := p.pushed[ch.Address().Hex()]
+ now := time.Now()
+ // has been pushed already
+ if found {
+ // has synced already since subscribe called
+ if item.synced {
+ return false
+ }
+ item.lastSentAt = now
+ } else {
+ // first time encountered
+ addr := ch.Address()
+ hexaddr := addr.Hex()
+ // remember item
+ tag, _ := p.tags.Get(ch.TagID())
+ item = &pushedItem{
+ tag: tag,
+ firstSentAt: now,
+ lastSentAt: now,
+ }
+
+ // increment SENT count on tag if it exists
+ if tag != nil {
+ tag.Inc(chunk.StateSent)
+ // opentracing for chunk roundtrip
+ _, span := spancontext.StartSpan(tag.Context(), "chunk.sent")
+ span.LogFields(olog.String("ref", hexaddr))
+ span.SetTag("addr", hexaddr)
+ item.span = span
+ }
+
+ // remember the item
+ p.pushed[hexaddr] = item
+ if p.ps.IsClosestTo(addr) {
+ p.logger.Trace("self is closest to ref: push receipt locally", "ref", hexaddr)
+ item.shortcut = true
+ go p.pushReceipt(addr)
+ return false
+ }
+ p.logger.Trace("self is not the closest to ref: send chunk to neighbourhood", "ref", hexaddr)
+ }
+ return true
+}
+
+// updateRetryInterval calibrates the period after which the push index iterator restarts from the beginning
+func (p *Pusher) updateRetryInterval(item *pushedItem, measurements uint64, average uint64) (uint64, uint64) {
+ if !item.shortcut { // only real network roundtrips counted, no shortcuts
+ roundtripDuration := time.Since(item.lastSentAt)
+ measurement := uint64(roundtripDuration) / 1000 // in microseconds
+ // recalculate average
+ average = (measurements*average + measurement) / (measurements + 1)
+ if measurements < maxMeasurements {
+ measurements++
+ }
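+ // retry no sooner than twice the measured average roundtrip time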
+ p.retryInterval = time.Duration(average*2) * time.Microsecond
+ }
+ return measurements, average
+}
diff --git a/pushsync/pusher_test.go b/pushsync/pusher_test.go
new file mode 100644
index 0000000000..f032b4d6f3
--- /dev/null
+++ b/pushsync/pusher_test.go
@@ -0,0 +1,310 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package pushsync
+
+import (
+ "context"
+ "encoding/binary"
+ "encoding/hex"
+ "fmt"
+ "math/rand"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethersphere/swarm/chunk"
+ chunktesting "github.com/ethersphere/swarm/chunk/testing"
+ "github.com/ethersphere/swarm/storage"
+ "github.com/ethersphere/swarm/testutil"
+)
+
+func init() {
+ testutil.Init()
+}
+
+// TestPusher tests the correct behaviour of Pusher
+// in the context of inserting n chunks
+// receipt response model: the pushed chunk's receipt is sent back
+// after a random delay
+// The test checks:
+// - that the sync function is called on chunks in order of insertion (FIFO)
+// - that already synced chunks are not resynced
+// - that the db is emptied shortly after no more data is inserted
+func TestPusher(t *testing.T) {
+ timeout := 10 * time.Second
+ chunkCnt := 1024
+ tagCnt := 4
+
+ errc := make(chan error)
+ sent := &sync.Map{}
+ synced := make(map[int]int)
+ quit := make(chan struct{})
+ defer close(quit)
+
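+ // errf reports an error to the main test loop unless the test is already quitting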
+ errf := func(s string, vals ...interface{}) {
+ select {
+ case errc <- fmt.Errorf(s, vals...):
+ case <-quit:
+ }
+ }
+
+ lb := newLoopBack()
+
+ respond := func(msg []byte, _ *p2p.Peer) error {
+ chmsg, err := decodeChunkMsg(msg)
+ if err != nil {
+ errf("error decoding chunk message: %v", err)
+ return nil
+ }
+ // check outgoing chunk messages
+ idx := int(binary.BigEndian.Uint64(chmsg.Addr[:8]))
+ // respond ~ mock storer protocol
+ receipt := &receiptMsg{Addr: chmsg.Addr}
+ rmsg, err := rlp.EncodeToBytes(receipt)
+ if err != nil {
+ errf("error encoding receipt message: %v", err)
+ }
+ log.Debug("store chunk, send receipt", "idx", idx)
+ err = lb.Send(chmsg.Origin, pssReceiptTopic, rmsg)
+ if err != nil {
+ errf("error sending receipt message: %v", err)
+ }
+ return nil
+ }
+ // register the respond function
+ lb.Register(pssChunkTopic, false, respond)
+ tags, tagIDs := setupTags(chunkCnt, tagCnt)
+ // construct the mock push sync index iterator
+ tp := newTestPushSyncIndex(chunkCnt, tagIDs, tags, sent)
+ // start push syncing in a go routine
+ p := NewPusher(tp, &testPubSub{lb, func([]byte) bool { return false }}, tags)
+ defer p.Close()
+ // collect synced chunks until all chunks synced
+ // wait on errc for errors on any thread
+ // otherwise time out
+ for {
+ select {
+ case i := <-tp.synced:
+ n := synced[i]
+ synced[i] = n + 1
+ if len(synced) == chunkCnt {
+ expTotal := int64(chunkCnt / tagCnt)
+ checkTags(t, expTotal, tagIDs[:tagCnt-1], tags)
+ return
+ }
+ case err := <-errc:
+ if err != nil {
+ t.Fatal(err)
+ }
+ case <-time.After(timeout):
+ t.Fatalf("timeout waiting for all chunks to be synced")
+ }
+ }
+
+}
+
+type testPubSub struct {
+ *loopBack
+ isClosestTo func([]byte) bool
+}
+
+var testBaseAddr = make([]byte, 32)
+
+// BaseAddr needed to implement PubSub interface
+// in the testPubSub, this address is not relevant and is given only for logging
+func (tps *testPubSub) BaseAddr() []byte {
+ return testBaseAddr
+}
+
+// IsClosestTo needed to implement PubSub interface
+func (tps *testPubSub) IsClosestTo(addr []byte) bool {
+ return tps.isClosestTo(addr)
+}
+
+// loopBack implements PubSub as a central subscription engine,
+// i.e. a msg sent is received by all handlers registered for the topic
+type loopBack struct {
+ handlers map[string][]func(msg []byte, p *p2p.Peer) error
+}
+
+func newLoopBack() *loopBack {
+ return &loopBack{
+ handlers: make(map[string][]func(msg []byte, p *p2p.Peer) error),
+ }
+}
+
+// Register subscribes to a topic with a handler
+func (lb *loopBack) Register(topic string, _ bool, handler func(msg []byte, p *p2p.Peer) error) func() {
+ lb.handlers[topic] = append(lb.handlers[topic], handler)
+ return func() {}
+}
+
+// Send publishes a msg with a topic and directly calls registered handlers with
+// that topic
+func (lb *loopBack) Send(to []byte, topic string, msg []byte) error {
+ if !delayResponse() {
+ return nil
+ }
+ return lb.send(to, topic, msg)
+}
+
+func (lb *loopBack) send(to []byte, topic string, msg []byte) error {
+ p := p2p.NewPeer(enode.ID{}, "", nil)
+ for _, handler := range lb.handlers[topic] {
+ log.Debug("handling message", "topic", topic, "to", hex.EncodeToString(to))
+ if err := handler(msg, p); err != nil {
+ log.Error("error handling message", "topic", topic, "to", hex.EncodeToString(to))
+ return err
+ }
+ }
+ return nil
+}
+
+// testPushSyncIndex mocks localstore and provides subscription and setting synced status
+// it implements the DB interface
+type testPushSyncIndex struct {
+ i, total int
+ tagIDs []uint32 // tag ids, assigned to fake chunks round-robin
+ tags *chunk.Tags
+ sent *sync.Map // to store time of send for retry
+ synced chan int // to check that the right chunks are set as synced
+}
+
+func newTestPushSyncIndex(chunkCnt int, tagIDs []uint32, tags *chunk.Tags, sent *sync.Map) *testPushSyncIndex {
+ return &testPushSyncIndex{
+ i: 0,
+ total: chunkCnt,
+ tagIDs: tagIDs,
+ tags: tags,
+ sent: sent,
+ synced: make(chan int),
+ }
+}
+
+// SubscribePush allows iteration on the hashes and mocks the behaviour of localstore
+// push index
+// we keep track of an index so that each call to SubscribePush knows where to start
+// generating the new fake hashes
+// Before the new fake hashes it iterates over hashes not synced yet
+func (tp *testPushSyncIndex) SubscribePush(context.Context) (<-chan storage.Chunk, func()) {
+ chunks := make(chan storage.Chunk)
+ tagCnt := len(tp.tagIDs)
+ quit := make(chan struct{})
+ stop := func() { close(quit) }
+ go func() {
+ // feed fake chunks into the db, hashes encode the order so that
+ // it can be traced
+ feed := func(i int) bool {
+ // generate fake hashes that encode the chunk order
+ addr := make([]byte, 32)
+ binary.BigEndian.PutUint64(addr, uint64(i))
+ tagID := tp.tagIDs[i%tagCnt]
+ // LoadOrStore tells us if this is the first time the chunk is fed,
+ // so the stored count on its tag is incremented only once
+ _, loaded := tp.sent.LoadOrStore(i, time.Now())
+ if !loaded {
+ // increment stored count on tag
+ if tag, _ := tp.tags.Get(tagID); tag != nil {
+ tag.Inc(chunk.StateStored)
+ }
+ }
+ tp.sent.Store(i, time.Now())
+ select {
+ // chunks have no data and belong to tag i%tagCnt
+ case chunks <- storage.NewChunk(addr, nil).WithTagID(tagID):
+ return true
+ case <-quit:
+ return false
+ }
+ }
+ // push the chunks already pushed but not yet synced
+ tp.sent.Range(func(k, _ interface{}) bool {
+ log.Debug("resending", "idx", k)
+ return feed(k.(int))
+ })
+ // generate the new chunks from tp.i
+ for tp.i < tp.total && feed(tp.i) {
+ tp.i++
+ }
+ log.Debug("sent chunks", "sent", tp.i, "total", tp.total)
+ close(chunks)
+ }()
+ return chunks, stop
+}
+
+func (tp *testPushSyncIndex) Set(ctx context.Context, _ chunk.ModeSet, addrs ...storage.Address) error {
+ for _, addr := range addrs {
+ idx := int(binary.BigEndian.Uint64(addr[:8]))
+ tp.sent.Delete(idx)
+ tp.synced <- idx
+ log.Debug("set chunk synced", "idx", idx, "addr", addr)
+ }
+ return nil
+}
+
+var (
+ maxDelay = 210 // max delay in milliseconds
+ minDelay = 1 // min delay in milliseconds
+ retentionLimit = 200 // delays at or above this count as lost (~5% of msgs)
+)
+
+// delayResponse simulates connection latency/throughput: it sleeps for a
+// random delay and reports whether the message should be delivered
+func delayResponse() bool {
+ delay := rand.Intn(maxDelay) + minDelay
+ time.Sleep(time.Duration(delay) * time.Millisecond)
+ return delay < retentionLimit
+}
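+
+// With the values above, rand.Intn(maxDelay)+minDelay is uniform on [1, 210],
+// so a message is dropped whenever delay >= 200, i.e. with probability
+// 11/210 ≈ 5%, which is the "~5% of msg lost" figure quoted above.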
+
+// setupTags constructs a tags object and creates tagCnt-1 tags on it;
+// sequential fake chunk i will be tagged with tagIDs[i%tagCnt]
+func setupTags(chunkCnt, tagCnt int) (tags *chunk.Tags, tagIDs []uint32) {
+ // construct tags object
+ tags = chunk.NewTags()
+ // all but one tag is created
+ for i := 0; i < tagCnt-1; i++ {
+ tags.Create(context.Background(), "", int64(chunkCnt/tagCnt))
+ }
+ // extract tag ids
+ tags.Range(func(k, _ interface{}) bool {
+ tagIDs = append(tagIDs, k.(uint32))
+ return true
+ })
+ // add an extra for which no tag exists
+ return tags, append(tagIDs, 0)
+}
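+
+// For example, with chunkCnt=8 and tagCnt=4, three tags are created, each
+// expecting 8/4 = 2 chunks, and tag id 0 is appended, so fake chunk i carries
+// tagIDs[i%4] and every fourth chunk references a tag that does not exist.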
+
+func checkTags(t *testing.T, expTotal int64, tagIDs []uint32, tags *chunk.Tags) {
+ t.Helper()
+ for _, tagID := range tagIDs {
+ tag, err := tags.Get(tagID)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // wait until the tag reports all chunks synced; the tag counts are
+ // adjusted as the store.Set calls come in
+ err = tag.WaitTillDone(context.Background(), chunk.StateSynced)
+ if err != nil {
+ t.Fatalf("error waiting for syncing on tag %v: %v", tag.Uid, err)
+ }
+
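+ // CheckTag's counters, as used throughout these tests, are (split,
+ // stored, seen, sent, synced, total): nothing is split or seen here,
+ // while every chunk ends up stored, sent and synced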
+ chunktesting.CheckTag(t, tag, 0, expTotal, 0, expTotal, expTotal, expTotal)
+ }
+}
diff --git a/pushsync/simulation_test.go b/pushsync/simulation_test.go
new file mode 100644
index 0000000000..bf50748cd4
--- /dev/null
+++ b/pushsync/simulation_test.go
@@ -0,0 +1,280 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package pushsync
+
+import (
+ "context"
+ "encoding/hex"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/log"
+ "github.com/ethersphere/swarm/network"
+ "github.com/ethersphere/swarm/network/retrieval"
+ "github.com/ethersphere/swarm/network/simulation"
+ "github.com/ethersphere/swarm/pss"
+ "github.com/ethersphere/swarm/storage"
+ "github.com/ethersphere/swarm/storage/localstore"
+ "golang.org/x/sync/errgroup"
+)
+
+var (
+ bucketKeyPushSyncer = simulation.BucketKey("pushsyncer")
+ bucketKeyNetStore = simulation.BucketKey("netstore")
+)
+
+var (
+ nodeCntFlag = flag.Int("nodes", 4, "number of nodes in simulation")
+ chunkCntFlag = flag.Int("chunks", 4, "number of chunks per upload in simulation")
+ testCasesFlag = flag.Int("cases", 4, "number of concurrent upload-download cases to test in simulation")
+)
+
+// test syncer using pss
+// the test
+// * creates a simulation with connectivity loaded from a snapshot
+// * for each test case, chooses two random nodes, an uploader and a downloader
+// * the uploader uploads a number of chunks
+// * the test waits until the uploaded chunks are synced
+// * the downloader downloads the chunks
+// Testcases are run concurrently
+func TestPushsyncSimulation(t *testing.T) {
+ nodeCnt := *nodeCntFlag
+ chunkCnt := *chunkCntFlag
+ testcases := *testCasesFlag
+
+ err := testSyncerWithPubSub(nodeCnt, chunkCnt, testcases, newServiceFunc)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func testSyncerWithPubSub(nodeCnt, chunkCnt, testcases int, sf simulation.ServiceFunc) error {
+ sim := simulation.NewInProc(map[string]simulation.ServiceFunc{
+ "pushsync": sf,
+ })
+ defer sim.Close()
+
+ ctx := context.Background()
+ snapCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ defer cancel()
+ err := sim.UploadSnapshot(snapCtx, filepath.Join("../network/stream/testdata", fmt.Sprintf("snapshot_%d.json", nodeCnt)))
+ if err != nil {
+ return fmt.Errorf("error while loading snapshot: %v", err)
+ }
+
+ start := time.Now()
+ log.Info("Snapshot loaded. Simulation starting", "at", start)
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
+ errc := make(chan error)
+ for j := 0; j < testcases; j++ {
+ j := j
+ go func() {
+ err := uploadAndDownload(ctx, sim, nodeCnt, chunkCnt, j)
+ select {
+ case errc <- err:
+ case <-ctx.Done():
+ }
+ }()
+ }
+ i := 0
+ for err := range errc {
+ if err != nil {
+ return err
+ }
+ i++
+ if i >= testcases {
+ return nil
+ }
+ }
+ return nil
+ })
+
+ if result.Error != nil {
+ return fmt.Errorf("simulation error: %v", result.Error)
+ }
+ log.Info("simulation", "duration", time.Since(start))
+ return nil
+}
+
+// pickNodes selects 2 distinct random node indexes out of n
+func pickNodes(n int) (i, j int) {
+ i = rand.Intn(n)
+ j = rand.Intn(n - 1)
+ if j >= i {
+ j++
+ }
+ return
+}
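+
+// Drawing j from [0, n-2] and shifting it past i maps it uniformly onto
+// [0, n-1] excluding i, giving a distinct pair without rejection sampling.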
+
+func uploadAndDownload(ctx context.Context, sim *simulation.Simulation, nodeCnt, chunkCnt, i int) error {
+ // choose 2 random nodes as uploader and downloader
+ u, d := pickNodes(nodeCnt)
+ // setup uploader node
+ uid := sim.UpNodeIDs()[u]
+ p := sim.MustNodeItem(uid, bucketKeyPushSyncer).(*Pusher)
+ // setup downloader node
+ did := sim.UpNodeIDs()[d]
+ // the tag name encodes the uploader and downloader nodes
+ tagname := fmt.Sprintf("tag-%v-%v-%d", label(uid[:]), label(did[:]), i)
+ log.Debug("uploading", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
+ tag, addrs, err := upload(ctx, p.store.(*localstore.DB), p.tags, tagname, chunkCnt)
+ if err != nil {
+ return err
+ }
+ log.Debug("uploaded", "peer", uid, "chunks", chunkCnt, "tagname", tagname)
+
+ // wait till pushsync is done
+ syncTimeout := 30 * time.Second
+ sctx, cancel := context.WithTimeout(ctx, syncTimeout)
+ defer cancel()
+ err = tag.WaitTillDone(sctx, chunk.StateSynced)
+ if err != nil {
+ log.Debug("tag", "tag", tag)
+ return fmt.Errorf("error waiting syncing: %v", err)
+ }
+
+ log.Debug("downloading", "peer", did, "chunks", chunkCnt, "tagname", tagname)
+ netstore := sim.MustNodeItem(did, bucketKeyNetStore).(*storage.NetStore)
+ err = download(ctx, netstore, addrs)
+ log.Debug("downloaded", "peer", did, "chunks", chunkCnt, "tagname", tagname, "err", err)
+ return err
+}
+
+// newServiceFunc constructs a minimal service needed for a simulation test for Push Sync, namely:
+// localstore, netstore, retrieval and pss
+func newServiceFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (node.Service, func(), error) {
+ // setup localstore
+ n := ctx.Config.Node()
+ addr := network.NewAddr(n)
+ dir, err := ioutil.TempDir("", "pushsync-test")
+ if err != nil {
+ return nil, nil, err
+ }
+ lstore, err := localstore.New(dir, addr.Over(), nil)
+ if err != nil {
+ os.RemoveAll(dir)
+ return nil, nil, err
+ }
+ // setup netstore
+ netStore := storage.NewNetStore(lstore, addr.Over(), n.ID())
+
+ // setup pss
+ kadParams := network.NewKadParams()
+ kad := network.NewKademlia(addr.Over(), kadParams)
+ bucket.Store(simulation.BucketKeyKademlia, kad)
+
+ privKey, err := crypto.GenerateKey()
+ if err != nil {
+ os.RemoveAll(dir)
+ return nil, nil, err
+ }
+ pssp := pss.NewParams().WithPrivateKey(privKey)
+ ps, err := pss.New(kad, pssp)
+ if err != nil {
+ os.RemoveAll(dir)
+ return nil, nil, err
+ }
+
+ bucket.Store(bucketKeyNetStore, netStore)
+
+ r := retrieval.New(kad, netStore, kad.BaseAddr())
+
+ pubSub := pss.NewPubSub(ps)
+ // setup pusher
+ p := NewPusher(lstore, pubSub, chunk.NewTags())
+ bucket.Store(bucketKeyPushSyncer, p)
+
+ // setup storer
+ s := NewStorer(netStore, pubSub)
+
+ cleanup := func() {
+ p.Close()
+ s.Close()
+ netStore.Close()
+ os.RemoveAll(dir)
+ }
+
+ return &RetrievalAndPss{r, ps}, cleanup, nil
+}
+
+// RetrievalAndPss bundles the retrieval and pss services so they can be
+// started and stopped together; it implements the node.Service interface
+type RetrievalAndPss struct {
+ retrieval *retrieval.Retrieval
+ pss *pss.Pss
+}
+
+func (s *RetrievalAndPss) APIs() []rpc.API {
+ return nil
+}
+
+func (s *RetrievalAndPss) Protocols() []p2p.Protocol {
+ return append(s.retrieval.Protocols(), s.pss.Protocols()...)
+}
+
+func (s *RetrievalAndPss) Start(srv *p2p.Server) error {
+ err := s.retrieval.Start(srv)
+ if err != nil {
+ return err
+ }
+ return s.pss.Start(srv)
+}
+
+func (s *RetrievalAndPss) Stop() error {
+ err := s.retrieval.Stop()
+ if err != nil {
+ return err
+ }
+ return s.pss.Stop()
+}
+
+// upload creates a tag and puts n random chunks into the store tagged with
+// it, incrementing the tag's stored count for each chunk
+func upload(ctx context.Context, store Store, tags *chunk.Tags, tagname string, n int) (tag *chunk.Tag, addrs []storage.Address, err error) {
+ tag, err = tags.Create(ctx, tagname, int64(n))
+ if err != nil {
+ return nil, nil, err
+ }
+ for i := 0; i < n; i++ {
+ ch := storage.GenerateRandomChunk(int64(chunk.DefaultSize))
+ addrs = append(addrs, ch.Address())
+ _, err := store.Put(ctx, chunk.ModePutUpload, ch.WithTagID(tag.Uid))
+ if err != nil {
+ return nil, nil, err
+ }
+ tag.Inc(chunk.StateStored)
+ }
+ return tag, addrs, nil
+}
+
+// download fetches the given addresses concurrently through the netstore,
+// exercising the retrieval protocol
+func download(ctx context.Context, store *storage.NetStore, addrs []storage.Address) error {
+ var g errgroup.Group
+ for _, addr := range addrs {
+ addr := addr
+ g.Go(func() error {
+ _, err := store.Get(ctx, chunk.ModeGetRequest, storage.NewRequest(addr))
+ log.Debug("Get", "addr", hex.EncodeToString(addr[:]), "err", err)
+ return err
+ })
+ }
+ return g.Wait()
+}
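+
+// The flags defined above let the simulation be scaled from the command line,
+// e.g. (assuming a matching snapshot_16.json exists in network/stream/testdata):
+//
+//	go test ./pushsync -run TestPushsyncSimulation -nodes=16 -chunks=32 -cases=8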
diff --git a/pushsync/storer.go b/pushsync/storer.go
new file mode 100644
index 0000000000..c42ae10701
--- /dev/null
+++ b/pushsync/storer.go
@@ -0,0 +1,129 @@
+// Copyright 2019 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
+package pushsync
+
+import (
+ "context"
+ "encoding/hex"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/spancontext"
+ "github.com/ethersphere/swarm/storage"
+ olog "github.com/opentracing/opentracing-go/log"
+)
+
+// Store is the storage interface to save chunks
+// NetStore implements this interface
+type Store interface {
+ Put(context.Context, chunk.ModePut, ...chunk.Chunk) ([]bool, error)
+}
+
+// Storer is the object used by the push-sync server side protocol
+type Storer struct {
+ store Store // store to put chunks in, and retrieve them from
+ ps PubSub // pubsub interface to receive chunks and send receipts
+ deregister func() // deregister the registered handler when Storer is closed
+ logger log.Logger // custom logger
+}
+
+// NewStorer constructs a Storer
+// Storer runs on storer nodes to handle the reception of push-synced chunks
+// that fall within their area of responsibility.
+// The protocol makes sure that
+// - the chunks are stored and synced to their nearest neighbours and
+// - a statement of custody receipt is sent as a response to the originator
+// NewStorer registers a handler on pssChunkTopic and sets the deregister
+// function that Close calls to remove it
+func NewStorer(store Store, ps PubSub) *Storer {
+ s := &Storer{
+ store: store,
+ ps: ps,
+ logger: log.New("self", label(ps.BaseAddr())),
+ }
+ s.deregister = ps.Register(pssChunkTopic, true, func(msg []byte, _ *p2p.Peer) error {
+ return s.handleChunkMsg(msg)
+ })
+ return s
+}
+
+// Close needs to be called to deregister the handler
+func (s *Storer) Close() {
+ s.deregister()
+}
+
+// handleChunkMsg is called by the pss dispatcher on pssChunkTopic msgs
+// - deserialises the chunkMsg and
+// - calls processChunkMsg on it
+func (s *Storer) handleChunkMsg(msg []byte) error {
+ chmsg, err := decodeChunkMsg(msg)
+ if err != nil {
+ return err
+ }
+
+ ctx, osp := spancontext.StartSpan(context.Background(), "handle.chunk.msg")
+ defer osp.Finish()
+ osp.LogFields(olog.String("ref", hex.EncodeToString(chmsg.Addr)))
+ osp.SetTag("addr", hex.EncodeToString(chmsg.Addr))
+ s.logger.Trace("Storer Handler", "chunk", label(chmsg.Addr), "origin", label(chmsg.Origin))
+ return s.processChunkMsg(ctx, chmsg)
+}
+
+// processChunkMsg processes a chunk received via pss pssChunkTopic
+// these chunk messages are sent to their address as destination
+// using neighbourhood addressing. Therefore nodes only handle
+// chunks that fall within their area of responsibility.
+// Upon reception the chunk is saved and, if this node is the closest to the
+// chunk address, a statement of custody receipt is sent back to the
+// originator.
+func (s *Storer) processChunkMsg(ctx context.Context, chmsg *chunkMsg) error {
+ // TODO: double check if it falls in area of responsibility
+ ch := storage.NewChunk(chmsg.Addr, chmsg.Data)
+ if _, err := s.store.Put(ctx, chunk.ModePutSync, ch); err != nil {
+ return err
+ }
+
+ // if self is closest peer then send back a receipt
+ if s.ps.IsClosestTo(chmsg.Addr) {
+ s.logger.Trace("self is closest to ref", "ref", label(chmsg.Addr))
+ return s.sendReceiptMsg(ctx, chmsg)
+ }
+ return nil
+}
+
+// sendReceiptMsg sends a statement of custody receipt message
+// to the originator of a push-synced chunk message.
+// Including a unique nonce makes the receipt immune to the pss deduplication cache
+func (s *Storer) sendReceiptMsg(ctx context.Context, chmsg *chunkMsg) error {
+ ctx, osp := spancontext.StartSpan(ctx, "send.receipt")
+ defer osp.Finish()
+ osp.LogFields(olog.String("ref", hex.EncodeToString(chmsg.Addr)))
+ osp.SetTag("addr", hex.EncodeToString(chmsg.Addr))
+ osp.LogFields(olog.String("origin", hex.EncodeToString(chmsg.Origin)))
+
+ rmsg := &receiptMsg{
+ Addr: chmsg.Addr,
+ Nonce: newNonce(),
+ }
+ msg, err := rlp.EncodeToBytes(rmsg)
+ if err != nil {
+ return err
+ }
+ to := chmsg.Origin
+ s.logger.Trace("send receipt", "addr", label(rmsg.Addr), "to", label(to))
+ return s.ps.Send(to, pssReceiptTopic, msg)
+}
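+
+// To recap the round trip implemented above: the originator publishes a
+// chunkMsg on pssChunkTopic, addressed by the chunk address; storer nodes in
+// the destination neighbourhood save the chunk, and only the node closest to
+// the address responds with a receiptMsg on pssReceiptTopic sent to
+// chmsg.Origin.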
diff --git a/shed/index.go b/shed/index.go
index 57c53d6caa..8dbb74bb1e 100644
--- a/shed/index.go
+++ b/shed/index.go
@@ -42,6 +42,7 @@ type Item struct {
StoreTimestamp int64
BinID uint64
PinCounter uint64 // maintains the number of times a chunk is pinned
+ Tag uint32 // tag id the chunk is associated with, used by push-sync
}
// Merge is a helper method to construct a new
@@ -66,6 +67,9 @@ func (i Item) Merge(i2 Item) (new Item) {
if i.PinCounter == 0 {
i.PinCounter = i2.PinCounter
}
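+ // as with the fields above, the zero value means unset, so i's Tag
+ // wins over i2's when both are set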
+ if i.Tag == 0 {
+ i.Tag = i2.Tag
+ }
return i
}
diff --git a/storage/chunker_test.go b/storage/chunker_test.go
index 7351c78341..5a41da81e4 100644
--- a/storage/chunker_test.go
+++ b/storage/chunker_test.go
@@ -43,10 +43,10 @@ type chunkerTester struct {
t test
}
-var mockTag = chunk.NewTag(0, "mock-tag", 0)
+var mockTag = chunk.NewTag(context.Background(), 0, "mock-tag", 0)
func newTestHasherStore(store ChunkStore, hash string) *hasherStore {
- return NewHasherStore(store, MakeHashFunc(hash), false, chunk.NewTag(0, "test-tag", 0))
+ return NewHasherStore(store, MakeHashFunc(hash), false, chunk.NewTag(context.Background(), 0, "test-tag", 0))
}
func testRandomBrokenData(n int, tester *chunkerTester) {
diff --git a/storage/filestore.go b/storage/filestore.go
index cb63c43216..834fdd6f6f 100644
--- a/storage/filestore.go
+++ b/storage/filestore.go
@@ -91,7 +91,7 @@ func (f *FileStore) Retrieve(ctx context.Context, addr Address) (reader *LazyChu
isEncrypted = len(addr) > f.hashFunc().Size()
tag, err := f.tags.GetFromContext(ctx)
if err != nil {
- tag = chunk.NewTag(0, "ephemeral-retrieval-tag", 0)
+ tag = chunk.NewTag(ctx, 0, "ephemeral-retrieval-tag", 0)
}
getter := NewHasherStore(f.ChunkStore, f.hashFunc, isEncrypted, tag)
@@ -108,7 +108,7 @@ func (f *FileStore) Store(ctx context.Context, data io.Reader, size int64, toEnc
// of the original request nor the tag with the trie, recalculating the trie hence
// loses the tag uid. thus we create an ephemeral tag here for that purpose
- tag = chunk.NewTag(0, "", 0)
+ tag = chunk.NewTag(ctx, 0, "", 0)
//return nil, nil, err
}
putter := NewHasherStore(f.putterStore, f.hashFunc, toEncrypt, tag)
@@ -121,7 +121,7 @@ func (f *FileStore) HashSize() int {
// GetAllReferences is a public API. This endpoint returns all chunk hashes (only) for a given file
func (f *FileStore) GetAllReferences(ctx context.Context, data io.Reader) (addrs AddressCollection, err error) {
- tag := chunk.NewTag(0, "ephemeral-tag", 0) //this tag is just a mock ephemeral tag since we don't want to save these results
+ tag := chunk.NewTag(ctx, 0, "ephemeral-tag", 0) //this tag is just a mock ephemeral tag since we don't want to save these results
// create a special kind of putter, which only will store the references
putter := &hashExplorer{
diff --git a/storage/hasherstore.go b/storage/hasherstore.go
index b2592c61bf..4890219a15 100644
--- a/storage/hasherstore.go
+++ b/storage/hasherstore.go
@@ -191,7 +191,7 @@ func (h *hasherStore) createHash(chunkData ChunkData) Address {
func (h *hasherStore) createChunk(chunkData ChunkData) Chunk {
hash := h.createHash(chunkData)
- chunk := NewChunk(hash, chunkData)
+ chunk := NewChunk(hash, chunkData).WithTagID(h.tag.Uid)
return chunk
}
diff --git a/storage/hasherstore_test.go b/storage/hasherstore_test.go
index 4b6347418f..191a432a66 100644
--- a/storage/hasherstore_test.go
+++ b/storage/hasherstore_test.go
@@ -44,7 +44,7 @@ func TestHasherStore(t *testing.T) {
for _, tt := range tests {
chunkStore := NewMapChunkStore()
- hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt, chunk.NewTag(0, "test-tag", 2))
+ hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt, chunk.NewTag(context.Background(), 0, "test-tag", 2))
// Put two random chunks into the hasherStore
chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data()
diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go
index 2953de2a97..3c3a881983 100644
--- a/storage/localstore/localstore.go
+++ b/storage/localstore/localstore.go
@@ -1,4 +1,4 @@
-// Copyright 2018 The go-ethereum Authors
+// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
@@ -306,9 +306,12 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
return e, nil
},
EncodeValue: func(fields shed.Item) (value []byte, err error) {
- return nil, nil
+ // the push index value is the chunk's tag id as 4 big-endian bytes
+ tag := make([]byte, 4)
+ binary.BigEndian.PutUint32(tag, fields.Tag)
+ return tag, nil
},
DecodeValue: func(keyItem shed.Item, value []byte) (e shed.Item, err error) {
+ // entries written before tags were stored have an empty value;
+ // guard the decode to avoid panicking on them
+ if len(value) >= 4 {
+ e.Tag = binary.BigEndian.Uint32(value)
+ }
return e, nil
},
})
@@ -417,6 +420,7 @@ func chunkToItem(ch chunk.Chunk) shed.Item {
return shed.Item{
Address: ch.Address(),
Data: ch.Data(),
+ Tag: ch.TagID(),
}
}
diff --git a/storage/localstore/subscription_push.go b/storage/localstore/subscription_push.go
index c8dd5cf215..07821adf97 100644
--- a/storage/localstore/subscription_push.go
+++ b/storage/localstore/subscription_push.go
@@ -18,6 +18,7 @@ package localstore
import (
"context"
+ "fmt"
"sync"
"time"
@@ -75,11 +76,12 @@ func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop fun
}
select {
- case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
+ case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data).WithTagID(dataItem.Tag):
count++
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
+ log.Trace("subscribe.push", "ref", fmt.Sprintf("%x", sinceItem.Address), "binid", sinceItem.BinID)
return false, nil
case <-stopChan:
// gracefully stop the iteration
diff --git a/storage/netstore.go b/storage/netstore.go
index cf0ee873d1..b5d1d4d3f2 100644
--- a/storage/netstore.go
+++ b/storage/netstore.go
@@ -27,12 +27,10 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p/enode"
-
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/network/timeouts"
"github.com/ethersphere/swarm/spancontext"
lru "github.com/hashicorp/golang-lru"
-
olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
"golang.org/x/sync/singleflight"
@@ -236,7 +234,7 @@ func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) e
"remote.fetch")
osp.LogFields(olog.String("ref", ref.String()))
- n.logger.Trace("remote.fetch", "ref", ref)
+ log.Trace("remote.fetch", "ref", ref)
currentPeer, err := n.RemoteGet(ctx, req, n.LocalID)
if err != nil {
diff --git a/storage/pin/pin.go b/storage/pin/pin.go
index d9d8a0a087..e3a69a5fdf 100644
--- a/storage/pin/pin.go
+++ b/storage/pin/pin.go
@@ -140,7 +140,7 @@ func (p *API) PinFiles(addr []byte, isRaw bool, credentials string) error {
// Get the file size from the root chunk first 8 bytes
hashFunc := storage.MakeHashFunc(storage.DefaultHash)
isEncrypted := len(addr) > hashFunc().Size()
- getter := storage.NewHasherStore(p.db, hashFunc, isEncrypted, chunk.NewTag(0, "show-chunks-tag", 0))
+ getter := storage.NewHasherStore(p.db, hashFunc, isEncrypted, chunk.NewTag(context.Background(), 0, "show-chunks-tag", 0))
chunkData, err := getter.Get(context.Background(), addr)
if err != nil {
log.Error("Error getting chunk data from localstore.", "Address", hex.EncodeToString(addr))
@@ -365,7 +365,7 @@ func (p *API) walkFile(fileRef storage.Reference, executeFunc func(storage.Refer
hashFunc := storage.MakeHashFunc(storage.DefaultHash)
hashSize := len(addr)
isEncrypted := len(addr) > hashFunc().Size()
- getter := storage.NewHasherStore(p.db, hashFunc, isEncrypted, chunk.NewTag(0, "show-chunks-tag", 0))
+ getter := storage.NewHasherStore(p.db, hashFunc, isEncrypted, chunk.NewTag(context.Background(), 0, "show-chunks-tag", 0))
// Trigger unwrapping the merkle tree starting from root hash of the file
chunkHashesC <- fileRef
diff --git a/swarm.go b/swarm.go
index dc8952063e..43d873dd7a 100644
--- a/swarm.go
+++ b/swarm.go
@@ -52,6 +52,7 @@ import (
"github.com/ethersphere/swarm/network/stream/v2"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/pss"
+ "github.com/ethersphere/swarm/pushsync"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
@@ -84,6 +85,8 @@ type Swarm struct {
netStore *storage.NetStore
sfs *fuse.SwarmFS // need this to cleanup all the active mounts on node exit
ps *pss.Pss
+ pushSync *pushsync.Pusher
+ storer *pushsync.Storer
swap *swap.Swap
stateStore *state.DBStore
tags *chunk.Tags
@@ -241,6 +244,10 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
pss.SetHandshakeController(self.ps, pss.NewHandshakeParams())
}
+ pubsub := pss.NewPubSub(self.ps)
+ self.pushSync = pushsync.NewPusher(localStore, pubsub, self.tags)
+ self.storer = pushsync.NewStorer(self.netStore, pubsub)
+
self.api = api.NewAPI(self.fileStore, self.dns, feedsHandler, self.privateKey, self.tags)
// Instantiate the pinAPI object with the already opened localstore
diff --git a/swarm_test.go b/swarm_test.go
index 36638dd7d9..89ac6d5238 100644
--- a/swarm_test.go
+++ b/swarm_test.go
@@ -28,15 +28,14 @@ import (
"testing"
"time"
- "github.com/ethersphere/swarm/network"
- "github.com/ethersphere/swarm/testutil"
-
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethersphere/swarm/api"
+ "github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/sctx"
"github.com/ethersphere/swarm/swap"
+ "github.com/ethersphere/swarm/testutil"
)
// TestNewSwarm validates Swarm fields with respect to the provided configuration.
@@ -442,12 +441,13 @@ func testLocalStoreAndRetrieve(t *testing.T, swarm *Swarm, n int, randomData boo
rand.Seed(time.Now().UnixNano())
rand.Read(slice)
}
+ ctx := context.Background()
dataPut := string(slice)
- tag, err := swarm.api.Tags.Create("test-local-store-and-retrieve", 0)
+ tag, err := swarm.api.Tags.Create(ctx, "test-local-store-and-retrieve", 0)
if err != nil {
t.Fatal(err)
}
- ctx := sctx.SetTag(context.Background(), tag.Uid)
+ ctx = sctx.SetTag(ctx, tag.Uid)
k, wait, err := swarm.api.Store(ctx, strings.NewReader(dataPut), int64(len(dataPut)), false)
if err != nil {
t.Fatal(err)