Skip to content

Commit

Permalink
Merge pull request #24 from CirclesUBI/feature/subscriptions
Browse files Browse the repository at this point in the history
Feature/subscriptions
  • Loading branch information
jaensen authored Jun 4, 2024
2 parents 865a6eb + ce1170a commit 465ff67
Show file tree
Hide file tree
Showing 13 changed files with 850 additions and 21 deletions.
1 change: 1 addition & 0 deletions Circles.Index.Rpc/Circles.Index.Rpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Nethermind.ReferenceAssemblies" Version="1.25.4" />
</ItemGroup>

Expand Down
6 changes: 6 additions & 0 deletions Circles.Index.Rpc/CirclesRpcModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ public ResultWrapper<DatabaseQueryResult> circles_query(SelectDto query)
return ResultWrapper<DatabaseQueryResult>.Success(result);
}

public ResultWrapper<CirclesEvent[]> circles_events(Address address, long fromBlock, long? toBlock = null)
{
var queryEvents = new QueryEvents(_indexerContext);
return ResultWrapper<CirclesEvent[]>.Success(queryEvents.CirclesEvents(address, fromBlock, toBlock));
}

#region private methods

private IEnumerable<Address> TokenAddressesForAccount(Address circlesAccount)
Expand Down
130 changes: 130 additions & 0 deletions Circles.Index.Rpc/CirclesSubscription.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using Circles.Index.Common;
using Circles.Index.Query;
using Nethermind.Core;
using Nethermind.JsonRpc;
using Nethermind.JsonRpc.Modules.Subscribe;

namespace Circles.Index.Rpc;

public class NotifyEventArgs(CirclesEvent[] events) : EventArgs
{
public CirclesEvent[] Events { get; } = events;
}

public class CirclesSubscription : Subscription
{
public override string Type => "circles";

private readonly CirclesSubscriptionParams _param;

public static long SubscriberCount => _subscriberCount;
private static long _subscriberCount;

public CirclesSubscription(IJsonRpcDuplexClient jsonRpcDuplexClient, Context context,
CirclesSubscriptionParams param) : base(
jsonRpcDuplexClient)
{
Notification += OnNotification;
_param = param;

if (param.Address == Address.Zero)
{
throw new Exception("The zero address cannot be subscribed to.");
}

if (param.Address != null)
{
var select = new Select("V_Crc", "Avatars", ["avatar"], [
new FilterPredicate("avatar", FilterType.Equals, param.Address?.ToString(true, false))
], [], 1);

var parameterizedSql = select.ToSql(context.Database);
var avatarInfo = context.Database.Select(parameterizedSql);
if (!avatarInfo.Rows.Any())
{
throw new Exception($"The address {param.Address} is not a circles avatar.");
}
}

Interlocked.Increment(ref _subscriberCount);
}

public static event EventHandler<NotifyEventArgs>? Notification;

public static void Notify(Context context, Range<long> importedRange)
{
if (_subscriberCount == 0)
{
return;
}

var queryEvents = new QueryEvents(context);
var events = queryEvents.CirclesEvents(null, importedRange.Min, importedRange.Max);

if (events.Length == 0)
{
return;
}

Notification?.Invoke(null, new NotifyEventArgs(events));
}

private void OnNotification(object? sender, NotifyEventArgs e)
{
ScheduleAction(async () =>
{
CirclesEvent[] events;
if (_param.Address != null)
{
var filterAddress = _param.Address!.ToString(true, false);
events = FilterForAffectedAddress(e, filterAddress);
}
else
{
events = e.Events;
}
JsonRpcResult result = CreateSubscriptionMessage(events);
await JsonRpcDuplexClient.SendJsonRpcResult(result);
});
}

private CirclesEvent[] FilterForAffectedAddress(NotifyEventArgs e, string filterAddress)
{
var filteredForAddress = new List<CirclesEvent>();
var addressesInEvent = new HashSet<string>();

foreach (var circlesEvent in e.Events)
{
addressesInEvent.Clear();

foreach (var circlesEventValue in circlesEvent.Values)
{
if (!QueryEvents.AddressColumns.Contains(circlesEventValue.Key))
{
continue;
}

if (circlesEventValue.Value is string address)
{
addressesInEvent.Add(address);
}
}

if (addressesInEvent.Contains(filterAddress))
{
filteredForAddress.Add(circlesEvent);
}
}

return filteredForAddress.ToArray();
}

public override void Dispose()
{
base.Dispose();

Interlocked.Decrement(ref _subscriberCount);
}
}
32 changes: 32 additions & 0 deletions Circles.Index.Rpc/CirclesSubscriptionParams.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Text.Json;
using Nethermind.Core;
using Nethermind.JsonRpc;

