Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

pushsync #1392

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
95bd89f
swarm: push sync
nonsense May 15, 2019
ea82e7d
storage/pushsync: update NetStore api from master
nonsense Jun 18, 2019
930fda6
storage/pushsync: add sleeps and a bit more tracing, change subscript…
acud Jun 19, 2019
a0e01d9
chunk; fix "already exists" error tags.Uid no need for rng obj
zelig Jun 26, 2019
cd17977
storage/pushsync: fix storer initialisation
zelig Jun 29, 2019
1d6ee6b
swarm: push sync
nonsense May 15, 2019
8cb5558
chunk: tags improvement
zelig Aug 27, 2019
dc0c973
shed: linting fix
zelig Aug 27, 2019
6a7e94b
pot: DistanceCmp opposite of ProxCmp reflecting doc
zelig Aug 27, 2019
9e6f990
network: add IsClosestTo(addr, filter) using pot.DistanceCmp
zelig Aug 27, 2019
61ca500
pss: fix hashpool init to use keccak256
zelig Aug 27, 2019
32e04ae
pushsync, swarm.go: complete protocol - all tests pass non-flaky
zelig Aug 27, 2019
9e6ec3c
api, chunk, network, pss, pushsync: act on review comments
zelig Sep 2, 2019
b229e87
pushsync: simulation test params for appveyor
zelig Sep 3, 2019
0dbc1db
pushsync: asynchronous send in sync forever loop; disabled ordering test
nonsense Sep 3, 2019
71436e1
api/http: remove periodicTagTrace
zelig Sep 3, 2019
096b08f
pushsync: close item span at end of roundtrip
zelig Sep 3, 2019
cd49ab4
pushsync: amend pusher test, remove loopback sync option
zelig Sep 3, 2019
55a4758
pushsync: if no new items set timer to half a second
zelig Sep 3, 2019
048ca03
pushsync: address PR review feedback
zelig Sep 9, 2019
1eb077e
pushsync, testutil: address review comments
zelig Sep 12, 2019
68903b5
pss, pushsync: move digest out of critical section; async handle of r…
nonsense Sep 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestApiPut(t *testing.T) {
resp := testGet(t, api, addr.Hex(), "")
checkResponse(t, resp, exp)
tag := tags.All()[0]
chunktesting.CheckTag(t, tag, 2, 2, 0, 2) //1 chunk data, 1 chunk manifest
chunktesting.CheckTag(t, tag, 2, 2, 0, 0, 0, 2) //1 chunk data, 1 chunk manifest
})
}

