Skip to content

Commit

Permalink
Reuse Large Buffers in MigrateSession (#623)
Browse files Browse the repository at this point in the history
* add buffer pool to migration manager

* expose separate receive and send buffers from NetworkHandler

* add network buffers wrapper to gcs

* push NetworkBuffers to NetworkHandler

* add networkBuffers instance in migration manager

* add constructor

* reuse buffer for replication communication

* consolidate network buffer pool

* nit

* add gc collect for migration buffer pool

* add ACL test for MIGRATEGC

* make migrategc management commands

* add verbose logging for IOCallback

* expose maxEntries from NetworkBuffers.Allocate

* configure GarnetClient for Failover

* cleanup ReplicationManager

* Cancel cts before disposing in AofSyncTask

* consolidate buffer pool purge under a single command

* purge without disposing pool

* introduce info bpstats metrics

* ensure shared bp uses correct allocation size for send and recv

* fix bp stats call

* change format of info bpstats

* fix out of bounds level request for LFBP

* introduce purge for server buffer pool

* move PURGEBP in server namespace

* fix migrate bench keys option

* fix formatting

* add outOfBound allocation request metric

* augment migrate bench

* separate pool definition from buffer spec

* rename to NetworkBufferSettings

* rename NetworkBuffers to NetworkBufferSettings

* revert dispose order

* revert replication networkSettings

* add timeout to dispose of LFBP

* add timeout in cluster test TearDown

* addressing review comments

* add constants for NetworkBufferSettings in ReplicationManager

* rename types for purgebp

* rename to createBufferPool

* bump Garnet version

* add comments to LFBP

* remove unused variables
  • Loading branch information
vazois authored Sep 27, 2024
1 parent 9e45dec commit eb40cd9
Show file tree
Hide file tree
Showing 57 changed files with 835 additions and 190 deletions.
2 changes: 1 addition & 1 deletion .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 53) -- NOTE - these two values need to be the same
######################################
name: 1.0.28
name: 1.0.29
trigger:
branches:
include:
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Resp.benchmark/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ static void Main(string[] args)

