Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track upgrade details #3527

Merged
merged 47 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
0f02a50
Remove context and handle cancellation internally instead
ycombinator Oct 12, 2023
0b91b11
More optimizations
ycombinator Oct 12, 2023
af56f61
Add back context
ycombinator Oct 12, 2023
f6396be
Adding FSM for upgrades
ycombinator Oct 3, 2023
f68c6af
Implementing TODO
ycombinator Oct 3, 2023
c730a30
WIP
ycombinator Oct 4, 2023
41e2bc8
WIP
ycombinator Oct 4, 2023
66e57b9
Reorganizing imports
ycombinator Oct 4, 2023
47d05bb
Running go mod tidy
ycombinator Oct 4, 2023
bf47f0c
Resolve deadlock
ycombinator Oct 4, 2023
171b115
Add unit tests
ycombinator Oct 4, 2023
394650d
Fix type
ycombinator Oct 4, 2023
a39e204
Renaming variable to avoid conflict with package name
ycombinator Oct 5, 2023
539313f
Handle failures in one place
ycombinator Oct 5, 2023
55164c6
Set UPG_RESTARTING state
ycombinator Oct 5, 2023
d029c23
Remove Fleet changes
ycombinator Oct 5, 2023
101bf97
Add guard for action
ycombinator Oct 5, 2023
c8fb1fa
Immediately notify observer when registered
ycombinator Oct 5, 2023
a0e6b6b
Add UpgradeCompleted effect to observer doc
ycombinator Oct 5, 2023
4d509c7
Fix initialization
ycombinator Oct 13, 2023
a60bd94
Adding details progress observer and unit tests
ycombinator Oct 13, 2023
0673107
Fixing booboos introduced during conflict resolution
ycombinator Oct 13, 2023
a5489d6
Add unit test
ycombinator Oct 13, 2023
1d76472
Add assertion on error
ycombinator Oct 13, 2023
898ba0e
Add comment on stateNeedsRefresh
ycombinator Oct 13, 2023
761e2fe
Add comment linking to Fleet Server OpenAPI spec for UPG_* values
ycombinator Oct 13, 2023
5df99f3
Use public accessor for setting upgrade details on coordinator to pre…
ycombinator Oct 13, 2023
a479063
Use buffered channel for upgradeDetailsChan in test so test can run i…
ycombinator Oct 13, 2023
8fe8499
Fixing unit test
ycombinator Oct 13, 2023
94cc5ab
Add mutex to prevent data race
ycombinator Oct 13, 2023
e7d3401
Clarify assertion's intent
ycombinator Oct 17, 2023
413959c
Make copy of details before notifying observer with it.
ycombinator Oct 17, 2023
11dca0b
Add setter for setting download percent
ycombinator Oct 17, 2023
52fa242
Remove unnecessary struct tags
ycombinator Oct 17, 2023
3029e0f
Change mutex type
ycombinator Oct 17, 2023
8eae498
Document FailedState and ErrorMsg fields
ycombinator Oct 17, 2023
3ac886c
Track download rate as well
ycombinator Oct 17, 2023
db2eb60
Change data type of time field
ycombinator Oct 17, 2023
7d20b9f
Rename struct to avoid stutter in naming
ycombinator Oct 17, 2023
abefefa
Log upgrade details when they change
ycombinator Oct 17, 2023
492d195
Add nil guard
ycombinator Oct 17, 2023
6db2a99
Setting logger in test
ycombinator Oct 17, 2023
29afff0
Use sentinel value for encoding +Inf download rate in JSON
ycombinator Oct 18, 2023
6b3719b
Fix up comment
ycombinator Oct 18, 2023
26d2dd6
Set omitempty on failed_state and error_msg
ycombinator Oct 19, 2023
52987e7
Add units to download rate
ycombinator Oct 19, 2023
b8d81a0
Fixing test after conflicts
ycombinator Oct 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
Expand All @@ -35,7 +36,7 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error {
return nil
}

func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
select {
case <-time.After(2 * time.Second):
u.msgChan <- "completed " + version
Expand Down
42 changes: 36 additions & 6 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/capabilities"
Expand Down Expand Up @@ -59,7 +61,7 @@ type UpgradeManager interface {
Reload(rawConfig *config.Config) error

// Upgrade upgrades running agent.
Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error)
Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error)

