Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: interchainstaking keeper #341

Merged
merged 11 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x/airdrop/keeper/claim_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (k Keeper) verifyZoneIntent(ctx sdk.Context, chainID string, address string
return fmt.Errorf("zone %s not found", chainID)
}

intent, ok := k.icsKeeper.GetIntent(ctx, &zone, addr.String(), false)
intent, ok := k.icsKeeper.GetDelegatorIntent(ctx, &zone, addr.String(), false)
if !ok || len(intent.Intents) == 0 {
return fmt.Errorf("intent not found or no intents set for %s", addr)
}
Expand Down
2 changes: 1 addition & 1 deletion x/airdrop/keeper/msg_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (suite *KeeperTestSuite) Test_msgServer_Claim() {
},
},
}
appA.InterchainstakingKeeper.SetIntent(
appA.InterchainstakingKeeper.SetDelegatorIntent(
suite.chainA.GetContext(),
&zone,
intent,
Expand Down
6 changes: 3 additions & 3 deletions x/interchainstaking/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func InitGenesis(ctx sdk.Context, k keeper.Keeper, genState types.GenesisState)
panic("unable to find zone for delegation")
}
for _, delegatorIntent := range delegatorIntentsForZone.DelegationIntent {
k.SetIntent(ctx, &zone, *delegatorIntent, false)
k.SetDelegatorIntent(ctx, &zone, *delegatorIntent, false)
}
}

Expand Down Expand Up @@ -98,9 +98,9 @@ func ExportDelegatorIntentsPerZone(ctx sdk.Context, k keeper.Keeper) []types.Del
delegatorIntentsForZones := make([]types.DelegatorIntentsForZone, 0)
k.IterateZones(ctx, func(_ int64, zoneInfo *types.Zone) (stop bool) {
// export current epoch intents
delegatorIntentsForZones = append(delegatorIntentsForZones, types.DelegatorIntentsForZone{ChainId: zoneInfo.ChainId, DelegationIntent: k.AllIntentsAsPointer(ctx, zoneInfo, false), Snapshot: false})
delegatorIntentsForZones = append(delegatorIntentsForZones, types.DelegatorIntentsForZone{ChainId: zoneInfo.ChainId, DelegationIntent: k.AllDelegatorIntentsAsPointer(ctx, zoneInfo, false), Snapshot: false})
// export last epoch intents
delegatorIntentsForZones = append(delegatorIntentsForZones, types.DelegatorIntentsForZone{ChainId: zoneInfo.ChainId, DelegationIntent: k.AllIntentsAsPointer(ctx, zoneInfo, true), Snapshot: true})
delegatorIntentsForZones = append(delegatorIntentsForZones, types.DelegatorIntentsForZone{ChainId: zoneInfo.ChainId, DelegationIntent: k.AllDelegatorIntentsAsPointer(ctx, zoneInfo, true), Snapshot: true})
return false
})
return delegatorIntentsForZones
Expand Down
10 changes: 5 additions & 5 deletions x/interchainstaking/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

const blockInterval = 30

type zoneItrFn func(index int64, zoneInfo *types.Zone) (stop bool)
type zoneItrFn func(index int64, zone *types.Zone) (stop bool)

