Skip to content

Commit

Permalink
add customized info in sdk's fetcher, reconciler, syncer and db (#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
jingweicb authored Jan 13, 2023
1 parent 2301850 commit 4c69164
Show file tree
Hide file tree
Showing 114 changed files with 422 additions and 183 deletions.
4 changes: 2 additions & 2 deletions asserter/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func Amount(amount *types.Amount) error {
return ErrAmountValueMissing
}

_, ok := new(big.Int).SetString(amount.Value, 10)
_, ok := new(big.Int).SetString(amount.Value, 10) // nolint: gomnd
if !ok {
return ErrAmountIsNotInt
}
Expand Down Expand Up @@ -336,7 +336,7 @@ func (a *Asserter) Operations( // nolint:gocognit

if a.validations.Enabled {
if op.Type == a.validations.Payment.Name {
val, _ := new(big.Int).SetString(op.Amount.Value, 10)
val, _ := new(big.Int).SetString(op.Amount.Value, 10) // nolint: gomnd
paymentTotal.Add(paymentTotal, val)
paymentCount++
}
Expand Down
2 changes: 1 addition & 1 deletion client/api_account.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/api_block.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/api_call.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/api_construction.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/api_events.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/api_mempool.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/api_network.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/api_search.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/configuration.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion client/response.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion constructor/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (j *Job) unmarshalNumber(
return nil, ErrVariableNotFound
}

i, ok := new(big.Int).SetString(value.String(), 10)
i, ok := new(big.Int).SetString(value.String(), 10) // nolint: gomnd
if !ok {
return nil, ErrVariableIncorrectFormat
}
Expand Down
44 changes: 30 additions & 14 deletions fetcher/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"github.com/fatih/color"

"github.com/coinbase/rosetta-sdk-go/asserter"
"github.com/coinbase/rosetta-sdk-go/types"
)
Expand All @@ -33,8 +35,10 @@ func (f *Fetcher) AccountBalance(
currencies []*types.Currency,
) (*types.BlockIdentifier, []*types.Amount, map[string]interface{}, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
err = fmt.Errorf("failed to acquire semaphore: %w%s", err, f.metaData)
color.Red(err.Error())
return nil, nil, nil, &Error{
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
Err: err,
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -55,11 +59,10 @@ func (f *Fetcher) AccountBalance(
block,
response,
); err != nil {
err = fmt.Errorf("/account/balance response is invalid: %w%s", err, f.metaData)
color.Red(err.Error())
fetcherErr := &Error{
Err: fmt.Errorf(
"/account/balance response is invalid: %w",
err,
),
Err: err,
}
return nil, nil, nil, fetcherErr
}
Expand Down Expand Up @@ -101,15 +104,19 @@ func (f *Fetcher) AccountBalanceRetry(
}

if is, _ := asserter.Err(err.Err); is {
errForPrint := fmt.Errorf("/account/balance not attempting retry: %w", err.Err)
color.Red(errForPrint.Error())
fetcherErr := &Error{
Err: fmt.Errorf("/account/balance not attempting retry: %w", err.Err),
Err: errForPrint,
ClientErr: err.ClientErr,
}
return nil, nil, nil, fetcherErr
}

msg := fmt.Sprintf("/account/balance %s%s", types.PrintStruct(account), f.metaData)
color.Cyan(msg)
if err := tryAgain(
fmt.Sprintf("/account/balance %s", types.PrintStruct(account)),
msg,
backoffRetries,
err,
); err != nil {
Expand All @@ -128,8 +135,10 @@ func (f *Fetcher) AccountCoins(
currencies []*types.Currency,
) (*types.BlockIdentifier, []*types.Coin, map[string]interface{}, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
err = fmt.Errorf("failed to acquire semaphore: %w%s", err, f.metaData)
color.Red(err.Error())
return nil, nil, nil, &Error{
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
Err: err,
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand All @@ -149,11 +158,10 @@ func (f *Fetcher) AccountCoins(
if err := asserter.AccountCoinsResponse(
response,
); err != nil {
err = fmt.Errorf("/account/coins response is invalid: %w%s", err, f.metaData)
color.Red(err.Error())
fetcherErr := &Error{
Err: fmt.Errorf(
"/account/coins response is invalid: %w",
err,
),
Err: err,
}
return nil, nil, nil, fetcherErr
}
Expand Down Expand Up @@ -194,15 +202,23 @@ func (f *Fetcher) AccountCoinsRetry(
}

if is, _ := asserter.Err(err.Err); is {
errForPrint := fmt.Errorf(
"/account/coins not attempting retry: %w%s",
err.Err,
f.metaData,
)
color.Red(errForPrint.Error())
fetcherErr := &Error{
Err: fmt.Errorf("/account/coins not attempting retry: %w", err.Err),
Err: errForPrint,
ClientErr: err.ClientErr,
}
return nil, nil, nil, fetcherErr
}

msg := fmt.Sprintf("/account/coins %s%s", types.PrintStruct(account), f.metaData)
color.Cyan(msg)
if err := tryAgain(
fmt.Sprintf("/account/coins %s", types.PrintStruct(account)),
msg,
backoffRetries,
err,
); err != nil {
Expand Down
19 changes: 15 additions & 4 deletions fetcher/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"

"github.com/fatih/color"
"golang.org/x/sync/errgroup"

"github.com/coinbase/rosetta-sdk-go/asserter"
Expand Down Expand Up @@ -72,8 +73,10 @@ func (f *Fetcher) fetchChannelTransactions(
) *Error {
// We keep the lock for all transactions we fetch in this goroutine.
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
err = fmt.Errorf("failed to acquire semaphore: %w%s", err, f.metaData)
color.Red(err.Error())
return &Error{
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
Err: err,
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand Down Expand Up @@ -111,6 +114,7 @@ func (f *Fetcher) fetchChannelTransactions(
))

txFetchErr := fmt.Sprintf("transaction %s", types.PrintStruct(transactionIdentifier))
color.Red("%s%s", txFetchErr, f.metaData)
if err := tryAgain(txFetchErr, backoffRetries, fetchErr); err != nil {
return err
}
Expand Down Expand Up @@ -214,8 +218,10 @@ func (f *Fetcher) UnsafeBlock(
blockIdentifier *types.PartialBlockIdentifier,
) (*types.Block, *Error) {
if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil {
err = fmt.Errorf("failed to acquire semaphore: %w%s", err, f.metaData)
color.Red(err.Error())
return nil, &Error{
Err: fmt.Errorf("failed to acquire semaphore: %w", err),
Err: err,
}
}
defer f.connectionSemaphore.Release(semaphoreRequestWeight)
Expand Down Expand Up @@ -273,8 +279,10 @@ func (f *Fetcher) Block(
}

if err := f.Asserter.Block(block); err != nil {
err = fmt.Errorf("/block response is invalid: %w%s", err, f.metaData)
color.Red(err.Error())
fetcherErr := &Error{
Err: fmt.Errorf("/block response is invalid: %w", err),
Err: err,
}
return nil, fetcherErr
}
Expand Down Expand Up @@ -313,14 +321,17 @@ func (f *Fetcher) BlockRetry(
}

if is, _ := asserter.Err(err.Err); is {
errForPrint := fmt.Errorf("/block not attempting retry: %w%s", err.Err, f.metaData)
color.Red(errForPrint.Error())
fetcherErr := &Error{
Err: fmt.Errorf("/block not attempting retry: %w", err.Err),
Err: errForPrint,
ClientErr: err.ClientErr,
}
return nil, fetcherErr
}

blockFetchErr := fmt.Sprintf("block %s", types.PrintStruct(blockIdentifier))
color.Red("%s%s", blockFetchErr, f.metaData)
if err := tryAgain(blockFetchErr, backoffRetries, err); err != nil {
return nil, err
}
Expand Down
7 changes: 7 additions & 0 deletions fetcher/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,10 @@ func WithForceRetry() Option {
f.forceRetry = true
}
}

// add a metaData map to fetcher
func WithMetaData(metaData string) Option {
return func(f *Fetcher) {
f.metaData = metaData
}
}
15 changes: 13 additions & 2 deletions fetcher/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"log"

"github.com/fatih/color"

utils "github.com/coinbase/rosetta-sdk-go/errors"
"github.com/coinbase/rosetta-sdk-go/types"
)
Expand Down Expand Up @@ -50,12 +52,21 @@ func (f *Fetcher) RequestFailedError(
// If there is a *types.Error assertion error, we log it instead
// of exiting. Exiting abruptly here may cause unintended consequences.
if assertionErr := f.Asserter.Error(rosettaErr); assertionErr != nil {
log.Printf("error %s assertion failed: %s", types.PrintStruct(rosettaErr), assertionErr)
msg := fmt.Sprintf(
"error %s assertion failed: %s%s",
types.PrintStruct(rosettaErr),
assertionErr,
f.metaData,
)
color.Cyan(msg)
log.Println(msg)
}
}

errForPrint := fmt.Errorf("%s %s: %w%s", message, err.Error(), ErrRequestFailed, f.metaData)
color.Red(errForPrint.Error())
return &Error{
Err: fmt.Errorf("%s %s: %w", message, err.Error(), ErrRequestFailed),
Err: errForPrint,
ClientErr: rosettaErr,
Retry: ((rosettaErr != nil && rosettaErr.Retriable) || transientError(err) || f.forceRetry) &&
!errors.Is(err, context.Canceled),
Expand Down
3 changes: 3 additions & 0 deletions fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"time"

"github.com/fatih/color"
"golang.org/x/sync/semaphore"

"github.com/coinbase/rosetta-sdk-go/asserter"
Expand Down Expand Up @@ -75,6 +76,7 @@ type Fetcher struct {
insecureTLS bool
forceRetry bool
httpTimeout time.Duration
metaData string

// connectionSemaphore is used to limit the
// number of concurrent requests we make.
Expand Down Expand Up @@ -155,6 +157,7 @@ func (f *Fetcher) InitializeAsserter(
*Error,
) {
if f.Asserter != nil {
color.Red("asserter already initialized%s", f.metaData)
return nil, nil, &Error{Err: errors.New("asserter already initialized")}
}

Expand Down
2 changes: 1 addition & 1 deletion parser/exemptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func MatchBalanceExemption(
matchedExemptions []*types.BalanceExemption,
difference string, // live - computed
) *types.BalanceExemption {
bigDifference, ok := new(big.Int).SetString(difference, 10)
bigDifference, ok := new(big.Int).SetString(difference, 10) // nolint
if !ok {
return nil
}
Expand Down
7 changes: 7 additions & 0 deletions reconciler/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,10 @@ func WithBacklogSize(size int) Option {
r.backlogSize = size
}
}

// add a metaData map to fetcher
func WithMetaData(metaData string) Option {
return func(r *Reconciler) {
r.metaData = metaData
}
}
3 changes: 3 additions & 0 deletions reconciler/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,7 @@ type Reconciler struct {
// blocks asynchronously so that we don't slow down the sync
// loop.
processQueue chan *blockRequest

// store customized data
metaData string
}
2 changes: 1 addition & 1 deletion server/api.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion server/api_account.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion server/api_block.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion server/api_call.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Coinbase, Inc.
// Copyright 2023 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit 4c69164

Please sign in to comment.