Skip to content

Commit

Permalink
Speed up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rubo committed Nov 6, 2024
1 parent 7a28554 commit 2fe744d
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 53 deletions.
15 changes: 15 additions & 0 deletions src/Nethermind/ConsoleApp1/ConsoleApp1.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\Nethermind.JsonRpc\Nethermind.JsonRpc.csproj" />
<ProjectReference Include="..\Nethermind.Sockets\Nethermind.Sockets.csproj" />
</ItemGroup>

</Project>
167 changes: 167 additions & 0 deletions src/Nethermind/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
using System.Net.Sockets;
using System.Net;
using Nethermind.Sockets;
using Nethermind.JsonRpc.WebSockets;
using Nethermind.JsonRpc;
using Nethermind.JsonRpc.Modules;
using Nethermind.Serialization.Json;
using Nethermind.Evm.Tracing.GethStyle;
using System.Diagnostics;

static async Task<int> CountNumberOfMessages(Socket socket, CancellationToken token)
{
using IpcSocketMessageStream stream = new(socket);



int messages = 0;
try
{
byte[] buffer = new byte[10];
//int i = 0;
while (true)
{
ReceiveResult? result = await stream.ReceiveAsync(buffer).ConfigureAwait(false);

if (Stopwatch.GetTimestamp() % 1001 == 0)
{
await Task.Delay(1, token);
}


if (result is not null && IsEndOfIpcMessage(result))
{
//var data = buffer.Take(result.Read).ToArray();
//var str = System.Text.Encoding.UTF8.GetString(data);

//Console.WriteLine(i++);

//Console.WriteLine($"{Convert.ToHexString(data)}");
//Console.WriteLine($"{str}: {data.Length}");

//if (data.Length != 0)
messages++;
}

if (result is null || result.Closed)
{
break;
}
}
}
catch (OperationCanceledException) { }

return messages;
}

var messageCount = 50;

CancellationTokenSource cts = new();
IPEndPoint ipEndPoint = IPEndPoint.Parse("127.0.0.1:1337");

Task<int> receiveMessages = OneShotServer(
ipEndPoint,
async socket => await CountNumberOfMessages(socket, cts.Token)
);

Task<int> sendMessages = Task.Run(async () =>
{
using Socket socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(ipEndPoint).ConfigureAwait(false);

using IpcSocketMessageStream stream = new(socket);
using JsonRpcSocketsClient<IpcSocketMessageStream> client = new(
clientName: "TestClient",
stream: stream,
endpointType: RpcEndpoint.Ws,
jsonRpcProcessor: null!,
jsonRpcLocalStats: new NullJsonRpcLocalStats(),
jsonSerializer: new EthereumJsonSerializer()
);
int disposeCount = 0;

for (int i = 0; i < messageCount; i++)
{
using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(1000, () => disposeCount++), default);
await client.SendJsonRpcResult(result).ConfigureAwait(false);
await Task.Delay(1).ConfigureAwait(false);
}

//disposeCount.Should().Be(messageCount);
await cts.CancelAsync().ConfigureAwait(false);

return messageCount;
});

await Task.WhenAll(sendMessages, receiveMessages).ConfigureAwait(false);
int sent = sendMessages.Result;
int received = receiveMessages.Result;

Console.WriteLine($"Sent: {sent}, Received: {received}");

static async Task<T> OneShotServer<T>(IPEndPoint ipEndPoint, Func<Socket, Task<T>> func)
{
using Socket socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
socket.Bind(ipEndPoint);
socket.Listen();

Socket handler = await socket.AcceptAsync();

return await func(handler);
}

static JsonRpcSuccessResponse RandomSuccessResponse(int size, Action? disposeAction = null)
{
return new JsonRpcSuccessResponse(disposeAction)
{
MethodName = "mock",
Id = "42",
Result = RandomObject(size)
};
}



