diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml
index c47f83c63..674d5cc4f 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -14,6 +14,8 @@ jobs:
         uses: cachix/install-nix-action@3715ab1a11cac9e991980d7b4a28d80c7ebdd8f9 # nix:v2.24.6
         with:
           nix_path: nixpkgs=channel:nixos-unstable
+      - name: Print golangci-lint Version
+        run: nix develop -c golangci-lint --version
       - name: golangci-lint
         run: nix develop -c make lint-go-integration-tests
       - name: Print lint report artifact
@@ -37,6 +39,8 @@ jobs:
         uses: cachix/install-nix-action@3715ab1a11cac9e991980d7b4a28d80c7ebdd8f9 # nix:v2.24.6
         with:
           nix_path: nixpkgs=channel:nixos-unstable
+      - name: Print golangci-lint Version
+        run: nix develop -c golangci-lint --version
       - name: golangci-lint
         run: nix develop -c make lint-go-relay
       - name: Print lint report artifact
diff --git a/integration-tests/smoke/ocr2_test.go b/integration-tests/smoke/ocr2_test.go
index 330222fdc..92f6af18f 100644
--- a/integration-tests/smoke/ocr2_test.go
+++ b/integration-tests/smoke/ocr2_test.go
@@ -20,7 +20,7 @@ import (
 )

 func TestSolanaOCRV2Smoke(t *testing.T) {
-	for _, test := range []struct {
+	tests := []struct {
 		name string
 		env  map[string]string
 	}{
@@ -29,7 +29,11 @@ func TestSolanaOCRV2Smoke(t *testing.T) {
 			"CL_MEDIAN_CMD": "chainlink-feeds",
 			"CL_SOLANA_CMD": "chainlink-solana",
 		}},
-	} {
+	}
+
+	for idx := range tests {
+		test := tests[idx]
+
 		config, err := tc.GetConfig("Smoke", tc.OCR2)
 		if err != nil {
 			t.Fatal(err)
diff --git a/integration-tests/soak/ocr2_test.go b/integration-tests/soak/ocr2_test.go
index b13eea188..de0bc78a7 100644
--- a/integration-tests/soak/ocr2_test.go
+++ b/integration-tests/soak/ocr2_test.go
@@ -20,7 +20,7 @@ import (
 )

 func TestSolanaOCRV2Soak(t *testing.T) {
-	for _, test := range []struct {
+	tests := []struct {
 		name string
 		env  map[string]string
 	}{
@@ -29,7 +29,11 @@ func TestSolanaOCRV2Soak(t *testing.T) {
 			"CL_MEDIAN_CMD": "chainlink-feeds",
 			"CL_SOLANA_CMD": "chainlink-solana",
 		}},
-	} {
+	}
+
+	for idx := range tests {
+		test := tests[idx]
+
 		config, err := tc.GetConfig("Soak", tc.OCR2)
 		if err != nil {
 			t.Fatal(err)
diff --git a/integration-tests/solclient/deployer.go b/integration-tests/solclient/deployer.go
index 1d41600a6..98be3ea31 100644
--- a/integration-tests/solclient/deployer.go
+++ b/integration-tests/solclient/deployer.go
@@ -502,11 +502,13 @@ func (c *ContractDeployer) DeployAnchorProgramsRemote(contractsDir string, env *
 	}
 	log.Debug().Interface("Binaries", contractBinaries).Msg("Program binaries")
 	g := errgroup.Group{}
-	for _, bin := range contractBinaries {
+
+	for idx := range contractBinaries {
 		g.Go(func() error {
-			return c.DeployProgramRemote(bin, env)
+			return c.DeployProgramRemote(contractBinaries[idx], env)
 		})
 	}
+
 	return g.Wait()
 }

@@ -517,11 +519,13 @@ func (c *ContractDeployer) DeployAnchorProgramsRemoteDocker(baseDir, subDir stri
 	}
 	log.Info().Interface("Binaries", contractBinaries).Msg(fmt.Sprintf("Program binaries [%s]", filepath.Join("programs", subDir)))
 	g := errgroup.Group{}
-	for _, bin := range contractBinaries {
+
+	for idx := range contractBinaries {
 		g.Go(func() error {
-			return c.DeployProgramRemoteLocal(filepath.Join(subDir, bin), sol, programIDBuilder)
+			return c.DeployProgramRemoteLocal(filepath.Join(subDir, contractBinaries[idx]), sol, programIDBuilder)
 		})
 	}
+
 	return g.Wait()
 }
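A quick aside on the index-based loop rewrites above: before Go 1.22, a "for _, v := range xs" loop reused one variable across all iterations, so closures launched inside the loop (as with errgroup.Go here) could all observe the last element. A minimal sketch of the capture-safe shape follows — illustrative only, not part of the change; printAll and its names are invented for the example:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// printAll launches one goroutine per element. Taking a per-iteration copy
// (x := xs[idx]) keeps each closure pinned to its own element even on
// toolchains older than Go 1.22, where range loop variables were shared
// across iterations.
func printAll(xs []string) error {
	g := errgroup.Group{}

	for idx := range xs {
		x := xs[idx] // fresh variable per iteration; safe to capture below

		g.Go(func() error {
			_, err := fmt.Println(x)
			return err
		})
	}

	return g.Wait()
}

func main() {
	if err := printAll([]string{"a", "b", "c"}); err != nil {
		fmt.Println(err)
	}
}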
diff --git a/pkg/solana/config_tracker.go b/pkg/solana/config_tracker.go
index 425e62d46..96185bf5b 100644
--- a/pkg/solana/config_tracker.go
+++ b/pkg/solana/config_tracker.go
@@ -26,13 +26,15 @@ func (c *ConfigTracker) LatestConfigDetails(ctx context.Context) (changedInBlock
 func ConfigFromState(ctx context.Context, state State) (types.ContractConfig, error) {
 	pubKeys := []types.OnchainPublicKey{}
 	accounts := []types.Account{}
+
 	oracles, err := state.Oracles.Data()
 	if err != nil {
 		return types.ContractConfig{}, err
 	}
-	for _, o := range oracles {
-		pubKeys = append(pubKeys, o.Signer.Key[:])
-		accounts = append(accounts, types.Account(o.Transmitter.String()))
+
+	for idx := range oracles {
+		pubKeys = append(pubKeys, oracles[idx].Signer.Key[:])
+		accounts = append(accounts, types.Account(oracles[idx].Transmitter.String()))
 	}

 	onchainConfigStruct := median.OnchainConfig{
diff --git a/pkg/solana/fees/block_history_test.go b/pkg/solana/fees/block_history_test.go
index 03ab41b63..2cb92cc72 100644
--- a/pkg/solana/fees/block_history_test.go
+++ b/pkg/solana/fees/block_history_test.go
@@ -41,8 +41,8 @@ func TestBlockHistoryEstimator_InvalidBlockHistorySize(t *testing.T) {

 func TestBlockHistoryEstimator_LatestBlock(t *testing.T) {
 	// Helper variables for tests
-	min := uint64(10)
-	max := uint64(100_000)
+	minPrice := uint64(10)
+	maxPrice := uint64(100_000)
 	defaultPrice := uint64(100)
 	depth := uint64(1) // 1 is LatestBlockEstimator
 	pollPeriod := 100 * time.Millisecond
@@ -63,13 +63,13 @@
 	t.Run("Successful Estimation", func(t *testing.T) {
 		// Setup
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Assert the computed price matches the expected price
 		require.NoError(t, estimator.calculatePrice(ctx), "Failed to calculate price")
-		cfg.On("ComputeUnitPriceMin").Return(min)
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMin").Return(minPrice)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, uint64(lastBlockMedianPrice), estimator.BaseComputeUnitPrice())
 	})

@@ -77,7 +77,7 @@
 		// Setup
 		cfg := cfgmock.NewConfig(t)
 		tmpMin := uint64(lastBlockMedianPrice) + 100 // Set min higher than the median price
-		setupConfigMock(cfg, defaultPrice, tmpMin, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, tmpMin, pollPeriod, depth)
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Call calculatePrice and ensure no error
@@ -91,14 +91,14 @@
 		// Setup
 		cfg := cfgmock.NewConfig(t)
 		tmpMax := uint64(lastBlockMedianPrice) - 100 // Set max lower than the median price
-		setupConfigMock(cfg, defaultPrice, min, tmpMax, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Call calculatePrice and ensure no error
 		// Assert the compute unit price is capped at max
 		require.NoError(t, estimator.calculatePrice(ctx), "Failed to calculate price with price above max")
 		cfg.On("ComputeUnitPriceMax").Return(tmpMax)
-		cfg.On("ComputeUnitPriceMin").Return(min)
+		cfg.On("ComputeUnitPriceMin").Return(minPrice)
 		assert.Equal(t, tmpMax, estimator.BaseComputeUnitPrice(), "Price should be capped at max")
 	})

@@ -109,13 +109,13 @@
 			return rw, nil
 		})
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		rw.On("GetLatestBlock", mock.Anything).Return(nil, fmt.Errorf("fail rpc call")) // Mock GetLatestBlock returning error
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Ensure the price remains unchanged
 		require.Error(t, estimator.calculatePrice(ctx), "Expected error when GetLatestBlock fails")
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, uint64(100), estimator.BaseComputeUnitPrice(), "Price should not change when GetLatestBlock fails")
 	})

@@ -126,13 +126,13 @@
 			return rw, nil
 		})
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		rw.On("GetLatestBlock", mock.Anything).Return(nil, nil) // Mock GetLatestBlock returning nil
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Ensure the price remains unchanged
 		require.Error(t, estimator.calculatePrice(ctx), "Expected error when parsing fails")
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, uint64(100), estimator.BaseComputeUnitPrice(), "Price should not change when parsing fails")
 	})

@@ -143,13 +143,13 @@
 			return rw, nil
 		})
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		rw.On("GetLatestBlock", mock.Anything).Return(&rpc.GetBlockResult{}, nil) // Mock GetLatestBlock returning empty block
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Ensure the price remains unchanged
 		require.EqualError(t, estimator.calculatePrice(ctx), errNoComputeUnitPriceCollected.Error(), "Expected error when no compute unit prices are collected")
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, uint64(100), estimator.BaseComputeUnitPrice(), "Price should not change when median calculation fails")
 	})

@@ -160,21 +160,21 @@
 			return nil, fmt.Errorf("fail client load")
 		})
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		estimator := initializeEstimator(ctx, t, rwFailLoader, cfg, logger.Test(t))

 		// Call calculatePrice and expect an error
 		// Ensure the price remains unchanged
 		require.Error(t, estimator.calculatePrice(ctx), "Expected error when getting client fails")
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, defaultPrice, estimator.BaseComputeUnitPrice(), "Price should remain at default when client fails")
 	})
 }

 func TestBlockHistoryEstimator_MultipleBlocks(t *testing.T) {
 	// helpers vars for tests
-	min := uint64(100)
-	max := uint64(100_000)
+	minPrice := uint64(100)
+	maxPrice := uint64(100_000)
 	depth := uint64(3)
 	defaultPrice := uint64(100)
 	pollPeriod := 3 * time.Second
@@ -220,12 +220,12 @@
 	t.Run("Successful Estimation", func(t *testing.T) {
 		// Setup
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Calculated avg price should be equal to the one extracted manually from the blocks.
 		require.NoError(t, estimator.calculatePrice(ctx))
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, uint64(multipleBlocksAvg), estimator.BaseComputeUnitPrice())
 	})

@@ -233,7 +233,7 @@
 		// Setup
 		cfg := cfgmock.NewConfig(t)
 		tmpMin := uint64(multipleBlocksAvg) + 100 // Set min higher than the avg price
-		setupConfigMock(cfg, defaultPrice, tmpMin, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, tmpMin, pollPeriod, depth)
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Compute unit price should be floored at min
@@ -246,13 +246,13 @@
 		// Setup
 		cfg := cfgmock.NewConfig(t)
 		tmpMax := uint64(multipleBlocksAvg) - 100 // Set tmpMax lower than the avg price
-		setupConfigMock(cfg, defaultPrice, min, tmpMax, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Compute unit price should be capped at max
 		require.NoError(t, estimator.calculatePrice(ctx), "Failed to calculate price with price above max")
 		cfg.On("ComputeUnitPriceMax").Return(tmpMax)
-		cfg.On("ComputeUnitPriceMin").Return(min)
+		cfg.On("ComputeUnitPriceMin").Return(minPrice)
 		assert.Equal(t, tmpMax, estimator.BaseComputeUnitPrice(), "Price should be capped at max")
 	})

@@ -264,12 +264,12 @@
 			return nil, fmt.Errorf("fail client load")
 		})
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		estimator := initializeEstimator(ctx, t, rwFailLoader, cfg, logger.Test(t))

 		// Price should remain unchanged
 		require.Error(t, estimator.calculatePrice(ctx), "Expected error when getting client fails")
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, defaultPrice, estimator.BaseComputeUnitPrice())
 	})

@@ -280,13 +280,13 @@
 			return rw, nil
 		})
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		rw.On("SlotHeight", mock.Anything).Return(uint64(0), fmt.Errorf("failed to get current slot")) // Mock SlotHeight returning error
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Price should remain unchanged
 		require.Error(t, estimator.calculatePrice(ctx), "Expected error when getting current slot fails")
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, defaultPrice, estimator.BaseComputeUnitPrice())
 	})

@@ -297,13 +297,13 @@
 			return rw, nil
 		})
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		rw.On("SlotHeight", mock.Anything).Return(depth-1, nil) // Mock SlotHeight returning less than desiredBlockCount
 		estimator := initializeEstimator(ctx, t, rwLoader, cfg, logger.Test(t))

 		// Price should remain unchanged
 		require.Error(t, estimator.calculatePrice(ctx), "Expected error when current slot is less than desired block count")
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, defaultPrice, estimator.BaseComputeUnitPrice())
 	})

@@ -314,7 +314,7 @@
 			return rw, nil
 		})
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		rw.On("SlotHeight", mock.Anything).Return(testSlots[len(testSlots)-1], nil)
 		rw.On("GetBlocksWithLimit", mock.Anything, mock.Anything, mock.Anything).
 			Return(nil, fmt.Errorf("failed to get blocks with limit")) // Mock GetBlocksWithLimit returning error
@@ -322,7 +322,7 @@

 		// Price should remain unchanged
 		require.Error(t, estimator.calculatePrice(ctx), "Expected error when getting blocks with limit fails")
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, defaultPrice, estimator.BaseComputeUnitPrice())
 	})

@@ -333,7 +333,7 @@
 			return rw, nil
 		})
 		cfg := cfgmock.NewConfig(t)
-		setupConfigMock(cfg, defaultPrice, min, max, pollPeriod, depth)
+		setupConfigMock(cfg, defaultPrice, minPrice, pollPeriod, depth)
 		rw.On("SlotHeight", mock.Anything).Return(testSlots[len(testSlots)-1], nil)
 		emptyBlocks := rpc.BlocksResult{} // No blocks with compute unit prices
 		rw.On("GetBlocksWithLimit", mock.Anything, mock.Anything, mock.Anything).
@@ -342,15 +342,15 @@

 		// Price should remain unchanged
 		require.EqualError(t, estimator.calculatePrice(ctx), errNoComputeUnitPriceCollected.Error(), "Expected error when no compute unit prices are collected")
-		cfg.On("ComputeUnitPriceMax").Return(max)
+		cfg.On("ComputeUnitPriceMax").Return(maxPrice)
 		assert.Equal(t, defaultPrice, estimator.BaseComputeUnitPrice())
 	})
 }

 // setupConfigMock configures the Config mock with necessary return values.
-func setupConfigMock(cfg *cfgmock.Config, defaultPrice uint64, min, max uint64, pollPeriod time.Duration, depth uint64) {
+func setupConfigMock(cfg *cfgmock.Config, defaultPrice uint64, minPrice uint64, pollPeriod time.Duration, depth uint64) {
 	cfg.On("ComputeUnitPriceDefault").Return(defaultPrice).Once()
-	cfg.On("ComputeUnitPriceMin").Return(min).Once()
+	cfg.On("ComputeUnitPriceMin").Return(minPrice).Once()
 	cfg.On("BlockHistoryPollPeriod").Return(pollPeriod).Once()
 	cfg.On("BlockHistorySize").Return(depth)
 }
diff --git a/pkg/solana/fees/fixed_price.go b/pkg/solana/fees/fixed_price.go
index 3a748c301..a8c4cdee3 100644
--- a/pkg/solana/fees/fixed_price.go
+++ b/pkg/solana/fees/fixed_price.go
@@ -14,10 +14,10 @@ type fixedPriceEstimator struct {
 }

 func NewFixedPriceEstimator(cfg config.Config) (Estimator, error) {
-	defaultPrice, min, max := cfg.ComputeUnitPriceDefault(), cfg.ComputeUnitPriceMin(), cfg.ComputeUnitPriceMax()
+	defaultPrice, minPrice, maxPrice := cfg.ComputeUnitPriceDefault(), cfg.ComputeUnitPriceMin(), cfg.ComputeUnitPriceMax()

-	if defaultPrice < min || defaultPrice > max {
-		return nil, fmt.Errorf("default price (%d) is not within the min (%d) and max (%d) price bounds", defaultPrice, min, max)
+	if defaultPrice < minPrice || defaultPrice > maxPrice {
+		return nil, fmt.Errorf("default price (%d) is not within the min (%d) and max (%d) price bounds", defaultPrice, minPrice, maxPrice)
 	}

 	return &fixedPriceEstimator{
diff --git a/pkg/solana/fees/utils.go b/pkg/solana/fees/utils.go
index 652fc5039..487928231 100644
--- a/pkg/solana/fees/utils.go
+++ b/pkg/solana/fees/utils.go
@@ -8,7 +8,7 @@ import (
 )

 // returns new fee based on number of times bumped
-func CalculateFee(base, max, min uint64, count uint) uint64 {
+func CalculateFee(base, maxFee, minFee uint64, count uint) uint64 {
 	amount := base

 	for i := uint(0); i < count; i++ {
@@ -18,7 +18,7 @@ func CalculateFee(base, max, min uint64, count uint) uint64 {
 			next := amount + amount
 			if next <= amount {
 				// overflowed
-				amount = max
+				amount = maxFee
 				break
 			}
 			amount = next
@@ -26,11 +26,11 @@ func CalculateFee(base, max, min uint64, count uint) uint64 {
 	}

 	// respect bounds
-	if amount < min {
-		return min
+	if amount < minFee {
+		return minFee
 	}
-	if amount > max {
-		return max
+	if amount > maxFee {
+		return maxFee
 	}
 	return amount
 }
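As a quick illustration of the renamed helper's semantics (a hedged sketch, not part of the diff — it assumes the module path github.com/smartcontractkit/chainlink-solana and picks values that stay on the doubling path shown in the hunks above): CalculateFee doubles the fee once per bump, falls back to maxFee on overflow, and finally clamps to the [minFee, maxFee] bounds. Note the argument order is base, then maxFee, then minFee.

package main

import (
	"fmt"

	"github.com/smartcontractkit/chainlink-solana/pkg/solana/fees"
)

func main() {
	base, minFee, maxFee := uint64(100), uint64(10), uint64(1_000)

	fmt.Println(fees.CalculateFee(base, maxFee, minFee, 0)) // 100: no bumps, already within bounds
	fmt.Println(fees.CalculateFee(base, maxFee, minFee, 2)) // 400: 100 -> 200 -> 400
	fmt.Println(fees.CalculateFee(base, maxFee, minFee, 5)) // 1000: 3200 clamped down to maxFee
}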
diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go
index 4ca6fee5b..2239ed608 100644
--- a/pkg/solana/logpoller/orm.go
+++ b/pkg/solana/logpoller/orm.go
@@ -7,6 +7,7 @@ import (

 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
 	"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
+	"github.com/smartcontractkit/chainlink-common/pkg/types/query"
 )

 var _ ORM = (*DSORM)(nil)
@@ -200,3 +201,27 @@ func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address Public
 	}
 	return logs, nil
 }
+
+func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, limitAndSort query.LimitAndSort, _ string) ([]Log, error) {
+	qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter, limitAndSort)
+	if err != nil {
+		return nil, err
+	}
+
+	values, err := args.toArgs()
+	if err != nil {
+		return nil, err
+	}
+
+	query, sqlArgs, err := o.ds.BindNamed(qs, values)
+	if err != nil {
+		return nil, err
+	}
+
+	var logs []Log
+	if err = o.ds.SelectContext(ctx, &logs, query, sqlArgs...); err != nil {
+		return nil, err
+	}
+
+	return logs, nil
+}
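For orientation, a hypothetical call site for the new FilteredLogs method, combining it with the filter primitives that parser.go (next file) introduces. Everything in this sketch — the helper name, the block threshold, the limit — is illustrative, not part of the change:

package logpoller

import (
	"context"

	"github.com/gagliardetto/solana-go"

	"github.com/smartcontractkit/chainlink-common/pkg/types/query"
	"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
)

// selectProgramLogs fetches up to 50 logs for one program address and event
// signature from block 100 onward, in ascending sequence order.
func selectProgramLogs(ctx context.Context, orm *DSORM, program solana.PublicKey, eventSig []byte) ([]Log, error) {
	expressions := []query.Expression{
		NewAddressFilter(program),
		NewEventSigFilter(eventSig),
		query.Block("100", primitives.Gte),
	}
	limiter := query.NewLimitAndSort(query.CountLimit(50), query.NewSortBySequence(query.Asc))

	return orm.FilteredLogs(ctx, expressions, limiter, "")
}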
diff --git a/pkg/solana/logpoller/parser.go b/pkg/solana/logpoller/parser.go
index be97a4f48..fcb3a8da9 100644
--- a/pkg/solana/logpoller/parser.go
+++ b/pkg/solana/logpoller/parser.go
@@ -1,6 +1,414 @@
 package logpoller

+import (
+	"errors"
+	"fmt"
+	"math"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/gagliardetto/solana-go"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/types/query"
+	"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
+)
+
+const (
+	blockFieldName     = "block_number"
+	chainIDFieldName   = "chain_id"
+	timestampFieldName = "block_timestamp"
+	txHashFieldName    = "tx_hash"
+	addressFieldName   = "address"
+	eventSigFieldName  = "event_sig"
+	defaultSort        = "block_number ASC, log_index ASC"
+)
+
 var (
-	logsFields   = [...]string{"id", "filter_id", "chain_id", "log_index", "block_hash", "block_number", "block_timestamp", "address", "event_sig", "subkey_values", "tx_hash", "data", "created_at", "expires_at", "sequence_num"}
-	filterFields = [...]string{"id", "name", "address", "event_name", "event_sig", "starting_block", "event_idl", "subkey_paths", "retention", "max_logs_kept", "is_deleted", "is_backfilled"}
+	ErrInvalidComparator   = errors.New("invalid comparison operator")
+	ErrInvalidConfidence   = errors.New("invalid confidence level; solana only supports finalized")
+	ErrInvalidCursorDir    = errors.New("invalid cursor direction")
+	ErrInvalidCursorFormat = errors.New("invalid cursor format")
+	ErrInvalidSortDir      = errors.New("invalid sort direction")
+	ErrInvalidSortType     = errors.New("invalid sort by type")
+
+	logsFields = [...]string{"id", "filter_id", "chain_id", "log_index", "block_hash", "block_number",
+		"block_timestamp", "address", "event_sig", "subkey_values", "tx_hash", "data", "created_at",
+		"expires_at", "sequence_num"}
+
+	filterFields = [...]string{"id", "name", "address", "event_name", "event_sig", "starting_block",
+		"event_idl", "subkey_paths", "retention", "max_logs_kept", "is_deleted", "is_backfilled"}
 )
+
+// The parser builds SQL expressions piece by piece for each Accept function call and resets the error and expression
+// values after every call.
+type pgDSLParser struct {
+	args *queryArgs
+
+	// transient properties expected to be set and reset with every expression
+	expression string
+	err        error
+}
+
+var _ primitives.Visitor = (*pgDSLParser)(nil)
+
+func (v *pgDSLParser) Comparator(_ primitives.Comparator) {}
+
+func (v *pgDSLParser) Block(prim primitives.Block) {
+	cmp, err := cmpOpToString(prim.Operator)
+	if err != nil {
+		v.err = err
+
+		return
+	}
+
+	v.expression = fmt.Sprintf(
+		"%s %s :%s",
+		blockFieldName,
+		cmp,
+		v.args.withIndexedField(blockFieldName, prim.Block),
+	)
+}
+
+func (v *pgDSLParser) Confidence(prim primitives.Confidence) {
+	switch prim.ConfidenceLevel {
+	case primitives.Finalized, primitives.Unconfirmed:
+		// solana LogPoller will only use and store finalized logs
+		// to ensure x-chain compatibility, do nothing and return no error
+		// confidence in solana is effectively a noop
+		return
+	default:
+		// still return an error for invalid confidence levels
+		v.err = fmt.Errorf("%w: %s", ErrInvalidConfidence, prim.ConfidenceLevel)
+
+		return
+	}
+}
+
+func (v *pgDSLParser) Timestamp(prim primitives.Timestamp) {
+	cmp, err := cmpOpToString(prim.Operator)
+	if err != nil {
+		v.err = err
+
+		return
+	}
+
+	tm := int64(prim.Timestamp) //nolint:gosec // disable G115
+	if prim.Timestamp > math.MaxInt64 {
+		tm = 0
+	}
+
+	v.expression = fmt.Sprintf(
+		"%s %s :%s",
+		timestampFieldName,
+		cmp,
+		v.args.withIndexedField(timestampFieldName, time.Unix(tm, 0)),
+	)
+}
+
+func (v *pgDSLParser) TxHash(prim primitives.TxHash) {
+	txHash, err := solana.PublicKeyFromBase58(prim.TxHash)
+	if err != nil {
+		v.err = err
+
+		return
+	}
+
+	v.expression = fmt.Sprintf(
+		"%s = :%s",
+		txHashFieldName,
+		v.args.withIndexedField(txHashFieldName, PublicKey(txHash)),
+	)
+}
+
+func (v *pgDSLParser) VisitAddressFilter(p *addressFilter) {
+	v.expression = fmt.Sprintf(
+		"%s = :%s",
+		addressFieldName,
+		v.args.withIndexedField(addressFieldName, p.address),
+	)
+}
+
+func (v *pgDSLParser) VisitEventSigFilter(p *eventSigFilter) {
+	v.expression = fmt.Sprintf(
+		"%s = :%s",
+		eventSigFieldName,
+		v.args.withIndexedField(eventSigFieldName, p.eventSig),
+	)
+}
+
+func (v *pgDSLParser) buildQuery(
+	chainID string,
+	expressions []query.Expression,
+	limiter query.LimitAndSort,
+) (string, *queryArgs, error) {
+	// reset transient properties
+	v.args = newQueryArgs(chainID)
+	v.expression = ""
+	v.err = nil
+
+	// build the query string
+	clauses := []string{logsQuery("")}
+
+	where, err := v.whereClause(expressions, limiter)
+	if err != nil {
+		return "", nil, err
+	}
+
+	clauses = append(clauses, where)
+
+	order, err := v.orderClause(limiter)
+	if err != nil {
+		return "", nil, err
+	}
+
+	if len(order) > 0 {
+		clauses = append(clauses, order)
+	}
+
+	limit := v.limitClause(limiter)
+	if len(limit) > 0 {
+		clauses = append(clauses, limit)
+	}
+
+	return strings.Join(clauses, " "), v.args, nil
+}
+
+func (v *pgDSLParser) whereClause(expressions []query.Expression, limiter query.LimitAndSort) (string, error) {
+	segment := fmt.Sprintf("WHERE %s = :chain_id", chainIDFieldName)
+
+	if len(expressions) > 0 {
+		exp, err := v.combineExpressions(expressions, query.AND)
+		if err != nil {
+			return "", err
+		}
+
+		if exp != "" {
+			segment = fmt.Sprintf("%s AND %s", segment, exp)
+		}
+	}
+
+	if limiter.HasCursorLimit() {
+		var op string
+		switch limiter.Limit.CursorDirection {
+		case query.CursorFollowing:
+			op = ">"
+		case query.CursorPrevious:
+			op = "<"
+		default:
+			return "", ErrInvalidCursorDir
+		}
+
+		block, logIdx, _, err := valuesFromCursor(limiter.Limit.Cursor)
+		if err != nil {
+			return "", err
+		}
+
+		segment = fmt.Sprintf("%s AND (block_number %s :cursor_block_number OR (block_number = :cursor_block_number AND log_index %s :cursor_log_index))", segment, op, op)
+
+		v.args.withField("cursor_block_number", block).
+			withField("cursor_log_index", logIdx)
+	}
+
+	return segment, nil
+}
+
+func (v *pgDSLParser) orderClause(limiter query.LimitAndSort) (string, error) {
+	sorting := limiter.SortBy
+
+	if limiter.HasCursorLimit() && !limiter.HasSequenceSort() {
+		var dir query.SortDirection
+
+		switch limiter.Limit.CursorDirection {
+		case query.CursorFollowing:
+			dir = query.Asc
+		case query.CursorPrevious:
+			dir = query.Desc
+		default:
+			return "", ErrInvalidCursorDir
+		}
+
+		sorting = append(sorting, query.NewSortBySequence(dir))
+	}
+
+	if len(sorting) == 0 {
+		return fmt.Sprintf("ORDER BY %s", defaultSort), nil
+	}
+
+	sort := make([]string, len(sorting))
+
+	for idx, sorted := range sorting {
+		var name string
+
+		order, err := orderToString(sorted.GetDirection())
+		if err != nil {
+			return "", err
+		}
+
+		switch sorted.(type) {
+		case query.SortByBlock:
+			name = blockFieldName
+		case query.SortBySequence:
+			sort[idx] = fmt.Sprintf("block_number %s, log_index %s, tx_hash %s", order, order, order)
+
+			continue
+		case query.SortByTimestamp:
+			name = timestampFieldName
+		default:
+			return "", fmt.Errorf("%w: %T", ErrInvalidSortType, sorted)
+		}
+
+		sort[idx] = fmt.Sprintf("%s %s", name, order)
+	}
+
+	return fmt.Sprintf("ORDER BY %s", strings.Join(sort, ", ")), nil
+}
+
+func (v *pgDSLParser) limitClause(limiter query.LimitAndSort) string {
+	if !limiter.HasCursorLimit() && limiter.Limit.Count == 0 {
+		return ""
+	}
+
+	return fmt.Sprintf("LIMIT %d", limiter.Limit.Count)
+}
+
+func (v *pgDSLParser) combineExpressions(expressions []query.Expression, op query.BoolOperator) (string, error) {
+	clauses := make([]string, 0, len(expressions))
+
+	for _, exp := range expressions {
+		if exp.IsPrimitive() {
+			exp.Primitive.Accept(v)
+
+			clause, err := v.getLastExpression()
+			if err != nil {
+				return "", err
+			}
+
+			if clause != "" {
+				clauses = append(clauses, clause)
+			}
+		} else {
+			clause, err := v.combineExpressions(exp.BoolExpression.Expressions, exp.BoolExpression.BoolOperator)
+			if err != nil {
+				return "", err
+			}
+
+			if clause != "" {
+				clauses = append(clauses, clause)
+			}
+		}
+	}
+
+	if len(clauses) == 0 {
+		return "", nil
+	}
+
+	output := strings.Join(clauses, fmt.Sprintf(" %s ", op.String()))
+
+	if len(clauses) > 1 {
+		output = fmt.Sprintf("(%s)", output)
+	}
+
+	return output, nil
+}
+
+func (v *pgDSLParser) getLastExpression() (string, error) {
+	exp := v.expression
+	err := v.err
+
+	v.expression = ""
+	v.err = nil
+
+	return exp, err
+}
+
+func cmpOpToString(op primitives.ComparisonOperator) (string, error) {
+	switch op {
+	case primitives.Eq:
+		return "=", nil
+	case primitives.Neq:
+		return "!=", nil
+	case primitives.Gt:
+		return ">", nil
+	case primitives.Gte:
+		return ">=", nil
+	case primitives.Lt:
+		return "<", nil
+	case primitives.Lte:
+		return "<=", nil
+	default:
+		return "", ErrInvalidComparator
+	}
+}
+
+// ensure valuesFromCursor remains consistent with the function above that creates a cursor
+func valuesFromCursor(cursor string) (int64, int, []byte, error) {
+	partCount := 3
+
+	parts := strings.Split(cursor, "-")
+	if len(parts) != partCount {
+		return 0, 0, nil, fmt.Errorf("%w: must be composed as block-logindex-txHash", ErrInvalidCursorFormat)
+	}
+
+	block, err := strconv.ParseInt(parts[0], 10, 64)
+	if err != nil {
+		return 0, 0, nil, fmt.Errorf("%w: block number not parsable as int64", ErrInvalidCursorFormat)
+	}
+
+	logIdx, err := strconv.ParseInt(parts[1], 10, 32)
+	if err != nil {
+		return 0, 0, nil, fmt.Errorf("%w: log index not parsable as int", ErrInvalidCursorFormat)
+	}
+
+	txHash, err := solana.PublicKeyFromBase58(parts[2])
+	if err != nil {
+		return 0, 0, nil, fmt.Errorf("%w: invalid transaction hash: %s", ErrInvalidCursorFormat, err.Error())
+	}
+
+	return block, int(logIdx), txHash.Bytes(), nil
+}
+
+func orderToString(dir query.SortDirection) (string, error) {
+	switch dir {
+	case query.Asc:
+		return "ASC", nil
+	case query.Desc:
+		return "DESC", nil
+	default:
+		return "", ErrInvalidSortDir
+	}
+}
+
+type addressFilter struct {
+	address solana.PublicKey
+}
+
+func NewAddressFilter(address solana.PublicKey) query.Expression {
+	return query.Expression{
+		Primitive: &addressFilter{address: address},
+	}
+}
+
+func (f *addressFilter) Accept(visitor primitives.Visitor) {
+	switch v := visitor.(type) {
+	case *pgDSLParser:
+		v.VisitAddressFilter(f)
+	}
+}
+
+type eventSigFilter struct {
+	eventSig []byte
+}
+
+func NewEventSigFilter(sig []byte) query.Expression {
+	return query.Expression{
+		Primitive: &eventSigFilter{eventSig: sig},
+	}
+}
+
+func (f *eventSigFilter) Accept(visitor primitives.Visitor) {
+	switch v := visitor.(type) {
+	case *pgDSLParser:
+		v.VisitEventSigFilter(f)
+	}
+}
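Before the tests, a short in-package sketch (hypothetical, not part of the change) of the block-logindex-txHash cursor format that valuesFromCursor parses and that the CursorLimit cases below exercise; the transaction hash is the one the tests reuse:

package logpoller

import "fmt"

// exampleCursor round-trips a cursor string through valuesFromCursor.
func exampleCursor() error {
	cursor := fmt.Sprintf("%d-%d-%s", 10, 5, "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4")

	block, logIdx, txHash, err := valuesFromCursor(cursor)
	if err != nil {
		return err
	}

	fmt.Println(block, logIdx, len(txHash)) // 10 5 32 (Solana public keys are 32 bytes)
	return nil
}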
diff --git a/pkg/solana/logpoller/parser_test.go b/pkg/solana/logpoller/parser_test.go
new file mode 100644
index 000000000..96d2d8656
--- /dev/null
+++ b/pkg/solana/logpoller/parser_test.go
@@ -0,0 +1,278 @@
+package logpoller
+
+import (
+	"crypto/rand"
+	"fmt"
+	"testing"
+
+	"github.com/gagliardetto/solana-go"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/types/query"
+	"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
+)
+
+var (
+	chainID = "chain"
+	txHash  = "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4"
+)
+
+func assertArgs(t *testing.T, args *queryArgs, numVals int) {
+	values, err := args.toArgs()
+
+	assert.Len(t, values, numVals)
+	assert.NoError(t, err)
+}
+
+func TestDSLParser(t *testing.T) {
+	t.Parallel()
+
+	t.Run("query with no filters no order and no limit", func(t *testing.T) {
+		t.Parallel()
+
+		parser := &pgDSLParser{}
+		expressions := []query.Expression{}
+		limiter := query.LimitAndSort{}
+
+		result, args, err := parser.buildQuery(chainID, expressions, limiter)
+
+		require.NoError(t, err)
+		assert.Equal(t, logsQuery(" WHERE chain_id = :chain_id ORDER BY "+defaultSort), result)
+
+		assertArgs(t, args, 1)
+	})
+
+	t.Run("query with cursor and no order by", func(t *testing.T) {
+		t.Parallel()
+
+		var pk solana.PublicKey
+
+		_, _ = rand.Read(pk[:])
+
+		parser := &pgDSLParser{}
+		expressions := []query.Expression{
+			NewAddressFilter(pk),
+			NewEventSigFilter([]byte("test")),
+			query.Confidence(primitives.Unconfirmed),
+		}
+		limiter := query.NewLimitAndSort(query.CursorLimit(fmt.Sprintf("10-5-%s", txHash), query.CursorFollowing, 20))
+
+		result, args, err := parser.buildQuery(chainID, expressions, limiter)
+		expected := logsQuery(
+			" WHERE chain_id = :chain_id " +
+				"AND (address = :address_0 AND event_sig = :event_sig_0) " +
+				"AND (block_number > :cursor_block_number OR (block_number = :cursor_block_number " +
+				"AND log_index > :cursor_log_index)) " +
+				"ORDER BY block_number ASC, log_index ASC, tx_hash ASC LIMIT 20")
+
+		require.NoError(t, err)
+		assert.Equal(t, expected, result)
+
+		assertArgs(t, args, 5)
+	})
+
+	t.Run("query with limit and no order by", func(t *testing.T) {
+		t.Parallel()
+
+		var pk solana.PublicKey
+
+		_, _ = rand.Read(pk[:])
+
+		parser := &pgDSLParser{}
+		expressions := []query.Expression{
+			NewAddressFilter(pk),
+			NewEventSigFilter([]byte("test")),
+		}
+		limiter := query.NewLimitAndSort(query.CountLimit(20))
+
+		result, args, err := parser.buildQuery(chainID, expressions, limiter)
+		expected := logsQuery(
+			" WHERE chain_id = :chain_id " +
+				"AND (address = :address_0 AND event_sig = :event_sig_0) " +
+				"ORDER BY " + defaultSort + " " +
+				"LIMIT 20")
+
+		require.NoError(t, err)
+		assert.Equal(t, expected, result)
+
+		assertArgs(t, args, 3)
+	})
+
+	t.Run("query with order by sequence no cursor no limit", func(t *testing.T) {
+		t.Parallel()
+
+		parser := &pgDSLParser{}
+		expressions := []query.Expression{}
+		limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortBySequence(query.Desc))
+
+		result, args, err := parser.buildQuery(chainID, expressions, limiter)
+		expected := logsQuery(
+			" WHERE chain_id = :chain_id " +
+				"ORDER BY block_number DESC, log_index DESC, tx_hash DESC")
+
+		require.NoError(t, err)
+		assert.Equal(t, expected, result)
+
+		assertArgs(t, args, 1)
+	})
+
+	t.Run("query with multiple order by no limit", func(t *testing.T) {
+		t.Parallel()
+
+		parser := &pgDSLParser{}
+		expressions := []query.Expression{}
+		limiter := query.NewLimitAndSort(query.Limit{}, query.NewSortByBlock(query.Asc), query.NewSortByTimestamp(query.Desc))
+
+		result, args, err := parser.buildQuery(chainID, expressions, limiter)
+		expected := logsQuery(
+			" WHERE chain_id = :chain_id " +
+				"ORDER BY block_number ASC, block_timestamp DESC")
+
+		require.NoError(t, err)
+		assert.Equal(t, expected, result)
+
+		assertArgs(t, args, 1)
+	})
+
+	t.Run("basic query with default primitives no order by and cursor", func(t *testing.T) {
+		t.Parallel()
+
+		parser := &pgDSLParser{}
+		expressions := []query.Expression{
+			query.Timestamp(10, primitives.Eq),
+			query.TxHash(txHash),
+			query.Block("99", primitives.Neq),
+			query.Confidence(primitives.Finalized),
+		}
+		limiter := query.NewLimitAndSort(query.CursorLimit(fmt.Sprintf("10-20-%s", txHash), query.CursorPrevious, 20))
+
+		result, args, err := parser.buildQuery(chainID, expressions, limiter)
+		expected := logsQuery(
+			" WHERE chain_id = :chain_id " +
+				"AND (block_timestamp = :block_timestamp_0 AND tx_hash = :tx_hash_0 " +
+				"AND block_number != :block_number_0) " +
+				"AND (block_number < :cursor_block_number OR (block_number = :cursor_block_number " +
+				"AND log_index < :cursor_log_index)) " +
+				"ORDER BY block_number DESC, log_index DESC, tx_hash DESC LIMIT 20")
+
+		require.NoError(t, err)
+		assert.Equal(t, expected, result)
+
+		assertArgs(t, args, 6)
+	})
+
+	t.Run("query for finality", func(t *testing.T) {
+		t.Parallel()
+
+		t.Run("finalized", func(t *testing.T) {
+			parser := &pgDSLParser{}
+			expressions := []query.Expression{query.Confidence(primitives.Finalized)}
+			limiter := query.LimitAndSort{}
+
+			result, args, err := parser.buildQuery(chainID, expressions, limiter)
+			expected := logsQuery(
+				" WHERE chain_id = :chain_id " +
+					"ORDER BY " + defaultSort)
+
+			require.NoError(t, err)
+			assert.Equal(t, expected, result)
+
+			assertArgs(t, args, 1)
+		})
+
+		t.Run("unconfirmed", func(t *testing.T) {
+			parser := &pgDSLParser{}
+			expressions := []query.Expression{query.Confidence(primitives.Unconfirmed)}
+			limiter := query.LimitAndSort{}
+
+			result, args, err := parser.buildQuery(chainID, expressions, limiter)
+			expected := logsQuery(
+				" WHERE chain_id = :chain_id " +
+					"ORDER BY " + defaultSort)
+
+			require.NoError(t, err)
+			assert.Equal(t, expected, result)
+
+			assertArgs(t, args, 1)
+		})
+	})
+
+	// nested query -> a & (b || c)
+	t.Run("nested query", func(t *testing.T) {
+		t.Parallel()
+
+		parser := &pgDSLParser{}
+
+		expressions := []query.Expression{
+			{BoolExpression: query.BoolExpression{
+				Expressions: []query.Expression{
+					query.Timestamp(10, primitives.Gte),
+					{BoolExpression: query.BoolExpression{
+						Expressions: []query.Expression{
+							query.TxHash(txHash),
+							query.Confidence(primitives.Unconfirmed),
+						},
+						BoolOperator: query.OR,
+					}},
+				},
+				BoolOperator: query.AND,
+			}},
+		}
+		limiter := query.LimitAndSort{}
+
+		result, args, err := parser.buildQuery(chainID, expressions, limiter)
+		expected := logsQuery(
+			" WHERE chain_id = :chain_id " +
+				"AND (block_timestamp >= :block_timestamp_0 AND tx_hash = :tx_hash_0) " +
+				"ORDER BY " + defaultSort)
+
+		require.NoError(t, err)
+		assert.Equal(t, expected, result)
+
+		assertArgs(t, args, 3)
+	})
+
+	// deep nested query -> a & (b || (c & d))
+	t.Run("nested query deep", func(t *testing.T) {
+		t.Parallel()
+
+		sigFilter := NewEventSigFilter([]byte("test"))
+		parser := &pgDSLParser{}
+
+		expressions := []query.Expression{
+			{BoolExpression: query.BoolExpression{
+				Expressions: []query.Expression{
+					query.Timestamp(10, primitives.Eq),
+					{BoolExpression: query.BoolExpression{
+						Expressions: []query.Expression{
+							query.TxHash(txHash),
+							{BoolExpression: query.BoolExpression{
+								Expressions: []query.Expression{
+									query.Confidence(primitives.Unconfirmed),
+									sigFilter,
+								},
+								BoolOperator: query.AND,
+							}},
+						},
+						BoolOperator: query.OR,
+					}},
+				},
+				BoolOperator: query.AND,
+			}},
+		}
+		limiter := query.LimitAndSort{}
+
+		result, args, err := parser.buildQuery(chainID, expressions, limiter)
+		expected := logsQuery(
+			" WHERE chain_id = :chain_id " +
+				"AND (block_timestamp = :block_timestamp_0 " +
+				"AND (tx_hash = :tx_hash_0 OR event_sig = :event_sig_0)) " +
+				"ORDER BY " + defaultSort)
+
+		require.NoError(t, err)
+		assert.Equal(t, expected, result)
+
+		assertArgs(t, args, 4)
+	})
+}
diff --git a/pkg/solana/logpoller/query.go b/pkg/solana/logpoller/query.go
index 65a792449..ba310c6b5 100644
--- a/pkg/solana/logpoller/query.go
+++ b/pkg/solana/logpoller/query.go
@@ -31,6 +31,12 @@ func (q *queryArgs) withField(fieldName string, value any) *queryArgs {
 	return args
 }

+func (q *queryArgs) withIndexedField(fieldName string, value any) string {
+	field, _ := q.withIndexableField(fieldName, value, true)
+
+	return field
+}
+
 func (q *queryArgs) withIndexableField(fieldName string, value any, addIndex bool) (string, *queryArgs) {
 	if addIndex {
 		idx := q.nextIdx(fieldName)
diff --git a/pkg/solana/logpoller/worker.go b/pkg/solana/logpoller/worker.go
index 0e7d31df0..98226452f 100644
--- a/pkg/solana/logpoller/worker.go
+++ b/pkg/solana/logpoller/worker.go
@@ -300,8 +300,8 @@ type queue[T any] struct {
 	values []T
 }

-func newQueue[T any](len uint) *queue[T] {
-	values := make([]T, len)
+func newQueue[T any](maxLen uint) *queue[T] {
+	values := make([]T, maxLen)

 	return &queue[T]{
 		values: values,
diff --git a/pkg/solana/report_test.go b/pkg/solana/report_test.go
index eab16e26b..0c419eba3 100644
--- a/pkg/solana/report_test.go
+++ b/pkg/solana/report_test.go
@@ -142,11 +142,11 @@ func TestMedianFromReport(t *testing.T) {
 		tt = append(tt, test)
 	}

-	for _, tc := range tt {
-		t.Run(tc.name, func(t *testing.T) {
+	for idx := range tt {
+		t.Run(tt[idx].name, func(t *testing.T) {
 			ctx := tests.Context(t)
 			var pos []median.ParsedAttributedObservation
-			for i, obs := range tc.obs {
+			for i, obs := range tt[idx].obs {
 				pos = append(pos, median.ParsedAttributedObservation{
 					Value:           obs,
 					JuelsPerFeeCoin: obs,
@@ -155,15 +155,15 @@ func TestMedianFromReport(t *testing.T) {
 			}
 			report, err := cdc.BuildReport(ctx, pos)
 			require.NoError(t, err)
-			max, err := cdc.MaxReportLength(ctx, len(tc.obs))
+			maxLen, err := cdc.MaxReportLength(ctx, len(tt[idx].obs))
 			require.NoError(t, err)
-			assert.Equal(t, len(report), max)
+			assert.Equal(t, len(report), maxLen)
 			med, err := cdc.MedianFromReport(ctx, report)
 			require.NoError(t, err)
-			assert.Equal(t, tc.expectedMedian.String(), med.String())
+			assert.Equal(t, tt[idx].expectedMedian.String(), med.String())
 			count, err := cdc.ObserversCountFromReport(report)
 			require.NoError(t, err)
-			assert.Equal(t, len(tc.obs), int(count))
+			assert.Equal(t, len(tt[idx].obs), int(count))
 		})
 	}
 }
diff --git a/pkg/solana/txm/pendingtx_test.go b/pkg/solana/txm/pendingtx_test.go
index f9e956513..183f00c8e 100644
--- a/pkg/solana/txm/pendingtx_test.go
+++ b/pkg/solana/txm/pendingtx_test.go
@@ -1305,16 +1305,16 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) {
 		},
 	}

-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
+	for idx := range tests {
+		t.Run(tests[idx].name, func(t *testing.T) {
 			// Initialize a new PendingTxContext
 			ctx := newPendingTxContext()

 			// Setup the test case
-			tt.setup(t, ctx)
+			tests[idx].setup(t, ctx)

 			// Execute the function under test
-			result := ctx.ListAllExpiredBroadcastedTxs(tt.currBlockHeight)
+			result := ctx.ListAllExpiredBroadcastedTxs(tests[idx].currBlockHeight)

 			// Extract the IDs from the result
 			var resultIDs []string
@@ -1323,7 +1323,7 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) {
 			}

 			// Assert that the expected IDs match the result IDs (order does not matter)
-			assert.ElementsMatch(t, tt.expectedTxIDs, resultIDs)
+			assert.ElementsMatch(t, tests[idx].expectedTxIDs, resultIDs)
 		})
 	}
 }