-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathclient.go
1779 lines (1584 loc) · 56.9 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Package liftbridge implements a client for the Liftbridge messaging system.
// Liftbridge provides lightweight, fault-tolerant message streams by
// implementing a durable stream augmentation for NATS. In particular, it offers a
// publish-subscribe log API that is highly available and horizontally
// scalable.
//
// This package provides APIs for creating and consuming Liftbridge streams and
// some utility APIs for using Liftbridge in combination with NATS.
package liftbridge
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"math/rand"
"sync"
"time"
"github.com/nats-io/nuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
proto "github.com/liftbridge-io/liftbridge-api/go"
)
// MaxReplicationFactor is a sentinel replication factor (-1). Passing it when
// creating a stream tells the server to set the replication factor equal to
// the current number of servers in the cluster.
const MaxReplicationFactor int32 = -1
// StartPosition selects the point in a stream at which consumption begins.
type StartPosition int32

// toProto converts the position into its proto.StartPosition representation.
func (s StartPosition) toProto() proto.StartPosition {
	wire := proto.StartPosition(s)
	return wire
}
// StopPosition selects the point in a stream at which consumption ends.
type StopPosition int32

// toProto converts the position into its proto.StopPosition representation.
func (s StopPosition) toProto() proto.StopPosition {
	wire := proto.StopPosition(s)
	return wire
}
// Default values applied by DefaultClientOptions, plus internal names.
const (
	// defaultMaxConnsPerBroker is the default ClientOptions.MaxConnsPerBroker.
	defaultMaxConnsPerBroker = 2
	// defaultKeepAliveTime is the default ClientOptions.KeepAliveTime.
	defaultKeepAliveTime = 30 * time.Second
	// defaultResubscribeWaitTime is the default
	// ClientOptions.ResubscribeWaitTime.
	defaultResubscribeWaitTime = 30 * time.Second
	// defaultAckWaitTime is the default ClientOptions.AckWaitTime.
	defaultAckWaitTime = 5 * time.Second
	// cursorsStream is a reserved stream name.
	// NOTE(review): presumably the internal stream backing
	// SetCursor/FetchCursor — confirm against its users.
	cursorsStream = "__cursors"
)
// Sentinel errors surfaced by the client. Callers can compare returned errors
// against these values to detect specific conditions.
var (
	// ErrStreamExists is returned by CreateStream if the specified stream
	// already exists in the Liftbridge cluster.
	ErrStreamExists = errors.New("stream already exists")

	// ErrNoSuchStream is returned by DeleteStream if the specified stream does
	// not exist in the Liftbridge cluster.
	ErrNoSuchStream = errors.New("stream does not exist")

	// ErrNoSuchPartition is returned by Subscribe or Publish if the specified
	// stream partition does not exist in the Liftbridge cluster.
	ErrNoSuchPartition = errors.New("stream partition does not exist")

	// ErrStreamDeleted is sent to subscribers when the stream they are
	// subscribed to has been deleted.
	ErrStreamDeleted = errors.New("stream has been deleted")

	// ErrPartitionPaused is sent to subscribers when the stream partition they
	// are subscribed to has been paused.
	ErrPartitionPaused = errors.New("stream partition has been paused")

	// ErrAckTimeout indicates a publish ack was not received in time.
	ErrAckTimeout = errors.New("publish ack timeout")

	// ErrReadonlyPartition is returned when all messages have been read from a
	// read-only stream, or when the subscribed-to stop position has been
	// reached. It is also returned when attempting to publish to a read-only
	// partition.
	ErrReadonlyPartition = errors.New("readonly partition")
)
// Handler is the callback invoked by Subscribe when a message is received on
// the specified stream. It receives either the message or an error. If err is
// not nil, the subscription will be terminated and no more messages will be
// received.
type Handler func(msg *Message, err error)
// AckHandler is used to handle the results of asynchronous publishes to a
// stream. If the AckPolicy on the published message is not NONE, the handler
// will receive the ack once it's received from the cluster, or an error if the
// message was not received successfully.
type AckHandler func(ack *Ack, err error)
// ackContext tracks state for an in-flight message expecting an ack.
type ackContext struct {
	// handler is the callback to invoke with the ack or error.
	handler AckHandler
	// timer is associated with the pending ack.
	// NOTE(review): presumably fires the ack timeout — confirm where it is
	// created.
	timer *time.Timer
}
// StreamOptions are used to configure new streams. Pointer-typed fields are
// optional: when nil they are omitted from the create request, so the server
// default applies.
type StreamOptions struct {
	// Group is the name of a load-balance group. When there are multiple
	// streams in the same group, messages will be balanced among them.
	Group string

	// ReplicationFactor controls the number of servers to replicate a stream
	// to. E.g. a value of 1 would mean only 1 server would have the data, and
	// a value of 3 would mean 3 servers would have it. If this is not set, it
	// defaults to 1. A value of -1 will signal to the server to set the
	// replication factor equal to the current number of servers in the
	// cluster.
	ReplicationFactor int32

	// Partitions determines how many partitions to create for a stream. If 0,
	// this will behave as a stream with a single partition. If this is not
	// set, it defaults to 1.
	Partitions int32

	// RetentionMaxBytes is the maximum size a stream's log can grow to, in
	// bytes, before we will discard old log segments to free up space. A
	// value of 0 indicates no limit. If this is not set, it uses the server
	// default value.
	RetentionMaxBytes *int64

	// RetentionMaxMessages is the maximum size a stream's log can grow to, in
	// number of messages, before we will discard old log segments to free up
	// space. A value of 0 indicates no limit. If this is not set, it uses the
	// server default value.
	RetentionMaxMessages *int64

	// RetentionMaxAge is the TTL for stream log segment files, after which
	// they are deleted. A value of 0 indicates no TTL. If this is not set, it
	// uses the server default value.
	RetentionMaxAge *time.Duration

	// CleanerInterval is the frequency to check if a new stream log segment
	// file should be rolled and whether any segments are eligible for
	// deletion based on the retention policy or compaction if enabled. If
	// this is not set, it uses the server default value.
	CleanerInterval *time.Duration

	// SegmentMaxBytes is the maximum size of a single stream log segment file
	// in bytes. Retention is always done a file at a time, so a larger
	// segment size means fewer files but less granular control over
	// retention. If this is not set, it uses the server default value.
	SegmentMaxBytes *int64

	// SegmentMaxAge is the maximum time before a new stream log segment is
	// rolled out. A value of 0 means new segments will only be rolled when
	// segment.max.bytes is reached. Retention is always done a file at a
	// time, so a larger value means fewer files but less granular control
	// over retention. If this is not set, it uses the server default value.
	SegmentMaxAge *time.Duration

	// CompactMaxGoroutines is the maximum number of concurrent goroutines to
	// use for compaction on a stream log (only applicable if compact.enabled
	// is true). If this is not set, it uses the server default value.
	CompactMaxGoroutines *int32

	// CompactEnabled controls the activation of stream log compaction. If this
	// is not set, it uses the server default value.
	CompactEnabled *bool

	// AutoPauseTime is the amount of time a stream partition can go idle
	// before it is automatically paused. If this is not set, it uses the
	// server default value.
	AutoPauseTime *time.Duration

	// AutoPauseDisableIfSubscribers disables automatic partition pausing when
	// there are subscribers. If this is not set, it uses the server default
	// value.
	AutoPauseDisableIfSubscribers *bool

	// MinISR is the minimum number of replicas that must acknowledge a stream
	// write before it can be committed. If this is not set, it uses the
	// server default value.
	MinISR *int
}
// newRequest builds the CreateStreamRequest for the given subject and stream
// name from these options. Group, ReplicationFactor and Partitions are always
// copied. Every pointer-typed option is only included when it is non-nil, so
// unset options are left out of the request. Durations are encoded as whole
// milliseconds and MinISR is converted to int32.
func (s *StreamOptions) newRequest(subject, name string) *proto.CreateStreamRequest {
	// Small wrappers for the nullable protobuf value types.
	wrapInt64 := func(v int64) *proto.NullableInt64 { return &proto.NullableInt64{Value: v} }
	wrapInt32 := func(v int32) *proto.NullableInt32 { return &proto.NullableInt32{Value: v} }
	wrapBool := func(v bool) *proto.NullableBool { return &proto.NullableBool{Value: v} }

	request := &proto.CreateStreamRequest{
		Subject:           subject,
		Name:              name,
		ReplicationFactor: s.ReplicationFactor,
		Group:             s.Group,
		Partitions:        s.Partitions,
	}

	// Retention settings.
	if age := s.RetentionMaxAge; age != nil {
		request.RetentionMaxAge = wrapInt64(age.Milliseconds())
	}
	if size := s.RetentionMaxBytes; size != nil {
		request.RetentionMaxBytes = wrapInt64(*size)
	}
	if count := s.RetentionMaxMessages; count != nil {
		request.RetentionMaxMessages = wrapInt64(*count)
	}

	// Cleaner and segment settings.
	if interval := s.CleanerInterval; interval != nil {
		request.CleanerInterval = wrapInt64(interval.Milliseconds())
	}
	if size := s.SegmentMaxBytes; size != nil {
		request.SegmentMaxBytes = wrapInt64(*size)
	}
	if age := s.SegmentMaxAge; age != nil {
		request.SegmentMaxAge = wrapInt64(age.Milliseconds())
	}

	// Compaction settings.
	if workers := s.CompactMaxGoroutines; workers != nil {
		request.CompactMaxGoroutines = wrapInt32(*workers)
	}
	if enabled := s.CompactEnabled; enabled != nil {
		request.CompactEnabled = wrapBool(*enabled)
	}

	// Auto-pause settings.
	if idle := s.AutoPauseTime; idle != nil {
		request.AutoPauseTime = wrapInt64(idle.Milliseconds())
	}
	if disable := s.AutoPauseDisableIfSubscribers; disable != nil {
		request.AutoPauseDisableIfSubscribers = wrapBool(*disable)
	}

	// Replication settings.
	if isr := s.MinISR; isr != nil {
		request.MinIsr = wrapInt32(int32(*isr))
	}

	return request
}
// StreamOption is a function on the StreamOptions for a stream. These are used
// to configure particular stream options. CreateStream applies each option in
// order and stops at the first one that returns a non-nil error.
type StreamOption func(*StreamOptions) error
// Group returns a StreamOption that places the stream in the named
// load-balance group. Messages are balanced among the streams that share a
// group.
func Group(group string) StreamOption {
	setGroup := func(opts *StreamOptions) error {
		opts.Group = group
		return nil
	}
	return setGroup
}
// ReplicationFactor returns a StreamOption that sets how many servers the
// stream is replicated to. For example, 1 means a single server holds the
// data and 3 means three servers hold it. If this is not set, it defaults to
// 1. A value of -1 signals the server to use the current number of servers in
// the cluster.
func ReplicationFactor(replicationFactor int32) StreamOption {
	setFactor := func(opts *StreamOptions) error {
		opts.ReplicationFactor = replicationFactor
		return nil
	}
	return setFactor
}
// MaxReplication returns a StreamOption that sets the replication factor to
// MaxReplicationFactor, i.e. the current number of servers in the cluster.
func MaxReplication() StreamOption {
	useMax := func(opts *StreamOptions) error {
		opts.ReplicationFactor = MaxReplicationFactor
		return nil
	}
	return useMax
}
// Partitions returns a StreamOption that sets the number of partitions for a
// stream. Partitions are ordered, replicated, and durably stored on disk and
// serve as the unit of storage and parallelism for a stream. A partitioned
// stream for NATS subject "foo.bar" with three partitions internally maps to
// the NATS subjects "foo.bar", "foo.bar.1", and "foo.bar.2". A single
// partition would map to "foo.bar" to match behavior of an "un-partitioned"
// stream. If this is not set, it defaults to 1.
//
// The returned option fails, leaving the options untouched, when partitions
// is negative.
func Partitions(partitions int32) StreamOption {
	return func(opts *StreamOptions) error {
		if partitions >= 0 {
			opts.Partitions = partitions
			return nil
		}
		return fmt.Errorf("invalid number of partitions: %d", partitions)
	}
}
// RetentionMaxBytes returns a StreamOption that sets retention.max.bytes for
// the stream: the maximum size, in bytes, the stream's log can grow to before
// old log segments are discarded to free up space. A value of 0 indicates no
// limit. If this is not set, it uses the server default value.
func RetentionMaxBytes(val int64) StreamOption {
	setLimit := func(opts *StreamOptions) error {
		opts.RetentionMaxBytes = &val
		return nil
	}
	return setLimit
}
// RetentionMaxMessages returns a StreamOption that sets
// retention.max.messages for the stream: the maximum size, in number of
// messages, the stream's log can grow to before old log segments are
// discarded to free up space. A value of 0 indicates no limit. If this is not
// set, it uses the server default value.
func RetentionMaxMessages(val int64) StreamOption {
	setLimit := func(opts *StreamOptions) error {
		opts.RetentionMaxMessages = &val
		return nil
	}
	return setLimit
}
// RetentionMaxAge returns a StreamOption that sets retention.max.age for the
// stream: the TTL for stream log segment files, after which they are deleted.
// A value of 0 indicates no TTL. If this is not set, it uses the server
// default value.
func RetentionMaxAge(val time.Duration) StreamOption {
	setTTL := func(opts *StreamOptions) error {
		opts.RetentionMaxAge = &val
		return nil
	}
	return setTTL
}
// CleanerInterval returns a StreamOption that sets cleaner.interval for the
// stream: how often to check whether a new stream log segment file should be
// rolled and whether any segments are eligible for deletion based on the
// retention policy, or for compaction if enabled. If this is not set, it uses
// the server default value.
func CleanerInterval(val time.Duration) StreamOption {
	setInterval := func(opts *StreamOptions) error {
		opts.CleanerInterval = &val
		return nil
	}
	return setInterval
}
// SegmentMaxBytes returns a StreamOption that sets segment.max.bytes for the
// stream: the maximum size of a single stream log segment file in bytes.
// Retention is always done a file at a time, so a larger segment size means
// fewer files but less granular control over retention. If this is not set,
// it uses the server default value.
func SegmentMaxBytes(val int64) StreamOption {
	setSize := func(opts *StreamOptions) error {
		opts.SegmentMaxBytes = &val
		return nil
	}
	return setSize
}
// SegmentMaxAge returns a StreamOption that sets segment.max.age for the
// stream: the maximum time before a new stream log segment is rolled out. A
// value of 0 means new segments will only be rolled when segment.max.bytes is
// reached. Retention is always done a file at a time, so a larger value means
// fewer files but less granular control over retention. If this is not set,
// it uses the server default value.
func SegmentMaxAge(val time.Duration) StreamOption {
	setAge := func(opts *StreamOptions) error {
		opts.SegmentMaxAge = &val
		return nil
	}
	return setAge
}
// CompactMaxGoroutines returns a StreamOption that sets
// compact.max.goroutines for the stream: the maximum number of concurrent
// goroutines to use for compaction on a stream log (only applicable if
// compact.enabled is true). If this is not set, it uses the server default
// value.
func CompactMaxGoroutines(val int32) StreamOption {
	setWorkers := func(opts *StreamOptions) error {
		opts.CompactMaxGoroutines = &val
		return nil
	}
	return setWorkers
}
// CompactEnabled returns a StreamOption that sets compact.enabled for the
// stream, which controls the activation of stream log compaction. If this is
// not set, it uses the server default value.
func CompactEnabled(val bool) StreamOption {
	setEnabled := func(opts *StreamOptions) error {
		opts.CompactEnabled = &val
		return nil
	}
	return setEnabled
}
// AutoPauseTime returns a StreamOption that sets auto.pause.time: the amount
// of time a stream partition can go idle, i.e. not receive a message, before
// it is automatically paused. If this is not set, it uses the server default
// value.
func AutoPauseTime(val time.Duration) StreamOption {
	setIdle := func(opts *StreamOptions) error {
		opts.AutoPauseTime = &val
		return nil
	}
	return setIdle
}
// AutoPauseDisableIfSubscribers returns a StreamOption that sets
// auto.pause.disable.if.subscribers, which controls whether automatic
// partition pausing should be disabled when there are subscribers. If this is
// not set, it uses the server default value.
func AutoPauseDisableIfSubscribers(val bool) StreamOption {
	setFlag := func(opts *StreamOptions) error {
		opts.AutoPauseDisableIfSubscribers = &val
		return nil
	}
	return setFlag
}
// MinISR returns a StreamOption that overrides clustering.min.insync.replicas
// for the given stream: the minimum number of replicas that must acknowledge
// a stream write before it can be committed. If this is not set, it uses the
// server default value.
func MinISR(minISR int) StreamOption {
	setMinISR := func(opts *StreamOptions) error {
		opts.MinISR = &minISR
		return nil
	}
	return setMinISR
}
// Client is the main API used to communicate with a Liftbridge cluster. Call
// Connect to get a Client instance.
type Client interface {
	// Close the client connection.
	Close() error

	// CreateStream creates a new stream attached to a NATS subject. Subject is
	// the NATS subject the stream is attached to, and name is the stream
	// identifier, unique per subject. It returns ErrStreamExists if a stream
	// with the given subject and name already exists.
	CreateStream(ctx context.Context, subject, name string, opts ...StreamOption) error

	// DeleteStream deletes a stream and all of its partitions. Name is the
	// stream identifier, globally unique.
	DeleteStream(ctx context.Context, name string) error

	// PauseStream pauses a stream and some or all of its partitions. Name is
	// the stream identifier, globally unique. It returns an ErrNoSuchPartition
	// if the given stream or partition does not exist. By default, this will
	// pause all partitions. A partition is resumed when it is published to via
	// the Liftbridge Publish API or ResumeAll is enabled and another partition
	// in the stream is published to.
	PauseStream(ctx context.Context, name string, opts ...PauseOption) error

	// SetStreamReadonly sets the readonly flag on a stream and some or all of
	// its partitions. Name is the stream identifier, globally unique. It
	// returns an ErrNoSuchPartition if the given stream or partition does not
	// exist. By default, this will set the readonly flag on all partitions.
	// Subscribers to a readonly partition will see their subscription ended
	// with an ErrReadonlyPartition error once all messages currently in the
	// partition have been read.
	SetStreamReadonly(ctx context.Context, name string, opts ...ReadonlyOption) error

	// Subscribe creates an ephemeral subscription for the given stream. It
	// begins receiving messages starting at the configured position and waits
	// for new messages when it reaches the end of the stream. The default
	// start position is the end of the stream.
	//
	// ErrNoSuchPartition is returned if the given stream or partition does not
	// exist.
	//
	// ErrReadonlyPartition is returned to subscribers when all messages have
	// been read from a read-only stream, or when the configured stop position
	// is reached.
	//
	// Use a cancelable Context to close a subscription.
	Subscribe(ctx context.Context, stream string, handler Handler, opts ...SubscriptionOption) error

	// Publish publishes a new message to the Liftbridge stream. The partition
	// that gets published to is determined by the provided partition or
	// Partitioner passed through MessageOptions, if any. If a partition or
	// Partitioner is not provided, this defaults to the base partition. This
	// partition determines the underlying NATS subject that gets published to.
	// To publish directly to a specific NATS subject, use the low-level
	// PublishToSubject API.
	//
	// If the AckPolicy is not NONE, this will synchronously block until the
	// ack is received. If the ack is not received in time, ErrAckTimeout is
	// returned. If AckPolicy is NONE, this returns nil on success.
	Publish(ctx context.Context, stream string, value []byte, opts ...MessageOption) (*Ack, error)

	// PublishAsync publishes a new message to the Liftbridge stream and
	// asynchronously processes the ack or error for the message.
	PublishAsync(ctx context.Context, stream string, value []byte, ackHandler AckHandler, opts ...MessageOption) error

	// PublishToSubject publishes a new message to the NATS subject. Note that
	// because this publishes directly to a subject, there may be multiple (or
	// no) streams that receive the message. As a result, MessageOptions
	// related to partitioning will be ignored. To publish at the
	// stream/partition level, use the high-level Publish API.
	//
	// If the AckPolicy is not NONE and a deadline is provided, this will
	// synchronously block until the first ack is received. If an ack is not
	// received in time, ErrAckTimeout is returned. If an AckPolicy and
	// deadline are configured, this returns the first Ack on success,
	// otherwise it returns nil.
	PublishToSubject(ctx context.Context, subject string, value []byte, opts ...MessageOption) (*Ack, error)

	// FetchMetadata returns cluster metadata including broker and stream
	// information.
	FetchMetadata(ctx context.Context) (*Metadata, error)

	// SetCursor persists a cursor position for a particular stream partition.
	// This can be used to checkpoint a consumer's position in a stream to
	// resume processing later.
	SetCursor(ctx context.Context, id, stream string, partition int32, offset int64) error

	// FetchCursor retrieves a cursor position for a particular stream
	// partition. It returns -1 if the cursor does not exist.
	FetchCursor(ctx context.Context, id, stream string, partition int32) (int64, error)

	// FetchPartitionMetadata retrieves the metadata of a particular stream
	// partition.
	FetchPartitionMetadata(ctx context.Context, stream string, partition int32) (*PartitionInfo, error)
}
// client implements the Client interface. It maintains a pool of connections
// for each broker in the cluster, limiting the number of connections and
// closing them when they go unused for a prolonged period of time.
type client struct {
	// mu is held for writing by Close.
	// NOTE(review): presumably guards the mutable fields below — confirm
	// against the other methods.
	mu sync.RWMutex
	// conn is the first broker connection dialed by Connect; Close closes it.
	conn *conn
	// asyncConn is a second broker connection dialed by Connect; the
	// PublishAsync stream is opened on it.
	asyncConn *conn
	// asyncStream is the stream returned by asyncConn.PublishAsync.
	asyncStream proto.API_PublishAsyncClient
	// ackContexts holds the state of in-flight messages expecting an ack.
	// NOTE(review): presumably keyed by a per-message correlation ID —
	// confirm.
	ackContexts map[string]*ackContext
	// metadata is the cluster metadata cache; Connect refreshes it once
	// before returning the client.
	metadata *metadataCache
	// pools holds the connection pools, which Close closes.
	// NOTE(review): presumably keyed by broker address — confirm.
	pools map[string]*connPool
	// opts are the options the client was connected with.
	opts ClientOptions
	// dialOpts are the gRPC dial options Connect derived from the TLS
	// settings.
	dialOpts []grpc.DialOption
	// closed is closed by Close; isClosed polls it.
	closed chan struct{}
}
// ClientOptions are used to control the Client configuration.
type ClientOptions struct {
	// Brokers is the set of hosts the client will use when attempting to
	// connect.
	Brokers []string

	// MaxConnsPerBroker is the maximum number of connections to pool for a
	// given broker in the cluster. The default is 2.
	MaxConnsPerBroker int

	// KeepAliveTime is the amount of time a pooled connection can be idle
	// before it is closed and removed from the pool. The default is 30
	// seconds.
	KeepAliveTime time.Duration

	// TLSCert is the TLS certificate file to use. The client does not use a
	// TLS connection if this is not set.
	TLSCert string

	// TLSConfig is the TLS configuration to use. The client does not use a
	// TLS connection if this is not set. Overrides TLSCert if set.
	TLSConfig *tls.Config

	// ResubscribeWaitTime is the amount of time to attempt to re-establish a
	// stream subscription after being disconnected. For example, if the server
	// serving a subscription dies and the stream is replicated, the client
	// will attempt to re-establish the subscription once the stream leader has
	// failed over. This failover can take several moments, so this option
	// gives the client time to retry. The default is 30 seconds.
	ResubscribeWaitTime time.Duration

	// AckWaitTime is the default amount of time to wait for an ack to be
	// received for a published message before ErrAckTimeout is returned. This
	// can be overridden on individual requests by setting a timeout on the
	// Context. This defaults to 5 seconds if not set.
	AckWaitTime time.Duration
}
// Connect dials a Liftbridge server using these options and returns a
// connected Client.
//
// Transport security is chosen in this order: TLSConfig if set, otherwise the
// certificate file named by TLSCert, otherwise an insecure connection. Two
// connections are dialed to the configured brokers, one of which carries the
// asynchronous publish stream. Cluster metadata is then fetched once before
// the client is returned.
//
// It returns an error if no broker addresses are configured, if the TLS
// certificate cannot be loaded, if dialing or opening the publish stream
// fails, or if the initial metadata fetch fails. On every failure path the
// connections dialed so far are closed, so a failed Connect does not leak
// them.
func (o ClientOptions) Connect() (Client, error) {
	if len(o.Brokers) == 0 {
		return nil, errors.New("no addresses provided")
	}

	var opts []grpc.DialOption
	switch {
	case o.TLSConfig != nil:
		// Setup TLS configuration if it is provided.
		creds := credentials.NewTLS(o.TLSConfig)
		opts = append(opts, grpc.WithTransportCredentials(creds))
	case o.TLSCert != "":
		// Setup TLS credentials if cert is provided.
		creds, err := credentials.NewClientTLSFromFile(o.TLSCert, "")
		if err != nil {
			// %w keeps the message unchanged while letting callers unwrap
			// the underlying cause.
			return nil, fmt.Errorf("could not load tls cert: %w", err)
		}
		opts = append(opts, grpc.WithTransportCredentials(creds))
	default:
		// Otherwise use an insecure connection.
		opts = append(opts, grpc.WithInsecure())
	}

	conn, err := dialBroker(o.Brokers, opts)
	if err != nil {
		return nil, err
	}
	asyncConn, err := dialBroker(o.Brokers, opts)
	if err != nil {
		conn.Close()
		return nil, err
	}
	asyncStream, err := asyncConn.PublishAsync(context.Background())
	if err != nil {
		conn.Close()
		asyncConn.Close()
		return nil, err
	}

	c := &client{
		conn:        conn,
		asyncConn:   asyncConn,
		asyncStream: asyncStream,
		pools:       make(map[string]*connPool),
		opts:        o,
		dialOpts:    opts,
		ackContexts: make(map[string]*ackContext),
		closed:      make(chan struct{}),
	}
	c.metadata = newMetadataCache(o.Brokers, c.doResilientRPC)
	if _, err := c.metadata.update(context.Background()); err != nil {
		// The client is never handed to the caller on this path, so nobody
		// else can release the connections dialed above.
		conn.Close()
		asyncConn.Close()
		return nil, err
	}

	// Only start consuming acks once the client is fully initialized.
	go c.dispatchAcks()
	return c, nil
}
// DefaultClientOptions returns the default configuration options for the
// client. Only the connection limit and the timing options are populated;
// every other field is left at its zero value.
func DefaultClientOptions() ClientOptions {
	var defaults ClientOptions
	defaults.MaxConnsPerBroker = defaultMaxConnsPerBroker
	defaults.KeepAliveTime = defaultKeepAliveTime
	defaults.ResubscribeWaitTime = defaultResubscribeWaitTime
	defaults.AckWaitTime = defaultAckWaitTime
	return defaults
}
// ClientOption is a function on the ClientOptions for a connection. These are
// used to configure particular client options. Connect applies each option in
// order and stops at the first one that returns a non-nil error.
type ClientOption func(*ClientOptions) error
// MaxConnsPerBroker returns a ClientOption that sets the maximum number of
// connections to pool for a given broker in the cluster. The default is 2.
func MaxConnsPerBroker(max int) ClientOption {
	setLimit := func(opts *ClientOptions) error {
		opts.MaxConnsPerBroker = max
		return nil
	}
	return setLimit
}
// KeepAliveTime returns a ClientOption that sets the amount of time a pooled
// connection can be idle before it is closed and removed from the pool. The
// default is 30 seconds.
func KeepAliveTime(keepAlive time.Duration) ClientOption {
	setKeepAlive := func(opts *ClientOptions) error {
		opts.KeepAliveTime = keepAlive
		return nil
	}
	return setKeepAlive
}
// TLSCert returns a ClientOption that sets the TLS certificate file for the
// client.
func TLSCert(cert string) ClientOption {
	setCert := func(opts *ClientOptions) error {
		opts.TLSCert = cert
		return nil
	}
	return setCert
}
// TLSConfig returns a ClientOption that sets the TLS configuration for the
// client. When set, it overrides TLSCert.
func TLSConfig(config *tls.Config) ClientOption {
	setConfig := func(opts *ClientOptions) error {
		opts.TLSConfig = config
		return nil
	}
	return setConfig
}
// ResubscribeWaitTime returns a ClientOption that sets the amount of time to
// attempt to re-establish a stream subscription after being disconnected. For
// example, if the server serving a subscription dies and the stream is
// replicated, the client will attempt to re-establish the subscription once
// the stream leader has failed over. This failover can take several moments,
// so this option gives the client time to retry. The default is 30 seconds.
func ResubscribeWaitTime(wait time.Duration) ClientOption {
	setWait := func(opts *ClientOptions) error {
		opts.ResubscribeWaitTime = wait
		return nil
	}
	return setWait
}
// AckWaitTime returns a ClientOption that sets the default amount of time to
// wait for an ack to be received for a published message before
// ErrAckTimeout is returned. This can be overridden on individual requests by
// setting a timeout on the Context. This defaults to 5 seconds if not set.
func AckWaitTime(wait time.Duration) ClientOption {
	setWait := func(opts *ClientOptions) error {
		opts.AckWaitTime = wait
		return nil
	}
	return setWait
}
// Connect creates a Client connection for the given Liftbridge cluster.
// Multiple addresses can be provided. Connect will use whichever it connects
// successfully to first in random order. The Client will use the pool of
// addresses for failover purposes. Note that only one seed address needs to be
// provided as the Client will discover the other brokers when fetching
// metadata for the cluster.
//
// The options are applied in order on top of DefaultClientOptions. The first
// option that returns an error aborts the connection attempt and that error
// is returned.
func Connect(addrs []string, options ...ClientOption) (Client, error) {
	cfg := DefaultClientOptions()
	cfg.Brokers = addrs
	for i := range options {
		if err := options[i](&cfg); err != nil {
			return nil, err
		}
	}
	return cfg.Connect()
}
// Close the client connection. It closes every connection pool, the main
// broker connection and the connection used for asynchronous publishing.
//
// Close is idempotent: once it has run, later calls return nil. The client is
// marked closed before any resource is released, and a failure to close one
// resource does not stop the others from being closed. The first error
// encountered, if any, is returned.
func (c *client) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	select {
	case <-c.closed:
		return nil
	default:
	}

	// Mark the client closed up front so that isClosed reports true as soon
	// as connections start going away, and so that an error below cannot
	// leave the client half closed but still reported as open.
	close(c.closed)

	var firstErr error
	keepFirst := func(err error) {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	for _, pool := range c.pools {
		keepFirst(pool.close())
	}
	keepFirst(c.conn.Close())
	// The async publish connection is dialed separately in Connect and must
	// be released here as well.
	if c.asyncConn != nil {
		keepFirst(c.asyncConn.Close())
	}
	return firstErr
}
// isClosed reports whether the client's closed channel has been closed. It
// never blocks.
func (c *client) isClosed() bool {
	select {
	case <-c.closed:
		return true
	default:
	}
	return false
}
// CreateStream creates a new stream attached to a NATS subject. Subject is the
// NATS subject the stream is attached to, and name is the stream identifier,
// unique per subject. It returns ErrStreamExists if a stream with the given
// subject and name already exists.
//
// The options are applied in order before any request is sent. The first
// option that returns an error aborts the call and that error is returned.
func (c *client) CreateStream(ctx context.Context, subject, name string, options ...StreamOption) error {
	streamOpts := new(StreamOptions)
	for i := range options {
		if err := options[i](streamOpts); err != nil {
			return err
		}
	}

	request := streamOpts.newRequest(subject, name)
	create := func(api proto.APIClient) error {
		_, err := api.CreateStream(ctx, request)
		return err
	}

	rpcErr := c.doResilientRPC(create)
	// Translate the gRPC status into the package's sentinel error.
	if status.Code(rpcErr) == codes.AlreadyExists {
		return ErrStreamExists
	}
	return rpcErr
}
// DeleteStream deletes a stream and all of its partitions. Name is the stream
// identifier, globally unique. It returns ErrNoSuchStream if the server
// reports that the stream was not found.
func (c *client) DeleteStream(ctx context.Context, name string) error {
	request := &proto.DeleteStreamRequest{Name: name}
	remove := func(api proto.APIClient) error {
		_, err := api.DeleteStream(ctx, request)
		return err
	}

	rpcErr := c.doResilientRPC(remove)
	// Translate the gRPC status into the package's sentinel error.
	if status.Code(rpcErr) == codes.NotFound {
		return ErrNoSuchStream
	}
	return rpcErr
}
// PauseOptions holds the settings for a stream pause call. The zero value
// pauses all partitions and does not enable ResumeAll.
type PauseOptions struct {
// Partitions lists the partitions to pause. If it is nil or empty, all of
// the stream's partitions are paused.
Partitions []int32
// ResumeAll, when true, resumes every partition in the stream as soon as
// any one of them is published to, instead of resuming only the partition
// that was published to.
ResumeAll bool
}
// PauseOption is a function that modifies the PauseOptions for a pause call.
// It is used to configure particular pausing options and may return an error
// if the option cannot be applied.
type PauseOption func(*PauseOptions) error
// PausePartitions sets the list of partitions to pause. If no partitions are
// given (nil/empty), all of them are paused.
func PausePartitions(partitions ...int32) PauseOption {
	apply := func(o *PauseOptions) error {
		o.Partitions = partitions
		return nil
	}
	return apply
}
// ResumeAll makes a publish to any one partition of the stream resume all of
// the stream's partitions, instead of resuming only the partition that was
// published to.
func ResumeAll() PauseOption {
	enable := func(o *PauseOptions) error {
		o.ResumeAll = true
		return nil
	}
	return enable
}
// PauseStream pauses a stream and some or all of its partitions. Name is the
// stream identifier, globally unique. By default, this will pause all
// partitions. A partition is resumed when it is published to via the
// Liftbridge Publish API, or when ResumeAll is enabled and another partition
// in the stream is published to.
//
// The pause options are applied in order; the first option error is returned
// without contacting the server. It returns ErrNoSuchPartition if the given
// stream or partition does not exist. Any other RPC error is returned
// unchanged.
func (c *client) PauseStream(ctx context.Context, name string, options ...PauseOption) error {
	var opts PauseOptions
	for _, apply := range options {
		if err := apply(&opts); err != nil {
			return err
		}
	}

	req := &proto.PauseStreamRequest{
		Name:       name,
		Partitions: opts.Partitions,
		ResumeAll:  opts.ResumeAll,
	}
	pause := func(api proto.APIClient) error {
		_, err := api.PauseStream(ctx, req)
		return err
	}

	err := c.doResilientRPC(pause)
	if code := status.Code(err); code == codes.NotFound {
		return ErrNoSuchPartition
	}
	return err
}
// ReadonlyOptions holds the settings for a set-readonly call on a stream. The
// zero value sets the readonly flag on all partitions.
type ReadonlyOptions struct {
// Partitions lists the partitions on which to set the readonly flag. If it
// is nil or empty, the flag is set on all partitions.
Partitions []int32
// Readwrite defines whether the partitions should be set to readonly
// (false) or to readwrite (true). The field is named Readwrite rather than
// Readonly so that the zero value corresponds to "enable readonly".
Readwrite bool
}
// ReadonlyOption is a function that modifies the ReadonlyOptions for a set
// readonly call. It is used to configure particular set readonly options and
// may return an error if the option cannot be applied.
type ReadonlyOption func(*ReadonlyOptions) error
// ReadonlyPartitions sets the list of partitions on which to set the readonly
// flag. If no partitions are given (nil/empty), the flag is set on all of
// them.
func ReadonlyPartitions(partitions ...int32) ReadonlyOption {
	apply := func(o *ReadonlyOptions) error {
		o.Partitions = partitions
		return nil
	}
	return apply
}
// Readonly defines whether the partitions should be set to readonly (true) or
// to readwrite (false).
func Readonly(readonly bool) ReadonlyOption {
	// The options struct stores the inverse flag (Readwrite), so flip the
	// argument once up front.
	readwrite := !readonly
	return func(o *ReadonlyOptions) error {
		o.Readwrite = readwrite
		return nil
	}
}
// SetStreamReadonly sets the readonly flag on a stream and some or all of its
// partitions. Name is the stream identifier, globally unique. By default, this
// will set the readonly flag on all partitions. Subscribers to a readonly
// partition will see their subscription ended with an ErrReadonlyPartition
// error once all messages currently in the partition have been read.
//
// The readonly options are applied in order; the first option error is
// returned without contacting the server. It returns ErrNoSuchPartition if the
// given stream or partition does not exist. Any other RPC error is returned
// unchanged.
func (c *client) SetStreamReadonly(ctx context.Context, name string, options ...ReadonlyOption) error {
	var opts ReadonlyOptions
	for _, apply := range options {
		if err := apply(&opts); err != nil {
			return err
		}
	}

	req := &proto.SetStreamReadonlyRequest{
		Name:       name,
		Partitions: opts.Partitions,
		// The request carries a readonly flag, the inverse of the stored
		// Readwrite option.
		Readonly: !opts.Readwrite,
	}
	setReadonly := func(api proto.APIClient) error {
		_, err := api.SetStreamReadonly(ctx, req)
		return err
	}

	err := c.doResilientRPC(setReadonly)
	if code := status.Code(err); code == codes.NotFound {
		return ErrNoSuchPartition
	}
	return err
}
// SubscriptionOptions holds the settings that control a subscription's
// behavior. The fields are normally populated through SubscriptionOption
// functions such as StartAtOffset or StopAtOffset.
type SubscriptionOptions struct {
// StartPosition controls where to begin consuming from in the stream.
StartPosition StartPosition
// StartOffset is the stream offset to begin consuming from. StartAtOffset
// sets it together with StartPosition.
StartOffset int64
// StartTimestamp is the timestamp to begin consuming from. StartAtTime and
// StartAtTimeDelta set it together with StartPosition.
StartTimestamp time.Time
// StopPosition controls where to stop consuming in the stream.
StopPosition StopPosition
// StopOffset is the stream offset to stop consuming at. StopAtOffset sets
// it together with StopPosition.
StopOffset int64
// StopTimestamp is the timestamp to stop consuming at.
StopTimestamp time.Time
// Partition is the stream partition to consume.
Partition int32
// ReadISRReplica controls whether the client may subscribe from a random
// ISR.
ReadISRReplica bool
// Resume controls whether a paused partition can be resumed before
// subscription.
Resume bool
}
// SubscriptionOption is a function that modifies the SubscriptionOptions for
// a subscription. It is used to configure particular subscription options and
// may return an error if the option cannot be applied.
type SubscriptionOption func(*SubscriptionOptions) error
// StartAtOffset makes the subscription begin consuming at the given offset in
// the stream. It sets the start position to the offset mode and records the
// offset.
func StartAtOffset(offset int64) SubscriptionOption {
	apply := func(o *SubscriptionOptions) error {
		o.StartOffset = offset
		o.StartPosition = StartPosition(proto.StartPosition_OFFSET)
		return nil
	}
	return apply
}
// StartAtTime makes the subscription begin consuming at the given timestamp
// in the stream. It sets the start position to the timestamp mode and records
// the timestamp.
func StartAtTime(start time.Time) SubscriptionOption {
	apply := func(o *SubscriptionOptions) error {
		o.StartTimestamp = start
		o.StartPosition = StartPosition(proto.StartPosition_TIMESTAMP)
		return nil
	}
	return apply
}
// StartAtTimeDelta makes the subscription begin consuming at a timestamp in
// the past, expressed as a time delta relative to now. It sets the start
// position to the timestamp mode.
func StartAtTimeDelta(ago time.Duration) SubscriptionOption {
	apply := func(o *SubscriptionOptions) error {
		// The reference time is taken when the option is applied, not when
		// StartAtTimeDelta is called.
		startAt := time.Now().Add(-ago)
		o.StartPosition = StartPosition(proto.StartPosition_TIMESTAMP)
		o.StartTimestamp = startAt
		return nil
	}
	return apply
}
// StartAtLatestReceived makes the subscription start at the last message
// received in the stream.
func StartAtLatestReceived() SubscriptionOption {
	apply := func(o *SubscriptionOptions) error {
		o.StartPosition = StartPosition(proto.StartPosition_LATEST)
		return nil
	}
	return apply
}
// StartAtEarliestReceived makes the subscription start at the earliest
// message received in the stream.
func StartAtEarliestReceived() SubscriptionOption {
	apply := func(o *SubscriptionOptions) error {
		o.StartPosition = StartPosition(proto.StartPosition_EARLIEST)
		return nil
	}
	return apply
}
// StopAtOffset makes the subscription stop consuming at the given offset in
// the stream. It sets the stop position to the offset mode and records the
// offset.
func StopAtOffset(offset int64) SubscriptionOption {
	apply := func(o *SubscriptionOptions) error {
		o.StopOffset = offset
		o.StopPosition = StopPosition(proto.StopPosition_STOP_OFFSET)
		return nil
	}
	return apply
}
// StopAtTime sets the desired timestamp to stop consuming at in the stream.
func StopAtTime(stop time.Time) SubscriptionOption {
return func(o *SubscriptionOptions) error {
if stop.After(time.Now()) {
return errors.New("stop time cannot be in the future")
}
o.StopPosition = StopPosition(proto.StopPosition_STOP_TIMESTAMP)
o.StopTimestamp = stop
return nil
}