Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Add dsrouter client-client mode." #2420

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal class DiagnosticsServerRouterFactory

public virtual ILogger Logger { get; }

public virtual Task Start(CancellationToken token)
public virtual void Start()
{
throw new NotImplementedException();
}
Expand Down Expand Up @@ -319,29 +319,12 @@ public TcpClientRouterFactory(string tcpClient, int runtimeTimeoutMs, ILogger lo
}

public virtual async Task<Stream> ConnectTcpStreamAsync(CancellationToken token)
{
return await ConnectTcpStreamAsyncInternal(token, _auto_shutdown).ConfigureAwait(false);
}

public virtual async Task<Stream> ConnectTcpStreamAsync(CancellationToken token, bool retry)
{
return await ConnectTcpStreamAsyncInternal(token, retry).ConfigureAwait(false);
}

public virtual void Start()
{
}

public virtual void Stop()
{
}

async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool retry)
{
Stream tcpClientStream = null;

_logger?.LogDebug($"Connecting new tcp endpoint \"{_tcpClientAddress}\".");

bool retry = false;
IpcTcpSocketEndPoint clientTcpEndPoint = new IpcTcpSocketEndPoint(_tcpClientAddress);
Socket clientSocket = null;

Expand All @@ -356,7 +339,7 @@ async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool r

try
{
await ConnectAsyncInternal(clientSocket, clientTcpEndPoint, connectTokenSource.Token).ConfigureAwait(false);
await ConnectAsyncInternal(clientSocket, clientTcpEndPoint, token).ConfigureAwait(false);
retry = false;
}
catch (Exception)
Expand All @@ -373,10 +356,10 @@ async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool r
throw new TimeoutException();
}

// If we are not doing retries when runtime is unavailable, fail right away, this will
// If we are not doing auto shutdown when runtime is unavailable, fail right away, this will
// break any accepted IPC connections, making sure client is notified and could reconnect.
// If not, retry until succeed or time out.
if (!retry)
// If we do have auto shutdown enabled, retry until succeed or time out.
if (!_auto_shutdown)
{
_logger?.LogTrace($"Failed connecting {_tcpClientAddress}.");
throw;
Expand All @@ -387,6 +370,8 @@ async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool r
// If we get an error (without hitting timeout above), most likely due to unavailable listener.
// Delay execution to prevent to rapid retry attempts.
await Task.Delay(TcpClientRetryTimeoutMs, token).ConfigureAwait(false);

retry = true;
}
}
while (retry);
Expand All @@ -397,6 +382,14 @@ async Task<Stream> ConnectTcpStreamAsyncInternal(CancellationToken token, bool r
return tcpClientStream;
}

public virtual void Start()
{
}

public virtual void Stop()
{
}

async Task ConnectAsyncInternal(Socket clientSocket, EndPoint remoteEP, CancellationToken token)
{
using (token.Register(() => clientSocket.Close(0)))
Expand Down Expand Up @@ -657,14 +650,12 @@ public override ILogger Logger
}
}

public override Task Start(CancellationToken token)
public override void Start()
{
_tcpServerRouterFactory.Start();
_ipcServerRouterFactory.Start();

_logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP server ({_tcpServerRouterFactory.TcpServerAddress}) router.");

return Task.CompletedTask;
}

public override Task Stop()
Expand Down Expand Up @@ -849,13 +840,11 @@ public override ILogger Logger
}
}

public override Task Start(CancellationToken token)
public override void Start()
{
_ipcServerRouterFactory.Start();
_tcpClientRouterFactory.Start();
_logger?.LogInformation($"Starting IPC server ({_ipcServerRouterFactory.IpcServerPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");

return Task.CompletedTask;
}

public override Task Stop()
Expand Down Expand Up @@ -973,12 +962,10 @@ public override ILogger Logger
}
}

public override Task Start(CancellationToken token)
public override void Start()
{
_tcpServerRouterFactory.Start();
_logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP server ({_tcpServerRouterFactory.TcpServerAddress}) router.");

return Task.CompletedTask;
}

public override Task Stop()
Expand Down Expand Up @@ -1069,178 +1056,6 @@ public override async Task<Router> CreateRouterAsync(CancellationToken token)
}
}