Expand All @@ -150,7 +150,7 @@ func TestApiTagLarge(t *testing.T) {
const contentLength = 4096 * 4095
testAPI(t, func(api *API, tags *chunk.Tags, toEncrypt bool) {
randomContentReader := io.LimitReader(crand.Reader, int64(contentLength))
tag, err := api.Tags.Create("unnamed-tag", 0)
tag, err := api.Tags.Create(context.Background(), "unnamed-tag", 0)
if err != nil {
t.Fatal(err)
}
Expand All @@ -168,11 +168,11 @@ func TestApiTagLarge(t *testing.T) {
if toEncrypt {
tag := tags.All()[0]
expect := int64(4095 + 64 + 1)
chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
} else {
tag := tags.All()[0]
expect := int64(4095 + 32 + 1)
chunktesting.CheckTag(t, tag, expect, expect, 0, expect)
chunktesting.CheckTag(t, tag, expect, expect, 0, 0, 0, expect)
}
})
}
Expand Down Expand Up @@ -549,18 +549,18 @@ func TestDetectContentType(t *testing.T) {
// putString provides singleton manifest creation on top of api.API
func putString(ctx context.Context, a *API, content string, contentType string, toEncrypt bool) (k storage.Address, wait func(context.Context) error, err error) {
r := strings.NewReader(content)
tag, err := a.Tags.Create("unnamed-tag", 0)
tag, err := a.Tags.Create(ctx, "unnamed-tag", 0)

log.Trace("created new tag", "uid", tag.Uid)
log.Trace("created new tag", "id", tag.Uid)

cCtx := sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(cCtx, r, int64(len(content)), toEncrypt)
ctx = sctx.SetTag(ctx, tag.Uid)
key, waitContent, err := a.Store(ctx, r, int64(len(content)), toEncrypt)
if err != nil {
return nil, nil, err
}
manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType)
r = strings.NewReader(manifest)
key, waitManifest, err := a.Store(cCtx, r, int64(len(manifest)), toEncrypt)
key, waitManifest, err := a.Store(ctx, r, int64(len(manifest)), toEncrypt)
if err != nil {
return nil, nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestClientUploadDownloadRaw(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}

func TestClientUploadDownloadRawEncrypted(t *testing.T) {
Expand All @@ -69,7 +69,7 @@ func TestClientUploadDownloadRawEncrypted(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 1, 1, 0, 1)
chunktesting.CheckTag(t, tag, 1, 1, 0, 0, 0, 1)
}

func testClientUploadDownloadRaw(srv *swarmhttp.TestSwarmServer, toEncrypt bool, t *testing.T, data []byte, toPin bool) string {
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestClientUploadDownloadDirectory(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
chunktesting.CheckTag(t, tag, 8, 8, 0, 0, 0, 8)

// check we can download the individual files
checkDownloadFile := func(path string, expected []byte) {
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestClientMultipartUpload(t *testing.T) {

// check the tag was created successfully
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 9, 9, 0, 9)
chunktesting.CheckTag(t, tag, 8, 8, 0, 0, 0, 8)

// check we can download the individual files
checkDownloadFile := func(path string) {
Expand Down
3 changes: 2 additions & 1 deletion api/http/middleware.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package http

import (
"context"
"fmt"
"net/http"
"runtime/debug"
Expand Down Expand Up @@ -123,7 +124,7 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {

log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal)

t, err := tags.Create(tagName, estimatedTotal)
t, err := tags.Create(context.Background(), tagName, estimatedTotal)
if err != nil {
log.Error("error creating tag", "err", err, "tagName", tagName)
}
Expand Down
30 changes: 20 additions & 10 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/sctx"
"github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/storage/feed"
"github.com/ethersphere/swarm/storage/pin"
Expand Down Expand Up @@ -276,10 +277,10 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
ruid := GetRUID(r.Context())
log.Debug("handle.post.raw", "ruid", ruid)

tagUid := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUid)
tagUID := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUID)
if err != nil {
log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}

postRawCount.Inc(1)
Expand Down Expand Up @@ -334,7 +335,7 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
}

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, addr)
}
Expand All @@ -349,6 +350,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("handle.post.files", "ruid", ruid)
postFilesCount.Inc(1)

tagUID := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUID)
if err != nil {
log.Error("handle post raw got an error retrieving tag", "tagUID", tagUID, "err", err)
}

ctx, sp := spancontext.StartSpan(tag.Context(), "http.post")
defer sp.Finish()

contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
postFilesFail.Inc(1)
Expand All @@ -375,15 +385,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
}
log.Debug("resolved key", "ruid", ruid, "key", addr)
} else {
addr, err = s.api.NewManifest(r.Context(), toEncrypt)
addr, err = s.api.NewManifest(ctx, toEncrypt)
if err != nil {
postFilesFail.Inc(1)
respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
log.Debug("new manifest", "ruid", ruid, "key", addr)
}
newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error {
newAddr, err := s.api.UpdateManifest(ctx, addr, func(mw *api.ManifestWriter) error {
switch contentType {
case tarContentType:
_, err := s.handleTarUpload(r, mw)
Expand All @@ -405,10 +415,10 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
return
}

tagUid := sctx.GetTag(r.Context())
tag, err := s.api.Tags.Get(tagUid)
tagUID = sctx.GetTag(r.Context())
tag, err = s.api.Tags.Get(tagUID)
if err != nil {
log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
log.Error("got an error retrieving tag for DoneSplit", "tagUID", tagUID, "err", err)
}

log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.TotalCounter())
Expand All @@ -427,7 +437,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
log.Debug("stored content", "ruid", ruid, "key", newAddr)

w.Header().Set("Content-Type", "text/plain")
w.Header().Set(TagHeaderName, fmt.Sprint(tagUid))
w.Header().Set(TagHeaderName, fmt.Sprint(tagUID))
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, newAddr)
}
Expand Down
4 changes: 2 additions & 2 deletions api/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func testBzzTar(encrypted bool, t *testing.T) {

// check that the tag was written correctly
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 4, 4, 0, 4)
chunktesting.CheckTag(t, tag, 3, 3, 0, 0, 0, 3)

swarmHash, err := ioutil.ReadAll(resp2.Body)
resp2.Body.Close()
Expand Down Expand Up @@ -1081,7 +1081,7 @@ func TestBzzCorrectTagEstimate(t *testing.T) {
<-time.After(10 * time.Millisecond)
case 1:
tag := srv.Tags.All()[0]
chunktesting.CheckTag(t, tag, 0, 0, 0, v.expChunks)
chunktesting.CheckTag(t, tag, 0, 0, 0, 0, 0, v.expChunks)
srv.Tags.Delete(tag.Uid)
done = true
}
Expand Down
15 changes: 15 additions & 0 deletions api/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
stream "github.com/ethersphere/swarm/network/stream/v2"
Expand Down Expand Up @@ -53,6 +54,20 @@ func (i *Inspector) KademliaInfo() network.KademliaInfo {
return i.hive.KademliaInfo()
}

func (i *Inspector) IsPushSynced(tagname string) bool {
tags := i.api.Tags.All()

for _, t := range tags {
if t.Name == tagname {
ds := t.Done(chunk.StateSynced)
log.Trace("found tag", "tagname", tagname, "done-syncing", ds)
return ds
}
}

return false
}

func (i *Inspector) IsPullSyncing() bool {
t := i.stream.LastReceivedChunkTime()

Expand Down
14 changes: 14 additions & 0 deletions api/pullsync/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package pullsync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that this file is committed by mistake.


import stream "github.com/ethersphere/swarm/network/stream/v2"



// the node-wide pullsync.Client
type Client struct {
stream.Syncer // embed stream.Syncer
// when pullsync
// here you simply put the update sync logic listening to kademlia depth changes
// and call `Request`
// remember the request, when no longer relevant just call request.Cancel()
}
9 changes: 9 additions & 0 deletions api/pullsync/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package pullsync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this file also is probably committed by mistake.


// pullSync.Server implements stream.Provider
// uses localstore SubscribePull for the bins
// server is node-wide
type Server struct {
// ...
*stream.LocalProvider
}
12 changes: 12 additions & 0 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ type Chunk interface {
Data() []byte
PinCounter() uint64
WithPinCounter(p uint64) Chunk
TagID() uint32
WithTagID(t uint32) Chunk
}

type chunk struct {
addr Address
sdata []byte
pinCounter uint64
tagID uint32
}

func NewChunk(addr Address, data []byte) Chunk {
Expand All @@ -60,6 +63,11 @@ func (c *chunk) WithPinCounter(p uint64) Chunk {
return c
}

func (c *chunk) WithTagID(t uint32) Chunk {
c.tagID = t
return c
}

func (c *chunk) Address() Address {
return c.addr
}
Expand All @@ -72,6 +80,10 @@ func (c *chunk) PinCounter() uint64 {
return c.pinCounter
}

func (c *chunk) TagID() uint32 {
return c.tagID
}

func (self *chunk) String() string {
return fmt.Sprintf("Address: %v Chunksize: %v", self.addr.Log(), len(self.sdata))
}
Expand Down
Loading