Skip to content

Commit

Permalink
feat: pagination for polling, fix tests (#55)
Browse files Browse the repository at this point in the history
* pagination for polling, fix tests

* remove deduplication, listen to update events

* fix event listening

* use packet status for filtering events

* config naming fix, configurable validation interval, handle order update in pool, logical order delete from pool

* small fixes

* use amount to calculate minimum fee
  • Loading branch information
zale144 authored Nov 22, 2024
1 parent 643005e commit 7cd29c4
Show file tree
Hide file tree
Showing 14 changed files with 341 additions and 213 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ policy.json
--spend-limit 10000adym \
--rollapp rollapp1 \
--denoms "adym,uatom" \
--min-lp-fee-percentage "0.1" \
--min-fee-percentage "0.1" \
--max-price 10000adym \
--operator-fee-share 0.1 \
--settlement-validated --fees 1dym -y
Expand Down
12 changes: 6 additions & 6 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ var RootCmd = &cobra.Command{

var initCmd = &cobra.Command{
Use: "init",
Short: "Initialize the order client",
Long: `Initialize the order client by generating a config file with default values.`,
Short: "Initialize the eibc client",
Long: `Initialize the eibc client by generating a config file with default values.`,
Run: func(cmd *cobra.Command, args []string) {
cfg := config.Config{}
if err := viper.Unmarshal(&cfg); err != nil {
Expand All @@ -61,8 +61,8 @@ var initCmd = &cobra.Command{

var startCmd = &cobra.Command{
Use: "start",
Short: "Start the order client",
Long: `Start the order client that scans for demand orders and fulfills them.`,
Short: "Start the eibc client",
Long: `Start the eibc client that scans for demand orders and fulfills them.`,
Run: func(cmd *cobra.Command, args []string) {
viper.AutomaticEnv()

Expand Down Expand Up @@ -91,7 +91,7 @@ var startCmd = &cobra.Command{

oc, err := eibc.NewOrderClient(cfg, logger)
if err != nil {
log.Fatalf("failed to create order client: %v", err)
log.Fatalf("failed to create eibc client: %v", err)
}

if cfg.Fulfillers.Scale == 0 {
Expand All @@ -100,7 +100,7 @@ var startCmd = &cobra.Command{
}

if err := oc.Start(cmd.Context()); err != nil {
log.Fatalf("failed to start order client: %v", err)
log.Fatalf("failed to start eibc client: %v", err)
}
},
}
Expand Down
11 changes: 7 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type OperatorConfig struct {
}

type ValidationConfig struct {
FallbackLevel ValidationLevel `mapstructure:"fallback_level"`
ValidationWaitTime time.Duration `mapstructure:"validation_wait_time"`
FallbackLevel ValidationLevel `mapstructure:"fallback_level"`
WaitTime time.Duration `mapstructure:"wait_time"`
Interval time.Duration `mapstructure:"interval"`
}

type RollappConfig struct {
Expand All @@ -79,7 +80,8 @@ const (
defaultMaxOrdersPerTx = 10
defaultOrderRefreshInterval = 30 * time.Second
defaultValidationFallbackLevel = "p2p"
defaultValidationWaitTime = "60m"
defaultValidationWaitTime = "61m"
defaultOrderValidationInterval = "5m"
)

type ValidationLevel string
Expand Down Expand Up @@ -125,7 +127,8 @@ func InitConfig() {
viper.SetDefault("fulfillers.policy_address", "<your-policy-address>")

viper.SetDefault("validation.fallback_level", defaultValidationFallbackLevel)
viper.SetDefault("validation.validation_wait_time", defaultValidationWaitTime)
viper.SetDefault("validation.wait_time", defaultValidationWaitTime)
viper.SetDefault("validation.interval", defaultOrderValidationInterval)

viper.SetDefault("rollapps.example_1234-1.full_nodes", []string{"http://localhost:26657"})
viper.SetDefault("rollapps.example_1234-1.min_confirmations", "1")
Expand Down
5 changes: 5 additions & 0 deletions eibc/lp.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func (or *orderTracker) loadLPs(ctx context.Context) error {
continue
}

if grant.Granter == "" || grant.Grantee == "" {
or.logger.Error("invalid grant", zap.Any("grant", grant))
continue
}

g := new(types.FulfillOrderAuthorization)
if err = proto.Unmarshal(grant.Authorization.Value, g); err != nil {
return fmt.Errorf("failed to unmarshal grant: %w", err)
Expand Down
8 changes: 2 additions & 6 deletions eibc/order_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/bech32"
"go.uber.org/zap"

"github.com/dymensionxyz/cosmosclient/cosmosclient"
"go.uber.org/zap"

"github.com/dymensionxyz/eibc-client/config"
)
Expand All @@ -30,9 +29,7 @@ func NewOrderClient(cfg config.Config, logger *zap.Logger) (*orderClient, error)

//nolint:gosec
subscriberID := fmt.Sprintf("eibc-client-%d", rand.Int())

orderCh := make(chan []*demandOrder, config.NewOrderBufferSize)
fulfilledOrdersCh := make(chan *orderBatch, config.NewOrderBufferSize) // TODO: make buffer size configurable

hubClient, err := getHubClient(cfg)
if err != nil {
Expand All @@ -57,12 +54,12 @@ func NewOrderClient(cfg config.Config, logger *zap.Logger) (*orderClient, error)
cfg.Fulfillers.PolicyAddress,
minOperatorFeeShare,
fullNodeClient,
fulfilledOrdersCh,
subscriberID,
cfg.Fulfillers.BatchSize,
&cfg.Validation,
orderCh,
cfg.OrderPolling.Interval, // we can use the same interval for order polling and LP balance checking
cfg.Validation.Interval,
logger,
)

Expand Down Expand Up @@ -145,7 +142,6 @@ func NewOrderClient(cfg config.Config, logger *zap.Logger) (*orderClient, error)
cfg.Fulfillers.PolicyAddress,
cClient,
orderCh,
fulfilledOrdersCh,
ordTracker.releaseAllReservedOrdersFunds,
ordTracker.debitAllReservedOrdersFunds,
)
Expand Down
56 changes: 42 additions & 14 deletions eibc/order_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/authz"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/dymensionxyz/cosmosclient/cosmosclient"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
rpcclient "github.com/tendermint/tendermint/rpc/client"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/dymensionxyz/cosmosclient/cosmosclient"

"github.com/dymensionxyz/eibc-client/config"
"github.com/dymensionxyz/eibc-client/types"
)
Expand All @@ -40,6 +39,7 @@ func TestOrderClient(t *testing.T) {
fullNodeClient *nodeClient
pollOrders []Order
eventOrders []Order
updateOrders []Order
expectLPFulfilledOrderIDs map[string]string // orderID -> lpAddress
}{
{
Expand All @@ -57,7 +57,8 @@ func TestOrderClient(t *testing.T) {
BatchSize: 4,
},
Validation: config.ValidationConfig{
ValidationWaitTime: time.Second,
WaitTime: time.Second,
Interval: time.Second,
},
},
lpConfigs: []lpConfig{
Expand Down Expand Up @@ -148,22 +149,25 @@ func TestOrderClient(t *testing.T) {
pollOrders: []Order{
{
EibcOrderId: "order1",
Price: "80stake",
Price: "80",
Fee: "12stake",
RollappId: "rollapp1",
ProofHeight: "1",
BlockHeight: "1",
}, {
EibcOrderId: "order2",
Price: "202stake",
Fee: "25stake",
Price: "202",
Fee: "2stake", // too low - won't fulfill
RollappId: "rollapp2",
ProofHeight: "2",
BlockHeight: "2",
}, {
EibcOrderId: "order5",
Price: "201stake",
Price: "201",
Fee: "50stake",
RollappId: "rollapp1",
ProofHeight: "5",
BlockHeight: "5",
},
},
eventOrders: []Order{
Expand All @@ -187,6 +191,16 @@ func TestOrderClient(t *testing.T) {
ProofHeight: "6",
},
},
updateOrders: []Order{
{
EibcOrderId: "order2",
Price: "202",
Fee: "25stake", // update so it will fulfill
RollappId: "rollapp2",
ProofHeight: "2",
BlockHeight: "2",
},
},
expectLPFulfilledOrderIDs: map[string]string{
"order1": "lp-3-address", // lp3 (lp1 and lp3 selected because they fulfill for rollapp1, lp3 preferred because operator fee is higher)
// "order2": "", // not fulfilled (lp1 has not enough balance, lp2 does not fulfill stake orders, lp3 does not fulfill for rollapp2)
Expand Down Expand Up @@ -216,6 +230,7 @@ func TestOrderClient(t *testing.T) {
lpAddr := fmt.Sprintf("lp-%d-address", i+1)
grants = append(grants, &authz.GrantAuthorization{
Granter: lpAddr,
Grantee: "policyAddress",
Authorization: a,
})
lpBalances[lpAddr] = g.balance
Expand Down Expand Up @@ -258,6 +273,19 @@ func TestOrderClient(t *testing.T) {
}
}

for _, order := range tt.updateOrders {
oc.orderEventer.eventClient.(*mockNodeClient).updateOrderCh <- coretypes.ResultEvent{
Events: map[string][]string{
updatedFeeEvent + ".order_id": {order.EibcOrderId},
updatedFeeEvent + ".price": {order.Price},
updatedFeeEvent + ".packet_status": {"PENDING"},
updatedFeeEvent + ".new_fee": {order.Fee},
updatedFeeEvent + ".rollapp_id": {order.RollappId},
updatedFeeEvent + ".proof_height": {order.ProofHeight},
},
}
}

// wait a bit for the client to fulfill orders
time.Sleep(3 * time.Second)

Expand Down Expand Up @@ -295,7 +323,6 @@ func setupTestOrderClient(
) (*orderClient, error) {
logger, _ := zap.NewDevelopment()
orderCh := make(chan []*demandOrder, config.NewOrderBufferSize)
fulfilledOrdersCh := make(chan *orderBatch, config.NewOrderBufferSize)

// tracker
trackerClient := hubClient
Expand All @@ -308,12 +335,12 @@ func setupTestOrderClient(
"policyAddress",
minOperatorFeeShare,
fullNodeClient,
fulfilledOrdersCh,
"subscriber",
cfg.Fulfillers.BatchSize,
&cfg.Validation,
orderCh,
cfg.OrderPolling.Interval,
cfg.Validation.Interval,
logger,
)
ordTracker.getLPGrants = grantsFn
Expand All @@ -324,7 +351,7 @@ func setupTestOrderClient(
eventerClient := hubClient
eventerClient.finalizeOrderCh = make(chan coretypes.ResultEvent, 1)
eventerClient.addOrderCh = make(chan coretypes.ResultEvent, 1)
eventerClient.stateInfoCh = make(chan coretypes.ResultEvent, 1)
eventerClient.updateOrderCh = make(chan coretypes.ResultEvent, 1)

eventer := newOrderEventer(
cosmosclient.Client{
Expand Down Expand Up @@ -353,7 +380,6 @@ func setupTestOrderClient(
"policyAddress",
&hc,
orderCh,
fulfilledOrdersCh,
ordTracker.releaseAllReservedOrdersFunds,
ordTracker.debitAllReservedOrdersFunds,
)
Expand Down Expand Up @@ -429,7 +455,7 @@ type mockNodeClient struct {
rpcclient.Client
finalizeOrderCh chan coretypes.ResultEvent
addOrderCh chan coretypes.ResultEvent
stateInfoCh chan coretypes.ResultEvent
updateOrderCh chan coretypes.ResultEvent
}

func (m *mockNodeClient) Start() error {
Expand All @@ -445,9 +471,11 @@ func (m *mockNodeClient) BroadcastTx(string, ...sdk.Msg) (cosmosclient.Response,
}

func (m *mockNodeClient) Subscribe(_ context.Context, _ string, query string, _ ...int) (out <-chan coretypes.ResultEvent, err error) {
switch query {
case fmt.Sprintf("%s.is_fulfilled='false'", createdEvent):
switch {
case strings.Contains(query, createdEvent):
return m.addOrderCh, nil
case strings.Contains(query, updatedFeeEvent):
return m.updateOrderCh, nil
}
return nil, fmt.Errorf("invalid query")
}
Loading

0 comments on commit 7cd29c4

Please sign in to comment.