Skip to content

Commit

Permalink
Merge pull request hyperledger#159 from kaleido-io/log-filter-fix
Browse files Browse the repository at this point in the history
ensure only offsets are pulled after a filter is esstablished
  • Loading branch information
matthew1001 authored Dec 10, 2024
2 parents b561f24 + f7312c7 commit aa3efed
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 2 deletions.
3 changes: 3 additions & 0 deletions internal/ethereum/event_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func mockStreamLoopEmpty(mRPC *rpcbackendmocks.Backend) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down
7 changes: 5 additions & 2 deletions internal/ethereum/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
lastUpdate := -1
failCount := 0
filterResetRequired := false
filterRPCMethodToUse := ""
for {
if es.c.doFailureDelay(es.ctx, failCount) {
log.L(es.ctx).Debugf("Stream loop exiting")
Expand Down Expand Up @@ -329,6 +330,7 @@ func (es *eventStream) leadGroupSteadyState() bool {
es.uninstallFilter(&filter)
}
filterResetRequired = false
filterRPCMethodToUse = "eth_getFilterLogs" // first JSON/RPC for a new filter ID fetches all the historical logs to ensure no gaps
// Determine the earliest block we need to poll from
fromBlock := int64(-1)
for _, l := range ag.listeners {
Expand Down Expand Up @@ -362,17 +364,18 @@ func (es *eventStream) leadGroupSteadyState() bool {
}
// Get the next batch of logs
var ethLogs []*logJSONRPC
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, "eth_getFilterLogs", filter)
rpcErr := es.c.backend.CallRPC(es.ctx, &ethLogs, filterRPCMethodToUse, filter)
// If we fail to query we just retry - setting filter to nil if not found
if rpcErr != nil {
if mapError(filterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(es.ctx).Infof("Filter '%v' reset: %s", filter, rpcErr.Message)
filter = ""
}
log.L(es.ctx).Errorf("Failed to query filter (eth_getFilterLogs): %s", rpcErr.Message)
log.L(es.ctx).Errorf("Failed to query filter (%s): %s", filterRPCMethodToUse, rpcErr.Message)
failCount++
continue
}
filterRPCMethodToUse = "eth_getFilterChanges" // subsequent JSON/RPC calls after the initial fetch, this fetches only the new logs
// Enrich the events
events, enrichErr := es.filterEnrichSort(es.ctx, ag, ethLogs)
if enrichErr != nil {
Expand Down
17 changes: 17 additions & 0 deletions internal/ethereum/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,11 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
Run(func(args mock.Arguments) {
*args[1].(*string) = testLogsFilterID1
}).Once()

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{
{
BlockNumber: ethtypes.NewHexInteger64(212122),
Expand All @@ -408,6 +412,9 @@ func TestLeadGroupDeliverEvents(t *testing.T) {
},
}
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = []*logJSONRPC{}
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getBlockByHash", "0x6b012339fbb85b70c58ecfd97b31950c4a28bcef5226e12dbe551cb1abaf3b4c", false).Return(nil).Run(func(args mock.Arguments) {
*args[1].(**blockInfoJSONRPC) = &blockInfoJSONRPC{
Number: ethtypes.NewHexInteger64(212122),
Expand Down Expand Up @@ -464,6 +471,7 @@ func TestLeadGroupNearBlockZeroEnsureNonNegative(t *testing.T) {
}).Once().Run(func(args mock.Arguments) {
close(filtered)
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down Expand Up @@ -648,6 +656,9 @@ func TestStreamLoopChangeFilter(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down Expand Up @@ -690,6 +701,9 @@ func TestStreamLoopFilterReset(t *testing.T) {
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterLogs", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down Expand Up @@ -747,6 +761,9 @@ func TestStreamLoopEnrichFail(t *testing.T) {
close(errorReturned)
}).
Return(&rpcbackend.RPCError{Message: "pop"}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*[]*logJSONRPC) = make([]*logJSONRPC, 0)
}).Maybe()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_uninstallFilter", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
*args[1].(*bool) = true
}).Maybe()
Expand Down

0 comments on commit aa3efed

Please sign in to comment.