Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sanitize symbol and name for influx #776

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,7 @@ github.com/influxdata/influxdb-client-go/v2 v2.4.0/go.mod h1:vLNHdxTJkIf2mSLvGrp
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab h1:HqW4xhhynfjrtEiiSGcQUd6vrK23iMam1FO8rI7mwig=
github.com/influxdata/influxdb1-client v0.0.0-20200827194710-b269163b24ab/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385 h1:ED4e5Cc3z5vSN2Tz2GkOHN7vs4Sxe2yds6CXvDnvZFE=
github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk=
github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
Expand Down
3 changes: 2 additions & 1 deletion pkg/model/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/diadata-org/diadata/pkg/dia"
"github.com/go-redis/redis"
clientInfluxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/influxdata/influxql"
)

type Datastore interface {
Expand Down Expand Up @@ -312,7 +313,7 @@ func (datastore *DB) CopyInfluxMeasurements(dbOrigin string, dbDestination strin

func (datastore *DB) SetVWAPFirefly(foreignName string, value float64, timestamp time.Time) error {
tags := map[string]string{
"foreignName": foreignName,
"foreignName": influxql.QuoteString(foreignName),
}
fields := map[string]interface{}{
"value": value,
Expand Down
5 changes: 3 additions & 2 deletions pkg/model/fiatQuotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

clientInfluxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/influxdata/influxql"
)

func (datastore *DB) SetBatchFiatPriceInflux(fiatQuotations []*FiatQuotation) error {
Expand Down Expand Up @@ -32,8 +33,8 @@ func checkInfluxIsAvailable(db *DB) error {
func addMultiplePointsToBatch(db *DB, fiatQuotations []*FiatQuotation) {
for _, fq := range fiatQuotations {
tags := map[string]string{
"quote_currency": fq.QuoteCurrency,
"base_currency": fq.BaseCurrency,
"quote_currency": influxql.QuoteString(fq.QuoteCurrency),
"base_currency": influxql.QuoteString(fq.BaseCurrency),
"source": fq.Source,
}
fields := map[string]interface{}{
Expand Down
5 changes: 4 additions & 1 deletion pkg/model/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/diadata-org/diadata/pkg/dia"
"github.com/go-redis/redis"
clientInfluxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/influxdata/influxql"
)

// SetFilter stores a filter point
Expand Down Expand Up @@ -228,9 +229,10 @@ func getKeyFilterSymbolAndExchangeZSET(filter string, asset dia.Asset, exchange
// SaveFilterInflux stores a filter point in influx.
func (datastore *DB) SaveFilterInflux(filter string, asset dia.Asset, exchange string, value float64, t time.Time) error {
// Create a point and add to batch

tags := map[string]string{
"filter": filter,
"symbol": asset.Symbol,
"symbol": influxql.QuoteString(asset.Symbol),
"address": asset.Address,
"blockchain": asset.Blockchain,
"exchange": exchange,
Expand All @@ -239,6 +241,7 @@ func (datastore *DB) SaveFilterInflux(filter string, asset dia.Asset, exchange s
"value": value,
"allExchanges": exchange == "",
}

pt, err := clientInfluxdb.NewPoint(influxDbFiltersTable, tags, fields, t)
if err != nil {
log.Errorln("new filter influx:", err)
Expand Down
11 changes: 6 additions & 5 deletions pkg/model/quotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/diadata-org/diadata/pkg/utils"
"github.com/go-redis/redis"
clientInfluxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/influxdata/influxql"
"github.com/jackc/pgx/v4"
)

Expand Down Expand Up @@ -77,8 +78,8 @@ func (datastore *DB) GetAssetPriceUSD(asset dia.Asset, timestamp time.Time) (pri
func (datastore *DB) AddAssetQuotationsToBatch(quotations []*AssetQuotation) error {
for _, quotation := range quotations {
tags := map[string]string{
"symbol": quotation.Asset.Symbol,
"name": quotation.Asset.Name,
"symbol": influxql.QuoteString(quotation.Asset.Symbol),
"name": influxql.QuoteString(quotation.Asset.Name),
"address": quotation.Asset.Address,
"blockchain": quotation.Asset.Blockchain,
}
Expand All @@ -99,8 +100,8 @@ func (datastore *DB) AddAssetQuotationsToBatch(quotations []*AssetQuotation) err
func (datastore *DB) SetAssetQuotation(quotation *AssetQuotation) error {
// Write to influx
tags := map[string]string{
"symbol": quotation.Asset.Symbol,
"name": quotation.Asset.Name,
"symbol": influxql.QuoteString(quotation.Asset.Symbol),
"name": influxql.QuoteString(quotation.Asset.Name),
"address": quotation.Asset.Address,
"blockchain": quotation.Asset.Blockchain,
}
Expand All @@ -117,7 +118,7 @@ func (datastore *DB) SetAssetQuotation(quotation *AssetQuotation) error {

// Write latest point to redis cache
// log.Printf("write to cache: %s", quotation.Asset.Symbol)
_, err = datastore.SetAssetQuotationCache(quotation, false)
// _, err = datastore.SetAssetQuotationCache(quotation, false)
return err

}
Expand Down
5 changes: 3 additions & 2 deletions pkg/model/supplies.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/diadata-org/diadata/pkg/dia/helpers"
"github.com/go-redis/redis"
clientInfluxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/influxdata/influxql"
)

func getKeySupply(asset dia.Asset) string {
Expand All @@ -32,8 +33,8 @@ func (datastore *DB) SaveSupplyInflux(supply *dia.Supply) error {
"source": supply.Source,
}
tags := map[string]string{
"symbol": supply.Asset.Symbol,
"name": supply.Asset.Name,
"symbol": influxql.QuoteString(supply.Asset.Symbol),
"name": influxql.QuoteString(supply.Asset.Name),
"address": supply.Asset.Address,
"blockchain": supply.Asset.Blockchain,
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/model/synthAssets.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/diadata-org/diadata/pkg/dia"
clientInfluxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/influxdata/influxql"
)

// SaveSynthSupplyInflux stores a synth supply in influx. Flushed when more than maxPoints in batch.
Expand All @@ -21,8 +22,8 @@ func (datastore *DB) SaveSynthSupplyInfluxToTable(t *dia.SynthAssetSupply, table

// Create a point and add to batch
tags := map[string]string{
"synthassetsymbol": t.Asset.Symbol,
"underlyingassetsymbol": t.AssetUnderlying.Symbol,
"synthassetsymbol": influxql.QuoteString(t.Asset.Symbol),
"underlyingassetsymbol": influxql.QuoteString(t.AssetUnderlying.Symbol),
"synthtokenaddress": t.Asset.Address,
"underlyingtokenaddress": t.AssetUnderlying.Address,
"blockchain": t.Asset.Blockchain,
Expand Down
3 changes: 2 additions & 1 deletion pkg/model/trades.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/diadata-org/diadata/pkg/dia"
clientInfluxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/influxdata/influxql"
)

// SaveTradeInflux stores a trade in influx. Flushed when more than maxPoints in batch.
Expand All @@ -24,7 +25,7 @@ func (datastore *DB) SaveTradeInfluxToTable(t *dia.Trade, table string) error {

// Create a point and add to batch
tags := map[string]string{
"symbol": t.Symbol,
"symbol": influxql.QuoteString(t.Symbol),
"pair": t.Pair,
"exchange": t.Source,
"verified": strconv.FormatBool(t.VerifiedPair),
Expand Down