Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(realyer): cache height for offset search #2325

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
299 changes: 168 additions & 131 deletions lib/cchain/provider/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/omni-network/omni/lib/cchain"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/expbackoff"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/lib/netconf"
"github.com/omni-network/omni/lib/tracer"
"github.com/omni-network/omni/lib/umath"
Expand Down Expand Up @@ -352,7 +351,7 @@ func newABCIAllAttsFunc(cl atypes.QueryClient) allAttsFunc {
}

func newABCIFetchFunc(attCl atypes.QueryClient, cmtCl cmtservice.ServiceClient, chainNamer func(xchain.ChainVersion) string) fetchFunc {
return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64) ([]xchain.Attestation, error) {
return func(ctx context.Context, chainVer xchain.ChainVersion, fromOffset uint64, cursor uint64) ([]xchain.Attestation, uint64, error) {
const endpoint = "fetch_attestations"
defer latency(endpoint)()

Expand All @@ -365,40 +364,60 @@ func newABCIFetchFunc(attCl atypes.QueryClient, cmtCl cmtservice.ServiceClient,
atts, ok, err := attsFromAtHeight(ctx, attCl, chainVer, fromOffset, 0)
if err != nil {
incQueryErr(endpoint)
return nil, errors.Wrap(err, "abci query attestations-from")
return nil, 0, errors.Wrap(err, "abci query attestations-from")
} else if ok {
fetchStepsMetrics(chainName, 0, 0)
return atts, nil
binarySearchStepsMetric(chainName, 0)
lookbackStepsMetric(chainName, 0)

return atts, cursor, nil
}

earliestAttestationAtLatestHeight, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, 0)
if err != nil {
incQueryErr(endpoint)
return nil, errors.Wrap(err, "abci query earliest-attestation-in-state")
return nil, 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// Either no attestations have happened yet, or the queried fromOffset is in the "future"
// Caller has to wait and retry in both cases
if !ok || earliestAttestationAtLatestHeight.AttestOffset < fromOffset {
// First attestation hasn't happened yet, return empty
return []xchain.Attestation{}, nil
// First attestation hasn't happened yet, return empty and set cursor to latest height
return []xchain.Attestation{}, earliestAttestationAtLatestHeight.BlockHeight, nil
}

latestBlockResp, err := cmtCl.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{})
if err != nil {
return []xchain.Attestation{}, 0, errors.Wrap(err, "query latest block")
}

latestHeight := uint64(latestBlockResp.SdkBlock.Header.Height)

// Binary search range from cached to latest
searchStart, searchEnd := cursor, latestHeight
if cursor == 0 {
// Unless no cached height provided, then perform lookback search
searchStart, searchEnd, err = lookbackRange(ctx, attCl, chainVer, chainName, fromOffset, latestHeight)
if err != nil {
incQueryErr(endpoint)
return nil, 0, errors.Wrap(err, "lookback search")
}
}

offsetHeight, err := searchOffsetInHistory(ctx, cmtCl, attCl, chainVer, chainName, fromOffset)
offsetHeight, err := binarySearch(ctx, attCl, chainVer, chainName, fromOffset, searchStart, searchEnd)
if err != nil {
incQueryErr(endpoint)
return nil, errors.Wrap(err, "searching offset in history")
return nil, 0, errors.Wrap(err, "binary search")
}

atts, ok, err = attsFromAtHeight(ctx, attCl, chainVer, fromOffset, offsetHeight)
if err != nil {
incQueryErr(endpoint)
return nil, errors.Wrap(err, "abci query attestations-from")
return nil, 0, errors.Wrap(err, "abci query attestations-from")
} else if !ok {
return nil, errors.New("expected to find attestations [BUG]")
return nil, 0, errors.New("expected to find attestations [BUG]")
}

return atts, nil
return atts, offsetHeight, nil
}
}

Expand Down Expand Up @@ -570,124 +589,6 @@ func spanName(endpoint string) string {
return "cprovider/" + endpoint
}

