Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ratelimiter and endDate and debug for coinpaprika logger #5

Merged
merged 1 commit into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,61 @@ A Coinpaprika source plugin for CloudQuery that loads data from Coinpaprika API

The following source configuration file will sync to a PostgreSQL database. See [the CloudQuery Quickstart](https://www.cloudquery.io/docs/quickstart) for more information on how to configure the source and destination.

```yaml
kind: source
spec:
name: "coinpaprika"
path: "coinpaprika/coinpaprika"
version: "${VERSION}"
concurrency: 100
backend: local
destinations:
- "postgresql"
spec:
start_date: "2023-05-15T08:00:00Z"
interval: "1h"
access_token: "${COINPAPRIKA_API_TOKEN}"
```

| Spec fields | Description | Optional |
|--------------|----------------------------------------------------------------------------------------------------------------------------|----------|
| start_date | Starting date for synchronizing data | NO |
| interval | Intervals for historic data [possible values](https://api.coinpaprika.com/#tag/Tickers/operation/getTickersHistoricalById) | NO |
| access_token | Coinpaprika API token | YES |

The Coinpaprika plugin supports incremental syncing for historical tickers, only new tickers will be fetched. This is done by storing last timestamp of fetched ticker in Cloudquery backed. To enable this, `backend` option must be set in the spec.
1. With api token rate limited for `Bussines` plan (3 000 000 calls/month). Only bitcoin tickers.
```yaml
kind: source
spec:
name: "coinpaprika"
path: "coinpaprika/coinpaprika"
version: "1.0.0"
backend: local
destinations:
- "postgresql"
spec:
start_date: "2023-05-15T08:00:00Z"
interval: 5m
access_token: "${COINPAPRIKA_API_TOKEN}"
api_debug: true
rate_duration: 720h
rate_number: 3000000
tickers:
["*-bitcoin"]
```
2. Without token, `Free` plan (25 000 calls/month) minimal interval 1h, see [available history range depending on the selected API plan](https://api.coinpaprika.com/#tag/Tickers/operation/getTickersHistoricalById). Only bitcoin tickers.

```yaml
kind: source
spec:
name: "coinpaprika"
path: "coinpaprika/coinpaprika"
version: "1.0.0"
backend: local
destinations:
- "postgresql"
spec:
start_date: "2023-05-15T08:00:00Z"
interval: 1h
rate_duration: 720h
rate_number: 25000
tickers:
["*-bitcoin"]
```

| Spec fields | Description | Default value | Optional |
| ------------- | -------------------------------------------------------------------------------------------------------------------------- | ------------- | -------- |
| start_date | Start date for synchronizing data in RFC3339 format. | | NO |
| end_date | End date for synchronizing data in RFC3339 format. | NOW | YES |
| interval | Intervals for historic data [possible values](https://api.coinpaprika.com/#tag/Tickers/operation/getTickersHistoricalById) | | NO |
| access_token | Coinpaprika API token. | | YES |
| api_debug | Enable request log. | false | YES |
| rate_duration | Unit of rate in time of request rate, go duration format. | 30 | YES |
| rate_number | Number of request in `rate_duration`. | 30 | YES |
| tickers | list of globe pattern ticker ids to synchronize. | * | YES |



The Coinpaprika plugin supports incremental syncing for historical tickers, only new tickers will be fetched. This is done by storing last timestamp of fetched ticker in CloudQuery backend. To enable this, `backend` option must be set in the spec.

Due to large number of coins and tickers in Coinpaprika, consider to limit `concurrency` accordingly to machine spec. Good starting point is 100.

## Development

Expand Down
31 changes: 22 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/cloudquery/plugin-sdk/v2/plugins/source"
"github.com/cloudquery/plugin-sdk/v2/schema"
"github.com/coinpaprika/coinpaprika-api-go-client/v2/coinpaprika"
"github.com/hashicorp/go-retryablehttp"
"github.com/rs/zerolog"
)

Expand All @@ -18,7 +17,9 @@ type Client struct {
CoinpaprikaClient CoinpaprikaServices
Backend Backend
StartDate time.Time
EndDate time.Time
Interval string
Tickers []string
}

func (c *Client) ID() string {
Expand All @@ -41,14 +42,24 @@ func New(ctx context.Context, logger zerolog.Logger, s specs.Source, opts source
return nil, fmt.Errorf("failed to parse startDate from spec: %w", err)
}

retryClient := retryablehttp.NewClient()
retryClient.HTTPClient.Timeout = 5 * time.Second
retryClient.RetryMax = 3
retryClient.RetryWaitMin = 1 * time.Second
retryClient.RetryWaitMax = 5 * time.Second
retryClient.Logger = nil

cc := coinpaprika.NewClient(retryClient.StandardClient(), cOpts...)
endDate := time.Now()
if pluginSpec.EndDate != "" {
endDate, err = time.Parse(time.RFC3339, pluginSpec.EndDate)
if err != nil {
return nil, fmt.Errorf("failed to parse startDate from spec: %w", err)
}
}
rateNumber := 30
rateDuration := time.Second
if pluginSpec.RateDuration != "" && pluginSpec.RateNumber != 0 {
rateNumber = pluginSpec.RateNumber
rateDuration, err = time.ParseDuration(pluginSpec.RateDuration)
if err != nil {
return nil, fmt.Errorf("failed to parse rate duration from spec: %w", err)
}
}

cc := coinpaprika.NewClient(NewHttpClient(logger, pluginSpec.ApiDebug, rateNumber, rateDuration), cOpts...)

return &Client{
Logger: logger,
Expand All @@ -59,6 +70,8 @@ func New(ctx context.Context, logger zerolog.Logger, s specs.Source, opts source
},
Backend: opts.Backend,
StartDate: startDate,
EndDate: endDate,
Interval: pluginSpec.Interval,
Tickers: pluginSpec.Tickers,
}, nil
}
55 changes: 55 additions & 0 deletions client/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package client

import (
"net/http"
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/rs/zerolog"
"go.uber.org/ratelimit"
)

func NewHttpClient(logger zerolog.Logger, debug bool, rateNumber int, rateDuration time.Duration) *http.Client {
retryClient := retryablehttp.NewClient()
retryClient.HTTPClient.Timeout = 5 * time.Second
retryClient.RetryMax = 3
retryClient.RetryWaitMin = 1 * time.Second
retryClient.RetryWaitMax = 5 * time.Second
retryClient.Logger = nil

if debug {
retryClient.RequestLogHook = func(l retryablehttp.Logger, r *http.Request, i int) {
logger.Info().
Str("method", r.Method).
Str("url", r.URL.String()).
Int("attempt", i).
Msg("Send request")
}
}

client := retryClient.StandardClient()
client.Transport = newLimitedTransport(logger, client.Transport, rateNumber, rateDuration)
return client
}

type transport struct {
logger zerolog.Logger
wrappedRT http.RoundTripper
limiter ratelimit.Limiter
}

func newLimitedTransport(logger zerolog.Logger, t http.RoundTripper, rateNumber int, rateDuration time.Duration) http.RoundTripper {
if t == nil {
t = http.DefaultTransport
}
return &transport{
logger: logger,
wrappedRT: t,
limiter: ratelimit.New(rateNumber, ratelimit.Per(rateDuration)),
}
}

func (t *transport) RoundTrip(r *http.Request) (*http.Response, error) {
t.limiter.Take()
return t.wrappedRT.RoundTrip(r)
}
11 changes: 8 additions & 3 deletions client/spec.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package client

type Spec struct {
AccessToken string `json:"access_token"`
StartDate string `json:"start_date"`
Interval string `json:"interval"`
AccessToken string `json:"access_token"`
StartDate string `json:"start_date"`
EndDate string `json:"end_date"`
Interval string `json:"interval"`
Tickers []string `json:"tickers"`
ApiDebug bool `json:"api_debug"`
RateNumber int `json:"rate_number"`
RateDuration string `json:"rate_duration"`
}
4 changes: 4 additions & 0 deletions client/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
type TestOptions struct {
Backend backend.Backend
StartTime time.Time
EndTime time.Time
Interval string
Tickers []string
}

func MockTestHelper(t *testing.T, table *schema.Table, builder func(*testing.T, *gomock.Controller) CoinpaprikaServices, opts TestOptions) {
Expand All @@ -35,7 +37,9 @@ func MockTestHelper(t *testing.T, table *schema.Table, builder func(*testing.T,
CoinpaprikaClient: builder(t, ctrl),
Backend: opts.Backend,
StartDate: opts.StartTime,
EndDate: opts.EndTime,
Interval: opts.Interval,
Tickers: opts.Tickers,
}, nil
}
p := source.NewPlugin(
Expand Down
56 changes: 28 additions & 28 deletions docs/tables/coinpaprika_coins.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,31 @@ The following tables depend on coinpaprika_coins:

## Columns

| Name | Type |
|--------------------|-----------|
| _cq_source_name | String |
| _cq_sync_time | Timestamp |
| _cq_id | UUID |
| _cq_parent_id | UUID |
| id (PK) | String |
| name | String |
| symbol | String |
| rank | Int |
| is_new | Bool |
| is_active | Bool |
| type | String |
| parent | JSON |
| open_source | Bool |
| hardware_wallet | Bool |
| description | String |
| message | String |
| started_at | String |
| development_status | String |
| proof_type | String |
| org_structure | String |
| hash_algorithm | String |
| whitepaper | JSON |
| links | JSON |
| links_extended | JSON |
| tags | JSON |
| team | JSON |
| Name | Type |
| ------------- | ------------- |
|_cq_source_name|String|
|_cq_sync_time|Timestamp|
|_cq_id|UUID|
|_cq_parent_id|UUID|
|id (PK)|String|
|name|String|
|symbol|String|
|rank|Int|
|is_new|Bool|
|is_active|Bool|
|type|String|
|parent|JSON|
|open_source|Bool|
|hardware_wallet|Bool|
|description|String|
|message|String|
|started_at|String|
|development_status|String|
|proof_type|String|
|org_structure|String|
|hash_algorithm|String|
|whitepaper|JSON|
|links|JSON|
|links_extended|JSON|
|tags|JSON|
|team|JSON|
48 changes: 24 additions & 24 deletions docs/tables/coinpaprika_exchanges.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,27 @@ The primary key for this table is **_cq_id**.

## Columns

| Name | Type |
|---------------------------|-----------|
| _cq_source_name | String |
| _cq_sync_time | Timestamp |
| _cq_id (PK) | UUID |
| _cq_parent_id | UUID |
| id | String |
| name | String |
| message | String |
| description | String |
| active | Bool |
| website_status | Bool |
| api_status | Bool |
| markets_data_fetched | Bool |
| rank | Int |
| adjusted_rank | Int |
| reported_rank | Int |
| currencies | Int |
| markets | Int |
| adjusted_volume_24h_share | Float |
| fiats | JSON |
| quotes | JSON |
| links | JSON |
| last_updated | String |
| Name | Type |
| ------------- | ------------- |
|_cq_source_name|String|
|_cq_sync_time|Timestamp|
|_cq_id (PK)|UUID|
|_cq_parent_id|UUID|
|id|String|
|name|String|
|message|String|
|description|String|
|active|Bool|
|website_status|Bool|
|api_status|Bool|
|markets_data_fetched|Bool|
|rank|Int|
|adjusted_rank|Int|
|reported_rank|Int|
|currencies|Int|
|markets|Int|
|adjusted_volume_24h_share|Float|
|fiats|JSON|
|quotes|JSON|
|links|JSON|
|last_updated|String|
22 changes: 11 additions & 11 deletions docs/tables/coinpaprika_tickers.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ This table depends on [coinpaprika_coins](coinpaprika_coins.md).

## Columns

| Name | Type |
|----------------------------------|-----------|
| _cq_source_name | String |
| _cq_sync_time | Timestamp |
| _cq_id | UUID |
| _cq_parent_id | UUID |
| id (PK) | String |
| timestamp (PK) (Incremental Key) | String |
| price | Float |
| volume_24h | Float |
| market_cap | Float |
| Name | Type |
| ------------- | ------------- |
|_cq_source_name|String|
|_cq_sync_time|Timestamp|
|_cq_id|UUID|
|_cq_parent_id|UUID|
|id (PK)|String|
|timestamp (PK) (Incremental Key)|String|
|price|Float|
|volume_24h|Float|
|market_cap|Float|
Loading