-
Notifications
You must be signed in to change notification settings - Fork 315
/
DefaultSocketWrapper.cs
141 lines (125 loc) · 5.52 KB
/
DefaultSocketWrapper.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using Microsoft.Spark.Services;
using Microsoft.Spark.Utils;
namespace Microsoft.Spark.Network
{
/// <summary>
/// A simple wrapper of System.Net.Sockets.Socket class.
/// </summary>
internal sealed class DefaultSocketWrapper : ISocketWrapper
{
private readonly Socket _innerSocket;
private Stream _inputStream;
private Stream _outputStream;
/// <summary>
/// Default constructor that creates a new instance of DefaultSocket class which represents
/// a traditional socket (System.Net.Socket.Socket).
///
/// This socket is bound to Loopback with port 0.
/// </summary>
public DefaultSocketWrapper() :
this(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
_innerSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0));
}
/// <summary>
/// Initializes a instance of DefaultSocket class using the specified
/// System.Net.Socket.Socket object.
/// </summary>
/// <param name="socket">The existing socket</param>
private DefaultSocketWrapper(Socket socket)
{
// Disable Nagle algorithm, which works to combine small packets together at the
// expense of responsiveness; it's focused on reducing congestion on slow networks,
// but all of our accesses are on localhost.
socket.NoDelay = true;
_innerSocket = socket;
}
/// <summary>
/// Releases all resources used by the current instance of the DefaultSocket class.
/// </summary>
public void Dispose()
{
_outputStream?.Dispose();
_inputStream?.Dispose();
_innerSocket.Dispose();
}
/// <summary>
/// Accepts a incoming connection request.
/// </summary>
/// <returns>A DefaultSocket instance used to send and receive data</returns>
public ISocketWrapper Accept() => new DefaultSocketWrapper(_innerSocket.Accept());
/// <summary>
/// Establishes a connection to a remote host that is specified by an IP address and
/// a port number.
/// </summary>
/// <param name="remoteaddr">The IP address of the remote host</param>
/// <param name="port">The port number of the remote host</param>
/// <param name="secret">Secret string to use for connection</param>
public void Connect(IPAddress remoteaddr, int port, string secret)
{
_innerSocket.Connect(new IPEndPoint(remoteaddr, port));
if (!string.IsNullOrWhiteSpace(secret))
{
using (NetworkStream stream = CreateNetworkStream())
{
if (!Authenticator.AuthenticateAsClient(stream, secret))
{
throw new Exception($"Failed to authenticate for port: {port}.");
}
}
}
}
/// <summary>
/// Returns the NetworkStream used to send and receive data.
/// </summary>
/// <remarks>
/// GetStream returns a NetworkStream that you can use to send and receive data.
/// You must close/dispose the NetworkStream by yourself. Closing DefaultSocketWrapper
/// does not release the NetworkStream.
/// </remarks>
/// <returns>The underlying Stream instance that be used to send and receive data</returns>
private NetworkStream CreateNetworkStream() => new NetworkStream(_innerSocket);
/// <summary>
/// Returns a stream used to receive data only.
/// </summary>
/// <returns>The underlying Stream instance that be used to receive data</returns>
public Stream InputStream =>
_inputStream ?? (_inputStream = CreateStream(
ConfigurationService.WorkerReadBufferSizeEnvVarName));
/// <summary>
/// Returns a stream used to send data only.
/// </summary>
/// <returns>The underlying Stream instance that be used to send data</returns>
public Stream OutputStream =>
_outputStream ?? (_outputStream = CreateStream(
ConfigurationService.WorkerWriteBufferSizeEnvVarName));
private Stream CreateStream(string bufferSizeEnvVarName)
{
string envVar = Environment.GetEnvironmentVariable(bufferSizeEnvVarName);
if (string.IsNullOrEmpty(envVar) ||
!int.TryParse(envVar, out var writeBufferSize))
{
// The default buffer size is 64K, PythonRDD also use 64K as default buffer size.
writeBufferSize = 64 * 1024;
}
Stream ns = CreateNetworkStream();
return (writeBufferSize > 0) ? new BufferedStream(ns, writeBufferSize) : ns;
}
/// <summary>
/// Starts listening for incoming connections requests
/// </summary>
/// <param name="backlog">The maximum length of the pending connections queue. </param>
public void Listen(int backlog = 16) => _innerSocket.Listen(backlog);
/// <summary>
/// Returns the local endpoint.
/// </summary>
public EndPoint LocalEndPoint => _innerSocket.LocalEndPoint;
}
}