/// <summary>
/// This class creates IPC Client - TCP Client router instances.
/// Supports NamedPipes/UnixDomainSocket client and TCP/IP client.
/// </summary>
internal class IpcClientTcpClientRouterFactory : DiagnosticsServerRouterFactory
{
Guid _runtimeInstanceId;
ulong _runtimeProcessId;
ILogger _logger;
IpcClientRouterFactory _ipcClientRouterFactory;
TcpClientRouterFactory _tcpClientRouterFactory;

public IpcClientTcpClientRouterFactory(string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate factory, ILogger logger)
{
_runtimeInstanceId = Guid.Empty;
_runtimeProcessId = 0;
_logger = logger;
_ipcClientRouterFactory = new IpcClientRouterFactory(ipcClient, logger);
_tcpClientRouterFactory = factory(tcpClient, runtimeTimeoutMs, logger);
}

public override string IpcAddress {
get
{
return _ipcClientRouterFactory.IpcClientPath;
}
}

public override string TcpAddress {
get
{
return _tcpClientRouterFactory.TcpClientAddress;
}
}

public override ILogger Logger {
get
{
return _logger;
}
}

public override Task Start(CancellationToken token)
{
_tcpClientRouterFactory.Start();
_logger?.LogInformation($"Starting IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");

var requestRuntimeInfo = new Task(() =>
{
try
{
_logger?.LogDebug($"Requesting runtime process information.");

// Get new tcp client endpoint.
using var tcpClientStream = _tcpClientRouterFactory.ConnectTcpStreamAsync(token).Result;

// Request process info.
IpcMessage message = new IpcMessage(DiagnosticsServerCommandSet.Process, (byte)ProcessCommandId.GetProcessInfo);

byte[] buffer = message.Serialize();
tcpClientStream.Write(buffer, 0, buffer.Length);

var response = IpcMessage.Parse(tcpClientStream);
if ((DiagnosticsServerResponseId)response.Header.CommandId == DiagnosticsServerResponseId.OK)
{
var info = ProcessInfo.Parse(response.Payload);

_runtimeProcessId = info.ProcessId;
_runtimeInstanceId = info.RuntimeInstanceCookie;

_logger?.LogDebug($"Retrieved runtime process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
}
else
{
throw new ServerErrorException("Failed to retrieve runtime process info.");
}
}
catch (Exception)
{
_runtimeProcessId = (ulong)Process.GetCurrentProcess().Id;
_runtimeInstanceId = Guid.NewGuid();
_logger?.LogWarning($"Failed to retrieve runtime process info, fallback to current process information, pid={_runtimeProcessId}, cookie={_runtimeInstanceId}.");
}
});

requestRuntimeInfo.Start();
return requestRuntimeInfo;
}

public override Task Stop()
{
_logger?.LogInformation($"Stopping IPC client ({_ipcClientRouterFactory.IpcClientPath}) <--> TCP client ({_tcpClientRouterFactory.TcpClientAddress}) router.");
_tcpClientRouterFactory.Stop();
return Task.CompletedTask;
}

public override async Task<Router> CreateRouterAsync(CancellationToken token)
{
Stream tcpClientStream = null;
Stream ipcClientStream = null;

_logger?.LogDebug("Trying to create a new router instance.");

try
{
using CancellationTokenSource cancelRouter = CancellationTokenSource.CreateLinkedTokenSource(token);

// Get new tcp client endpoint.
tcpClientStream = await _tcpClientRouterFactory.ConnectTcpStreamAsync(cancelRouter.Token, true).ConfigureAwait(false);

// Get new ipc client endpoint.
using var ipcClientStreamTask = _ipcClientRouterFactory.ConnectIpcStreamAsync(cancelRouter.Token);

// We have a valid tcp stream and a pending ipc stream. Wait for completion
// or disconnect of tcp stream.
using var checkTcpStreamTask = IsStreamConnectedAsync(tcpClientStream, cancelRouter.Token);

// Wait for at least completion of one task.
await Task.WhenAny(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false);

// Cancel out any pending tasks not yet completed.
cancelRouter.Cancel();

try
{
await Task.WhenAll(ipcClientStreamTask, checkTcpStreamTask).ConfigureAwait(false);
}
catch (Exception)
{
// Check if we have an accepted ipc stream.
if (IsCompletedSuccessfully(ipcClientStreamTask))
ipcClientStreamTask.Result?.Dispose();

if (checkTcpStreamTask.IsFaulted)
{
_logger?.LogInformation("Broken tcp connection detected, aborting ipc connection.");
checkTcpStreamTask.GetAwaiter().GetResult();
}

throw;
}

ipcClientStream = ipcClientStreamTask.Result;

try
{
await IpcAdvertise.SerializeAsync(ipcClientStream, _runtimeInstanceId, _runtimeProcessId, token).ConfigureAwait(false);
}
catch (Exception)
{
_logger?.LogDebug("Failed sending advertise message.");
throw;
}
}
catch (Exception)
{
_logger?.LogDebug("Failed creating new router instance.");

// Cleanup and rethrow.
tcpClientStream?.Dispose();
ipcClientStream?.Dispose();

throw;
}

// Create new router.
_logger?.LogDebug("New router instance successfully created.");

return new Router(ipcClientStream, tcpClientStream, _logger, (ulong)IpcAdvertise.V1SizeInBytes);
}
}

internal class Router : IDisposable
{
readonly ILogger _logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ public static async Task<int> runIpcServerTcpClientRouter(CancellationToken toke
return await runRouter(token, new IpcServerTcpClientRouterFactory(ipcServer, tcpClient, runtimeTimeoutMs, tcpClientRouterFactory, logger), callbacks).ConfigureAwait(false);
}

public static async Task<int> runIpcClientTcpClientRouter(CancellationToken token, string ipcClient, string tcpClient, int runtimeTimeoutMs, TcpClientRouterFactory.CreateInstanceDelegate tcpClientRouterFactory, ILogger logger, Callbacks callbacks)
{
return await runRouter(token, new IpcClientTcpClientRouterFactory(ipcClient, tcpClient, runtimeTimeoutMs, tcpClientRouterFactory, logger), callbacks).ConfigureAwait(false);
}

public static bool isLoopbackOnly(string address)
{
bool isLooback = false;
Expand All @@ -63,9 +58,8 @@ async static Task<int> runRouter(CancellationToken token, DiagnosticsServerRoute

try
{
await routerFactory.Start(token);
if (!token.IsCancellationRequested)
callbacks?.OnRouterStarted(routerFactory.TcpAddress);
routerFactory.Start();
callbacks?.OnRouterStarted(routerFactory.TcpAddress);

while (!token.IsCancellationRequested)
{
Expand Down
Loading