static object RandomObject(int size)
{
string[] strings = RandomStringArray(size / 2);
object obj = new GethLikeTxTrace()
{
Entries =
{
new GethTxTraceEntry
{
Stack = strings, Memory = strings,
}
}
};
return obj;
}

static string[] RandomStringArray(int length, bool runGc = true)
{
string[] array = new string[length];
for (int i = 0; i < length; i++)
{
array[i] = RandomString(length);
if (runGc && i % 100 == 0)
{
GC.Collect();
}
}
return array;
}

static string RandomString(int length)
{
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
char[] stringChars = new char[length];
Random random = new();

for (int i = 0; i < stringChars.Length; i++)
{
stringChars[i] = chars[random.Next(chars.Length)];
}
return new string(stringChars);
}
static bool IsEndOfIpcMessage(ReceiveResult result) => result.EndOfMessage && (!result.Closed || result.Read != 0);
11 changes: 11 additions & 0 deletions src/Nethermind/ConsoleApp1/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"profiles": {
"ConsoleApp1": {
"commandName": "Project"
},
"WSL": {
"commandName": "WSL2",
"distributionName": ""
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static async Task<int> CountNumberOfMessages(Socket socket, CancellationToken to
ReceiveResult? result = await stream.ReceiveAsync(buffer);

// Imitate random delays
if (Stopwatch.GetTimestamp() % 1001 == 0)
if (Stopwatch.GetTimestamp() % 101 == 0)
await Task.Delay(1);

if (result is not null && IsEndOfIpcMessage(result))
Expand Down Expand Up @@ -128,7 +128,7 @@ static async Task<int> CountNumberOfMessages(Socket socket, CancellationToken to

for (int i = 0; i < messageCount; i++)
{
using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(1000, () => disposeCount++), default);
using JsonRpcResult result = JsonRpcResult.Single(RandomSuccessResponse(100, () => disposeCount++), default);
await client.SendJsonRpcResult(result);
await Task.Delay(1);
}
Expand Down
111 changes: 61 additions & 50 deletions src/Nethermind/Nethermind.Runner/JsonRpc/JsonRpcIpcRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ public void Start(CancellationToken cancellationToken)
{
if (_logger.IsInfo) _logger.Info($"Starting IPC JSON RPC service over '{_path}'");

Task.Factory.StartNew(_ => StartServer(_path), cancellationToken, TaskCreationOptions.LongRunning);
Task.Factory.StartNew(_ => StartServer(_path, cancellationToken), cancellationToken, TaskCreationOptions.LongRunning);
}
}

private void StartServer(string path)
private async Task StartServer(string path, CancellationToken cancellationToken)
{
try
{
Expand All @@ -74,74 +74,85 @@ private void StartServer(string path)

while (true)
{
_resetEvent.Reset();

if (_logger.IsInfo) _logger.Info("Waiting for a IPC connection...");
_server.BeginAccept(AcceptCallback, null);

_resetEvent.WaitOne();
Socket socket = await _server.AcceptAsync(cancellationToken);

socket.ReceiveTimeout = _jsonRpcConfig.Timeout;
socket.SendTimeout = _jsonRpcConfig.Timeout;

using JsonRpcSocketsClient<IpcSocketMessageStream>? socketsClient = new(
string.Empty,
new IpcSocketMessageStream(socket),
RpcEndpoint.IPC,
_jsonRpcProcessor,
_jsonRpcLocalStats,
_jsonSerializer,
maxBatchResponseBodySize: _jsonRpcConfig.MaxBatchResponseBodySize);

await socketsClient.ReceiveLoopAsync();
}
}
catch (IOException exc) when (exc.InnerException is not null && exc.InnerException is SocketException se && se.SocketErrorCode == SocketError.ConnectionReset)
catch (IOException ex) when (ex.InnerException is SocketException { SocketErrorCode: SocketError.ConnectionReset })
{
LogDebug("Client disconnected.");
}
catch (SocketException exc) when (exc.SocketErrorCode == SocketError.ConnectionReset)
catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset || ex.ErrorCode == OperationCancelledError)
{
LogDebug("Client disconnected.");
}
catch (SocketException exc)
catch (SocketException ex)
{
if (_logger.IsError) _logger.Error($"Error ({exc.ErrorCode}) when starting IPC server over '{_path}' path.", exc);
if (_logger.IsError) _logger.Error($"Error {ex.ErrorCode} starting IPC server over '{_path}'.", ex);
}
catch (Exception exc)
catch (Exception ex)
{
if (_logger.IsError) _logger.Error($"Error when starting IPC server over '{_path}' path.", exc);
if (_logger.IsError) _logger.Error($"Error starting IPC server over '{_path}'.", ex);
}
finally
{
Dispose();
}
}

private async void AcceptCallback(IAsyncResult ar)
{
try
{
Socket socket = _server.EndAccept(ar);
socket.ReceiveTimeout = _jsonRpcConfig.Timeout;
socket.SendTimeout = _jsonRpcConfig.Timeout;

_resetEvent.Set();

using JsonRpcSocketsClient<IpcSocketMessageStream>? socketsClient = new(
string.Empty,
new IpcSocketMessageStream(socket),
RpcEndpoint.IPC,
_jsonRpcProcessor,
_jsonRpcLocalStats,
_jsonSerializer,
maxBatchResponseBodySize: _jsonRpcConfig.MaxBatchResponseBodySize);

await socketsClient.ReceiveLoopAsync();
}
catch (IOException exc) when (exc.InnerException is SocketException { SocketErrorCode: SocketError.ConnectionReset })
{
LogDebug("Client disconnected.");
}
catch (SocketException exc) when (exc.SocketErrorCode == SocketError.ConnectionReset || exc.ErrorCode == OperationCancelledError)
{
LogDebug("Client disconnected.");
}
catch (SocketException exc)
{
if (_logger.IsWarn) _logger.Warn($"Error {exc.ErrorCode}:{exc.Message}");
}
catch (Exception exc)
{
if (_logger.IsError) _logger.Error("Error when handling IPC communication with a client.", exc);
}
}
//private async void AcceptCallback(IAsyncResult ar)
//{
// try
// {
// Socket socket = _server.EndAccept(ar);
// socket.ReceiveTimeout = _jsonRpcConfig.Timeout;
// socket.SendTimeout = _jsonRpcConfig.Timeout;

// _resetEvent.Set();

// using JsonRpcSocketsClient<IpcSocketMessageStream>? socketsClient = new(
// string.Empty,
// new IpcSocketMessageStream(socket),
// RpcEndpoint.IPC,
// _jsonRpcProcessor,
// _jsonRpcLocalStats,
// _jsonSerializer,
// maxBatchResponseBodySize: _jsonRpcConfig.MaxBatchResponseBodySize);

// await socketsClient.ReceiveLoopAsync();
// }
// catch (IOException exc) when (exc.InnerException is SocketException { SocketErrorCode: SocketError.ConnectionReset })
// {
// LogDebug("Client disconnected.");
// }
// catch (SocketException exc) when (exc.SocketErrorCode == SocketError.ConnectionReset || exc.ErrorCode == OperationCancelledError)
// {
// LogDebug("Client disconnected.");
// }
// catch (SocketException exc)
// {
// if (_logger.IsWarn) _logger.Warn($"Error {exc.ErrorCode}:{exc.Message}");
// }
// catch (Exception exc)
// {
// if (_logger.IsError) _logger.Error("Error when handling IPC communication with a client.", exc);
// }
//}

private void DeleteSocketFileIfExists(string path)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
},
"WSL": {
"commandName": "WSL",
"commandLineArgs": "\"{OutDir}/nethermind.dll\" -c holesky --data-dir .data",
"commandLineArgs": "\"{OutDir}/nethermind.dll\" -c holesky --data-dir .data --JsonRpc.IpcUnixDomainSocketPath /home/rubo/ipc.f",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
Expand Down

0 comments on commit 2fe744d

Please sign in to comment.