diff --git a/README.md b/README.md index 85b6c4c..e10a747 100644 --- a/README.md +++ b/README.md @@ -15,31 +15,61 @@ A Coinpaprika source plugin for CloudQuery that loads data from Coinpaprika API The following source configuration file will sync to a PostgreSQL database. See [the CloudQuery Quickstart](https://www.cloudquery.io/docs/quickstart) for more information on how to configure the source and destination. -```yaml -kind: source -spec: - name: "coinpaprika" - path: "coinpaprika/coinpaprika" - version: "${VERSION}" - concurrency: 100 - backend: local - destinations: - - "postgresql" - spec: - start_date: "2023-05-15T08:00:00Z" - interval: "1h" - access_token: "${COINPAPRIKA_API_TOKEN}" -``` - -| Spec fields | Description | Optional | -|--------------|----------------------------------------------------------------------------------------------------------------------------|----------| -| start_date | Starting date for synchronizing data | NO | -| interval | Intervals for historic data [possible values](https://api.coinpaprika.com/#tag/Tickers/operation/getTickersHistoricalById) | NO | -| access_token | Coinpaprika API token | YES | - -The Coinpaprika plugin supports incremental syncing for historical tickers, only new tickers will be fetched. This is done by storing last timestamp of fetched ticker in Cloudquery backed. To enable this, `backend` option must be set in the spec. +1. With api token rate limited for `Bussines` plan (3 000 000 calls/month). Only bitcoin tickers. + ```yaml + kind: source + spec: + name: "coinpaprika" + path: "coinpaprika/coinpaprika" + version: "1.0.0" + backend: local + destinations: + - "postgresql" + spec: + start_date: "2023-05-15T08:00:00Z" + interval: 5m + access_token: "${COINPAPRIKA_API_TOKEN}" + api_debug: true + rate_duration: 720h + rate_number: 3000000 + tickers: + ["*-bitcoin"] + ``` +2. Without token, `Free` plan (25 000 calls/month) minimal interval 1h, see [available history range depending on the selected API plan](https://api.coinpaprika.com/#tag/Tickers/operation/getTickersHistoricalById). Only bitcoin tickers. + + ```yaml + kind: source + spec: + name: "coinpaprika" + path: "coinpaprika/coinpaprika" + version: "1.0.0" + backend: local + destinations: + - "postgresql" + spec: + start_date: "2023-05-15T08:00:00Z" + interval: 1h + rate_duration: 720h + rate_number: 25000 + tickers: + ["*-bitcoin"] + ``` + +| Spec fields | Description | Default value | Optional | +| ------------- | -------------------------------------------------------------------------------------------------------------------------- | ------------- | -------- | +| start_date | Start date for synchronizing data in RFC3339 format. | | NO | +| end_date | End date for synchronizing data in RFC3339 format. | NOW | YES | +| interval | Intervals for historic data [possible values](https://api.coinpaprika.com/#tag/Tickers/operation/getTickersHistoricalById) | | NO | +| access_token | Coinpaprika API token. | | YES | +| api_debug | Enable request log. | false | YES | +| rate_duration | Unit of rate in time of request rate, go duration format. | 30 | YES | +| rate_number | Number of request in `rate_duration`. | 30 | YES | +| tickers | list of globe pattern ticker ids to synchronize. | * | YES | + + + +The Coinpaprika plugin supports incremental syncing for historical tickers, only new tickers will be fetched. This is done by storing last timestamp of fetched ticker in CloudQuery backend. To enable this, `backend` option must be set in the spec. -Due to large number of coins and tickers in Coinpaprika, consider to limit `concurrency` accordingly to machine spec. Good starting point is 100. ## Development diff --git a/client/client.go b/client/client.go index bba3580..0b1d81c 100644 --- a/client/client.go +++ b/client/client.go @@ -9,7 +9,6 @@ import ( "github.com/cloudquery/plugin-sdk/v2/plugins/source" "github.com/cloudquery/plugin-sdk/v2/schema" "github.com/coinpaprika/coinpaprika-api-go-client/v2/coinpaprika" - "github.com/hashicorp/go-retryablehttp" "github.com/rs/zerolog" ) @@ -18,7 +17,9 @@ type Client struct { CoinpaprikaClient CoinpaprikaServices Backend Backend StartDate time.Time + EndDate time.Time Interval string + Tickers []string } func (c *Client) ID() string { @@ -41,14 +42,24 @@ func New(ctx context.Context, logger zerolog.Logger, s specs.Source, opts source return nil, fmt.Errorf("failed to parse startDate from spec: %w", err) } - retryClient := retryablehttp.NewClient() - retryClient.HTTPClient.Timeout = 5 * time.Second - retryClient.RetryMax = 3 - retryClient.RetryWaitMin = 1 * time.Second - retryClient.RetryWaitMax = 5 * time.Second - retryClient.Logger = nil - - cc := coinpaprika.NewClient(retryClient.StandardClient(), cOpts...) + endDate := time.Now() + if pluginSpec.EndDate != "" { + endDate, err = time.Parse(time.RFC3339, pluginSpec.EndDate) + if err != nil { + return nil, fmt.Errorf("failed to parse startDate from spec: %w", err) + } + } + rateNumber := 30 + rateDuration := time.Second + if pluginSpec.RateDuration != "" && pluginSpec.RateNumber != 0 { + rateNumber = pluginSpec.RateNumber + rateDuration, err = time.ParseDuration(pluginSpec.RateDuration) + if err != nil { + return nil, fmt.Errorf("failed to parse rate duration from spec: %w", err) + } + } + + cc := coinpaprika.NewClient(NewHttpClient(logger, pluginSpec.ApiDebug, rateNumber, rateDuration), cOpts...) return &Client{ Logger: logger, @@ -59,6 +70,8 @@ func New(ctx context.Context, logger zerolog.Logger, s specs.Source, opts source }, Backend: opts.Backend, StartDate: startDate, + EndDate: endDate, Interval: pluginSpec.Interval, + Tickers: pluginSpec.Tickers, }, nil } diff --git a/client/http.go b/client/http.go new file mode 100644 index 0000000..9fb6d57 --- /dev/null +++ b/client/http.go @@ -0,0 +1,55 @@ +package client + +import ( + "net/http" + "time" + + "github.com/hashicorp/go-retryablehttp" + "github.com/rs/zerolog" + "go.uber.org/ratelimit" +) + +func NewHttpClient(logger zerolog.Logger, debug bool, rateNumber int, rateDuration time.Duration) *http.Client { + retryClient := retryablehttp.NewClient() + retryClient.HTTPClient.Timeout = 5 * time.Second + retryClient.RetryMax = 3 + retryClient.RetryWaitMin = 1 * time.Second + retryClient.RetryWaitMax = 5 * time.Second + retryClient.Logger = nil + + if debug { + retryClient.RequestLogHook = func(l retryablehttp.Logger, r *http.Request, i int) { + logger.Info(). + Str("method", r.Method). + Str("url", r.URL.String()). + Int("attempt", i). + Msg("Send request") + } + } + + client := retryClient.StandardClient() + client.Transport = newLimitedTransport(logger, client.Transport, rateNumber, rateDuration) + return client +} + +type transport struct { + logger zerolog.Logger + wrappedRT http.RoundTripper + limiter ratelimit.Limiter +} + +func newLimitedTransport(logger zerolog.Logger, t http.RoundTripper, rateNumber int, rateDuration time.Duration) http.RoundTripper { + if t == nil { + t = http.DefaultTransport + } + return &transport{ + logger: logger, + wrappedRT: t, + limiter: ratelimit.New(rateNumber, ratelimit.Per(rateDuration)), + } +} + +func (t *transport) RoundTrip(r *http.Request) (*http.Response, error) { + t.limiter.Take() + return t.wrappedRT.RoundTrip(r) +} diff --git a/client/spec.go b/client/spec.go index 13de41c..a026fbc 100644 --- a/client/spec.go +++ b/client/spec.go @@ -1,7 +1,12 @@ package client type Spec struct { - AccessToken string `json:"access_token"` - StartDate string `json:"start_date"` - Interval string `json:"interval"` + AccessToken string `json:"access_token"` + StartDate string `json:"start_date"` + EndDate string `json:"end_date"` + Interval string `json:"interval"` + Tickers []string `json:"tickers"` + ApiDebug bool `json:"api_debug"` + RateNumber int `json:"rate_number"` + RateDuration string `json:"rate_duration"` } diff --git a/client/testing.go b/client/testing.go index c1a6029..b141157 100644 --- a/client/testing.go +++ b/client/testing.go @@ -17,7 +17,9 @@ import ( type TestOptions struct { Backend backend.Backend StartTime time.Time + EndTime time.Time Interval string + Tickers []string } func MockTestHelper(t *testing.T, table *schema.Table, builder func(*testing.T, *gomock.Controller) CoinpaprikaServices, opts TestOptions) { @@ -35,7 +37,9 @@ func MockTestHelper(t *testing.T, table *schema.Table, builder func(*testing.T, CoinpaprikaClient: builder(t, ctrl), Backend: opts.Backend, StartDate: opts.StartTime, + EndDate: opts.EndTime, Interval: opts.Interval, + Tickers: opts.Tickers, }, nil } p := source.NewPlugin( diff --git a/docs/tables/coinpaprika_coins.md b/docs/tables/coinpaprika_coins.md index e78538b..b5d83f7 100644 --- a/docs/tables/coinpaprika_coins.md +++ b/docs/tables/coinpaprika_coins.md @@ -13,31 +13,31 @@ The following tables depend on coinpaprika_coins: ## Columns -| Name | Type | -|--------------------|-----------| -| _cq_source_name | String | -| _cq_sync_time | Timestamp | -| _cq_id | UUID | -| _cq_parent_id | UUID | -| id (PK) | String | -| name | String | -| symbol | String | -| rank | Int | -| is_new | Bool | -| is_active | Bool | -| type | String | -| parent | JSON | -| open_source | Bool | -| hardware_wallet | Bool | -| description | String | -| message | String | -| started_at | String | -| development_status | String | -| proof_type | String | -| org_structure | String | -| hash_algorithm | String | -| whitepaper | JSON | -| links | JSON | -| links_extended | JSON | -| tags | JSON | -| team | JSON | \ No newline at end of file +| Name | Type | +| ------------- | ------------- | +|_cq_source_name|String| +|_cq_sync_time|Timestamp| +|_cq_id|UUID| +|_cq_parent_id|UUID| +|id (PK)|String| +|name|String| +|symbol|String| +|rank|Int| +|is_new|Bool| +|is_active|Bool| +|type|String| +|parent|JSON| +|open_source|Bool| +|hardware_wallet|Bool| +|description|String| +|message|String| +|started_at|String| +|development_status|String| +|proof_type|String| +|org_structure|String| +|hash_algorithm|String| +|whitepaper|JSON| +|links|JSON| +|links_extended|JSON| +|tags|JSON| +|team|JSON| \ No newline at end of file diff --git a/docs/tables/coinpaprika_exchanges.md b/docs/tables/coinpaprika_exchanges.md index 58f3a17..b4c7877 100644 --- a/docs/tables/coinpaprika_exchanges.md +++ b/docs/tables/coinpaprika_exchanges.md @@ -6,27 +6,27 @@ The primary key for this table is **_cq_id**. ## Columns -| Name | Type | -|---------------------------|-----------| -| _cq_source_name | String | -| _cq_sync_time | Timestamp | -| _cq_id (PK) | UUID | -| _cq_parent_id | UUID | -| id | String | -| name | String | -| message | String | -| description | String | -| active | Bool | -| website_status | Bool | -| api_status | Bool | -| markets_data_fetched | Bool | -| rank | Int | -| adjusted_rank | Int | -| reported_rank | Int | -| currencies | Int | -| markets | Int | -| adjusted_volume_24h_share | Float | -| fiats | JSON | -| quotes | JSON | -| links | JSON | -| last_updated | String | \ No newline at end of file +| Name | Type | +| ------------- | ------------- | +|_cq_source_name|String| +|_cq_sync_time|Timestamp| +|_cq_id (PK)|UUID| +|_cq_parent_id|UUID| +|id|String| +|name|String| +|message|String| +|description|String| +|active|Bool| +|website_status|Bool| +|api_status|Bool| +|markets_data_fetched|Bool| +|rank|Int| +|adjusted_rank|Int| +|reported_rank|Int| +|currencies|Int| +|markets|Int| +|adjusted_volume_24h_share|Float| +|fiats|JSON| +|quotes|JSON| +|links|JSON| +|last_updated|String| \ No newline at end of file diff --git a/docs/tables/coinpaprika_tickers.md b/docs/tables/coinpaprika_tickers.md index ad85cf7..622c115 100644 --- a/docs/tables/coinpaprika_tickers.md +++ b/docs/tables/coinpaprika_tickers.md @@ -12,14 +12,14 @@ This table depends on [coinpaprika_coins](coinpaprika_coins.md). ## Columns -| Name | Type | -|----------------------------------|-----------| -| _cq_source_name | String | -| _cq_sync_time | Timestamp | -| _cq_id | UUID | -| _cq_parent_id | UUID | -| id (PK) | String | -| timestamp (PK) (Incremental Key) | String | -| price | Float | -| volume_24h | Float | -| market_cap | Float | \ No newline at end of file +| Name | Type | +| ------------- | ------------- | +|_cq_source_name|String| +|_cq_sync_time|Timestamp| +|_cq_id|UUID| +|_cq_parent_id|UUID| +|id (PK)|String| +|timestamp (PK) (Incremental Key)|String| +|price|Float| +|volume_24h|Float| +|market_cap|Float| \ No newline at end of file diff --git a/go.mod b/go.mod index a4333ef..0d0caf2 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( ) require ( + github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -41,11 +42,13 @@ require ( github.com/mattn/go-isatty v0.0.18 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/ryanuber/go-glob v1.0.0 github.com/spf13/cast v1.5.0 // indirect github.com/spf13/cobra v1.6.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/thoas/go-funk v0.9.3 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect + go.uber.org/ratelimit v0.2.0 golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 // indirect golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.9.0 // indirect diff --git a/go.sum b/go.sum index d12bf73..66fd6e0 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= +github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604 h1:dxPfDaBd95sM2NMHXpp6UGTsCqe1bfGb8/WZeDHklZc= @@ -201,6 +203,8 @@ github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJ github.com/rs/zerolog v1.29.0 h1:Zes4hju04hjbvkVkOhdl2HpZa+0PmVwigmo8XoORE5w= github.com/rs/zerolog v1.29.0/go.mod h1:NILgTygv/Uej1ra5XxGf82ZFSLk58MFGAUS2o6usyD0= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkBk= +github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA= @@ -211,8 +215,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= @@ -232,6 +238,10 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/ratelimit v0.2.0 h1:UQE2Bgi7p2B85uP5dC2bbRtig0C+OeNRnNEafLjsLPA= +go.uber.org/ratelimit v0.2.0/go.mod h1:YYBV4e4naJvhpitQrWJu1vCpgB7CboMe0qhltKt6mUg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/resources/services/coins/coins_test.go b/resources/services/coins/coins_test.go index 95c8868..43c94f4 100644 --- a/resources/services/coins/coins_test.go +++ b/resources/services/coins/coins_test.go @@ -12,10 +12,11 @@ import ( ) func TestCoins(t *testing.T) { + now := time.Now().Truncate(time.Hour) + buildDeps := func(t *testing.T, ctrl *gomock.Controller) client.CoinpaprikaServices { cs := mock.NewMockCoinsService(ctrl) ts := mock.NewMockTickersService(ctrl) - var coin coinpaprika.Coin if err := faker.FakeObject(&coin); err != nil { @@ -30,13 +31,13 @@ func TestCoins(t *testing.T) { t.Fatal(err) } - timeStamp := time.Now().Add(-1 * time.Hour) + timeStamp := now.Add(-1 * time.Hour) tick.Timestamp = &timeStamp tt := []*coinpaprika.TickerHistorical{&tick} ts.EXPECT().GetHistoricalTickersByID(*coin.ID, &coinpaprika.TickersHistoricalOptions{ - Start: time.Now().Add(-2 * time.Hour).Truncate(time.Hour), - End: time.Now().Truncate(time.Hour), + Start: now.Add(-2 * time.Hour), + End: now, Interval: "1h", }).Return(tt, nil).Times(1) return client.CoinpaprikaServices{ @@ -44,10 +45,66 @@ func TestCoins(t *testing.T) { Tickers: ts, } } - client.MockTestHelper(t, CoinsTable(), buildDeps, client.TestOptions{StartTime: time.Now().Add(-2 * time.Hour), Interval: "1h"}) + client.MockTestHelper(t, CoinsTable(), buildDeps, client.TestOptions{ + StartTime: now.Add(-2 * time.Hour), + EndTime: now, + Interval: "1h", + Tickers: []string{"*"}, + }) +} + +func TestCoinsFilterTicker(t *testing.T) { + idToInclude := "btc-bitcoin" + now := time.Now().Truncate(time.Hour) + + buildDeps := func(t *testing.T, ctrl *gomock.Controller) client.CoinpaprikaServices { + cs := mock.NewMockCoinsService(ctrl) + ts := mock.NewMockTickersService(ctrl) + + var coin1 coinpaprika.Coin + if err := faker.FakeObject(&coin1); err != nil { + t.Fatal(err) + } + coin1.ID = &idToInclude + + var coin2 coinpaprika.Coin + if err := faker.FakeObject(&coin2); err != nil { + t.Fatal(err) + } + + ee := []*coinpaprika.Coin{&coin1, &coin2} + cs.EXPECT().List().Return(ee, nil) + + var tick coinpaprika.TickerHistorical + if err := faker.FakeObject(&tick); err != nil { + t.Fatal(err) + } + + timeStamp := now.Add(-1 * time.Hour) + tick.Timestamp = &timeStamp + tt := []*coinpaprika.TickerHistorical{&tick} + + ts.EXPECT().GetHistoricalTickersByID(*coin1.ID, &coinpaprika.TickersHistoricalOptions{ + Start: now.Add(-2 * time.Hour), + End: now, + Interval: "1h", + }).Return(tt, nil).Times(1) + return client.CoinpaprikaServices{ + Coins: cs, + Tickers: ts, + } + } + client.MockTestHelper(t, CoinsTable(), buildDeps, client.TestOptions{ + StartTime: now.Add(-2 * time.Hour), + EndTime: now, + Interval: "1h", + Tickers: []string{"*-bitcoin"}, + }) } func TestCoinsTwoPages(t *testing.T) { + now := time.Now().Truncate(time.Hour) + buildDeps := func(t *testing.T, ctrl *gomock.Controller) client.CoinpaprikaServices { cs := mock.NewMockCoinsService(ctrl) ts := mock.NewMockTickersService(ctrl) @@ -66,19 +123,19 @@ func TestCoinsTwoPages(t *testing.T) { t.Fatal(err) } - timeStamp := time.Now().Add(-1 * time.Hour) + timeStamp := now.Add(-1 * time.Hour) tick.Timestamp = &timeStamp tt := []*coinpaprika.TickerHistorical{&tick} ts.EXPECT().GetHistoricalTickersByID(*coin.ID, &coinpaprika.TickersHistoricalOptions{ - Start: time.Now().Add(-1500 * time.Minute).Truncate(time.Minute), - End: time.Now().Add(-500 * time.Minute).Truncate(time.Minute), + Start: now.Add(-1500 * time.Minute).Truncate(time.Minute), + End: now.Add(-500 * time.Minute).Truncate(time.Minute), Interval: "1m", }).Return(tt, nil).Times(1) ts.EXPECT().GetHistoricalTickersByID(*coin.ID, &coinpaprika.TickersHistoricalOptions{ - Start: time.Now().Add(-500 * time.Minute).Truncate(time.Minute), - End: time.Now().Truncate(time.Minute), + Start: now.Add(-500 * time.Minute).Truncate(time.Minute), + End: now.Truncate(time.Minute), Interval: "1m", }).Return(nil, nil).Times(1) @@ -87,10 +144,16 @@ func TestCoinsTwoPages(t *testing.T) { Tickers: ts, } } - client.MockTestHelper(t, CoinsTable(), buildDeps, client.TestOptions{StartTime: time.Now().Add(-1500 * time.Minute), Interval: "1m"}) + client.MockTestHelper(t, CoinsTable(), buildDeps, client.TestOptions{ + StartTime: now.Add(-1500 * time.Minute), + EndTime: now, + Interval: "1m", + }) } func TestCoinsThreePages(t *testing.T) { + now := time.Now().Truncate(time.Minute) + buildDeps := func(t *testing.T, ctrl *gomock.Controller) client.CoinpaprikaServices { cs := mock.NewMockCoinsService(ctrl) ts := mock.NewMockTickersService(ctrl) @@ -109,13 +172,13 @@ func TestCoinsThreePages(t *testing.T) { t.Fatal(err) } - timeStamp1 := time.Now().Add(-2500 * time.Minute) + timeStamp1 := now.Add(-2500 * time.Minute) tick1.Timestamp = &timeStamp1 tt1 := []*coinpaprika.TickerHistorical{&tick1} ts.EXPECT().GetHistoricalTickersByID(*coin.ID, &coinpaprika.TickersHistoricalOptions{ - Start: time.Now().Add(-3000 * time.Minute).Truncate(time.Minute), - End: time.Now().Add(-2000 * time.Minute).Truncate(time.Minute), + Start: now.Add(-3000 * time.Minute), + End: now.Add(-2000 * time.Minute), Interval: "1m", }).Return(tt1, nil).Times(1) @@ -124,19 +187,19 @@ func TestCoinsThreePages(t *testing.T) { t.Fatal(err) } - timeStamp2 := time.Now().Add(-1500 * time.Minute) + timeStamp2 := now.Add(-1500 * time.Minute) tick2.Timestamp = &timeStamp2 tt2 := []*coinpaprika.TickerHistorical{&tick2} ts.EXPECT().GetHistoricalTickersByID(*coin.ID, &coinpaprika.TickersHistoricalOptions{ - Start: time.Now().Add(-2000 * time.Minute).Truncate(time.Minute), - End: time.Now().Add(-1000 * time.Minute).Truncate(time.Minute), + Start: now.Add(-2000 * time.Minute), + End: now.Add(-1000 * time.Minute), Interval: "1m", }).Return(tt2, nil).Times(1) ts.EXPECT().GetHistoricalTickersByID(*coin.ID, &coinpaprika.TickersHistoricalOptions{ - Start: time.Now().Add(-1000 * time.Minute).Truncate(time.Minute), - End: time.Now().Truncate(time.Minute), + Start: now.Add(-1000 * time.Minute), + End: now, Interval: "1m", }).Return(tt2, nil).Times(1) @@ -145,10 +208,16 @@ func TestCoinsThreePages(t *testing.T) { Tickers: ts, } } - client.MockTestHelper(t, CoinsTable(), buildDeps, client.TestOptions{StartTime: time.Now().Add(-3000 * time.Minute), Interval: "1m"}) + client.MockTestHelper(t, CoinsTable(), buildDeps, client.TestOptions{ + StartTime: now.Add(-3000 * time.Minute), + EndTime: now, + Interval: "1m", + }) } func TestCoinsWithBackend(t *testing.T) { + now := time.Now().Truncate(time.Hour) + buildDeps := func(t *testing.T, ctrl *gomock.Controller) client.CoinpaprikaServices { cs := mock.NewMockCoinsService(ctrl) ts := mock.NewMockTickersService(ctrl) @@ -167,7 +236,7 @@ func TestCoinsWithBackend(t *testing.T) { t.Fatal(err) } - timeStamp := time.Now().Add(-1 * time.Hour) + timeStamp := now.Add(-1 * time.Hour) tick.Timestamp = &timeStamp tt := []*coinpaprika.TickerHistorical{&tick} @@ -179,11 +248,12 @@ func TestCoinsWithBackend(t *testing.T) { } ctrl := gomock.NewController(t) mbe := mock.NewMockBackend(ctrl) - mbe.EXPECT().Get(gomock.Any(), gomock.Any(), "coinpaprika").Return(time.Now().Add(-2*time.Hour).Truncate(time.Hour).Format(time.RFC3339), nil) - mbe.EXPECT().Set(gomock.Any(), gomock.Any(), "coinpaprika", time.Now().Truncate(time.Hour).Format(time.RFC3339)).Return(nil) + mbe.EXPECT().Get(gomock.Any(), gomock.Any(), "coinpaprika").Return(now.Add(-2*time.Hour).Format(time.RFC3339), nil) + mbe.EXPECT().Set(gomock.Any(), gomock.Any(), "coinpaprika", now.Format(time.RFC3339)).Return(nil) client.MockTestHelper(t, CoinsTable(), buildDeps, client.TestOptions{ Backend: mbe, - StartTime: time.Now().Add(-4 * time.Hour), + StartTime: now.Add(-4 * time.Hour), + EndTime: now, Interval: "1h"}, ) } diff --git a/resources/services/coins/tickers.go b/resources/services/coins/tickers.go index 61785f7..4ce56d7 100644 --- a/resources/services/coins/tickers.go +++ b/resources/services/coins/tickers.go @@ -9,6 +9,7 @@ import ( "github.com/cloudquery/plugin-sdk/v2/transformers" "github.com/coinpaprika/coinpaprika-api-go-client/v2/coinpaprika" "github.com/coinpaprika/cq-source-coinpaprika/client" + "github.com/ryanuber/go-glob" ) const ( @@ -48,9 +49,10 @@ func TickersTable() *schema.Table { func fetchTickers(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error { c := parent.Item.(*coinpaprika.Coin) cl := meta.(*client.Client) - + if len(cl.Tickers) > 0 && !filterTickers(cl.Tickers, *c.ID) { + return nil + } startDate := cl.StartDate - key := fmt.Sprintf(stateKeyTpl, *c.ID) if cl.Backend != nil { value, err := cl.Backend.Get(ctx, key, cl.ID()) @@ -75,7 +77,7 @@ func fetchTickers(ctx context.Context, meta schema.ClientMeta, parent *schema.Re startDate = startDate.Truncate(interval) opt.Start = startDate - upTo := time.Now().Truncate(interval) + upTo := cl.EndDate.Truncate(interval) if upTo.Equal(startDate) { return nil @@ -103,6 +105,15 @@ func fetchTickers(ctx context.Context, meta schema.ClientMeta, parent *schema.Re return nil } +func filterTickers(tickers []string, id string) bool { + for _, t := range tickers { + if glob.Glob(t, id) { + return true + } + } + return false +} + type partition struct { start, end time.Time }