static void WaitForServer(Options opts)
{
using var client = new GarnetClientSession(opts.Address, opts.Port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
using var client = new GarnetClientSession(opts.Address, opts.Port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
while (true)
{
try
Expand Down
12 changes: 8 additions & 4 deletions benchmark/Resp.benchmark/RespOnlineBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private void InitializeClients()
{
gcsPool = new AsyncPool<GarnetClientSession>(opts.NumThreads.First(), () =>
{
var c = new GarnetClientSession(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
var c = new GarnetClientSession(address, port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
c.Connect();
if (auth != null)
{
Expand Down Expand Up @@ -573,8 +573,8 @@ public async void OpRunnerGarnetClientSession(int thread_id)
client = new GarnetClientSession(
address,
port,
opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null,
bufferSize: Math.Max(bufferSizeValue, opts.ValueLength * opts.IntraThreadParallelism));
new(Math.Max(bufferSizeValue, opts.ValueLength * opts.IntraThreadParallelism)),
tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
client.Connect();
if (auth != null)
{
Expand Down Expand Up @@ -669,7 +669,11 @@ public async void OpRunnerGarnetClientSessionParallel(int thread_id, int paralle
GarnetClientSession client = null;
if (!opts.Pool)
{
client = new GarnetClientSession(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null, null, null, Math.Max(131072, opts.IntraThreadParallelism * opts.ValueLength));
client = new GarnetClientSession(
address,
port,
new NetworkBufferSettings(Math.Max(131072, opts.IntraThreadParallelism * opts.ValueLength)),
tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
client.Connect();
if (auth != null)
{
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Resp.benchmark/RespPerfBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ private void GarnetClientSessionOperateThreadRunner(int NumOps, OpType opType, R
default:
throw new Exception($"opType: {opType} benchmark not supported with GarnetClientSession!");
}
var c = new GarnetClientSession(opts.Address, opts.Port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
var c = new GarnetClientSession(opts.Address, opts.Port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
c.Connect();
if (opts.Auth != null)
{
Expand Down
4 changes: 2 additions & 2 deletions benchmark/Resp.benchmark/TxnPerfBench.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void Run()
{
gcsPool = new AsyncPool<GarnetClientSession>(opts.NumThreads.First(), () =>
{
var c = new GarnetClientSession(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
var c = new GarnetClientSession(address, port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
c.Connect();
if (auth != null)
{
Expand Down Expand Up @@ -325,7 +325,7 @@ public void OpRunnerSERedis(int thread_id)
public void LoadData()
{
var req = new OnlineReqGen(0, opts.DbSize, true, opts.Zipf, opts.KeyLength, opts.ValueLength);
GarnetClientSession client = new(address, port, opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
GarnetClientSession client = new(address, port, new(), tlsOptions: opts.EnableTLS ? BenchUtils.GetTlsOptions(opts.TlsHost, opts.CertFileName, opts.CertPassword) : null);
client.Connect();
if (auth != null)
{
Expand Down
57 changes: 45 additions & 12 deletions libs/client/ClientSession/GarnetClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
{
readonly string address;
readonly int port;
readonly int bufferSize;
readonly int bufferSizeDigits;
INetworkSender networkSender;
readonly ElasticCircularBuffer<TaskType> tasksTypes = new();
Expand Down Expand Up @@ -61,8 +60,6 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
/// </summary>
public bool IsConnected => socket != null && socket.Connected && !Disposed;

readonly LimitedFixedBufferPool networkPool;

/// <summary>
/// Username to authenticate the session on the server.
/// </summary>
Expand All @@ -73,6 +70,21 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
/// </summary>
readonly string authPassword = null;

/// <summary>
/// Indicating whether this instance is using its own network pool or one that was provided
/// </summary>
readonly bool usingManagedNetworkPool = false;

/// <summary>
/// Instance of network buffer settings describing the send and receive buffer sizes
/// </summary>
readonly NetworkBufferSettings networkBufferSettings;

/// <summary>
/// NetworkPool used to allocate send and receive buffers
/// </summary>
readonly LimitedFixedBufferPool networkPool;

/// <summary>
/// Create client instance
/// </summary>
Expand All @@ -81,16 +93,29 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
/// <param name="tlsOptions">TLS options</param>
/// <param name="authUsername">Username to authenticate with</param>
/// <param name="authPassword">Password to authenticate with</param>
/// <param name="bufferSize">Network buffer size</param>
/// <param name="networkBufferSettings">Settings for send and receive network buffers</param>
/// <param name="networkPool">Buffer pool to use for allocating send and receive buffers</param>
/// <param name="networkSendThrottleMax">Max outstanding network sends allowed</param>
/// <param name="logger">Logger</param>
public GarnetClientSession(string address, int port, SslClientAuthenticationOptions tlsOptions = null, string authUsername = null, string authPassword = null, int bufferSize = 1 << 17, int networkSendThrottleMax = 8, ILogger logger = null)
public GarnetClientSession(
string address,
int port,
NetworkBufferSettings networkBufferSettings,
LimitedFixedBufferPool networkPool = null,
SslClientAuthenticationOptions tlsOptions = null,
string authUsername = null,
string authPassword = null,
int networkSendThrottleMax = 8,
ILogger logger = null)
{
this.networkPool = new LimitedFixedBufferPool(bufferSize, logger: logger);
this.address = address;
this.port = port;
this.bufferSize = bufferSize;
this.bufferSizeDigits = NumUtils.NumDigits(bufferSize);

this.usingManagedNetworkPool = networkPool != null;
this.networkBufferSettings = networkBufferSettings;
this.networkPool = networkPool ?? networkBufferSettings.CreateBufferPool();
this.bufferSizeDigits = NumUtils.NumDigits(this.networkBufferSettings.sendBufferSize);

this.logger = logger;
this.sslOptions = tlsOptions;
this.networkSendThrottleMax = networkSendThrottleMax;
Expand All @@ -107,7 +132,15 @@ public GarnetClientSession(string address, int port, SslClientAuthenticationOpti
public void Connect(int timeoutMs = 0, CancellationToken token = default)
{
socket = GetSendSocket(address, port, timeoutMs);
networkHandler = new GarnetClientSessionTcpNetworkHandler(this, socket, networkPool, sslOptions != null, this, networkSendThrottleMax, logger);
networkHandler = new GarnetClientSessionTcpNetworkHandler(
this,
socket,
networkBufferSettings,
networkPool,
sslOptions != null,
messageConsumer: this,
networkSendThrottleMax: networkSendThrottleMax,
logger: logger);
networkHandler.StartAsync(sslOptions, $"{address}:{port}", token).ConfigureAwait(false).GetAwaiter().GetResult();
networkSender = networkHandler.GetNetworkSender();
networkSender.GetResponseObject();
Expand Down Expand Up @@ -159,7 +192,7 @@ public void Dispose()
networkSender?.ReturnResponseObject();
socket?.Dispose();
networkHandler?.Dispose();
networkPool.Dispose();
if (!usingManagedNetworkPool) networkPool.Dispose();
}

/// <summary>
Expand Down Expand Up @@ -259,8 +292,8 @@ public void ExecuteClusterAppendLog(string nodeId, long previousAddress, long cu
}
offset = curr;

if (payloadLength > bufferSize)
throw new Exception($"Payload length {payloadLength} is larger than bufferSize {bufferSize} bytes");
if (payloadLength > networkBufferSettings.sendBufferSize)
throw new Exception($"Payload length {payloadLength} is larger than bufferSize {networkBufferSettings.sendBufferSize} bytes");

while (!RespWriteUtils.WriteBulkString(new Span<byte>((void*)payloadPtr, payloadLength), ref curr, end))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Garnet.client
{
sealed class GarnetClientSessionTcpNetworkHandler : TcpNetworkHandlerBase<GarnetClientSession, GarnetTcpNetworkSender>
{
public GarnetClientSessionTcpNetworkHandler(GarnetClientSession serverHook, Socket socket, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
: base(serverHook, new GarnetTcpNetworkSender(socket, networkPool, networkSendThrottleMax), socket, networkPool, useTLS, messageConsumer, logger)
public GarnetClientSessionTcpNetworkHandler(GarnetClientSession serverHook, Socket socket, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
: base(serverHook, new GarnetTcpNetworkSender(socket, networkBufferSettings, networkPool, networkSendThrottleMax), socket, networkBufferSettings, networkPool, useTLS, messageConsumer: messageConsumer, logger: logger)
{
}

Expand Down
5 changes: 3 additions & 2 deletions libs/client/ClientTcpNetworkSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ public class ClientTcpNetworkSender : GarnetTcpNetworkSender
/// </summary>
/// <param name="socket"></param>
/// <param name="callback"></param>
/// <param name="networkBufferSettings"></param>
/// <param name="networkPool"></param>
/// <param name="networkSendThrottleMax"></param>
public ClientTcpNetworkSender(Socket socket, Action<object> callback, LimitedFixedBufferPool networkPool, int networkSendThrottleMax)
: base(socket, networkPool, networkSendThrottleMax)
public ClientTcpNetworkSender(Socket socket, Action<object> callback, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, int networkSendThrottleMax)
: base(socket, networkBufferSettings, networkPool, networkSendThrottleMax)
{
this.callback = callback;
this.reusableSaea = new SimpleObjectPool<SocketAsyncEventArgs>(() =>
Expand Down
7 changes: 5 additions & 2 deletions libs/client/GarnetClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public sealed partial class GarnetClient : IServerHook, IMessageConsumer, IDispo
readonly string address;
readonly int port;
readonly int sendPageSize;
readonly int bufferSize;
readonly int maxOutstandingTasks;
NetworkWriter networkWriter;
INetworkSender networkSender;
Expand Down Expand Up @@ -133,6 +134,7 @@ public GarnetClient(
string authUsername = null,
string authPassword = null,
int sendPageSize = 1 << 21,
int bufferSize = 1 << 17,
int maxOutstandingTasks = 1 << 19,
int timeoutMilliseconds = 0,
MemoryPool<byte> memoryPool = null,
Expand All @@ -144,6 +146,7 @@ public GarnetClient(
this.address = address;
this.port = port;
this.sendPageSize = (int)Utility.PreviousPowerOf2(sendPageSize);
this.bufferSize = bufferSize;
this.authUsername = authUsername;
this.authPassword = authPassword;

Expand Down Expand Up @@ -186,7 +189,7 @@ public GarnetClient(
public void Connect(CancellationToken token = default)
{
socket = GetSendSocket(timeoutMilliseconds);
networkWriter = new NetworkWriter(this, socket, 1 << 17, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger);
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger);
networkHandler.StartAsync(sslOptions, $"{address}:{port}", token).ConfigureAwait(false).GetAwaiter().GetResult();
networkSender = networkHandler.GetNetworkSender();

Expand Down Expand Up @@ -219,7 +222,7 @@ public void Connect(CancellationToken token = default)
public async Task ConnectAsync(CancellationToken token = default)
{
socket = GetSendSocket(timeoutMilliseconds);
networkWriter = new NetworkWriter(this, socket, 1 << 17, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger);
networkWriter = new NetworkWriter(this, socket, bufferSize, sslOptions, out networkHandler, sendPageSize, networkSendThrottleMax, logger);
await networkHandler.StartAsync(sslOptions, $"{address}:{port}", token).ConfigureAwait(false);
networkSender = networkHandler.GetNetworkSender();

Expand Down
4 changes: 2 additions & 2 deletions libs/client/GarnetClientTcpNetworkHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace Garnet.client
{
sealed class GarnetClientTcpNetworkHandler : TcpNetworkHandlerBase<GarnetClient, ClientTcpNetworkSender>
{
public GarnetClientTcpNetworkHandler(GarnetClient serverHook, Action<object> callback, Socket socket, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
: base(serverHook, new ClientTcpNetworkSender(socket, callback, networkPool, networkSendThrottleMax), socket, networkPool, useTLS, messageConsumer, logger)
public GarnetClientTcpNetworkHandler(GarnetClient serverHook, Action<object> callback, Socket socket, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer, int networkSendThrottleMax = 8, ILogger logger = null)
: base(serverHook, new ClientTcpNetworkSender(socket, callback, networkBufferSettings, networkPool, networkSendThrottleMax), socket, networkBufferSettings, networkPool, useTLS, messageConsumer: messageConsumer, logger: logger)
{
}

Expand Down
8 changes: 5 additions & 3 deletions libs/client/NetworkWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ internal sealed class NetworkWriter : IDisposable
/// </summary>
CompletionEvent FlushEvent;

readonly NetworkBufferSettings networkBufferSettings;
readonly LimitedFixedBufferPool networkPool;
readonly GarnetClientTcpNetworkHandler networkHandler;

Expand All @@ -80,10 +81,11 @@ internal sealed class NetworkWriter : IDisposable
/// </summary>
public NetworkWriter(GarnetClient serverHook, Socket socket, int messageBufferSize, SslClientAuthenticationOptions sslOptions, out GarnetClientTcpNetworkHandler networkHandler, int sendPageSize, int networkSendThrottleMax, ILogger logger = null)
{
this.networkPool = new LimitedFixedBufferPool(messageBufferSize, logger: logger);
this.networkBufferSettings = new NetworkBufferSettings(messageBufferSize, messageBufferSize);
this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);

if (BufferSize > PageOffset.kPageMask) throw new Exception();
this.networkHandler = networkHandler = new GarnetClientTcpNetworkHandler(serverHook, AsyncFlushPageCallback, socket, networkPool, sslOptions != null, serverHook, networkSendThrottleMax, logger);
this.networkHandler = networkHandler = new GarnetClientTcpNetworkHandler(serverHook, AsyncFlushPageCallback, socket, networkBufferSettings, networkPool, sslOptions != null, serverHook, networkSendThrottleMax: networkSendThrottleMax, logger: logger);
networkSender = networkHandler.GetNetworkSender();

FlushEvent.Initialize();
Expand All @@ -109,7 +111,7 @@ public void Dispose()
FlushEvent.Dispose();
epoch.Dispose();
networkHandler.Dispose();
networkPool.Dispose();
networkPool?.Dispose();
}

/// <summary>
Expand Down
13 changes: 13 additions & 0 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,19 @@ public MetricsItem[] GetGossipStats(bool metricsDisabled)
];
}

public MetricsItem[] GetBufferPoolStats()
=> [new("migration_manager", migrationManager.GetBufferPoolStats()), new("replication_manager", replicationManager.GetBufferPoolStats())];

public void PurgeBufferPool(ManagerType managerType)
{
if (managerType == ManagerType.MigrationManager)
migrationManager.Purge();
else if (managerType == ManagerType.ReplicationManager)
replicationManager.Purge();
else
throw new GarnetException();
}

internal ReplicationLogCheckpointManager GetReplicationLogCheckpointManager(StoreType storeType)
{
Debug.Assert(serverOptions.EnableCluster);
Expand Down
4 changes: 2 additions & 2 deletions libs/cluster/Server/ClusterUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public static void IOCallback(this ILogger logger, uint errorCode, uint numBytes
{
if (errorCode != 0)
{
string errorMessage = new Win32Exception((int)errorCode).Message;
logger.LogError("OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage);
var errorMessage = new Win32Exception((int)errorCode).Message;
logger.LogError("[ClusterUtils] OverlappedStream GetQueuedCompletionStatus error: {errorCode} msg: {errorMessage}", errorCode, errorMessage);
}
((SemaphoreSlim)context).Release();
}
Expand Down
Loading

0 comments on commit eb40cd9

Please sign in to comment.