diff --git a/configs/config.sample.yml b/configs/config.sample.yml index 01a1c5c9..f202959b 100644 --- a/configs/config.sample.yml +++ b/configs/config.sample.yml @@ -33,10 +33,12 @@ upstreams: # it quickly updates the gateway with the latest block height. If this # setting is undefined, the gateway will attempt to subscribe to new # heads if the upstream supports it. + # nodeType - full or archive - id: my-node httpURL: "http://12.57.207.168:8545" wsURL: "wss://12.57.207.168:8546" group: primary + nodeType: full - id: infura-eth httpURL: "https://mainnet.infura.io/v3/${INFURA_API_KEY}" wsURL: "wss://mainnet.infura.io/ws/v3/${INFURA_API_KEY}" @@ -44,9 +46,11 @@ upstreams: username: ~ password: ${INFURA_API_KEY_SECRET} group: fallback + nodeType: archive - id: alchemy-eth httpURL: "https://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_API_KEY}" wsURL: "wss://eth-mainnet.g.alchemy.com/v2/${ALCHEMY_API_KEY}" healthCheck: useWsForBlockHeight: false group: fallback + nodeType: full diff --git a/internal/config/config.go b/internal/config/config.go index c7dfc8a7..15f27bec 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -8,6 +8,13 @@ import ( "gopkg.in/yaml.v3" ) +type NodeType string + +const ( + Archive NodeType = "archive" + Full NodeType = "full" +) + type UpstreamConfig struct { BasicAuthConfig BasicAuthConfig `yaml:"basicAuth"` HealthCheckConfig HealthCheckConfig `yaml:"healthCheck"` @@ -15,6 +22,7 @@ type UpstreamConfig struct { HTTPURL string `yaml:"httpURL"` WSURL string `yaml:"wsURL"` GroupID string `yaml:"group"` + NodeType NodeType `yaml:"nodeType"` } func (c *UpstreamConfig) isValid(groups []GroupConfig) bool { @@ -25,6 +33,12 @@ func (c *UpstreamConfig) isValid(groups []GroupConfig) bool { zap.L().Error("httpUrl cannot be empty", zap.Any("config", c), zap.String("upstreamId", c.ID)) } + if c.NodeType == "" { + isValid = false + + zap.L().Error("nodeType cannot be empty", zap.Any("config", c), zap.String("upstreamId", c.ID)) + } + if c.HealthCheckConfig.UseWSForBlockHeight != nil && *c.HealthCheckConfig.UseWSForBlockHeight && c.WSURL == "" { isValid = false diff --git a/internal/config/config_test.go b/internal/config/config_test.go index f22782f1..b89e1830 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -120,10 +120,12 @@ func TestParseConfig_ValidConfig(t *testing.T) { healthCheck: useWsForBlockHeight: true group: primary + nodeType: full - id: ankr-polygon httpURL: "https://rpc.ankr.com/polygon" wsURL: "wss://rpc.ankr.com/polygon/ws/${ANKR_API_KEY}" group: fallback + nodeType: archive ` configBytes := []byte(config) @@ -142,7 +144,8 @@ func TestParseConfig_ValidConfig(t *testing.T) { HealthCheckConfig: HealthCheckConfig{ UseWSForBlockHeight: newBool(true), }, - GroupID: "primary", + GroupID: "primary", + NodeType: Full, }, { ID: "ankr-polygon", @@ -151,7 +154,8 @@ func TestParseConfig_ValidConfig(t *testing.T) { HealthCheckConfig: HealthCheckConfig{ UseWSForBlockHeight: nil, }, - GroupID: "fallback", + GroupID: "fallback", + NodeType: Archive, }, }, Global: GlobalConfig{ diff --git a/internal/metadata/request_metadata.go b/internal/metadata/request_metadata.go new file mode 100644 index 00000000..c8278c18 --- /dev/null +++ b/internal/metadata/request_metadata.go @@ -0,0 +1,5 @@ +package metadata + +type RequestMetadata struct { + IsStateRequired bool +} diff --git a/internal/metadata/request_metadata_parser.go b/internal/metadata/request_metadata_parser.go new file mode 100644 index 00000000..c0902eb2 --- /dev/null +++ b/internal/metadata/request_metadata_parser.go @@ -0,0 +1,21 @@ +package metadata + +import ( + "github.com/satsuma-data/node-gateway/internal/jsonrpc" +) + +type RequestMetadataParser struct{} + +func (p *RequestMetadataParser) Parse(requestBody jsonrpc.RequestBody) RequestMetadata { + result := RequestMetadata{} + + switch requestBody.Method { + case "eth_getBalance", "eth_getStorageAt", "eth_getTransactionCount", "eth_getCode", "eth_call", "eth_estimateGas": + // List of state methods: https://ethereum.org/en/developers/docs/apis/json-rpc/#state_methods + result.IsStateRequired = true + default: + result.IsStateRequired = false + } + + return result +} diff --git a/internal/metadata/request_metadata_parser_test.go b/internal/metadata/request_metadata_parser_test.go new file mode 100644 index 00000000..7fd85bf4 --- /dev/null +++ b/internal/metadata/request_metadata_parser_test.go @@ -0,0 +1,42 @@ +package metadata + +import ( + "testing" + + "github.com/satsuma-data/node-gateway/internal/jsonrpc" + "github.com/stretchr/testify/assert" +) + +func TestRequestMetadataParser_Parse(t *testing.T) { + type args struct { + requestBody jsonrpc.RequestBody + } + + type testArgs struct { + name string + args args + want RequestMetadata + } + + testForMethod := func(methodName string, isStateRequired bool) testArgs { + return testArgs{ + methodName, + args{jsonrpc.RequestBody{Method: methodName}}, + RequestMetadata{IsStateRequired: isStateRequired}, + } + } + + tests := []testArgs{ + testForMethod("eth_call", true), + testForMethod("eth_getBalance", true), + testForMethod("eth_getBlockByNumber", false), + testForMethod("eth_getTransactionReceipt", false), + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &RequestMetadataParser{} + assert.Equalf(t, tt.want, p.Parse(tt.args.requestBody), "Parse(%v)", tt.args.requestBody) + }) + } +} diff --git a/internal/mocks/RoutingStrategy.go b/internal/mocks/RoutingStrategy.go index 4cbf4a73..f2167dca 100644 --- a/internal/mocks/RoutingStrategy.go +++ b/internal/mocks/RoutingStrategy.go @@ -3,6 +3,7 @@ package mocks import ( + "github.com/satsuma-data/node-gateway/internal/metadata" mock "github.com/stretchr/testify/mock" types "github.com/satsuma-data/node-gateway/internal/types" @@ -22,7 +23,10 @@ func (_m *MockRoutingStrategy) EXPECT() *MockRoutingStrategy_Expecter { } // RouteNextRequest provides a mock function with given fields: upstreamsByPriority -func (_m *MockRoutingStrategy) RouteNextRequest(upstreamsByPriority types.PriorityToUpstreamsMap) (string, error) { +func (_m *MockRoutingStrategy) RouteNextRequest( + upstreamsByPriority types.PriorityToUpstreamsMap, + requestMetadata metadata.RequestMetadata, +) (string, error) { ret := _m.Called(upstreamsByPriority) var r0 string diff --git a/internal/route/filtering_strategy.go b/internal/route/filtering_strategy.go index 5b71309b..39773841 100644 --- a/internal/route/filtering_strategy.go +++ b/internal/route/filtering_strategy.go @@ -2,6 +2,7 @@ package route import ( "github.com/satsuma-data/node-gateway/internal/config" + "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/satsuma-data/node-gateway/internal/types" "go.uber.org/zap" ) @@ -11,12 +12,18 @@ type FilteringRoutingStrategy struct { BackingStrategy RoutingStrategy } -func (s *FilteringRoutingStrategy) RouteNextRequest(upstreamsByPriority types.PriorityToUpstreamsMap) (string, error) { - filteredUpstreams := s.filter(upstreamsByPriority) - return s.BackingStrategy.RouteNextRequest(filteredUpstreams) +func (s *FilteringRoutingStrategy) RouteNextRequest( + upstreamsByPriority types.PriorityToUpstreamsMap, + requestMetadata metadata.RequestMetadata, +) (string, error) { + filteredUpstreams := s.filter(upstreamsByPriority, requestMetadata) + return s.BackingStrategy.RouteNextRequest(filteredUpstreams, requestMetadata) } -func (s *FilteringRoutingStrategy) filter(upstreamsByPriority types.PriorityToUpstreamsMap) types.PriorityToUpstreamsMap { +func (s *FilteringRoutingStrategy) filter( + upstreamsByPriority types.PriorityToUpstreamsMap, + requestMetadata metadata.RequestMetadata, +) types.PriorityToUpstreamsMap { priorityToHealthyUpstreams := make(types.PriorityToUpstreamsMap) for priority, upstreamConfigs := range upstreamsByPriority { @@ -25,7 +32,7 @@ func (s *FilteringRoutingStrategy) filter(upstreamsByPriority types.PriorityToUp filteredUpstreams := make([]*config.UpstreamConfig, 0) for _, upstreamConfig := range upstreamConfigs { - if s.NodeFilter.Apply(nil, upstreamConfig) { + if s.NodeFilter.Apply(requestMetadata, upstreamConfig) { filteredUpstreams = append(filteredUpstreams, upstreamConfig) } } diff --git a/internal/route/node_filter.go b/internal/route/node_filter.go index 86cfa9d4..d4dc3e13 100644 --- a/internal/route/node_filter.go +++ b/internal/route/node_filter.go @@ -6,17 +6,15 @@ import ( "github.com/satsuma-data/node-gateway/internal/metadata" ) -type RequestMetadata struct{} - type NodeFilter interface { - Apply(requestMetadata *RequestMetadata, upstreamConfig *config.UpstreamConfig) bool + Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig) bool } type AndFilter struct { filters []NodeFilter } -func (a *AndFilter) Apply(requestMetadata *RequestMetadata, upstreamConfig *config.UpstreamConfig) bool { +func (a *AndFilter) Apply(requestMetadata metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig) bool { var result = true for filterIndex := range a.filters { @@ -34,7 +32,7 @@ type IsHealthy struct { healthCheckManager checks.HealthCheckManager } -func (f *IsHealthy) Apply(_ *RequestMetadata, upstreamConfig *config.UpstreamConfig) bool { +func (f *IsHealthy) Apply(_ metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig) bool { var upstreamStatus = f.healthCheckManager.GetUpstreamStatus(upstreamConfig.ID) return upstreamStatus.PeerCheck.IsPassing() && upstreamStatus.SyncingCheck.IsPassing() } @@ -44,7 +42,7 @@ type IsAtGlobalMaxHeight struct { chainMetadataStore *metadata.ChainMetadataStore } -func (f *IsAtGlobalMaxHeight) Apply(_ *RequestMetadata, upstreamConfig *config.UpstreamConfig) bool { +func (f *IsAtGlobalMaxHeight) Apply(_ metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig) bool { maxHeight := f.chainMetadataStore.GetGlobalMaxHeight() upstreamStatus := f.healthCheckManager.GetUpstreamStatus(upstreamConfig.ID) @@ -57,7 +55,7 @@ type IsAtMaxHeightForGroup struct { chainMetadataStore *metadata.ChainMetadataStore } -func (f *IsAtMaxHeightForGroup) Apply(_ *RequestMetadata, upstreamConfig *config.UpstreamConfig) bool { +func (f *IsAtMaxHeightForGroup) Apply(_ metadata.RequestMetadata, upstreamConfig *config.UpstreamConfig) bool { maxHeightForGroup := f.chainMetadataStore.GetMaxHeightForGroup(upstreamConfig.GroupID) upstreamStatus := f.healthCheckManager.GetUpstreamStatus(upstreamConfig.ID) @@ -65,6 +63,19 @@ func (f *IsAtMaxHeightForGroup) Apply(_ *RequestMetadata, upstreamConfig *config return upstreamStatus.BlockHeightCheck.IsPassing(maxHeightForGroup) } +type SimpleIsStatePresent struct{} + +func (f *SimpleIsStatePresent) Apply( + requestMetadata metadata.RequestMetadata, + upstreamConfig *config.UpstreamConfig, +) bool { + if requestMetadata.IsStateRequired { + return upstreamConfig.NodeType == config.Archive + } + + return true +} + func CreateNodeFilter( filterNames []NodeFilterType, manager checks.HealthCheckManager, @@ -96,6 +107,8 @@ func CreateSingleNodeFilter( healthCheckManager: manager, chainMetadataStore: store, } + case SimpleStatePresent: + return &SimpleIsStatePresent{} default: panic("Unknown filter type " + filterName + "!") } @@ -104,7 +117,8 @@ func CreateSingleNodeFilter( type NodeFilterType string const ( - Healthy NodeFilterType = "healthy" - GlobalMaxHeight NodeFilterType = "globalMaxHeight" - MaxHeightForGroup NodeFilterType = "maxHeightForGroup" + Healthy NodeFilterType = "healthy" + GlobalMaxHeight NodeFilterType = "globalMaxHeight" + MaxHeightForGroup NodeFilterType = "maxHeightForGroup" + SimpleStatePresent NodeFilterType = "simpleStatePresent" ) diff --git a/internal/route/node_filter_test.go b/internal/route/node_filter_test.go index 74412a42..dc1fa3d5 100644 --- a/internal/route/node_filter_test.go +++ b/internal/route/node_filter_test.go @@ -4,18 +4,19 @@ import ( "testing" "github.com/satsuma-data/node-gateway/internal/config" + "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/stretchr/testify/assert" ) type AlwaysPass struct{} -func (AlwaysPass) Apply(_ *RequestMetadata, _ *config.UpstreamConfig) bool { +func (AlwaysPass) Apply(metadata.RequestMetadata, *config.UpstreamConfig) bool { return true } type AlwaysFail struct{} -func (AlwaysFail) Apply(_ *RequestMetadata, _ *config.UpstreamConfig) bool { +func (AlwaysFail) Apply(metadata.RequestMetadata, *config.UpstreamConfig) bool { return false } @@ -24,8 +25,8 @@ func TestAndFilter_Apply(t *testing.T) { filters []NodeFilter } - type args struct { - requestMetadata *RequestMetadata + type args struct { //nolint:govet // field alignment doesn't matter in tests + requestMetadata metadata.RequestMetadata upstreamConfig *config.UpstreamConfig } @@ -50,3 +51,34 @@ func TestAndFilter_Apply(t *testing.T) { }) } } + +func TestSimpleIsStatePresentFilter_Apply(t *testing.T) { + fullNodeConfig := config.UpstreamConfig{NodeType: config.Full} + archiveNodeConfig := config.UpstreamConfig{NodeType: config.Archive} + + stateMethodMetadata := metadata.RequestMetadata{IsStateRequired: true} + nonStateMethodMetadata := metadata.RequestMetadata{IsStateRequired: false} + + type args struct { //nolint:govet // field alignment doesn't matter in tests + requestMetadata metadata.RequestMetadata + upstreamConfig *config.UpstreamConfig + } + + tests := []struct { //nolint:govet // field alignment doesn't matter in tests + name string + args args + want bool + }{ + {"stateMethodFullNode", args{stateMethodMetadata, &fullNodeConfig}, false}, + {"stateMethodArchiveNode", args{stateMethodMetadata, &archiveNodeConfig}, true}, + {"nonStateMethodFullNode", args{nonStateMethodMetadata, &fullNodeConfig}, true}, + {"nonStateMethodArchiveNode", args{nonStateMethodMetadata, &archiveNodeConfig}, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f := &SimpleIsStatePresent{} + assert.Equalf(t, tt.want, f.Apply(tt.args.requestMetadata, tt.args.upstreamConfig), "Apply(%v, %v)", tt.args.requestMetadata, tt.args.upstreamConfig) + }) + } +} diff --git a/internal/route/router.go b/internal/route/router.go index dd66db29..3a2954f0 100644 --- a/internal/route/router.go +++ b/internal/route/router.go @@ -39,6 +39,7 @@ type SimpleRouter struct { requestExecutor RequestExecutor // Map from Priority => UpstreamIDs priorityToUpstreams types.PriorityToUpstreamsMap + metadataParser metadata.RequestMetadataParser upstreamConfigs []config.UpstreamConfig } @@ -56,6 +57,7 @@ func NewRouter( priorityToUpstreams: groupUpstreamsByPriority(upstreamConfigs, groupConfigs), routingStrategy: routingStrategy, requestExecutor: RequestExecutor{httpClient: &http.Client{}}, + metadataParser: metadata.RequestMetadataParser{}, } return r @@ -95,7 +97,9 @@ func (r *SimpleRouter) Route( ctx context.Context, requestBody jsonrpc.RequestBody, ) (*jsonrpc.ResponseBody, *http.Response, error) { - id, err := r.routingStrategy.RouteNextRequest(r.priorityToUpstreams) + requestMetadata := r.metadataParser.Parse(requestBody) + id, err := r.routingStrategy.RouteNextRequest(r.priorityToUpstreams, requestMetadata) + if err != nil { switch { case errors.Is(err, ErrNoHealthyUpstreams): diff --git a/internal/route/routing_strategy.go b/internal/route/routing_strategy.go index 97fbe96c..9ec30137 100644 --- a/internal/route/routing_strategy.go +++ b/internal/route/routing_strategy.go @@ -6,6 +6,7 @@ import ( "sort" "sync/atomic" + "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/satsuma-data/node-gateway/internal/types" "go.uber.org/zap" "golang.org/x/exp/maps" @@ -14,7 +15,10 @@ import ( //go:generate mockery --output ../mocks --name RoutingStrategy --structname MockRoutingStrategy --with-expecter type RoutingStrategy interface { // Returns the next UpstreamID a request should route to. - RouteNextRequest(upstreamsByPriority types.PriorityToUpstreamsMap) (string, error) + RouteNextRequest( + upstreamsByPriority types.PriorityToUpstreamsMap, + requestMetadata metadata.RequestMetadata, + ) (string, error) } type PriorityRoundRobinStrategy struct { counter uint64 @@ -28,7 +32,10 @@ func NewPriorityRoundRobinStrategy() *PriorityRoundRobinStrategy { var ErrNoHealthyUpstreams = errors.New("no healthy upstreams") -func (s *PriorityRoundRobinStrategy) RouteNextRequest(upstreamsByPriority types.PriorityToUpstreamsMap) (string, error) { +func (s *PriorityRoundRobinStrategy) RouteNextRequest( + upstreamsByPriority types.PriorityToUpstreamsMap, + _ metadata.RequestMetadata, +) (string, error) { prioritySorted := maps.Keys(upstreamsByPriority) sort.Ints(prioritySorted) diff --git a/internal/route/routing_strategy_test.go b/internal/route/routing_strategy_test.go index 8c648a17..cb293140 100644 --- a/internal/route/routing_strategy_test.go +++ b/internal/route/routing_strategy_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/satsuma-data/node-gateway/internal/config" + "github.com/satsuma-data/node-gateway/internal/metadata" "github.com/satsuma-data/node-gateway/internal/types" "github.com/stretchr/testify/assert" ) @@ -18,10 +19,10 @@ func TestPriorityStrategy_HighPriority(t *testing.T) { strategy := NewPriorityRoundRobinStrategy() for i := 0; i < 10; i++ { - firstUpstreamID, _ := strategy.RouteNextRequest(upstreams) + firstUpstreamID, _ := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) assert.Equal(t, "something-else", firstUpstreamID) - secondUpstreamID, _ := strategy.RouteNextRequest(upstreams) + secondUpstreamID, _ := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) assert.Equal(t, "geth", secondUpstreamID) } } @@ -41,10 +42,10 @@ func TestPriorityStrategy_LowerPriority(t *testing.T) { strategy := NewPriorityRoundRobinStrategy() for i := 0; i < 10; i++ { - firstUpstreamID, _ := strategy.RouteNextRequest(upstreams) + firstUpstreamID, _ := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) assert.Equal(t, "fallback2", firstUpstreamID) - secondUpstreamID, _ := strategy.RouteNextRequest(upstreams) + secondUpstreamID, _ := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) assert.Equal(t, "fallback1", secondUpstreamID) } } @@ -58,7 +59,7 @@ func TestPriorityStrategy_NoUpstreams(t *testing.T) { strategy := NewPriorityRoundRobinStrategy() for i := 0; i < 10; i++ { - upstreamID, err := strategy.RouteNextRequest(upstreams) + upstreamID, err := strategy.RouteNextRequest(upstreams, metadata.RequestMetadata{}) assert.Equal(t, "", upstreamID) assert.True(t, errors.Is(err, ErrNoHealthyUpstreams)) } diff --git a/internal/server/web_server.go b/internal/server/web_server.go index fb2c29f8..dda057f5 100644 --- a/internal/server/web_server.go +++ b/internal/server/web_server.go @@ -68,7 +68,8 @@ func wireRouter(config conf.Config) route.Router { ticker := time.NewTicker(checks.PeriodicHealthCheckInterval) healthCheckManager := checks.NewHealthCheckManager(client.NewEthClient, config.Upstreams, chainMetadataStore, ticker) - nodeFilter := route.CreateNodeFilter([]route.NodeFilterType{route.Healthy, route.MaxHeightForGroup}, healthCheckManager, chainMetadataStore) + enabledNodeFilters := []route.NodeFilterType{route.Healthy, route.MaxHeightForGroup, route.SimpleStatePresent} + nodeFilter := route.CreateNodeFilter(enabledNodeFilters, healthCheckManager, chainMetadataStore) routingStrategy := route.FilteringRoutingStrategy{ NodeFilter: nodeFilter, BackingStrategy: route.NewPriorityRoundRobinStrategy(),