diff --git a/cmd/exchange-scrapers/collector/go.mod b/cmd/exchange-scrapers/collector/go.mod index eb7a0d597..5fcc11b4d 100644 --- a/cmd/exchange-scrapers/collector/go.mod +++ b/cmd/exchange-scrapers/collector/go.mod @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/exchange-scrapers/collector go 1.17 require ( - github.com/diadata-org/diadata v1.4.360 + github.com/diadata-org/diadata v1.4.375 github.com/segmentio/kafka-go v0.4.35 github.com/sirupsen/logrus v1.9.0 ) diff --git a/cmd/http/graphqlServer/go.mod b/cmd/http/graphqlServer/go.mod index 76eca0d06..c85f195a1 100644 --- a/cmd/http/graphqlServer/go.mod +++ b/cmd/http/graphqlServer/go.mod @@ -3,7 +3,7 @@ module graphqlServer go 1.17 require ( - github.com/diadata-org/diadata v1.4.361 + github.com/diadata-org/diadata v1.4.376 github.com/graph-gophers/graphql-go v1.1.0 github.com/sirupsen/logrus v1.8.1 ) diff --git a/cmd/http/graphqlServer/schema/quotation.graphql b/cmd/http/graphqlServer/schema/quotation.graphql index 0ce329841..bce957f6f 100644 --- a/cmd/http/graphqlServer/schema/quotation.graphql +++ b/cmd/http/graphqlServer/schema/quotation.graphql @@ -45,6 +45,7 @@ type Query { StartTime: Time EndTime: Time TradeVolumeThreshold: Float + NativeDenomination: Boolean FeedSelection: [FeedSelection!] ): [FilterPointExtended] diff --git a/internal/pkg/filtersBlockService/FilterMA.go b/internal/pkg/filtersBlockService/FilterMA.go index ad6344889..ceca7c004 100644 --- a/internal/pkg/filtersBlockService/FilterMA.go +++ b/internal/pkg/filtersBlockService/FilterMA.go @@ -13,32 +13,34 @@ import ( // FilterMA is the struct for a moving average filter implementing // the Filter interface. type FilterMA struct { - asset dia.Asset - exchange string - currentTime time.Time - prices []float64 - volumes []float64 - lastTrade dia.Trade - memory int - value float64 - modified bool - filterName string + asset dia.Asset + exchange string + currentTime time.Time + prices []float64 + volumes []float64 + lastTrade dia.Trade + memory int + value float64 + modified bool + filterName string + nativeDenomination bool //max float64 min float64 } // NewFilterMA returns a moving average filter. // @currentTime is the begin time of the filtersBlock. -func NewFilterMA(asset dia.Asset, exchange string, currentTime time.Time, memory int) *FilterMA { +func NewFilterMA(asset dia.Asset, exchange string, currentTime time.Time, memory int, nativeDenomination bool) *FilterMA { filter := &FilterMA{ - asset: asset, - exchange: exchange, - prices: []float64{}, - volumes: []float64{}, - currentTime: currentTime, - memory: memory, - filterName: "MA" + strconv.Itoa(memory), - min: -1, + asset: asset, + exchange: exchange, + prices: []float64{}, + volumes: []float64{}, + currentTime: currentTime, + memory: memory, + filterName: "MA" + strconv.Itoa(memory), + min: -1, + nativeDenomination: nativeDenomination, } return filter } @@ -88,7 +90,12 @@ func (filter *FilterMA) processDataPoint(trade dia.Trade) { filter.prices = filter.prices[0 : filter.memory-1] filter.volumes = filter.volumes[0 : filter.memory-1] } - filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + if !filter.nativeDenomination { + filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + } else { + filter.prices = append([]float64{trade.Price}, filter.prices...) + } + filter.volumes = append([]float64{trade.Volume}, filter.volumes...) } @@ -118,7 +125,11 @@ func (filter *FilterMA) finalCompute(t time.Time) float64 { } if len(filter.prices) > 0 && len(filter.volumes) > 0 { - filter.prices = []float64{filter.lastTrade.EstimatedUSDPrice} + if !filter.nativeDenomination { + filter.prices = []float64{filter.lastTrade.EstimatedUSDPrice} + } else { + filter.prices = []float64{filter.lastTrade.Price} + } filter.volumes = []float64{filter.lastTrade.Volume} } return filter.value diff --git a/internal/pkg/filtersBlockService/FilterMAIR.go b/internal/pkg/filtersBlockService/FilterMAIR.go index 77ce38e54..2ca514e2f 100644 --- a/internal/pkg/filtersBlockService/FilterMAIR.go +++ b/internal/pkg/filtersBlockService/FilterMAIR.go @@ -13,28 +13,30 @@ import ( // Outliers are eliminated using interquartile range. // see: https://en.wikipedia.org/wiki/Interquartile_range type FilterMAIR struct { - asset dia.Asset - exchange string - currentTime time.Time - prices []float64 - volumes []float64 - lastTrade dia.Trade - memory int - value float64 - filterName string - modified bool + asset dia.Asset + exchange string + currentTime time.Time + prices []float64 + volumes []float64 + lastTrade dia.Trade + memory int + value float64 + filterName string + nativeDenomination bool + modified bool } // NewFilterMAIR returns a FilterMAIR -func NewFilterMAIR(asset dia.Asset, exchange string, currentTime time.Time, memory int) *FilterMAIR { +func NewFilterMAIR(asset dia.Asset, exchange string, currentTime time.Time, memory int, nativeDenomination bool) *FilterMAIR { filter := &FilterMAIR{ - asset: asset, - exchange: exchange, - prices: []float64{}, - volumes: []float64{}, - currentTime: currentTime, - memory: memory, - filterName: "MAIR" + strconv.Itoa(memory), + asset: asset, + exchange: exchange, + prices: []float64{}, + volumes: []float64{}, + currentTime: currentTime, + memory: memory, + filterName: "MAIR" + strconv.Itoa(memory), + nativeDenomination: nativeDenomination, } return filter } @@ -84,7 +86,12 @@ func (filter *FilterMAIR) processDataPoint(trade dia.Trade) { filter.prices = filter.prices[0 : filter.memory-1] filter.volumes = filter.volumes[0 : filter.memory-1] } - filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + if !filter.nativeDenomination { + filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + } else { + filter.prices = append([]float64{trade.Price}, filter.prices...) + } + filter.volumes = append([]float64{trade.Volume}, filter.volumes...) } @@ -113,7 +120,11 @@ func (filter *FilterMAIR) finalCompute(t time.Time) float64 { filter.value = mean // Reduce the filter values to the last recorded value for the next tradesblock. if len(filter.prices) > 0 && len(filter.volumes) > 0 { - filter.prices = []float64{filter.lastTrade.EstimatedUSDPrice} + if !filter.nativeDenomination { + filter.prices = []float64{filter.lastTrade.EstimatedUSDPrice} + } else { + filter.prices = []float64{filter.lastTrade.Price} + } filter.volumes = []float64{filter.lastTrade.Volume} } return filter.value diff --git a/internal/pkg/filtersBlockService/FilterMAIR_test.go b/internal/pkg/filtersBlockService/FilterMAIR_test.go index 5ae13d1c3..de191fdcf 100644 --- a/internal/pkg/filtersBlockService/FilterMAIR_test.go +++ b/internal/pkg/filtersBlockService/FilterMAIR_test.go @@ -113,7 +113,7 @@ func TestFilterMAIRIgnore(t *testing.T) { Symbol: "XRP", Name: "XRP", } - f := NewFilterMAIR(assetXRP, "", d, filterParam) + f := NewFilterMAIR(assetXRP, "", d, filterParam, false) p := firstPrice priceIncrements := 1.0 for i := 0; i <= steps; i++ { @@ -205,7 +205,7 @@ func TestFilterMAIRAverageCleanOutliers(t *testing.T) { Symbol: "XRP", Name: "XRP", } - f := NewFilterMAIR(assetXRP, "", d, memory) + f := NewFilterMAIR(assetXRP, "", d, memory, false) for _, p := range c.samples { f.compute(dia.Trade{EstimatedUSDPrice: p, Time: d}) d = d.Add(time.Second) diff --git a/internal/pkg/filtersBlockService/FilterMEDIR.go b/internal/pkg/filtersBlockService/FilterMEDIR.go index 01965ded6..ec99a5126 100644 --- a/internal/pkg/filtersBlockService/FilterMEDIR.go +++ b/internal/pkg/filtersBlockService/FilterMEDIR.go @@ -13,26 +13,28 @@ import ( // It implements a trimmed median. Outliers are eliminated using interquartile range // see: https://en.wikipedia.org/wiki/Interquartile_range type FilterMEDIR struct { - asset dia.Asset - exchange string - currentTime time.Time - prices []float64 - lastTrade dia.Trade - memory int - value float64 - filterName string - modified bool + asset dia.Asset + exchange string + currentTime time.Time + prices []float64 + lastTrade dia.Trade + memory int + value float64 + filterName string + modified bool + nativeDenomination bool } -//NewFilterMEDIR creates a FilterMEDIR -func NewFilterMEDIR(asset dia.Asset, exchange string, currentTime time.Time, memory int) *FilterMEDIR { +// NewFilterMEDIR creates a FilterMEDIR +func NewFilterMEDIR(asset dia.Asset, exchange string, currentTime time.Time, memory int, nativeDenomination bool) *FilterMEDIR { filter := &FilterMEDIR{ - asset: asset, - exchange: exchange, - prices: []float64{}, - currentTime: currentTime, - memory: memory, - filterName: "MEDIR" + strconv.Itoa(memory), + asset: asset, + exchange: exchange, + prices: []float64{}, + currentTime: currentTime, + memory: memory, + filterName: "MEDIR" + strconv.Itoa(memory), + nativeDenomination: nativeDenomination, } return filter } @@ -62,7 +64,11 @@ func (filter *FilterMEDIR) processDataPoint(trade dia.Trade) { if len(filter.prices) >= filter.memory { filter.prices = filter.prices[0 : filter.memory-1] } - filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + if !filter.nativeDenomination { + filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + } else { + filter.prices = append([]float64{trade.Price}, filter.prices...) + } } func (filter *FilterMEDIR) finalCompute(t time.Time) float64 { diff --git a/internal/pkg/filtersBlockService/FilterMEDIR_test.go b/internal/pkg/filtersBlockService/FilterMEDIR_test.go index f2a2ba74f..5b69fcffb 100644 --- a/internal/pkg/filtersBlockService/FilterMEDIR_test.go +++ b/internal/pkg/filtersBlockService/FilterMEDIR_test.go @@ -54,7 +54,7 @@ func TestFilterMEDIRMedianCleanOutliers(t *testing.T) { Symbol: "XRP", Name: "XRP", } - f := NewFilterMEDIR(assetXRP, "", d, memory) + f := NewFilterMEDIR(assetXRP, "", d, memory, false) for _, p := range c.samples { f.compute(dia.Trade{EstimatedUSDPrice: p, Time: d}) d = d.Add(time.Second) diff --git a/internal/pkg/filtersBlockService/FilterVWAP.go b/internal/pkg/filtersBlockService/FilterVWAP.go index 0868f2304..48e64727e 100644 --- a/internal/pkg/filtersBlockService/FilterVWAP.go +++ b/internal/pkg/filtersBlockService/FilterVWAP.go @@ -11,28 +11,30 @@ import ( // FilterVWAP ... type FilterVWAP struct { - asset dia.Asset - exchange string - currentTime time.Time - prices []float64 - volumes []float64 - lastTrade dia.Trade - param int - value float64 - filterName string - modified bool + asset dia.Asset + exchange string + currentTime time.Time + prices []float64 + volumes []float64 + lastTrade dia.Trade + param int + value float64 + filterName string + nativeDenomination bool + modified bool } // NewFilterVWAP ... -func NewFilterVWAP(asset dia.Asset, exchange string, currentTime time.Time, param int) *FilterVWAP { +func NewFilterVWAP(asset dia.Asset, exchange string, currentTime time.Time, param int, nativeDenomination bool) *FilterVWAP { s := &FilterVWAP{ - asset: asset, - exchange: exchange, - prices: []float64{}, - volumes: []float64{}, - currentTime: currentTime, - param: param, - filterName: "VWAP" + strconv.Itoa(param), + asset: asset, + exchange: exchange, + prices: []float64{}, + volumes: []float64{}, + currentTime: currentTime, + param: param, + filterName: "VWAP" + strconv.Itoa(param), + nativeDenomination: nativeDenomination, } return s } @@ -62,7 +64,11 @@ func (filter *FilterVWAP) fill(trade dia.Trade) { } func (filter *FilterVWAP) processDataPoint(trade dia.Trade) { - filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + if !filter.nativeDenomination { + filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + } else { + filter.prices = append([]float64{trade.Price}, filter.prices...) + } filter.volumes = append([]float64{trade.Volume}, filter.volumes...) } diff --git a/internal/pkg/filtersBlockService/FilterVWAPIR.go b/internal/pkg/filtersBlockService/FilterVWAPIR.go index 874bff55d..7dd506786 100644 --- a/internal/pkg/filtersBlockService/FilterVWAPIR.go +++ b/internal/pkg/filtersBlockService/FilterVWAPIR.go @@ -12,27 +12,29 @@ import ( // FilterVWAP ... type FilterVWAPIR struct { - exchange string - currentTime time.Time - prices []float64 - volumes []float64 - lastTrade dia.Trade - param int - value float64 - modified bool - filterName string - asset dia.Asset + exchange string + currentTime time.Time + prices []float64 + volumes []float64 + lastTrade dia.Trade + param int + value float64 + modified bool + filterName string + asset dia.Asset + nativeDenomination bool } -func NewFilterVWAPIR(asset dia.Asset, exchange string, currentTime time.Time, param int) *FilterVWAPIR { +func NewFilterVWAPIR(asset dia.Asset, exchange string, currentTime time.Time, param int, nativeDenomination bool) *FilterVWAPIR { s := &FilterVWAPIR{ - asset: asset, - exchange: exchange, - prices: []float64{}, - volumes: []float64{}, - currentTime: currentTime, - param: param, - filterName: "VWAPIR" + strconv.Itoa(param), + asset: asset, + exchange: exchange, + prices: []float64{}, + volumes: []float64{}, + currentTime: currentTime, + param: param, + filterName: "VWAPIR" + strconv.Itoa(param), + nativeDenomination: nativeDenomination, } return s } @@ -61,7 +63,11 @@ func (filter *FilterVWAPIR) fill(trade dia.Trade) { } func (filter *FilterVWAPIR) processDataPoint(trade dia.Trade) { - filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + if !filter.nativeDenomination { + filter.prices = append([]float64{trade.EstimatedUSDPrice}, filter.prices...) + } else { + filter.prices = append([]float64{trade.Price}, filter.prices...) + } filter.volumes = append([]float64{trade.Volume}, filter.volumes...) } diff --git a/internal/pkg/filtersBlockService/FilterVWAPIR_test.go b/internal/pkg/filtersBlockService/FilterVWAPIR_test.go index 0f76b90bf..a6a6cc724 100644 --- a/internal/pkg/filtersBlockService/FilterVWAPIR_test.go +++ b/internal/pkg/filtersBlockService/FilterVWAPIR_test.go @@ -9,7 +9,7 @@ import ( func TestVWAPIR(t *testing.T) { var filterPoints []dia.FilterPoint trades := getTrades() - maFilter := NewFilterVWAPIR(dia.Asset{}, "Binance", trades[len(trades)-1].Time, dia.BlockSizeSeconds) + maFilter := NewFilterVWAPIR(dia.Asset{}, "Binance", trades[len(trades)-1].Time, dia.BlockSizeSeconds, false) for _, trade := range trades { @@ -32,7 +32,7 @@ func BenchmarkVWAPIR(b *testing.B) { trades := getTrades() for n := 0; n < b.N; n++ { - maFilter := NewFilterVWAPIR(dia.Asset{}, "Binance", trades[len(trades)-1].Time, dia.BlockSizeSeconds) + maFilter := NewFilterVWAPIR(dia.Asset{}, "Binance", trades[len(trades)-1].Time, dia.BlockSizeSeconds, false) for _, trade := range trades { diff --git a/internal/pkg/filtersBlockService/FilterVWAP_test.go b/internal/pkg/filtersBlockService/FilterVWAP_test.go index 166ae68e3..e3cbc7e34 100644 --- a/internal/pkg/filtersBlockService/FilterVWAP_test.go +++ b/internal/pkg/filtersBlockService/FilterVWAP_test.go @@ -17,7 +17,7 @@ func getTrades() (trades []dia.Trade) { func TestVWAP(t *testing.T) { var filterPoints []dia.FilterPoint trades := getTrades() - maFilter := NewFilterVWAP(dia.Asset{}, "Binance", trades[len(trades)-1].Time, dia.BlockSizeSeconds) + maFilter := NewFilterVWAP(dia.Asset{}, "Binance", trades[len(trades)-1].Time, dia.BlockSizeSeconds, false) totalVolume := 0.0 totalPrice := 0.0 @@ -47,7 +47,7 @@ func BenchmarkVWAP(b *testing.B) { trades := getTrades() for n := 0; n < b.N; n++ { - maFilter := NewFilterVWAP(dia.Asset{}, "Binance", trades[len(trades)-1].Time, dia.BlockSizeSeconds) + maFilter := NewFilterVWAP(dia.Asset{}, "Binance", trades[len(trades)-1].Time, dia.BlockSizeSeconds, false) for _, trade := range trades { diff --git a/internal/pkg/filtersBlockService/FiltersBlockService.go b/internal/pkg/filtersBlockService/FiltersBlockService.go index 890bdfa2b..fecd6bf06 100644 --- a/internal/pkg/filtersBlockService/FiltersBlockService.go +++ b/internal/pkg/filtersBlockService/FiltersBlockService.go @@ -184,10 +184,10 @@ func (s *FiltersBlockService) createFilters(asset dia.Asset, exchange string, Be _, ok := s.filters[fa] if !ok { s.filters[fa] = []Filter{ - NewFilterMA(asset, exchange, BeginTime, dia.BlockSizeSeconds), - NewFilterMAIR(asset, exchange, BeginTime, dia.BlockSizeSeconds), - NewFilterMEDIR(asset, exchange, BeginTime, dia.BlockSizeSeconds), - NewFilterVWAPIR(asset, exchange, BeginTime, dia.BlockSizeSeconds), + NewFilterMA(asset, exchange, BeginTime, dia.BlockSizeSeconds, false), + NewFilterMAIR(asset, exchange, BeginTime, dia.BlockSizeSeconds, false), + NewFilterMEDIR(asset, exchange, BeginTime, dia.BlockSizeSeconds, false), + NewFilterVWAPIR(asset, exchange, BeginTime, dia.BlockSizeSeconds, false), NewFilterVOL(asset, exchange, dia.BlockSizeSeconds), NewFilterCOUNT(asset, exchange, dia.BlockSizeSeconds), NewFilterTLT(asset, exchange), diff --git a/pkg/dia/helpers/queryHelper/filters.go b/pkg/dia/helpers/queryHelper/filters.go index ee9a35d23..85393414e 100644 --- a/pkg/dia/helpers/queryHelper/filters.go +++ b/pkg/dia/helpers/queryHelper/filters.go @@ -16,7 +16,7 @@ func FilterMA(tradeBlocks []Block, asset dia.Asset, blockSize int) (filterPoints metadata = dia.NewFilterPointMetadata() for _, block := range tradeBlocks { if len(block.Trades) > 0 { - maFilter := filters.NewFilterMA(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + maFilter := filters.NewFilterMA(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, false) for _, trade := range block.Trades { maFilter.Compute(trade) @@ -59,7 +59,7 @@ func FilterMAIR(tradeBlocks []Block, asset dia.Asset, blockSize int) (filterPoin log.Infof("block number: %v", i) if len(block.Trades) > 0 { - mairFilter := filters.NewFilterMAIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + mairFilter := filters.NewFilterMAIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, false) firstBlock := block.Trades[0] for _, trade := range block.Trades { mairFilter.Compute(trade) @@ -99,7 +99,7 @@ func FilterVWAP(tradeBlocks []Block, asset dia.Asset, blockSize int) (filterPoin metadata = dia.NewFilterPointMetadata() for _, block := range tradeBlocks { if len(block.Trades) > 0 { - vwapFilter := filters.NewFilterVWAP(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + vwapFilter := filters.NewFilterVWAP(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, false) for _, trade := range block.Trades { vwapFilter.Compute(trade) @@ -135,7 +135,7 @@ func FilterVWAPIR(tradeBlocks []Block, asset dia.Asset, blockSize int) (filterPo for _, block := range tradeBlocks { if len(block.Trades) > 0 { - vwapirFilter := filters.NewFilterVWAPIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + vwapirFilter := filters.NewFilterVWAPIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, false) for _, trade := range block.Trades { vwapirFilter.Compute(trade) } @@ -177,7 +177,7 @@ func FilterMEDIR(tradeBlocks []Block, asset dia.Asset, blockSize int) (filterPoi for _, block := range tradeBlocks { if len(block.Trades) > 0 { - medirFilter := filters.NewFilterMEDIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + medirFilter := filters.NewFilterMEDIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, false) for _, trade := range block.Trades { diff --git a/pkg/dia/helpers/queryHelper/filtersExtended.go b/pkg/dia/helpers/queryHelper/filtersExtended.go index cce061b56..6505b3c2e 100644 --- a/pkg/dia/helpers/queryHelper/filtersExtended.go +++ b/pkg/dia/helpers/queryHelper/filtersExtended.go @@ -7,7 +7,7 @@ import ( "github.com/diadata-org/diadata/pkg/dia" ) -func FilterMAextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64) (filterPointsExtended []dia.FilterPointExtended) { +func FilterMAextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64, nativeDenomination bool) (filterPointsExtended []dia.FilterPointExtended) { lastfp := &dia.FilterPoint{} @@ -19,7 +19,7 @@ func FilterMAextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volum fpe.Pools = pools if len(block.Trades) > 0 { - maFilter := filters.NewFilterMA(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + maFilter := filters.NewFilterMA(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, nativeDenomination) for _, trade := range block.Trades { if trade.VolumeUSD() > volumeThresholdUSD { @@ -61,7 +61,7 @@ func FilterMAextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volum return } -func FilterMAIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64) (filterPointsExtended []dia.FilterPointExtended) { +func FilterMAIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64, nativeDenomination bool) (filterPointsExtended []dia.FilterPointExtended) { lastfp := &dia.FilterPoint{} for _, block := range tradeBlocks { @@ -71,7 +71,7 @@ func FilterMAIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vol fpe.Pools = pools if len(block.Trades) > 0 { - mairFilter := filters.NewFilterMAIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + mairFilter := filters.NewFilterMAIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, nativeDenomination) for _, trade := range block.Trades { if trade.VolumeUSD() > volumeThresholdUSD { fpe.TradesCount++ @@ -113,7 +113,7 @@ func FilterMAIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vol return } -func FilterVWAPextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64) (filterPointsExtended []dia.FilterPointExtended) { +func FilterVWAPextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64, nativeDenomination bool) (filterPointsExtended []dia.FilterPointExtended) { var lastfp *dia.FilterPoint for _, block := range tradeBlocks { @@ -124,7 +124,7 @@ func FilterVWAPextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vol if len(block.Trades) > 0 { - vwapFilter := filters.NewFilterVWAP(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + vwapFilter := filters.NewFilterVWAP(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, nativeDenomination) for _, trade := range block.Trades { if trade.VolumeUSD() > volumeThresholdUSD { fpe.TradesCount++ @@ -161,7 +161,7 @@ func FilterVWAPextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vol return } -func FilterVWAPIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64) (filterPointsExtended []dia.FilterPointExtended) { +func FilterVWAPIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64, nativeDenomination bool) (filterPointsExtended []dia.FilterPointExtended) { var lastfp *dia.FilterPoint for _, block := range tradeBlocks { @@ -173,7 +173,7 @@ func FilterVWAPIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, v if len(block.Trades) > 0 { - vwapirFilter := filters.NewFilterVWAPIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + vwapirFilter := filters.NewFilterVWAPIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, nativeDenomination) for _, trade := range block.Trades { if trade.VolumeUSD() > volumeThresholdUSD { fpe.TradesCount++ @@ -216,7 +216,7 @@ func FilterVWAPIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, v return } -func FilterMEDIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64) (filterPointsExtended []dia.FilterPointExtended) { +func FilterMEDIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, volumeThresholdUSD float64, nativeDenomination bool) (filterPointsExtended []dia.FilterPointExtended) { var lastfp *dia.FilterPoint for _, block := range tradeBlocks { @@ -228,7 +228,7 @@ func FilterMEDIRextended(tradeBlocks []Block, asset dia.Asset, blockSize int, vo if len(block.Trades) > 0 { - medirFilter := filters.NewFilterMEDIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize) + medirFilter := filters.NewFilterMEDIR(asset, "", time.Unix(block.TimeStamp/1e9, 0), blockSize, nativeDenomination) for _, trade := range block.Trades { if trade.VolumeUSD() > volumeThresholdUSD { fpe.TradesCount++ diff --git a/pkg/dia/scraper/exchange-scrapers/CurvefiScraper.go b/pkg/dia/scraper/exchange-scrapers/CurvefiScraper.go index 91f518f92..bfd321ba9 100644 --- a/pkg/dia/scraper/exchange-scrapers/CurvefiScraper.go +++ b/pkg/dia/scraper/exchange-scrapers/CurvefiScraper.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "math/big" + "reflect" "strconv" "strings" "sync" @@ -19,14 +20,15 @@ import ( "github.com/diadata-org/diadata/pkg/utils" "github.com/diadata-org/diadata/pkg/dia" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" ) const ( - curveRestDialEth = "https://nd-475-370-970.p2pify.com/8658efa12b79ca9cd4b1c72b55a7f4fa" - curveWsDialEth = "wss://ws-nd-475-370-970.p2pify.com/8658efa12b79ca9cd4b1c72b55a7f4fa" + curveRestDialEth = "" + curveWsDialEth = "" curveRestDialFantom = "" curveWsDialFantom = "" curveRestDialMoonbeam = "" @@ -328,7 +330,7 @@ func (scraper *CurveFIScraper) watchSwaps(pool string) error { log.Warn("got underlying swap: ", swpUnderlying) // Only fetch trades from USDR pool until we have parsing TokenExchangeUnderlying resolved. // if pool == common.HexToAddress("0xa138341185a9d0429b0021a11fb717b225e13e1f").Hex() && Exchanges[scraper.exchangeName].BlockChain.Name == dia.POLYGON { - // scraper.processSwap(pool, swpUnderlying) + scraper.processSwap(pool, swpUnderlying) // } } } @@ -441,9 +443,17 @@ func (scraper *CurveFIScraper) getSwapDataCurve(pool string, swp interface{}) ( case *curvepool.CurvepoolTokenExchangeUnderlying: log.Info("Got TokenExchangeUnderlying in pool: ", pool) var ( - underlyingCoins [8]common.Address - errCoins error + isMetaPool bool + underlyingCoins [8]common.Address + underlyingCoinCount *big.Int + metaCoinCount *big.Int + errNCoin error + errCoins error ) + soldID := int(s.SoldId.Int64()) + boughtId := int(s.BoughtId.Int64()) + tokenSold := s.TokensSold + tokenBought := s.TokensBought for _, registry := range scraper.registriesUnderlying { if registry.Type == 2 { @@ -453,14 +463,23 @@ func (scraper *CurveFIScraper) getSwapDataCurve(pool string, swp interface{}) ( } // Get underlying coins from on-chain. soldID and boughtID are referring to these. underlyingCoins, errCoins = metaRegistryContract.GetUnderlyingCoins(&bind.CallOpts{}, common.HexToAddress(pool)) - if errCoins != nil || (underlyingCoins[int(s.SoldId.Int64())] == (common.Address{}) && underlyingCoins[int(s.BoughtId.Int64())] == (common.Address{})) { + if errCoins != nil || (underlyingCoins[soldID] == (common.Address{}) && underlyingCoins[boughtId] == (common.Address{})) { + log.Warnf("Failed to call GetUnderlyingCoins: %v", errCoins) continue } else { log.Warn("TokenExchangeUnderlying from meta type pool") - log.Warnf("bought id and sold id: %v -- %v", s.BoughtId.Int64(), s.SoldId.Int64()) - log.Warnf("tokens bought and tokens sold: %v -- %v ", s.TokensBought, s.TokensSold) + log.Warnf("bought id and sold id: %v -- %v", boughtId, soldID) + log.Warnf("tokens bought and tokens sold: %v -- %v ", tokenBought, tokenSold) + metaCoinCount, underlyingCoinCount, errNCoin = metaRegistryContract.GetMetaNCoins(&bind.CallOpts{}, common.HexToAddress(pool)) + if errNCoin != nil { + log.Errorf("calling GetMetaNCoins: %v", errCoins) + continue + } + log.Warnf("metaCoinCount: %v, underlyingCoinCount: %v", metaCoinCount, underlyingCoinCount) + isMetaPool = true break } + } if registry.Type == 1 { basepoolRegistryContract, errBase := curvefi.NewCurvefiCaller(registry.Address, scraper.RestClient) @@ -469,19 +488,36 @@ func (scraper *CurveFIScraper) getSwapDataCurve(pool string, swp interface{}) ( } // Get underlying coins from on-chain. soldID and boughtID are referring to these. underlyingCoins, errCoins = basepoolRegistryContract.GetUnderlyingCoins(&bind.CallOpts{}, common.HexToAddress(pool)) - if errCoins != nil || (underlyingCoins[int(s.SoldId.Int64())] == (common.Address{}) && underlyingCoins[int(s.BoughtId.Int64())] == (common.Address{})) { + if errCoins != nil || (underlyingCoins[soldID] == (common.Address{}) && underlyingCoins[boughtId] == (common.Address{})) { continue } else { log.Warn("TokenExchangeUnderlying from base type pool") - log.Warnf("bought id and sold id: %v -- %v", s.BoughtId.Int64(), s.SoldId.Int64()) - log.Warnf("tokens bought and tokens sold: %v -- %v ", s.TokensBought, s.TokensSold) + log.Warnf("bought id and sold id: %v -- %v", boughtId, soldID) + log.Warnf("tokens bought and tokens sold: %v -- %v ", tokenBought, tokenSold) break } + + } + } + log.Warnf("meta pool : %v, soldID: %v", isMetaPool, soldID) + if isMetaPool && soldID > 0 && boughtId == 0 { + // This is the only case we need to look into the AddLiquidity event for finding the actual amount of sold token + // as this event contains the meta pool token amount in this case! + + tokenAmount, errTokenAmounts := scraper.getTokenAmount(s.Raw.TxHash, common.HexToAddress(pool), soldID, metaCoinCount, underlyingCoinCount) + if err != nil { + log.Error("getting AddLiquidity event for tx: ", s.Raw.TxHash.Hex()) + err = errTokenAmounts + return } + log.Warnf("token Amount %v", tokenAmount) + // Again we need to subtract MAX_COIN index from this value, (which is metaCoinCount minus one) + tokenSold = tokenAmount } + log.Infof("token sold: %v", tokenSold) - fromTokenAddress := underlyingCoins[int(s.SoldId.Int64())] - toTokenAddress := underlyingCoins[int(s.BoughtId.Int64())] + fromTokenAddress := underlyingCoins[int(soldID)] + toTokenAddress := underlyingCoins[int(boughtId)] fromTokenAsset, errToken := scraper.relDB.GetAsset(fromTokenAddress.Hex(), Exchanges[scraper.exchangeName].BlockChain.Name) if errToken != nil { @@ -498,8 +534,8 @@ func (scraper *CurveFIScraper) getSwapDataCurve(pool string, swp interface{}) ( fromToken = assetToCoin(fromTokenAsset) toToken = assetToCoin(toTokenAsset) - amountIn, _ = new(big.Float).Quo(big.NewFloat(0).SetInt(s.TokensSold), new(big.Float).SetFloat64(math.Pow10(int(fromToken.Decimals)))).Float64() - amountOut, _ = new(big.Float).Quo(big.NewFloat(0).SetInt(s.TokensBought), new(big.Float).SetFloat64(math.Pow10(int(toToken.Decimals)))).Float64() + amountIn, _ = new(big.Float).Quo(big.NewFloat(0).SetInt(tokenSold), new(big.Float).SetFloat64(math.Pow10(int(fromToken.Decimals)))).Float64() + amountOut, _ = new(big.Float).Quo(big.NewFloat(0).SetInt(tokenBought), new(big.Float).SetFloat64(math.Pow10(int(toToken.Decimals)))).Float64() } baseToken = dia.Asset{ @@ -523,6 +559,42 @@ func (scraper *CurveFIScraper) getSwapDataCurve(pool string, swp interface{}) ( return } +func (scraper *CurveFIScraper) getTokenAmount(txHash common.Hash, poolAddress common.Address, soldID int, metaCoinCount, underlyingCoinCount *big.Int) (*big.Int, error) { + // Here we need to get the event log manually as the len of the token_amounts array is unknown! + // (fixed to 2 in the current curvepool contract, but there are cases when it's 3 like this pool: + // https://etherscan.io/address/0xbebc44782c7db0a1a60cb6fe97d0b483032ff1c7) + // We get the value using reflection. + receipt, err := scraper.RestClient.TransactionReceipt(context.Background(), txHash) + if err != nil { + return nil, err + } + basePoolCoinCount := underlyingCoinCount.Int64() - 1 // We need to subtract that one meta pool coin here! + abiAddLiquidityJson := `[{"name":"AddLiquidity","inputs":[{"type":"address","name":"provider","indexed":true},{"type":"uint256[%d]","name":"token_amounts","indexed":false},{"type":"uint256[%d]","name":"fees","indexed":false},{"type":"uint256","name":"invariant","indexed":false},{"type":"uint256","name":"token_supply","indexed":false}],"anonymous":false,"type":"event"}]` + abiAddLiquidity, err := abi.JSON(strings.NewReader(fmt.Sprintf(abiAddLiquidityJson, basePoolCoinCount, basePoolCoinCount))) + if err != nil { + return nil, err + } + for _, log := range receipt.Logs { + if len(log.Topics) < 2 || log.Topics[1] != poolAddress.Hash() { + continue + } + valueMap := make(map[string]interface{}) + err := abiAddLiquidity.UnpackIntoMap(valueMap, "AddLiquidity", log.Data) + if err != nil { + continue + } + // Again we need to subtract MAX_COIN index from this value, (which is metaCoinCount minus one) + tokenBoughtIdx := soldID - (int(metaCoinCount.Int64()) - 1) + tokenAmount, ok := reflect.ValueOf(valueMap["token_amounts"]).Index(tokenBoughtIdx).Interface().(*big.Int) + if !ok { + return nil, errors.New("couldn't parse the AddLiquidity event") + } + return tokenAmount, nil + } + return nil, errors.New("AddLiquidity log couldn't be found") + +} + func (scraper *CurveFIScraper) loadPools(liquidityThreshold float64, liquidityThresholdUSD float64) { pools, err := scraper.relDB.GetAllPoolsExchange(scraper.exchangeName, liquidityThreshold) diff --git a/pkg/graphql/resolver/root.go b/pkg/graphql/resolver/root.go index fabe36984..8a2df4788 100644 --- a/pkg/graphql/resolver/root.go +++ b/pkg/graphql/resolver/root.go @@ -451,6 +451,7 @@ func (r *DiaResolver) GetFeed(ctx context.Context, args struct { StartTime graphql.NullTime EndTime graphql.NullTime TradeVolumeThreshold graphql.NullFloat + NativeDenomination graphql.NullBool FeedSelection *[]FeedSelection }) (*[]*FilterPointExtendedResolver, error) { var ( @@ -461,6 +462,7 @@ func (r *DiaResolver) GetFeed(ctx context.Context, args struct { blockShiftSeconds int64 tradeVolumeThreshold float64 err error + nativeDenomination bool ) // Parsing input parameters. @@ -515,6 +517,12 @@ func (r *DiaResolver) GetFeed(ctx context.Context, args struct { tradeVolumeThreshold = TRADE_VOLUME_THRESHOLD_DEFAULT } + // If nativeDenomination is true, price is returned in terms of the respective base asset. + // Default is false, i.e. price is returned in USD denomination. + if args.NativeDenomination.Value != nil { + nativeDenomination = *args.NativeDenomination.Value + } + if args.FeedSelection == nil { return sr, errors.New("At least 1 asset must be selected.") } @@ -599,23 +607,23 @@ func (r *DiaResolver) GetFeed(ctx context.Context, args struct { // } case "mair": { - filterPoints = queryhelper.FilterMAIRextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold) + filterPoints = queryhelper.FilterMAIRextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold, nativeDenomination) } case "ma": { - filterPoints = queryhelper.FilterMAextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold) + filterPoints = queryhelper.FilterMAextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold, nativeDenomination) } case "vwap": { - filterPoints = queryhelper.FilterVWAPextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold) + filterPoints = queryhelper.FilterVWAPextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold, nativeDenomination) } case "vwapir": { - filterPoints = queryhelper.FilterVWAPIRextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold) + filterPoints = queryhelper.FilterVWAPIRextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold, nativeDenomination) } case "medir": { - filterPoints = queryhelper.FilterMEDIRextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold) + filterPoints = queryhelper.FilterMEDIRextended(tradeBlocks, feedselection[0].Asset, int(blockSizeSeconds), tradeVolumeThreshold, nativeDenomination) } case "vol": {