// searchOffsetInHistory searches the consensus state history and
// returns a historical consensus block height that contains an approved attestation
// for the provided chain version and fromOffset.
func searchOffsetInHistory(ctx context.Context, cmtCl cmtservice.ServiceClient, attCl atypes.QueryClient, chainVer xchain.ChainVersion, chainName string, fromOffset uint64) (uint64, error) {
sideninja marked this conversation as resolved.
Show resolved Hide resolved
const endpoint = "search_offset"
defer latency(endpoint)()

// Exponentially backoff to find a good start point for binary search, this prefers more recent queries

latestBlockResp, err := cmtCl.GetLatestBlock(ctx, &cmtservice.GetLatestBlockRequest{})
if err != nil {
return 0, errors.Wrap(err, "query latest block")
}
latestHeight := uint64(latestBlockResp.SdkBlock.Header.Height)

var startHeightIndex uint64
endHeightIndex := latestHeight
lookback := uint64(1)
var lookbackStepsCounter uint64 // For metrics only
queryHeight := endHeightIndex
for {
lookbackStepsCounter++
if queryHeight <= lookback {
// Query from the start, but don't break out yet -- we need to find the earliest height that we have state for
queryHeight = 1
} else {
queryHeight -= lookback
}

if queryHeight == 0 || queryHeight >= latestHeight {
return 0, errors.New("unexpected query height [BUG]", "height", queryHeight) // This should never happen
}
earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, queryHeight)
if IsErrHistoryPruned(err) {
// We've jumped to before the prune height, but _might_ still have the requested offset
earliestStoreHeight, err := getEarliestStoreHeight(ctx, attCl, chainVer, queryHeight+1)
if err != nil {
return 0, errors.Wrap(err, "failed to get earliest store height")
}
earliestAtt, ok, err = queryEarliestAttestation(ctx, attCl, chainVer, earliestStoreHeight)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// If we're so far back that no attestation is found, or that we're before fromOffset,
// that's a good breaking point for binary search
if !ok || earliestAtt.AttestOffset <= fromOffset {
startHeightIndex = earliestStoreHeight
break
}

// Otherwise, we just don't have the needed state, fail
return 0, ErrHistoryPruned
}
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// If we're before the first attestation, or found an earlier attestation, it's a good start height
if !ok || earliestAtt.AttestOffset <= fromOffset {
startHeightIndex = queryHeight
break
}

// Otherwise, keep moving back
endHeightIndex = queryHeight
lookback *= 2
}

// We now have reasonable start and end indices for binary search
var binarySearchStepsCounter uint64 // For metrics only
for startHeightIndex <= endHeightIndex {
binarySearchStepsCounter++
midHeightIndex := startHeightIndex + umath.SubtractOrZero(endHeightIndex, startHeightIndex)/2

earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, midHeightIndex)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

if !ok {
// If we're so far back that there's no attestation at all, move forward
startHeightIndex = midHeightIndex + 1
continue
}

latestAtt, ok, err := queryLatestAttestation(ctx, attCl, chainVer, midHeightIndex)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query latest-attestation")
}

if !ok {
return 0, errors.New("no latest attestation found despite earlier check [BUG]")
}

if fromOffset >= earliestAtt.AttestOffset && fromOffset <= latestAtt.AttestOffset {
log.Debug(ctx, "Fetching offset from history", "chain", chainName, "from", fromOffset, "latest", latestHeight, "found", midHeightIndex, "lookback", lookbackStepsCounter, "search", binarySearchStepsCounter)
fetchStepsMetrics(chainName, lookbackStepsCounter, binarySearchStepsCounter)

return midHeightIndex, nil
}

// Query at a lower or higher height depending on whether fromOffset
// is smaller or larger than the earliest offset we found
if fromOffset < earliestAtt.AttestOffset {
endHeightIndex = umath.SubtractOrZero(midHeightIndex, 1)
} else {
startHeightIndex = midHeightIndex + 1
}
}

return 0, errors.New("unexpectedly reach end of search method [BUG]")
}

// getEarliestStoreHeight walks forward from startPoint, and returns the first height for which we have the state in our Store.
func getEarliestStoreHeight(ctx context.Context, cl atypes.QueryClient, chainVer xchain.ChainVersion, startPoint uint64) (uint64, error) {
// Note: the correct thing to do here would be to query the node's Status, and look at its EarliestStoreHeight
Expand Down Expand Up @@ -771,3 +672,139 @@ func attsFromAtHeight(ctx context.Context, cl atypes.QueryClient, chainVer xchai

return atts, true, nil
}

// binarySearch uses a binary search between defined start and end consensus block heights to
// find the attestations with the provided fromOffset. It returns the consensus block height
// that contains the attestation offset.
func binarySearch(
sideninja marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
attCl atypes.QueryClient,
chainVer xchain.ChainVersion,
chainName string,
fromOffset uint64,
startHeight uint64,
endHeight uint64,
) (uint64, error) {
const endpoint = "offset_binary_search"
defer latency(endpoint)()

for steps := 0; startHeight <= endHeight; steps++ {
midHeight := startHeight + umath.SubtractOrZero(endHeight, startHeight)/2

earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, midHeight)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
} else if !ok {
// If we're so far back that there's no attestation at all, move forward
startHeight = midHeight + 1
continue
}

sideninja marked this conversation as resolved.
Show resolved Hide resolved
if fromOffset == earliestAtt.AttestOffset {
binarySearchStepsMetric(chainName, steps)
return midHeight, nil
}

latestAtt, ok, err := queryLatestAttestation(ctx, attCl, chainVer, midHeight)
if err != nil {
incQueryErr(endpoint)
return 0, errors.Wrap(err, "abci query latest-attestation")
} else if !ok {
return 0, errors.New("no latest attestation found despite earlier check [BUG]")
}

if fromOffset >= earliestAtt.AttestOffset && fromOffset <= latestAtt.AttestOffset {
binarySearchStepsMetric(chainName, steps)

return midHeight, nil
}

// Query at a lower or higher height depending on whether fromOffset
// is smaller or larger than the earliest offset we found
if fromOffset < earliestAtt.AttestOffset {
endHeight = umath.SubtractOrZero(midHeight, 1)
} else {
startHeight = midHeight + 1
}
}

