From 12719a788f68598df27760f6ba7c951858979e98 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 27 Jun 2018 16:52:57 -0700 Subject: [PATCH 1/5] use `copy` instead of looping License: MIT Signed-off-by: Steven Allen --- unixfs/io/pbdagreader.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/unixfs/io/pbdagreader.go b/unixfs/io/pbdagreader.go index ce53d67118d..8c357fb8cad 100644 --- a/unixfs/io/pbdagreader.go +++ b/unixfs/io/pbdagreader.go @@ -75,9 +75,7 @@ func (dr *PBDagReader) preloadNextNodes(ctx context.Context) { end = len(dr.links) } - for i, p := range ipld.GetNodes(ctx, dr.serv, dr.links[beg:end]) { - dr.promises[beg+i] = p - } + copy(dr.promises[beg:], ipld.GetNodes(ctx, dr.serv, dr.links[beg:end])) } // precalcNextBuf follows the next link in line and loads it from the From 0344e3e6027c7755678c5bbee18c410dafb1b20d Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 27 Jun 2018 17:06:30 -0700 Subject: [PATCH 2/5] better handle context cancellations in the PBDagReader Good: If a previous read is canceled, we cancel the preloads that the read triggered. Bad: Future reads at that point will fail. This fixes that issue. License: MIT Signed-off-by: Steven Allen --- unixfs/io/pbdagreader.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/unixfs/io/pbdagreader.go b/unixfs/io/pbdagreader.go index 8c357fb8cad..8d21f8da3f7 100644 --- a/unixfs/io/pbdagreader.go +++ b/unixfs/io/pbdagreader.go @@ -95,10 +95,27 @@ func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error { } nxt, err := dr.promises[dr.linkPosition].Get(ctx) - if err != nil { + dr.promises[dr.linkPosition] = nil + switch err { + case nil: + case context.DeadlineExceeded, context.Canceled: + err = ctx.Err() + if err != nil { + return ctx.Err() + } + // In this case, the context used to *preload* the node has been canceled. + // We need to retry the load with our context and we might as + // well preload some extra nodes while we're at it. + dr.preload(ctx, dr.linkPosition) + nxt, err = dr.promises[dr.linkPosition].Get(ctx) + dr.promises[dr.linkPosition] = nil + if err != nil { + return err + } + default: return err } - dr.promises[dr.linkPosition] = nil + dr.linkPosition++ switch nxt := nxt.(type) { From ff3efb5646437ac77c6b75e0452b239d154928c5 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 27 Jun 2018 17:08:50 -0700 Subject: [PATCH 3/5] always prefetch at least 5 blocks ahead This should reduce stuttering when streaming. License: MIT Signed-off-by: Steven Allen --- unixfs/io/pbdagreader.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/unixfs/io/pbdagreader.go b/unixfs/io/pbdagreader.go index 8d21f8da3f7..9c3909577f0 100644 --- a/unixfs/io/pbdagreader.go +++ b/unixfs/io/pbdagreader.go @@ -68,8 +68,7 @@ func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv const preloadSize = 10 -func (dr *PBDagReader) preloadNextNodes(ctx context.Context) { - beg := dr.linkPosition +func (dr *PBDagReader) preload(ctx context.Context, beg int) { end := beg + preloadSize if end >= len(dr.links) { end = len(dr.links) @@ -90,8 +89,13 @@ func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error { return io.EOF } - if dr.promises[dr.linkPosition] == nil { - dr.preloadNextNodes(ctx) + // If we drop to <= preloadSize/2 preloading nodes, preload the next 10. + for i := dr.linkPosition; i < dr.linkPosition+preloadSize/2 && i < len(dr.promises); i++ { + // TODO: check if canceled. + if dr.promises[i] == nil { + dr.preload(ctx, i) + break + } } nxt, err := dr.promises[dr.linkPosition].Get(ctx) From 5c0dc6a968b21dbc8975b4f0e0d797c76be22fa6 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 27 Jun 2018 17:51:26 -0700 Subject: [PATCH 4/5] test dag reader context cancellation License: MIT Signed-off-by: Steven Allen --- unixfs/io/dagreader_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/unixfs/io/dagreader_test.go b/unixfs/io/dagreader_test.go index e3d3d042b23..7cbe35bb5f4 100644 --- a/unixfs/io/dagreader_test.go +++ b/unixfs/io/dagreader_test.go @@ -122,6 +122,41 @@ func TestSeekAndReadLarge(t *testing.T) { } } +func TestReadAndCancel(t *testing.T) { + dserv := testu.GetDAGServ() + inbuf := make([]byte, 20000) + rand.Read(inbuf) + + node := testu.GetNode(t, dserv, inbuf, testu.UseProtoBufLeaves) + ctx, closer := context.WithCancel(context.Background()) + defer closer() + + reader, err := NewDagReader(ctx, node, dserv) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + buf := make([]byte, 100) + _, err = reader.CtxReadFull(ctx, buf) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, inbuf[0:100]) { + t.Fatal("read failed") + } + cancel() + + b, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(inbuf[100:], b) { + t.Fatal("buffers not equal") + } +} + func TestRelativeSeek(t *testing.T) { dserv := testu.GetDAGServ() ctx, closer := context.WithCancel(context.Background()) From 7fd34048abb5cfadae75e4162ea9f50848133d9f Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 5 Jul 2018 17:06:50 -0700 Subject: [PATCH 5/5] explain when a promise can be canceled in pbdagreader License: MIT Signed-off-by: Steven Allen --- unixfs/io/pbdagreader.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/unixfs/io/pbdagreader.go b/unixfs/io/pbdagreader.go index 9c3909577f0..42d903aac26 100644 --- a/unixfs/io/pbdagreader.go +++ b/unixfs/io/pbdagreader.go @@ -110,6 +110,11 @@ func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error { // In this case, the context used to *preload* the node has been canceled. // We need to retry the load with our context and we might as // well preload some extra nodes while we're at it. + // + // Note: When using `Read`, this code will never execute as + // `Read` will use the global context. It only runs if the user + // explicitly reads with a custom context (e.g., by calling + // `CtxReadFull`). dr.preload(ctx, dr.linkPosition) nxt, err = dr.promises[dr.linkPosition].Get(ctx) dr.promises[dr.linkPosition] = nil