namespace Circles.Index.Rpc;

public class CirclesSubscriptionParams : IJsonRpcParam
{
public Address? Address { get; private set; }

public void ReadJson(JsonElement element, JsonSerializerOptions options)
{
JsonDocument? doc = null;
try
{
if (element.ValueKind == JsonValueKind.String)
{
doc = JsonDocument.Parse(element.GetString() ?? "{}");
element = doc.RootElement;
}

if (element.TryGetProperty("address", out JsonElement addressElement))
{
Address = Address.TryParse(addressElement.GetString(), out Address? address) ? address : null;
}
}
finally
{
doc?.Dispose();
}
}
}
14 changes: 11 additions & 3 deletions Circles.Index.Rpc/ICirclesRpcModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ namespace Circles.Index.Rpc;
#region DTOs

public record CirclesTokenBalance(Address Token, string Balance, string TokenOwner);

public record CirclesTokenBalanceV2(UInt256 TokenId, string Balance, string TokenOwner);

public record CirclesTrustRelation(Address User, int limit);

public record CirclesTrustRelations(Address User, CirclesTrustRelation[] Trusts, CirclesTrustRelation[] TrustedBy);

public record CirclesEvent(string Event, IDictionary<string, object?> Values);

#endregion

[RpcModule("Circles")]
Expand All @@ -31,15 +34,20 @@ public interface ICirclesRpcModule : IRpcModule
[JsonRpcMethod(Description = "Gets the balance of each V1 Circles token the specified address holds",
IsImplemented = true)]
Task<ResultWrapper<CirclesTokenBalance[]>> circles_getTokenBalances(Address address, bool asTimeCircles = false);

[JsonRpcMethod(Description = "Gets the V2 Circles balance of the specified address", IsImplemented = true)]
Task<ResultWrapper<string>> circlesV2_getTotalBalance(Address address, bool asTimeCircles = false);

[JsonRpcMethod(Description = "Gets the balance of each V2 Circles token the specified address holds",
IsImplemented = true)]
Task<ResultWrapper<CirclesTokenBalanceV2[]>> circlesV2_getTokenBalances(Address address, bool asTimeCircles = false);
Task<ResultWrapper<CirclesTokenBalanceV2[]>>
circlesV2_getTokenBalances(Address address, bool asTimeCircles = false);

[JsonRpcMethod(Description = "Queries the data of one Circles index table",
IsImplemented = true)]
ResultWrapper<DatabaseQueryResult> circles_query(SelectDto query);

[JsonRpcMethod(Description = "Returns all events affecting the specified account since block N",
IsImplemented = true)]
ResultWrapper<CirclesEvent[]> circles_events(Address address, long fromBlock, long? toBlock = null);
}
138 changes: 138 additions & 0 deletions Circles.Index.Rpc/QueryEvents.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
using System.Collections.Concurrent;
using System.Collections.Immutable;
using Circles.Index.Common;
using Circles.Index.Query;
using Nethermind.Core;

namespace Circles.Index.Rpc;