return 0, errors.New("unexpectedly reach end of search method [BUG]")
}

// lookbackRange does an exponential lookback from the latest block height provided
// until attestation found has lower or equal offset than the provided from offset, this
// guarantees the fromOffset attestation will be found in the range. Start and
// end height defining the range are returned.
//
//nolint:nonamedreturns // named returned for clarity
func lookbackRange(
sideninja marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
attCl atypes.QueryClient,
chainVer xchain.ChainVersion,
chainName string,
fromOffset uint64,
latestHeight uint64,
) (startHeight uint64, endHeight uint64, err error) {
const endpoint = "offset_lookback"
defer latency(endpoint)()

endHeight = latestHeight
lookback := uint64(1)
queryHeight := endHeight

for steps := 0; ; steps++ {
if queryHeight <= lookback {
// Query from the start, but don't break out yet -- we need to find the earliest height that we have state for
queryHeight = 1
} else {
queryHeight -= lookback
}

if queryHeight == 0 || queryHeight >= latestHeight {
return 0, 0, errors.New("unexpected query height [BUG]", "height", queryHeight) // This should never happen
}
earliestAtt, ok, err := queryEarliestAttestation(ctx, attCl, chainVer, queryHeight)
if IsErrHistoryPruned(err) {
// We've jumped to before the prune height, but _might_ still have the requested offset
earliestStoreHeight, err := getEarliestStoreHeight(ctx, attCl, chainVer, queryHeight+1)
if err != nil {
return 0, 0, errors.Wrap(err, "failed to get earliest store height")
}

earliestAtt, ok, err = queryEarliestAttestation(ctx, attCl, chainVer, earliestStoreHeight)
if err != nil {
incQueryErr(endpoint)
return 0, 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// If we're so far back that no attestation is found, or that we're before fromOffset,
// that's a good breaking point for binary search
if !ok || earliestAtt.AttestOffset <= fromOffset {
lookbackStepsMetric(chainName, steps)
startHeight = earliestStoreHeight

return startHeight, endHeight, nil
}

// Otherwise, we just don't have the needed state, fail
return 0, 0, ErrHistoryPruned
}
if err != nil {
incQueryErr(endpoint)
return 0, 0, errors.Wrap(err, "abci query earliest-attestation-in-state")
}

// If we're before the first attestation, or found an earlier attestation, it's a good start height
if !ok || earliestAtt.AttestOffset <= fromOffset {
lookbackStepsMetric(chainName, steps)
startHeight = queryHeight

return startHeight, endHeight, nil
}

// Otherwise, keep moving back
endHeight = queryHeight
lookback *= 2
}
}
9 changes: 6 additions & 3 deletions lib/cchain/provider/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,12 @@ var (
}, []string{"chain_version"})
)

func fetchStepsMetrics(chainName string, lookbackSteps, binarySearchSteps uint64) {
fetchLookbackSteps.WithLabelValues(chainName).Observe(float64(lookbackSteps))
fetchBinarySearchSteps.WithLabelValues(chainName).Observe(float64(binarySearchSteps))
func lookbackStepsMetric(chainName string, steps int) {
fetchLookbackSteps.WithLabelValues(chainName).Observe(float64(steps))
}

func binarySearchStepsMetric(chainName string, steps int) {
fetchBinarySearchSteps.WithLabelValues(chainName).Observe(float64(steps))
}

func latency(endpoint string) func() {
Expand Down
Loading
Loading