Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submsg and replies #441

Merged
merged 15 commits into from
Mar 10, 2021
Merged
45 changes: 22 additions & 23 deletions x/wasm/internal/keeper/handler_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,50 +320,49 @@ func convertWasmIBCTimeoutTimestampToCosmosTimestamp(timestamp *uint64) uint64 {
return *timestamp
}

func (h DefaultMessageHandler) Dispatch(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msgs ...wasmvmtypes.CosmosMsg) error {
for _, msg := range msgs {
sdkMsgs, err := h.encoders.Encode(ctx, contractAddr, contractIBCPortID, msg)
func (h DefaultMessageHandler) DispatchMsg(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error) {
sdkMsgs, err := h.encoders.Encode(ctx, contractAddr, contractIBCPortID, msg)
if err != nil {
return nil, nil, err
}
for _, sdkMsg := range sdkMsgs {
res, err := h.handleSdkMessage(ctx, contractAddr, sdkMsg)
if err != nil {
return err
return nil, nil, err
}
for _, sdkMsg := range sdkMsgs {
if err := h.handleSdkMessage(ctx, contractAddr, sdkMsg); err != nil {
return err
}
// append data
data = append(data, res.Data)
// append events
sdkEvents := make([]sdk.Event, len(res.Events))
for i := range res.Events {
sdkEvents[i] = sdk.Event(res.Events[i])
}
events = append(events, sdkEvents...)
}
return nil
return
}

func (h DefaultMessageHandler) handleSdkMessage(ctx sdk.Context, contractAddr sdk.Address, msg sdk.Msg) error {
func (h DefaultMessageHandler) handleSdkMessage(ctx sdk.Context, contractAddr sdk.Address, msg sdk.Msg) (*sdk.Result, error) {
if err := msg.ValidateBasic(); err != nil {
return err
return nil, err
}
// make sure this account can send it
for _, acct := range msg.GetSigners() {
if !acct.Equals(contractAddr) {
return sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "contract doesn't have permission")
return nil, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "contract doesn't have permission")
}
}

// find the handler and execute it
handler := h.router.Route(ctx, msg.Route())
if handler == nil {
return sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, msg.Route())
return nil, sdkerrors.Wrap(sdkerrors.ErrUnknownRequest, msg.Route())
}
res, err := handler(ctx, msg)
if err != nil {
return err
}

events := make(sdk.Events, len(res.Events))
for i := range res.Events {
events[i] = sdk.Event(res.Events[i])
return nil, err
}
// redispatch all events, (type sdk.EventTypeMessage will be filtered out in the handler)
ctx.EventManager().EmitEvents(events)

return nil
return res, nil
}

func convertWasmCoinsToSdkCoins(coins []wasmvmtypes.Coin) (sdk.Coins, error) {
Expand Down
209 changes: 190 additions & 19 deletions x/wasm/internal/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
abci "github.com/tendermint/tendermint/abci/types"
"path/filepath"

"github.com/CosmWasm/wasmd/x/wasm/internal/types"
Expand Down Expand Up @@ -52,7 +53,7 @@ type Option interface {
}

type messenger interface {
Dispatch(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msgs ...wasmvmtypes.CosmosMsg) error
DispatchMsg(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error)
}

// Keeper will have a reference to Wasmer with it's own data directory.
Expand Down Expand Up @@ -220,7 +221,7 @@ func (k Keeper) Instantiate(ctx sdk.Context, codeID uint64, creator, admin sdk.A

func (k Keeper) instantiate(ctx sdk.Context, codeID uint64, creator, admin sdk.AccAddress, initMsg []byte, label string, deposit sdk.Coins, authZ AuthorizationPolicy) (sdk.AccAddress, []byte, error) {
if !k.IsPinnedCode(ctx, codeID) {
ctx.GasMeter().ConsumeGas(InstanceCost, "Loading CosmWasm module: init")
ctx.GasMeter().ConsumeGas(InstanceCost, "Loading CosmWasm module: instantiate")
}

// create contract address
Expand Down Expand Up @@ -301,13 +302,14 @@ func (k Keeper) instantiate(ctx sdk.Context, codeID uint64, creator, admin sdk.A
contractInfo.IBCPortID = ibcPort
}

// store contract before dispatch so that contract could be called back
ethanfrey marked this conversation as resolved.
Show resolved Hide resolved
k.storeContractInfo(ctx, contractAddress, &contractInfo)
k.appendToContractHistory(ctx, contractAddress, contractInfo.InitialHistory(initMsg))

// then dispatch so that contract could be called back
err = k.dispatchMessages(ctx, contractAddress, contractInfo.IBCPortID, res.Messages)
// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, nil, err
return nil, nil, sdkerrors.Wrap(err, "dispatch")
}

return contractAddress, res.Data, nil
Expand Down Expand Up @@ -352,9 +354,10 @@ func (k Keeper) Execute(ctx sdk.Context, contractAddress sdk.AccAddress, caller
events := types.ParseEvents(res.Attributes, contractAddress)
ctx.EventManager().EmitEvents(events)

err = k.dispatchMessages(ctx, contractAddress, contractInfo.IBCPortID, res.Messages)
// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, err
return nil, sdkerrors.Wrap(err, "dispatch")
ethanfrey marked this conversation as resolved.
Show resolved Hide resolved
}

return &sdk.Result{
Expand Down Expand Up @@ -426,8 +429,9 @@ func (k Keeper) migrate(ctx sdk.Context, contractAddress sdk.AccAddress, caller
k.appendToContractHistory(ctx, contractAddress, historyEntry)
k.storeContractInfo(ctx, contractAddress, contractInfo)

// then dispatch
if err := k.dispatchMessages(ctx, contractAddress, contractInfo.IBCPortID, res.Messages); err != nil {
// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, sdkerrors.Wrap(err, "dispatch")
}

Expand Down Expand Up @@ -464,7 +468,50 @@ func (k Keeper) Sudo(ctx sdk.Context, contractAddress sdk.AccAddress, msg []byte
events := types.ParseEvents(res.Attributes, contractAddress)
ctx.EventManager().EmitEvents(events)

err = k.dispatchMessages(ctx, contractAddress, contractInfo.IBCPortID, res.Messages)
// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, sdkerrors.Wrap(err, "dispatch")
}

return &sdk.Result{
Data: res.Data,
}, nil
}

// reply is only called from keeper internal functions (dispatchSubmessages) after processing the submessage
// it
func (k Keeper) reply(ctx sdk.Context, contractAddress sdk.AccAddress, reply wasmvmtypes.Reply) (*sdk.Result, error) {
contractInfo, codeInfo, prefixStore, err := k.contractInstance(ctx, contractAddress)
if err != nil {
return nil, err
}

// current thought is to charge gas like a fresh run, we can revisit whether to give it a discount later
if !k.IsPinnedCode(ctx, contractInfo.CodeID) {
ctx.GasMeter().ConsumeGas(InstanceCost, "Loading CosmWasm module: reply")
}

env := types.NewEnv(ctx, contractAddress)

// prepare querier
querier := QueryHandler{
Ctx: ctx,
Plugins: k.queryPlugins,
}
gas := gasForContract(ctx)
res, gasUsed, execErr := k.wasmer.Reply(codeInfo.CodeHash, env, reply, prefixStore, cosmwasmAPI, querier, gasMeter(ctx), gas)
consumeGas(ctx, gasUsed)
if execErr != nil {
return nil, sdkerrors.Wrap(types.ErrExecuteFailed, execErr.Error())
}

// emit all events from this contract itself
events := types.ParseEvents(res.Attributes, contractAddress)
ctx.EventManager().EmitEvents(events)

// dispatch submessages then messages
err = k.dispatchAll(ctx, contractAddress, contractInfo.IBCPortID, res.Submessages, res.Messages)
if err != nil {
return nil, sdkerrors.Wrap(err, "dispatch")
}
Expand Down Expand Up @@ -678,15 +725,6 @@ func (k Keeper) GetByteCode(ctx sdk.Context, codeID uint64) ([]byte, error) {
return k.wasmer.GetCode(codeInfo.CodeHash)
}

func (k Keeper) dispatchMessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.CosmosMsg) error {
for _, msg := range msgs {
if err := k.messenger.Dispatch(ctx, contractAddr, ibcPort, msg); err != nil {
return err
}
}
return nil
}

// PinCode pins the wasm contract in wasmvm cache
func (k Keeper) PinCode(ctx sdk.Context, codeID uint64) error {
codeInfo := k.GetCodeInfo(ctx, codeID)
Expand Down Expand Up @@ -740,6 +778,139 @@ func (k Keeper) InitializePinnedCodes(ctx sdk.Context) error {
return nil
}

func (k Keeper) dispatchAll(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, subMsgs []wasmvmtypes.SubMsg, msgs []wasmvmtypes.CosmosMsg) error {
// first dispatch all submessages (and the replies).
err := k.dispatchSubmessages(ctx, contractAddr, ibcPort, subMsgs)
if err != nil {
return err
}
// then dispatch all the normal messages
return k.dispatchMessages(ctx, contractAddr, ibcPort, msgs)
}

func (k Keeper) dispatchMessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.CosmosMsg) error {
for _, msg := range msgs {
events, _, err := k.messenger.DispatchMsg(ctx, contractAddr, ibcPort, msg)
if err != nil {
return err
}
// redispatch all events, (type sdk.EventTypeMessage will be filtered out in the handler)
ctx.EventManager().EmitEvents(events)
}
return nil
}

func (k Keeper) dispatchMsgWithGasLimit(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msg wasmvmtypes.CosmosMsg, gasLimit uint64) (events []sdk.Event, data [][]byte, err error) {
limitedMeter := sdk.NewGasMeter(gasLimit)
subCtx := ctx.WithGasMeter(limitedMeter)

// catch out of gas panic and just charge the entire gas limit
defer func() {
if r := recover(); r != nil {
// if it's not an OutOfGas error, raise it again
if _, ok := r.(sdk.ErrorOutOfGas); !ok {
// log it to get the original stack trace somewhere (as panic(r) keeps message but stacktrace to here
k.Logger(ctx).Info("SubMsg rethrowing panic: %#v", r)
panic(r)
}
ctx.GasMeter().ConsumeGas(gasLimit, "Sub-Message OutOfGas panic")
err = sdkerrors.Wrap(sdkerrors.ErrOutOfGas, "SubMsg hit gas limit")
}
}()
events, data, err = k.messenger.DispatchMsg(subCtx, contractAddr, ibcPort, msg)

// make sure we charge the parent what was spent
spent := subCtx.GasMeter().GasConsumed()
ctx.GasMeter().ConsumeGas(spent, "From limited Sub-Message")

return events, data, err
}

// dispatchSubmessages builds a sandbox to execute these messages and returns the execution result to the contract
// that dispatched them, both on success as well as failure
func (k Keeper) dispatchSubmessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.SubMsg) error {
for _, msg := range msgs {
// first, we build a sub-context which we can use inside the submessages
subCtx, commit := ctx.CacheContext()

// check how much gas left locally, optionally wrap the gas meter
gasRemaining := ctx.GasMeter().Limit() - ctx.GasMeter().GasConsumed()
limitGas := msg.GasLimit != nil && (*msg.GasLimit < gasRemaining)

var err error
var events []sdk.Event
var data [][]byte
if limitGas {
events, data, err = k.dispatchMsgWithGasLimit(subCtx, contractAddr, ibcPort, msg.Msg, *msg.GasLimit)
} else {
events, data, err = k.messenger.DispatchMsg(subCtx, contractAddr, ibcPort, msg.Msg)
}

// if it succeeds, commit state changes from submessage, and pass on events to Event Manager
if err == nil {
commit()
ctx.EventManager().EmitEvents(events)
}
// on failure, revert state from sandbox, and ignore events (just skip doing the above)

var result wasmvmtypes.SubcallResult
if err == nil {
// just take the first one for now if there are multiple sub-sdk messages
// and safely return nothing if no data
var responseData []byte
if len(data) > 0 {
responseData = data[0]
}
result = wasmvmtypes.SubcallResult{
Ok: &wasmvmtypes.SubcallResponse{
Events: sdkEventsToWasmVmEvents(events),
Data: responseData,
},
}
} else {
result = wasmvmtypes.SubcallResult{
Err: err.Error(),
}
}

// now handle the reply, we use the parent context, and abort on error
reply := wasmvmtypes.Reply{
ID: msg.ID,
Result: result,
}

// we can ignore any result returned as there is nothing to do with the data
// and the events are already in the ctx.EventManager()
_, err = k.reply(ctx, contractAddr, reply)
if err != nil {
return err
}
}
return nil
}

func sdkEventsToWasmVmEvents(events []sdk.Event) []wasmvmtypes.Event {
res := make([]wasmvmtypes.Event, len(events))
for i, ev := range events {
res[i] = wasmvmtypes.Event{
Type: ev.Type,
Attributes: sdkAttributesToWasmVmAttributes(ev.Attributes),
}
}
return res
}

func sdkAttributesToWasmVmAttributes(attrs []abci.EventAttribute) []wasmvmtypes.EventAttribute {
res := make([]wasmvmtypes.EventAttribute, len(attrs))
for i, attr := range attrs {
res[i] = wasmvmtypes.EventAttribute{
Key: string(attr.Key),
Value: string(attr.Value),
}
}
return res
}

func gasForContract(ctx sdk.Context) uint64 {
meter := ctx.GasMeter()
if meter.IsOutOfGas() {
Expand Down
Loading