diff --git a/lib/cchain/provider/abci.go b/lib/cchain/provider/abci.go index 4d04ba8fe..7f80849dd 100644 --- a/lib/cchain/provider/abci.go +++ b/lib/cchain/provider/abci.go @@ -13,7 +13,6 @@ import ( "github.com/omni-network/omni/lib/cchain" "github.com/omni-network/omni/lib/errors" "github.com/omni-network/omni/lib/expbackoff" - "github.com/omni-network/omni/lib/log" "github.com/omni-network/omni/lib/netconf" "github.com/omni-network/omni/lib/tracer" "github.com/omni-network/omni/lib/umath" @@ -352,7 +351,7 @@ func newABCIAllAttsFunc(cl atypes.QueryClient) allAttsFunc { } func newABCIFetchFunc(attCl atypes.QueryClient, cmtCl cmtservice.ServiceClient, chainNamer func(xchain.ChainVersion) string) fetchFunc { - return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64) ([]xchain.Attestation, error) { + return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64, cursor uint64) ([]xchain.Attestation, uint64, error) { const endpoint = "fetch_attestations" defer latency(endpoint)() @@ -365,40 +364,60 @@ func newABCIFetchFunc(attCl atypes.QueryClient, cmtCl cmtservice.ServiceClient, atts, ok, err := attsFromAtHeight(ctx, attCl, chainVer, fromOffset, 0) if err != nil { incQueryErr(endpoint) - return nil, errors.Wrap(err, "abci query attestations-from") + return nil, 0, errors.Wrap(err, "abci query attestations-from") } else if ok { - fetchStepsMetrics(chainName, 0, 0) - return atts, nil + binarySearchStepsMetric(chainName, 0) + lookbackStepsMetric(chainName, 0) + + return atts, cursor, nil } earliestAttestationAtLatestHeight, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, 0) if err != nil { incQueryErr(endpoint) - return nil, errors.Wrap(err, "abci query earliest-attestation-in-state") + return nil, 0, errors.Wrap(err, "abci query earliest-attestation-in-state") } // Either no attestations have happened yet, or the queried fromOffset is in the "future" // Caller has to wait and retry in both cases if !ok || earliestAttestationAtLatestHeight.AttestOffset < fromOffset { - // First attestation hasn't happened yet, return empty - return []xchain.Attestation{}, nil + // First attestation hasn't happened yet, return empty and set cursor to latest height + return []xchain.Attestation{}, earliestAttestationAtLatestHeight.BlockHeight, nil + } + + latestBlockResp, err := cmtCl.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{}) + if err != nil { + return []xchain.Attestation{}, 0, errors.Wrap(err, "query latest block") + } + + latestHeight := uint64(latestBlockResp.SdkBlock.Header.Height) + + // Binary search range from cached to latest + searchStart, searchEnd := cursor, latestHeight + if cursor == 0 { + // Unless no cached height provided, then perform lookback search + searchStart, searchEnd, err = lookbackRange(ctx, attCl, chainVer, chainName, fromOffset, latestHeight) + if err != nil { + incQueryErr(endpoint) + return nil, 0, errors.Wrap(err, "lookback search") + } } - offsetHeight, err := searchOffsetInHistory(ctx, cmtCl, attCl, chainVer, chainName, fromOffset) + offsetHeight, err := binarySearch(ctx, attCl, chainVer, chainName, fromOffset, searchStart, searchEnd) if err != nil { incQueryErr(endpoint) - return nil, errors.Wrap(err, "searching offset in history") + return nil, 0, errors.Wrap(err, "binary search") } atts, ok, err = attsFromAtHeight(ctx, attCl, chainVer, fromOffset, offsetHeight) if err != nil { incQueryErr(endpoint) - return nil, errors.Wrap(err, "abci query attestations-from") + return nil, 0, errors.Wrap(err, "abci query attestations-from") } else if !ok { - return nil, errors.New("expected to find attestations [BUG]") + return nil, 0, errors.New("expected to find attestations [BUG]") } - return atts, nil + return atts, offsetHeight, nil } } @@ -570,124 +589,6 @@ func spanName(endpoint string) string { return "cprovider/" + endpoint } -// searchOffsetInHistory searches the consensus state history and -// returns a historical consensus block height that contains an approved attestation -// for the provided chain version and fromOffset. -func searchOffsetInHistory(ctx context.Context, cmtCl cmtservice.ServiceClient, attCl atypes.QueryClient, chainVer xchain.ChainVersion, chainName string, fromOffset uint64) (uint64, error) { - const endpoint = "search_offset" - defer latency(endpoint)() - - // Exponentially backoff to find a good start point for binary search, this prefers more recent queries - - latestBlockResp, err := cmtCl.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{}) - if err != nil { - return 0, errors.Wrap(err, "query latest block") - } - latestHeight := uint64(latestBlockResp.SdkBlock.Header.Height) - - var startHeightIndex uint64 - endHeightIndex := latestHeight - lookback := uint64(1) - var lookbackStepsCounter uint64 // For metrics only - queryHeight := endHeightIndex - for { - lookbackStepsCounter++ - if queryHeight <= lookback { - // Query from the start, but don't break out yet -- we need to find the earliest height that we have state for - queryHeight = 1 - } else { - queryHeight -= lookback - } - - if queryHeight == 0 || queryHeight >= latestHeight { - return 0, errors.New("unexpected query height [BUG]", "height", queryHeight) // This should never happen - } - earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, queryHeight) - if IsErrHistoryPruned(err) { - // We've jumped to before the prune height, but _might_ still have the requested offset - earliestStoreHeight, err := getEarliestStoreHeight(ctx, attCl, chainVer, queryHeight+1) - if err != nil { - return 0, errors.Wrap(err, "failed to get earliest store height") - } - earliestAtt, ok, err = queryEarliestAttestation(ctx, attCl, chainVer, earliestStoreHeight) - if err != nil { - incQueryErr(endpoint) - return 0, errors.Wrap(err, "abci query earliest-attestation-in-state") - } - - // If we're so far back that no attestation is found, or that we're before fromOffset, - // that's a good breaking point for binary search - if !ok || earliestAtt.AttestOffset <= fromOffset { - startHeightIndex = earliestStoreHeight - break - } - - // Otherwise, we just don't have the needed state, fail - return 0, ErrHistoryPruned - } - if err != nil { - incQueryErr(endpoint) - return 0, errors.Wrap(err, "abci query earliest-attestation-in-state") - } - - // If we're before the first attestation, or found an earlier attestation, it's a good start height - if !ok || earliestAtt.AttestOffset <= fromOffset { - startHeightIndex = queryHeight - break - } - - // Otherwise, keep moving back - endHeightIndex = queryHeight - lookback *= 2 - } - - // We now have reasonable start and end indices for binary search - var binarySearchStepsCounter uint64 // For metrics only - for startHeightIndex <= endHeightIndex { - binarySearchStepsCounter++ - midHeightIndex := startHeightIndex + umath.SubtractOrZero(endHeightIndex, startHeightIndex)/2 - - earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, midHeightIndex) - if err != nil { - incQueryErr(endpoint) - return 0, errors.Wrap(err, "abci query earliest-attestation-in-state") - } - - if !ok { - // If we're so far back that there's no attestation at all, move forward - startHeightIndex = midHeightIndex + 1 - continue - } - - latestAtt, ok, err := queryLatestAttestation(ctx, attCl, chainVer, midHeightIndex) - if err != nil { - incQueryErr(endpoint) - return 0, errors.Wrap(err, "abci query latest-attestation") - } - - if !ok { - return 0, errors.New("no latest attestation found despite earlier check [BUG]") - } - - if fromOffset >= earliestAtt.AttestOffset && fromOffset <= latestAtt.AttestOffset { - log.Debug(ctx, "Fetching offset from history", "chain", chainName, "from", fromOffset, "latest", latestHeight, "found", midHeightIndex, "lookback", lookbackStepsCounter, "search", binarySearchStepsCounter) - fetchStepsMetrics(chainName, lookbackStepsCounter, binarySearchStepsCounter) - - return midHeightIndex, nil - } - - // Query at a lower or higher height depending on whether fromOffset - // is smaller or larger than the earliest offset we found - if fromOffset < earliestAtt.AttestOffset { - endHeightIndex = umath.SubtractOrZero(midHeightIndex, 1) - } else { - startHeightIndex = midHeightIndex + 1 - } - } - - return 0, errors.New("unexpectedly reach end of search method [BUG]") -} - // getEarliestStoreHeight walks forward from startPoint, and returns the first height for which we have the state in our Store. func getEarliestStoreHeight(ctx context.Context, cl atypes.QueryClient, chainVer xchain.ChainVersion, startPoint uint64) (uint64, error) { // Note: the correct thing to do here would be to query the node's Status, and look at its EarliestStoreHeight @@ -771,3 +672,139 @@ func attsFromAtHeight(ctx context.Context, cl atypes.QueryClient, chainVer xchai return atts, true, nil } + +// binarySearch uses a binary search between defined start and end consensus block heights to +// find the attestations with the provided fromOffset. It returns the consensus block height +// that contains the attestation offset. +func binarySearch( + ctx context.Context, + attCl atypes.QueryClient, + chainVer xchain.ChainVersion, + chainName string, + fromOffset uint64, + startHeight uint64, + endHeight uint64, +) (uint64, error) { + const endpoint = "offset_binary_search" + defer latency(endpoint)() + + for steps := 0; startHeight <= endHeight; steps++ { + midHeight := startHeight + umath.SubtractOrZero(endHeight, startHeight)/2 + + earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, midHeight) + if err != nil { + incQueryErr(endpoint) + return 0, errors.Wrap(err, "abci query earliest-attestation-in-state") + } else if !ok { + // If we're so far back that there's no attestation at all, move forward + startHeight = midHeight + 1 + continue + } + + if fromOffset == earliestAtt.AttestOffset { + binarySearchStepsMetric(chainName, steps) + return midHeight, nil + } + + latestAtt, ok, err := queryLatestAttestation(ctx, attCl, chainVer, midHeight) + if err != nil { + incQueryErr(endpoint) + return 0, errors.Wrap(err, "abci query latest-attestation") + } else if !ok { + return 0, errors.New("no latest attestation found despite earlier check [BUG]") + } + + if fromOffset >= earliestAtt.AttestOffset && fromOffset <= latestAtt.AttestOffset { + binarySearchStepsMetric(chainName, steps) + + return midHeight, nil + } + + // Query at a lower or higher height depending on whether fromOffset + // is smaller or larger than the earliest offset we found + if fromOffset < earliestAtt.AttestOffset { + endHeight = umath.SubtractOrZero(midHeight, 1) + } else { + startHeight = midHeight + 1 + } + } + + return 0, errors.New("unexpectedly reach end of search method [BUG]") +} + +// lookbackRange does an exponential lookback from the latest block height provided +// until attestation found has lower or equal offset than the provided from offset, this +// guarantees the fromOffset attestation will be found in the range. Start and +// end height defining the range are returned. +// +//nolint:nonamedreturns // named returned for clarity +func lookbackRange( + ctx context.Context, + attCl atypes.QueryClient, + chainVer xchain.ChainVersion, + chainName string, + fromOffset uint64, + latestHeight uint64, +) (startHeight uint64, endHeight uint64, err error) { + const endpoint = "offset_lookback" + defer latency(endpoint)() + + endHeight = latestHeight + lookback := uint64(1) + queryHeight := endHeight + + for steps := 0; ; steps++ { + if queryHeight <= lookback { + // Query from the start, but don't break out yet -- we need to find the earliest height that we have state for + queryHeight = 1 + } else { + queryHeight -= lookback + } + + if queryHeight == 0 || queryHeight >= latestHeight { + return 0, 0, errors.New("unexpected query height [BUG]", "height", queryHeight) // This should never happen + } + earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, queryHeight) + if IsErrHistoryPruned(err) { + // We've jumped to before the prune height, but _might_ still have the requested offset + earliestStoreHeight, err := getEarliestStoreHeight(ctx, attCl, chainVer, queryHeight+1) + if err != nil { + return 0, 0, errors.Wrap(err, "failed to get earliest store height") + } + + earliestAtt, ok, err = queryEarliestAttestation(ctx, attCl, chainVer, earliestStoreHeight) + if err != nil { + incQueryErr(endpoint) + return 0, 0, errors.Wrap(err, "abci query earliest-attestation-in-state") + } + + // If we're so far back that no attestation is found, or that we're before fromOffset, + // that's a good breaking point for binary search + if !ok || earliestAtt.AttestOffset <= fromOffset { + lookbackStepsMetric(chainName, steps) + startHeight = earliestStoreHeight + + return startHeight, endHeight, nil + } + + // Otherwise, we just don't have the needed state, fail + return 0, 0, ErrHistoryPruned + } + if err != nil { + incQueryErr(endpoint) + return 0, 0, errors.Wrap(err, "abci query earliest-attestation-in-state") + } + + // If we're before the first attestation, or found an earlier attestation, it's a good start height + if !ok || earliestAtt.AttestOffset <= fromOffset { + lookbackStepsMetric(chainName, steps) + startHeight = queryHeight + + return startHeight, endHeight, nil + } + + // Otherwise, keep moving back + endHeight = queryHeight + lookback *= 2 + } +} diff --git a/lib/cchain/provider/metrics.go b/lib/cchain/provider/metrics.go index 518b74e82..a6891024e 100644 --- a/lib/cchain/provider/metrics.go +++ b/lib/cchain/provider/metrics.go @@ -69,9 +69,12 @@ var ( }, []string{"chain_version"}) ) -func fetchStepsMetrics(chainName string, lookbackSteps, binarySearchSteps uint64) { - fetchLookbackSteps.WithLabelValues(chainName).Observe(float64(lookbackSteps)) - fetchBinarySearchSteps.WithLabelValues(chainName).Observe(float64(binarySearchSteps)) +func lookbackStepsMetric(chainName string, steps int) { + fetchLookbackSteps.WithLabelValues(chainName).Observe(float64(steps)) +} + +func binarySearchStepsMetric(chainName string, steps int) { + fetchBinarySearchSteps.WithLabelValues(chainName).Observe(float64(steps)) } func latency(endpoint string) func() { diff --git a/lib/cchain/provider/provider.go b/lib/cchain/provider/provider.go index f7b2edcad..a6c8c472c 100644 --- a/lib/cchain/provider/provider.go +++ b/lib/cchain/provider/provider.go @@ -29,7 +29,12 @@ import ( var _ cchain.Provider = Provider{} -type fetchFunc func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64) ([]xchain.Attestation, error) +// fetchFunc returns a slice of strictly-sequential attestations for the +// provided fromOffset (inclusive) and chain version if present. +// +// It also accepts (optional) and returns an opaque cursor (when error is nil). +// When streaming attestation, the returned cursor can be provided in the subsequent call for improved performance. +type fetchFunc func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64, cursor uint64) ([]xchain.Attestation, uint64, error) type allAttsFunc func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64) ([]xchain.Attestation, error) type latestFunc func(ctx context.Context, chainVer xchain.ChainVersion) (xchain.Attestation, bool, error) type windowFunc func(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64) (int, error) @@ -95,9 +100,13 @@ func (p Provider) AppliedPlan(ctx context.Context, name string) (upgradetypes.Pl return p.appliedFunc(ctx, name) } -func (p Provider) AttestationsFrom(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, +func (p Provider) AttestationsFrom( + ctx context.Context, + chainVer xchain.ChainVersion, + attestOffset uint64, ) ([]xchain.Attestation, error) { - return p.fetch(ctx, chainVer, attestOffset) + atts, _, err := p.fetch(ctx, chainVer, attestOffset, 0) + return atts, err } func (p Provider) AllAttestationsFrom(ctx context.Context, chainVer xchain.ChainVersion, attestOffset uint64, @@ -179,9 +188,19 @@ func (p Provider) stream( srcChain := p.chainNamer(chainVer) ctx := log.WithCtx(in, "src_chain", srcChain, "worker", workerName) + // Cache the previous cursor (consensus height) at which we found the attestation + // to be used in the next fetch call as the search start height + var fetchCursor uint64 + deps := stream.Deps[xchain.Attestation]{ FetchBatch: func(ctx context.Context, offset uint64) ([]xchain.Attestation, error) { - return p.fetch(ctx, chainVer, offset) + atts, cursor, err := p.fetch(ctx, chainVer, offset, fetchCursor) + if err != nil { + return nil, err + } + fetchCursor = cursor + + return atts, nil }, Backoff: p.backoffFunc, ElemLabel: "attestation", diff --git a/lib/cchain/provider/provider_test.go b/lib/cchain/provider/provider_test.go index 8fd548f28..67f5430c3 100644 --- a/lib/cchain/provider/provider_test.go +++ b/lib/cchain/provider/provider_test.go @@ -12,6 +12,7 @@ import ( "github.com/omni-network/omni/lib/netconf" "github.com/omni-network/omni/lib/xchain" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -122,7 +123,7 @@ func TestProvider(t *testing.T) { } backoff := new(testBackOff) - fetcher := newTestFetcher(errs, expectCount) + fetcher := newTestFetcher(t, errs, expectCount) p := provider.NewProviderForT(t, fetcher.Fetch, nil, nil, backoff.BackOff) @@ -158,8 +159,10 @@ func TestProvider(t *testing.T) { } } -func newTestFetcher(errs, maxCount int) *testFetcher { +func newTestFetcher(t *testing.T, errs, maxCount int) *testFetcher { + t.Helper() return &testFetcher{ + t: t, errs: errs, maxCount: maxCount, } @@ -169,6 +172,7 @@ func newTestFetcher(errs, maxCount int) *testFetcher { // It first returns errs errors. // Then it returns 0,1,2,3,4,5... attestations up to max. type testFetcher struct { + t *testing.T mu sync.Mutex errs int maxCount int @@ -197,21 +201,28 @@ func (f *testFetcher) Errs() int { return f.errs } -func (f *testFetcher) Fetch(ctx context.Context, chainVer xchain.ChainVersion, fromHeight uint64, -) ([]xchain.Attestation, error) { +func (f *testFetcher) Fetch( + ctx context.Context, + chainVer xchain.ChainVersion, + fromHeight uint64, + cursor uint64, +) ([]xchain.Attestation, uint64, error) { f.mu.Lock() defer f.mu.Unlock() if f.errs > 0 { f.errs-- - return nil, errors.New("test error") + return nil, 0, errors.New("test error") } else if f.count >= f.maxCount { // Block and wait for test to cancel the context // This is required for deterministic fetch assertions since it is done async wrt callbacks. <-ctx.Done() - return nil, ctx.Err() + return nil, cursor, ctx.Err() } + // we use count as consensus block height + assert.Equal(f.t, uint64(f.count), cursor, "search start height invalid") + toReturn := f.count f.count++ @@ -230,7 +241,7 @@ func (f *testFetcher) Fetch(ctx context.Context, chainVer xchain.ChainVersion, f f.fetched += len(resp) - return resp, nil + return resp, uint64(f.count), nil } type testBackOff struct {