From 6fbc2972b3aa078ca0250b04a7b6792a9b9b5d2d Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Fri, 4 Aug 2023 14:26:13 +0200 Subject: [PATCH] [ADDED] KV implementation in new JetStream API Signed-off-by: Piotr Piotrowski --- jetstream/errors.go | 34 ++ jetstream/jetstream.go | 1 + jetstream/kv.go | 1014 +++++++++++++++++++++++++++++++ jetstream/options.go | 59 ++ jetstream/test/kv_test.go | 1192 +++++++++++++++++++++++++++++++++++++ js.go | 49 +- kv.go | 20 +- nats.go | 12 +- object.go | 2 +- test/js_test.go | 15 + test/sub_test.go | 43 ++ 11 files changed, 2403 insertions(+), 38 deletions(-) create mode 100644 jetstream/kv.go create mode 100644 jetstream/test/kv_test.go diff --git a/jetstream/errors.go b/jetstream/errors.go index ebfdebcd0..e401702a3 100644 --- a/jetstream/errors.go +++ b/jetstream/errors.go @@ -57,6 +57,8 @@ const ( JSErrCodeMessageNotFound ErrorCode = 10037 JSErrCodeBadRequest ErrorCode = 10003 + + JSErrCodeStreamWrongLastSequence ErrorCode = 10071 ) var ( @@ -178,6 +180,38 @@ var ( // ErrOrderedConsumerNotCreated is returned when trying to get consumer info of an // ordered consumer which was not yet created. ErrOrderedConsumerNotCreated = &jsError{message: "consumer instance not yet created"} + + // KeyValue Errors + + // ErrKeyExists is returned when attempting to create a key that already exists. + ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"} + + // ErrKeyValueConfigRequired is returned when attempting to create a bucket without a config. + ErrKeyValueConfigRequired = &jsError{message: "config required"} + + // ErrInvalidBucketName is returned when attempting to create a bucket with an invalid name. + ErrInvalidBucketName = &jsError{message: "invalid bucket name"} + + // ErrInvalidKey is returned when attempting to create a key with an invalid name. 
+ ErrInvalidKey = &jsError{message: "invalid key"} + + // ErrBucketNotFound is returned when attempting to access a bucket that does not exist. + ErrBucketNotFound = &jsError{message: "bucket not found"} + + // ErrBadBucket is returned when attempting to access a bucket that is not a key-value store. + ErrBadBucket = &jsError{message: "bucket not valid key-value store"} + + // ErrKeyNotFound is returned when attempting to access a key that does not exist. + ErrKeyNotFound = &jsError{message: "key not found"} + + // ErrKeyDeleted is returned when attempting to access a key that was deleted. + ErrKeyDeleted = &jsError{message: "key was deleted"} + + // ErrHistoryToLarge is returned when provided history limit is larger than 64. + ErrHistoryToLarge = &jsError{message: "history limited to a max of 64"} + + // ErrNoKeysFound is returned when no keys are found. + ErrNoKeysFound = &jsError{message: "no keys found"} ) // Error prints the JetStream API error code and description diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index cf1d211a2..2dd5d8b4d 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -42,6 +42,7 @@ type ( StreamConsumerManager StreamManager Publisher + KeyValueManager } Publisher interface { diff --git a/jetstream/kv.go b/jetstream/kv.go new file mode 100644 index 000000000..ba82c7377 --- /dev/null +++ b/jetstream/kv.go @@ -0,0 +1,1014 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package jetstream + +import ( + "context" + "errors" + "fmt" + "regexp" + "strconv" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/internal/parser" +) + +// Public interfaces and structs +type ( + // KeyValueManager is used to manage KeyValue stores. + KeyValueManager interface { + // KeyValue will lookup and bind to an existing KeyValue store. + KeyValue(ctx context.Context, bucket string) (KeyValue, error) + // CreateKeyValue will create a KeyValue store with the following configuration. + CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) + // DeleteKeyValue will delete this KeyValue store (JetStream stream). + DeleteKeyValue(ctx context.Context, bucket string) error + // KeyValueStoreNames is used to retrieve a list of key value store names + KeyValueStoreNames(ctx context.Context) KeyValueNamesLister + // KeyValueStores is used to retrieve a list of key value store statuses + KeyValueStores(ctx context.Context) KeyValueLister + } + + // KeyValue contains methods to operate on a KeyValue store. + KeyValue interface { + // Get returns the latest value for the key. + Get(ctx context.Context, key string) (KeyValueEntry, error) + // GetRevision returns a specific revision value for the key. + GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) + // Put will place the new value for the key into the store. + Put(ctx context.Context, key string, value []byte) (uint64, error) + // PutString will place the string for the key into the store. + PutString(ctx context.Context, key string, value string) (uint64, error) + // Create will add the key/value pair if it does not exist. + Create(ctx context.Context, key string, value []byte) (uint64, error) + // Update will update the value if the latest revision matches. + Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error) + // Delete will place a delete marker and leave all revisions. 
+ Delete(ctx context.Context, key string, opts ...KVDeleteOpt) error + // Purge will place a delete marker and remove all previous revisions. + Purge(ctx context.Context, key string, opts ...KVDeleteOpt) error + // Watch for any updates to keys that match the keys argument which could include wildcards. + // Watch will send a nil entry when it has received all initial values. + Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error) + // WatchAll will invoke the callback for all updates. + WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error) + // Keys will return all keys. + Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) + // History will return all historical values for the key. + History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error) + // Bucket returns the current bucket name. + Bucket() string + // PurgeDeletes will remove all current delete markers. + PurgeDeletes(ctx context.Context, opts ...KVPurgeOpt) error + // Status retrieves the status and configuration of a bucket + Status(ctx context.Context) (KeyValueStatus, error) + } + + // KeyValueConfig is for configuring a KeyValue store. 
+ KeyValueConfig struct { + Bucket string + Description string + MaxValueSize int32 + History uint8 + TTL time.Duration + MaxBytes int64 + Storage StorageType + Replicas int + Placement *Placement + RePublish *RePublish + Mirror *StreamSource + Sources []*StreamSource + } + + KeyValueLister interface { + Status() <-chan KeyValueStatus + Error() error + } + + KeyValueNamesLister interface { + Name() <-chan string + Error() error + } + + // KeyValueStatus is run-time status about a Key-Value bucket + KeyValueStatus interface { + // Bucket the name of the bucket + Bucket() string + + // Values is how many messages are in the bucket, including historical values + Values() uint64 + + // History returns the configured history kept per key + History() int64 + + // TTL is how long the bucket keeps values for + TTL() time.Duration + + // BackingStore indicates what technology is used for storage of the bucket + BackingStore() string + + // Bytes returns the size in bytes of the bucket + Bytes() uint64 + } + + // KeyWatcher is what is returned when doing a watch. + KeyWatcher interface { + // Updates returns a channel to read any updates to entries. + Updates() <-chan KeyValueEntry + // Stop will stop this watcher. + Stop() error + } + + // KeyValueEntry is a retrieved entry for Get or List or Watch. + KeyValueEntry interface { + // Bucket is the bucket the data was loaded from. + Bucket() string + // Key is the key that was retrieved. + Key() string + // Value is the retrieved value. + Value() []byte + // Revision is a unique sequence for this value. + Revision() uint64 + // Created is the time the data was put in the bucket. + Created() time.Time + // Delta is distance from the latest value. + Delta() uint64 + // Operation returns Put or Delete or Purge. + Operation() KeyValueOp + } +) + +// Option types + +type ( + WatchOpt interface { + configureWatcher(opts *watchOpts) error + } + + watchOpts struct { + // Do not send delete markers to the update channel. 
+ ignoreDeletes bool + // Include all history per subject, not just last one. + includeHistory bool + // retrieve only the meta data of the entry + metaOnly bool + } + + KVDeleteOpt interface { + configureDelete(opts *deleteOpts) error + } + + deleteOpts struct { + // Remove all previous revisions. + purge bool + + // Delete only if the latest revision matches. + revision uint64 + } + + KVPurgeOpt interface { + configurePurge(opts *purgeOpts) error + } + + purgeOpts struct { + dmthr time.Duration // Delete markers threshold + } +) + +// kvs is the implementation of KeyValue +type kvs struct { + name string + streamName string + pre string + putPre string + pushJS nats.JetStreamContext + js *jetStream + stream Stream + // If true, it means that APIPrefix/Domain was set in the context + // and we need to add something to some of our high level protocols + // (such as Put, etc..) + useJSPfx bool + // To know if we can use the stream direct get API + useDirect bool +} + +// KeyValueOp represents the type of KV operation (Put, Delete, Purge) +// Returned as part of watcher entry. +type KeyValueOp uint8 + +const ( + KeyValuePut KeyValueOp = iota + KeyValueDelete + KeyValuePurge +) + +func (op KeyValueOp) String() string { + switch op { + case KeyValuePut: + return "KeyValuePutOp" + case KeyValueDelete: + return "KeyValueDeleteOp" + case KeyValuePurge: + return "KeyValuePurgeOp" + default: + return "Unknown Operation" + } +} + +const ( + kvBucketNamePre = "KV_" + kvBucketNameTmpl = "KV_%s" + kvSubjectsTmpl = "$KV.%s.>" + kvSubjectsPreTmpl = "$KV.%s." + kvSubjectsPreDomainTmpl = "%s.$KV.%s." + kvNoPending = "0" +) + +const ( + KeyValueMaxHistory = 64 + AllKeys = ">" + kvLatestRevision = 0 + kvop = "KV-Operation" + kvdel = "DEL" + kvpurge = "PURGE" +) + +// Regex for valid keys and buckets. 
+var ( + validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`) + validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`) +) + +func (js *jetStream) KeyValue(ctx context.Context, bucket string) (KeyValue, error) { + if !validBucketRe.MatchString(bucket) { + return nil, ErrInvalidBucketName + } + streamName := fmt.Sprintf(kvBucketNameTmpl, bucket) + stream, err := js.Stream(ctx, streamName) + if err != nil { + if err == ErrStreamNotFound { + err = ErrBucketNotFound + } + return nil, err + } + // Do some quick sanity checks that this is a correctly formed stream for KV. + // Max msgs per subject should be > 0. + if stream.CachedInfo().Config.MaxMsgsPerSubject < 1 { + return nil, ErrBadBucket + } + pushJS, err := js.legacyJetStream() + if err != nil { + return nil, err + } + + return mapStreamToKVS(js, pushJS, stream), nil +} + +// CreateKeyValue will create a KeyValue store with the following configuration. +func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) { + if !validBucketRe.MatchString(cfg.Bucket) { + return nil, ErrInvalidBucketName + } + if _, err := js.AccountInfo(ctx); err != nil { + return nil, err + } + + // Default to 1 for history. Max is 64 for now. + history := int64(1) + if cfg.History > 0 { + if cfg.History > KeyValueMaxHistory { + return nil, ErrHistoryToLarge + } + history = int64(cfg.History) + } + + replicas := cfg.Replicas + if replicas == 0 { + replicas = 1 + } + + // We will set explicitly some values so that we can do comparison + // if we get an "already in use" error and need to check if it is same. + maxBytes := cfg.MaxBytes + if maxBytes == 0 { + maxBytes = -1 + } + maxMsgSize := cfg.MaxValueSize + if maxMsgSize == 0 { + maxMsgSize = -1 + } + // When stream's MaxAge is not set, server uses 2 minutes as the default + // for the duplicate window. If MaxAge is set, and lower than 2 minutes, + // then the duplicate window will be set to that. 
If MaxAge is greater, + // we will cap the duplicate window to 2 minutes (to be consistent with + // previous behavior). + duplicateWindow := 2 * time.Minute + if cfg.TTL > 0 && cfg.TTL < duplicateWindow { + duplicateWindow = cfg.TTL + } + scfg := StreamConfig{ + Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), + Description: cfg.Description, + MaxMsgsPerSubject: history, + MaxBytes: maxBytes, + MaxAge: cfg.TTL, + MaxMsgSize: maxMsgSize, + Storage: cfg.Storage, + Replicas: replicas, + Placement: cfg.Placement, + AllowRollup: true, + DenyDelete: true, + Duplicates: duplicateWindow, + MaxMsgs: -1, + MaxConsumers: -1, + AllowDirect: true, + RePublish: cfg.RePublish, + } + if cfg.Mirror != nil { + // Copy in case we need to make changes so we do not change caller's version. + m := cfg.Mirror.copy() + if !strings.HasPrefix(m.Name, kvBucketNamePre) { + m.Name = fmt.Sprintf(kvBucketNameTmpl, m.Name) + } + scfg.Mirror = m + scfg.MirrorDirect = true + } else if len(cfg.Sources) > 0 { + // For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly. + for _, ss := range cfg.Sources { + if !strings.HasPrefix(ss.Name, kvBucketNamePre) { + ss = ss.copy() + ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name) + } + scfg.Sources = append(scfg.Sources, ss) + } + } else { + scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)} + } + + stream, err := js.CreateStream(ctx, scfg) + if err != nil { + return nil, err + } + pushJS, err := js.legacyJetStream() + if err != nil { + return nil, err + } + + return mapStreamToKVS(js, pushJS, stream), nil +} + +// DeleteKeyValue will delete this KeyValue store (JetStream stream). 
+func (js *jetStream) DeleteKeyValue(ctx context.Context, bucket string) error { + if !validBucketRe.MatchString(bucket) { + return ErrInvalidBucketName + } + stream := fmt.Sprintf(kvBucketNameTmpl, bucket) + return js.DeleteStream(ctx, stream) +} + +// KeyValueStoreNames is used to retrieve a list of key value store names +func (js *jetStream) KeyValueStoreNames(ctx context.Context) KeyValueNamesLister { + res := &kvLister{ + kvNames: make(chan string), + } + l := &streamLister{js: js} + streamsReq := streamsRequest{ + Subject: fmt.Sprintf(kvSubjectsTmpl, "*"), + } + go func() { + defer close(res.kvNames) + for { + page, err := l.streamNames(ctx, streamsReq) + if err != nil && !errors.Is(err, ErrEndOfData) { + res.err = err + return + } + for _, name := range page { + if !strings.HasPrefix(name, kvBucketNamePre) { + continue + } + res.kvNames <- name + } + if errors.Is(err, ErrEndOfData) { + return + } + } + }() + return res +} + +// KeyValueStores is used to retrieve a list of key value store statuses +func (js *jetStream) KeyValueStores(ctx context.Context) KeyValueLister { + res := &kvLister{ + kvs: make(chan KeyValueStatus), + } + l := &streamLister{js: js} + streamsReq := streamsRequest{ + Subject: fmt.Sprintf(kvSubjectsTmpl, "*"), + } + go func() { + defer close(res.kvs) + for { + page, err := l.streamInfos(ctx, streamsReq) + if err != nil && !errors.Is(err, ErrEndOfData) { + res.err = err + return + } + for _, info := range page { + if !strings.HasPrefix(info.Config.Name, kvBucketNamePre) { + continue + } + res.kvs <- &KeyValueBucketStatus{nfo: info, bucket: strings.TrimPrefix(info.Config.Name, kvBucketNamePre)} + } + if errors.Is(err, ErrEndOfData) { + return + } + } + }() + return res +} + +// KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus +type KeyValueBucketStatus struct { + nfo *StreamInfo + bucket string +} + +// Bucket the name of the bucket +func (s *KeyValueBucketStatus) Bucket() string { return s.bucket } + +// 
Values is how many messages are in the bucket, including historical values +func (s *KeyValueBucketStatus) Values() uint64 { return s.nfo.State.Msgs } + +// History returns the configured history kept per key +func (s *KeyValueBucketStatus) History() int64 { return s.nfo.Config.MaxMsgsPerSubject } + +// TTL is how long the bucket keeps values for +func (s *KeyValueBucketStatus) TTL() time.Duration { return s.nfo.Config.MaxAge } + +// BackingStore indicates what technology is used for storage of the bucket +func (s *KeyValueBucketStatus) BackingStore() string { return "JetStream" } + +// StreamInfo is the stream info retrieved to create the status +func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo } + +// Bytes is the size of the stream +func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes } + +type kvLister struct { + kvs chan KeyValueStatus + kvNames chan string + err error +} + +func (kl *kvLister) Status() <-chan KeyValueStatus { + return kl.kvs +} + +func (kl *kvLister) Name() <-chan string { + return kl.kvNames +} + +func (kl *kvLister) Error() error { + return kl.err +} + +func (js *jetStream) legacyJetStream() (nats.JetStreamContext, error) { + opts := make([]nats.JSOpt, 0) + if js.apiPrefix != "" { + opts = append(opts, nats.APIPrefix(js.apiPrefix)) + } + if js.clientTrace != nil { + opts = append(opts, nats.ClientTrace{ + RequestSent: js.clientTrace.RequestSent, + ResponseReceived: js.clientTrace.ResponseReceived, + }) + } + return js.conn.JetStream(opts...) +} + +func keyValid(key string) bool { + if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' 
{ + return false + } + return validKeyRe.MatchString(key) +} + +func (kv *kvs) get(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) { + if !keyValid(key) { + return nil, ErrInvalidKey + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + + var m *RawStreamMsg + var err error + + if revision == kvLatestRevision { + m, err = kv.stream.GetLastMsgForSubject(ctx, b.String()) + } else { + m, err = kv.stream.GetMsg(ctx, revision) + // If a sequence was provided, just make sure that the retrieved + // message subject matches the request. + if err == nil && m.Subject != b.String() { + return nil, ErrKeyNotFound + } + } + if err != nil { + if err == ErrMsgNotFound { + err = ErrKeyNotFound + } + return nil, err + } + + entry := &kve{ + bucket: kv.name, + key: key, + value: m.Data, + revision: m.Sequence, + created: m.Time, + } + + // Double check here that this is not a DEL Operation marker. + if len(m.Header) > 0 { + switch m.Header.Get(kvop) { + case kvdel: + entry.op = KeyValueDelete + return entry, ErrKeyDeleted + case kvpurge: + entry.op = KeyValuePurge + return entry, ErrKeyDeleted + } + } + + return entry, nil +} + +// kve is the implementation of KeyValueEntry +type kve struct { + bucket string + key string + value []byte + revision uint64 + delta uint64 + created time.Time + op KeyValueOp +} + +func (e *kve) Bucket() string { return e.bucket } +func (e *kve) Key() string { return e.key } +func (e *kve) Value() []byte { return e.value } +func (e *kve) Revision() uint64 { return e.revision } +func (e *kve) Created() time.Time { return e.created } +func (e *kve) Delta() uint64 { return e.delta } +func (e *kve) Operation() KeyValueOp { return e.op } + +// Get returns the latest value for the key. 
+func (kv *kvs) Get(ctx context.Context, key string) (KeyValueEntry, error) { + e, err := kv.get(ctx, key, kvLatestRevision) + if err != nil { + if err == ErrKeyDeleted { + return nil, ErrKeyNotFound + } + return nil, err + } + + return e, nil +} + +// GetRevision returns a specific revision value for the key. +func (kv *kvs) GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) { + e, err := kv.get(ctx, key, revision) + if err != nil { + if err == ErrKeyDeleted { + return nil, ErrKeyNotFound + } + return nil, err + } + + return e, nil +} + +// Put will place the new value for the key into the store. +func (kv *kvs) Put(ctx context.Context, key string, value []byte) (uint64, error) { + if !keyValid(key) { + return 0, ErrInvalidKey + } + + var b strings.Builder + if kv.useJSPfx { + b.WriteString(kv.js.apiPrefix) + } + if kv.putPre != "" { + b.WriteString(kv.putPre) + } else { + b.WriteString(kv.pre) + } + b.WriteString(key) + + pa, err := kv.js.Publish(ctx, b.String(), value) + if err != nil { + return 0, err + } + return pa.Sequence, err +} + +// PutString will place the string for the key into the store. +func (kv *kvs) PutString(ctx context.Context, key string, value string) (uint64, error) { + return kv.Put(ctx, key, []byte(value)) +} + +// Create will add the key/value pair iff it does not exist. +func (kv *kvs) Create(ctx context.Context, key string, value []byte) (revision uint64, err error) { + v, err := kv.Update(ctx, key, value, 0) + if err == nil { + return v, nil + } + + if e, err := kv.get(ctx, key, kvLatestRevision); err == ErrKeyDeleted { + return kv.Update(ctx, key, value, e.Revision()) + } + + // Check if the expected last subject sequence is not zero which implies + // the key already exists. + if errors.Is(err, ErrKeyExists) { + jserr := ErrKeyExists.(*jsError) + return 0, fmt.Errorf("%w: %s", err, jserr.message) + } + + return 0, err +} + +// Update will update the value if the latest revision matches. 
+func (kv *kvs) Update(ctx context.Context, key string, value []byte, revision uint64) (uint64, error) { + if !keyValid(key) { + return 0, ErrInvalidKey + } + + var b strings.Builder + if kv.useJSPfx { + b.WriteString(kv.js.apiPrefix) + } + b.WriteString(kv.pre) + b.WriteString(key) + + m := nats.Msg{Subject: b.String(), Header: nats.Header{}, Data: value} + m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(revision, 10)) + + pa, err := kv.js.PublishMsg(ctx, &m) + if err != nil { + return 0, err + } + return pa.Sequence, err +} + +// Delete will place a delete marker and leave all revisions. +func (kv *kvs) Delete(ctx context.Context, key string, opts ...KVDeleteOpt) error { + if !keyValid(key) { + return ErrInvalidKey + } + + var b strings.Builder + if kv.useJSPfx { + b.WriteString(kv.js.apiPrefix) + } + if kv.putPre != "" { + b.WriteString(kv.putPre) + } else { + b.WriteString(kv.pre) + } + b.WriteString(key) + + // DEL op marker. For watch functionality. + m := nats.NewMsg(b.String()) + + var o deleteOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureDelete(&o); err != nil { + return err + } + } + } + + if o.purge { + m.Header.Set(kvop, kvpurge) + m.Header.Set(MsgRollup, MsgRollupSubject) + } else { + m.Header.Set(kvop, kvdel) + } + + if o.revision != 0 { + m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(o.revision, 10)) + } + + _, err := kv.js.PublishMsg(ctx, m) + return err +} + +// Purge will place a delete marker and remove all previous revisions. +func (kv *kvs) Purge(ctx context.Context, key string, opts ...KVDeleteOpt) error { + return kv.Delete(ctx, key, append(opts, purge())...) +} + +// purge removes all previous revisions. 
+func purge() KVDeleteOpt { + return deleteOptFn(func(opts *deleteOpts) error { + opts.purge = true + return nil + }) +} + +// Implementation for Watch +type watcher struct { + mu sync.Mutex + updates chan KeyValueEntry + sub *nats.Subscription + initDone bool + initPending uint64 + received uint64 +} + +// Updates returns the interior channel. +func (w *watcher) Updates() <-chan KeyValueEntry { + if w == nil { + return nil + } + return w.updates +} + +// Stop will unsubscribe from the watcher. +func (w *watcher) Stop() error { + if w == nil { + return nil + } + return w.sub.Unsubscribe() +} + +// Watch for any updates to keys that match the keys argument which could include wildcards. +// Watch will send a nil entry when it has received all initial values. +func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error) { + var o watchOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureWatcher(&o); err != nil { + return nil, err + } + } + } + + // Could be a pattern so don't check for validity as we normally do. + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(keys) + keys = b.String() + + // We will block below on placing items on the chan. That is by design. 
+ w := &watcher{updates: make(chan KeyValueEntry, 256)} + + update := func(m *nats.Msg) { + tokens, err := parser.GetMetadataFields(m.Reply) + if err != nil { + return + } + if len(m.Subject) <= len(kv.pre) { + return + } + subj := m.Subject[len(kv.pre):] + + var op KeyValueOp + if len(m.Header) > 0 { + switch m.Header.Get(kvop) { + case kvdel: + op = KeyValueDelete + case kvpurge: + op = KeyValuePurge + } + } + delta := parser.ParseNum(tokens[parser.AckNumPendingTokenPos]) + w.mu.Lock() + defer w.mu.Unlock() + if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) { + entry := &kve{ + bucket: kv.name, + key: subj, + value: m.Data, + revision: parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]), + created: time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))), + delta: delta, + op: op, + } + w.updates <- entry + } + // Check if done and initial values. + if !w.initDone { + w.received++ + // We set this on the first trip through.. + if w.initPending == 0 { + w.initPending = delta + } + if w.received > w.initPending || delta == 0 { + w.initDone = true + w.updates <- nil + } + } + } + + // Used ordered consumer to deliver results. + subOpts := []nats.SubOpt{nats.BindStream(kv.streamName), nats.OrderedConsumer()} + if !o.includeHistory { + subOpts = append(subOpts, nats.DeliverLastPerSubject()) + } + if o.metaOnly { + subOpts = append(subOpts, nats.HeadersOnly()) + } + subOpts = append(subOpts, nats.Context(ctx)) + // Create the sub and rest of initialization under the lock. + // We want to prevent the race between this code and the + // update() callback. + w.mu.Lock() + defer w.mu.Unlock() + sub, err := kv.pushJS.Subscribe(keys, update, subOpts...) + if err != nil { + return nil, err + } + sub.SetClosedHandler(func(_ string) { + close(w.updates) + }) + initialPending, err := sub.InitialConsumerPending() + // If there were no pending messages at the time of the creation + // of the consumer, send the marker. 
+ if err == nil && initialPending == 0 { + w.initDone = true + w.updates <- nil + } + w.sub = sub + return w, nil +} + +// WatchAll will invoke the callback for all updates. +func (kv *kvs) WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error) { + return kv.Watch(ctx, AllKeys, opts...) +} + +// Keys will return all keys. +func (kv *kvs) Keys(ctx context.Context, opts ...WatchOpt) ([]string, error) { + opts = append(opts, IgnoreDeletes(), MetaOnly()) + watcher, err := kv.WatchAll(ctx, opts...) + if err != nil { + return nil, err + } + defer watcher.Stop() + + var keys []string + for entry := range watcher.Updates() { + if entry == nil { + break + } + keys = append(keys, entry.Key()) + } + if len(keys) == 0 { + return nil, ErrNoKeysFound + } + return keys, nil +} + +// History will return all historical values for the key. +func (kv *kvs) History(ctx context.Context, key string, opts ...WatchOpt) ([]KeyValueEntry, error) { + opts = append(opts, IncludeHistory()) + watcher, err := kv.Watch(ctx, key, opts...) + if err != nil { + return nil, err + } + defer watcher.Stop() + + var entries []KeyValueEntry + for entry := range watcher.Updates() { + if entry == nil { + break + } + entries = append(entries, entry) + } + if len(entries) == 0 { + return nil, ErrKeyNotFound + } + return entries, nil +} + +// Bucket returns the current bucket name. +func (kv *kvs) Bucket() string { + return kv.name +} + +const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute + +// PurgeDeletes will remove all current delete markers. +func (kv *kvs) PurgeDeletes(ctx context.Context, opts ...KVPurgeOpt) error { + var o purgeOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configurePurge(&o); err != nil { + return err + } + } + } + watcher, err := kv.WatchAll(ctx) + if err != nil { + return err + } + defer watcher.Stop() + + var limit time.Time + olderThan := o.dmthr + // Negative value is used to instruct to always remove markers, regardless + // of age. 
If set to 0 (or not set), use our default value. + if olderThan == 0 { + olderThan = kvDefaultPurgeDeletesMarkerThreshold + } + if olderThan > 0 { + limit = time.Now().Add(-olderThan) + } + + var deleteMarkers []KeyValueEntry + for entry := range watcher.Updates() { + if entry == nil { + break + } + if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge { + deleteMarkers = append(deleteMarkers, entry) + } + } + + var b strings.Builder + // Do actual purges here. + for _, entry := range deleteMarkers { + b.WriteString(kv.pre) + b.WriteString(entry.Key()) + purgeOpts := []StreamPurgeOpt{WithPurgeSubject(b.String())} + if olderThan > 0 && entry.Created().After(limit) { + purgeOpts = append(purgeOpts, WithPurgeKeep(1)) + } + if err := kv.stream.Purge(ctx, purgeOpts...); err != nil { + return err + } + b.Reset() + } + return nil +} + +// Status retrieves the status and configuration of a bucket +func (kv *kvs) Status(ctx context.Context) (KeyValueStatus, error) { + nfo, err := kv.stream.Info(ctx) + if err != nil { + return nil, err + } + + return &KeyValueBucketStatus{nfo: nfo, bucket: kv.name}, nil +} + +func mapStreamToKVS(js *jetStream, pushJS nats.JetStreamContext, stream Stream) *kvs { + info := stream.CachedInfo() + bucket := strings.TrimPrefix(info.Config.Name, kvBucketNamePre) + kv := &kvs{ + name: bucket, + streamName: info.Config.Name, + pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket), + js: js, + pushJS: pushJS, + stream: stream, + // Determine if we need to use the JS prefix in front of Put and Delete operations + useJSPfx: js.apiPrefix != DefaultAPIPrefix, + useDirect: info.Config.AllowDirect, + } + + // If we are mirroring, we will have mirror direct on, so just use the mirror name + // and override use + if m := info.Config.Mirror; m != nil { + bucket := strings.TrimPrefix(m.Name, kvBucketNamePre) + if m.External != nil && m.External.APIPrefix != "" { + kv.useJSPfx = false + kv.pre = fmt.Sprintf(kvSubjectsPreTmpl, bucket) + kv.putPre = 
fmt.Sprintf(kvSubjectsPreDomainTmpl, m.External.APIPrefix, bucket) + } else { + kv.putPre = fmt.Sprintf(kvSubjectsPreTmpl, bucket) + } + } + + return kv +} diff --git a/jetstream/options.go b/jetstream/options.go index ce79fc56c..ecbd08cd7 100644 --- a/jetstream/options.go +++ b/jetstream/options.go @@ -316,3 +316,62 @@ func WithStallWait(ttl time.Duration) PublishOpt { return nil } } + +// KV Options + +type watchOptFn func(opts *watchOpts) error + +func (opt watchOptFn) configureWatcher(opts *watchOpts) error { + return opt(opts) +} + +// IncludeHistory instructs the key watcher to include historical values as well. +func IncludeHistory() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.includeHistory = true + return nil + }) +} + +// IgnoreDeletes will have the key watcher not pass any deleted keys. +func IgnoreDeletes() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.ignoreDeletes = true + return nil + }) +} + +// MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value +func MetaOnly() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.metaOnly = true + return nil + }) +} + +// DeleteMarkersOlderThan indicates that delete or purge markers older than that +// will be deleted as part of PurgeDeletes() operation, otherwise, only the data +// will be removed but markers that are recent will be kept. +// Note that if no option is specified, the default is 30 minutes. You can set +// this option to a negative value to instruct to always remove the markers, +// regardless of their age. +type DeleteMarkersOlderThan time.Duration + +func (ttl DeleteMarkersOlderThan) configurePurge(opts *purgeOpts) error { + opts.dmthr = time.Duration(ttl) + return nil +} + +type deleteOptFn func(opts *deleteOpts) error + +func (opt deleteOptFn) configureDelete(opts *deleteOpts) error { + return opt(opts) +} + +// LastRevision deletes if the latest revision matches. 
+func LastRevision(revision uint64) KVDeleteOpt { + return deleteOptFn(func(opts *deleteOpts) error { + opts.revision = revision + return nil + }) +} diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go new file mode 100644 index 000000000..f2295f81f --- /dev/null +++ b/jetstream/test/kv_test.go @@ -0,0 +1,1192 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test + +import ( + "context" + "errors" + "fmt" + "os" + "reflect" + "strconv" + "strings" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func TestKeyValueBasics(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx := context.Background() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST", History: 5, TTL: time.Hour}) + expectOk(t, err) + + if kv.Bucket() != "TEST" { + t.Fatalf("Expected bucket name to be %q, got %q", "TEST", kv.Bucket()) + } + + // Simple Put + r, err := kv.Put(ctx, "name", []byte("derek")) + expectOk(t, err) + if r != 1 { + t.Fatalf("Expected 1 for the revision, got %d", r) + } + // Simple Get + e, err := kv.Get(ctx, "name") + expectOk(t, err) + if string(e.Value()) != "derek" { + t.Fatalf("Got wrong value: %q vs %q", e.Value(), "derek") + } + if e.Revision() != 1 { + t.Fatalf("Expected 1 for the revision, 
got %d", e.Revision()) + } + + // Delete + err = kv.Delete(ctx, "name") + expectOk(t, err) + _, err = kv.Get(ctx, "name") + expectErr(t, err, jetstream.ErrKeyNotFound) + r, err = kv.Create(ctx, "name", []byte("derek")) + expectOk(t, err) + if r != 3 { + t.Fatalf("Expected 3 for the revision, got %d", r) + } + err = kv.Delete(ctx, "name", jetstream.LastRevision(4)) + expectErr(t, err) + err = kv.Delete(ctx, "name", jetstream.LastRevision(3)) + expectOk(t, err) + + // Conditional Updates. + r, err = kv.Update(ctx, "name", []byte("rip"), 4) + expectOk(t, err) + _, err = kv.Update(ctx, "name", []byte("ik"), 3) + expectErr(t, err) + _, err = kv.Update(ctx, "name", []byte("ik"), r) + expectOk(t, err) + r, err = kv.Create(ctx, "age", []byte("22")) + expectOk(t, err) + _, err = kv.Update(ctx, "age", []byte("33"), r) + expectOk(t, err) + + // Status + status, err := kv.Status(ctx) + expectOk(t, err) + if status.History() != 5 { + t.Fatalf("expected history of 5 got %d", status.History()) + } + if status.Bucket() != "TEST" { + t.Fatalf("expected bucket TEST got %v", status.Bucket()) + } + if status.TTL() != time.Hour { + t.Fatalf("expected 1 hour TTL got %v", status.TTL()) + } + if status.Values() != 7 { + t.Fatalf("expected 7 values got %d", status.Values()) + } + if status.BackingStore() != "JetStream" { + t.Fatalf("invalid backing store kind %s", status.BackingStore()) + } + + kvs := status.(*jetstream.KeyValueBucketStatus) + si := kvs.StreamInfo() + if si == nil { + t.Fatalf("StreamInfo not received") + } +} + +func TestKeyValueHistory(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "LIST", History: 10}) + expectOk(t, err) + + for i := 0; i < 50; i++ { + age := strconv.FormatUint(uint64(i+22), 10) + _, err := 
kv.Put(ctx, "age", []byte(age))
+		expectOk(t, err)
+	}
+
+	vl, err := kv.History(ctx, "age")
+	expectOk(t, err)
+
+	if len(vl) != 10 {
+		t.Fatalf("Expected %d values, got %d", 10, len(vl))
+	}
+	for i, v := range vl {
+		if v.Key() != "age" {
+			t.Fatalf("Expected key of %q, got %q", "age", v.Key())
+		}
+		if v.Revision() != uint64(i+41) {
+			// History of 10, sent 50..
+			t.Fatalf("Expected revision of %d, got %d", i+41, v.Revision())
+		}
+		age, err := strconv.Atoi(string(v.Value()))
+		expectOk(t, err)
+		if age != i+62 {
+			t.Fatalf("Expected data value of %d, got %d", i+62, age)
+		}
+	}
+}
+
+func TestKeyValueWatch(t *testing.T) {
+	s := RunBasicJetStreamServer()
+	defer shutdownJSServerAndRemoveStorage(t, s)
+
+	nc, js := jsClient(t, s)
+	defer nc.Close()
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCH"})
+	expectOk(t, err)
+
+	watcher, err := kv.WatchAll(ctx)
+	expectOk(t, err)
+	defer watcher.Stop()
+
+	expectUpdate := func(key, value string, revision uint64) {
+		t.Helper()
+		select {
+		case v := <-watcher.Updates():
+			if v.Key() != key || string(v.Value()) != value || v.Revision() != revision {
+				t.Fatalf("Did not get expected: %+v vs %q %q %d", v, key, value, revision)
+			}
+		case <-time.After(time.Second):
+			t.Fatalf("Did not receive an update like expected")
+		}
+	}
+	expectDelete := func(key string, revision uint64) {
+		t.Helper()
+		select {
+		case v := <-watcher.Updates():
+			if v.Operation() != jetstream.KeyValueDelete {
+				t.Fatalf("Expected a delete operation but got %+v", v)
+			}
+			if v.Revision() != revision {
+				t.Fatalf("Did not get expected revision: %d vs %d", revision, v.Revision())
+			}
+		case <-time.After(time.Second):
+			t.Fatalf("Did not receive an update like expected")
+		}
+	}
+	expectInitDone := func() {
+		t.Helper()
+		select {
+		case v := <-watcher.Updates():
+			if v != nil {
+				t.Fatalf("Did not get expected: %+v", v)
+			}
+		case 
<-time.After(time.Second):
+			t.Fatalf("Did not receive an init done like expected")
+		}
+	}
+
+	// Make sure we already got an initial value marker.
+	expectInitDone()
+
+	kv.Create(ctx, "name", []byte("derek"))
+	expectUpdate("name", "derek", 1)
+	kv.Put(ctx, "name", []byte("rip"))
+	expectUpdate("name", "rip", 2)
+	kv.Put(ctx, "name", []byte("ik"))
+	expectUpdate("name", "ik", 3)
+	kv.Put(ctx, "age", []byte("22"))
+	expectUpdate("age", "22", 4)
+	kv.Put(ctx, "age", []byte("33"))
+	expectUpdate("age", "33", 5)
+	kv.Delete(ctx, "age")
+	expectDelete("age", 6)
+
+	// Stop first watcher.
+	watcher.Stop()
+
+	// Now try wildcard matching and make sure we only get last value when starting.
+	kv.Put(ctx, "t.name", []byte("rip"))
+	kv.Put(ctx, "t.name", []byte("ik"))
+	kv.Put(ctx, "t.age", []byte("22"))
+	kv.Put(ctx, "t.age", []byte("44"))
+
+	watcher, err = kv.Watch(ctx, "t.*")
+	expectOk(t, err)
+	defer watcher.Stop()
+
+	expectUpdate("t.name", "ik", 8)
+	expectUpdate("t.age", "44", 10)
+	expectInitDone()
+}
+
+func TestKeyValueWatchContext(t *testing.T) {
+	s := RunBasicJetStreamServer()
+	defer shutdownJSServerAndRemoveStorage(t, s)
+
+	nc, js := jsClient(t, s)
+	defer nc.Close()
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCHCTX"})
+	expectOk(t, err)
+
+	watcher, err := kv.WatchAll(ctx)
+	expectOk(t, err)
+	defer watcher.Stop()
+
+	// Trigger unsubscribe internally.
+	cancel()
+
+	// Wait for a bit for unsubscribe to be done.
+	time.Sleep(500 * time.Millisecond)
+
+	// Stopping watch that is already stopped via cancellation propagation is an error. 
+ err = watcher.Stop() + if err == nil || err != nats.ErrBadSubscription { + t.Errorf("Expected invalid subscription, got: %v", err) + } +} + +func TestKeyValueWatchContextUpdates(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCHCTX"}) + expectOk(t, err) + + watcher, err := kv.WatchAll(ctx) + expectOk(t, err) + defer watcher.Stop() + + // Pull the initial state done marker which is nil. + select { + case v := <-watcher.Updates(): + if v != nil { + t.Fatalf("Expected nil marker, got %+v", v) + } + case <-time.After(time.Second): + t.Fatalf("Did not receive nil marker like expected") + } + + // Fire a timer and cancel the context after 250ms. + time.AfterFunc(250*time.Millisecond, cancel) + + // Make sure canceling will break us out here. + select { + case <-watcher.Updates(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not break out like expected") + } +} + +func TestKeyValueBindStore(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCH"}) + expectOk(t, err) + + // Now bind to it.. + _, err = js.KeyValue(ctx, "WATCH") + expectOk(t, err) + + // Make sure we can't bind to a non-kv style stream. + // We have some protection with stream name prefix. 
+ _, err = js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "KV_TEST", + Subjects: []string{"foo"}, + }) + expectOk(t, err) + + _, err = js.KeyValue(ctx, "TEST") + expectErr(t, err) + if err != jetstream.ErrBadBucket { + t.Fatalf("Expected %v but got %v", jetstream.ErrBadBucket, err) + } +} + +func TestKeyValueDeleteStore(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCH"}) + expectOk(t, err) + + err = js.DeleteKeyValue(ctx, "WATCH") + expectOk(t, err) + + _, err = js.KeyValue(ctx, "WATCH") + expectErr(t, err) +} + +func TestKeyValueDeleteVsPurge(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", History: 10}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(ctx, key, []byte(value)) + expectOk(t, err) + } + + // Put in a few names and ages. + put("name", "derek") + put("age", "22") + put("name", "ivan") + put("age", "33") + put("name", "rip") + put("age", "44") + + kv.Delete(ctx, "age") + entries, err := kv.History(ctx, "age") + expectOk(t, err) + // Expect three entries and delete marker. 
+	if len(entries) != 4 {
+		t.Fatalf("Expected 4 entries for age after delete, got %d", len(entries))
+	}
+	err = kv.Purge(ctx, "name", jetstream.LastRevision(4))
+	expectErr(t, err)
+	err = kv.Purge(ctx, "name", jetstream.LastRevision(5))
+	expectOk(t, err)
+	// Check marker
+	e, err := kv.Get(ctx, "name")
+	expectErr(t, err, jetstream.ErrKeyNotFound)
+	if e != nil {
+		t.Fatalf("Expected a nil entry but got %v", e)
+	}
+	entries, err = kv.History(ctx, "name")
+	expectOk(t, err)
+	if len(entries) != 1 {
+		t.Fatalf("Expected only 1 entry for name after delete, got %d", len(entries))
+	}
+	// Make sure history also reports the purge operation.
+	if e := entries[0]; e.Operation() != jetstream.KeyValuePurge {
+		t.Fatalf("Expected a purge operation but got %v", e.Operation())
+	}
+}
+
+func TestKeyValueDeleteTombstones(t *testing.T) {
+	s := RunBasicJetStreamServer()
+	defer shutdownJSServerAndRemoveStorage(t, s)
+
+	nc, js := jsClient(t, s)
+	defer nc.Close()
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", History: 10})
+	expectOk(t, err)
+
+	put := func(key, value string) {
+		t.Helper()
+		_, err := kv.Put(ctx, key, []byte(value))
+		expectOk(t, err)
+	}
+
+	v := strings.Repeat("ABC", 33)
+	for i := 1; i <= 100; i++ {
+		put(fmt.Sprintf("key-%d", i), v)
+	}
+	// Now delete them.
+	for i := 1; i <= 100; i++ {
+		err := kv.Delete(ctx, fmt.Sprintf("key-%d", i))
+		expectOk(t, err)
+	}
+
+	// Now cleanup. 
+ err = kv.PurgeDeletes(ctx, jetstream.DeleteMarkersOlderThan(-1)) + expectOk(t, err) + + si, err := js.Stream(ctx, "KV_KVS") + expectOk(t, err) + if si.CachedInfo().State.Msgs != 0 { + t.Fatalf("Expected no stream msgs to be left, got %d", si.CachedInfo().State.Msgs) + } + + // Try with context + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + err = kv.PurgeDeletes(nats.Context(ctx)) + expectOk(t, err) +} + +func TestKeyValuePurgeDeletesMarkerThreshold(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", History: 10}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(ctx, key, []byte(value)) + expectOk(t, err) + } + + put("foo", "foo1") + put("bar", "bar1") + put("foo", "foo2") + err = kv.Delete(ctx, "foo") + expectOk(t, err) + + time.Sleep(200 * time.Millisecond) + + err = kv.Delete(ctx, "bar") + expectOk(t, err) + + err = kv.PurgeDeletes(ctx, jetstream.DeleteMarkersOlderThan(100*time.Millisecond)) + expectOk(t, err) + + // The key foo should have been completely cleared of the data + // and the delete marker. 
+ fooEntries, err := kv.History(ctx, "foo") + if err != jetstream.ErrKeyNotFound { + t.Fatalf("Expected all entries for key foo to be gone, got err=%v entries=%v", err, fooEntries) + } + barEntries, err := kv.History(ctx, "bar") + expectOk(t, err) + if len(barEntries) != 1 { + t.Fatalf("Expected 1 entry, got %v", barEntries) + } + if e := barEntries[0]; e.Operation() != jetstream.KeyValueDelete { + t.Fatalf("Unexpected entry: %+v", e) + } +} + +func TestKeyValueKeys(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", History: 2}) + expectOk(t, err) + + put := func(key, value string) { + t.Helper() + _, err := kv.Put(ctx, key, []byte(value)) + expectOk(t, err) + } + + _, err = kv.Keys(ctx) + expectErr(t, err, jetstream.ErrNoKeysFound) + + // Put in a few names and ages. + put("name", "derek") + put("age", "22") + put("country", "US") + put("name", "ivan") + put("age", "33") + put("country", "US") + put("name", "rip") + put("age", "44") + put("country", "MT") + + keys, err := kv.Keys(ctx) + expectOk(t, err) + + kmap := make(map[string]struct{}) + for _, key := range keys { + if _, ok := kmap[key]; ok { + t.Fatalf("Already saw %q", key) + } + kmap[key] = struct{}{} + } + if len(kmap) != 3 { + t.Fatalf("Expected 3 total keys, got %d", len(kmap)) + } + expected := map[string]struct{}{ + "name": struct{}{}, + "age": struct{}{}, + "country": struct{}{}, + } + if !reflect.DeepEqual(kmap, expected) { + t.Fatalf("Expected %+v but got %+v", expected, kmap) + } + // Make sure delete and purge do the right thing and not return the keys. 
+ err = kv.Delete(ctx, "name") + expectOk(t, err) + err = kv.Purge(ctx, "country") + expectOk(t, err) + + keys, err = kv.Keys(ctx) + expectOk(t, err) + + kmap = make(map[string]struct{}) + for _, key := range keys { + if _, ok := kmap[key]; ok { + t.Fatalf("Already saw %q", key) + } + kmap[key] = struct{}{} + } + if len(kmap) != 1 { + t.Fatalf("Expected 1 total key, got %d", len(kmap)) + } + if _, ok := kmap["age"]; !ok { + t.Fatalf("Expected %q to be only key present", "age") + } +} + +func TestKeyValueCrossAccounts(t *testing.T) { + conf := createConfFile(t, []byte(` + jetstream: enabled + accounts: { + A: { + users: [ {user: a, password: a} ] + jetstream: enabled + exports: [ + {service: '$JS.API.>' } + {service: '$KV.>'} + {stream: 'accI.>'} + ] + }, + I: { + users: [ {user: i, password: i} ] + imports: [ + {service: {account: A, subject: '$JS.API.>'}, to: 'fromA.>' } + {service: {account: A, subject: '$KV.>'}, to: 'fromA.$KV.>' } + {stream: {subject: 'accI.>', account: A}} + ] + } + }`)) + defer os.Remove(conf) + s, _ := RunServerWithConfig(conf) + defer shutdownJSServerAndRemoveStorage(t, s) + + watchNext := func(w jetstream.KeyWatcher) jetstream.KeyValueEntry { + t.Helper() + select { + case e := <-w.Updates(): + return e + case <-time.After(time.Second): + t.Fatal("Fail to get the next update") + } + return nil + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + nc1, js1 := jsClient(t, s, nats.UserInfo("a", "a")) + defer nc1.Close() + + kv1, err := js1.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "Map", History: 10}) + if err != nil { + t.Fatalf("Error creating kv store: %v", err) + } + + w1, err := kv1.Watch(ctx, "map") + if err != nil { + t.Fatalf("Error creating watcher: %v", err) + } + if e := watchNext(w1); e != nil { + t.Fatalf("Expected nil entry, got %+v", e) + } + + nc2, err := nats.Connect(s.ClientURL(), nats.UserInfo("i", "i"), nats.CustomInboxPrefix("accI")) + if err != nil { + 
t.Fatalf("Error on connect: %v", err)
+	}
+	defer nc2.Close()
+	js2, err := jetstream.NewWithAPIPrefix(nc2, "fromA")
+	if err != nil {
+		t.Fatalf("Error getting jetstream context: %v", err)
+	}
+
+	kv2, err := js2.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "Map", History: 10})
+	if err != nil {
+		t.Fatalf("Error creating kv store: %v", err)
+	}
+
+	w2, err := kv2.Watch(ctx, "map")
+	if err != nil {
+		t.Fatalf("Error creating watcher: %v", err)
+	}
+	if e := watchNext(w2); e != nil {
+		t.Fatalf("Expected nil entry, got %+v", e)
+	}
+
+	// Do a Put from kv2
+	rev, err := kv2.Put(ctx, "map", []byte("value"))
+	if err != nil {
+		t.Fatalf("Error on put: %v", err)
+	}
+
+	// Get from kv1
+	e, err := kv1.Get(ctx, "map")
+	if err != nil {
+		t.Fatalf("Error on get: %v", err)
+	}
+	if e.Key() != "map" || string(e.Value()) != "value" {
+		t.Fatalf("Unexpected entry: %+v", e)
+	}
+
+	// Get from kv2
+	e, err = kv2.Get(ctx, "map")
+	if err != nil {
+		t.Fatalf("Error on get: %v", err)
+	}
+	if e.Key() != "map" || string(e.Value()) != "value" {
+		t.Fatalf("Unexpected entry: %+v", e)
+	}
+
+	// Watcher 1
+	if e := watchNext(w1); e == nil || e.Key() != "map" || string(e.Value()) != "value" {
+		t.Fatalf("Unexpected entry: %+v", e)
+	}
+
+	// Watcher 2
+	if e := watchNext(w2); e == nil || e.Key() != "map" || string(e.Value()) != "value" {
+		t.Fatalf("Unexpected entry: %+v", e)
+	}
+
+	// Try an update from kv2
+	if _, err := kv2.Update(ctx, "map", []byte("updated"), rev); err != nil {
+		t.Fatalf("Failed to update: %v", err)
+	}
+
+	// Get from kv1
+	e, err = kv1.Get(ctx, "map")
+	if err != nil {
+		t.Fatalf("Error on get: %v", err)
+	}
+	if e.Key() != "map" || string(e.Value()) != "updated" {
+		t.Fatalf("Unexpected entry: %+v", e)
+	}
+
+	// Get from kv2
+	e, err = kv2.Get(ctx, "map")
+	if err != nil {
+		t.Fatalf("Error on get: %v", err)
+	}
+	if e.Key() != "map" || string(e.Value()) != "updated" {
+		t.Fatalf("Unexpected entry: %+v", e)
+	}
+
+	// Watcher 1
+	if e := 
watchNext(w1); e == nil || e.Key() != "map" || string(e.Value()) != "updated" { + t.Fatalf("Unexpected entry: %+v", e) + } + + // Watcher 2 + if e := watchNext(w2); e == nil || e.Key() != "map" || string(e.Value()) != "updated" { + t.Fatalf("Unexpected entry: %+v", e) + } + + // Purge from kv2 + if err := kv2.Purge(ctx, "map"); err != nil { + t.Fatalf("Error on purge: %v", err) + } + + // Check purge ok from w1 + if e := watchNext(w1); e == nil || e.Operation() != jetstream.KeyValuePurge { + t.Fatalf("Unexpected entry: %+v", e) + } + + // Check purge ok from w2 + if e := watchNext(w2); e == nil || e.Operation() != jetstream.KeyValuePurge { + t.Fatalf("Unexpected entry: %+v", e) + } + + // Delete purge records from kv2 + if err := kv2.PurgeDeletes(ctx, jetstream.DeleteMarkersOlderThan(-1)); err != nil { + t.Fatalf("Error on purge deletes: %v", err) + } + + // Check all gone from js1 + if si, err := js1.Stream(ctx, "KV_Map"); err != nil || si == nil || si.CachedInfo().State.Msgs != 0 { + t.Fatalf("Error getting stream info: err=%v si=%+v", err, si) + } + + // Delete key from kv2 + if err := kv2.Delete(ctx, "map"); err != nil { + t.Fatalf("Error on delete: %v", err) + } + + // Check key gone from kv1 + if e, err := kv1.Get(ctx, "map"); err != jetstream.ErrKeyNotFound || e != nil { + t.Fatalf("Expected key not found, got err=%v e=%+v", err, e) + } +} + +func TestKeyValueDuplicatesWindow(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + checkWindow := func(ttl, expectedDuplicates time.Duration) { + t.Helper() + + _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST", History: 5, TTL: ttl}) + expectOk(t, err) + defer js.DeleteKeyValue(ctx, "TEST") + + si, err := js.Stream(ctx, "KV_TEST") + if err != nil { + t.Fatalf("StreamInfo error: %v", err) + } + if 
si.CachedInfo().Config.Duplicates != expectedDuplicates { + t.Fatalf("Expected duplicates to be %v, got %v", expectedDuplicates, si.CachedInfo().Config.Duplicates) + } + } + + checkWindow(0, 2*time.Minute) + checkWindow(time.Hour, 2*time.Minute) + checkWindow(5*time.Second, 5*time.Second) +} + +func TestListKeyValueStores(t *testing.T) { + tests := []struct { + name string + bucketsNum int + }{ + { + name: "single page", + bucketsNum: 5, + }, + { + name: "multi page", + bucketsNum: 1025, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // create stream without the chunk subject, but with KV_ prefix + _, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "KV_FOO", Subjects: []string{"FOO.*"}}) + expectOk(t, err) + // create stream with chunk subject, but without "KV_" prefix + _, err = js.CreateStream(ctx, jetstream.StreamConfig{Name: "FOO", Subjects: []string{"$KV.ABC.>"}}) + expectOk(t, err) + for i := 0; i < test.bucketsNum; i++ { + _, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: fmt.Sprintf("KVS_%d", i), MaxBytes: 1024}) + expectOk(t, err) + } + names := make([]string, 0) + kvNames := js.KeyValueStoreNames(ctx) + for name := range kvNames.Name() { + names = append(names, name) + } + if kvNames.Error() != nil { + t.Fatalf("Unexpected error: %v", kvNames.Error()) + } + if len(names) != test.bucketsNum { + t.Fatalf("Invalid number of stream names; want: %d; got: %d", test.bucketsNum, len(names)) + } + infos := make([]nats.KeyValueStatus, 0) + kvInfos := js.KeyValueStores(ctx) + for info := range kvInfos.Status() { + infos = append(infos, info) + } + if kvInfos.Error() != nil { + t.Fatalf("Unexpected error: %v", kvNames.Error()) + } + if len(infos) != test.bucketsNum { + t.Fatalf("Invalid 
number of streams; want: %d; got: %d", test.bucketsNum, len(infos)) + } + }) + } +} + +func TestKeyValueMirrorCrossDomains(t *testing.T) { + conf := createConfFile(t, []byte(` + server_name: HUB + listen: 127.0.0.1:-1 + jetstream: { domain: HUB } + leafnodes { listen: 127.0.0.1:7422 } + }`)) + defer os.Remove(conf) + s, _ := RunServerWithConfig(conf) + defer shutdownJSServerAndRemoveStorage(t, s) + + lconf := createConfFile(t, []byte(` + server_name: LEAF + listen: 127.0.0.1:-1 + jetstream: { domain:LEAF } + leafnodes { + remotes = [ { url: "leaf://127.0.0.1" } ] + } + }`)) + defer os.Remove(lconf) + ln, _ := RunServerWithConfig(lconf) + defer shutdownJSServerAndRemoveStorage(t, ln) + + // Create main KV on HUB + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST"}) + expectOk(t, err) + + _, err = kv.PutString(ctx, "name", "derek") + expectOk(t, err) + _, err = kv.PutString(ctx, "age", "22") + expectOk(t, err) + _, err = kv.PutString(ctx, "v", "v") + expectOk(t, err) + err = kv.Delete(ctx, "v") + expectOk(t, err) + + lnc, ljs := jsClient(t, ln) + defer lnc.Close() + + // Capture cfg so we can make sure it does not change. + // NOTE: We use different name to test all possibilities, etc, but in practice for truly nomadic applications + // this should be named the same, e.g. TEST. + cfg := jetstream.KeyValueConfig{ + Bucket: "MIRROR", + Mirror: &jetstream.StreamSource{ + Name: "TEST", + Domain: "HUB", + }, + } + ccfg := cfg + + _, err = ljs.CreateKeyValue(ctx, cfg) + expectOk(t, err) + + if !reflect.DeepEqual(cfg, ccfg) { + t.Fatalf("Did not expect config to be altered: %+v vs %+v", cfg, ccfg) + } + + si, err := ljs.Stream(ctx, "KV_MIRROR") + expectOk(t, err) + + // Make sure mirror direct set. 
+ if !si.CachedInfo().Config.MirrorDirect { + t.Fatalf("Expected mirror direct to be set") + } + + // Make sure we sync. + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + si, err := ljs.Stream(ctx, "KV_MIRROR") + expectOk(t, err) + if si.CachedInfo().State.Msgs == 3 { + return nil + } + return fmt.Errorf("Did not get synched messages: %d", si.CachedInfo().State.Msgs) + }) + + // Bind locally from leafnode and make sure both get and put work. + mkv, err := ljs.KeyValue(ctx, "MIRROR") + expectOk(t, err) + + _, err = mkv.PutString(ctx, "name", "rip") + expectOk(t, err) + + _, err = mkv.PutString(ctx, "v", "vv") + expectOk(t, err) + e, err := mkv.Get(ctx, "v") + expectOk(t, err) + if e.Operation() != jetstream.KeyValuePut { + t.Fatalf("Got wrong value: %q vs %q", e.Operation(), nats.KeyValuePut) + } + err = mkv.Delete(ctx, "v") + expectOk(t, err) + _, err = mkv.Get(ctx, "v") + expectErr(t, err, jetstream.ErrKeyNotFound) + + e, err = mkv.Get(ctx, "name") + expectOk(t, err) + if string(e.Value()) != "rip" { + t.Fatalf("Got wrong value: %q vs %q", e.Value(), "rip") + } + + // Also make sure we can create a watcher on the mirror KV. + watcher, err := mkv.WatchAll(ctx) + expectOk(t, err) + defer watcher.Stop() + + // Bind through leafnode connection but to origin KV. 
+ rjs, err := jetstream.NewWithDomain(nc, "HUB") + expectOk(t, err) + + rkv, err := rjs.KeyValue(ctx, "TEST") + expectOk(t, err) + + _, err = rkv.PutString(ctx, "name", "ivan") + expectOk(t, err) + + e, err = rkv.Get(ctx, "name") + expectOk(t, err) + if string(e.Value()) != "ivan" { + t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan") + } + _, err = rkv.PutString(ctx, "v", "vv") + expectOk(t, err) + e, err = mkv.Get(ctx, "v") + expectOk(t, err) + if e.Operation() != jetstream.KeyValuePut { + t.Fatalf("Got wrong value: %q vs %q", e.Operation(), nats.KeyValuePut) + } + err = rkv.Delete(ctx, "v") + expectOk(t, err) + _, err = rkv.Get(ctx, "v") + expectErr(t, err, jetstream.ErrKeyNotFound) + + // Shutdown cluster and test get still work. + shutdownJSServerAndRemoveStorage(t, s) + + e, err = mkv.Get(ctx, "name") + expectOk(t, err) + if string(e.Value()) != "ivan" { + t.Fatalf("Got wrong value: %q vs %q", e.Value(), "ivan") + } +} + +func TestKeyValueRePublish(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "TEST_UPDATE", + }); err != nil { + t.Fatalf("Error creating store: %v", err) + } + // This is expected to fail since server does not support as of now + // the update of RePublish. 
+ if _, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "TEST_UPDATE", + RePublish: &jetstream.RePublish{Source: ">", Destination: "bar.>"}, + }); err == nil { + t.Fatal("Expected failure, did not get one") + } + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "TEST", + RePublish: &jetstream.RePublish{Source: ">", Destination: "bar.>"}, + }) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + si, err := js.Stream(ctx, "KV_TEST") + if err != nil { + t.Fatalf("Error getting stream info: %v", err) + } + if si.CachedInfo().Config.RePublish == nil { + t.Fatal("Expected republish to be set, it was not") + } + + sub, err := nc.SubscribeSync("bar.>") + if err != nil { + t.Fatalf("Error on sub: %v", err) + } + if _, err := kv.Put(ctx, "foo", []byte("value")); err != nil { + t.Fatalf("Error on put: %v", err) + } + msg, err := sub.NextMsg(time.Second) + if err != nil { + t.Fatalf("Error on next: %v", err) + } + if v := string(msg.Data); v != "value" { + t.Fatalf("Unexpected value: %s", v) + } + // The message should also have a header with the actual subject + expected := "$KV.TEST.foo" + if v := msg.Header.Get(jetstream.SubjectHeader); v != expected { + t.Fatalf("Expected subject header %q, got %q", expected, v) + } +} + +func TestKeyValueMirrorDirectGet(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST"}) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + _, err = js.CreateStream(ctx, jetstream.StreamConfig{ + Name: "MIRROR", + Mirror: &jetstream.StreamSource{Name: "KV_TEST"}, + MirrorDirect: true, + }) + if err != nil { + t.Fatalf("Error creating mirror: %v", err) + } + + for i := 0; i < 100; i++ { + key := fmt.Sprintf("KEY.%d", i) + if _, err 
:= kv.PutString(ctx, key, "42"); err != nil { + t.Fatalf("Error adding key: %v", err) + } + } + + // Make sure all gets work. + for i := 0; i < 100; i++ { + if _, err := kv.Get(ctx, "KEY.22"); err != nil { + t.Fatalf("Got error getting key: %v", err) + } + } +} + +func TestKeyValueCreate(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "TEST"}) + if err != nil { + t.Fatalf("Error creating kv: %v", err) + } + + _, err = kv.Create(ctx, "key", []byte("1")) + if err != nil { + t.Fatalf("Error creating key: %v", err) + } + + _, err = kv.Create(ctx, "key", []byte("1")) + expected := "wrong last sequence: 1: key exists" + if !strings.Contains(err.Error(), expected) { + t.Fatalf("Expected %q, got: %v", expected, err) + } + if !errors.Is(err, jetstream.ErrKeyExists) { + t.Fatalf("Expected ErrKeyExists, got: %v", err) + } + aerr := &jetstream.APIError{} + if !errors.As(err, &aerr) { + t.Fatalf("Expected APIError, got: %v", err) + } + if aerr.Description != "wrong last sequence: 1" { + t.Fatalf("Unexpected APIError message, got: %v", aerr.Description) + } + if aerr.ErrorCode != 10071 { + t.Fatalf("Unexpected error code, got: %v", aerr.ErrorCode) + } + if aerr.Code != jetstream.ErrKeyExists.APIError().Code { + t.Fatalf("Unexpected error code, got: %v", aerr.Code) + } + var kerr jetstream.JetStreamError + if !errors.As(err, &kerr) { + t.Fatalf("Expected KeyValueError, got: %v", err) + } + if kerr.APIError().ErrorCode != 10071 { + t.Fatalf("Unexpected error code, got: %v", kerr.APIError().ErrorCode) + } +} + +// Helpers + +func client(t *testing.T, s *server.Server, opts ...nats.Option) *nats.Conn { + t.Helper() + nc, err := nats.Connect(s.ClientURL(), opts...) 
+ if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + return nc +} + +func jsClient(t *testing.T, s *server.Server, opts ...nats.Option) (*nats.Conn, jetstream.JetStream) { + t.Helper() + nc := client(t, s, opts...) + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error getting JetStream context: %v", err) + } + return nc, js +} + +func expectOk(t *testing.T, err error) { + t.Helper() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } +} + +func expectErr(t *testing.T, err error, expected ...error) { + t.Helper() + if err == nil { + t.Fatalf("Expected error but got none") + } + if len(expected) == 0 { + return + } + for _, e := range expected { + if err == e || strings.Contains(e.Error(), err.Error()) { + return + } + } + t.Fatalf("Expected one of %+v, got '%v'", expected, err) +} diff --git a/js.go b/js.go index b83cedb4f..8836bbb49 100644 --- a/js.go +++ b/js.go @@ -1822,6 +1822,17 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, return sub, nil } +// InitialConsumerPending returns the number of messages pending to be +// delivered to the consumer when the subscription was created. +func (sub *Subscription) InitialConsumerPending() (uint64, error) { + sub.mu.Lock() + defer sub.mu.Unlock() + if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ { + return 0, fmt.Errorf("%w: not a JetStream subscription", ErrTypeSubscription) + } + return sub.jsi.pending, nil +} + // This long-lived routine is used per ChanSubscription to check // on the number of delivered messages and check for flow control response. 
func (sub *Subscription) chanSubcheckForFlowControlResponse() { @@ -1915,7 +1926,7 @@ func (sub *Subscription) checkOrderedMsgs(m *Msg) bool { if err != nil { return false } - sseq, dseq := parser.ParseNum(tokens[ackStreamSeqTokenPos]), parser.ParseNum(tokens[ackConsumerSeqTokenPos]) + sseq, dseq := parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]), parser.ParseNum(tokens[parser.AckConsumerSeqTokenPos]) jsi := sub.jsi if dseq != jsi.dseq { @@ -2029,7 +2040,7 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg) if err != nil { var apiErr *APIError - if errors.Is(err, ErrJetStreamNotEnabled) || errors.Is(err, ErrTimeout) { + if errors.Is(err, ErrJetStreamNotEnabled) || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded) { // if creating consumer failed, retry return } else if errors.As(err, &apiErr) && apiErr.ErrorCode == JSErrCodeInsufficientResourcesErr { @@ -2157,7 +2168,7 @@ func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) // Consumer sequence. var ldseq string - dseq := tokens[ackConsumerSeqTokenPos] + dseq := tokens[parser.AckConsumerSeqTokenPos] hdr := msg.Header[lastConsumerSeqHdr] if len(hdr) == 1 { ldseq = hdr[0] @@ -2168,7 +2179,7 @@ func (nc *Conn) checkForSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) if ldseq != dseq { // Dispatch async error including details such as // from where the consumer could be restarted. 
- sseq := parser.ParseNum(tokens[ackStreamSeqTokenPos]) + sseq := parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]) if ordered { s.mu.Lock() s.resetOrderedConsumer(jsi.sseq + 1) @@ -3280,18 +3291,6 @@ type MsgMetadata struct { Domain string } -const ( - ackDomainTokenPos = 2 - ackAccHashTokenPos = 3 - ackStreamTokenPos = 4 - ackConsumerTokenPos = 5 - ackNumDeliveredTokenPos = 6 - ackStreamSeqTokenPos = 7 - ackConsumerSeqTokenPos = 8 - ackTimestampSeqTokenPos = 9 - ackNumPendingTokenPos = 10 -) - // Metadata retrieves the metadata from a JetStream message. This method will // return an error for non-JetStream Msgs. func (m *Msg) Metadata() (*MsgMetadata, error) { @@ -3305,15 +3304,15 @@ func (m *Msg) Metadata() (*MsgMetadata, error) { } meta := &MsgMetadata{ - Domain: tokens[ackDomainTokenPos], - NumDelivered: parser.ParseNum(tokens[ackNumDeliveredTokenPos]), - NumPending: parser.ParseNum(tokens[ackNumPendingTokenPos]), - Timestamp: time.Unix(0, int64(parser.ParseNum(tokens[ackTimestampSeqTokenPos]))), - Stream: tokens[ackStreamTokenPos], - Consumer: tokens[ackConsumerTokenPos], - } - meta.Sequence.Stream = parser.ParseNum(tokens[ackStreamSeqTokenPos]) - meta.Sequence.Consumer = parser.ParseNum(tokens[ackConsumerSeqTokenPos]) + Domain: tokens[parser.AckDomainTokenPos], + NumDelivered: parser.ParseNum(tokens[parser.AckNumDeliveredTokenPos]), + NumPending: parser.ParseNum(tokens[parser.AckNumPendingTokenPos]), + Timestamp: time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))), + Stream: tokens[parser.AckStreamTokenPos], + Consumer: tokens[parser.AckConsumerTokenPos], + } + meta.Sequence.Stream = parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]) + meta.Sequence.Consumer = parser.ParseNum(tokens[parser.AckConsumerSeqTokenPos]) return meta, nil } diff --git a/kv.go b/kv.go index 9ad1b22d2..34d585d4f 100644 --- a/kv.go +++ b/kv.go @@ -622,7 +622,7 @@ func (kv *kvs) PutString(key string, value string) (revision uint64, err error) return 
kv.Put(key, []byte(value)) } -// Create will add the key/value pair iff it does not exist. +// Create will add the key/value pair if it does not exist. func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { v, err := kv.Update(key, value, 0) if err == nil { @@ -645,7 +645,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { return 0, err } -// Update will update the value iff the latest revision matches. +// Update will update the value if the latest revision matches. func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) { if !keyValid(key) { return 0, ErrInvalidKey @@ -909,7 +909,7 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { op = KeyValuePurge } } - delta := parser.ParseNum(tokens[ackNumPendingTokenPos]) + delta := parser.ParseNum(tokens[parser.AckNumPendingTokenPos]) w.mu.Lock() defer w.mu.Unlock() if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) { @@ -917,8 +917,8 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { bucket: kv.name, key: subj, value: m.Data, - revision: parser.ParseNum(tokens[ackStreamSeqTokenPos]), - created: time.Unix(0, int64(parser.ParseNum(tokens[ackTimestampSeqTokenPos]))), + revision: parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]), + created: time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))), delta: delta, op: op, } @@ -966,7 +966,7 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { w.updates <- nil } // Set us up to close when the waitForMessages func returns. 
- sub.pDone = func() { + sub.pDone = func(_ string) { close(w.updates) } sub.mu.Unlock() @@ -1020,16 +1020,16 @@ func (kv *kvs) Status() (KeyValueStatus, error) { // KeyValueStoreNames is used to retrieve a list of key value store names func (js *js) KeyValueStoreNames() <-chan string { ch := make(chan string) - l := &streamLister{js: js} + l := &streamNamesLister{js: js} l.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*") go func() { defer close(ch) for l.Next() { - for _, info := range l.Page() { - if !strings.HasPrefix(info.Config.Name, kvBucketNamePre) { + for _, name := range l.Page() { + if !strings.HasPrefix(name, kvBucketNamePre) { continue } - ch <- info.Config.Name + ch <- name } } }() diff --git a/nats.go b/nats.go index 350f2306a..8b9fd8862 100644 --- a/nats.go +++ b/nats.go @@ -613,7 +613,7 @@ type Subscription struct { pHead *Msg pTail *Msg pCond *sync.Cond - pDone func() + pDone func(subject string) // Pending stats, async subscriptions, high-speed etc. pMsgs int @@ -3025,7 +3025,7 @@ func (nc *Conn) waitForMsgs(s *Subscription) { s.mu.Unlock() if done != nil { - done() + done(s.Subject) } } @@ -4450,6 +4450,14 @@ func (s *Subscription) AutoUnsubscribe(max int) error { return conn.unsubscribe(s, max, false) } +// SetClosedHandler will set the closed handler for when a subscription +// is closed (either unsubscribed or drained). +func (s *Subscription) SetClosedHandler(handler func(subject string)) { + s.mu.Lock() + s.pDone = handler + s.mu.Unlock() +} + // unsubscribe performs the low level unsubscribe to the server. // Use Subscription.Unsubscribe() func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { diff --git a/object.go b/object.go index 9d558cc6e..b12af5c53 100644 --- a/object.go +++ b/object.go @@ -658,7 +658,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) { result.digest.Write(m.Data) // Check if we are done. 
- if tokens[ackNumPendingTokenPos] == objNoPending { + if tokens[parser.AckNumPendingTokenPos] == objNoPending { pw.Close() m.Sub.Unsubscribe() } diff --git a/test/js_test.go b/test/js_test.go index 849de3cbb..c025d5faf 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -476,6 +476,13 @@ func TestJetStreamSubscribe(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + initialPending, err := sub.InitialConsumerPending() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if initialPending != 0 { + t.Fatalf("Expected no initial pending, got %d", initialPending) + } sub.Unsubscribe() // Check that Queue subscribe with HB or FC fails. @@ -563,12 +570,20 @@ func TestJetStreamSubscribe(t *testing.T) { done <- true } }) + if err != nil { t.Fatalf("Unexpected error: %v", err) } expectConsumers(t, 3) defer sub3.Unsubscribe() + initialPending, err = sub3.InitialConsumerPending() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if initialPending != 10 { + t.Fatalf("Expected initial pending of 10, got %d", initialPending) + } select { case <-done: case <-time.After(5 * time.Second): diff --git a/test/sub_test.go b/test/sub_test.go index 253e00f61..cb177c73a 100644 --- a/test/sub_test.go +++ b/test/sub_test.go @@ -1571,3 +1571,46 @@ func TestAutoUnsubOnSyncSubCanStillRespond(t *testing.T) { t.Fatalf("Error responding: %v", err) } } + +func TestSubscribe_ClosedHandler(t *testing.T) { + s := RunDefaultServer() + defer s.Shutdown() + + nc := NewDefaultConnection(t) + defer nc.Close() + + ch := make(chan string, 1) + sub, err := nc.Subscribe("foo", func(_ *nats.Msg) {}) + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + sub.SetClosedHandler(func(subj string) { + ch <- subj + }) + sub.Unsubscribe() + select { + case subj := <-ch: + if subj != "foo" { + t.Fatalf("Expected 'foo', got '%v'", subj) + } + case <-time.After(1 * time.Second): + t.Fatal("Did not receive closed callback") + } + + sub, err = nc.Subscribe("bar", 
func(_ *nats.Msg) {}) + if err != nil { + t.Fatalf("Error subscribing: %v", err) + } + sub.SetClosedHandler(func(subj string) { + ch <- subj + }) + sub.Drain() + select { + case subj := <-ch: + if subj != "bar" { + t.Fatalf("Expected 'bar', got '%v'", subj) + } + case <-time.After(1 * time.Second): + t.Fatal("Did not receive closed callback") + } +}