Skip to content

Commit

Permalink
MERC-6299 Skip telemetry for market status bridges (#14415)
Browse files Browse the repository at this point in the history
* Skip telemetry for market status bridges

* Update changeset

* Remove field
  • Loading branch information
martin-cll authored Sep 20, 2024
1 parent 22a8c99 commit d2d9568
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changeset/slow-lizards-shout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Skip telemetry for market-status bridges #internal
45 changes: 33 additions & 12 deletions core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,14 @@ func ParseMercuryEATelemetry(lggr logger.Logger, trrs pipeline.TaskRunResults, f

eaTelem.BridgeTaskRunStartedTimestamp = trr.CreatedAt.UnixMilli()
eaTelem.BridgeTaskRunEndedTimestamp = trr.FinishedAt.Time.UnixMilli()
eaTelem.AssetSymbol = getAssetSymbolFromRequestData(bridgeTask.RequestData)

parsedBridgeData := parseBridgeRequestData(bridgeTask.RequestData, feedVersion)
if parsedBridgeData.IsMarketStatus {
// Only collect telemetry for pricing bridges.
continue
}

eaTelem.AssetSymbol = parsedBridgeData.AssetSymbol

eaTelemetryValues = append(eaTelemetryValues, eaTelem)
}
Expand Down Expand Up @@ -477,12 +484,19 @@ func parseTelemetryAttributes(a string) (telemetryAttributes, error) {
return *attrs, nil
}

// getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair
func getAssetSymbolFromRequestData(requestData string) string {
type bridgeRequestData struct {
AssetSymbol string
IsMarketStatus bool
}

// parseRequestData parses the requestData of the bridge.
func parseBridgeRequestData(requestData string, mercuryVersion mercuryutils.FeedVersion) bridgeRequestData {
type reqDataPayload struct {
To *string `json:"to"`
From *string `json:"from"`
Address *string `json:"address"` // used for view function ea only
Endpoint *string `json:"endpoint"`
To *string `json:"to"`
From *string `json:"from"`
Address *string `json:"address"` // used for view function ea only
Market *string `json:"market"` // used for market status ea only
}
type reqData struct {
Data reqDataPayload `json:"data"`
Expand All @@ -491,18 +505,25 @@ func getAssetSymbolFromRequestData(requestData string) string {
rd := &reqData{}
err := json.Unmarshal([]byte(requestData), rd)
if err != nil {
return ""
return bridgeRequestData{}
}

if mercuryVersion == 4 && ((rd.Data.Endpoint != nil && *rd.Data.Endpoint == "market-status") || (rd.Data.Market != nil && *rd.Data.Market != "")) {
return bridgeRequestData{
AssetSymbol: *rd.Data.Market,
IsMarketStatus: true,
}
}

if rd.Data.From != nil && rd.Data.To != nil {
return *rd.Data.From + "/" + *rd.Data.To
return bridgeRequestData{AssetSymbol: *rd.Data.From + "/" + *rd.Data.To}
}

if rd.Data.Address != nil {
return *rd.Data.Address
return bridgeRequestData{AssetSymbol: *rd.Data.Address}
}

return ""
return bridgeRequestData{}
}

// ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent
Expand Down Expand Up @@ -616,8 +637,8 @@ func getPricesFromResultsByOrder(lggr logger.Logger, startTask pipeline.TaskRunR
benchmarkPrice = parsePriceFromTask(lggr, *benchmarkPriceTask)
}

// mercury version 2 only supports benchmarkPrice
if mercuryVersion == 2 {
// mercury versions 2 and 4 only supports benchmarkPrice
if mercuryVersion == 2 || mercuryVersion == 4 {
return benchmarkPrice, 0, 0
}

Expand Down
142 changes: 134 additions & 8 deletions core/services/ocrcommon/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/shopspring/decimal"
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types/mercury"
mercuryv1 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1"
mercuryv2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2"

mercuryv4 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v4"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
Expand Down Expand Up @@ -274,7 +273,6 @@ func TestSendEATelemetry(t *testing.T) {
expectedMessage, _ := proto.Marshal(&expectedTelemetry)
wg.Wait()
assert.Equal(t, expectedMessage, sentMessage)
//enhancedTelemService.StopOnce("EnhancedTelemetryService", func() error { return nil })
doneCh <- struct{}{}
}

Expand Down Expand Up @@ -446,6 +444,45 @@ var trrsMercuryV2 = pipeline.TaskRunResults{
},
}

var trrsMercuryV4 = pipeline.TaskRunResults{
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
Name: "link-usd-test-bridge-v2",
BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0),
RequestData: `{"data":{"to":"LINK","from":"USD"}}`,
},
Result: pipeline.Result{
Value: bridgeResponse,
},
},
pipeline.TaskRunResult{
Task: &pipeline.JSONParseTask{
BaseTask: pipeline.NewBaseTask(1, "ds1_benchmark", nil, nil, 1),
},
Result: pipeline.Result{
Value: 123456.123456,
},
},
pipeline.TaskRunResult{
Task: &pipeline.BridgeTask{
Name: "market-status-bridge",
BaseTask: pipeline.NewBaseTask(2, "ds2", nil, nil, 2),
RequestData: `{"data":{"endpoint":"market-status","market":"forex"}}`,
},
Result: pipeline.Result{
Value: bridgeResponse,
},
},
pipeline.TaskRunResult{
Task: &pipeline.JSONParseTask{
BaseTask: pipeline.NewBaseTask(3, "market_status", nil, nil, 3),
},
Result: pipeline.Result{
Value: 2.0,
},
},
}

func TestGetPricesFromBridgeByTelemetryField(t *testing.T) {
lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel)
// These are intentionally out of order from the "legacy" method which expects order of `benchmark, bid, ask`
Expand Down Expand Up @@ -605,12 +642,23 @@ func TestShouldCollectEnhancedTelemetryMercury(t *testing.T) {
require.Equal(t, ShouldCollectEnhancedTelemetryMercury(j), false)
}

func TestGetAssetSymbolFromRequestData(t *testing.T) {
require.Equal(t, getAssetSymbolFromRequestData(""), "")
func TestParseBridgeRequestData(t *testing.T) {
require.Equal(t, parseBridgeRequestData("", 2), bridgeRequestData{})

reqData := `{"data":{"to":"LINK","from":"USD"}}`
require.Equal(t, getAssetSymbolFromRequestData(reqData), "USD/LINK")
require.Equal(t, parseBridgeRequestData(reqData, 2), bridgeRequestData{AssetSymbol: "USD/LINK"})

reqData = `{"data":{"to":"LINK","from":"USD","market":"forex"}}`
require.Equal(t, parseBridgeRequestData(reqData, 2), bridgeRequestData{AssetSymbol: "USD/LINK"})

reqData = `{"data":{"endpoint":"market-status","market":"forex"}}`
require.Equal(t, parseBridgeRequestData(reqData, 4), bridgeRequestData{AssetSymbol: "forex", IsMarketStatus: true})

reqData = `{"data":{"market":"metals"}}`
require.Equal(t, parseBridgeRequestData(reqData, 4), bridgeRequestData{AssetSymbol: "metals", IsMarketStatus: true})

viewFunctionReqData := `{"data":{"address":"0x12345678", "signature": "function stEthPerToken() view returns (int256)"}}`
require.Equal(t, "0x12345678", getAssetSymbolFromRequestData(viewFunctionReqData))
require.Equal(t, parseBridgeRequestData(viewFunctionReqData, 3), bridgeRequestData{AssetSymbol: "0x12345678"})
}

func getViewFunctionTaskRunResults() pipeline.TaskRunResults {
Expand Down Expand Up @@ -1005,3 +1053,81 @@ func TestCollectMercuryEnhancedTelemetryV2(t *testing.T) {
require.Contains(t, logs.All()[3].Message, "cannot parse enhanced EA telemetry bid price")
chDone <- struct{}{}
}

func TestCollectMercuryEnhancedTelemetryV4(t *testing.T) {
ingressClient := mocks.NewTelemetryService(t)
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.EnhancedEAMercury)

sentMessageCh := make(chan []byte)
ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) {
sentMessageCh <- args[1].([]byte)
})

lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel)
chTelem := make(chan EnhancedTelemetryMercuryData, 100)
chDone := make(chan struct{})
feedID := common.HexToHash("0x0004")
e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{
chDone: chDone,
chTelem: chTelem,
job: &job.Job{
Type: job.Type(pipeline.OffchainReporting2JobType),
OCR2OracleSpec: &job.OCR2OracleSpec{
CaptureEATelemetry: true,
FeedID: &feedID,
},
},
lggr: lggr,
monitoringEndpoint: monitoringEndpoint,
}
servicetest.Run(t, &e)

