diff --git a/internal/ethereum/event_actions_test.go b/internal/ethereum/event_actions_test.go index ca21f2d..7155071 100644 --- a/internal/ethereum/event_actions_test.go +++ b/internal/ethereum/event_actions_test.go @@ -88,6 +88,9 @@ func mockStreamLoopEmpty(mRPC *rpcbackendmocks.Backend) { mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() diff --git a/internal/ethereum/event_stream.go b/internal/ethereum/event_stream.go index ce73503..c841655 100644 --- a/internal/ethereum/event_stream.go +++ b/internal/ethereum/event_stream.go @@ -302,6 +302,7 @@ func (es *eventStream) leadGroupSteadyState() bool { lastUpdate := -1 failCount := 0 filterResetRequired := false + filterRPCMethodToUse := "" for { if es.c.doFailureDelay(es.ctx, failCount) { log.L(es.ctx).Debugf("Stream loop exiting") @@ -329,6 +330,7 @@ func (es *eventStream) leadGroupSteadyState() bool { es.uninstallFilter(&filter) } filterResetRequired = false + filterRPCMethodToUse = "eth_getFilterLogs" // first JSON/RPC for a new filter ID fetches all the historical logs to ensure no gaps // Determine the earliest block we need to poll from fromBlock := int64(-1) for _, l := range ag.listeners { @@ -362,17 +364,18 @@ func (es *eventStream) leadGroupSteadyState() bool { } // Get the next batch of logs var ethLogs []*logJSONRPC - rpcErr := es.c.backend.CallRPC(es.ctx, ðLogs, "eth_getFilterLogs", filter) + rpcErr := es.c.backend.CallRPC(es.ctx, ðLogs, filterRPCMethodToUse, filter) // If we fail to query we just retry - setting filter to nil if not found if rpcErr != nil { if mapError(filterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound { log.L(es.ctx).Infof("Filter '%v' reset: %s", filter, rpcErr.Message) filter = "" } - log.L(es.ctx).Errorf("Failed to query filter (eth_getFilterLogs): %s", rpcErr.Message) + log.L(es.ctx).Errorf("Failed to query filter (%s): %s", filterRPCMethodToUse, rpcErr.Message) failCount++ continue } + filterRPCMethodToUse = "eth_getFilterChanges" // subsequent JSON/RPC calls after the initial fetch, this fetches only the new logs // Enrich the events events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs) if enrichErr != nil { diff --git a/internal/ethereum/event_stream_test.go b/internal/ethereum/event_stream_test.go index 94af303..fa63cd8 100644 --- a/internal/ethereum/event_stream_test.go +++ b/internal/ethereum/event_stream_test.go @@ -391,7 +391,11 @@ func TestLeadGroupDeliverEvents(t *testing.T) { Run(func(args mock.Arguments) { *args[1].(*string) = testLogsFilterID1 }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) + }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = []*logJSONRPC{ { BlockNumber: ethtypes.NewHexInteger64(212122), @@ -408,6 +412,9 @@ func TestLeadGroupDeliverEvents(t *testing.T) { }, } }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]*logJSONRPC) = []*logJSONRPC{} + }) mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c", false).Return(nil).Run(func(args mock.Arguments) { *args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{ Number: ethtypes.NewHexInteger64(212122), @@ -464,6 +471,7 @@ func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) { }).Once().Run(func(args mock.Arguments) { close(filtered) }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() @@ -648,6 +656,9 @@ func TestStreamLoopChangeFilter(t *testing.T) { mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() @@ -690,6 +701,9 @@ func TestStreamLoopFilterReset(t *testing.T) { mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) }).Maybe() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe() @@ -747,6 +761,9 @@ func TestStreamLoopEnrichFail(t *testing.T) { close(errorReturned) }). Return(&rpcbackend.RPCError{Message: "pop"}).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) { + *args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0) + }).Maybe() mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) { *args[1].(*bool) = true }).Maybe()