From 3d0f80ea2fca4593b2ce7a7d0629a4f186974167 Mon Sep 17 00:00:00 2001 From: daniel <4954577+jaensen@users.noreply.github.com> Date: Tue, 4 Jun 2024 01:43:22 +0200 Subject: [PATCH 1/5] Add subscription support for circles events --- Circles.Index.Rpc/Circles.Index.Rpc.csproj | 1 + Circles.Index.Rpc/CirclesRpcModule.cs | 6 + Circles.Index.Rpc/CirclesSubscription.cs | 103 ++++++++++++++ .../CirclesSubscriptionParams.cs | 32 +++++ Circles.Index.Rpc/ICirclesRpcModule.cs | 14 +- Circles.Index.Rpc/QueryEvents.cs | 133 ++++++++++++++++++ Circles.Index/Plugin.cs | 23 ++- Circles.Index/StateMachine.cs | 26 ++++ Circles.Index/tools/test_ws.html | 101 +++++++++++++ docker-compose.gnosis.yml | 3 +- general-example-requests.md | 83 ++++++++++- 11 files changed, 516 insertions(+), 9 deletions(-) create mode 100644 Circles.Index.Rpc/CirclesSubscription.cs create mode 100644 Circles.Index.Rpc/CirclesSubscriptionParams.cs create mode 100644 Circles.Index.Rpc/QueryEvents.cs create mode 100644 Circles.Index/tools/test_ws.html diff --git a/Circles.Index.Rpc/Circles.Index.Rpc.csproj b/Circles.Index.Rpc/Circles.Index.Rpc.csproj index 73e7ea9..71884e0 100644 --- a/Circles.Index.Rpc/Circles.Index.Rpc.csproj +++ b/Circles.Index.Rpc/Circles.Index.Rpc.csproj @@ -7,6 +7,7 @@ + diff --git a/Circles.Index.Rpc/CirclesRpcModule.cs b/Circles.Index.Rpc/CirclesRpcModule.cs index 9a1a3cc..01e14cb 100644 --- a/Circles.Index.Rpc/CirclesRpcModule.cs +++ b/Circles.Index.Rpc/CirclesRpcModule.cs @@ -183,6 +183,12 @@ public ResultWrapper circles_query(SelectDto query) return ResultWrapper.Success(result); } + public ResultWrapper circles_events(Address address, long fromBlock, long? toBlock = null) + { + var queryEvents = new QueryEvents(_indexerContext); + return ResultWrapper.Success(queryEvents.CirclesEvents(address, fromBlock, toBlock)); + } + #region private methods private IEnumerable
TokenAddressesForAccount(Address circlesAccount) diff --git a/Circles.Index.Rpc/CirclesSubscription.cs b/Circles.Index.Rpc/CirclesSubscription.cs new file mode 100644 index 0000000..45efe6a --- /dev/null +++ b/Circles.Index.Rpc/CirclesSubscription.cs @@ -0,0 +1,103 @@ +using Circles.Index.Common; +using Nethermind.JsonRpc; +using Nethermind.JsonRpc.Modules.Subscribe; + +namespace Circles.Index.Rpc; + +public class NotifyEventArgs(CirclesEvent[] events) : EventArgs +{ + public CirclesEvent[] Events { get; } = events; +} + +public class CirclesSubscription : Subscription +{ + public override string Type => "circles"; + + private readonly CirclesSubscriptionParams _param; + + public static long SubscriberCount => _subscriberCount; + private static long _subscriberCount; + + public CirclesSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, CirclesSubscriptionParams param) : base( + jsonRpcDuplexClient) + { + Notification += OnNotification; + _param = param; + + Interlocked.Increment(ref _subscriberCount); + } + + public static event EventHandler? Notification; + + public static void Notify(Context context, Range importedRange) + { + if (_subscriberCount == 0) + { + return; + } + + var queryEvents = new QueryEvents(context); + var events = queryEvents.CirclesEvents(null, importedRange.Min, importedRange.Max); + + Notification?.Invoke(null, new NotifyEventArgs(events)); + } + + private void OnNotification(object? sender, NotifyEventArgs e) + { + ScheduleAction(async () => + { + CirclesEvent[] events; + + if (_param.Address != null) + { + var filterAddress = _param.Address!.ToString(true, false); + events = FilterForAffectedAddress(e, filterAddress); + } + else + { + events = e.Events; + } + + JsonRpcResult result = CreateSubscriptionMessage(events); + await JsonRpcDuplexClient.SendJsonRpcResult(result); + }); + } + + private CirclesEvent[] FilterForAffectedAddress(NotifyEventArgs e, string filterAddress) + { + var filteredForAddress = new List(); + var addressesInEvent = new HashSet(); + + foreach (var circlesEvent in e.Events) + { + addressesInEvent.Clear(); + + foreach (var circlesEventValue in circlesEvent.Values) + { + if (!QueryEvents.AddressColumns.Contains(circlesEventValue.Key)) + { + continue; + } + + if (circlesEventValue.Value is string address) + { + addressesInEvent.Add(address); + } + } + + if (addressesInEvent.Contains(filterAddress)) + { + filteredForAddress.Add(circlesEvent); + } + } + + return filteredForAddress.ToArray(); + } + + public override void Dispose() + { + base.Dispose(); + + Interlocked.Decrement(ref _subscriberCount); + } +} \ No newline at end of file diff --git a/Circles.Index.Rpc/CirclesSubscriptionParams.cs b/Circles.Index.Rpc/CirclesSubscriptionParams.cs new file mode 100644 index 0000000..e4e9ed3 --- /dev/null +++ b/Circles.Index.Rpc/CirclesSubscriptionParams.cs @@ -0,0 +1,32 @@ +using System.Text.Json; +using Nethermind.Core; +using Nethermind.JsonRpc; + +namespace Circles.Index.Rpc; + +public class CirclesSubscriptionParams : IJsonRpcParam +{ + public Address? Address { get; private set; } + + public void ReadJson(JsonElement element, JsonSerializerOptions options) + { + JsonDocument? doc = null; + try + { + if (element.ValueKind == JsonValueKind.String) + { + doc = JsonDocument.Parse(element.GetString() ?? "{}"); + element = doc.RootElement; + } + + if (element.TryGetProperty("address", out JsonElement addressElement)) + { + Address = Address.TryParse(addressElement.GetString(), out Address? address) ? address : null; + } + } + finally + { + doc?.Dispose(); + } + } +} \ No newline at end of file diff --git a/Circles.Index.Rpc/ICirclesRpcModule.cs b/Circles.Index.Rpc/ICirclesRpcModule.cs index 58a2917..13ec046 100644 --- a/Circles.Index.Rpc/ICirclesRpcModule.cs +++ b/Circles.Index.Rpc/ICirclesRpcModule.cs @@ -10,12 +10,15 @@ namespace Circles.Index.Rpc; #region DTOs public record CirclesTokenBalance(Address Token, string Balance, string TokenOwner); + public record CirclesTokenBalanceV2(UInt256 TokenId, string Balance, string TokenOwner); public record CirclesTrustRelation(Address User, int limit); public record CirclesTrustRelations(Address User, CirclesTrustRelation[] Trusts, CirclesTrustRelation[] TrustedBy); +public record CirclesEvent(string Event, IDictionary Values); + #endregion [RpcModule("Circles")] @@ -31,15 +34,20 @@ public interface ICirclesRpcModule : IRpcModule [JsonRpcMethod(Description = "Gets the balance of each V1 Circles token the specified address holds", IsImplemented = true)] Task> circles_getTokenBalances(Address address, bool asTimeCircles = false); - + [JsonRpcMethod(Description = "Gets the V2 Circles balance of the specified address", IsImplemented = true)] Task> circlesV2_getTotalBalance(Address address, bool asTimeCircles = false); - + [JsonRpcMethod(Description = "Gets the balance of each V2 Circles token the specified address holds", IsImplemented = true)] - Task> circlesV2_getTokenBalances(Address address, bool asTimeCircles = false); + Task> + circlesV2_getTokenBalances(Address address, bool asTimeCircles = false); [JsonRpcMethod(Description = "Queries the data of one Circles index table", IsImplemented = true)] ResultWrapper circles_query(SelectDto query); + + [JsonRpcMethod(Description = "Returns all events affecting the specified account since block N", + IsImplemented = true)] + ResultWrapper circles_events(Address address, long fromBlock, long? toBlock = null); } \ No newline at end of file diff --git a/Circles.Index.Rpc/QueryEvents.cs b/Circles.Index.Rpc/QueryEvents.cs new file mode 100644 index 0000000..33dade1 --- /dev/null +++ b/Circles.Index.Rpc/QueryEvents.cs @@ -0,0 +1,133 @@ +using System.Collections.Concurrent; +using System.Collections.Immutable; +using Circles.Index.Common; +using Circles.Index.Query; +using Nethermind.Core; + +namespace Circles.Index.Rpc; + +public class QueryEvents(Context context) +{ + public static readonly ImmutableHashSet AddressColumns = new HashSet + { + "user", "avatar", "organization", "from", "to", "canSendTo", "account", "group", "human", "invited", + "inviter", "truster", "trustee" + }.ToImmutableHashSet(); + + public CirclesEvent[] CirclesEvents(Address? address, long fromBlock, long? toBlock = null) + { + long currentHead = context.NethermindApi.BlockTree?.Head?.Number + ?? throw new Exception("BlockTree or Head is null"); + + string? addressString = address?.ToString(true, false); + + ValidateInputs(addressString, fromBlock, toBlock, currentHead); + + var queries = BuildQueries(addressString, fromBlock, toBlock); + + var events = ExecuteQueries(queries); + + var sortedEvents = SortEvents(events); + + return sortedEvents; + } + + private void ValidateInputs(string? address, long fromBlock, long? toBlock, long currentHead) + { + if (address == "0x0000000000000000000000000000000000000000") + throw new Exception("The zero address cannot be queried."); + + if (fromBlock < 0) + throw new Exception("The fromBlock parameter must be greater than or equal to 0."); + + if (toBlock.HasValue && toBlock.Value < fromBlock) + throw new Exception("The toBlock parameter must be greater than or equal to fromBlock."); + + if (toBlock.HasValue && toBlock.Value > currentHead) + throw new Exception( + "The toBlock parameter must be less than or equal to the current head. Leave it empty to query all blocks until the current head."); + } + + private List(); + + foreach (var table in context.Database.Schema.Tables) + { + if (table.Key.Namespace.StartsWith("V_") || table.Key.Namespace == "System") + continue; + + var addressColumnFilters = address == null + ? [] + : table.Value.Columns + .Where(column => AddressColumns.Contains(column.Column)) + .Select(column => new FilterPredicate(column.Column, FilterType.Equals, address)) + .Cast() + .ToList(); + + if (addressColumnFilters.Count == 0) + continue; + + var filters = new List + { + new FilterPredicate("blockNumber", FilterType.GreaterThanOrEquals, fromBlock), + new Conjunction(ConjunctionType.Or, addressColumnFilters.ToArray()) + }; + + if (toBlock.HasValue) + { + filters.Add(new FilterPredicate("blockNumber", FilterType.LessThanOrEquals, toBlock.Value)); + } + + queries.Add(new Select(table.Key.Namespace, table.Key.Table, Array.Empty(), + new[] { new Conjunction(ConjunctionType.And, filters.ToArray()) }, + new[] + { + new OrderBy("blockNumber", "ASC"), new OrderBy("transactionIndex", "ASC"), + new OrderBy("logIndex", "ASC") + }, + null, true, int.MaxValue)); + } + + return queries; + } + + private ConcurrentDictionary<(long BlockNo, long TransactionIndex, long LogIndex), CirclesEvent> ExecuteQueries( + List BuildQueries(string? address, long fromBlock, long? toBlock .Cast() .ToList(); - if (addressColumnFilters.Count == 0) - continue; - var filters = new List { new FilterPredicate("blockNumber", FilterType.GreaterThanOrEquals, fromBlock), - new Conjunction(ConjunctionType.Or, addressColumnFilters.ToArray()) }; + if (addressColumnFilters.Count > 0) + { + if (addressColumnFilters.Count == 1) + { + filters.Add(addressColumnFilters[0]); + } + else + { + addressColumnFilters.Add(new Conjunction(ConjunctionType.Or, addressColumnFilters.ToArray())); + } + } + if (toBlock.HasValue) { filters.Add(new FilterPredicate("blockNumber", FilterType.LessThanOrEquals, toBlock.Value)); } queries.Add(new Select(table.Key.Namespace, table.Key.Table, Array.Empty(), - new[] { new Conjunction(ConjunctionType.And, filters.ToArray()) }, + filters.Count > 1 + ? new[] { new Conjunction(ConjunctionType.And, filters.ToArray()) } + : filters, new[] { new OrderBy("blockNumber", "ASC"), new OrderBy("transactionIndex", "ASC"), diff --git a/Circles.Index/Plugin.cs b/Circles.Index/Plugin.cs index 8d0b6c6..0550da4 100644 --- a/Circles.Index/Plugin.cs +++ b/Circles.Index/Plugin.cs @@ -207,7 +207,7 @@ public async Task InitRpcModules() getFromAPi.SubscriptionFactory.RegisterSubscriptionType( "circles", - (client, param) => new CirclesSubscription(client, param)); + (client, param) => new CirclesSubscription(client, _indexerContext, param)); } public ValueTask DisposeAsync() diff --git a/Circles.Index/tools/test_ws.html b/Circles.Index/tools/test_ws.html index 1a436ef..9d76348 100644 --- a/Circles.Index/tools/test_ws.html +++ b/Circles.Index/tools/test_ws.html @@ -89,7 +89,7 @@

WebSocket Circles Event Subscription

try { await wsConnection.connect(); log('Connected to websocket...'); - const subscriptionId = await wsConnection.subscribe('circles', JSON.stringify({"address": "0x0000000000000000000000000000000000000000"}), (event) => { + const subscriptionId = await wsConnection.subscribe('circles', JSON.stringify({}), (event) => { log(`Circles event: ${JSON.stringify(event)}`); }); log(`Subscribed with ID: ${subscriptionId}`); From 55752d32431a84f90b48b6492d8aa8c4f5686f72 Mon Sep 17 00:00:00 2001 From: daniel <4954577+jaensen@users.noreply.github.com> Date: Tue, 4 Jun 2024 04:10:41 +0200 Subject: [PATCH 3/5] fix addressColumnFilters in QueryEvents.cs --- Circles.Index.Rpc/QueryEvents.cs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/Circles.Index.Rpc/QueryEvents.cs b/Circles.Index.Rpc/QueryEvents.cs index 0ffcae7..0a933f6 100644 --- a/Circles.Index.Rpc/QueryEvents.cs +++ b/Circles.Index.Rpc/QueryEvents.cs @@ -72,14 +72,9 @@ private List