diff --git a/go.sum b/go.sum index 8f4556ab2..f7f3d2ec5 100644 --- a/go.sum +++ b/go.sum @@ -916,6 +916,7 @@ github.com/influxdata/influxdb-client-go/v2 v2.4.0/go.mod h1:vLNHdxTJkIf2mSLvGrp github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig= github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385 h1:ED4e5Cc3z5vSN2Tz2GkOHN7vs4Sxe2yds6CXvDnvZFE= github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= diff --git a/pkg/model/db.go b/pkg/model/db.go index f24544fa3..f79286692 100644 --- a/pkg/model/db.go +++ b/pkg/model/db.go @@ -11,6 +11,7 @@ import ( "github.com/diadata-org/diadata/pkg/dia" "github.com/go-redis/redis" clientInfluxdb "github.com/influxdata/influxdb1-client/v2" + "github.com/influxdata/influxql" ) type Datastore interface { @@ -312,7 +313,7 @@ func (datastore *DB) CopyInfluxMeasurements(dbOrigin string, dbDestination strin func (datastore *DB) SetVWAPFirefly(foreignName string, value float64, timestamp time.Time) error { tags := map[string]string{ - "foreignName": foreignName, + "foreignName": influxql.QuoteString(foreignName), } fields := map[string]interface{}{ "value": value, diff --git a/pkg/model/fiatQuotation.go b/pkg/model/fiatQuotation.go index ff376b4de..c65f7732b 100644 --- a/pkg/model/fiatQuotation.go +++ b/pkg/model/fiatQuotation.go @@ -4,6 +4,7 @@ import ( "fmt" clientInfluxdb "github.com/influxdata/influxdb1-client/v2" + "github.com/influxdata/influxql" ) func (datastore *DB) SetBatchFiatPriceInflux(fiatQuotations []*FiatQuotation) error { @@ -32,8 +33,8 @@ func checkInfluxIsAvailable(db *DB) error { func addMultiplePointsToBatch(db *DB, fiatQuotations []*FiatQuotation) { for _, fq := range fiatQuotations { tags := map[string]string{ - "quote_currency": fq.QuoteCurrency, - "base_currency": fq.BaseCurrency, + "quote_currency": influxql.QuoteString(fq.QuoteCurrency), + "base_currency": influxql.QuoteString(fq.BaseCurrency), "source": fq.Source, } fields := map[string]interface{}{ diff --git a/pkg/model/filters.go b/pkg/model/filters.go index e8c1f5826..169731a8f 100644 --- a/pkg/model/filters.go +++ b/pkg/model/filters.go @@ -10,6 +10,7 @@ import ( "github.com/diadata-org/diadata/pkg/dia" "github.com/go-redis/redis" clientInfluxdb "github.com/influxdata/influxdb1-client/v2" + "github.com/influxdata/influxql" ) // SetFilter stores a filter point @@ -228,9 +229,10 @@ func getKeyFilterSymbolAndExchangeZSET(filter string, asset dia.Asset, exchange // SaveFilterInflux stores a filter point in influx. func (datastore *DB) SaveFilterInflux(filter string, asset dia.Asset, exchange string, value float64, t time.Time) error { // Create a point and add to batch + tags := map[string]string{ "filter": filter, - "symbol": asset.Symbol, + "symbol": influxql.QuoteString(asset.Symbol), "address": asset.Address, "blockchain": asset.Blockchain, "exchange": exchange, @@ -239,6 +241,7 @@ func (datastore *DB) SaveFilterInflux(filter string, asset dia.Asset, exchange s "value": value, "allExchanges": exchange == "", } + pt, err := clientInfluxdb.NewPoint(influxDbFiltersTable, tags, fields, t) if err != nil { log.Errorln("new filter influx:", err) diff --git a/pkg/model/quotation.go b/pkg/model/quotation.go index 6b48c845b..8fc9a0c80 100644 --- a/pkg/model/quotation.go +++ b/pkg/model/quotation.go @@ -13,6 +13,7 @@ import ( "github.com/diadata-org/diadata/pkg/utils" "github.com/go-redis/redis" clientInfluxdb "github.com/influxdata/influxdb1-client/v2" + "github.com/influxdata/influxql" "github.com/jackc/pgx/v4" ) @@ -77,8 +78,8 @@ func (datastore *DB) GetAssetPriceUSD(asset dia.Asset, timestamp time.Time) (pri func (datastore *DB) AddAssetQuotationsToBatch(quotations []*AssetQuotation) error { for _, quotation := range quotations { tags := map[string]string{ - "symbol": quotation.Asset.Symbol, - "name": quotation.Asset.Name, + "symbol": influxql.QuoteString(quotation.Asset.Symbol), + "name": influxql.QuoteString(quotation.Asset.Name), "address": quotation.Asset.Address, "blockchain": quotation.Asset.Blockchain, } @@ -99,8 +100,8 @@ func (datastore *DB) AddAssetQuotationsToBatch(quotations []*AssetQuotation) err func (datastore *DB) SetAssetQuotation(quotation *AssetQuotation) error { // Write to influx tags := map[string]string{ - "symbol": quotation.Asset.Symbol, - "name": quotation.Asset.Name, + "symbol": influxql.QuoteString(quotation.Asset.Symbol), + "name": influxql.QuoteString(quotation.Asset.Name), "address": quotation.Asset.Address, "blockchain": quotation.Asset.Blockchain, } @@ -117,7 +118,7 @@ func (datastore *DB) SetAssetQuotation(quotation *AssetQuotation) error { // Write latest point to redis cache // log.Printf("write to cache: %s", quotation.Asset.Symbol) - _, err = datastore.SetAssetQuotationCache(quotation, false) + // _, err = datastore.SetAssetQuotationCache(quotation, false) return err } diff --git a/pkg/model/supplies.go b/pkg/model/supplies.go index a10a755a6..84a1f9412 100644 --- a/pkg/model/supplies.go +++ b/pkg/model/supplies.go @@ -11,6 +11,7 @@ import ( "github.com/diadata-org/diadata/pkg/dia/helpers" "github.com/go-redis/redis" clientInfluxdb "github.com/influxdata/influxdb1-client/v2" + "github.com/influxdata/influxql" ) func getKeySupply(asset dia.Asset) string { @@ -32,8 +33,8 @@ func (datastore *DB) SaveSupplyInflux(supply *dia.Supply) error { "source": supply.Source, } tags := map[string]string{ - "symbol": supply.Asset.Symbol, - "name": supply.Asset.Name, + "symbol": influxql.QuoteString(supply.Asset.Symbol), + "name": influxql.QuoteString(supply.Asset.Name), "address": supply.Asset.Address, "blockchain": supply.Asset.Blockchain, } diff --git a/pkg/model/synthAssets.go b/pkg/model/synthAssets.go index 9d253ba6b..1aa229b14 100644 --- a/pkg/model/synthAssets.go +++ b/pkg/model/synthAssets.go @@ -7,6 +7,7 @@ import ( "github.com/diadata-org/diadata/pkg/dia" clientInfluxdb "github.com/influxdata/influxdb1-client/v2" + "github.com/influxdata/influxql" ) // SaveSynthSupplyInflux stores a synth supply in influx. Flushed when more than maxPoints in batch. @@ -21,8 +22,8 @@ func (datastore *DB) SaveSynthSupplyInfluxToTable(t *dia.SynthAssetSupply, table // Create a point and add to batch tags := map[string]string{ - "synthassetsymbol": t.Asset.Symbol, - "underlyingassetsymbol": t.AssetUnderlying.Symbol, + "synthassetsymbol": influxql.QuoteString(t.Asset.Symbol), + "underlyingassetsymbol": influxql.QuoteString(t.AssetUnderlying.Symbol), "synthtokenaddress": t.Asset.Address, "underlyingtokenaddress": t.AssetUnderlying.Address, "blockchain": t.Asset.Blockchain, diff --git a/pkg/model/trades.go b/pkg/model/trades.go index 5fc3b61c6..bd064487f 100644 --- a/pkg/model/trades.go +++ b/pkg/model/trades.go @@ -10,6 +10,7 @@ import ( "github.com/diadata-org/diadata/pkg/dia" clientInfluxdb "github.com/influxdata/influxdb1-client/v2" + "github.com/influxdata/influxql" ) // SaveTradeInflux stores a trade in influx. Flushed when more than maxPoints in batch. @@ -24,7 +25,7 @@ func (datastore *DB) SaveTradeInfluxToTable(t *dia.Trade, table string) error { // Create a point and add to batch tags := map[string]string{ - "symbol": t.Symbol, + "symbol": influxql.QuoteString(t.Symbol), "pair": t.Pair, "exchange": t.Source, "verified": strconv.FormatBool(t.VerifiedPair),