Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new Global Configs #10

Merged
Merged
15 changes: 10 additions & 5 deletions QuantConnect.Polygon.Tests/PolygonDataDownloaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,22 @@ public void DownloadsHistoricalData(Symbol symbol, Resolution resolution, TimeSp

[TestCaseSource(nameof(IndexHistoricalDataTestCases))]
[Explicit("This tests require a Polygon.io api key, requires internet and are long.")]
public void DownloadsIndexHistoricalData(Resolution resolution, TimeSpan period, TickType tickType, bool shouldBeEmpy)
public void DownloadsIndexHistoricalData(Resolution resolution, TimeSpan period, TickType tickType, bool shouldBeEmpty)
{
var symbol = Symbol.Create("SPX", SecurityType.Index, Market.USA);
var request = PolygonHistoryTests.CreateHistoryRequest(symbol, resolution, tickType, period);

var parameters = new DataDownloaderGetParameters(symbol, resolution, request.StartTimeUtc, request.EndTimeUtc, tickType);
var data = _downloader.Get(parameters).ToList();
var data = _downloader.Get(parameters)?.ToList();

if (data == null)
{
Assert.Pass("History returns null result");
}
Romazes marked this conversation as resolved.
Show resolved Hide resolved

Log.Trace("Data points retrieved: " + data.Count);

if (shouldBeEmpy)
if (shouldBeEmpty)
{
Assert.That(data, Is.Empty);
}
Expand All @@ -86,9 +91,9 @@ public void DownloadsDataFromCanonicalOptionSymbol()
{
var symbol = Symbol.CreateCanonicalOption(Symbol.Create("SPY", SecurityType.Equity, Market.USA));
var parameters = new DataDownloaderGetParameters(symbol, Resolution.Hour,
new DateTime(2024, 01, 03), new DateTime(2024, 01, 04), TickType.Trade);
new DateTime(2024, 02, 22), new DateTime(2024, 02, 23), TickType.Trade);
using var downloader = new TestablePolygonDataDownloader();
var data = downloader.Get(parameters).ToList();
var data = downloader.Get(parameters)?.ToList();

Log.Trace("Data points retrieved: " + data.Count);

Expand Down
11 changes: 8 additions & 3 deletions QuantConnect.Polygon.Tests/PolygonHistoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ public void GetsIndexHistoricalData(Resolution resolution, TimeSpan period, Tick
{
var symbol = Symbol.Create("SPX", SecurityType.Index, Market.USA);
var requests = new List<HistoryRequest> { CreateHistoryRequest(symbol, resolution, tickType, period) };
var history = _historyProvider.GetHistory(requests, TimeZones.Utc).ToList();
var history = _historyProvider.GetHistory(requests, TimeZones.Utc)?.ToList();

if (history == null)
{
Assert.Pass("History returns null result");
}
Romazes marked this conversation as resolved.
Show resolved Hide resolved

Log.Trace("Data points retrieved: " + history.Count);

Expand Down Expand Up @@ -223,9 +228,9 @@ public void ReturnsEmptyForUnsupportedSecurityTypeResolutionOrTickType(Symbol sy
{
using var historyProvider = new TestPolygonHistoryProvider();
var request = CreateHistoryRequest(symbol, resolution, tickType, TimeSpan.FromDays(100));
var history = historyProvider.GetHistory(request).ToList();
var history = historyProvider.GetHistory(request)?.ToList();

Assert.That(history, Is.Empty);
Assert.IsNull(history);
Romazes marked this conversation as resolved.
Show resolved Hide resolved
Assert.That(historyProvider.TestRestApiClient.ApiCallsCount, Is.EqualTo(0));
}

Expand Down
80 changes: 55 additions & 25 deletions QuantConnect.Polygon/PolygonDataDownloader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
* limitations under the License.
*/

using QuantConnect.Configuration;
using NodaTime;
Romazes marked this conversation as resolved.
Show resolved Hide resolved
using QuantConnect.Data;
using QuantConnect.Securities;
using QuantConnect.Util;
using QuantConnect.Securities;
using QuantConnect.Configuration;
using System.Collections.Concurrent;

namespace QuantConnect.Lean.DataSource.Polygon
Expand Down Expand Up @@ -55,7 +56,7 @@ public PolygonDataDownloader()
/// </summary>
/// <param name="parameters">Parameters for the historical data request</param>
/// <returns>Enumerable of base data for this symbol</returns>
public IEnumerable<BaseData> Get(DataDownloaderGetParameters parameters)
public IEnumerable<BaseData>? Get(DataDownloaderGetParameters parameters)
{
var symbol = parameters.Symbol;
var resolution = parameters.Resolution;
Expand All @@ -65,7 +66,7 @@ public IEnumerable<BaseData> Get(DataDownloaderGetParameters parameters)

if (endUtc < startUtc)
{
yield break;
return null;
Romazes marked this conversation as resolved.
Show resolved Hide resolved
}

var dataType = LeanData.GetDataType(resolution, tickType);
Expand All @@ -74,35 +75,64 @@ public IEnumerable<BaseData> Get(DataDownloaderGetParameters parameters)

if (symbol.IsCanonical())
{
using var dataQueue = new BlockingCollection<BaseData>();
var symbols = GetOptions(symbol, startUtc, endUtc);
return GetBaseData(symbol, startUtc, endUtc, dataType, resolution, exchangeHours, dataTimeZone, tickType);
}
else
{
var historyRequest = new HistoryRequest(startUtc, endUtc, dataType, symbol, resolution, exchangeHours, dataTimeZone, resolution,
true, false, DataNormalizationMode.Raw, tickType);

Task.Run(() => Parallel.ForEach(symbols, targetSymbol =>
{
var historyRequest = new HistoryRequest(startUtc, endUtc, dataType, targetSymbol, resolution, exchangeHours, dataTimeZone,
resolution, true, false, DataNormalizationMode.Raw, tickType);
foreach (var data in _historyProvider.GetHistory(historyRequest))
{
dataQueue.Add(data);
}
})).ContinueWith(_ =>
{
dataQueue.CompleteAdding();
});
var historyData = _historyProvider.GetHistory(historyRequest);

foreach (var data in dataQueue.GetConsumingEnumerable())
if (historyData == null)
{
yield return data;
return null;
}

return GetBaseData(historyData);
}
else
}

private IEnumerable<BaseData>? GetBaseData(Symbol symbol, DateTime startUtc, DateTime endUtc, Type dataType, Resolution resolution, SecurityExchangeHours exchangeHours, DateTimeZone dataTimeZone, TickType tickType)
Romazes marked this conversation as resolved.
Show resolved Hide resolved
{
var dataQueue = new BlockingCollection<BaseData>();
var symbols = GetOptions(symbol, startUtc, endUtc);

Task.Run(() => Parallel.ForEach(symbols, targetSymbol =>
{
var historyRequest = new HistoryRequest(startUtc, endUtc, dataType, symbol, resolution, exchangeHours, dataTimeZone, resolution,
true, false, DataNormalizationMode.Raw, tickType);
foreach (var data in _historyProvider.GetHistory(historyRequest))
var historyRequest = new HistoryRequest(startUtc, endUtc, dataType, targetSymbol, resolution, exchangeHours, dataTimeZone,
resolution, true, false, DataNormalizationMode.Raw, tickType);

var history = _historyProvider.GetHistory(historyRequest);

if (history == null)
{
yield return data;
return;
}

foreach (var data in history)
{
dataQueue.Add(data);
}
})).ContinueWith(_ =>
{
dataQueue.CompleteAdding();
});

// Validate: data is not null, we have gotten anything at least
if (!dataQueue.TryTake(out _, TimeSpan.FromSeconds(10)))
Romazes marked this conversation as resolved.
Show resolved Hide resolved
{
return null;
}

return GetBaseData(dataQueue.GetConsumingEnumerable());
}

private IEnumerable<BaseData> GetBaseData(IEnumerable<BaseData> baseData)
Romazes marked this conversation as resolved.
Show resolved Hide resolved
{
foreach (var data in baseData)
{
yield return data;
}
}

Expand Down
25 changes: 13 additions & 12 deletions QuantConnect.Polygon/PolygonDataProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,20 @@ public PolygonDataProvider(string apiKey, int maxSubscriptionsPerWebSocket, bool
return;
}

_apiKey = apiKey;

Initialize(maxSubscriptionsPerWebSocket, streamingEnabled);
Initialize(apiKey, maxSubscriptionsPerWebSocket, streamingEnabled);
}

/// <summary>
/// Initializes the data queue handler and validates the product subscription
/// </summary>
private void Initialize(int maxSubscriptionsPerWebSocket, bool streamingEnabled = true)
private void Initialize(string apiKey, int maxSubscriptionsPerWebSocket, bool streamingEnabled = true)
{
if (string.IsNullOrWhiteSpace(apiKey))
{
throw new PolygonAuthenticationException("History calls for Polygon.io require an API key.");
}
_apiKey = apiKey;

_initialized = true;
_dataAggregator = new PolygonAggregationManager();
RestApiClient = new PolygonRestApiClient(_apiKey);
Expand Down Expand Up @@ -175,16 +179,13 @@ public void SetJob(LiveNodePacket job)
throw new ArgumentException("The Polygon.io API key is missing from the brokerage data.");
}

_apiKey = apiKey;

var maxSubscriptionsPerWebSocket = 0;
if (!job.BrokerageData.TryGetValue("polygon-max-subscriptions-per-websocket", out var maxSubscriptionsPerWebSocketStr) ||
!int.TryParse(maxSubscriptionsPerWebSocketStr, out maxSubscriptionsPerWebSocket))
!int.TryParse(maxSubscriptionsPerWebSocketStr, out var maxSubscriptionsPerWebSocket))
{
maxSubscriptionsPerWebSocket = -1;
}
Romazes marked this conversation as resolved.
Show resolved Hide resolved

Initialize(maxSubscriptionsPerWebSocket);
Initialize(apiKey, maxSubscriptionsPerWebSocket);
}

/// <summary>
Expand Down Expand Up @@ -488,9 +489,9 @@ private static void ValidateSubscription()
try
{
const int productId = 306;
var userId = Config.GetInt("job-user-id");
var token = Config.Get("api-access-token");
var organizationId = Config.Get("job-organization-id", null);
var userId = Globals.UserId;
var token = Globals.UserToken;
var organizationId = Globals.OrganizationID;
// Verify we can authenticate with this user and token
var api = new ApiConnection(userId, token);
if (!api.Connected)
Expand Down
37 changes: 24 additions & 13 deletions QuantConnect.Polygon/PolygonHistoryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,24 @@ public override void Initialize(HistoryProviderInitializeParameters parameters)
/// <param name="requests">The historical data requests</param>
/// <param name="sliceTimeZone">The time zone used when time stamping the slice instances</param>
/// <returns>An enumerable of the slices of data covering the span specified in each request</returns>
public override IEnumerable<Slice> GetHistory(IEnumerable<HistoryRequest> requests, DateTimeZone sliceTimeZone)
public override IEnumerable<Slice>? GetHistory(IEnumerable<HistoryRequest> requests, DateTimeZone sliceTimeZone)
{
var subscriptions = new List<Subscription>();
foreach (var request in requests)
{
var history = GetHistory(request);
if (history == null)
{
continue;
}
var subscription = CreateSubscription(request, history);
subscriptions.Add(subscription);
}

if (subscriptions.Count == 0)
{
return null;
}
return CreateSliceEnumerableFromSubscriptions(subscriptions, sliceTimeZone);
}

Expand All @@ -66,39 +74,41 @@ public override IEnumerable<Slice> GetHistory(IEnumerable<HistoryRequest> reques
/// </summary>
/// <param name="request">The historical data request</param>
/// <returns>An enumerable of BaseData points</returns>
public IEnumerable<BaseData> GetHistory(HistoryRequest request)
public IEnumerable<BaseData>? GetHistory(HistoryRequest request)
{
if (string.IsNullOrWhiteSpace(_apiKey))
{
throw new PolygonAuthenticationException("History calls for Polygon.io require an API key.");
}

if (request.Symbol.IsCanonical() ||
!IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution))
{
yield break;
// It is Logged in IsSupported(...)
return null;
}

// Quote data can only be fetched from Polygon from their Quote Tick endpoint,
// which would be too slow for anything above second resolution or long time spans.
if (request.TickType == TickType.Quote && request.Resolution > Resolution.Second)
{
Log.Error("PolygonDataProvider.GetHistory(): Quote data above second resolution is not supported.");
yield break;
return null;
}

// Use the trade aggregates API for resolutions above tick for fastest results
if (request.TickType == TickType.Trade && request.Resolution > Resolution.Tick)
{
foreach (var data in GetAggregates(request))
var data = GetAggregates(request);

if (data == null)
{
Interlocked.Increment(ref _dataPointCount);
yield return data;
return null;
}

yield break;
return data;
}

return GetHistoryThroughDataConsolidator(request);
}

private IEnumerable<BaseData>? GetHistoryThroughDataConsolidator(HistoryRequest request)
{
IDataConsolidator consolidator;
IEnumerable<BaseData> history;

Expand Down Expand Up @@ -160,6 +170,7 @@ private IEnumerable<TradeBar> GetAggregates(HistoryRequest request)
var utcTime = Time.UnixMillisecondTimeStampToDateTime(bar.Timestamp);
var time = GetTickTime(request.Symbol, utcTime);

Interlocked.Increment(ref _dataPointCount);
yield return new TradeBar(time, request.Symbol, bar.Open, bar.High, bar.Low, bar.Close,
bar.Volume, resolutionTimeSpan);
}
Expand Down