-
Notifications
You must be signed in to change notification settings - Fork 51
/
Copy pathBufferedClientStreamWriter.cs
306 lines (282 loc) · 13.3 KB
/
BufferedClientStreamWriter.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
/*
* Copyright 2016 Google Inc. All Rights Reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file or at
* https://developers.google.com/open-source/licenses/bsd
*/
using Grpc.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Google.Api.Gax.Grpc
{
// TODO: Check whether cancellation token associated with original RPC is honoured throughout
// TODO: Check whether CallOptions are honoured throughout
// TODO: Check that the "further calls to Write after failure" handling is appropriate. (And document it!)
/// <summary>
/// A wrapper around <see cref="IClientStreamWriter{T}"/> which removes the "one write at a time"
/// restriction by buffering messages (and the completion signal) up to a given capacity.
/// </summary>
/// <remarks>
/// Only the request at the head of the internal queue is ever handed to the underlying writer; each
/// later request is started when the one before it completes successfully. If a write fails, every
/// request still waiting in the queue is completed with the same outcome, and subsequent write calls
/// return the task of the failed write.
/// </remarks>
/// <typeparam name="T">The type of message in the stream.</typeparam>
public class BufferedClientStreamWriter<T> where T : class
{
    private readonly IClientStreamWriter<T> _writer;

    // Guards _queue, _completed and _failedTask.
    private readonly object _lock = new object();

    /// <summary>
    /// Queue of requests. If this is non-empty, there's at least one request in-flight, which
    /// is always the head of the queue.
    /// </summary>
    private readonly LinkedList<Request> _queue = new LinkedList<Request>();

    // TODO: Make these internal properties public?
    /// <summary>
    /// The capacity of the writer: the maximum number of requests (messages plus the completion
    /// signal) that may be queued at once.
    /// </summary>
    internal int Capacity { get; }

    /// <summary>
    /// The number of requests currently held in the queue. This includes the request which is in flight
    /// and, once one of the completion methods has succeeded, the completion request.
    /// </summary>
    /// <remarks>
    /// The value of this property may change due to activity from other threads. It should only be used
    /// for testing and similar scenarios where the system state is well understood.
    /// </remarks>
    internal int BufferedWriteCount
    {
        get
        {
            lock (_lock)
            {
                return _queue.Count;
            }
        }
    }

    // Set once a completion request has been accepted; no further requests are accepted afterwards.
    private bool _completed;

    // The task describing the first failure, or null while no failure has been seen.
    private Task _failedTask;

    /// <summary>
    /// Constructs an instance which writes to the specified writer, and with the given capacity.
    /// </summary>
    /// <param name="writer">The writer to delegate to. Must not be null.</param>
    /// <param name="capacity">The maximum number of messages to buffer. Must be at least 1.</param>
    public BufferedClientStreamWriter(IClientStreamWriter<T> writer, int capacity)
    {
        // Validate both arguments before storing anything; the writer is still checked first so the
        // exception raised when both arguments are invalid is unchanged.
        _writer = GaxPreconditions.CheckNotNull(writer, nameof(writer));
        GaxPreconditions.CheckArgument(capacity >= 1, nameof(capacity), "Capacity cannot be less than 1.");
        Capacity = capacity;
    }

    /// <summary>
    /// Writes a message to the stream, if there is enough space in the buffer and <see cref="WriteCompleteAsync"/>
    /// hasn't already been called. The same write options will be used as for the previous message.
    /// </summary>
    /// <param name="message">The message to write. Must not be null.</param>
    /// <returns><c>null</c> if the message queue is full or the stream has already been completed;
    /// otherwise, a <see cref="Task"/> which will complete when the message has been written to the stream.
    /// If an earlier write has failed, the task of that failed write is returned.</returns>
    public Task TryWriteAsync(T message) => WriteAsyncImpl(message, null, false, false);

    /// <summary>
    /// Writes a message to the stream, if there is enough space in the buffer and <see cref="WriteCompleteAsync"/>
    /// hasn't already been called.
    /// </summary>
    /// <param name="message">The message to write. Must not be null.</param>
    /// <param name="options">The write options to use for this message.</param>
    /// <returns><c>null</c> if the message queue is full or the stream has already been completed;
    /// otherwise, a <see cref="Task"/> which will complete when the message has been written to the stream.
    /// If an earlier write has failed, the task of that failed write is returned.</returns>
    public Task TryWriteAsync(T message, WriteOptions options) => WriteAsyncImpl(message, options, true, false);

    /// <summary>
    /// Writes a message to the stream, if there is enough space in the buffer and <see cref="WriteCompleteAsync"/>
    /// hasn't already been called. The same write options will be used as for the previous message.
    /// </summary>
    /// <param name="message">The message to write. Must not be null.</param>
    /// <exception cref="InvalidOperationException">There isn't enough space left in the buffer,
    /// or <see cref="WriteCompleteAsync"/> has already been called.</exception>
    /// <returns>A <see cref="Task"/> which will complete when the message has been written to the stream.
    /// If an earlier write has failed, the task of that failed write is returned.</returns>
    public Task WriteAsync(T message) => WriteAsyncImpl(message, null, false, true);

    /// <summary>
    /// Writes a message to the stream, if there is enough space in the buffer and <see cref="WriteCompleteAsync"/>
    /// hasn't already been called.
    /// </summary>
    /// <param name="message">The message to write. Must not be null.</param>
    /// <param name="options">The write options to use for this message.</param>
    /// <exception cref="InvalidOperationException">There isn't enough space left in the buffer,
    /// or <see cref="WriteCompleteAsync"/> has already been called.</exception>
    /// <returns>A <see cref="Task"/> which will complete when the message has been written to the stream.
    /// If an earlier write has failed, the task of that failed write is returned.</returns>
    public Task WriteAsync(T message, WriteOptions options) => WriteAsyncImpl(message, options, true, true);

    /// <summary>
    /// Shared implementation of all the message-writing methods.
    /// </summary>
    /// <param name="message">The message to write. Must not be null.</param>
    /// <param name="options">The write options to use; only consulted when <paramref name="modifyOptions"/> is true.</param>
    /// <param name="modifyOptions">True to use <paramref name="options"/>; false to reuse the options of the
    /// most recently queued request, or the writer's current options if the queue is empty.</param>
    /// <param name="throwOnError">True to throw if the request cannot be accepted; false to return null instead.</param>
    private Task WriteAsyncImpl(T message, WriteOptions options, bool modifyOptions, bool throwOnError)
    {
        GaxPreconditions.CheckNotNull(message, nameof(message));
        lock (_lock)
        {
            if (!ValidateStateForWrite(throwOnError, false))
            {
                return null;
            }
            // Not part of state validation, as it's slightly different.
            if (_failedTask != null)
            {
                return _failedTask;
            }
            var lastRequest = _queue.Last?.Value;
            var effectiveOptions = modifyOptions ? options
                : lastRequest != null ? lastRequest.WriteOptions
                : _writer.WriteOptions;
            var request = new Request(message, effectiveOptions, HandleWriteComplete);
            if (lastRequest == null)
            {
                // If there's nothing in flight, send immediately. Otherwise,
                // we'll let HandleWriteComplete trigger sending us at the right time.
                // This happens before the request is queued, so a synchronous exception
                // from the writer leaves the queue unchanged.
                request.WriteTo(_writer);
            }
            _queue.AddLast(request);
            return request.CompletionSource.Task;
        }
    }

    /// <summary>
    /// One of the writes completes - possibly successfully, possibly not. On success, we start the next
    /// queued request (a write or the completion) if there is one. On failure, we remember the failed task,
    /// discard the queue and complete every request that was still waiting with the same outcome.
    /// </summary>
    /// <remarks>
    /// The waiting requests are completed only after the lock has been released, so that continuations
    /// attached to their tasks never execute while this object's lock is held.
    /// </remarks>
    /// <param name="writeResult">The completed task returned by the underlying writer for the head request.</param>
    private void HandleWriteComplete(Task writeResult)
    {
        Task failure = null;
        List<TaskCompletionSource<int>> sourcesToFail = null;
        lock (_lock)
        {
            // Defensive guard: once a failure has been recorded the queue has been abandoned.
            if (_failedTask != null)
            {
                return;
            }
            if (writeResult.Status == TaskStatus.RanToCompletion)
            {
                _queue.RemoveFirst();
                var next = _queue.First?.Value;
                if (next == null)
                {
                    return;
                }
                try
                {
                    next.WriteTo(_writer);
                    return;
                }
                catch (Exception e)
                {
                    // The writer refused to start the next request. Nothing would ever complete it or the
                    // requests behind it, so treat this as a failure affecting everything still queued
                    // (including the request we just tried to start).
                    var startFailure = new TaskCompletionSource<int>();
                    startFailure.SetException(e);
                    failure = startFailure.Task;
                    sourcesToFail = _queue.Select(r => r.CompletionSource).ToList();
                }
            }
            else
            {
                failure = writeResult;
                // Skip the head: its own continuation propagates the result to its completion source.
                sourcesToFail = _queue.Skip(1).Select(r => r.CompletionSource).ToList();
            }
            _failedTask = failure;
            _queue.Clear();
        }
        foreach (var source in sourcesToFail)
        {
            PropagateTaskResult(failure, source);
        }
    }

    /// <summary>
    /// Validates that we can write to the stream, optionally throwing if there's an error.
    /// This is basically to avoid a big chunk of code appearing in WriteAsyncImpl.
    /// Callers must hold the lock. The completion request is subject to the capacity check too.
    /// </summary>
    /// <param name="throwOnError">True to throw <see cref="InvalidOperationException"/> on an invalid state;
    /// false to return false instead.</param>
    /// <param name="isCompletion">True if the request being validated is the completion signal; this only
    /// affects the exception message.</param>
    /// <returns>True if a new request may be queued; false otherwise (only when not throwing).</returns>
    private bool ValidateStateForWrite(bool throwOnError, bool isCompletion)
    {
        if (_completed)
        {
            if (throwOnError)
            {
                throw new InvalidOperationException(isCompletion ?
                    $"Cannot call {nameof(WriteCompleteAsync)} twice" :
                    $"Cannot call {nameof(WriteAsync)} after {nameof(WriteCompleteAsync)}");
            }
            return false;
        }
        if (_queue.Count == Capacity)
        {
            if (throwOnError)
            {
                throw new InvalidOperationException("Queue is full");
            }
            return false;
        }
        return true;
    }

    /// <summary>
    /// Completes the stream when all buffered messages have been sent, if there is enough space in the buffer.
    /// This method can only be successfully called once, and further messages cannot be written after it
    /// has been successfully called.
    /// </summary>
    /// <returns><c>null</c> if this stream has already been completed, or if the buffer is full; otherwise a
    /// <see cref="Task"/> which will complete when the stream has finished being completed.
    /// If an earlier write has failed, the task of that failed write is returned.</returns>
    public Task TryWriteCompleteAsync() => WriteCompleteAsyncImpl(false);

    /// <summary>
    /// Completes the stream when all buffered messages have been sent, if there is enough space in the buffer.
    /// This method can only be successfully called once, and further messages cannot be written after it
    /// has been successfully called.
    /// </summary>
    /// <exception cref="InvalidOperationException">This stream has already been completed, or the buffer is full.</exception>
    /// <returns>A <see cref="Task"/> which will complete when the stream has finished being completed.
    /// If an earlier write has failed, the task of that failed write is returned.</returns>
    public Task WriteCompleteAsync() => WriteCompleteAsyncImpl(true);

    /// <summary>
    /// Shared implementation of the stream-completing methods.
    /// </summary>
    /// <param name="throwOnError">True to throw if the completion cannot be accepted; false to return null instead.</param>
    private Task WriteCompleteAsyncImpl(bool throwOnError)
    {
        lock (_lock)
        {
            if (!ValidateStateForWrite(throwOnError, true))
            {
                return null;
            }
            // The stream counts as completed from here on, even if an earlier write has failed.
            _completed = true;
            if (_failedTask != null)
            {
                return _failedTask;
            }
            var lastRequest = _queue.Last?.Value;
            // No need to hook to HandleWriteComplete, as there are no more tasks to start.
            // As a consequence the completion request stays in the queue after it has finished.
            var request = new Request(null, null, null);
            if (lastRequest == null)
            {
                // If there's nothing in flight, send immediately. Otherwise,
                // we'll let HandleWriteComplete trigger sending us at the right time.
                request.WriteTo(_writer);
            }
            _queue.AddLast(request);
            return request.CompletionSource.Task;
        }
    }

    /// <summary>
    /// Copies the outcome (success, cancellation or fault) of a finished task to a completion source.
    /// </summary>
    /// <param name="task">The task whose outcome is copied. It must already have finished.</param>
    /// <param name="completionSource">The completion source to complete.</param>
    /// <exception cref="InvalidOperationException"><paramref name="task"/> has not finished.</exception>
    private static void PropagateTaskResult(Task task, TaskCompletionSource<int> completionSource)
    {
        switch (task.Status)
        {
            case TaskStatus.Canceled:
                completionSource.SetCanceled();
                break;
            case TaskStatus.Faulted:
                completionSource.SetException(task.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                completionSource.SetResult(0);
                break;
            default:
                throw new InvalidOperationException($"Invalid status for completed task: {task.Status}");
        }
    }

    /// <summary>
    /// A single queued operation: either a message to write, or (when <see cref="Message"/> is null)
    /// the request to complete the stream.
    /// </summary>
    private class Request
    {
        // The message to write, or null to call Complete instead.
        internal T Message { get; }

        // The options assigned to the writer immediately before this request is started.
        internal WriteOptions WriteOptions { get; }

        // Completed with the outcome of this request; its task is what callers receive.
        internal TaskCompletionSource<int> CompletionSource { get; }

        // Invoked with the writer's task when the operation finishes; may be null.
        private readonly Action<Task> _continuation;

        public Request(T message, WriteOptions options, Action<Task> continuation)
        {
            Message = message;
            WriteOptions = options;
            _continuation = continuation;
            CompletionSource = new TaskCompletionSource<int>();
        }

        /// <summary>
        /// Starts this request on the given writer, and arranges for the continuation to be invoked and
        /// <see cref="CompletionSource"/> to be completed when the writer's task finishes.
        /// </summary>
        /// <param name="writer">The writer to send the message or completion signal to.</param>
        internal void WriteTo(IClientStreamWriter<T> writer)
        {
            writer.WriteOptions = WriteOptions;
            Task task = Message == null ? writer.CompleteAsync() : writer.WriteAsync(Message);
            task.ContinueWith(t =>
            {
                // The continuation runs first so the owner has updated its state (started the next
                // request, or recorded the failure) before the caller's task completes. The finally
                // block guarantees the caller's task completes even if the continuation throws.
                try
                {
                    _continuation?.Invoke(t);
                }
                finally
                {
                    PropagateTaskResult(t, CompletionSource);
                }
            });
        }
    }
}
}