diff --git a/go.mod b/go.mod
index b0de92b0548..eb92a1b05b4 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@ require (
 	github.com/klauspost/compress v1.11.12
 	github.com/minio/highwayhash v1.0.1
 	github.com/nats-io/jwt/v2 v2.0.3
-	github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9
+	github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0
 	github.com/nats-io/nkeys v0.3.0
 	github.com/nats-io/nuid v1.0.1
 	golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
diff --git a/go.sum b/go.sum
index e77ed9117cd..ca43c6c6fd7 100644
--- a/go.sum
+++ b/go.sum
@@ -26,6 +26,10 @@ github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19 h1:9WQzXoYc37xB
 github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
 github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 h1:aJYmbbVrq6rsFGAvQnAvoChjkjUOJGqVBdQ47vbEWD4=
 github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
+github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66 h1:J3LNTmD/AUgjKJjZK2IEsGl2GD1znemMOq64ZKu83ok=
+github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
+github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0 h1:lffRgFiHXqxwf8lYNSXXeOZdOgAIOabGwOSwdttqCn0=
+github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
 github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
 github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
 github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
diff --git a/server/filestore.go b/server/filestore.go
index aa85991c235..45d9af393d3 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -178,6 +178,8 @@ const (
 	purgeDir = "__msgs__"
 	// used to scan blk file names.
 	blkScan = "%d.blk"
+	// used for compacted blocks that are staged.
+	newScan = "%d.new"
 	// used to scan index file names.
 	indexScan = "%d.idx"
 	// used to load per subject meta information.
@@ -216,8 +218,12 @@ const (
 	defaultStreamBlockSize = 16 * 1024 * 1024 // 16MB
 	// Default for workqueue or interest based.
 	defaultOtherBlockSize = 8 * 1024 * 1024 // 8MB
+	// Default for KV based
+	defaultKVBlockSize = 8 * 1024 * 1024 // 8MB
 	// max block size for now.
 	maxBlockSize = defaultStreamBlockSize
+	// Compact minimum threshold.
+	compactMinimum = 2 * 1024 * 1024 // 2MB
 	// FileStoreMinBlkSize is minimum size we will do for a blk size.
 	FileStoreMinBlkSize = 32 * 1000 // 32kib
 	// FileStoreMaxBlkSize is maximum size we will do for a blk size.
@@ -495,7 +501,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e
 	ekey, err := ioutil.ReadFile(path.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
 	if err != nil {
 		// We do not seem to have keys even though we should. Could be a plaintext conversion.
-		// Create the keys and we will doubel check below.
+		// Create the keys and we will double check below.
 		if err := fs.genEncryptionKeysForBlock(mb); err != nil {
 			return nil, err
 		}
@@ -635,7 +641,10 @@ func (fs *fileStore) rebuildState(ld *LostStreamData) {
 func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
 	mb.mu.Lock()
 	defer mb.mu.Unlock()
+	return mb.rebuildStateLocked()
+}

+func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
 	startLastSeq := mb.last.seq

 	// Clear state we need to rebuild.
@@ -791,7 +800,6 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {

 		mb.msgs++
 		mb.bytes += uint64(rl)
-		mb.rbytes += uint64(rl)

 		// Do per subject info.
 		if mb.fss != nil {
@@ -1812,12 +1820,22 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
 			}
 		}
 	} else {
-		// Out of order delete.
-		if mb.dmap == nil {
-			mb.dmap = make(map[uint64]struct{})
+		// Check if we are empty first, as long as not the last message block.
+		if notLast := mb != fs.lmb; notLast && mb.msgs == 0 {
+			fs.removeMsgBlock(mb)
+			firstSeqNeedsUpdate = seq == fs.state.FirstSeq
+		} else {
+			// Out of order delete.
+			shouldWriteIndex = true
+			if mb.dmap == nil {
+				mb.dmap = make(map[uint64]struct{})
+			}
+			mb.dmap[seq] = struct{}{}
+			// Check if <50% utilization and minimum size met.
+			if mb.rbytes > compactMinimum && mb.rbytes>>1 > mb.bytes {
+				mb.compact()
+			}
 		}
-		mb.dmap[seq] = struct{}{}
-		shouldWriteIndex = true
 	}

 	var qch, fch chan struct{}
@@ -1870,6 +1888,95 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
 	return true, nil
 }

+// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
+// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
+// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
+// Write lock needs to be held.
+func (mb *msgBlock) compact() {
+	if !mb.cacheAlreadyLoaded() {
+		if err := mb.loadMsgsWithLock(); err != nil {
+			return
+		}
+	}
+
+	buf := mb.cache.buf
+	nbuf := make([]byte, 0, len(buf))
+
+	var le = binary.LittleEndian
+	var firstSet bool
+
+	isDeleted := func(seq uint64) bool {
+		if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
+			return true
+		}
+		if mb.dmap != nil {
+			if _, ok := mb.dmap[seq]; ok {
+				return true
+			}
+		}
+		return false
+	}
+
+	for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
+		if index+msgHdrSize >= lbuf {
+			return
+		}
+		hdr := buf[index : index+msgHdrSize]
+		rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:])
+		// Clear any headers bit that could be set.
+		rl &^= hbit
+		dlen := int(rl) - msgHdrSize
+		// Do some quick sanity checks here.
+		if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 || index+rl > lbuf {
+			return
+		}
+		// Only need to process non-deleted messages.
+		if seq := le.Uint64(hdr[4:]); !isDeleted(seq) {
+			// Normal message here.
+			nbuf = append(nbuf, buf[index:index+rl]...)
+			if !firstSet {
+				firstSet = true
+				mb.first.seq = seq
+			}
+			mb.last.seq = seq
+		}
+		// Advance to next record.
+		index += rl
+	}
+
+	// Check for encryption.
+	if mb.bek != nil && len(nbuf) > 0 {
+		// Recreate to reset counter.
+		rbek, err := chacha20.NewUnauthenticatedCipher(mb.seed, mb.nonce)
+		if err != nil {
+			return
+		}
+		rbek.XORKeyStream(nbuf, nbuf)
+	}
+
+	// Close FDs first.
+	mb.closeFDsLocked()
+
+	// We will write to a new file and mv/rename it in case of failure.
+	mfn := path.Join(path.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
+	defer os.Remove(mfn)
+	if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
+		return
+	}
+	os.Rename(mfn, mb.mfn)
+
+	// Clear our cache and offset, remove the index file, and rebuild state.
+	mb.clearCacheAndOffset()
+	mb.removeIndexFileLocked()
+	mb.deleteDmap()
+	mb.rebuildStateLocked()
+}
+
+// Nil out our dmap.
+func (mb *msgBlock) deleteDmap() {
+	mb.dmap = nil
+}
+
 // Grab info from a slot.
 // Lock should be held.
 func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
@@ -3290,6 +3397,18 @@ func (fs *fileStore) State() StreamState {
 	return state
 }

+func (fs *fileStore) Utilization() (total, reported uint64, err error) {
+	fs.mu.RLock()
+	defer fs.mu.RUnlock()
+	for _, mb := range fs.blks {
+		mb.mu.RLock()
+		reported += mb.bytes
+		total += mb.rbytes
+		mb.mu.RUnlock()
+	}
+	return total, reported, nil
+}
+
 const emptyRecordLen = 22 + 8

 func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 {
@@ -3880,6 +3999,10 @@ func (fs *fileStore) numMsgBlocks() int {
 func (mb *msgBlock) removeIndexFile() {
 	mb.mu.RLock()
 	defer mb.mu.RUnlock()
+	mb.removeIndexFileLocked()
+}
+
+func (mb *msgBlock) removeIndexFileLocked() {
 	if mb.ifd != nil {
 		mb.ifd.Close()
 		mb.ifd = nil
diff --git a/server/filestore_test.go b/server/filestore_test.go
index ab95595b51b..de6258a4d86 100644
--- a/server/filestore_test.go
+++ b/server/filestore_test.go
@@ -16,6 +16,8 @@ package server
 import (
 	"archive/tar"
 	"bytes"
+	"crypto/hmac"
+	"crypto/sha256"
 	"encoding/base64"
 	"encoding/hex"
 	"encoding/json"
@@ -1825,6 +1827,18 @@ func TestFileStoreSnapshot(t *testing.T) {
 	snap = snapshot()
 	verifySnapshot(snap)

+	// Make sure compaction works with snapshots.
+	fs.mu.RLock()
+	for _, mb := range fs.blks {
+		mb.mu.Lock()
+		mb.compact()
+		mb.mu.Unlock()
+	}
+	fs.mu.RUnlock()
+
+	snap = snapshot()
+	verifySnapshot(snap)
+
 	// Now check to make sure that we get the correct error when trying to delete or erase
 	// a message when a snapshot is in progress and that closing the reader releases that condition.
 	sr, err := fs.Snapshot(5*time.Second, false, true)
@@ -3162,3 +3176,139 @@ func TestFileStoreExpireMsgsOnStart(t *testing.T) {
 	checkFiltered("orders.5", SimpleState{Msgs: 5, First: 55, Last: 95})
 	checkBlkState(0)
 }
+
+func TestFileStoreSparseCompaction(t *testing.T) {
+	storeDir := createDir(t, JetStreamStoreDir)
+	defer removeDir(t, storeDir)
+
+	cfg := StreamConfig{Name: "KV", Subjects: []string{"kv.>"}, Storage: FileStorage}
+	var fs *fileStore
+
+	fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024}, cfg)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	msg := bytes.Repeat([]byte("ABC"), 33) // ~100 bytes
+	loadMsgs := func(n int) {
+		t.Helper()
+		for i := 1; i <= n; i++ {
+			if _, _, err := fs.StoreMsg(fmt.Sprintf("kv.%d", i%10), nil, msg); err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+		}
+	}
+
+	checkState := func(msgs, first, last uint64) {
+		t.Helper()
+		if fs == nil {
+			t.Fatalf("No fs")
+			return
+		}
+		state := fs.State()
+		if state.Msgs != msgs {
+			t.Fatalf("Expected %d msgs, got %d", msgs, state.Msgs)
+		}
+		if state.FirstSeq != first {
+			t.Fatalf("Expected %d as first, got %d", first, state.FirstSeq)
+		}
+		if state.LastSeq != last {
+			t.Fatalf("Expected %d as last, got %d", last, state.LastSeq)
+		}
+	}
+
+	deleteMsgs := func(seqs ...uint64) {
+		t.Helper()
+		for _, seq := range seqs {
+			removed, err := fs.RemoveMsg(seq)
+			if err != nil || !removed {
+				t.Fatalf("Got an error on remove of %d: %v", seq, err)
+			}
+		}
+	}
+
+	eraseMsgs := func(seqs ...uint64) {
+		t.Helper()
+		for _, seq := range seqs {
+			removed, err := fs.EraseMsg(seq)
+			if err != nil || !removed {
+				t.Fatalf("Got an error on erase of %d: %v", seq, err)
+			}
+		}
+	}
+
+	compact := func() {
+		t.Helper()
+		var ssb, ssa StreamState
+		fs.FastState(&ssb)
+		tb, ub, _ := fs.Utilization()
+
+		fs.mu.RLock()
+		if len(fs.blks) == 0 {
+			t.Fatalf("No blocks?")
+		}
+		mb := fs.blks[0]
+		fs.mu.RUnlock()
+
+		mb.mu.Lock()
+		mb.compact()
+		mb.mu.Unlock()
+
+		fs.FastState(&ssa)
+		if !reflect.DeepEqual(ssb, ssa) {
+			t.Fatalf("States do not match; %+v vs %+v", ssb, ssa)
+		}
+
+		ta, ua, _ := fs.Utilization()
+		if ub != ua {
+			t.Fatalf("Expected used to be the same, got %d vs %d", ub, ua)
+		}
+		if ta >= tb {
+			t.Fatalf("Expected total after to be less than before, got %d vs %d", tb, ta)
+		}
+		if ta != ua {
+			t.Fatalf("Expected compact to make total and used same, got %d vs %d", ta, ua)
+		}
+	}
+
+	// Actual testing here.
+	loadMsgs(1000)
+	checkState(1000, 1, 1000)
+
+	// Now delete a few messages.
+	deleteMsgs(1)
+	compact()
+
+	deleteMsgs(1000, 999, 998, 997)
+	compact()
+
+	eraseMsgs(500, 502, 504, 506, 508, 510)
+	compact()
+
+	// Now test encrypted mode.
+	fs.Delete()
+
+	prf := func(context []byte) ([]byte, error) {
+		h := hmac.New(sha256.New, []byte("dlc22"))
+		if _, err := h.Write(context); err != nil {
+			return nil, err
+		}
+		return h.Sum(nil), nil
+	}
+
+	fs, err = newFileStoreWithCreated(FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024}, cfg, time.Now(), prf)
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	loadMsgs(1000)
+	checkState(1000, 1, 1000)
+
+	// Now delete a few messages.
+	deleteMsgs(1)
+	compact()
+
+	deleteMsgs(1000, 999, 998, 997)
+	compact()
+
+	eraseMsgs(500, 502, 504, 506, 508, 510)
+	compact()
+}
diff --git a/server/memstore.go b/server/memstore.go
index cc20c90b9cb..d8b5761ccc5 100644
--- a/server/memstore.go
+++ b/server/memstore.go
@@ -758,6 +758,12 @@ func (ms *memStore) State() StreamState {
 	return state
 }

+func (ms *memStore) Utilization() (total, reported uint64, err error) {
+	ms.mu.RLock()
+	defer ms.mu.RUnlock()
+	return ms.state.Bytes, ms.state.Bytes, nil
+}
+
 func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
 	return uint64(len(subj) + len(hdr) + len(msg) + 16) // 8*2 for seq + age
 }
diff --git a/server/norace_test.go b/server/norace_test.go
index 8a6d7f4b9b9..4f6aa304b45 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -2855,3 +2855,56 @@ func TestNoRaceJetStreamClusterExtendedStreamPurge(t *testing.T) {
 		})
 	}
 }
+
+func TestNoRaceJetStreamFileStoreCompaction(t *testing.T) {
+	s := RunBasicJetStreamServer()
+	defer s.Shutdown()
+
+	config := s.JetStreamConfig()
+	if config != nil {
+		defer removeDir(t, config.StoreDir)
+	}
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	cfg := &nats.StreamConfig{
+		Name:              "KV",
+		Subjects:          []string{"KV.>"},
+		MaxMsgsPerSubject: 1,
+	}
+	if _, err := js.AddStream(cfg); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	toSend := 10_000
+	data := make([]byte, 4*1024)
+	rand.Read(data)
+
+	// First one.
+	js.PublishAsync("KV.FM", data)
+
+	for i := 0; i < toSend; i++ {
+		js.PublishAsync(fmt.Sprintf("KV.%d", i+1), data)
+	}
+	// Do again and overwrite the previous batch.
+	for i := 0; i < toSend; i++ {
+		js.PublishAsync(fmt.Sprintf("KV.%d", i+1), data)
+	}
+
+	select {
+	case <-js.PublishAsyncComplete():
+	case <-time.After(time.Second):
+		t.Fatalf("Did not receive completion signal")
+	}
+
+	// Now check by hand the utilization level.
+	mset, err := s.GlobalAccount().lookupStream("KV")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	total, used, _ := mset.Store().Utilization()
+	if pu := 100.0 * float32(used) / float32(total); pu < 80.0 {
+		t.Fatalf("Utilization is less than 80%%, got %.2f", pu)
+	}
+}
diff --git a/server/store.go b/server/store.go
index 50eaa904475..d80961ff6c5 100644
--- a/server/store.go
+++ b/server/store.go
@@ -90,6 +90,7 @@ type StreamStore interface {
 	Stop() error
 	ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error)
 	Snapshot(deadline time.Duration, includeConsumers, checkMsgs bool) (*SnapshotResult, error)
+	Utilization() (total, reported uint64, err error)
 }

 // RetentionPolicy determines how messages in a set are retained.
diff --git a/server/stream.go b/server/stream.go
index 45a0f34cde4..905c4200da2 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -571,6 +571,9 @@ func (mset *stream) autoTuneFileStorageBlockSize(fsCfg *FileStoreConfig) {
 	} else if mset.cfg.MaxMsgs > 0 {
 		// Determine max message size to estimate.
 		totalEstSize = mset.maxMsgSize() * uint64(mset.cfg.MaxMsgs)
+	} else if mset.cfg.MaxMsgsPer > 0 {
+		fsCfg.BlockSize = uint64(defaultKVBlockSize)
+		return
 	} else {
 		// If nothing set will let underlying filestore determine blkSize.
 		return
@@ -3281,6 +3284,12 @@ func (mset *stream) stateWithDetail(details bool) StreamState {
 	return state
 }

+func (mset *stream) Store() StreamStore {
+	mset.mu.RLock()
+	defer mset.mu.RUnlock()
+	return mset.store
+}
+
 // Determines if the new proposed partition is unique amongst all consumers.
 // Lock should be held.
 func (mset *stream) partitionUnique(partition string) bool {
diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go
index 92b6e54490f..cc92f8c8414 100644
--- a/vendor/github.com/nats-io/nats.go/js.go
+++ b/vendor/github.com/nats-io/nats.go/js.go
@@ -260,6 +260,7 @@ type pubOpts struct {
 	lid string // Expected last msgId
 	str string // Expected stream name
 	seq uint64 // Expected last sequence
+	lss uint64 // Expected last sequence per subject
 }

 // pubAckResponse is the ack response from the JetStream API when publishing a message.
@@ -278,10 +279,11 @@ type PubAck struct {

 // Headers for published messages.
 const (
-	MsgIdHdr             = "Nats-Msg-Id"
-	ExpectedStreamHdr    = "Nats-Expected-Stream"
-	ExpectedLastSeqHdr   = "Nats-Expected-Last-Sequence"
-	ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
+	MsgIdHdr               = "Nats-Msg-Id"
+	ExpectedStreamHdr      = "Nats-Expected-Stream"
+	ExpectedLastSeqHdr     = "Nats-Expected-Last-Sequence"
+	ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
+	ExpectedLastMsgIdHdr   = "Nats-Expected-Last-Msg-Id"
 )

 // PublishMsg publishes a Msg to a stream from JetStream.
@@ -317,6 +319,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
 	if o.seq > 0 {
 		m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
 	}
+	if o.lss > 0 {
+		m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
+	}

 	var resp *Msg
 	var err error
@@ -618,6 +623,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
 	if o.seq > 0 {
 		m.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(o.seq, 10))
 	}
+	if o.lss > 0 {
+		m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.lss, 10))
+	}

 	// Reply
 	if m.Reply != _EMPTY_ {
@@ -687,6 +695,14 @@ func ExpectLastSequence(seq uint64) PubOpt {
 	})
 }

+// ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
+func ExpectLastSequencePerSubject(seq uint64) PubOpt {
+	return pubOptFn(func(opts *pubOpts) error {
+		opts.lss = seq
+		return nil
+	})
+}
+
 // ExpectLastMsgId sets the expected last msgId in the response from the publish.
 func ExpectLastMsgId(id string) PubOpt {
 	return pubOptFn(func(opts *pubOpts) error {
@@ -772,6 +788,7 @@ func Context(ctx context.Context) ContextOpt {
 // ConsumerConfig is the configuration of a JetStream consumer.
 type ConsumerConfig struct {
 	Durable         string        `json:"durable_name,omitempty"`
+	Description     string        `json:"description,omitempty"`
 	DeliverSubject  string        `json:"deliver_subject,omitempty"`
 	DeliverGroup    string        `json:"deliver_group,omitempty"`
 	DeliverPolicy   DeliverPolicy `json:"deliver_policy"`
@@ -1012,7 +1029,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

 	// If no stream name is specified, or if option SubjectIsDelivery is
 	// specified, the subject cannot be empty.
-	if subj == _EMPTY_ && (o.stream == _EMPTY_ || o.subjIsDelivery) {
+	if subj == _EMPTY_ && o.stream == _EMPTY_ {
 		return nil, fmt.Errorf("nats: subject required")
 	}

@@ -1027,7 +1044,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
 		return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
 	}
 	// No deliver subject should be provided
-	if o.cfg.DeliverSubject != _EMPTY_ || o.subjIsDelivery {
+	if o.cfg.DeliverSubject != _EMPTY_ {
 		return nil, ErrPullSubscribeToPushConsumer
 	}
 }
@@ -1081,6 +1098,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
 		if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
 			return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
 		}
+		// No deliver subject, we pick our own.
+		if o.cfg.DeliverSubject != _EMPTY_ {
+			return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
+		}
 		// Queue groups not allowed.
 		if queue != _EMPTY_ {
 			return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
@@ -1106,95 +1127,89 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
 		hbi = o.cfg.Heartbeat
 	}

-	// With this option, we go directly create the NATS subscription
-	// and skip all lookup/create.
-	if o.subjIsDelivery {
-		deliver = subj
-	} else {
-		// In case a consumer has not been set explicitly, then the
-		// durable name will be used as the consumer name.
-		if consumer == _EMPTY_ {
-			consumer = o.cfg.Durable
-		}
+	// In case a consumer has not been set explicitly, then the
+	// durable name will be used as the consumer name.
+	if consumer == _EMPTY_ {
+		consumer = o.cfg.Durable
+	}

-		// Find the stream mapped to the subject if not bound to a stream already.
-		if o.stream == _EMPTY_ {
-			stream, err = js.lookupStreamBySubject(subj)
-			if err != nil {
-				return nil, err
-			}
-		} else {
-			stream = o.stream
+	// Find the stream mapped to the subject if not bound to a stream already.
+	if o.stream == _EMPTY_ {
+		stream, err = js.lookupStreamBySubject(subj)
+		if err != nil {
+			return nil, err
 		}
+	} else {
+		stream = o.stream
+	}

-		// With an explicit durable name, we can lookup the consumer first
-		// to which it should be attaching to.
-		if consumer != _EMPTY_ {
-			info, err = js.ConsumerInfo(stream, consumer)
-			notFoundErr = errors.Is(err, ErrConsumerNotFound)
-			lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
-		}
+	// With an explicit durable name, we can look up the consumer
+	// that we should be attaching to.
+	if consumer != _EMPTY_ {
+		info, err = js.ConsumerInfo(stream, consumer)
+		notFoundErr = errors.Is(err, ErrConsumerNotFound)
+		lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
+	}

-		switch {
-		case info != nil:
-			deliver, err = processConsInfo(info, isPullMode, subj, queue)
-			if err != nil {
-				return nil, err
-			}
-			icfg := &info.Config
-			hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
-			hasHeartbeats = hbi > 0
-		case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
-			// If the consumer is being bound and we got an error on pull subscribe then allow the error.
-			if !(isPullMode && lookupErr && consumerBound) {
-				return nil, err
-			}
-		default:
-			// Attempt to create consumer if not found nor using Bind.
-			shouldCreate = true
-			if o.cfg.DeliverSubject != _EMPTY_ {
-				deliver = o.cfg.DeliverSubject
-			} else if !isPullMode {
-				deliver = nc.newInbox()
-				cfg.DeliverSubject = deliver
-			}
+	switch {
+	case info != nil:
+		deliver, err = processConsInfo(info, isPullMode, subj, queue)
+		if err != nil {
+			return nil, err
+		}
+		icfg := &info.Config
+		hasFC, hbi = icfg.FlowControl, icfg.Heartbeat
+		hasHeartbeats = hbi > 0
+	case (err != nil && !notFoundErr) || (notFoundErr && consumerBound):
+		// If the consumer is being bound and we got an error on pull subscribe then allow the error.
+		if !(isPullMode && lookupErr && consumerBound) {
+			return nil, err
+		}
+	default:
+		// Attempt to create consumer if not found nor using Bind.
+		shouldCreate = true
+		if o.cfg.DeliverSubject != _EMPTY_ {
+			deliver = o.cfg.DeliverSubject
+		} else if !isPullMode {
+			deliver = nc.newInbox()
+			cfg.DeliverSubject = deliver
+		}

-			// Do filtering always, server will clear as needed.
-			cfg.FilterSubject = subj
+		// Do filtering always, server will clear as needed.
+		cfg.FilterSubject = subj

-			// Pass the queue to the consumer config
-			if queue != _EMPTY_ {
-				cfg.DeliverGroup = queue
-			}
+		// Pass the queue to the consumer config
+		if queue != _EMPTY_ {
+			cfg.DeliverGroup = queue
+		}

-			// If not set default to ack explicit.
-			if cfg.AckPolicy == ackPolicyNotSet {
-				cfg.AckPolicy = AckExplicitPolicy
-			}
-			// If we have acks at all and the MaxAckPending is not set go ahead
-			// and set to the internal max.
-			// TODO(dlc) - We should be able to update this if client updates PendingLimits.
-			if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
-				if !isPullMode && cb != nil && hasFC {
-					cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16
-				} else if ch != nil {
-					cfg.MaxAckPending = cap(ch)
-				} else {
-					cfg.MaxAckPending = DefaultSubPendingMsgsLimit
-				}
-			}
-			// Create request here.
-			ccreq = &createConsumerRequest{
-				Stream: stream,
-				Config: &cfg,
+		// If not set default to ack explicit.
+		if cfg.AckPolicy == ackPolicyNotSet {
+			cfg.AckPolicy = AckExplicitPolicy
+		}
+		// If we have acks at all and the MaxAckPending is not set go ahead
+		// and set to the internal max.
+		// TODO(dlc) - We should be able to update this if client updates PendingLimits.
+		if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
+			if !isPullMode && cb != nil && hasFC {
+				cfg.MaxAckPending = DefaultSubPendingMsgsLimit * 16
+			} else if ch != nil {
+				cfg.MaxAckPending = cap(ch)
+			} else {
+				cfg.MaxAckPending = DefaultSubPendingMsgsLimit
 			}
-			hbi = cfg.Heartbeat
 		}
-
-		if isPullMode {
-			nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
-			deliver = nc.newInbox()
+		// Create request here.
+		ccreq = &createConsumerRequest{
+			Stream: stream,
+			Config: &cfg,
 		}
+		hbi = cfg.Heartbeat
+	}
+
+	if isPullMode {
+		nms = fmt.Sprintf(js.apiSubj(apiRequestNextT), stream, consumer)
+		deliver = nc.newInbox()
 	}

 	jsi := &jsSub{
@@ -1671,10 +1686,6 @@ type subOpts struct {
 	mack bool
 	// For an ordered consumer.
 	ordered bool
-	// Means that the subject passed to subscribe call will be used
-	// for the low level NATS subscription and no stream nor consumer
-	// lookup/creation will be done.
-	subjIsDelivery bool
 }

 // OrderedConsumer will create a fifo direct/ephemeral consumer for in order delivery of messages.
@@ -1731,6 +1742,15 @@ func DeliverLast() SubOpt {
 	})
 }

+// DeliverLastPerSubject configures a Consumer to receive messages
+// starting with the latest one for each filtered subject.
+func DeliverLastPerSubject() SubOpt {
+	return subOptFn(func(opts *subOpts) error {
+		opts.cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
+		return nil
+	})
+}
+
 // DeliverNew configures a Consumer to receive messages
 // published after the subscription.
 func DeliverNew() SubOpt {
@@ -1893,21 +1913,6 @@ func DeliverSubject(subject string) SubOpt {
 	})
 }

-// SubjectIsDelivery specifies that the subject parameter in the subscribe
-// call shall be used to create the NATS subscription and matches the
-// JetStream consumer's deliver subject.
-//
-// NOTE: This is an "expert" API and should only be used when consumer lookup or
-// creation by the library is not possible (for instance cross accounts).
-// Since no lookup of the JetStream consumer is done, there is no way for
-// the library to check the validity of this JetStream subscription.
-func SubjectIsDelivery() SubOpt {
-	return subOptFn(func(opts *subOpts) error {
-		opts.subjIsDelivery = true
-		return nil
-	})
-}
-
 func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
 	sub.mu.Lock()
 	// TODO(dlc) - Better way to mark especially if we attach.
@@ -2296,12 +2301,12 @@ const (
 )

 func getMetadataFields(subject string) ([]string, error) {
-	const noDomainNoHashsExpectedAckTokens = 9
-	const withDomainNoHashExpectedAckTokens = 10
-	const withDomainAndHashExpectedAckTokens = 11
+	const v1TokenCounts = 9
+	const v2TokenCounts = 12
+	const noDomainName = "_"
 	const btsep = '.'
-	tsa := [withDomainAndHashExpectedAckTokens]string{}
+	tsa := [v2TokenCounts]string{}
 	start, tokens := 0, tsa[:0]
 	for i := 0; i < len(subject); i++ {
 		if subject[i] == btsep {
@@ -2311,37 +2316,38 @@ func getMetadataFields(subject string) ([]string, error) {
 	}
 	tokens = append(tokens, subject[start:])
 	//
-	// Newer server will include an account hash in the subject, and possibly the domain.
-	// So the subject could be:
+	// Newer server will include the domain name and account hash in the subject,
+	// and a token at the end.
+	//
+	// Old subject was:
+	// $JS.ACK.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<tm>.<pending>
 	//
-	// no domain: $JS.ACK......
-	// with domain: $JS.ACK.......
+	// New subject would be:
+	// $JS.ACK.<domain>.<account hash>.<stream>.<consumer>.<delivered>.<sseq>.<cseq>.<tm>.<pending>.<a token with a random value>
 	//
-	// So old server number of tokens is 9, newer is 10 or 11.
+	// v1 has 9 tokens, v2 has 12.
 	//
 	l := len(tokens)
-	if l < noDomainNoHashsExpectedAckTokens || l > withDomainAndHashExpectedAckTokens {
+	// If lower than 9 or more than 9 but less than 12, report an error
+	if l < v1TokenCounts || (l > v1TokenCounts && l < v2TokenCounts) {
 		return nil, ErrNotJSMessage
 	}
 	if tokens[0] != "$JS" || tokens[1] != "ACK" {
 		return nil, ErrNotJSMessage
 	}
-	// To make the rest of the library agnostic of that, we always return the tokens
-	// as if it is coming from a new server will all possible tokens. If domain or account
-	// hash are not specified, the tokens at those locations will simply be empty.
-	if l == noDomainNoHashsExpectedAckTokens || l == withDomainNoHashExpectedAckTokens {
+	// For v1 style, we insert 2 empty tokens (domain and hash) so that the
+	// rest of the library references known fields at a constant location.
+	if l == 9 {
 		// Extend the array (we know the backend is big enough)
-		// Compute how many tokens we need to insert.
-		itc := withDomainAndHashExpectedAckTokens - l
-		for i := 0; i < itc; i++ {
-			tokens = append(tokens, _EMPTY_)
-		}
+		tokens = append(tokens, _EMPTY_, _EMPTY_)
 		// Move to the right anything that is after "ACK" token.
-		copy(tokens[ackDomainTokenPos+itc:], tokens[ackDomainTokenPos:])
-		// Set the missing tokens to empty
-		for i := 0; i < itc; i++ {
-			tokens[ackDomainTokenPos+i] = _EMPTY_
-		}
+		copy(tokens[ackDomainTokenPos+2:], tokens[ackDomainTokenPos:])
+		// Clear the domain and hash tokens
+		tokens[ackDomainTokenPos], tokens[ackAccHashTokenPos] = _EMPTY_, _EMPTY_
+
+	} else if tokens[ackDomainTokenPos] == noDomainName {
+		// If domain is "_", replace with empty value.
+		tokens[ackDomainTokenPos] = _EMPTY_
 	}
 	return tokens, nil
 }
@@ -2522,6 +2528,10 @@ const (
 	// DeliverByStartTimePolicy will deliver messages starting from a given
 	// time.
 	DeliverByStartTimePolicy
+
+	// DeliverLastPerSubjectPolicy will start the consumer with the last message
+	// for all subjects received.
+	DeliverLastPerSubjectPolicy
 )

 func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
@@ -2536,6 +2546,8 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
 		*p = DeliverByStartSequencePolicy
 	case jsonString("by_start_time"):
 		*p = DeliverByStartTimePolicy
+	case jsonString("last_per_subject"):
+		*p = DeliverLastPerSubjectPolicy
 	}

 	return nil
@@ -2553,6 +2565,8 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
 		return json.Marshal("by_start_sequence")
 	case DeliverByStartTimePolicy:
 		return json.Marshal("by_start_time")
+	case DeliverLastPerSubjectPolicy:
+		return json.Marshal("last_per_subject")
 	default:
 		return nil, fmt.Errorf("nats: unknown deliver policy %v", p)
 	}
diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go
index 00ea173a8f0..f40086b2922 100644
--- a/vendor/github.com/nats-io/nats.go/jsm.go
+++ b/vendor/github.com/nats-io/nats.go/jsm.go
@@ -75,6 +75,7 @@ type JetStreamManager interface {
 // given the name will be used as the only subject.
 type StreamConfig struct {
 	Name         string          `json:"name"`
+	Description  string          `json:"description,omitempty"`
 	Subjects     []string        `json:"subjects,omitempty"`
 	Retention    RetentionPolicy `json:"retention"`
 	MaxConsumers int             `json:"max_consumers"`
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 30be76e0bf5..9e7626eb427 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -9,7 +9,7 @@ github.com/minio/highwayhash
 # github.com/nats-io/jwt/v2 v2.0.3
 ## explicit
 github.com/nats-io/jwt/v2
-# github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9
+# github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0
 ## explicit
 github.com/nats-io/nats.go
 github.com/nats-io/nats.go/encoders/builtin
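
Note: the sparse-compaction trigger added to removeMsg above only fires once a block has both crossed the new 2MB compactMinimum and fallen below 50% utilization (mb.rbytes>>1 > mb.bytes, i.e. live bytes are less than half the raw bytes on disk). A minimal standalone sketch of just that arithmetic, with the msgBlock fields reduced to plain parameters (shouldCompact is an illustrative name, not part of the patch):

package main

import "fmt"

// compactMinimum mirrors the new filestore constant: a block must carry at
// least 2MB of raw bytes before compaction is even considered.
const compactMinimum = 2 * 1024 * 1024

// shouldCompact reports whether a block with rbytes raw (on-disk) bytes and
// bytes live (non-deleted) bytes crosses the patch's threshold: minimum size
// met and utilization under 50% (rbytes>>1 > bytes).
func shouldCompact(rbytes, bytes uint64) bool {
	return rbytes > compactMinimum && rbytes>>1 > bytes
}

func main() {
	fmt.Println(shouldCompact(4*1024*1024, 3*1024*1024)) // false: ~75% utilized
	fmt.Println(shouldCompact(4*1024*1024, 1*1024*1024)) // true: ~25% utilized
	fmt.Println(shouldCompact(1024*1024, 100*1024))      // false: below compactMinimum
}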
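
Note: the vendored nats.go bump carries the two client-visible features this server work pairs with: the DeliverLastPerSubject() subscribe option (DeliverPolicy last_per_subject) and the ExpectLastSequencePerSubject() publish option, which sets the new Nats-Expected-Last-Subject-Sequence header. A rough usage sketch, assuming a local JetStream-enabled server new enough to support both; stream and subject names are illustrative:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// KV-style stream: at most one live message per subject.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:              "KV",
		Subjects:          []string{"kv.>"},
		MaxMsgsPerSubject: 1,
	}); err != nil {
		log.Fatal(err)
	}

	pa, err := js.Publish("kv.color", []byte("red"))
	if err != nil {
		log.Fatal(err)
	}

	// Optimistic concurrency on a single key: the write is rejected unless
	// the last message on this subject still has the expected sequence.
	if _, err := js.Publish("kv.color", []byte("blue"),
		nats.ExpectLastSequencePerSubject(pa.Sequence)); err != nil {
		log.Fatal(err)
	}

	// Deliver only the latest message for each subject in the stream.
	sub, err := js.SubscribeSync("kv.>", nats.DeliverLastPerSubject())
	if err != nil {
		log.Fatal(err)
	}
	msg, err := sub.NextMsg(2 * time.Second)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s -> %s\n", msg.Subject, msg.Data)
}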
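
Note: the getMetadataFields rewrite normalizes both reply-subject generations to the v2 layout: a 9-token v1 ACK subject gets empty domain and account-hash tokens spliced in after "ACK", and a v2 domain of "_" is mapped to empty. A self-contained sketch of that splice, using strings.Split and a hard-coded token position purely for illustration (normalize is an illustrative helper, not the library function, and assumes a well-formed $JS.ACK subject):

package main

import (
	"fmt"
	"strings"
)

// ackDomainTokenPos is the first token after "$JS.ACK", matching the
// constant the patch references.
const ackDomainTokenPos = 2

// normalize pads a v1 (9 token) ACK reply subject out to the v2 layout by
// inserting empty domain and account-hash tokens after "ACK", mirroring the
// append/copy done in getMetadataFields.
func normalize(subject string) []string {
	tokens := strings.Split(subject, ".")
	if len(tokens) == 9 {
		// Extend, shift everything after "ACK" right by two, then clear
		// the domain and account-hash slots.
		tokens = append(tokens, "", "")
		copy(tokens[ackDomainTokenPos+2:], tokens[ackDomainTokenPos:])
		tokens[ackDomainTokenPos], tokens[ackDomainTokenPos+1] = "", ""
	} else if tokens[ackDomainTokenPos] == "_" {
		// v2 uses "_" to mean "no domain"; map it to empty.
		tokens[ackDomainTokenPos] = ""
	}
	return tokens
}

func main() {
	fmt.Printf("%q\n", normalize("$JS.ACK.ORDERS.dur.1.2.3.4.5"))
	// ["$JS" "ACK" "" "" "ORDERS" "dur" "1" "2" "3" "4" "5"]
}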