// Ack is used on startup to check if the agent has upgraded and needs to send an ack for the action
Ack(ctx context.Context, acker acker.Acker) error
Expand Down Expand Up @@ -192,8 +194,12 @@ type Coordinator struct {
// state should never be directly read or written outside the Coordinator
// goroutine. Callers who need to access or modify the state should use the
// public accessors like State(), SetLogLevel(), etc.
state State
stateBroadcaster *broadcaster.Broadcaster[State]
state State
stateBroadcaster *broadcaster.Broadcaster[State]

// If you get a race detector error while accessing this field, it probably
// means you're calling private Coordinator methods from outside the
// Coordinator goroutine.
stateNeedsRefresh bool

// overrideState is used during the update process to report the overall
Expand All @@ -204,6 +210,10 @@ type Coordinator struct {
// SetOverrideState helper to the Coordinator goroutine.
overrideStateChan chan *coordinatorOverrideState

// upgradeDetailsChan forwards upgrade details from the publicly accessible
// SetUpgradeDetails helper to the Coordinator goroutine.
upgradeDetailsChan chan *details.Details

// loglevelCh forwards log level changes from the public API (SetLogLevel)
// to the run loop in Coordinator's main goroutine.
logLevelCh chan logp.Level
Expand Down Expand Up @@ -326,8 +336,9 @@ func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.
// synchronization in the subscriber API, just set the input buffer to 0.
stateBroadcaster: broadcaster.New(state, 64, 32),

logLevelCh: make(chan logp.Level),
overrideStateChan: make(chan *coordinatorOverrideState),
logLevelCh: make(chan logp.Level),
overrideStateChan: make(chan *coordinatorOverrideState),
upgradeDetailsChan: make(chan *details.Details),
}
// Setup communication channels for any non-nil components. This pattern
// lets us transparently accept nil managers / simulated events during
Expand Down Expand Up @@ -445,17 +456,33 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str

// override the overall state to upgrading until the re-execution is complete
c.SetOverrideState(agentclient.Upgrading, fmt.Sprintf("Upgrading to version %s", version))
cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, skipVerifyOverride, skipDefaultPgp, pgpBytes...)

// initialize upgrade details
actionID := ""
if action != nil {
actionID = action.ActionID
}
det := details.NewDetails(version, details.StateRequested, actionID)
det.RegisterObserver(c.SetUpgradeDetails)
det.RegisterObserver(c.logUpgradeDetails)

cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...)
if err != nil {
c.ClearOverrideState()
det.Fail(err)
return err
}
if cb != nil {
det.SetState(details.StateRestarting)
c.ReExec(cb)
}
return nil
}

func (c *Coordinator) logUpgradeDetails(details *details.Details) {
c.logger.Infow("updated upgrade details", "upgrade_details", details)
}

