/*
* Copyright 2012-2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.sqs.buffered;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.DeleteMessageResult;
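/**
* Configuration options for the buffered SQS client. Instances are typically built with the
* fluent {@code with*} methods and passed to the buffered client at construction time.
* <p>
* A minimal usage sketch (assuming the two-argument
* {@code AmazonSQSBufferedAsyncClient(AmazonSQSAsync, QueueBufferConfig)} constructor from this
* package and {@code AmazonSQSAsyncClientBuilder} from the SDK; adapt to your own client setup):
* <pre>{@code
* QueueBufferConfig config = new QueueBufferConfig()
*         .withMaxBatchOpenMs(500)
*         .withLongPoll(true)
*         .withLongPollWaitTimeoutSeconds(20)
*         .withMaxInflightReceiveBatches(10);
*
* AmazonSQSBufferedAsyncClient client =
*         new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder.defaultClient(), config);
* }</pre>
*/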
public class QueueBufferConfig {
public static final int MAX_BATCH_SIZE_DEFAULT = 10;
/** the maximum number of entries in a batch command */
private int maxBatchSize;
/** The maximum message size currently supported by the service is 256 KiB. */
public static final long SERVICE_MAX_BATCH_SIZE_BYTES = 256 * 1024;
/**
* The maximum time (milliseconds) a send batch is held open for additional outbound requests.
* The longer this timeout, the longer messages wait for other messages to be added to the
* batch. Increasing this timeout reduces the number of calls made and increases throughput, but
* also increases average message latency.
*/
private long maxBatchOpenMs;
/** 200 milliseconds */
public static final long MAX_BATCH_OPEN_MS_DEFAULT = 200;
/**
* If true, even synchronous calls to delete messages will be made using background
* asynchronous batches. The client will return results indicating that the messages were deleted successfully
* even if the background calls eventually fail; the actual errors will be logged instead.
* This can be beneficial for decreasing message acknowledgement latency at the cost of potential
* duplicate messages (which can be produced by SQS itself anyway).
*/
private boolean deleteInBackground = false;
/**
* Whether receive requests should use long polling.
*/
private boolean longPoll;
/** true */
private static final boolean LONG_POLL_DEFAULT = true;
/**
* The maximum number of concurrent batches for each type of outbound request. The greater the
* number, the greater the throughput that can be achieved (at the expense of consuming more
* threads).
*/
private int maxInflightOutboundBatches;
/** 5 batches */
public static final int MAX_INFLIGHT_OUTBOUND_BATCHES_DEFAULT = 5;
/**
* The maximum number of concurrent receive message batches. The greater this number, the faster
* the queue will be pulling messages from the SQS servers (at the expense of consuming more
* threads).
*/
private int maxInflightReceiveBatches;
/** 10 batches */
public static final int MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT = 10;
/**
* If more than this number of completed receive batches are waiting in the buffer, querying
* for new messages will stop. The larger this number, the more messages the queue buffer will
* pre-fetch and keep in the buffer on the client side, and the faster receive requests will be
* satisfied. The visibility timeout of a pre-fetched message starts at the point of pre-fetch,
* which means that while the message is in the local buffer it is unavailable for other clients
* to process, and when this client retrieves it, part of the visibility timeout may have
* already expired. The number of messages prefetched will not exceed maxBatchSize *
* maxDoneReceiveBatches.
*/
private int maxDoneReceiveBatches;
/** 10 batches */
public static final int MAX_DONE_RECEIVE_BATCHES_DEFAULT = 10;
/**
* Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes
*/
private long maxBatchSizeBytes;
/** 256 kilobytes */
public static final long MAX_BATCH_SIZE_BYTES_DEFAULT = SERVICE_MAX_BATCH_SIZE_BYTES;
/**
* Custom visibility timeout to use when retrieving messages from SQS. If set to a value greater
* than zero, this timeout will override the default visibility timeout set on the SQS queue.
* Set it to -1 to use the default visibility timeout of the queue. Visibility timeout of 0
* seconds is not supported.
*/
private int visibilityTimeoutSeconds;
/** -1, which means use the visibility timeout of the queue */
public static final int VISIBILITY_TIMEOUT_SECONDS_DEFAULT = -1;
/**
* Specifies the amount of time, in seconds, the receive call will block on the server waiting
* for messages to arrive if the queue is empty when the receive call is first made. This
* setting has no effect if long polling is disabled.
*/
private int longPollWaitTimeoutSeconds;
public static final int LONGPOLL_WAIT_TIMEOUT_SECONDS_DEFAULT = 20;
/**
* Configures the minimum wait time for incoming receive message requests. Without a non-zero
* minimum wait time, threads can easily waste CPU time busy-waiting against empty local buffers.
* Avoid setting this to 0 unless you are confident threads will do useful work in-between
* each call to receive messages!
* <p></p>
* This will be applied to both requests that explicitly set WaitTimeSeconds and
* those that inherit the ReceiveMessageWaitTimeSeconds queue attribute.
*/
private int minReceiveWaitTimeMs = MIN_RECEIVE_WAIT_TIME_MS_DEFAULT;
/** 50 ms, which is in the ballpark for typical latency contacting a remote service like SQS */
public static final int MIN_RECEIVE_WAIT_TIME_MS_DEFAULT = 50;
/**
* Specifies the message attributes receive calls will request. Only receive message requests that
* request the same set of attributes will be satisfied from the receive buffers.
* <p>
* The default value is an empty list, so any receive requests that require message attributes
* will not be fulfilled from buffers.
*/
private List<String> receiveMessageAttributeNames = RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT;
public static final List<String> RECEIVE_MESSAGE_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList();
/**
* Specifies the attributes receive calls will request. Only receive message requests that
* request the same set of attributes will be satisfied from the receive buffers.
* <p>
* The default value is an empty list, so any receive requests that require attributes
* will not be fulfilled from buffers.
*/
private List<String> receiveAttributeNames = RECEIVE_ATTRIBUTE_NAMES_DEFAULT;
public static final List<String> RECEIVE_ATTRIBUTE_NAMES_DEFAULT = Collections.emptyList();
/**
* If set, prefetching will be scaled with the number of in-flight incoming receive requests
* made to the client. The advantage of this is reducing the number of outgoing requests
* made to SQS when incoming requests are reduced: in particular, if all incoming requests
* stop, no future requests to SQS will be made. The disadvantage is increased latency when
* incoming requests first start occurring.
*/
private boolean adaptivePrefetching = ADAPTIVE_PREFETCHING_DEFAULT;
public static final boolean ADAPTIVE_PREFETCHING_DEFAULT = false;
/**
* Option to configure flushOnShutdown. Enabling this option will flush the pending requests in the
* {@link SendQueueBuffer} during shutdown.
* <p>
* The default value is false which indicates flushOnShutdown is disabled.
* </p>
*/
private boolean flushOnShutdown = false;
public QueueBufferConfig(long maxBatchOpenMs, int maxInflightOutboundBatches, int maxInflightReceiveBatches,
int maxDoneReceiveBatches, boolean paramLongPoll, long maxBatchSizeBytes, int visibilityTimeout,
int longPollTimeout, int maxBatch) {
super();
this.maxBatchOpenMs = maxBatchOpenMs;
this.maxInflightOutboundBatches = maxInflightOutboundBatches;
this.maxInflightReceiveBatches = maxInflightReceiveBatches;
this.maxDoneReceiveBatches = maxDoneReceiveBatches;
this.longPoll = paramLongPoll;
this.maxBatchSizeBytes = maxBatchSizeBytes;
this.visibilityTimeoutSeconds = visibilityTimeout;
this.longPollWaitTimeoutSeconds = longPollTimeout;
this.maxBatchSize = maxBatch;
}
public QueueBufferConfig() {
this(MAX_BATCH_OPEN_MS_DEFAULT, MAX_INFLIGHT_OUTBOUND_BATCHES_DEFAULT, MAX_INFLIGHT_RECEIVE_BATCHES_DEFAULT,
MAX_DONE_RECEIVE_BATCHES_DEFAULT, LONG_POLL_DEFAULT, MAX_BATCH_SIZE_BYTES_DEFAULT,
VISIBILITY_TIMEOUT_SECONDS_DEFAULT, LONGPOLL_WAIT_TIMEOUT_SECONDS_DEFAULT, MAX_BATCH_SIZE_DEFAULT);
}
/** copy constructor */
public QueueBufferConfig(QueueBufferConfig other) {
longPoll = other.longPoll;
longPollWaitTimeoutSeconds = other.longPollWaitTimeoutSeconds;
minReceiveWaitTimeMs = other.minReceiveWaitTimeMs;
maxBatchOpenMs = other.maxBatchOpenMs;
maxBatchSize = other.maxBatchSize;
maxBatchSizeBytes = other.maxBatchSizeBytes;
maxDoneReceiveBatches = other.maxDoneReceiveBatches;
maxInflightOutboundBatches = other.maxInflightOutboundBatches;
maxInflightReceiveBatches = other.maxInflightReceiveBatches;
visibilityTimeoutSeconds = other.visibilityTimeoutSeconds;
flushOnShutdown = other.flushOnShutdown;
receiveAttributeNames = other.receiveAttributeNames;
receiveMessageAttributeNames = other.receiveMessageAttributeNames;
adaptivePrefetching = other.adaptivePrefetching;
deleteInBackground = other.deleteInBackground;
}
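// Illustrative sketch: the copy constructor supports a copy-and-tweak pattern, deriving a
// per-queue configuration from a shared base config. "baseConfig" below is an assumed,
// pre-existing QueueBufferConfig instance:
//
//     QueueBufferConfig tuned = new QueueBufferConfig(baseConfig)
//             .withMaxBatchOpenMs(500)
//             .withMaxInflightReceiveBatches(20);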
@Override
public String toString() {
return "QueueBufferConfig [maxBatchSize=" + maxBatchSize + ", maxBatchOpenMs=" + maxBatchOpenMs + ", longPoll="
+ longPoll + ", maxInflightOutboundBatches=" + maxInflightOutboundBatches
+ ", maxInflightReceiveBatches=" + maxInflightReceiveBatches + ", maxDoneReceiveBatches="
+ maxDoneReceiveBatches + ", maxBatchSizeBytes=" + maxBatchSizeBytes + ", visibilityTimeoutSeconds="
+ visibilityTimeoutSeconds + ", longPollWaitTimeoutSeconds=" + longPollWaitTimeoutSeconds + "]";
}
/**
* The maximum time (milliseconds) a send batch is held open for additional outbound requests.
* The longer this timeout, the longer messages wait for other messages to be added to the
* batch. Increasing this timeout reduces the number of calls made and increases throughput, but
* also increases average message latency.
*/
public long getMaxBatchOpenMs() {
return maxBatchOpenMs;
}
/**
* The maximum time (milliseconds) a send batch is held open for additional outbound requests.
* The longer this timeout, the longer messages wait for other messages to be added to the
* batch. Increasing this timeout reduces the number of calls made and increases throughput, but
* also increases average message latency.
*/
public void setMaxBatchOpenMs(long maxBatchOpenMs) {
this.maxBatchOpenMs = maxBatchOpenMs;
}
/**
* The maximum time (milliseconds) a send batch is held open for additional outbound requests.
* The longer this timeout, the longer messages wait for other messages to be added to the
* batch. Increasing this timeout reduces the number of calls made and increases throughput, but
* also increases average message latency.
*/
public QueueBufferConfig withMaxBatchOpenMs(long maxBatchOpenMs) {
setMaxBatchOpenMs(maxBatchOpenMs);
return this;
}
/**
* If set, even synchronous calls to delete messages will be made using background
* asynchronous batches. The client will return results indicating that the messages were deleted successfully
* even if the background calls eventually fail; the actual result of the deletions will be reported
* through the given handler instead (often just logging errors). This can be beneficial for decreasing message
* acknowledgement latency at the cost of potential duplicate messages (which can be produced by SQS itself anyway).
*/
public boolean isDeleteInBackground() {
return deleteInBackground;
}
/**
* If set, even synchronous calls to delete messages will be made using background
* asynchronous batches. The client will return results indicating that the messages were deleted successfully
* even if the background calls eventually fail; the actual result of the deletions will be reported
* through the given handler instead (often just logging errors). This can be beneficial for decreasing message
* acknowledgement latency at the cost of potential duplicate messages (which can be produced by SQS itself anyway).
*/
public void setDeleteInBackground(boolean deleteInBackground) {
this.deleteInBackground = deleteInBackground;
}
/**
* If set, even synchronous calls to delete messages will be made using background
* asynchronous batches. The client will return results indicating that the messages were deleted successfully
* even if the background calls eventually fail; the actual result of the deletions will be reported
* through the given handler instead (often just logging errors). This can be beneficial for decreasing message
* acknowledgement latency at the cost of potential duplicate messages (which can be produced by SQS itself anyway).
*/
public QueueBufferConfig withDeleteInBackground(boolean deleteInBackground) {
setDeleteInBackground(deleteInBackground);
return this;
}
/**
* @return true if the queue buffer will use long polling while retrieving messages from the
* SQS server, false otherwise.
*/
public boolean isLongPoll() {
return longPoll;
}
/**
* Specify "true" for receive requests to use long polling.
*/
public void setLongPoll(boolean longPoll) {
this.longPoll = longPoll;
}
/**
* Specify "true" for receive requests to use long polling.
*/
public QueueBufferConfig withLongPoll(boolean longPoll) {
setLongPoll(longPoll);
return this;
}
/**
* The maximum number of concurrent batches for each type of outbound request. The greater the
* number, the greater the throughput that can be achieved (at the expense of consuming more
* threads).
*/
public int getMaxInflightOutboundBatches() {
return maxInflightOutboundBatches;
}
/**
* The maximum number of concurrent batches for each type of outbound request. The greater the
* number, the greater the throughput that can be achieved (at the expense of consuming more
* threads).
*/
public void setMaxInflightOutboundBatches(int maxInflightOutboundBatches) {
this.maxInflightOutboundBatches = maxInflightOutboundBatches;
}
/**
* The maximum number of concurrent batches for each type of outbound request. The greater the
* number, the greater the throughput that can be achieved (at the expense of consuming more
* threads).
*/
public QueueBufferConfig withMaxInflightOutboundBatches(int maxInflightOutboundBatches) {
setMaxInflightOutboundBatches(maxInflightOutboundBatches);
return this;
}
/**
* The maximum number of concurrent receive message batches. The greater this number, the faster
* the queue will be pulling messages from the SQS servers (at the expense of consuming more
* threads).
*/
public int getMaxInflightReceiveBatches() {
return maxInflightReceiveBatches;
}
/**
* The maximum number of concurrent receive message batches. The greater this number, the faster
* the queue will be pulling messages from the SQS servers (at the expense of consuming more
* threads).
*/
public void setMaxInflightReceiveBatches(int maxInflightReceiveBatches) {
this.maxInflightReceiveBatches = maxInflightReceiveBatches;
}
/**
* The maximum number of concurrent receive message batches. The greater this number, the faster
* the queue will be pulling messages from the SQS servers (at the expense of consuming more
* threads).
*/
public QueueBufferConfig withMaxInflightReceiveBatches(int maxInflightReceiveBatches) {
setMaxInflightReceiveBatches(maxInflightReceiveBatches);
return this;
}
/**
* If more than this number of completed receive batches are waiting in the buffer, querying
* for new messages will stop.<br>
* The larger this number, the more messages the queue buffer will pre-fetch and keep in the
* buffer on the client side, and the faster receive requests will be satisfied. <br>
* The visibility timeout of a pre-fetched message starts at the point of pre-fetch, which means
* that while the message is in the local buffer it is unavailable for other clients to process,
* and when this client retrieves it, part of the visibility timeout may have already expired.<br>
* The number of messages prefetched will not exceed 10 * maxDoneReceiveBatches, as there can be
* a maximum of 10 messages per batch.<br>
*/
public int getMaxDoneReceiveBatches() {
return maxDoneReceiveBatches;
}
/**
* If more than this number of completed receive batches are waiting in the buffer, querying
* for new messages will stop. The larger this number, the more messages the queue buffer will
* pre-fetch and keep in the buffer on the client side, and the faster receive requests will be
* satisfied. The visibility timeout of a pre-fetched message starts at the point of pre-fetch,
* which means that while the message is in the local buffer it is unavailable for other clients
* to process, and when this client retrieves it, part of the visibility timeout may have
* already expired. The number of messages prefetched will not exceed maxBatchSize *
* maxDoneReceiveBatches.
*/
public void setMaxDoneReceiveBatches(int maxDoneReceiveBatches) {
this.maxDoneReceiveBatches = maxDoneReceiveBatches;
}
/**
* If more than this number of completed receive batches are waiting in the buffer, querying
* for new messages will stop. The larger this number, the more messages the queue buffer will
* pre-fetch and keep in the buffer on the client side, and the faster receive requests will be
* satisfied. The visibility timeout of a pre-fetched message starts at the point of pre-fetch,
* which means that while the message is in the local buffer it is unavailable for other clients
* to process, and when this client retrieves it, part of the visibility timeout may have
* already expired. The number of messages prefetched will not exceed maxBatchSize *
* maxDoneReceiveBatches.
*/
public QueueBufferConfig withMaxDoneReceiveBatches(int maxDoneReceiveBatches) {
setMaxDoneReceiveBatches(maxDoneReceiveBatches);
return this;
}
/**
* Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes. This setting
* is also enforced on the server, and if this client submits a request of a size larger than
* the server can support, the server will reject the request.
*/
public long getMaxBatchSizeBytes() {
return maxBatchSizeBytes;
}
/**
* Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes. This setting
* is also enforced on the server, and if this client submits a request of a size larger than
* the server can support, the server will reject the request.
*
* @throws IllegalArgumentException
* if the size being set is greater than the service allowed size for message body.
*/
public void setMaxBatchSizeBytes(long maxBatchSizeBytes) {
if (maxBatchSizeBytes > SERVICE_MAX_BATCH_SIZE_BYTES) {
throw new IllegalArgumentException(
"Maximum Size of the message cannot be greater than the allowed limit of "
+ SERVICE_MAX_BATCH_SIZE_BYTES + " bytes");
}
this.maxBatchSizeBytes = maxBatchSizeBytes;
}
/**
* Maximum permitted size of a SendMessage or SendMessageBatch message, in bytes. This setting
* is also enforced on the server, and if this client submits a request of a size larger than
* the server can support, the server will reject the request.
*
* @throws IllegalArgumentException
* if the size being set is greater than the service allowed size for message body.
*/
public QueueBufferConfig withMaxBatchSizeBytes(long maxBatchSizeBytes) {
setMaxBatchSizeBytes(maxBatchSizeBytes);
return this;
}
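// Illustrative sketch: the setter rejects values above SERVICE_MAX_BATCH_SIZE_BYTES, so a
// (hypothetical) call like the following throws IllegalArgumentException:
//
//     new QueueBufferConfig().withMaxBatchSizeBytes(SERVICE_MAX_BATCH_SIZE_BYTES + 1); // throws
//
// while any value up to 256 KiB is accepted, e.g.:
//
//     new QueueBufferConfig().withMaxBatchSizeBytes(64 * 1024); // 64 KiB, accepted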
/**
* Custom visibility timeout to use when retrieving messages from SQS. If set to a value greater
* than zero, this timeout will override the default visibility timeout set on the SQS queue.
* Set it to -1 to use the default visibility timeout of the queue. Visibility timeout of 0
* seconds is not supported.
*/
public int getVisibilityTimeoutSeconds() {
return visibilityTimeoutSeconds;
}
/**
* Custom visibility timeout to use when retrieving messages from SQS. If set to a value greater
* than zero, this timeout will override the default visibility timeout set on the SQS queue.
* Set it to -1 to use the default visibility timeout of the queue. Visibility timeout of 0
* seconds is not supported.
*/
public void setVisibilityTimeoutSeconds(int visibilityTimeoutSeconds) {
this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
}
/**
* Custom visibility timeout to use when retrieving messages from SQS. If set to a value greater
* than zero, this timeout will override the default visibility timeout set on the SQS queue.
* Set it to -1 to use the default visibility timeout of the queue. Visibility timeout of 0
* seconds is not supported.
*/
public QueueBufferConfig withVisibilityTimeoutSeconds(int visibilityTimeoutSeconds) {
setVisibilityTimeoutSeconds(visibilityTimeoutSeconds);
return this;
}
/**
* Specifies the amount of time, in seconds, the receive call will block on the server waiting
* for messages to arrive if the queue is empty when the receive call is first made. This
* setting has no effect if long polling is disabled.
*/
public void setLongPollWaitTimeoutSeconds(int longPollWaitTimeoutSeconds) {
this.longPollWaitTimeoutSeconds = longPollWaitTimeoutSeconds;
}
/**
* Specifies the amount of time, in seconds, the receive call will block on the server waiting
* for messages to arrive if the queue is empty when the receive call is first made. This
* setting has no effect if long polling is disabled.
*/
public int getLongPollWaitTimeoutSeconds() {
return longPollWaitTimeoutSeconds;
}
/**
* Specifies the amount of time, in seconds, the receive call will block on the server waiting
* for messages to arrive if the queue is empty when the receive call is first made. This
* setting has no effect if long polling is disabled.
*/
public QueueBufferConfig withLongPollWaitTimeoutSeconds(int longPollWaitTimeoutSeconds) {
setLongPollWaitTimeoutSeconds(longPollWaitTimeoutSeconds);
return this;
}
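// Illustrative sketch: long polling is controlled by the two settings above. For example, a
// configuration that asks the server to hold empty receive calls open for up to 20 seconds
// (the values here are illustrative, not recommendations):
//
//     QueueBufferConfig config = new QueueBufferConfig()
//             .withLongPoll(true)
//             .withLongPollWaitTimeoutSeconds(20);
//
// longPollWaitTimeoutSeconds has no effect when long polling is disabled.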
/**
* Configures the minimum wait time for incoming receive message requests. Without a non-zero
* minimum wait time, threads can easily waste CPU time busy-waiting against empty local buffers.
* Avoid setting this to 0 unless you are confident threads will do useful work in-between
* each call to receive messages!
* <p></p>
* This will be applied to both requests that explicitly set WaitTimeSeconds and
* those that inherit the ReceiveMessageWaitTimeSeconds queue attribute.
*/
public int getMinReceiveWaitTimeMs() {
return minReceiveWaitTimeMs;
}
/**
* Configures the minimum wait time for incoming receive message requests. Without a non-zero
* minimum wait time, threads can easily waste CPU time busy-waiting against empty local buffers.
* Avoid setting this to 0 unless you are confident threads will do useful work in-between
* each call to receive messages!
* <p></p>
* This will be applied to both requests that explicitly set WaitTimeSeconds and
* those that inherit the ReceiveMessageWaitTimeSeconds queue attribute.
*/
public void setMinReceiveWaitTimeMs(int minReceiveWaitTimeMs) {
this.minReceiveWaitTimeMs = minReceiveWaitTimeMs;
}
/**
* Configures the minimum wait time for incoming receive message requests. Without a non-zero
* minimum wait time, threads can easily waste CPU time busy-waiting against empty local buffers.
* Avoid setting this to 0 unless you are confident threads will do useful work in-between
* each call to receive messages!
* <p></p>
* This will be applied to both requests that explicitly set WaitTimeSeconds and
* those that inherit the ReceiveMessageWaitTimeSeconds queue attribute.
*/
public QueueBufferConfig withMinReceiveWaitTimeMs(int minReceiveWaitTimeMs) {
setMinReceiveWaitTimeMs(minReceiveWaitTimeMs);
return this;
}
/**
* Specifies the maximum number of entries the buffered client will put in a single batch
* request.
*/
public int getMaxBatchSize() {
return maxBatchSize;
}
/**
* Specifies the maximum number of entries the buffered client will put in a single batch
* request.
*/
public void setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}
/**
* Specifies the maximum number of entries the buffered client will put in a single batch
* request.
*/
public QueueBufferConfig withMaxBatchSize(int maxBatchSize) {
setMaxBatchSize(maxBatchSize);
return this;
}
/**
* Specifies the attributes receive calls will request. Only receive message requests that
* request the same set of attributes will be satisfied from the receive buffers.
* <p>
* The default value is an empty list, so any receive requests that require attributes
* will not be fulfilled from buffers.
*/
public List<String> getReceiveAttributeNames() {
return receiveAttributeNames;
}
/**
* Specifies the attributes receive calls will request. Only receive message requests that
* request the same set of attributes will be satisfied from the receive buffers.
* <p>
* The default value is an empty list, so any receive requests that require attributes
* will not be fulfilled from buffers.
*/
public void setReceiveAttributeNames(List<String> receiveAttributeNames) {
if (receiveAttributeNames == null) {
this.receiveAttributeNames = Collections.emptyList();
} else {
this.receiveAttributeNames = Collections.unmodifiableList(new ArrayList<String>(receiveAttributeNames));
}
}
/**
* Specifies the attributes receive calls will request. Only receive message requests that
* request the same set of attributes will be satisfied from the receive buffers.
* <p>
* The default value is an empty list, so any receive requests that require attributes
* will not be fulfilled from buffers.
*/
public QueueBufferConfig withReceiveAttributeNames(List<String> receiveAttributes) {
setReceiveAttributeNames(receiveAttributes);
return this;
}
/**
* Specifies the message attributes receive calls will request. Only receive message requests that
* request the same set of attributes will be satisfied from the receive buffers.
* <p>
* The default value is an empty list, so any receive requests that require message attributes
* will not be fulfilled from buffers.
*/
public List<String> getReceiveMessageAttributeNames() {
return receiveMessageAttributeNames;
}
/**
* Specifies the message attributes receive calls will request. Only receive message requests that
* request the same set of attributes will be satisfied from the receive buffers.
* <p>
* The default value is an empty list, so any receive requests that require message attributes
* will not be fulfilled from buffers.
*/
public void setReceiveMessageAttributeNames(List<String> receiveMessageAttributeNames) {
if (receiveMessageAttributeNames == null) {
this.receiveMessageAttributeNames = Collections.emptyList();
} else {
this.receiveMessageAttributeNames = Collections.unmodifiableList(new ArrayList<String>(receiveMessageAttributeNames));
}
}
/**
* Specifies the message attributes receive calls will request. Only receive message requests that
* request the same set of attributes will be satisfied from the receive buffers.
* <p>
* The default value is an empty list, so any receive requests that require message attributes
* will not be fulfilled from buffers.
*/
public QueueBufferConfig withReceiveMessageAttributeNames(List<String> receiveMessageAttributes) {
setReceiveMessageAttributeNames(receiveMessageAttributes);
return this;
}
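// Illustrative sketch: buffered receives are only satisfied for requests that ask for the same
// attribute sets configured here. java.util.Arrays would need to be imported, and the message
// attribute name "traceId" is a hypothetical application-specific attribute:
//
//     QueueBufferConfig config = new QueueBufferConfig()
//             .withReceiveAttributeNames(Arrays.asList("ApproximateReceiveCount"))
//             .withReceiveMessageAttributeNames(Arrays.asList("traceId"));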
/**
* If set, prefetching will be scaled with the number of in-flight incoming receive requests
* made to the client. The advantage of this is reducing the number of outgoing requests
* made to SQS when incoming requests are reduced: in particular, if all incoming requests
* stop, no future requests to SQS will be made. The disadvantage is increased latency when
* incoming requests first start occurring.
*/
public void setAdaptivePrefetching(boolean adaptivePrefetching) {
this.adaptivePrefetching = adaptivePrefetching;
}
/**
* If set, prefetching will be scaled with the number of in-flight incoming receive requests
* made to the client. The advantage of this is reducing the number of outgoing requests
* made to SQS when incoming requests are reduced: in particular, if all incoming requests
* stop, no future requests to SQS will be made. The disadvantage is increased latency when
* incoming requests first start occurring.
*/
public boolean isAdapativePrefetching() {
return adaptivePrefetching;
}
/**
* If set, prefetching will be scaled with the number of in-flight incoming receive requests
* made to the client. The advantage of this is reducing the number of outgoing requests
* made to SQS when incoming requests are reduced: in particular, if all incoming requests
* stop, no future requests to SQS will be made. The disadvantage is increased latency when
* incoming requests first start occurring.
*/
public QueueBufferConfig withAdapativePrefetching(boolean adaptivePrefetching) {
setAdaptivePrefetching(adaptivePrefetching);
return this;
}
/**
* Returns the flushOnShutdown value. The default value is false which indicates flushOnShutdown is disabled.
*
* Enabling this option will flush the pending requests in the {@link SendQueueBuffer} during shutdown.
*
* @return true if flushOnShutdown is enabled, otherwise false.
*/
public boolean isFlushOnShutdown() {
return flushOnShutdown;
}
/**
* Sets the flushOnShutdown option. The default value is false which indicates flushOnShutdown is disabled.
*
* Enabling this option will flush the pending requests in the {@link SendQueueBuffer} during shutdown.
*
* @param flushOnShutdown boolean value to configure flushOnShutdown.
*/
public void setFlushOnShutdown(boolean flushOnShutdown) {
this.flushOnShutdown = flushOnShutdown;
}
/**
* Sets the flushOnShutdown option. The default value is false which indicates flushOnShutdown is disabled.
*
* Enabling this option will flush the pending requests in the {@link SendQueueBuffer} during shutdown.
*
* @param flushOnShutdown boolean value to configure flushOnShutdown.
* @return This object for method chaining.
*/
public QueueBufferConfig withFlushOnShutdown(boolean flushOnShutdown) {
setFlushOnShutdown(flushOnShutdown);
return this;
}
/**
* This method checks the config for validity. If the config is invalid, an
* informative exception is thrown.
*
* @throws AmazonClientException
* with a message explaining why the config was invalid
*/
void validate() {
if (visibilityTimeoutSeconds == 0) {
throw new AmazonClientException("Visibility timeout value may not be equal to zero");
}
}
}