Skip to content

Commit

Permalink
Makes connections be parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
ThadHouse committed May 26, 2017
1 parent b6f8fd3 commit ec9f256
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 37 deletions.
19 changes: 12 additions & 7 deletions src/FRC.NetworkTables/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using NetworkTables.Interfaces;
using NetworkTables.TcpSockets;
using NetworkTables.Logging;
using System;

namespace NetworkTables
{
Expand All @@ -18,7 +19,7 @@ private Dispatcher() : this(Storage.Instance, Notifier.Instance)
public Dispatcher(Storage storage, Notifier notifier)
: base(storage, notifier)
{

}

/// <summary>
Expand All @@ -45,27 +46,31 @@ public void StartServer(string persistentFilename, string listenAddress, int por

public void SetServer(string serverName, int port)
{
SetConnector(() => TcpConnector.Connect(serverName, port, Logger.Instance, 1));
SetConnector(() =>
{
return TcpConnector.Connect(new List<(string server, int port)> { (serverName, port) }, Logger.Instance, TimeSpan.FromSeconds(1));
});
}

public void SetServer(IList<NtIPAddress> servers)
{
List<Connector> connectors = new List<Connector>();
List<(string server, int port)> addresses = new List<(string server, int port)>(servers.Count);
foreach (var server in servers)
{
connectors.Add(() => TcpConnector.Connect(server.IpAddress, server.Port, Logger.Instance, 1));
addresses.Add((server.IpAddress, server.Port));
}
SetConnector(connectors);

SetConnector(() => TcpConnector.Connect(addresses, Logger.Instance, TimeSpan.FromSeconds(1)));
}

public void SetServerOverride(IPAddress address, int port)
{
SetConnectorOverride(() => TcpConnector.Connect(address.ToString(), port, Logger.Instance, 1));
SetConnectorOverride(() => TcpConnector.Connect(new List<(string server, int port)> { (address.ToString(), port) }, Logger.Instance, TimeSpan.FromSeconds(1)));
}

public void ClearServerOverride()
{
ClearConnectorOverride();
}
}
}
}
2 changes: 1 addition & 1 deletion src/FRC.NetworkTables/DispatcherBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace NetworkTables
{
internal class DispatcherBase : IDisposable
{
public delegate NtTcpClient Connector();
public delegate IClient Connector();

public const double MinimumUpdateTime = 0.01; //100ms
public const double MaximumUpdateTime = 1.0; //1 second
Expand Down
117 changes: 88 additions & 29 deletions src/FRC.NetworkTables/TcpSockets/TcpConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,117 @@
using System.Threading.Tasks;
using System.Runtime.ExceptionServices;
using static NetworkTables.Logging.Logger;
using Nito.AsyncEx;
using System.IO;

namespace NetworkTables.TcpSockets
{
internal class TcpConnector
{
private static bool WaitAndUnwrapException(Task task, int timeout)
public class TcpClientNt : IClient
{
try
private readonly TcpClient m_client;

internal TcpClientNt(TcpClient client)
{
return task.Wait(timeout);
m_client = client;
}
catch (AggregateException ex)

public Stream GetStream()
{
ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
throw ex.InnerException;
return m_client.GetStream();
}
public EndPoint RemoteEndPoint
{
get
{
return m_client.Client.RemoteEndPoint;
}
}
public bool NoDelay
{
set
{
}
}

public void Dispose()
{
m_client.Dispose();
}
}

private static int ResolveHostName(string hostName, out IPAddress[] addr)
public static IClient Connect(IList<(string server, int port)> servers, Logger logger, TimeSpan timeout)
{
try
if (servers.Count == 0)
{
var entries = Dns.GetHostAddressesAsync(hostName);
var success = WaitAndUnwrapException(entries, 1000);
if (!success)
return null;
}

TcpClient c = AsyncContext.Run(async () => {
TcpClient toReturn = null;
var clientTcp = new List<TcpClient>();
var clientTask = new List<Task>();
try
{
addr = null;
return 1;
for (int i = 0; i < servers.Count; i++)
{
TcpClient client = new TcpClient();
Task connectTask = client.ConnectAsync(servers[i].server, servers[i].port);
clientTcp.Add(client);
clientTask.Add(connectTask);
}
// 10 second timeout
var delayTask = Task.Delay(timeout);
clientTask.Add(delayTask);
while (clientTcp.Count != 0)
{
var finished = await Task.WhenAny(clientTask);
var index = clientTask.IndexOf(finished);
if (finished == delayTask)
{
return null;
}
else if (finished.IsCompleted && !finished.IsFaulted && !finished.IsCanceled)
{
toReturn = clientTcp[index];
return toReturn;
}
var remove = clientTcp[index];
clientTcp.RemoveAt(index);
remove.Dispose();
clientTask.RemoveAt(index);
}
return null;
}
List<IPAddress> addresses = new List<IPAddress>();
foreach (var ipAddress in entries.Result)
finally
{
// Only allow IPV4 addresses for now
// Sockets don't all support IPV6
if (ipAddress.AddressFamily == AddressFamily.InterNetwork)
for (int i = 0; i < clientTcp.Count; i++)
{
if (!addresses.Contains(ipAddress))
if (clientTcp[i] != toReturn)
{
addresses.Add(ipAddress);
try
{
clientTcp[i].Dispose();
}
catch (Exception e)
{
// Ignore exception
}
}
}
}
addr = addresses.ToArray();
});

}
catch (SocketException e)
{
addr = null;
return (int)e.SocketErrorCode;
}
return 0;
if (c == null) return null;
return new TcpClientNt(c);
}

/*
public static NtTcpClient Connect(string server, int port, Logger logger, int timeout = 0)
{
if (ResolveHostName(server, out IPAddress[] addr) != 0)
Expand Down Expand Up @@ -102,5 +160,6 @@ public static NtTcpClient Connect(string server, int port, Logger logger, int ti
}
return client;
}
*/
}
}
}

0 comments on commit ec9f256

Please sign in to comment.