diff --git a/relayer/app/app.go b/relayer/app/app.go index a0777d735..b59531439 100644 --- a/relayer/app/app.go +++ b/relayer/app/app.go @@ -13,17 +13,25 @@ import ( "github.com/omni-network/omni/lib/netconf" "github.com/omni-network/omni/lib/xchain" xprovider "github.com/omni-network/omni/lib/xchain/provider" + "github.com/omni-network/omni/relayer/app/cursor" "github.com/cometbft/cometbft/rpc/client" "github.com/cometbft/cometbft/rpc/client/http" "github.com/ethereum/go-ethereum/common" ethcrypto "github.com/ethereum/go-ethereum/crypto" + + dbm "github.com/cosmos/cosmos-db" ) func Run(ctx context.Context, cfg Config) error { log.Info(ctx, "Starting relayer") + db, err := initializeDB(ctx, cfg) + if err != nil { + return err + } + buildinfo.Instrument(ctx) // Start metrics first, so app is "up" @@ -60,6 +68,12 @@ func Run(ctx context.Context, cfg Config) error { pricer := newTokenPricer(ctx) pnl := newPnlLogger(network.ID, pricer) + cursors, err := cursor.New(db, xprov.GetSubmittedCursor, network) + if err != nil { + return err + } + cursors.StartLoops(ctx) + for _, destChain := range network.EVMChains() { // Setup send provider sendProvider := func() (SendAsync, error) { @@ -93,7 +107,9 @@ func Run(ctx context.Context, cfg Config) error { xprov, CreateSubmissions, sendProvider, - awaitValSet) + awaitValSet, + cursors, + ) go worker.Run(ctx) } @@ -133,6 +149,21 @@ func initializeRPCClients(chains []netconf.Chain, endpoints xchain.RPCEndpoints) return rpcClientPerChain, nil } +func initializeDB(ctx context.Context, cfg Config) (dbm.DB, error) { + var db dbm.DB + if cfg.DBDir == "" { + log.Warn(ctx, "No --db-dir provided, using in-memory DB", nil) + return dbm.NewMemDB(), nil + } + var err error + db, err = dbm.NewGoLevelDB("indexer", cfg.DBDir, nil) + if err != nil { + return nil, errors.Wrap(err, "new golevel db") + } + + return db, nil +} + func makePortalRegistry(network netconf.ID, endpoints xchain.RPCEndpoints) (*bindings.PortalRegistry, error) { meta := netconf.MetadataByID(network, network.Static().OmniExecutionChainID) rpc, err := endpoints.ByNameOrID(meta.Name, meta.ChainID) diff --git a/relayer/app/config.go b/relayer/app/config.go index 620289ee6..5f2b44569 100644 --- a/relayer/app/config.go +++ b/relayer/app/config.go @@ -3,6 +3,7 @@ package relayer import ( "bytes" "text/template" + "time" "github.com/omni-network/omni/lib/buildinfo" "github.com/omni-network/omni/lib/errors" @@ -16,19 +17,23 @@ import ( ) type Config struct { - RPCEndpoints xchain.RPCEndpoints - PrivateKey string - HaloURL string - Network netconf.ID - MonitoringAddr string + RPCEndpoints xchain.RPCEndpoints + PrivateKey string + HaloURL string + Network netconf.ID + MonitoringAddr string + DBDir string + ConfirmInterval time.Duration } func DefaultConfig() Config { return Config{ - PrivateKey: "relayer.key", - HaloURL: "localhost:26657", - Network: "", - MonitoringAddr: ":26660", + PrivateKey: "relayer.key", + HaloURL: "localhost:26657", + Network: "", + MonitoringAddr: ":26660", + DBDir: "./db", + ConfirmInterval: 30 * time.Second, } } diff --git a/relayer/app/cursor/cursors.cosmos_orm.go b/relayer/app/cursor/cursors.cosmos_orm.go new file mode 100644 index 000000000..657e9c318 --- /dev/null +++ b/relayer/app/cursor/cursors.cosmos_orm.go @@ -0,0 +1,168 @@ +// Code generated by protoc-gen-go-cosmos-orm. DO NOT EDIT. + +package cursor + +import ( + context "context" + ormlist "cosmossdk.io/orm/model/ormlist" + ormtable "cosmossdk.io/orm/model/ormtable" + ormerrors "cosmossdk.io/orm/types/ormerrors" +) + +type CursorTable interface { + Insert(ctx context.Context, cursor *Cursor) error + Update(ctx context.Context, cursor *Cursor) error + Save(ctx context.Context, cursor *Cursor) error + Delete(ctx context.Context, cursor *Cursor) error + Has(ctx context.Context, src_chain_id uint64, conf_level uint32, dst_chain_id uint64, attest_offset uint64) (found bool, err error) + // Get returns nil and an error which responds true to ormerrors.IsNotFound() if the record was not found. + Get(ctx context.Context, src_chain_id uint64, conf_level uint32, dst_chain_id uint64, attest_offset uint64) (*Cursor, error) + List(ctx context.Context, prefixKey CursorIndexKey, opts ...ormlist.Option) (CursorIterator, error) + ListRange(ctx context.Context, from, to CursorIndexKey, opts ...ormlist.Option) (CursorIterator, error) + DeleteBy(ctx context.Context, prefixKey CursorIndexKey) error + DeleteRange(ctx context.Context, from, to CursorIndexKey) error + + doNotImplement() +} + +type CursorIterator struct { + ormtable.Iterator +} + +func (i CursorIterator) Value() (*Cursor, error) { + var cursor Cursor + err := i.UnmarshalMessage(&cursor) + return &cursor, err +} + +type CursorIndexKey interface { + id() uint32 + values() []interface{} + cursorIndexKey() +} + +// primary key starting index.. +type CursorPrimaryKey = CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey + +type CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey struct { + vs []interface{} +} + +func (x CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey) id() uint32 { return 0 } +func (x CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey) values() []interface{} { return x.vs } +func (x CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey) cursorIndexKey() {} + +func (this CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey) WithSrcChainId(src_chain_id uint64) CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey { + this.vs = []interface{}{src_chain_id} + return this +} + +func (this CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey) WithSrcChainIdConfLevel(src_chain_id uint64, conf_level uint32) CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey { + this.vs = []interface{}{src_chain_id, conf_level} + return this +} + +func (this CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey) WithSrcChainIdConfLevelDstChainId(src_chain_id uint64, conf_level uint32, dst_chain_id uint64) CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey { + this.vs = []interface{}{src_chain_id, conf_level, dst_chain_id} + return this +} + +func (this CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey) WithSrcChainIdConfLevelDstChainIdAttestOffset(src_chain_id uint64, conf_level uint32, dst_chain_id uint64, attest_offset uint64) CursorSrcChainIdConfLevelDstChainIdAttestOffsetIndexKey { + this.vs = []interface{}{src_chain_id, conf_level, dst_chain_id, attest_offset} + return this +} + +type cursorTable struct { + table ormtable.Table +} + +func (this cursorTable) Insert(ctx context.Context, cursor *Cursor) error { + return this.table.Insert(ctx, cursor) +} + +func (this cursorTable) Update(ctx context.Context, cursor *Cursor) error { + return this.table.Update(ctx, cursor) +} + +func (this cursorTable) Save(ctx context.Context, cursor *Cursor) error { + return this.table.Save(ctx, cursor) +} + +func (this cursorTable) Delete(ctx context.Context, cursor *Cursor) error { + return this.table.Delete(ctx, cursor) +} + +func (this cursorTable) Has(ctx context.Context, src_chain_id uint64, conf_level uint32, dst_chain_id uint64, attest_offset uint64) (found bool, err error) { + return this.table.PrimaryKey().Has(ctx, src_chain_id, conf_level, dst_chain_id, attest_offset) +} + +func (this cursorTable) Get(ctx context.Context, src_chain_id uint64, conf_level uint32, dst_chain_id uint64, attest_offset uint64) (*Cursor, error) { + var cursor Cursor + found, err := this.table.PrimaryKey().Get(ctx, &cursor, src_chain_id, conf_level, dst_chain_id, attest_offset) + if err != nil { + return nil, err + } + if !found { + return nil, ormerrors.NotFound + } + return &cursor, nil +} + +func (this cursorTable) List(ctx context.Context, prefixKey CursorIndexKey, opts ...ormlist.Option) (CursorIterator, error) { + it, err := this.table.GetIndexByID(prefixKey.id()).List(ctx, prefixKey.values(), opts...) + return CursorIterator{it}, err +} + +func (this cursorTable) ListRange(ctx context.Context, from, to CursorIndexKey, opts ...ormlist.Option) (CursorIterator, error) { + it, err := this.table.GetIndexByID(from.id()).ListRange(ctx, from.values(), to.values(), opts...) + return CursorIterator{it}, err +} + +func (this cursorTable) DeleteBy(ctx context.Context, prefixKey CursorIndexKey) error { + return this.table.GetIndexByID(prefixKey.id()).DeleteBy(ctx, prefixKey.values()...) +} + +func (this cursorTable) DeleteRange(ctx context.Context, from, to CursorIndexKey) error { + return this.table.GetIndexByID(from.id()).DeleteRange(ctx, from.values(), to.values()) +} + +func (this cursorTable) doNotImplement() {} + +var _ CursorTable = cursorTable{} + +func NewCursorTable(db ormtable.Schema) (CursorTable, error) { + table := db.GetTable(&Cursor{}) + if table == nil { + return nil, ormerrors.TableNotFound.Wrap(string((&Cursor{}).ProtoReflect().Descriptor().FullName())) + } + return cursorTable{table}, nil +} + +type CursorsStore interface { + CursorTable() CursorTable + + doNotImplement() +} + +type cursorsStore struct { + cursor CursorTable +} + +func (x cursorsStore) CursorTable() CursorTable { + return x.cursor +} + +func (cursorsStore) doNotImplement() {} + +var _ CursorsStore = cursorsStore{} + +func NewCursorsStore(db ormtable.Schema) (CursorsStore, error) { + cursorTable, err := NewCursorTable(db) + if err != nil { + return nil, err + } + + return cursorsStore{ + cursorTable, + }, nil +} diff --git a/relayer/app/cursor/cursors.pb.go b/relayer/app/cursor/cursors.pb.go new file mode 100644 index 000000000..7361c43bb --- /dev/null +++ b/relayer/app/cursor/cursors.pb.go @@ -0,0 +1,208 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.35.1 +// protoc (unknown) +// source: relayer/app/cursor/cursors.proto + +package cursor + +import ( + _ "cosmossdk.io/api/cosmos/orm/v1" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Cursor struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + SrcChainId uint64 `protobuf:"varint,1,opt,name=src_chain_id,json=srcChainId,proto3" json:"src_chain_id,omitempty"` // Chain ID as per https://chainlist.org + DstChainId uint64 `protobuf:"varint,2,opt,name=dst_chain_id,json=dstChainId,proto3" json:"dst_chain_id,omitempty"` // Chain ID as per https://chainlist.org + ConfLevel uint32 `protobuf:"varint,3,opt,name=conf_level,json=confLevel,proto3" json:"conf_level,omitempty"` // Confirmation level of the cross-chain block + AttestOffset uint64 `protobuf:"varint,4,opt,name=attest_offset,json=attestOffset,proto3" json:"attest_offset,omitempty"` // Offset of the cross-chain block + Confirmed bool `protobuf:"varint,5,opt,name=confirmed,proto3" json:"confirmed,omitempty"` // Is the cursor confirmed + StreamOffsetsByShard map[uint64]uint64 `protobuf:"bytes,6,rep,name=stream_offsets_by_shard,json=streamOffsetsByShard,proto3" json:"stream_offsets_by_shard,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` // Highest XMsg StreamOffset by ShardID +} + +func (x *Cursor) Reset() { + *x = Cursor{} + mi := &file_relayer_app_cursor_cursors_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Cursor) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Cursor) ProtoMessage() {} + +func (x *Cursor) ProtoReflect() protoreflect.Message { + mi := &file_relayer_app_cursor_cursors_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Cursor.ProtoReflect.Descriptor instead. +func (*Cursor) Descriptor() ([]byte, []int) { + return file_relayer_app_cursor_cursors_proto_rawDescGZIP(), []int{0} +} + +func (x *Cursor) GetSrcChainId() uint64 { + if x != nil { + return x.SrcChainId + } + return 0 +} + +func (x *Cursor) GetDstChainId() uint64 { + if x != nil { + return x.DstChainId + } + return 0 +} + +func (x *Cursor) GetConfLevel() uint32 { + if x != nil { + return x.ConfLevel + } + return 0 +} + +func (x *Cursor) GetAttestOffset() uint64 { + if x != nil { + return x.AttestOffset + } + return 0 +} + +func (x *Cursor) GetConfirmed() bool { + if x != nil { + return x.Confirmed + } + return false +} + +func (x *Cursor) GetStreamOffsetsByShard() map[uint64]uint64 { + if x != nil { + return x.StreamOffsetsByShard + } + return nil +} + +var File_relayer_app_cursor_cursors_proto protoreflect.FileDescriptor + +var file_relayer_app_cursor_cursors_proto_rawDesc = []byte{ + 0x0a, 0x20, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x63, 0x75, + 0x72, 0x73, 0x6f, 0x72, 0x2f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x70, 0x2e, + 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x1a, 0x17, 0x63, 0x6f, 0x73, 0x6d, 0x6f, 0x73, 0x2f, 0x6f, + 0x72, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x6f, 0x72, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0xa4, 0x03, 0x0a, 0x06, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x20, 0x0a, 0x0c, 0x73, 0x72, + 0x63, 0x5f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x0a, 0x73, 0x72, 0x63, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0c, + 0x64, 0x73, 0x74, 0x5f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x0a, 0x64, 0x73, 0x74, 0x43, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x12, 0x1d, + 0x0a, 0x0a, 0x63, 0x6f, 0x6e, 0x66, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x66, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x23, 0x0a, + 0x0d, 0x61, 0x74, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x61, 0x74, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x66, 0x66, 0x73, + 0x65, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x65, 0x64, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x72, 0x6d, 0x65, 0x64, + 0x12, 0x6b, 0x0a, 0x17, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x6f, 0x66, 0x66, 0x73, 0x65, + 0x74, 0x73, 0x5f, 0x62, 0x79, 0x5f, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x06, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x34, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x70, 0x2e, + 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x2e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x2e, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x42, 0x79, 0x53, 0x68, 0x61, + 0x72, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x14, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4f, + 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x42, 0x79, 0x53, 0x68, 0x61, 0x72, 0x64, 0x1a, 0x47, 0x0a, + 0x19, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x73, 0x42, 0x79, + 0x53, 0x68, 0x61, 0x72, 0x64, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x3a, 0x3e, 0xf2, 0x9e, 0xd3, 0x8e, 0x03, 0x38, 0x0a, 0x34, + 0x0a, 0x32, 0x73, 0x72, 0x63, 0x5f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x2c, 0x63, + 0x6f, 0x6e, 0x66, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x2c, 0x64, 0x73, 0x74, 0x5f, 0x63, 0x68, + 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x2c, 0x61, 0x74, 0x74, 0x65, 0x73, 0x74, 0x5f, 0x6f, 0x66, + 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x42, 0xc1, 0x01, 0x0a, 0x16, 0x63, 0x6f, 0x6d, 0x2e, 0x72, + 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x63, 0x75, 0x72, 0x73, 0x6f, + 0x72, 0x42, 0x0c, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, + 0x01, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6f, 0x6d, + 0x6e, 0x69, 0x2d, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x6f, 0x6d, 0x6e, 0x69, 0x2f, + 0x72, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x63, 0x75, 0x72, 0x73, + 0x6f, 0x72, 0xa2, 0x02, 0x03, 0x52, 0x41, 0x43, 0xaa, 0x02, 0x12, 0x52, 0x65, 0x6c, 0x61, 0x79, + 0x65, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0xca, 0x02, 0x12, + 0x52, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x5c, 0x41, 0x70, 0x70, 0x5c, 0x43, 0x75, 0x72, 0x73, + 0x6f, 0x72, 0xe2, 0x02, 0x1e, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x5c, 0x41, 0x70, 0x70, + 0x5c, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0xea, 0x02, 0x14, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x3a, 0x3a, 0x41, + 0x70, 0x70, 0x3a, 0x3a, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_relayer_app_cursor_cursors_proto_rawDescOnce sync.Once + file_relayer_app_cursor_cursors_proto_rawDescData = file_relayer_app_cursor_cursors_proto_rawDesc +) + +func file_relayer_app_cursor_cursors_proto_rawDescGZIP() []byte { + file_relayer_app_cursor_cursors_proto_rawDescOnce.Do(func() { + file_relayer_app_cursor_cursors_proto_rawDescData = protoimpl.X.CompressGZIP(file_relayer_app_cursor_cursors_proto_rawDescData) + }) + return file_relayer_app_cursor_cursors_proto_rawDescData +} + +var file_relayer_app_cursor_cursors_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_relayer_app_cursor_cursors_proto_goTypes = []any{ + (*Cursor)(nil), // 0: relayer.app.cursor.Cursor + nil, // 1: relayer.app.cursor.Cursor.StreamOffsetsByShardEntry +} +var file_relayer_app_cursor_cursors_proto_depIdxs = []int32{ + 1, // 0: relayer.app.cursor.Cursor.stream_offsets_by_shard:type_name -> relayer.app.cursor.Cursor.StreamOffsetsByShardEntry + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_relayer_app_cursor_cursors_proto_init() } +func file_relayer_app_cursor_cursors_proto_init() { + if File_relayer_app_cursor_cursors_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_relayer_app_cursor_cursors_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_relayer_app_cursor_cursors_proto_goTypes, + DependencyIndexes: file_relayer_app_cursor_cursors_proto_depIdxs, + MessageInfos: file_relayer_app_cursor_cursors_proto_msgTypes, + }.Build() + File_relayer_app_cursor_cursors_proto = out.File + file_relayer_app_cursor_cursors_proto_rawDesc = nil + file_relayer_app_cursor_cursors_proto_goTypes = nil + file_relayer_app_cursor_cursors_proto_depIdxs = nil +} diff --git a/relayer/app/cursor/cursors.proto b/relayer/app/cursor/cursors.proto new file mode 100644 index 000000000..4d5adfa1e --- /dev/null +++ b/relayer/app/cursor/cursors.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package relayer.app.cursor; + +import "cosmos/orm/v1/orm.proto"; + +option go_package = "relayer/app"; + +message Cursor { + option (cosmos.orm.v1.table) = { + id: 1; + primary_key: { fields: "src_chain_id,conf_level,dst_chain_id,attest_offset" } + }; + + uint64 src_chain_id = 1; // Chain ID as per https://chainlist.org + uint64 dst_chain_id = 2; // Chain ID as per https://chainlist.org + uint32 conf_level = 3; // Confirmation level of the cross-chain block + uint64 attest_offset = 4; // Offset of the cross-chain block + bool confirmed = 5; // Is the cursor confirmed + map stream_offsets_by_shard = 6; // Highest XMsg StreamOffset by ShardID +} \ No newline at end of file diff --git a/relayer/app/cursor/cursors_internal_test.go b/relayer/app/cursor/cursors_internal_test.go new file mode 100644 index 000000000..2cc03aafc --- /dev/null +++ b/relayer/app/cursor/cursors_internal_test.go @@ -0,0 +1,187 @@ +package cursor + +import ( + "context" + "slices" + "testing" + + "github.com/omni-network/omni/lib/errors" + "github.com/omni-network/omni/lib/netconf" + "github.com/omni-network/omni/lib/xchain" + + db "github.com/cosmos/cosmos-db" + "github.com/stretchr/testify/require" +) + +const ( + srcChainID = uint64(1) + destChainID = uint64(2) +) + +var ( + streamID1 = xchain.StreamID{ + SourceChainID: srcChainID, + DestChainID: destChainID, + ShardID: xchain.ShardFinalized0, + } + + streamID2 = xchain.StreamID{ + SourceChainID: srcChainID, + DestChainID: destChainID, + ShardID: xchain.ShardLatest0, + } +) + +var network = netconf.Network{Chains: []netconf.Chain{ + {ID: srcChainID, Name: "source", Shards: []xchain.ShardID{streamID1.ShardID, streamID2.ShardID}}, + {ID: destChainID, Name: "dest"}, +}} + +// TestStore tests how cursors are being confirmed. +func TestStore(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + finalized := make(map[xchain.StreamID]uint64) + getSubmitCursorFunc := func(ctx context.Context, ref xchain.Ref, stream xchain.StreamID) (xchain.SubmitCursor, bool, error) { + if ref != xchain.FinalizedRef { + return xchain.SubmitCursor{}, false, errors.New("unexpected ref") + } + + msgOffset, ok := finalized[stream] + + return xchain.SubmitCursor{ + StreamID: xchain.StreamID{}, + MsgOffset: msgOffset, + }, ok, nil + } + + store, err := New(db.NewMemDB(), getSubmitCursorFunc, network) + require.NoError(t, err) + + saveCursor := func(stream xchain.StreamID, attOffset uint64, msgOffset uint64) { + var msgs []xchain.Msg + if msgOffset != 0 { + msgs = append(msgs, xchain.Msg{MsgID: xchain.MsgID{StreamID: stream, StreamOffset: msgOffset}}) + } + err := store.Save(ctx, stream.ChainVersion(), stream.DestChainID, attOffset, map[xchain.StreamID][]xchain.Msg{ + stream: msgs, + }) + require.NoError(t, err) + } + + assert := func(t *testing.T, stream xchain.StreamID, confirmed, latest uint64) { + t.Helper() + + confirmedOffsets, err := store.WorkerOffsets(ctx, destChainID) + require.NoError(t, err) + require.Equal(t, confirmed, confirmedOffsets[stream.ChainVersion()]) + + lastOffsets, err := store.LatestOffsets(ctx, destChainID) + require.NoError(t, err) + require.Equal(t, latest, lastOffsets[stream.ChainVersion()]) + } + + assertCount := func(t *testing.T, stream1Count, stream2Count uint64) { + t.Helper() + + counts, err := store.CountCursors(ctx, destChainID) + require.NoError(t, err) + require.Equal(t, stream1Count, counts[streamID1.ChainVersion()]) + require.Equal(t, stream2Count, counts[streamID2.ChainVersion()]) + } + + // Add empty cursor, ensure only latest updated + saveCursor(streamID1, 11, 0) + assert(t, streamID1, 0, 11) + assertCount(t, 1, 0) + + // Confirm streamID1 attOffset=11 + require.NoError(t, store.confirmOnce(ctx)) + assert(t, streamID1, 11, 11) + assertCount(t, 1, 0) + + // Add streamID2 attOffset=21, msgOffset=201 + saveCursor(streamID2, 21, 201) + assert(t, streamID1, 11, 11) // Unchanged + require.NoError(t, store.confirmOnce(ctx)) + assert(t, streamID2, 0, 21) // Still unconfirmed + assertCount(t, 1, 1) + + // Confirm streamID2 attOffset=21 + finalized[streamID2] = 202 + require.NoError(t, store.confirmOnce(ctx)) + assert(t, streamID1, 11, 11) + assert(t, streamID2, 21, 21) + assertCount(t, 1, 1) + + // Save new cursors for each stream, assert only latest updated + saveCursor(streamID1, 12, 102) + saveCursor(streamID2, 22, 202) + assert(t, streamID1, 11, 12) + assert(t, streamID2, 21, 22) + assertCount(t, 2, 2) + + // Confirm streamID2 attOffset=22 + require.NoError(t, store.confirmOnce(ctx)) + assert(t, streamID1, 11, 12) // Unconfirmed + assert(t, streamID2, 22, 22) + + // Add empty cursor on stream1, finalize previous cursor, both should be confirmed + saveCursor(streamID1, 13, 0) + finalized[streamID1] = 102 + require.NoError(t, store.confirmOnce(ctx)) + assert(t, streamID1, 13, 13) + assertCount(t, 3, 2) + + require.NoError(t, store.trimOnce(ctx)) + assert(t, streamID1, 13, 13) // Unchanged + assert(t, streamID2, 22, 22) // Unchanged + assertCount(t, 1, 1) +} + +func (s *Store) LatestOffsets( + ctx context.Context, + destChain uint64, +) (map[xchain.ChainVersion]uint64, error) { + all, err := listAll(ctx, s.db) + if err != nil { + return nil, err + } + + // Collect the highest cursor for each streamer. + resp := make(map[xchain.ChainVersion]uint64) + for s, cursors := range splitByStreamer(all) { + if s.DstChainID != destChain { + continue + } + + slices.Reverse(cursors) + + for _, c := range cursors { + resp[s.ChainVersion()] = c.GetAttestOffset() + break + } + } + + return resp, nil +} + +func (s *Store) CountCursors( + ctx context.Context, + destChain uint64, +) (map[xchain.ChainVersion]uint64, error) { + all, err := listAll(ctx, s.db) + if err != nil { + return nil, err + } + + // Collect the highest cursor for each streamer. + resp := make(map[xchain.ChainVersion]uint64) + for s, cursors := range splitByStreamer(all) { + resp[s.ChainVersion()] = uint64(len(cursors)) + } + + return resp, nil +} diff --git a/relayer/app/cursor/db.go b/relayer/app/cursor/db.go new file mode 100644 index 000000000..5054a5eb4 --- /dev/null +++ b/relayer/app/cursor/db.go @@ -0,0 +1,59 @@ +package cursor + +import ( + "context" + + "github.com/omni-network/omni/lib/errors" + + ormv1alpha1 "cosmossdk.io/api/cosmos/orm/v1alpha1" + "cosmossdk.io/core/store" + "cosmossdk.io/orm/model/ormdb" + db "github.com/cosmos/cosmos-db" +) + +// listAll returns all cursors by prefix. +// Results are ordered by primary key ascending: SrcChainId-ConfLevel-DstChainId-AttestOffset. +func listAll(ctx context.Context, db CursorTable) ([]*Cursor, error) { + iterator, err := db.List(ctx, CursorPrimaryKey{}) + if err != nil { + return nil, errors.Wrap(err, "listAll cursors") + } + defer iterator.Close() + + var cursors []*Cursor + for iterator.Next() { + cursor, err := iterator.Value() + if err != nil { + return nil, errors.Wrap(err, "cursor value") + } + cursors = append(cursors, cursor) + } + + return cursors, nil +} + +func newCursorsTable(db db.DB) (CursorTable, error) { + schema := &ormv1alpha1.ModuleSchemaDescriptor{SchemaFile: []*ormv1alpha1.ModuleSchemaDescriptor_FileEntry{ + {Id: 1, ProtoFileName: File_relayer_app_cursor_cursors_proto.Path()}, + }} + + modDB, err := ormdb.NewModuleDB(schema, ormdb.ModuleDBOptions{KVStoreService: dbStoreService{db}}) + if err != nil { + return nil, errors.Wrap(err, "create ormdb module db") + } + + dbStore, err := NewCursorsStore(modDB) + if err != nil { + return nil, errors.Wrap(err, "create store") + } + + return dbStore.CursorTable(), nil +} + +type dbStoreService struct { + db.DB +} + +func (db dbStoreService) OpenKVStore(context.Context) store.KVStore { + return db.DB +} diff --git a/relayer/app/cursor/metrics.go b/relayer/app/cursor/metrics.go new file mode 100644 index 000000000..622c19c52 --- /dev/null +++ b/relayer/app/cursor/metrics.go @@ -0,0 +1,22 @@ +package cursor + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + confirmedOffset = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "relayer", + Subsystem: "cursors", + Name: "confirmed_offset", + Help: "Confirmed cursor offset", + }, []string{"src_chain_version", "dst_chain"}) + + latestOffset = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "relayer", + Subsystem: "cursors", + Name: "latest_offset", + Help: "Latest cursor offset", + }, []string{"src_chain_version", "dst_chain"}) +) diff --git a/relayer/app/cursor/store.go b/relayer/app/cursor/store.go new file mode 100644 index 000000000..9d75a0fd9 --- /dev/null +++ b/relayer/app/cursor/store.go @@ -0,0 +1,252 @@ +package cursor + +import ( + "context" + "slices" + "time" + + "github.com/omni-network/omni/lib/errors" + "github.com/omni-network/omni/lib/log" + "github.com/omni-network/omni/lib/netconf" + "github.com/omni-network/omni/lib/xchain" + + db "github.com/cosmos/cosmos-db" +) + +type getSubmitCursorFunc func(ctx context.Context, ref xchain.Ref, stream xchain.StreamID) (xchain.SubmitCursor, bool, error) + +// Store provides a persisted attestation streamer cursor store. +type Store struct { + db CursorTable + getSubmitCursorFunc getSubmitCursorFunc + network netconf.Network +} + +func New( + db db.DB, + getSubmitCursorFunc getSubmitCursorFunc, + network netconf.Network, +) (*Store, error) { + cursorsTable, err := newCursorsTable(db) + if err != nil { + return nil, err + } + + return &Store{ + db: cursorsTable, + getSubmitCursorFunc: getSubmitCursorFunc, + network: network, + }, nil +} + +// WorkerOffsets returns confirmed offsets for the provided destination chain. +func (s *Store) WorkerOffsets( + ctx context.Context, + destChain uint64, +) (map[xchain.ChainVersion]uint64, error) { + all, err := listAll(ctx, s.db) + if err != nil { + return nil, err + } + + // Collect the highest confirmed cursor for each streamer. + resp := make(map[xchain.ChainVersion]uint64) + for s, cursors := range splitByStreamer(all) { + if s.DstChainID != destChain { + continue + } + + slices.Reverse(cursors) + + for _, c := range cursors { + if c.GetConfirmed() { + resp[s.ChainVersion()] = c.GetAttestOffset() + break + } + } + } + + return resp, nil +} + +// Save an attest offset for the provided streamer. +// Existing cursors' stream offsets are updated and confirmed is reset to false. +func (s *Store) Save( + ctx context.Context, + srcVersion xchain.ChainVersion, + destChain uint64, + attestOffset uint64, + streamMsgs map[xchain.StreamID][]xchain.Msg, +) error { + // Get highest stream offset for each shard + offsetsByShard := make(map[uint64]uint64) + for streamID, msgs := range streamMsgs { + if len(msgs) == 0 { + continue + } + offsetsByShard[uint64(streamID.ShardID)] = msgs[len(msgs)-1].StreamOffset + } + + c := &Cursor{ + SrcChainId: srcVersion.ID, + ConfLevel: uint32(srcVersion.ConfLevel), + DstChainId: destChain, + AttestOffset: attestOffset, + Confirmed: false, + StreamOffsetsByShard: offsetsByShard, + } + + err := s.db.Save(ctx, c) + if err != nil { + return errors.Wrap(err, "save cursor") + } + + latestOffset. + WithLabelValues(s.network.ChainVersionName(srcVersion), s.network.ChainName(destChain)). + Set(float64(attestOffset)) + + return nil +} + +// StartLoops starts goroutines to periodically confirm and trim streams. +// It returns immediately. +func (s *Store) StartLoops(ctx context.Context) { + go s.trimForever(ctx) + go s.confirmForever(ctx) +} + +func (s *Store) confirmForever(ctx context.Context) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := s.confirmOnce(ctx); err != nil { + log.Error(ctx, "Trimming cursor stored failed (will retry))", err) + } + } + } +} + +// confirmOnce marks cursors of each streamer as confirmed based on: +// - if previous cursor is confirmed +// - and if all xmsgs are submitted and finalized. +func (s *Store) confirmOnce(ctx context.Context) error { + all, err := listAll(ctx, s.db) + if err != nil { + return errors.Wrap(err, "listAll all cursors") + } + + for streamer, cursors := range splitByStreamer(all) { + for _, c := range cursors { + if c.GetConfirmed() { + continue + } + + var unconfirmed bool + for shardID, offset := range c.GetStreamOffsetsByShard() { + stream := xchain.StreamID{SourceChainID: c.GetSrcChainId(), DestChainID: c.GetDstChainId(), ShardID: xchain.ShardID(shardID)} + submitted, ok, err := s.getSubmitCursorFunc(ctx, xchain.FinalizedRef, stream) + if err != nil { + log.Warn(ctx, "Get submit cursor failed while confirming", err, "stream", s.network.StreamName(stream)) + unconfirmed = true + + break + } else if !ok || submitted.MsgOffset < offset { + unconfirmed = true // Cursor not yet submitted or finalized. + break + } + } + + if unconfirmed { + break + } + + c.Confirmed = true + + confirmedOffset. + WithLabelValues(s.network.ChainVersionName(streamer.ChainVersion()), s.network.ChainName(c.GetDstChainId())). + Set(float64(c.GetAttestOffset())) + + if err := s.db.Save(ctx, c); err != nil { + return errors.Wrap(err, "save cursor") + } + } + } + + return nil +} + +func (s *Store) trimForever(ctx context.Context) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := s.trimOnce(ctx); err != nil { + log.Error(ctx, "Trimming cursor stored failed (will retry))", err) + } + } + } +} + +// trimOnce iterates over all streamer's cursors and deletes all initial confirmed +// cursors, except the last confirmed one before any unconfirmed cursors. +func (s *Store) trimOnce(ctx context.Context) error { + all, err := listAll(ctx, s.db) + if err != nil { + return errors.Wrap(err, "listAll all cursors") + } + + for _, cursors := range splitByStreamer(all) { + var prev *Cursor + for i, c := range cursors { + if i > 0 && c.GetAttestOffset() <= prev.GetAttestOffset() { // Sanity check + return errors.New("cursors are not sorted by attest offset [BUG]") + } + + if !c.GetConfirmed() { + break + } + if i > 0 { + if err := s.db.Delete(ctx, prev); err != nil { + return errors.Wrap(err, "delete cursor") + } + } + prev = c + } + } + + return nil +} + +type streamer struct { + SrcChainID uint64 + SrcConfLevel uint32 + DstChainID uint64 +} + +func (s streamer) ChainVersion() xchain.ChainVersion { + return xchain.ChainVersion{ID: s.SrcChainID, ConfLevel: xchain.ConfLevel(s.SrcConfLevel)} +} + +func splitByStreamer(all []*Cursor) map[streamer][]*Cursor { + resp := make(map[streamer][]*Cursor) + for _, c := range all { + s := streamer{ + SrcChainID: c.GetSrcChainId(), + SrcConfLevel: c.GetConfLevel(), + DstChainID: c.GetDstChainId(), + } + + resp[s] = append(resp[s], c) + } + + return resp +} diff --git a/relayer/app/worker.go b/relayer/app/worker.go index 0fc24fb9e..5250bbd3c 100644 --- a/relayer/app/worker.go +++ b/relayer/app/worker.go @@ -12,6 +12,7 @@ import ( "github.com/omni-network/omni/lib/log" "github.com/omni-network/omni/lib/netconf" "github.com/omni-network/omni/lib/xchain" + "github.com/omni-network/omni/relayer/app/cursor" "github.com/ethereum/go-ethereum/accounts/abi/bind" ) @@ -29,12 +30,19 @@ type Worker struct { creator CreateFunc sendProvider func() (SendAsync, error) awaitValSet awaitValSet + cursors *cursor.Store } // NewWorker creates a new worker for a single destination chain. -func NewWorker(destChain netconf.Chain, network netconf.Network, cProvider cchain.Provider, - xProvider xchain.Provider, creator CreateFunc, sendProvider func() (SendAsync, error), +func NewWorker( + destChain netconf.Chain, + network netconf.Network, + cProvider cchain.Provider, + xProvider xchain.Provider, + creator CreateFunc, + sendProvider func() (SendAsync, error), awaitValSet awaitValSet, + cursors *cursor.Store, ) *Worker { return &Worker{ destChain: destChain, @@ -44,6 +52,7 @@ func NewWorker(destChain netconf.Chain, network netconf.Network, cProvider cchai creator: creator, sendProvider: sendProvider, awaitValSet: awaitValSet, + cursors: cursors, } } @@ -74,11 +83,11 @@ func (w *Worker) runOnce(ctx context.Context) error { return err } - for _, cursor := range cursors { + for _, c := range cursors { log.Info(ctx, "Worker fetched submitted cursor", - "stream", w.network.StreamName(cursor.StreamID), - "attest_offset", cursor.AttestOffset, - "msg_offset", cursor.MsgOffset, + "stream", w.network.StreamName(c.StreamID), + "attest_offset", c.AttestOffset, + "msg_offset", c.MsgOffset, ) } @@ -109,6 +118,22 @@ func (w *Worker) runOnce(ctx context.Context) error { } } + // Update offsets if any stored cursor is higher + stored, err := w.cursors.WorkerOffsets(ctx, w.destChain.ID) + if err != nil { + return err + } + for chainVer, offset := range attestOffsets { + if stored[chainVer] > offset { + log.Info(ctx, "Worker using stored attest offset", + "chain_version", w.network.ChainVersionName(chainVer), + "prev", offset, + "bootstrap", stored[chainVer], + ) + attestOffsets[chainVer] = stored[chainVer] + } + } + msgFilter, err := newMsgOffsetFilter(cursors) if err != nil { return err @@ -209,13 +234,18 @@ func (w *Worker) newCallback( var cachedValSet []cchain.PortalValidator return func(ctx context.Context, att xchain.Attestation) error { + saveCursors := func(streamMsgs map[xchain.StreamID][]xchain.Msg) error { + return w.cursors.Save(ctx, att.ChainVersion, w.destChain.ID, att.AttestOffset, streamMsgs) + } + block, ok, err := fetchXBlock(ctx, w.xProvider, att) if err != nil { return err } else if !ok { return nil // Mismatching fuzzy attestation, skip. } else if len(block.Msgs) == 0 { - return nil // No messages, nothing to do. + // No messages, nothing to do, just update cursors + return saveCursors(nil) } msgTree, err := xchain.NewMsgTree(block.Msgs) @@ -233,6 +263,8 @@ func (w *Worker) newCallback( } } + submitted := make(map[xchain.StreamID][]xchain.Msg) + // Split into streams for streamID, msgs := range msgStreamMapper(block.Msgs) { if streamID.DestChainID != w.destChain.ID { @@ -271,9 +303,11 @@ func (w *Worker) newCallback( return err } } + + submitted[streamID] = msgs } - return nil + return saveCursors(submitted) } } diff --git a/relayer/app/worker_internal_test.go b/relayer/app/worker_internal_test.go index 604442144..a35f8b111 100644 --- a/relayer/app/worker_internal_test.go +++ b/relayer/app/worker_internal_test.go @@ -8,7 +8,9 @@ import ( "github.com/omni-network/omni/lib/cchain" "github.com/omni-network/omni/lib/netconf" "github.com/omni-network/omni/lib/xchain" + "github.com/omni-network/omni/relayer/app/cursor" + db "github.com/cosmos/cosmos-db" "github.com/stretchr/testify/require" ) @@ -38,7 +40,7 @@ func TestWorker_Run(t *testing.T) { DestChainID: destChainB, ShardID: xchain.ShardLatest0, } - cursors := map[xchain.StreamID]xchain.SubmitCursor{ + submittedCursors := map[xchain.StreamID]xchain.SubmitCursor{ streamA: {StreamID: streamA, MsgOffset: destChainACursor, AttestOffset: destChainACursor}, streamB: {StreamID: streamB, MsgOffset: destChainBCursor, AttestOffset: destChainBCursor}, } @@ -61,7 +63,7 @@ func TestWorker_Run(t *testing.T) { }, true, nil }, GetSubmittedCursorFn: func(_ context.Context, ref xchain.Ref, stream xchain.StreamID) (xchain.SubmitCursor, bool, error) { - resp, ok := cursors[stream] + resp, ok := submittedCursors[stream] return resp, ok, nil }, } @@ -150,6 +152,9 @@ func TestWorker_Run(t *testing.T) { noAwait := func(context.Context, uint64) error { return nil } + cursors, err := cursor.New(db.NewMemDB(), mockXClient.GetSubmittedCursor, network) + require.NoError(t, err) + for _, chain := range network.Chains { w := NewWorker( chain, @@ -158,7 +163,8 @@ func TestWorker_Run(t *testing.T) { mockXClient, mockCreateFunc, func() (SendAsync, error) { return mockSender.SendTransaction, nil }, - noAwait) + noAwait, + cursors) go w.Run(ctx) } diff --git a/relayer/cmd/flags.go b/relayer/cmd/flags.go index c1214f1db..fa5355ce2 100644 --- a/relayer/cmd/flags.go +++ b/relayer/cmd/flags.go @@ -14,4 +14,5 @@ func bindRunFlags(flags *pflag.FlagSet, cfg *relayer.Config) { flags.StringVar(&cfg.PrivateKey, "private-key", cfg.PrivateKey, "The path to the private key e.g path/private.key") flags.StringVar(&cfg.HaloURL, "halo-url", cfg.HaloURL, "The URL of the halo node e.g localhost:26657") flags.StringVar(&cfg.MonitoringAddr, "monitoring-addr", cfg.MonitoringAddr, "The address to bind the monitoring server") + flags.StringVar(&cfg.DBDir, "db-dir", cfg.DBDir, "The path to the database directory") } diff --git a/scripts/buf_generate.sh b/scripts/buf_generate.sh index b4b12e874..932ee1592 100755 --- a/scripts/buf_generate.sh +++ b/scripts/buf_generate.sh @@ -40,7 +40,7 @@ do done echo "Generating orm protos for cosmos keeper orm" -for DIR in halo/*/keeper/ octane/*/keeper/ monitor/xmonitor/* solver/app +for DIR in halo/*/keeper/ octane/*/keeper/ monitor/xmonitor/* solver/app relayer/app/cursor do bufgen orm "${DIR}" done