// AckUpgrade is the method used on startup to ack a previously successful upgrade action.
// Called from external goroutines.
func (c *Coordinator) AckUpgrade(ctx context.Context, acker acker.Acker) error {
Expand Down Expand Up @@ -878,6 +905,9 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
case overrideState := <-c.overrideStateChan:
c.setOverrideState(overrideState)

case upgradeDetails := <-c.upgradeDetailsChan:
c.setUpgradeDetails(upgradeDetails)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be logging every upgrade details transition? This seems like great way to make upgrade progress obvious in our logs.

Fleet is only going to get the latest state at the time of the next checkin, it won't see the entire set of transitions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, will add logging to this PR.

Copy link
Contributor Author

@ycombinator ycombinator Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added logging but I implemented it as just another observer that's registered with the UpgradeDetails object. See d2a2eff.

Here's what the result looks like in Agent logs:

$ sudo elastic-agent logs -f | grep -i upgrade_details
{"log.level":"info","@timestamp":"2023-10-17T20:40:11.633Z","log.origin":{"file.name":"coordinator/coordinator.go","file.line":484},"message":"updated upgrade details","log":{"source":"elastic-agent"},"upgrade_details":{"target_version":"8.13.0","state":"UPG_REQUESTED","metadata":{"scheduled_at":"0001-01-01T00:00:00Z","failed_state":"","error_msg":""}},"ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2023-10-17T20:40:11.633Z","log.origin":{"file.name":"coordinator/coordinator.go","file.line":484},"message":"updated upgrade details","log":{"source":"elastic-agent"},"upgrade_details":{"target_version":"8.13.0","state":"UPG_DOWNLOADING","metadata":{"scheduled_at":"0001-01-01T00:00:00Z","failed_state":"","error_msg":""}},"ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2023-10-17T20:40:31.594Z","log.origin":{"file.name":"coordinator/coordinator.go","file.line":484},"message":"updated upgrade details","log":{"source":"elastic-agent"},"upgrade_details":{"target_version":"8.13.0","state":"UPG_FAILED","metadata":{"scheduled_at":"0001-01-01T00:00:00Z","failed_state":"UPG_DOWNLOADING","error_msg":"failed download of agent binary: context canceled"}},"ecs.version":"1.6.0"}

BTW, that last entry is from me Ctrl-C'ing the sudo elastic-agent upgrade 8.13.0 command since I knew it would never succeed.

Let me know what you think. Thanks!


case componentState := <-c.managerChans.runtimeManagerUpdate:
// New component change reported by the runtime manager via
// Coordinator.watchRuntimeComponents(), merge it with the
Expand Down
21 changes: 19 additions & 2 deletions internal/pkg/agent/application/coordinator/coordinator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package coordinator
import (
"fmt"

agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"

"github.com/elastic/elastic-agent-client/v7/pkg/client"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/pkg/component/runtime"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
)

// State provides the current state of the coordinator along with all the current states of components and units.
Expand All @@ -30,6 +32,8 @@ type State struct {

Components []runtime.ComponentComponentState `yaml:"components"`
LogLevel logp.Level `yaml:"log_level"`

UpgradeDetails *details.Details `yaml:"upgrade_details,omitempty"`
}

type coordinatorOverrideState struct {
Expand All @@ -54,6 +58,11 @@ func (c *Coordinator) ClearOverrideState() {
c.overrideStateChan <- nil
}

// SetUpgradeDetails sets upgrade details. This is used during upgrades.
func (c *Coordinator) SetUpgradeDetails(upgradeDetails *details.Details) {
c.upgradeDetailsChan <- upgradeDetails
}

// setRuntimeManagerError updates the error state for the runtime manager.
// Called on the main Coordinator goroutine.
func (c *Coordinator) setRuntimeManagerError(err error) {
Expand Down Expand Up @@ -114,6 +123,13 @@ func (c *Coordinator) setOverrideState(overrideState *coordinatorOverrideState)
c.stateNeedsRefresh = true
}

// setUpgradeDetails is the internal helper to set upgrade details and set stateNeedsRefresh.
// Must be called on the main Coordinator goroutine.
func (c *Coordinator) setUpgradeDetails(upgradeDetails *details.Details) {
c.state.UpgradeDetails = upgradeDetails
c.stateNeedsRefresh = true
}

// Forward the current state to the broadcaster and clear the stateNeedsRefresh
// flag. Must be called on the main Coordinator goroutine.
func (c *Coordinator) refreshState() {
Expand Down Expand Up @@ -163,6 +179,7 @@ func (c *Coordinator) generateReportableState() (s State) {
s.FleetState = c.state.FleetState
s.FleetMessage = c.state.FleetMessage
s.LogLevel = c.state.LogLevel
s.UpgradeDetails = c.state.UpgradeDetails
s.Components = make([]runtime.ComponentComponentState, len(c.state.Components))
copy(s.Components, c.state.Components)

Expand Down
61 changes: 58 additions & 3 deletions internal/pkg/agent/application/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"testing"
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
Expand Down Expand Up @@ -471,8 +473,50 @@ func TestCoordinator_Upgrade(t *testing.T) {
require.NoError(t, err)
}

func TestCoordinator_UpgradeDetails(t *testing.T) {
coordCh := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

expectedErr := errors.New("some upgrade error")
upgradeManager := &fakeUpgradeManager{
upgradeable: true,
upgradeErr: expectedErr,
}
coord, cfgMgr, varsMgr := createCoordinator(t, ctx, WithUpgradeManager(upgradeManager))
require.Nil(t, coord.state.UpgradeDetails)
go func() {
err := coord.Run(ctx)
if errors.Is(err, context.Canceled) {
// allowed error
err = nil
}
coordCh <- err
}()

// no vars used by the config
varsMgr.Vars(ctx, []*transpiler.Vars{{}})

// no need for anything to really run
cfg, err := config.NewConfigFrom(nil)
require.NoError(t, err)
cfgMgr.Config(ctx, cfg)

err = coord.Upgrade(ctx, "9.0.0", "", nil, true, false)
require.ErrorIs(t, expectedErr, err)
cancel()

err = <-coordCh
require.NoError(t, err)

require.Equal(t, details.StateFailed, coord.state.UpgradeDetails.State)
require.Equal(t, details.StateRequested, coord.state.UpgradeDetails.Metadata.FailedState)
require.Equal(t, expectedErr.Error(), coord.state.UpgradeDetails.Metadata.ErrorMsg)
}

type createCoordinatorOpts struct {
managed bool
managed bool
upgradeManager UpgradeManager
}

type CoordinatorOpt func(o *createCoordinatorOpts)
Expand All @@ -483,6 +527,12 @@ func ManagedCoordinator(managed bool) CoordinatorOpt {
}
}

func WithUpgradeManager(upgradeManager UpgradeManager) CoordinatorOpt {
return func(o *createCoordinatorOpts) {
o.upgradeManager = upgradeManager
}
}

// createCoordinator creates a coordinator that using a fake config manager and a fake vars manager.
//
// The runtime specifications is set up to use both the fake component and fake shipper.
Expand Down Expand Up @@ -527,7 +577,12 @@ func createCoordinator(t *testing.T, ctx context.Context, opts ...CoordinatorOpt
cfgMgr := newFakeConfigManager()
varsMgr := newFakeVarsManager()

coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, &fakeUpgradeManager{}, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed)
upgradeManager := o.upgradeManager
if upgradeManager == nil {
upgradeManager = &fakeUpgradeManager{}
}

coord := New(l, nil, logp.DebugLevel, ai, specs, &fakeReExecManager{}, upgradeManager, rm, cfgMgr, varsMgr, caps, monitoringMgr, o.managed)
return coord, cfgMgr, varsMgr
}

Expand Down Expand Up @@ -574,7 +629,7 @@ func (f *fakeUpgradeManager) Reload(cfg *config.Config) error {
return nil
}

func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
func (f *fakeUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
f.upgradeCalled = true
if f.upgradeErr != nil {
return nil, f.upgradeErr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/pkg/component"
Expand Down Expand Up @@ -811,6 +812,9 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) {
// since a successful upgrade sets the override state twice.
overrideStateChan := make(chan *coordinatorOverrideState, 2)

// similarly, upgradeDetailsChan is a buffered channel as well.
upgradeDetailsChan := make(chan *details.Details, 2)

// Create a manager that will allow upgrade attempts but return a failure
// from Upgrade itself (success requires testing ReExec and we aren't
// quite ready to do that yet).
Expand All @@ -820,9 +824,11 @@ func TestCoordinatorInitiatesUpgrade(t *testing.T) {
}

coord := &Coordinator{
stateBroadcaster: broadcaster.New(State{}, 0, 0),
overrideStateChan: overrideStateChan,
upgradeMgr: upgradeMgr,
stateBroadcaster: broadcaster.New(State{}, 0, 0),
overrideStateChan: overrideStateChan,
upgradeDetailsChan: upgradeDetailsChan,
upgradeMgr: upgradeMgr,
logger: logp.NewLogger("testing"),
}

// Call upgrade and make sure the upgrade manager receives an Upgrade call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/artifact/download"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/pkg/core/logger"
)
Expand All @@ -43,13 +44,14 @@ const (

// Downloader is a downloader able to fetch artifacts from elastic.co web page.
type Downloader struct {
log *logger.Logger
config *artifact.Config
client http.Client
log *logger.Logger
config *artifact.Config
client http.Client
upgradeDetails *details.Details
}

// NewDownloader creates and configures Elastic Downloader
func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, error) {
func NewDownloader(log *logger.Logger, config *artifact.Config, upgradeDetails *details.Details) (*Downloader, error) {
client, err := config.HTTPTransportSettings.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: false, IdleConnTimeout: 30 * time.Second},
Expand All @@ -59,15 +61,16 @@ func NewDownloader(log *logger.Logger, config *artifact.Config) (*Downloader, er
}

client.Transport = download.WithHeaders(client.Transport, download.Headers)
return NewDownloaderWithClient(log, config, *client), nil
return NewDownloaderWithClient(log, config, *client, upgradeDetails), nil
}

// NewDownloaderWithClient creates Elastic Downloader with specific client used
func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client) *Downloader {
func NewDownloaderWithClient(log *logger.Logger, config *artifact.Config, client http.Client, upgradeDetails *details.Details) *Downloader {
return &Downloader{
log: log,
config: config,
client: client,
log: log,
config: config,
client: client,
upgradeDetails: upgradeDetails,
}
}

Expand Down Expand Up @@ -206,7 +209,8 @@ func (e *Downloader) downloadFile(ctx context.Context, artifactName, filename, f
}

loggingObserver := newLoggingProgressObserver(e.log, e.config.HTTPTransportSettings.Timeout)
dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver)
detailsObserver := newDetailsProgressObserver(e.upgradeDetails)
dp := newDownloadProgressReporter(sourceURI, e.config.HTTPTransportSettings.Timeout, fileSize, loggingObserver, detailsObserver)
dp.Report(ctx)
_, err = io.Copy(destinationFile, io.TeeReader(resp.Body, dp))
if err != nil {
Expand Down
Loading