Skip to content

Commit

Permalink
Start filter out message events
Browse files Browse the repository at this point in the history
  • Loading branch information
alpe committed Aug 12, 2021
1 parent 7b2e84c commit 7de7141
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 69 deletions.
10 changes: 10 additions & 0 deletions x/wasm/keeper/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,13 @@ func contractSDKEventAttributes(customAttributes []wasmvmtypes.EventAttribute, c
}
return attrs, nil
}

func filterOutMessageTypeEvents(events sdk.Events) sdk.Events {
var r sdk.Events
for _, e := range events {
if e.Type != sdk.EventTypeMessage {
r = append(r, e)
}
}
return r
}
29 changes: 20 additions & 9 deletions x/wasm/keeper/handler_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (h IBCRawPacketHandler) DispatchMsg(ctx sdk.Context, _ sdk.AccAddress, cont
return nil, nil, sdkerrors.Wrapf(types.ErrEmpty, "ibc channel")
}

em := sdk.NewEventManager()
ctx = ctx.WithEventManager(em)
sequence, found := h.channelKeeper.GetNextSequenceSend(ctx, contractIBCPortID, contractIBCChannelID)
if !found {
return nil, nil, sdkerrors.Wrapf(channeltypes.ErrSequenceSendNotFound,
Expand All @@ -175,35 +177,44 @@ func (h IBCRawPacketHandler) DispatchMsg(ctx sdk.Context, _ sdk.AccAddress, cont
convertWasmIBCTimeoutHeightToCosmosHeight(msg.IBC.SendPacket.Timeout.Block),
msg.IBC.SendPacket.Timeout.Timestamp,
)
return nil, nil, h.channelKeeper.SendPacket(ctx, channelCap, packet)

if err = h.channelKeeper.SendPacket(ctx, channelCap, packet); err != nil {
return nil, nil, err
}
return filterOutMessageTypeEvents(em.Events()), nil, nil
}

var _ Messenger = MessageHandlerFunc(nil)

// MessageHandlerFunc is a helper to construct simple function based message handler
type MessageHandlerFunc func(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error)
type MessageHandlerFunc func(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msg wasmvmtypes.CosmosMsg) (data [][]byte, err error)

func (m MessageHandlerFunc) DispatchMsg(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error) {
return m(ctx, contractAddr, contractIBCPortID, msg)
em := sdk.NewEventManager()
data, err = m(ctx.WithEventManager(em), contractAddr, contractIBCPortID, msg)
if err != nil {
events = filterOutMessageTypeEvents(em.Events())
}
return
}

// NewBurnCoinMessageHandler handles wasmvm.BurnMsg messages
func NewBurnCoinMessageHandler(burner types.Burner) MessageHandlerFunc {
return func(ctx sdk.Context, contractAddr sdk.AccAddress, _ string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error) {
return func(ctx sdk.Context, contractAddr sdk.AccAddress, _ string, msg wasmvmtypes.CosmosMsg) (data [][]byte, err error) {
if msg.Bank != nil && msg.Bank.Burn != nil {
coins, err := convertWasmCoinsToSdkCoins(msg.Bank.Burn.Amount)
if err != nil {
return nil, nil, err
return nil, err
}
if err := burner.SendCoinsFromAccountToModule(ctx, contractAddr, types.ModuleName, coins); err != nil {
return nil, nil, sdkerrors.Wrap(err, "transfer to module")
return nil, sdkerrors.Wrap(err, "transfer to module")
}
if err := burner.BurnCoins(ctx, types.ModuleName, coins); err != nil {
return nil, nil, sdkerrors.Wrap(err, "burn coins")
return nil, sdkerrors.Wrap(err, "burn coins")
}
moduleLogger(ctx).Info("Burned", "amount", coins)
return nil, nil, nil
return nil, nil
}
return nil, nil, types.ErrUnknownMsg
return nil, types.ErrUnknownMsg
}
}
13 changes: 11 additions & 2 deletions x/wasm/keeper/handler_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ func TestIBCRawPacketHandler(t *testing.T) {
},
SendPacketFn: func(ctx sdk.Context, channelCap *capabilitytypes.Capability, packet ibcexported.PacketI) error {
capturedPacket = packet
ctx.EventManager().EmitEvents(sdk.Events{
sdk.NewEvent(channeltypes.EventTypeSendPacket),
sdk.NewEvent(
sdk.EventTypeMessage, // to be filtered out
sdk.NewAttribute(sdk.AttributeKeyModule, channeltypes.AttributeValueCategory),
),
})
return nil
},
}
Expand All @@ -246,6 +253,7 @@ func TestIBCRawPacketHandler(t *testing.T) {
capKeeper types.CapabilityKeeper
expPacketSent channeltypes.Packet
expErr *sdkerrors.Error
expEvents []string
}{
"all good": {
srcMsg: wasmvmtypes.SendPacketMsg{
Expand All @@ -264,6 +272,7 @@ func TestIBCRawPacketHandler(t *testing.T) {
Data: []byte("myData"),
TimeoutHeight: clienttypes.Height{RevisionNumber: 1, RevisionHeight: 2},
},
expEvents: []string{"send_packet"},
},
"sequence not found returns error": {
srcMsg: wasmvmtypes.SendPacketMsg{
Expand Down Expand Up @@ -296,15 +305,15 @@ func TestIBCRawPacketHandler(t *testing.T) {
capturedPacket = nil
// when
h := NewIBCRawPacketHandler(spec.chanKeeper, spec.capKeeper)
data, evts, gotErr := h.DispatchMsg(ctx, RandomAccountAddress(t), ibcPort, wasmvmtypes.CosmosMsg{IBC: &wasmvmtypes.IBCMsg{SendPacket: &spec.srcMsg}})
evts, data, gotErr := h.DispatchMsg(ctx, RandomAccountAddress(t), ibcPort, wasmvmtypes.CosmosMsg{IBC: &wasmvmtypes.IBCMsg{SendPacket: &spec.srcMsg}})
// then
require.True(t, spec.expErr.Is(gotErr), "exp %v but got %#+v", spec.expErr, gotErr)
if spec.expErr != nil {
return
}
assert.Nil(t, data)
assert.Nil(t, evts)
assert.Equal(t, spec.expPacketSent, capturedPacket)
assert.Equal(t, spec.expEvents, stripTypes(evts))
})
}
}
Expand Down
17 changes: 3 additions & 14 deletions x/wasm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,12 +1046,8 @@ func (c BankCoinTransferrer) TransferCoins(parentCtx sdk.Context, fromAddr sdk.A
if sdkerr != nil {
return sdkerr
}
for _, e := range em.Events() {
if e.Type == sdk.EventTypeMessage { // skip messages as we talk to the keeper directly
continue
}
parentCtx.EventManager().EmitEvent(e)
}
// skip messages as we talk to the keeper directly
parentCtx.EventManager().EmitEvents(filterOutMessageTypeEvents(em.Events()))
return nil
}

Expand All @@ -1072,19 +1068,12 @@ func NewDefaultWasmVMContractResponseHandler(md msgDispatcher) *DefaultWasmVMCon

// Handle processes the data returned by a contract invocation.
func (h DefaultWasmVMContractResponseHandler) Handle(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, messages []wasmvmtypes.SubMsg, origRspData []byte) ([]byte, error) {
em := sdk.NewEventManager()
result := origRspData
switch rsp, err := h.md.DispatchSubmessages(ctx.WithEventManager(em), contractAddr, ibcPort, messages); {
switch rsp, err := h.md.DispatchSubmessages(ctx, contractAddr, ibcPort, messages); {
case err != nil:
return nil, sdkerrors.Wrap(err, "submessages")
case rsp != nil:
result = rsp
}
// emit non message type events only
for _, e := range em.Events() {
if e.Type != sdk.EventTypeMessage {
ctx.EventManager().EmitEvent(e)
}
}
return result, nil
}
72 changes: 28 additions & 44 deletions x/wasm/keeper/msg_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,46 +30,6 @@ func NewMessageDispatcher(messenger Messenger, keeper replyer) *MessageDispatche
return &MessageDispatcher{messenger: messenger, keeper: keeper}
}

// DispatchMessages sends all messages.
func (d MessageDispatcher) DispatchMessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.CosmosMsg) error {
for _, msg := range msgs {
events, _, err := d.messenger.DispatchMsg(ctx, contractAddr, ibcPort, msg)
if err != nil {
return err
}
// redispatch all events, (type sdk.EventTypeMessage will be filtered out in the handler)
ctx.EventManager().EmitEvents(events)
}
return nil
}

// dispatchMsgWithGasLimit sends a message with gas limit applied
func (d MessageDispatcher) dispatchMsgWithGasLimit(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msg wasmvmtypes.CosmosMsg, gasLimit uint64) (events []sdk.Event, data [][]byte, err error) {
limitedMeter := sdk.NewGasMeter(gasLimit)
subCtx := ctx.WithGasMeter(limitedMeter)

// catch out of gas panic and just charge the entire gas limit
defer func() {
if r := recover(); r != nil {
// if it's not an OutOfGas error, raise it again
if _, ok := r.(sdk.ErrorOutOfGas); !ok {
// log it to get the original stack trace somewhere (as panic(r) keeps message but stacktrace to here
moduleLogger(ctx).Info("SubMsg rethrowing panic: %#v", r)
panic(r)
}
ctx.GasMeter().ConsumeGas(gasLimit, "Sub-Message OutOfGas panic")
err = sdkerrors.Wrap(sdkerrors.ErrOutOfGas, "SubMsg hit gas limit")
}
}()
events, data, err = d.messenger.DispatchMsg(subCtx, contractAddr, ibcPort, msg)

// make sure we charge the parent what was spent
spent := subCtx.GasMeter().GasConsumed()
ctx.GasMeter().ConsumeGas(spent, "From limited Sub-Message")

return events, data, err
}

// DispatchSubmessages builds a sandbox to execute these messages and returns the execution result to the contract
// that dispatched them, both on success as well as failure
func (d MessageDispatcher) DispatchSubmessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.SubMsg) ([]byte, error) {
Expand All @@ -82,8 +42,6 @@ func (d MessageDispatcher) DispatchSubmessages(ctx sdk.Context, contractAddr sdk
}
// first, we build a sub-context which we can use inside the submessages
subCtx, commit := ctx.CacheContext()
em := sdk.NewEventManager()
subCtx = subCtx.WithEventManager(em)

// check how much gas left locally, optionally wrap the gas meter
gasRemaining := ctx.GasMeter().Limit() - ctx.GasMeter().GasConsumed()
Expand All @@ -97,11 +55,10 @@ func (d MessageDispatcher) DispatchSubmessages(ctx sdk.Context, contractAddr sdk
} else {
events, data, err = d.messenger.DispatchMsg(subCtx, contractAddr, ibcPort, msg.Msg)
}

events = filterOutMessageTypeEvents(events)
// if it succeeds, commit state changes from submessage, and pass on events to Event Manager
if err == nil {
commit()
ctx.EventManager().EmitEvents(em.Events())
ctx.EventManager().EmitEvents(events)
} // on failure, revert state from sandbox, and ignore events (just skip doing the above)

Expand Down Expand Up @@ -153,6 +110,33 @@ func (d MessageDispatcher) DispatchSubmessages(ctx sdk.Context, contractAddr sdk
return rsp, nil
}

// dispatchMsgWithGasLimit sends a message with gas limit applied
func (d MessageDispatcher) dispatchMsgWithGasLimit(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msg wasmvmtypes.CosmosMsg, gasLimit uint64) (events []sdk.Event, data [][]byte, err error) {
limitedMeter := sdk.NewGasMeter(gasLimit)
subCtx := ctx.WithGasMeter(limitedMeter)

// catch out of gas panic and just charge the entire gas limit
defer func() {
if r := recover(); r != nil {
// if it's not an OutOfGas error, raise it again
if _, ok := r.(sdk.ErrorOutOfGas); !ok {
// log it to get the original stack trace somewhere (as panic(r) keeps message but stacktrace to here
moduleLogger(ctx).Info("SubMsg rethrowing panic: %#v", r)
panic(r)
}
ctx.GasMeter().ConsumeGas(gasLimit, "Sub-Message OutOfGas panic")
err = sdkerrors.Wrap(sdkerrors.ErrOutOfGas, "SubMsg hit gas limit")
}
}()
events, data, err = d.messenger.DispatchMsg(subCtx, contractAddr, ibcPort, msg)

// make sure we charge the parent what was spent
spent := subCtx.GasMeter().GasConsumed()
ctx.GasMeter().ConsumeGas(spent, "From limited Sub-Message")

return events, data, err
}

func sdkEventsToWasmVmEvents(events []sdk.Event) []wasmvmtypes.Event {
res := make([]wasmvmtypes.Event, len(events))
for i, ev := range events {
Expand Down

0 comments on commit 7de7141

Please sign in to comment.