From 84b8bcdab5cae01c693a7ea9fd7fe5573312858d Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Wed, 2 Mar 2022 16:07:01 +0800 Subject: [PATCH] chore: optimize reuse logic (#1110) * chore: optimize reuse logic Signed-off-by: Jim Ma --- client/daemon/peer/peertask_reuse.go | 87 ++-- client/daemon/peer/peertask_reuse_test.go | 501 ++++++++++++++++++++++ 2 files changed, 558 insertions(+), 30 deletions(-) create mode 100644 client/daemon/peer/peertask_reuse_test.go diff --git a/client/daemon/peer/peertask_reuse.go b/client/daemon/peer/peertask_reuse.go index 1ef1f6cd47c..330f77b9e10 100644 --- a/client/daemon/peer/peertask_reuse.go +++ b/client/daemon/peer/peertask_reuse.go @@ -37,31 +37,45 @@ import ( var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation +// reuse task search logic: +// A. prefetch feature enabled +// for ranged request, 1, find completed subtask, 2, find partial completed parent task +// for non-ranged request, just find completed task +// B. prefetch feature disabled +// for ranged request, 1, find completed normal task, 2, find partial completed parent task +// for non-ranged request, just find completed task + func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context, request *FileTaskRequest) (chan *FileTaskProgress, bool) { taskID := idgen.TaskID(request.Url, request.UrlMeta) - var reuse *storage.ReusePeerTask + var ( + reuse *storage.ReusePeerTask + reuseRange *clientutil.Range // the range of parent peer task data to read + log *logger.SugaredLoggerOnWith + length int64 + err error + ) + if ptm.enabledPrefetch(request.Range) { reuse = ptm.storageManager.FindCompletedSubTask(taskID) } else { reuse = ptm.storageManager.FindCompletedTask(taskID) } - var ( - rg *clientutil.Range // the range of parent peer task data to read - log *logger.SugaredLoggerOnWith - length int64 - err error - ) + if reuse == nil { - rg = request.Range + if request.Range == nil { + return nil, false + } + // for ranged request, check the parent task + reuseRange = request.Range taskID = idgen.ParentTaskID(request.Url, request.UrlMeta) - reuse = ptm.storageManager.FindPartialCompletedTask(taskID, rg) + reuse = ptm.storageManager.FindPartialCompletedTask(taskID, reuseRange) if reuse == nil { return nil, false } } - if rg == nil { + if reuseRange == nil { log = logger.With("peer", request.PeerId, "task", taskID, "component", "reuseFilePeerTask") log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength) length = reuse.ContentLength @@ -70,7 +84,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context, "component", "reuseRangeFilePeerTask") log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s", reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range) - length = rg.Length + length = reuseRange.Length } _, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient)) @@ -80,7 +94,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context, span.SetAttributes(config.AttributePeerID.String(request.PeerId)) span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID)) span.SetAttributes(semconv.HTTPURLKey.String(request.Url)) - if rg != nil { + if reuseRange != nil { span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range)) } defer span.End() @@ -89,7 +103,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context, span.AddEvent("reuse peer task", trace.WithAttributes(config.AttributePeerID.String(reuse.PeerID))) start := time.Now() - if rg == nil || request.KeepOriginalOffset { + if reuseRange == nil || request.KeepOriginalOffset { storeRequest := &storage.StoreRequest{ CommonTaskRequest: storage.CommonTaskRequest{ PeerID: reuse.PeerID, @@ -103,7 +117,7 @@ func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context, } err = ptm.storageManager.Store(ctx, storeRequest) } else { - err = ptm.storePartialFile(ctx, request, log, reuse, rg) + err = ptm.storePartialFile(ctx, request, log, reuse, reuseRange) } if err != nil { @@ -168,36 +182,41 @@ func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileT func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, request *StreamTaskRequest) (io.ReadCloser, map[string]string, bool) { taskID := idgen.TaskID(request.URL, request.URLMeta) - var reuse *storage.ReusePeerTask + var ( + reuse *storage.ReusePeerTask + reuseRange *clientutil.Range // the range of parent peer task data to read + log *logger.SugaredLoggerOnWith + length int64 + ) + if ptm.enabledPrefetch(request.Range) { reuse = ptm.storageManager.FindCompletedSubTask(taskID) } else { reuse = ptm.storageManager.FindCompletedTask(taskID) } - var ( - rg *clientutil.Range // the range of parent peer task data to read - log *logger.SugaredLoggerOnWith - ) + if reuse == nil { - // for ranged request, check the parent task if request.Range == nil { return nil, nil, false } - rg = request.Range + // for ranged request, check the parent task + reuseRange = request.Range taskID = idgen.ParentTaskID(request.URL, request.URLMeta) - reuse = ptm.storageManager.FindPartialCompletedTask(taskID, rg) + reuse = ptm.storageManager.FindPartialCompletedTask(taskID, reuseRange) if reuse == nil { return nil, nil, false } } - if rg == nil { + if reuseRange == nil { log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseStreamPeerTask") log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength) + length = reuse.ContentLength } else { log = logger.With("peer", request.PeerID, "task", taskID, "component", "reuseRangeStreamPeerTask") log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s", reuse.PeerID, reuse.ContentLength, request.URLMeta.Range) + length = reuseRange.Length } ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient)) @@ -207,13 +226,13 @@ func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, span.SetAttributes(config.AttributePeerID.String(request.PeerID)) span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID)) span.SetAttributes(semconv.HTTPURLKey.String(request.URL)) - if rg != nil { + if reuseRange != nil { span.SetAttributes(config.AttributeReuseRange.String(request.URLMeta.Range)) } defer span.End() rc, err := ptm.storageManager.ReadAllPieces(ctx, - &storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg}) + &storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: reuseRange}) if err != nil { log.Errorf("read pieces error when reuse peer task: %s", err) span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false)) @@ -224,12 +243,20 @@ func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, attr := map[string]string{} attr[config.HeaderDragonflyTask] = taskID attr[config.HeaderDragonflyPeer] = request.PeerID - if rg != nil { + attr[headers.ContentLength] = fmt.Sprintf("%d", length) + + if reuseRange != nil { attr[config.HeaderDragonflyRange] = request.URLMeta.Range - attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/%d", rg.Start, rg.Start+rg.Length-1, reuse.ContentLength) - attr[headers.ContentLength] = fmt.Sprintf("%d", rg.Length) - } else { - attr[headers.ContentLength] = fmt.Sprintf("%d", reuse.ContentLength) + attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/%d", reuseRange.Start, + reuseRange.Start+reuseRange.Length-1, reuse.ContentLength) + } else if request.Range != nil { + // the length is from reuse task, ensure it equal with request + if length != request.Range.Length { + log.Errorf("target task length %d did not match range length %d", length, request.Range.Length) + return nil, nil, false + } + attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/*", request.Range.Start, + request.Range.Start+request.Range.Length-1) } // TODO record time when file closed, need add a type to implement Close and WriteTo diff --git a/client/daemon/peer/peertask_reuse_test.go b/client/daemon/peer/peertask_reuse_test.go new file mode 100644 index 00000000000..2003f9fc24c --- /dev/null +++ b/client/daemon/peer/peertask_reuse_test.go @@ -0,0 +1,501 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package peer + +import ( + "bytes" + "context" + "io" + "os" + "path" + "testing" + + "github.com/go-http-utils/headers" + "github.com/golang/mock/gomock" + testifyassert "github.com/stretchr/testify/assert" + + "d7y.io/dragonfly/v2/client/clientutil" + "d7y.io/dragonfly/v2/client/daemon/storage" + "d7y.io/dragonfly/v2/client/daemon/test" + ms "d7y.io/dragonfly/v2/client/daemon/test/mock/storage" + "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/rpc/scheduler" +) + +func TestReuseFilePeerTask(t *testing.T) { + ctrl := gomock.NewController(t) + assert := testifyassert.New(t) + + testBytes, err := os.ReadFile(test.File) + assert.Nil(err) + testOutput := path.Join(os.TempDir(), "d7y-reuse-output.data") + defer os.Remove(testOutput) + + var testCases = []struct { + name string + request *FileTaskRequest + enablePrefetch bool + storageManager func(sm *ms.MockManager) + verify func(pg chan *FileTaskProgress, ok bool) + }{ + { + name: "normal completed task found", + request: &FileTaskRequest{ + PeerTaskRequest: scheduler.PeerTaskRequest{ + PeerId: "", + Url: "http://example.com/1", + UrlMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + }, + Output: testOutput, + Range: nil, + }, + enablePrefetch: false, + storageManager: func(sm *ms.MockManager) { + var taskID string + sm.EXPECT().FindCompletedTask(gomock.Any()).DoAndReturn( + func(id string) *storage.ReusePeerTask { + taskID = id + return &storage.ReusePeerTask{ + PeerTaskMetadata: storage.PeerTaskMetadata{ + TaskID: taskID, + }, + ContentLength: 10, + TotalPieces: 0, + PieceMd5Sign: "", + } + }) + sm.EXPECT().Store(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, req *storage.StoreRequest) error { + return os.WriteFile(req.Destination, testBytes[0:10], 0644) + }) + }, + verify: func(pg chan *FileTaskProgress, ok bool) { + assert.True(ok) + data, err := os.ReadFile(testOutput) + assert.Nil(err) + assert.Equal(testBytes[0:10], data) + }, + }, + { + name: "normal completed task not found", + request: &FileTaskRequest{ + PeerTaskRequest: scheduler.PeerTaskRequest{ + PeerId: "", + Url: "http://example.com/1", + UrlMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + }, + Output: testOutput, + Range: nil, + }, + enablePrefetch: false, + storageManager: func(sm *ms.MockManager) { + //sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( + // func(taskID string, rg *clientutil.Range) *storage.ReusePeerTask { + // return nil + // }) + //sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( + // func(taskID string) *storage.ReusePeerTask { + // return nil + // }) + sm.EXPECT().FindCompletedTask(gomock.Any()).DoAndReturn( + func(taskID string) *storage.ReusePeerTask { + return nil + }) + }, + verify: func(pg chan *FileTaskProgress, ok bool) { + assert.False(ok) + assert.Nil(pg) + }, + }, + { + name: "normal completed subtask found", + request: &FileTaskRequest{ + PeerTaskRequest: scheduler.PeerTaskRequest{ + PeerId: "", + Url: "http://example.com/1", + UrlMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + }, + Output: testOutput, + Range: &clientutil.Range{Start: 200, Length: 100}, + }, + enablePrefetch: true, + storageManager: func(sm *ms.MockManager) { + var taskID string + sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( + func(id string) *storage.ReusePeerTask { + taskID = id + return &storage.ReusePeerTask{ + PeerTaskMetadata: storage.PeerTaskMetadata{ + TaskID: taskID, + }, + ContentLength: 10, + TotalPieces: 0, + PieceMd5Sign: "", + } + }) + sm.EXPECT().Store(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, req *storage.StoreRequest) error { + return os.WriteFile(req.Destination, testBytes[200:300], 0644) + }) + }, + verify: func(pg chan *FileTaskProgress, ok bool) { + assert.True(ok) + data, err := os.ReadFile(testOutput) + assert.Nil(err) + assert.Equal(testBytes[200:300], data) + }, + }, + { + name: "normal completed subtask not found", + request: &FileTaskRequest{ + PeerTaskRequest: scheduler.PeerTaskRequest{ + PeerId: "", + Url: "http://example.com/1", + UrlMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + }, + Output: testOutput, + Range: &clientutil.Range{Start: 0, Length: 10}, + }, + enablePrefetch: true, + storageManager: func(sm *ms.MockManager) { + sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( + func(taskID string, rg *clientutil.Range) *storage.ReusePeerTask { + return nil + }) + sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( + func(taskID string) *storage.ReusePeerTask { + return nil + }) + }, + verify: func(pg chan *FileTaskProgress, ok bool) { + assert.False(ok) + assert.Nil(pg) + }, + }, + { + name: "partial task found", + request: &FileTaskRequest{ + PeerTaskRequest: scheduler.PeerTaskRequest{ + PeerId: "", + Url: "http://example.com/1", + UrlMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + }, + Output: testOutput, + Range: &clientutil.Range{Start: 300, Length: 100}, + }, + enablePrefetch: true, + storageManager: func(sm *ms.MockManager) { + var taskID string + sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( + func(id string) *storage.ReusePeerTask { + return nil + }) + sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( + func(id string, rg *clientutil.Range) *storage.ReusePeerTask { + taskID = id + return &storage.ReusePeerTask{ + PeerTaskMetadata: storage.PeerTaskMetadata{ + TaskID: taskID, + }, + ContentLength: 100, + TotalPieces: 0, + PieceMd5Sign: "", + } + }) + sm.EXPECT().ReadAllPieces(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) { + assert.Equal(taskID, req.TaskID) + return io.NopCloser(bytes.NewBuffer(testBytes[300:400])), nil + }) + }, + verify: func(pg chan *FileTaskProgress, ok bool) { + assert.True(ok) + data, err := os.ReadFile(testOutput) + assert.Nil(err) + assert.Equal(testBytes[300:400], data) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer os.Remove(testOutput) + sm := ms.NewMockManager(ctrl) + tc.storageManager(sm) + ptm := &peerTaskManager{ + host: &scheduler.PeerHost{}, + enablePrefetch: tc.enablePrefetch, + storageManager: sm, + } + tc.verify(ptm.tryReuseFilePeerTask(context.Background(), tc.request)) + }) + } +} + +func TestReuseStreamPeerTask(t *testing.T) { + ctrl := gomock.NewController(t) + assert := testifyassert.New(t) + + var testCases = []struct { + name string + request *StreamTaskRequest + enablePrefetch bool + storageManager func(sm *ms.MockManager) + verify func(rc io.ReadCloser, attr map[string]string, ok bool) + }{ + { + name: "normal completed task found", + request: &StreamTaskRequest{ + URL: "http://example.com/1", + URLMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + Range: nil, + PeerID: "", + }, + enablePrefetch: false, + storageManager: func(sm *ms.MockManager) { + var taskID string + sm.EXPECT().FindCompletedTask(gomock.Any()).DoAndReturn( + func(id string) *storage.ReusePeerTask { + taskID = id + return &storage.ReusePeerTask{ + PeerTaskMetadata: storage.PeerTaskMetadata{ + TaskID: taskID, + }, + ContentLength: 10, + TotalPieces: 0, + PieceMd5Sign: "", + } + }) + sm.EXPECT().ReadAllPieces(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) { + assert.Equal(taskID, req.TaskID) + return io.NopCloser(bytes.NewBuffer([]byte("1111111111"))), nil + }) + }, + verify: func(rc io.ReadCloser, attr map[string]string, ok bool) { + assert.True(ok) + assert.NotNil(rc) + assert.Equal("10", attr[headers.ContentLength]) + _, exist := attr[headers.ContentRange] + assert.False(exist) + }, + }, + { + name: "normal completed task not found", + request: &StreamTaskRequest{ + URL: "http://example.com/1", + URLMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + Range: nil, + PeerID: "", + }, + enablePrefetch: false, + storageManager: func(sm *ms.MockManager) { + //sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( + // func(taskID string, rg *clientutil.Range) *storage.ReusePeerTask { + // return nil + // }) + //sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( + // func(taskID string) *storage.ReusePeerTask { + // return nil + // }) + sm.EXPECT().FindCompletedTask(gomock.Any()).DoAndReturn( + func(taskID string) *storage.ReusePeerTask { + return nil + }) + }, + verify: func(rc io.ReadCloser, attr map[string]string, ok bool) { + assert.False(ok) + assert.Nil(rc) + assert.Nil(attr) + }, + }, + { + name: "normal completed subtask found", + request: &StreamTaskRequest{ + URL: "http://example.com/1", + URLMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + Range: &clientutil.Range{Start: 0, Length: 10}, + PeerID: "", + }, + enablePrefetch: true, + storageManager: func(sm *ms.MockManager) { + var taskID string + sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( + func(id string) *storage.ReusePeerTask { + taskID = id + return &storage.ReusePeerTask{ + PeerTaskMetadata: storage.PeerTaskMetadata{ + TaskID: taskID, + }, + ContentLength: 10, + TotalPieces: 0, + PieceMd5Sign: "", + } + }) + sm.EXPECT().ReadAllPieces(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) { + assert.Equal(taskID, req.TaskID) + return io.NopCloser(bytes.NewBuffer([]byte("1111111111"))), nil + }) + }, + verify: func(rc io.ReadCloser, attr map[string]string, ok bool) { + assert.True(ok) + assert.NotNil(rc) + assert.Equal("10", attr[headers.ContentLength]) + assert.Equal("bytes 0-9/*", attr[headers.ContentRange]) + }, + }, + { + name: "normal completed subtask not found", + request: &StreamTaskRequest{ + URL: "http://example.com/1", + URLMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + Range: &clientutil.Range{Start: 0, Length: 10}, + PeerID: "", + }, + enablePrefetch: true, + storageManager: func(sm *ms.MockManager) { + sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( + func(taskID string, rg *clientutil.Range) *storage.ReusePeerTask { + return nil + }) + sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( + func(taskID string) *storage.ReusePeerTask { + return nil + }) + }, + verify: func(rc io.ReadCloser, attr map[string]string, ok bool) { + assert.False(ok) + assert.Nil(rc) + assert.Nil(attr) + }, + }, + { + name: "partial task found", + request: &StreamTaskRequest{ + URL: "http://example.com/1", + URLMeta: &base.UrlMeta{ + Digest: "", + Tag: "", + Range: "", + Filter: "", + Header: nil, + }, + Range: &clientutil.Range{Start: 0, Length: 10}, + PeerID: "", + }, + enablePrefetch: true, + storageManager: func(sm *ms.MockManager) { + var taskID string + sm.EXPECT().FindCompletedSubTask(gomock.Any()).DoAndReturn( + func(id string) *storage.ReusePeerTask { + return nil + }) + sm.EXPECT().FindPartialCompletedTask(gomock.Any(), gomock.Any()).DoAndReturn( + func(id string, rg *clientutil.Range) *storage.ReusePeerTask { + taskID = id + return &storage.ReusePeerTask{ + PeerTaskMetadata: storage.PeerTaskMetadata{ + TaskID: taskID, + }, + ContentLength: 100, + TotalPieces: 0, + PieceMd5Sign: "", + } + }) + sm.EXPECT().ReadAllPieces(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, req *storage.ReadAllPiecesRequest) (io.ReadCloser, error) { + assert.Equal(taskID, req.TaskID) + return io.NopCloser(bytes.NewBuffer([]byte("1111111111"))), nil + }) + }, + verify: func(rc io.ReadCloser, attr map[string]string, ok bool) { + assert.True(ok) + assert.NotNil(rc) + assert.Equal("10", attr[headers.ContentLength]) + assert.Equal("bytes 0-9/100", attr[headers.ContentRange]) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + sm := ms.NewMockManager(ctrl) + tc.storageManager(sm) + ptm := &peerTaskManager{ + host: &scheduler.PeerHost{}, + enablePrefetch: tc.enablePrefetch, + storageManager: sm, + } + tc.verify(ptm.tryReuseStreamPeerTask(context.Background(), tc.request)) + }) + } +}