chTelem <- EnhancedTelemetryMercuryData{
TaskRunResults: trrsMercuryV4,
FeedVersion: 4,
V4Observation: &mercuryv4.Observation{
BenchmarkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(111111)},
MarketStatus: mercury.ObsResult[uint32]{Val: 2},
MaxFinalizedTimestamp: mercury.ObsResult[int64]{Val: 321},
LinkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(4321)},
NativePrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(54321)},
},
RepTimestamp: types.ReportTimestamp{
ConfigDigest: types.ConfigDigest{2},
Epoch: 11,
Round: 22,
},
}

expectedPricingTelemetry := telem.EnhancedEAMercury{
DataSource: "data-source-name",
DpBenchmarkPrice: 123456.123456,
BridgeTaskRunStartedTimestamp: trrsMercuryV4[0].CreatedAt.UnixMilli(),
BridgeTaskRunEndedTimestamp: trrsMercuryV4[0].FinishedAt.Time.UnixMilli(),
ProviderRequestedTimestamp: 92233720368547760,
ProviderReceivedTimestamp: -92233720368547760,
ProviderDataStreamEstablished: 1,
ProviderIndicatedTime: -123456789,
Feed: common.HexToHash("0x0004").String(),
ObservationBenchmarkPrice: 111111,
ObservationMarketStatus: 2,
ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000",
Round: 22,
Epoch: 11,
AssetSymbol: "USD/LINK",
ObservationBenchmarkPriceString: "111111",
MaxFinalizedTimestamp: 321,
LinkPrice: 4321,
NativePrice: 54321,
Version: 4,
BridgeRequestData: `{"data":{"to":"LINK","from":"USD"}}`,
}
expectedPricingMessage, _ := proto.Marshal(&expectedPricingTelemetry)
require.Equal(t, expectedPricingMessage, <-sentMessageCh)

chDone <- struct{}{}

// Verify that no other telemetry is sent.
require.Len(t, sentMessageCh, 0)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ message EnhancedEAMercury {

string feed=14;

// v1+v2+v3
// v1+v2+v3+v4
int64 observation_benchmark_price=15; // This value overflows, will be reserved and removed in future versions
string observation_benchmark_price_string = 22;
// v1+v3
Expand Down

0 comments on commit d2d9568

Please sign in to comment.