// BeginBlocker of interchainstaking module
func (k *Keeper) BeginBlocker(ctx sdk.Context) {
Expand All @@ -35,13 +35,13 @@ func (k *Keeper) BeginBlocker(ctx sdk.Context) {
// commenting this out until we can revisit. in its current state it causes more issues than it fixes.

if err := k.EnsureWithdrawalAddresses(ctx, zone); err != nil {
k.Logger(ctx).Error("error in EnsureWithdrawalAddresses", "error", err)
k.Logger(ctx).Error("error in EnsureWithdrawalAddresses", "error", err.Error())
}
if err := k.HandleMaturedUnbondings(ctx, zone); err != nil {
k.Logger(ctx).Error("error in HandleMaturedUnbondings", "error", err)
k.Logger(ctx).Error("error in HandleMaturedUnbondings", "error", err.Error())
}
if err := k.GCCompletedUnbondings(ctx, zone); err != nil {
k.Logger(ctx).Error("error in GCCompletedUnbondings", "error", err)
k.Logger(ctx).Error("error in GCCompletedUnbondings", "error", err.Error())
}
}

Expand All @@ -58,7 +58,7 @@ func (k *Keeper) BeginBlocker(ctx sdk.Context) {
query := stakingTypes.QueryValidatorsRequest{}
err := k.EmitValSetQuery(ctx, zone, query, sdkmath.NewInt(period))
if err != nil {
k.Logger(ctx).Error("unable to trigger valset update query", "error", err)
k.Logger(ctx).Error("unable to trigger valset update query", "error", err.Error())
// failing to emit the valset update is not terminal but constitutes
// an error, as if this starts happening frequent it is something
// we should investigate.
Expand Down
93 changes: 54 additions & 39 deletions x/interchainstaking/keeper/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c Callbacks) RegisterCallbacks() icqtypes.QueryCallbacks {
AddCallback("delegation", Callback(DelegationCallback)).
AddCallback("distributerewards", Callback(DistributeRewardsFromWithdrawAccount)).
AddCallback("depositinterval", Callback(DepositIntervalCallback)).
AddCallback("deposittx", Callback(DepositTx)).
AddCallback("deposittx", Callback(DepositTxCallback)).
AddCallback("perfbalance", Callback(PerfBalanceCallback)).
AddCallback("accountbalance", Callback(AccountBalanceCallback)).
AddCallback("allbalances", Callback(AllBalancesCallback))
Expand Down Expand Up @@ -264,10 +264,10 @@ func checkTrustedHeader(header *tmclienttypes.Header, consState *tmclienttypes.C
return nil
}

// pulled directly from ibc-go tm light client
// checkValidity checks if the Tendermint header is valid.
// checkTMStateValidity checks if the Tendermint header is valid.
// CONTRACT: consState.Height == header.TrustedHeight
func checkValidity(
// pulled directly from ibc-go tm light client
func checkTMStateValidity(
clientState *tmclienttypes.ClientState, consState *tmclienttypes.ConsensusState,
header *tmclienttypes.Header, currentTimestamp time.Time,
) error {
Expand Down Expand Up @@ -347,44 +347,19 @@ func checkValidity(
return nil
}

func DepositTx(k *Keeper, ctx sdk.Context, args []byte, query icqtypes.Query) error {
zone, found := k.GetZone(ctx, query.GetChainId())
if !found {
return fmt.Errorf("no registered zone for chain id: %s", query.GetChainId())
}

if !zone.DepositsEnabled {
return fmt.Errorf("chain id %s does not current allow deposits", query.GetChainId())
}

k.Logger(ctx).Debug("DepositTx callback", "zone", zone.ChainId)

res := icqtypes.GetTxWithProofResponse{}
if len(args) == 0 {
return errors.New("attempted to unmarshal zero length byte slice (6)")
}
err := k.cdc.Unmarshal(args, &res)
if err != nil {
return err
}

_, found = k.GetReceipt(ctx, types.GetReceiptKey(zone.ChainId, res.GetTxResponse().TxHash))
if found {
k.Logger(ctx).Debug("Found previously handled tx. Ignoring.", "txhash", res.GetTxResponse().TxHash)
return nil
}

// validate proof
// CheckTMHeaderForZone verifies the Tendermint consensus and client states for a given zone. Returns error if unable
// to verify.
func (k *Keeper) CheckTMHeaderForZone(ctx sdk.Context, zone *types.Zone, res icqtypes.GetTxWithProofResponse) error {
connection, _ := k.IBCKeeper.ConnectionKeeper.GetConnection(ctx, zone.ConnectionId)

clientState, found := k.IBCKeeper.ClientKeeper.GetClientState(ctx, connection.ClientId)
if !found {
return errors.New("unable to fetch client state")
}

/** we can call ClientKeeper.CheckHeaderAndUpdateState() here, but this causes state changes inside the IBCKeeper which feels bad.
so instead we copy the above two functions wholesale from ibc-go (this sucks too, but with predicatable behaviour) and validate
the inbound header manually. */
/*
We can call ClientKeeper.CheckHeaderAndUpdateState() here, but this causes state changes inside the IBCKeeper
which feels bad. so instead we copy the above two functions wholesale from ibc-go (this sucks too, but with
predictable behaviour) and validate the inbound header manually.
*/
consensusState, found := k.IBCKeeper.ClientKeeper.GetClientConsensusState(ctx, connection.ClientId, res.Header.TrustedHeight)
if !found {
return fmt.Errorf("unable to fetch consensus state for trusted height: %s", res.Header.TrustedHeight.String())
Expand All @@ -400,7 +375,8 @@ func DepositTx(k *Keeper, ctx sdk.Context, args []byte, query icqtypes.Query) er
return errors.New("unable to marshal consensus state")
}

err = checkValidity(tmclientState, tmconsensusState, res.GetHeader(), ctx.BlockHeader().Time)
// validate tendermint statefor
err := checkTMStateValidity(tmclientState, tmconsensusState, res.GetHeader(), ctx.BlockHeader().Time)
if err != nil {
return fmt.Errorf("unable to validate header; %w", err)
}
Expand All @@ -414,7 +390,46 @@ func DepositTx(k *Keeper, ctx sdk.Context, args []byte, query icqtypes.Query) er
return fmt.Errorf("unable to validate proof: %w", err)
}

return k.HandleReceiptTransaction(ctx, res.GetTxResponse(), res.GetTx(), &zone)
return nil
}

// DepositTxCallback is a callback that verifies client chain state validity, gets Tx receipt and calls
// HandleReceiptForTransaction.
func DepositTxCallback(k *Keeper, ctx sdk.Context, args []byte, query icqtypes.Query) error {
// check validity
if len(args) == 0 {
return errors.New("attempted to unmarshal zero length byte slice (6)")
}

zone, found := k.GetZone(ctx, query.GetChainId())
if !found {
return fmt.Errorf("no registered zone for chain id: %s", query.GetChainId())
}

if !zone.DepositsEnabled {
return fmt.Errorf("chain id %s does not current allow deposits", query.GetChainId())
}

k.Logger(ctx).Debug("DepositTx callback", "zone", zone.ChainId)

res := icqtypes.GetTxWithProofResponse{}
err := k.cdc.Unmarshal(args, &res)
if err != nil {
return err
}

_, found = k.GetReceipt(ctx, types.GetReceiptKey(zone.ChainId, res.GetTxResponse().TxHash))
if found {
k.Logger(ctx).Debug("Found previously handled tx. Ignoring.", "txhash", res.GetTxResponse().TxHash)
return nil
}

err = k.CheckTMHeaderForZone(ctx, &zone, res)
if err != nil {
return fmt.Errorf("unable to verify proof: %w", err)
}

return k.HandleReceiptForTransaction(ctx, res.GetTxResponse(), res.GetTx(), &zone)
}

// AccountBalanceCallback is a callback handler for Balance queries.
Expand Down
4 changes: 2 additions & 2 deletions x/interchainstaking/keeper/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func (k *Keeper) DelegatorIntent(c context.Context, req *types.QueryDelegatorInt
}

// we can ignore bool (found) as it always returns true
// - see comment in GetIntent
intent, _ := k.GetIntent(ctx, &zone, req.DelegatorAddress, false)
// - see comment in GetDelegatorIntent
intent, _ := k.GetDelegatorIntent(ctx, &zone, req.DelegatorAddress, false)

return &types.QueryDelegatorIntentResponse{Intent: &intent}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion x/interchainstaking/keeper/grpc_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (suite *KeeperTestSuite) TestKeeper_DelegatorIntent() {
},
}
for _, intent := range intents {
icsKeeper.SetIntent(ctx, &zone, intent, false)
icsKeeper.SetDelegatorIntent(ctx, &zone, intent, false)
}
},
&types.QueryDelegatorIntentRequest{
Expand Down
101 changes: 76 additions & 25 deletions x/interchainstaking/keeper/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,60 +14,105 @@ func (k *Keeper) BeforeEpochStart(_ sdk.Context, _ string, _ int64) error {
return nil
}

// AfterEpochEnd is called after any registered epoch ends.
// calls:
//
// k.AggregateDelegatorIntents
// k.HandleQueuedUnbondings
// k.Rebalance
//
// and re-queries icq for new zone info.
func (k *Keeper) AfterEpochEnd(ctx sdk.Context, epochIdentifier string, epochNumber int64) error {
// every epoch
if epochIdentifier == "epoch" {
k.Logger(ctx).Info("handling epoch end")

k.IterateZones(ctx, func(index int64, zoneInfo *types.Zone) (stop bool) {
k.Logger(ctx).Info("taking a snapshot of intents")
err := k.AggregateIntents(ctx, zoneInfo)
if epochIdentifier == types.EpochIdentifier {
k.Logger(ctx).Info("handling epoch end", "epoch_identifier", epochIdentifier, "epoch_number", epochNumber)

k.IterateZones(ctx, func(index int64, zone *types.Zone) (stop bool) {
k.Logger(ctx).Info(
"taking a snapshot of delegator intents",
"epoch_identifier", epochIdentifier,
"epoch_number", epochNumber,
)
err := k.AggregateDelegatorIntents(ctx, zone)
if err != nil {
// we can and need not panic here; logging the error is sufficient.
// an error here is not expected, but also not terminal.
// we don't return on failure here as we still want to attempt
// the unrelated tasks below.
k.Logger(ctx).Error("encountered a problem aggregating intents; leaving aggregated intents unchanged since last epoch", "error", err)
k.Logger(ctx).Error(
"encountered a problem aggregating intents; leaving aggregated intents unchanged since last epoch",
"error", err.Error(),
"chain_id", zone.ChainId,
"epoch_identifier", epochIdentifier,
"epoch_number", epochNumber,
)
}

if zoneInfo.DelegationAddress == nil {
if zone.DelegationAddress == nil {
// we have reached the end of the epoch and the delegation address is nil.
// This shouldn't happen in normal operation, but can if the zone was registered right on the epoch boundary.
return false
}

if err := k.HandleQueuedUnbondings(ctx, zoneInfo, epochNumber); err != nil {
k.Logger(ctx).Error(err.Error())
if err := k.HandleQueuedUnbondings(ctx, zone, epochNumber); err != nil {
// we can and need not panic here; logging the error is sufficient.
// an error here is not expected, but also not terminal.
// we don't return on failure here as we still want to attempt
// the unrelated tasks below.
k.Logger(ctx).Error(
"encountered a problem handling queued unbondings",
"error", err.Error(),
"chain_id", zone.ChainId,
"epoch_identifier", epochIdentifier,
"epoch_number", epochNumber,
)
}

err = k.Rebalance(ctx, zoneInfo, epochNumber)
err = k.Rebalance(ctx, zone, epochNumber)
if err != nil {
// we can and need not panic here; logging the error is sufficient.
// an error here is not expected, but also not terminal.
// we don't return on failure here as we still want to attempt
// the unrelated tasks below.
k.Logger(ctx).Error("encountered a problem rebalancing", "error", err.Error())
k.Logger(ctx).Error(
"encountered a problem rebalancing",
"error", err.Error(),
"chain_id", zone.ChainId,
"epoch_identifier", epochIdentifier,
"epoch_number", epochNumber,
)
}

if zoneInfo.WithdrawalWaitgroup > 0 {
k.Logger(ctx).Error("epoch waitgroup was unexpected > 0; this means we did not process the previous epoch!")
zoneInfo.WithdrawalWaitgroup = 0
if zone.WithdrawalWaitgroup > 0 {
k.Logger(ctx).Error(
"epoch waitgroup was unexpected > 0; this means we did not process the previous epoch!",
"chain_id", zone.ChainId,
"epoch_identifier", epochIdentifier,
"epoch_number", epochNumber,
)
zone.WithdrawalWaitgroup = 0
}

// OnChanOpenAck calls SetWithdrawalAddress (see ibc_module.go)
k.Logger(ctx).Info("Withdrawing rewards")
k.Logger(ctx).Info(
"withdrawing rewards",
"chain_id", zone.ChainId,
"epoch_identifier", epochIdentifier,
"epoch_number", epochNumber,
)

delegationQuery := stakingtypes.QueryDelegatorDelegationsRequest{DelegatorAddr: zoneInfo.DelegationAddress.Address, Pagination: &query.PageRequest{Limit: uint64(len(zoneInfo.Validators))}}
delegationQuery := stakingtypes.QueryDelegatorDelegationsRequest{
DelegatorAddr: zone.DelegationAddress.Address,
Pagination: &query.PageRequest{
Limit: uint64(len(zone.Validators)),
},
}
bz := k.cdc.MustMarshal(&delegationQuery)

k.ICQKeeper.MakeRequest(
ctx,
zoneInfo.ConnectionId,
zoneInfo.ChainId,
zone.ConnectionId,
zone.ChainId,
"cosmos.staking.v1beta1.Query/DelegatorDelegations",
bz,
sdk.NewInt(-1),
Expand All @@ -76,13 +121,13 @@ func (k *Keeper) AfterEpochEnd(ctx sdk.Context, epochIdentifier string, epochNum
0,
)

rewardsQuery := distrtypes.QueryDelegationTotalRewardsRequest{DelegatorAddress: zoneInfo.DelegationAddress.Address}
rewardsQuery := distrtypes.QueryDelegationTotalRewardsRequest{DelegatorAddress: zone.DelegationAddress.Address}
bz = k.cdc.MustMarshal(&rewardsQuery)

k.ICQKeeper.MakeRequest(
ctx,
zoneInfo.ConnectionId,
zoneInfo.ChainId,
zone.ConnectionId,
zone.ChainId,
"cosmos.distribution.v1beta1.Query/DelegationTotalRewards",
bz,
sdk.NewInt(-1),
Expand All @@ -94,9 +139,14 @@ func (k *Keeper) AfterEpochEnd(ctx sdk.Context, epochIdentifier string, epochNum
// increment the WithdrawalWaitgroup
// this allows us to track the response for every protocol delegator
// WithdrawalWaitgroup is decremented in RewardsCallback
zoneInfo.WithdrawalWaitgroup++
k.Logger(ctx).Info("Incrementing waitgroup for delegation", "value", zoneInfo.WithdrawalWaitgroup)
k.SetZone(ctx, zoneInfo)
zone.WithdrawalWaitgroup++
k.Logger(ctx).Info("Incrementing waitgroup for delegation",
"value", zone.WithdrawalWaitgroup,
"chain_id", zone.ChainId,
"epoch_identifier", epochIdentifier,
"epoch_number", epochNumber,
)
k.SetZone(ctx, zone)

return false
})
Expand All @@ -118,6 +168,7 @@ func (k *Keeper) Hooks() Hooks {
}

// epochs hooks

func (h Hooks) BeforeEpochStart(ctx sdk.Context, epochIdentifier string, epochNumber int64) error {
return h.k.BeforeEpochStart(ctx, epochIdentifier, epochNumber)
}
Expand Down
Loading