diff --git a/cmd/lotus-storage-miner/actor.go b/cmd/lotus-storage-miner/actor.go index 611ea8f19be..69486eaf5b9 100644 --- a/cmd/lotus-storage-miner/actor.go +++ b/cmd/lotus-storage-miner/actor.go @@ -40,6 +40,8 @@ var actorCmd = &cli.Command{ actorSetPeeridCmd, actorSetOwnerCmd, actorControl, + actorProposeChangeWorker, + actorConfirmChangeWorker, }, } @@ -698,3 +700,221 @@ var actorSetOwnerCmd = &cli.Command{ return nil }, } + +var actorProposeChangeWorker = &cli.Command{ + Name: "propose-change-worker", + Usage: "Propose a worker address change", + ArgsUsage: "[address]", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "really-do-it", + Usage: "Actually send transaction performing the action", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return fmt.Errorf("must pass address of new worker address") + } + + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + + api, acloser, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer acloser() + + ctx := lcli.ReqContext(cctx) + + na, err := address.NewFromString(cctx.Args().First()) + if err != nil { + return err + } + + newAddr, err := api.StateLookupID(ctx, na, types.EmptyTSK) + if err != nil { + return err + } + + maddr, err := nodeApi.ActorAddress(ctx) + if err != nil { + return err + } + + mi, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK) + if err != nil { + return err + } + + if mi.NewWorker.Empty() { + if mi.Worker == newAddr { + return fmt.Errorf("worker address already set to %s", na) + } + } else { + if mi.NewWorker == newAddr { + return fmt.Errorf("change to worker address %s already pending", na) + } + } + + if !cctx.Bool("really-do-it") { + fmt.Fprintln(cctx.App.Writer, "Pass --really-do-it to actually execute this action") + return nil + } + + cwp := &miner2.ChangeWorkerAddressParams{ + NewWorker: newAddr, + NewControlAddrs: mi.ControlAddresses, + } + + sp, err := actors.SerializeParams(cwp) + if err != nil { + return xerrors.Errorf("serializing params: %w", err) + } + + smsg, err := api.MpoolPushMessage(ctx, &types.Message{ + From: mi.Owner, + To: maddr, + Method: miner.Methods.ChangeWorkerAddress, + Value: big.Zero(), + Params: sp, + }, nil) + if err != nil { + return xerrors.Errorf("mpool push: %w", err) + } + + fmt.Fprintln(cctx.App.Writer, "Propose Message CID:", smsg.Cid()) + + // wait for it to get mined into a block + wait, err := api.StateWaitMsg(ctx, smsg.Cid(), build.MessageConfidence) + if err != nil { + return err + } + + // check it executed successfully + if wait.Receipt.ExitCode != 0 { + fmt.Fprintln(cctx.App.Writer, "Propose worker change failed!") + return err + } + + mi, err = api.StateMinerInfo(ctx, maddr, wait.TipSet) + if err != nil { + return err + } + if mi.NewWorker != newAddr { + return fmt.Errorf("Proposed worker address change not reflected on chain: expected '%s', found '%s'", na, mi.NewWorker) + } + + fmt.Fprintf(cctx.App.Writer, "Worker key change to %s successfully proposed.\n", na) + fmt.Fprintf(cctx.App.Writer, "Call 'confirm-change-worker' at or after height %d to complete.\n", mi.WorkerChangeEpoch) + + return nil + }, +} + +var actorConfirmChangeWorker = &cli.Command{ + Name: "confirm-change-worker", + Usage: "Confirm a worker address change", + ArgsUsage: "[address]", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "really-do-it", + Usage: "Actually send transaction performing the action", + Value: false, + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Args().Present() { + return fmt.Errorf("must pass address of new worker address") + } + + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + + api, acloser, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer acloser() + + ctx := lcli.ReqContext(cctx) + + na, err := address.NewFromString(cctx.Args().First()) + if err != nil { + return err + } + + newAddr, err := api.StateLookupID(ctx, na, types.EmptyTSK) + if err != nil { + return err + } + + maddr, err := nodeApi.ActorAddress(ctx) + if err != nil { + return err + } + + mi, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK) + if err != nil { + return err + } + + if mi.NewWorker.Empty() { + return xerrors.Errorf("no worker key change proposed") + } else if mi.NewWorker != newAddr { + return xerrors.Errorf("worker key %s does not match current worker key proposal %s", newAddr, mi.NewWorker) + } + + if head, err := api.ChainHead(ctx); err != nil { + return xerrors.Errorf("failed to get the chain head: %w", err) + } else if head.Height() < mi.WorkerChangeEpoch { + return xerrors.Errorf("worker key change cannot be confirmed until %d, current height is %d", mi.WorkerChangeEpoch, head.Height()) + } + + if !cctx.Bool("really-do-it") { + fmt.Fprintln(cctx.App.Writer, "Pass --really-do-it to actually execute this action") + return nil + } + + smsg, err := api.MpoolPushMessage(ctx, &types.Message{ + From: mi.Owner, + To: maddr, + Method: miner.Methods.ConfirmUpdateWorkerKey, + Value: big.Zero(), + }, nil) + if err != nil { + return xerrors.Errorf("mpool push: %w", err) + } + + fmt.Fprintln(cctx.App.Writer, "Confirm Message CID:", smsg.Cid()) + + // wait for it to get mined into a block + wait, err := api.StateWaitMsg(ctx, smsg.Cid(), build.MessageConfidence) + if err != nil { + return err + } + + // check it executed successfully + if wait.Receipt.ExitCode != 0 { + fmt.Fprintln(cctx.App.Writer, "Worker change failed!") + return err + } + + mi, err = api.StateMinerInfo(ctx, maddr, wait.TipSet) + if err != nil { + return err + } + if mi.Worker != newAddr { + return fmt.Errorf("Confirmed worker address change not reflected on chain: expected '%s', found '%s'", newAddr, mi.Worker) + } + + return nil + }, +} diff --git a/cmd/lotus-storage-miner/actor_test.go b/cmd/lotus-storage-miner/actor_test.go new file mode 100644 index 00000000000..58c0169f8ad --- /dev/null +++ b/cmd/lotus-storage-miner/actor_test.go @@ -0,0 +1,131 @@ +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "regexp" + "strconv" + "sync/atomic" + "testing" + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/api/test" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/lotuslog" + "github.com/filecoin-project/lotus/node/repo" + builder "github.com/filecoin-project/lotus/node/test" +) + +func TestWorkerKeyChange(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode") + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _ = logging.SetLogLevel("*", "INFO") + + policy.SetConsensusMinerMinPower(abi.NewStoragePower(2048)) + policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1) + policy.SetMinVerifiedDealSize(abi.NewStoragePower(256)) + + lotuslog.SetupLogLevels() + logging.SetLogLevel("miner", "ERROR") + logging.SetLogLevel("chainstore", "ERROR") + logging.SetLogLevel("chain", "ERROR") + logging.SetLogLevel("sub", "ERROR") + logging.SetLogLevel("storageminer", "ERROR") + + blocktime := 1 * time.Millisecond + + n, sn := builder.MockSbBuilder(t, []test.FullNodeOpts{test.FullNodeWithUpgradeAt(1)}, test.OneMiner) + + output := bytes.NewBuffer(nil) + run := func(cmd *cli.Command, args ...string) error { + app := cli.NewApp() + app.Metadata = map[string]interface{}{ + "repoType": repo.StorageMiner, + "testnode-full": n[0], + "testnode-storage": sn[0], + } + app.Writer = output + build.RunningNodeType = build.NodeMiner + + fs := flag.NewFlagSet("", flag.ContinueOnError) + for _, f := range cmd.Flags { + if err := f.Apply(fs); err != nil { + return err + } + } + require.NoError(t, fs.Parse(args)) + + cctx := cli.NewContext(app, fs, nil) + return cmd.Action(cctx) + } + + // setup miner + mine := int64(1) + done := make(chan struct{}) + go func() { + defer close(done) + for atomic.LoadInt64(&mine) == 1 { + time.Sleep(blocktime) + if err := sn[0].MineOne(ctx, test.MineNext); err != nil { + t.Error(err) + } + } + }() + defer func() { + atomic.AddInt64(&mine, -1) + fmt.Println("shutting down mining") + <-done + }() + + client := n[0] + + newKey, err := client.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + // Initialize wallet. + test.SendFunds(ctx, t, client, newKey, abi.NewTokenAmount(0)) + + require.NoError(t, run(actorProposeChangeWorker, "--really-do-it", newKey.String())) + + result := output.String() + output.Reset() + + require.Contains(t, result, fmt.Sprintf("Worker key change to %s successfully proposed.", newKey)) + + epochRe := regexp.MustCompile("at or after height (?P[0-9]+) to complete") + matches := epochRe.FindStringSubmatch(result) + require.NotNil(t, matches) + targetEpoch, err := strconv.Atoi(matches[1]) + require.NoError(t, err) + require.NotZero(t, targetEpoch) + + // Too early. + require.Error(t, run(actorConfirmChangeWorker, "--really-do-it", newKey.String())) + output.Reset() + + for { + head, err := client.ChainHead(ctx) + require.NoError(t, err) + if head.Height() >= abi.ChainEpoch(targetEpoch) { + break + } + build.Clock.Sleep(10 * blocktime) + } + require.NoError(t, run(actorConfirmChangeWorker, "--really-do-it", newKey.String())) + output.Reset() +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 66b16e9f34f..eef67cfc488 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -186,22 +186,12 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st ctx := helpers.LifecycleCtx(mctx, lc) - mi, err := api.StateMinerInfo(ctx, maddr, types.EmptyTSK) + fps, err := storage.NewWindowedPoStScheduler(api, fc, sealer, sealer, j, maddr) if err != nil { return nil, err } - worker, err := api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK) - if err != nil { - return nil, err - } - - fps, err := storage.NewWindowedPoStScheduler(api, fc, sealer, sealer, j, maddr, worker) - if err != nil { - return nil, err - } - - sm, err := storage.NewMiner(api, maddr, worker, h, ds, sealer, sc, verif, gsd, fc, j) + sm, err := storage.NewMiner(api, maddr, h, ds, sealer, sc, verif, gsd, fc, j) if err != nil { return nil, err } diff --git a/storage/miner.go b/storage/miner.go index b5a2fd6a565..b982e90870f 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -49,8 +49,7 @@ type Miner struct { sc sealing.SectorIDCounter verif ffiwrapper.Verifier - maddr address.Address - worker address.Address + maddr address.Address getSealConfig dtypes.GetSealingConfigFunc sealing *sealing.Sealing @@ -111,7 +110,7 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal) (*Miner, error) { +func NewMiner(api storageMinerApi, maddr address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig, journal journal.Journal) (*Miner, error) { m := &Miner{ api: api, feeCfg: feeCfg, @@ -122,7 +121,6 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d verif: verif, maddr: maddr, - worker: worker, getSealConfig: gsd, journal: journal, sealingEvtType: journal.RegisterEventType("storage", "sealing_states"), @@ -174,7 +172,17 @@ func (m *Miner) Stop(ctx context.Context) error { } func (m *Miner) runPreflightChecks(ctx context.Context) error { - has, err := m.api.WalletHas(ctx, m.worker) + mi, err := m.api.StateMinerInfo(ctx, m.maddr, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("failed to resolve miner info: %w", err) + } + + workerKey, err := m.api.StateAccountKey(ctx, mi.Worker, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("failed to resolve worker key: %w", err) + } + + has, err := m.api.WalletHas(ctx, workerKey) if err != nil { return xerrors.Errorf("failed to check wallet for worker key: %w", err) } @@ -183,7 +191,7 @@ func (m *Miner) runPreflightChecks(ctx context.Context) error { return errors.New("key for worker not found in local wallet") } - log.Infof("starting up miner %s, worker addr %s", m.maddr, m.worker) + log.Infof("starting up miner %s, worker addr %s", m.maddr, workerKey) return nil } diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 54d3d3dcc77..147a5631b0b 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -283,13 +283,14 @@ func (s *WindowPoStScheduler) checkNextRecoveries(ctx context.Context, dlIdx uin msg := &types.Message{ To: s.actor, - From: s.worker, Method: miner.Methods.DeclareFaultsRecovered, Params: enc, Value: types.NewInt(0), } spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} - s.setSender(ctx, msg, spec) + if err := s.setSender(ctx, msg, spec); err != nil { + return recoveries, nil, err + } sm, err := s.api.MpoolPushMessage(ctx, msg, &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)}) if err != nil { @@ -367,13 +368,14 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, dlIdx uint64, msg := &types.Message{ To: s.actor, - From: s.worker, Method: miner.Methods.DeclareFaults, Params: enc, Value: types.NewInt(0), // TODO: Is there a fee? } spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} - s.setSender(ctx, msg, spec) + if err := s.setSender(ctx, msg, spec); err != nil { + return faults, nil, err + } sm, err := s.api.MpoolPushMessage(ctx, msg, spec) if err != nil { @@ -707,13 +709,14 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi msg := &types.Message{ To: s.actor, - From: s.worker, Method: miner.Methods.SubmitWindowedPoSt, Params: enc, Value: types.NewInt(0), } spec := &api.MessageSendSpec{MaxFee: abi.TokenAmount(s.feeCfg.MaxWindowPoStGasFee)} - s.setSender(ctx, msg, spec) + if err := s.setSender(ctx, msg, spec); err != nil { + return nil, err + } // TODO: consider maybe caring about the output sm, err := s.api.MpoolPushMessage(ctx, msg, spec) @@ -741,21 +744,18 @@ func (s *WindowPoStScheduler) submitPost(ctx context.Context, proof *miner.Submi return sm, nil } -func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) { +func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) error { mi, err := s.api.StateMinerInfo(ctx, s.actor, types.EmptyTSK) if err != nil { - log.Errorw("error getting miner info", "error", err) - - // better than just failing - msg.From = s.worker - return + return xerrors.Errorf("error getting miner info: %w", err) } + // use the worker as a fallback + msg.From = mi.Worker gm, err := s.api.GasEstimateMessageGas(ctx, msg, spec, types.EmptyTSK) if err != nil { log.Errorw("estimating gas", "error", err) - msg.From = s.worker - return + return nil } *msg = *gm @@ -764,9 +764,9 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message, pa, err := AddressFor(ctx, s.api, mi, PoStAddr, minFunds) if err != nil { log.Errorw("error selecting address for window post", "error", err) - msg.From = s.worker - return + return nil } msg.From = pa + return nil } diff --git a/storage/wdpost_run_test.go b/storage/wdpost_run_test.go index bac86f022fc..12830c71122 100644 --- a/storage/wdpost_run_test.go +++ b/storage/wdpost_run_test.go @@ -127,7 +127,6 @@ func TestWDPostDoPost(t *testing.T) { proofType := abi.RegisteredPoStProof_StackedDrgWindow2KiBV1 postAct := tutils.NewIDAddr(t, 100) - workerAct := tutils.NewIDAddr(t, 101) mockStgMinerAPI := newMockStorageMinerAPI() @@ -168,7 +167,6 @@ func TestWDPostDoPost(t *testing.T) { faultTracker: &mockFaultTracker{}, proofType: proofType, actor: postAct, - worker: workerAct, journal: journal.NilJournal(), } diff --git a/storage/wdpost_sched.go b/storage/wdpost_sched.go index 99bea2e6600..1a1422e1937 100644 --- a/storage/wdpost_sched.go +++ b/storage/wdpost_sched.go @@ -31,8 +31,7 @@ type WindowPoStScheduler struct { partitionSectors uint64 ch *changeHandler - actor address.Address - worker address.Address + actor address.Address evtTypes [4]journal.EventType journal journal.Journal @@ -41,7 +40,7 @@ type WindowPoStScheduler struct { // failLk sync.Mutex } -func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb storage.Prover, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address, worker address.Address) (*WindowPoStScheduler, error) { +func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb storage.Prover, ft sectorstorage.FaultTracker, j journal.Journal, actor address.Address) (*WindowPoStScheduler, error) { mi, err := api.StateMinerInfo(context.TODO(), actor, types.EmptyTSK) if err != nil { return nil, xerrors.Errorf("getting sector size: %w", err) @@ -60,8 +59,7 @@ func NewWindowedPoStScheduler(api storageMinerApi, fc config.MinerFeeConfig, sb proofType: rt, partitionSectors: mi.WindowPoStPartitionSectors, - actor: actor, - worker: worker, + actor: actor, evtTypes: [...]journal.EventType{ evtTypeWdPoStScheduler: j.RegisterEventType("wdpost", "scheduler"), evtTypeWdPoStProofs: j.RegisterEventType("wdpost", "proofs_processed"),