diff --git a/api/api_full.go b/api/api_full.go index 767739582e1..f0ce741ec8b 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -192,6 +192,9 @@ type FullNode interface { // MpoolPush pushes a signed message to mempool. MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error) + // MpoolPushUntrusted pushes a signed message to mempool from untrusted sources. + MpoolPushUntrusted(context.Context, *types.SignedMessage) (cid.Cid, error) + // MpoolPushMessage atomically assigns a nonce, signs, and pushes a message // to mempool. // maxFee is only used when GasFeeCap/GasPremium fields aren't specified diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index c79d0841c30..f9b232d1011 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -122,7 +122,9 @@ type FullNodeStruct struct { MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"` MpoolClear func(context.Context, bool) error `perm:"write"` - MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` + MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` + MpoolPushUntrusted func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"` + MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"` MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"` MpoolSub func(context.Context) (<-chan api.MpoolUpdate, error) `perm:"read"` @@ -553,6 +555,10 @@ func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessag return c.Internal.MpoolPush(ctx, smsg) } +func (c *FullNodeStruct) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { + return c.Internal.MpoolPushUntrusted(ctx, smsg) +} + func (c *FullNodeStruct) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { return c.Internal.MpoolPushMessage(ctx, msg, spec) } diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index d54ea7164a4..83aa5c6b784 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -55,6 +55,7 @@ var baseFeeLowerBoundFactor = types.NewInt(10) var baseFeeLowerBoundFactorConservative = types.NewInt(100) var MaxActorPendingMessages = 1000 +var MaxUntrustedActorPendingMessages = 10 var MaxNonceGap = uint64(4) @@ -195,9 +196,17 @@ func CapGasFee(msg *types.Message, maxFee abi.TokenAmount) { msg.GasPremium = big.Min(msg.GasFeeCap, msg.GasPremium) // cap premium at FeeCap } -func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (bool, error) { +func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict, untrusted bool) (bool, error) { nextNonce := ms.nextNonce nonceGap := false + + maxNonceGap := MaxNonceGap + maxActorPendingMessages := MaxActorPendingMessages + if untrusted { + maxNonceGap = 0 + maxActorPendingMessages = MaxUntrustedActorPendingMessages + } + switch { case m.Message.Nonce == nextNonce: nextNonce++ @@ -206,7 +215,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo nextNonce++ } - case strict && m.Message.Nonce > nextNonce+MaxNonceGap: + case strict && m.Message.Nonce > nextNonce+maxNonceGap: return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap) case m.Message.Nonce > nextNonce: @@ -242,7 +251,7 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (boo //ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int) } - if !has && strict && len(ms.msgs) > MaxActorPendingMessages { + if !has && strict && len(ms.msgs) >= maxActorPendingMessages { log.Errorf("too many pending messages from actor %s", m.Message.From) return false, ErrTooManyPendingMessages } @@ -484,7 +493,7 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) { } mp.curTsLk.Lock() - publish, err := mp.addTs(m, mp.curTs, true) + publish, err := mp.addTs(m, mp.curTs, true, false) if err != nil { mp.curTsLk.Unlock() return cid.Undef, err @@ -551,7 +560,7 @@ func (mp *MessagePool) Add(m *types.SignedMessage) error { mp.curTsLk.Lock() defer mp.curTsLk.Unlock() - _, err = mp.addTs(m, mp.curTs, false) + _, err = mp.addTs(m, mp.curTs, false, false) return err } @@ -619,7 +628,7 @@ func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) return nil } -func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local bool) (bool, error) { +func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local, untrusted bool) (bool, error) { snonce, err := mp.getStateNonce(m.Message.From, curTs) if err != nil { return false, xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure) @@ -641,7 +650,7 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local return false, err } - return publish, mp.addLocked(m, !local) + return publish, mp.addLocked(m, !local, untrusted) } func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { @@ -676,17 +685,17 @@ func (mp *MessagePool) addLoaded(m *types.SignedMessage) error { return err } - return mp.addLocked(m, false) + return mp.addLocked(m, false, false) } func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error { mp.lk.Lock() defer mp.lk.Unlock() - return mp.addLocked(m, false) + return mp.addLocked(m, false, false) } -func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error { +func (mp *MessagePool) addLocked(m *types.SignedMessage, strict, untrusted bool) error { log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce) if m.Signature.Type == crypto.SigTypeBLS { mp.blsSigCache.Add(m.Cid(), m.Signature) @@ -713,7 +722,7 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error { mp.pending[m.Message.From] = mset } - incr, err := mset.add(m, mp, strict) + incr, err := mset.add(m, mp, strict, untrusted) if err != nil { log.Debug(err) return err @@ -793,6 +802,50 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) ( return act.Balance, nil } +// this method is provided for the gateway to push messages. +// differences from Push: +// - strict checks are enabled +// - extra strict add checks are used when adding the messages to the msgSet +// that means: no nonce gaps, at most 10 pending messages for the actor +func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) { + err := mp.checkMessage(m) + if err != nil { + return cid.Undef, err + } + + // serialize push access to reduce lock contention + mp.addSema <- struct{}{} + defer func() { + <-mp.addSema + }() + + msgb, err := m.Serialize() + if err != nil { + return cid.Undef, err + } + + mp.curTsLk.Lock() + publish, err := mp.addTs(m, mp.curTs, false, true) + if err != nil { + mp.curTsLk.Unlock() + return cid.Undef, err + } + mp.curTsLk.Unlock() + + mp.lk.Lock() + if err := mp.addLocal(m, msgb); err != nil { + mp.lk.Unlock() + return cid.Undef, err + } + mp.lk.Unlock() + + if publish { + err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb) + } + + return m.Cid(), err +} + func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) { mp.lk.Lock() defer mp.lk.Unlock() diff --git a/cmd/lotus-gateway/api.go b/cmd/lotus-gateway/api.go index 42e9e482997..0a6365dbd08 100644 --- a/cmd/lotus-gateway/api.go +++ b/cmd/lotus-gateway/api.go @@ -86,5 +86,5 @@ func (a *GatewayAPI) MpoolPush(ctx context.Context, sm *types.SignedMessage) (ci // TODO: additional anti-spam checks - return a.api.MpoolPush(ctx, sm) + return a.api.MpoolPushUntrusted(ctx, sm) } diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index 2b28816f7a2..19774802e96 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -72,6 +72,7 @@ * [MpoolPending](#MpoolPending) * [MpoolPush](#MpoolPush) * [MpoolPushMessage](#MpoolPushMessage) + * [MpoolPushUntrusted](#MpoolPushUntrusted) * [MpoolSelect](#MpoolSelect) * [MpoolSetConfig](#MpoolSetConfig) * [MpoolSub](#MpoolSub) @@ -1779,6 +1780,43 @@ Response: } ``` +### MpoolPushUntrusted +MpoolPushUntrusted pushes a signed message to mempool from untrusted sources. + + +Perms: write + +Inputs: +```json +[ + { + "Message": { + "Version": 42, + "To": "t01234", + "From": "t01234", + "Nonce": 42, + "Value": "0", + "GasLimit": 9, + "GasFeeCap": "0", + "GasPremium": "0", + "Method": 1, + "Params": "Ynl0ZSBhcnJheQ==" + }, + "Signature": { + "Type": 2, + "Data": "Ynl0ZSBhcnJheQ==" + } + } +] +``` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + ### MpoolSelect MpoolSelect returns a list of pending messages for inclusion in the next block diff --git a/node/impl/full/mpool.go b/node/impl/full/mpool.go index 066aafdc58d..e0dd3ecef00 100644 --- a/node/impl/full/mpool.go +++ b/node/impl/full/mpool.go @@ -110,6 +110,10 @@ func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (ci return a.Mpool.Push(smsg) } +func (a *MpoolAPI) MpoolPushUntrusted(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) { + return a.Mpool.PushUntrusted(smsg) +} + func (a *MpoolAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { cp := *msg msg = &cp