-
Notifications
You must be signed in to change notification settings - Fork 0
/
LiveSource.cs
132 lines (110 loc) · 5.05 KB
/
LiveSource.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
using System;
using System.Threading.Tasks;
using Akka;
using Akka.IO;
using Akka.Streams.Dsl;
using Akka.Util;
using CirclesLand.BlockchainIndexer.Util;
using Dapper;
using Nethereum.BlockchainProcessing.BlockStorage.Entities.Mapping;
using Nethereum.Hex.HexTypes;
using Nethereum.JsonRpc.Client.Streaming;
using Nethereum.JsonRpc.WebSocketStreamingClient;
using Nethereum.RPC.Eth.DTOs;
using Nethereum.RPC.Eth.Subscriptions;
using Nethereum.Web3;
using Newtonsoft.Json;
using Npgsql;
using Prometheus;
namespace CirclesLand.BlockchainIndexer.Sources;
/// <summary>
/// Builds an Akka.Streams source of block numbers. While the RPC node reports a chain head that is
/// ahead of the last emitted block, the source emits the next block number on every pull
/// ("catching up"). Once caught up, each pull opens a websocket subscription for new block headers
/// and emits the number of the first header that arrives.
/// </summary>
public class LiveSource
{
    /// <summary>
    /// Maximum time to wait for a new block header on the websocket subscription.
    /// Keep in sync with the "20 sec." text of the timeout message below.
    /// </summary>
    private static readonly TimeSpan NewBlockTimeout = TimeSpan.FromSeconds(20);

    /// <summary>
    /// Creates the live block-number source.
    /// </summary>
    /// <param name="connectionString">Connection string used to open an Npgsql connection on every pull.</param>
    /// <param name="rpcUrl">URL of the JSON-RPC endpoint used to query the current block number.</param>
    /// <param name="lastPersistedBlock">
    /// Block number to resume after. When it is 0, <c>Settings.StartFromBlock</c> is used instead.
    /// </param>
    /// <returns>A source that emits one block number per pull.</returns>
    /// <exception cref="TimeoutException">
    /// Surfaces through the stream when no block header arrives within the timeout.
    /// </exception>
    /// <exception cref="Exception">
    /// Surfaces through the stream when the received block is more than one ahead of the last emitted block.
    /// </exception>
    public static async Task<Source<HexBigInteger, NotUsed>> Create(string connectionString, string rpcUrl, long lastPersistedBlock)
    {
        // Shared across all pulls of this source: flips to false once the chain head has been reached.
        var catchingUp = true;

        return Source.UnfoldAsync(new HexBigInteger(0), async lastBlock =>
        {
            // NOTE(review): the connection is opened but no query is issued in this block —
            // presumably a connectivity check; confirm before removing.
            await using var dbConnection = new NpgsqlConnection(connectionString);
            await dbConnection.OpenAsync();
            var web3 = new Web3(rpcUrl);

            // Each pass either emits a block, throws, or clears the flag, so a plain 'if' is enough.
            if (catchingUp)
            {
                try
                {
                    // Determine if we need to catch up (database old)
                    var mostRecentBlock = await web3.Eth.Blocks.GetBlockNumber.SendRequestAsync();

                    // The unfold state starts at 0; seed it with the real starting point on the first pull.
                    if (lastBlock.Value == 0)
                    {
                        lastBlock = new HexBigInteger(
                            lastPersistedBlock == 0 ? Settings.StartFromBlock : lastPersistedBlock);
                    }

                    if (mostRecentBlock.ToLong() > lastPersistedBlock && mostRecentBlock.Value > lastBlock.Value)
                    {
                        var nextBlockToIndex = lastBlock.Value + 1;
                        Console.WriteLine($"Catching up block: {nextBlockToIndex}");
                        SourceMetrics.BlocksEmitted.WithLabels("live").Inc();

                        // The emitted block also becomes the state ("last block") of the next pull.
                        return new Option<(HexBigInteger, HexBigInteger)>((new HexBigInteger(nextBlockToIndex),
                            new HexBigInteger(nextBlockToIndex)));
                    }

                    catchingUp = false;
                }
                catch (Exception ex)
                {
                    Logger.LogError(ex.Message);
                    if (ex.StackTrace != null) Logger.LogError(ex.StackTrace);
                    throw;
                }
            }

            await dbConnection.CloseAsync();

            using var client = new StreamingWebSocketClient(Settings.RpcWsEndpointUrl);
            var subscription = new EthNewBlockHeadersSubscription(client);

            // Three independent producers (timeout, data handler, error handler) may race to complete
            // this source, so every producer uses the Try* methods: the first one wins and later ones
            // are ignored instead of throwing InvalidOperationException on a callback thread.
            // RunContinuationsAsynchronously keeps the awaiting code off the websocket callback thread.
            var completionSource = new TaskCompletionSource<HexBigInteger>(TaskCreationOptions.RunContinuationsAsynchronously);

            // Fire-and-forget watchdog; it is a no-op when a block arrived in time.
            _ = Task.Delay(NewBlockTimeout)
                .ContinueWith(delayTask =>
                {
                    completionSource.TrySetException(new TimeoutException("Received no new block from the LiveSource for 20 sec."));
                });

            var handler = new EventHandler<StreamingEventArgs<Block>>((sender, e) =>
            {
                if (e.Exception != null)
                {
                    completionSource.TrySetException(e.Exception);
                    return;
                }

                var utcTimestamp = DateTimeOffset.FromUnixTimeSeconds((long) e.Response.Timestamp.Value);
                Console.WriteLine(
                    $"New Block: Number: {e.Response.Number.Value}, " +
                    $"Timestamp: {JsonConvert.SerializeObject(utcTimestamp)}, " +
                    $"Server time: {JsonConvert.SerializeObject(DateTime.UtcNow)}");

                completionSource.TrySetResult(new HexBigInteger(e.Response.Number.HexValue));
            });

            var errorHandler = new WebSocketStreamingErrorEventHandler((sender, exception) =>
            {
                Logger.LogError("RPC client websocket connection closed." + exception.Message);
                completionSource.TrySetException(exception);
            });

            subscription.SubscriptionDataResponse += handler;
            client.Error += errorHandler;

            HexBigInteger currentBlock;
            try
            {
                await client.StartAsync();
                await subscription.SubscribeAsync();
                currentBlock = await completionSource.Task;
            }
            finally
            {
                // Detach on every exit path (success, timeout, RPC error, failed connect) so no
                // callback stays attached while the client is being disposed.
                subscription.SubscriptionDataResponse -= handler;
                client.Error -= errorHandler;
            }

            Statistics.TrackBlockEnter(currentBlock.ToLong());

            // The new block must directly follow the last emitted one; otherwise fail the stream.
            if (currentBlock.Value - 1 > lastBlock.Value)
            {
                throw new Exception($"The live source missed at least one block. Current block: {currentBlock.Value}; Last block: {lastBlock.Value}");
            }

            SourceMetrics.BlocksEmitted.WithLabels("live").Inc();
            return new Option<(HexBigInteger, HexBigInteger)>((currentBlock, currentBlock));
        });
    }
}