diff --git a/Circles.Index.Rpc/Circles.Index.Rpc.csproj b/Circles.Index.Rpc/Circles.Index.Rpc.csproj index 73e7ea9..71884e0 100644 --- a/Circles.Index.Rpc/Circles.Index.Rpc.csproj +++ b/Circles.Index.Rpc/Circles.Index.Rpc.csproj @@ -7,6 +7,7 @@ + diff --git a/Circles.Index.Rpc/CirclesRpcModule.cs b/Circles.Index.Rpc/CirclesRpcModule.cs index 9a1a3cc..01e14cb 100644 --- a/Circles.Index.Rpc/CirclesRpcModule.cs +++ b/Circles.Index.Rpc/CirclesRpcModule.cs @@ -183,6 +183,12 @@ public ResultWrapper circles_query(SelectDto query) return ResultWrapper.Success(result); } + public ResultWrapper circles_events(Address address, long fromBlock, long? toBlock = null) + { + var queryEvents = new QueryEvents(_indexerContext); + return ResultWrapper.Success(queryEvents.CirclesEvents(address, fromBlock, toBlock)); + } + #region private methods private IEnumerable
TokenAddressesForAccount(Address circlesAccount) diff --git a/Circles.Index.Rpc/CirclesSubscription.cs b/Circles.Index.Rpc/CirclesSubscription.cs new file mode 100644 index 0000000..b90759a --- /dev/null +++ b/Circles.Index.Rpc/CirclesSubscription.cs @@ -0,0 +1,130 @@ +using Circles.Index.Common; +using Circles.Index.Query; +using Nethermind.Core; +using Nethermind.JsonRpc; +using Nethermind.JsonRpc.Modules.Subscribe; + +namespace Circles.Index.Rpc; + +public class NotifyEventArgs(CirclesEvent[] events) : EventArgs +{ + public CirclesEvent[] Events { get; } = events; +} + +public class CirclesSubscription : Subscription +{ + public override string Type => "circles"; + + private readonly CirclesSubscriptionParams _param; + + public static long SubscriberCount => _subscriberCount; + private static long _subscriberCount; + + public CirclesSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, Context context, + CirclesSubscriptionParams param) : base( + jsonRpcDuplexClient) + { + Notification += OnNotification; + _param = param; + + if (param.Address == Address.Zero) + { + throw new Exception("The zero address cannot be subscribed to."); + } + + if (param.Address != null) + { + var select = new Select("V_Crc", "Avatars", ["avatar"], [ + new FilterPredicate("avatar", FilterType.Equals, param.Address?.ToString(true, false)) + ], [], 1); + + var parameterizedSql = select.ToSql(context.Database); + var avatarInfo = context.Database.Select(parameterizedSql); + if (!avatarInfo.Rows.Any()) + { + throw new Exception($"The address {param.Address} is not a circles avatar."); + } + } + + Interlocked.Increment(ref _subscriberCount); + } + + public static event EventHandler? Notification; + + public static void Notify(Context context, Range importedRange) + { + if (_subscriberCount == 0) + { + return; + } + + var queryEvents = new QueryEvents(context); + var events = queryEvents.CirclesEvents(null, importedRange.Min, importedRange.Max); + + if (events.Length == 0) + { + return; + } + + Notification?.Invoke(null, new NotifyEventArgs(events)); + } + + private void OnNotification(object? sender, NotifyEventArgs e) + { + ScheduleAction(async () => + { + CirclesEvent[] events; + + if (_param.Address != null) + { + var filterAddress = _param.Address!.ToString(true, false); + events = FilterForAffectedAddress(e, filterAddress); + } + else + { + events = e.Events; + } + + JsonRpcResult result = CreateSubscriptionMessage(events); + await JsonRpcDuplexClient.SendJsonRpcResult(result); + }); + } + + private CirclesEvent[] FilterForAffectedAddress(NotifyEventArgs e, string filterAddress) + { + var filteredForAddress = new List(); + var addressesInEvent = new HashSet(); + + foreach (var circlesEvent in e.Events) + { + addressesInEvent.Clear(); + + foreach (var circlesEventValue in circlesEvent.Values) + { + if (!QueryEvents.AddressColumns.Contains(circlesEventValue.Key)) + { + continue; + } + + if (circlesEventValue.Value is string address) + { + addressesInEvent.Add(address); + } + } + + if (addressesInEvent.Contains(filterAddress)) + { + filteredForAddress.Add(circlesEvent); + } + } + + return filteredForAddress.ToArray(); + } + + public override void Dispose() + { + base.Dispose(); + + Interlocked.Decrement(ref _subscriberCount); + } +} \ No newline at end of file diff --git a/Circles.Index.Rpc/CirclesSubscriptionParams.cs b/Circles.Index.Rpc/CirclesSubscriptionParams.cs new file mode 100644 index 0000000..e4e9ed3 --- /dev/null +++ b/Circles.Index.Rpc/CirclesSubscriptionParams.cs @@ -0,0 +1,32 @@ +using System.Text.Json; +using Nethermind.Core; +using Nethermind.JsonRpc; + +namespace Circles.Index.Rpc; + +public class CirclesSubscriptionParams : IJsonRpcParam +{ + public Address? Address { get; private set; } + + public void ReadJson(JsonElement element, JsonSerializerOptions options) + { + JsonDocument? doc = null; + try + { + if (element.ValueKind == JsonValueKind.String) + { + doc = JsonDocument.Parse(element.GetString() ?? "{}"); + element = doc.RootElement; + } + + if (element.TryGetProperty("address", out JsonElement addressElement)) + { + Address = Address.TryParse(addressElement.GetString(), out Address? address) ? address : null; + } + } + finally + { + doc?.Dispose(); + } + } +} \ No newline at end of file diff --git a/Circles.Index.Rpc/ICirclesRpcModule.cs b/Circles.Index.Rpc/ICirclesRpcModule.cs index 58a2917..13ec046 100644 --- a/Circles.Index.Rpc/ICirclesRpcModule.cs +++ b/Circles.Index.Rpc/ICirclesRpcModule.cs @@ -10,12 +10,15 @@ namespace Circles.Index.Rpc; #region DTOs public record CirclesTokenBalance(Address Token, string Balance, string TokenOwner); + public record CirclesTokenBalanceV2(UInt256 TokenId, string Balance, string TokenOwner); public record CirclesTrustRelation(Address User, int limit); public record CirclesTrustRelations(Address User, CirclesTrustRelation[] Trusts, CirclesTrustRelation[] TrustedBy); +public record CirclesEvent(string Event, IDictionary Values); + #endregion [RpcModule("Circles")] @@ -31,15 +34,20 @@ public interface ICirclesRpcModule : IRpcModule [JsonRpcMethod(Description = "Gets the balance of each V1 Circles token the specified address holds", IsImplemented = true)] Task> circles_getTokenBalances(Address address, bool asTimeCircles = false); - + [JsonRpcMethod(Description = "Gets the V2 Circles balance of the specified address", IsImplemented = true)] Task> circlesV2_getTotalBalance(Address address, bool asTimeCircles = false); - + [JsonRpcMethod(Description = "Gets the balance of each V2 Circles token the specified address holds", IsImplemented = true)] - Task> circlesV2_getTokenBalances(Address address, bool asTimeCircles = false); + Task> + circlesV2_getTokenBalances(Address address, bool asTimeCircles = false); [JsonRpcMethod(Description = "Queries the data of one Circles index table", IsImplemented = true)] ResultWrapper circles_query(SelectDto query); + + [JsonRpcMethod(Description = "Returns all events affecting the specified account since block N", + IsImplemented = true)] + ResultWrapper circles_events(Address address, long fromBlock, long? toBlock = null); } \ No newline at end of file diff --git a/Circles.Index.Rpc/QueryEvents.cs b/Circles.Index.Rpc/QueryEvents.cs new file mode 100644 index 0000000..0a933f6 --- /dev/null +++ b/Circles.Index.Rpc/QueryEvents.cs @@ -0,0 +1,138 @@ +using System.Collections.Concurrent; +using System.Collections.Immutable; +using Circles.Index.Common; +using Circles.Index.Query; +using Nethermind.Core; + +namespace Circles.Index.Rpc; + +public class QueryEvents(Context context) +{ + public static readonly ImmutableHashSet AddressColumns = new HashSet + { + "user", "avatar", "organization", "from", "to", "canSendTo", "account", "group", "human", "invited", + "inviter", "truster", "trustee" + }.ToImmutableHashSet(); + + public CirclesEvent[] CirclesEvents(Address? address, long fromBlock, long? toBlock = null) + { + long currentHead = context.NethermindApi.BlockTree?.Head?.Number + ?? throw new Exception("BlockTree or Head is null"); + + string? addressString = address?.ToString(true, false); + + ValidateInputs(addressString, fromBlock, toBlock, currentHead); + + var queries = BuildQueries(addressString, fromBlock, toBlock); + + var events = ExecuteQueries(queries); + + var sortedEvents = SortEvents(events); + + return sortedEvents; + } + + private void ValidateInputs(string? address, long fromBlock, long? toBlock, long currentHead) + { + if (address == "0x0000000000000000000000000000000000000000") + throw new Exception("The zero address cannot be queried."); + + if (fromBlock < 0) + throw new Exception("The fromBlock parameter must be greater than or equal to 0."); + + if (toBlock.HasValue && toBlock.Value < fromBlock) + throw new Exception("The toBlock parameter must be greater than or equal to fromBlock."); + + if (toBlock.HasValue && toBlock.Value > currentHead) + throw new Exception( + "The toBlock parameter must be less than or equal to the current head. Leave it empty to query all blocks until the current head."); + } + + private List(); + + foreach (var table in context.Database.Schema.Tables) + { + if (table.Key.Namespace.StartsWith("V_") || table.Key.Namespace == "System") + continue; + + var addressColumnFilters = address == null + ? [] + : table.Value.Columns + .Where(column => AddressColumns.Contains(column.Column)) + .Select(column => new FilterPredicate(column.Column, FilterType.Equals, address)) + .Cast() + .ToList(); + + var filters = new List + { + new FilterPredicate("blockNumber", FilterType.GreaterThanOrEquals, fromBlock), + }; + + if (addressColumnFilters.Count > 0) + { + filters.Add(addressColumnFilters.Count == 1 + ? addressColumnFilters[0] + : new Conjunction(ConjunctionType.Or, addressColumnFilters.ToArray())); + } + + if (toBlock.HasValue) + { + filters.Add(new FilterPredicate("blockNumber", FilterType.LessThanOrEquals, toBlock.Value)); + } + + queries.Add(new Select(table.Key.Namespace, table.Key.Table, Array.Empty(), + filters.Count > 1 + ? new[] { new Conjunction(ConjunctionType.And, filters.ToArray()) } + : filters, + new[] + { + new OrderBy("blockNumber", "ASC"), new OrderBy("transactionIndex", "ASC"), + new OrderBy("logIndex", "ASC") + }, + null, true, int.MaxValue)); + } + + return queries; + } + + private ConcurrentDictionary<(long BlockNo, long TransactionIndex, long LogIndex), CirclesEvent> ExecuteQueries( + List