public class QueryEvents(Context context)
{
public static readonly ImmutableHashSet<string> AddressColumns = new HashSet<string>
{
"user", "avatar", "organization", "from", "to", "canSendTo", "account", "group", "human", "invited",
"inviter", "truster", "trustee"
}.ToImmutableHashSet();

public CirclesEvent[] CirclesEvents(Address? address, long fromBlock, long? toBlock = null)
{
long currentHead = context.NethermindApi.BlockTree?.Head?.Number
?? throw new Exception("BlockTree or Head is null");

string? addressString = address?.ToString(true, false);

ValidateInputs(addressString, fromBlock, toBlock, currentHead);

var queries = BuildQueries(addressString, fromBlock, toBlock);

var events = ExecuteQueries(queries);

var sortedEvents = SortEvents(events);

return sortedEvents;
}

private void ValidateInputs(string? address, long fromBlock, long? toBlock, long currentHead)
{
if (address == "0x0000000000000000000000000000000000000000")
throw new Exception("The zero address cannot be queried.");

if (fromBlock < 0)
throw new Exception("The fromBlock parameter must be greater than or equal to 0.");

if (toBlock.HasValue && toBlock.Value < fromBlock)
throw new Exception("The toBlock parameter must be greater than or equal to fromBlock.");

if (toBlock.HasValue && toBlock.Value > currentHead)
throw new Exception(
"The toBlock parameter must be less than or equal to the current head. Leave it empty to query all blocks until the current head.");
}

private List<Select> BuildQueries(string? address, long fromBlock, long? toBlock)
{
var queries = new List<Select>();

foreach (var table in context.Database.Schema.Tables)
{
if (table.Key.Namespace.StartsWith("V_") || table.Key.Namespace == "System")
continue;

var addressColumnFilters = address == null
? []
: table.Value.Columns
.Where(column => AddressColumns.Contains(column.Column))
.Select(column => new FilterPredicate(column.Column, FilterType.Equals, address))
.Cast<IFilterPredicate>()
.ToList();

var filters = new List<IFilterPredicate>
{
new FilterPredicate("blockNumber", FilterType.GreaterThanOrEquals, fromBlock),
};

if (addressColumnFilters.Count > 0)
{
filters.Add(addressColumnFilters.Count == 1
? addressColumnFilters[0]
: new Conjunction(ConjunctionType.Or, addressColumnFilters.ToArray()));
}

if (toBlock.HasValue)
{
filters.Add(new FilterPredicate("blockNumber", FilterType.LessThanOrEquals, toBlock.Value));
}

queries.Add(new Select(table.Key.Namespace, table.Key.Table, Array.Empty<string>(),
filters.Count > 1
? new[] { new Conjunction(ConjunctionType.And, filters.ToArray()) }
: filters,
new[]
{
new OrderBy("blockNumber", "ASC"), new OrderBy("transactionIndex", "ASC"),
new OrderBy("logIndex", "ASC")
},
null, true, int.MaxValue));
}

return queries;
}

private ConcurrentDictionary<(long BlockNo, long TransactionIndex, long LogIndex), CirclesEvent> ExecuteQueries(
List<Select> queries)
{
var events = new ConcurrentDictionary<(long BlockNo, long TransactionIndex, long LogIndex), CirclesEvent>();
var tasks = queries.Select(query => Task.Run(() =>
{
var sql = query.ToSql(context.Database);
var result = context.Database.Select(sql);
foreach (var row in result.Rows)
{
var eventName = $"{query.Namespace}_{query.Table}";
var values = result.Columns.Select((col, i) => new { col, value = row[i] })
.ToDictionary(x => x.col, x => x.value);
var key = ((long)(row[0] ?? new Exception("Block number is null")),
(long)(row[2] ?? throw new Exception("Transaction index is null")),
(long)(row[3] ?? throw new Exception("Log index is null")));
events.TryAdd(key, new CirclesEvent(eventName, values));
}
})).ToArray();

Task.WaitAll(tasks);

return events;
}

private CirclesEvent[] SortEvents(
ConcurrentDictionary<(long BlockNo, long TransactionIndex, long LogIndex), CirclesEvent> events)
{
return events
.OrderBy(o => o.Key.BlockNo)
.ThenBy(o => o.Key.TransactionIndex)
.ThenBy(o => o.Key.LogIndex)
.Select(o => o.Value)
.ToArray();
}
}
4 changes: 2 additions & 2 deletions Circles.Index/Circles.Index.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
<Authors>Daniel Janz (Gnosis Service GmbH)</Authors>
<Copyright>Gnosis Service GmbH</Copyright>
<Product>Circles</Product>
<AssemblyVersion>1.1</AssemblyVersion>
<FileVersion>1.1</FileVersion>
<AssemblyVersion>1.2</AssemblyVersion>
<FileVersion>1.2</FileVersion>
</PropertyGroup>


Expand Down
Loading

0 comments on commit 465ff67

Please sign in to comment.