diff --git a/src/js.h b/src/js.h index 8f0f26044..79ce521cd 100644 --- a/src/js.h +++ b/src/js.h @@ -103,6 +103,9 @@ extern const int64_t jsDefaultRequestWait; #define jsAckInProgress "+WPI" #define jsAckTerm "+TERM" +// jsExtDomainT is used to create a StreamSource External APIPrefix +#define jsExtDomainT "$JS.%s.API" + // jsApiAccountInfo is for obtaining general information about JetStream. #define jsApiAccountInfo "%.*s.INFO" diff --git a/src/jsm.c b/src/jsm.c index 1fd25db59..c62fb3f83 100644 --- a/src/jsm.c +++ b/src/jsm.c @@ -264,8 +264,11 @@ _marshalExternalStream(jsExternalStream *external, const char *fieldName, natsBu IFOK(s, natsBuf_Append(buf, fieldName, -1)); IFOK(s, natsBuf_Append(buf, "\":{\"api\":\"", -1)); IFOK(s, natsBuf_Append(buf, external->APIPrefix, -1)); - IFOK(s, natsBuf_Append(buf, "\",\"deliver\":\"", -1)); - IFOK(s, natsBuf_Append(buf, external->DeliverPrefix, -1)); + if ((s == NATS_OK) && !nats_IsStringEmpty(external->DeliverPrefix)) + { + IFOK(s, natsBuf_Append(buf, "\",\"deliver\":\"", -1)); + IFOK(s, natsBuf_Append(buf, external->DeliverPrefix, -1)); + } IFOK(s, natsBuf_Append(buf, "\"}", -1)); return NATS_UPDATE_ERR_STACK(s); @@ -1061,6 +1064,95 @@ jsStreamConfig_Init(jsStreamConfig *cfg) return NATS_OK; } +static void +_restoreMirrorAndSourcesExternal(jsStreamConfig *cfg) +{ + int i; + + // We are guaranteed that if a source's Domain is set, there was originally + // no External value. So free any External value and reset to NULL to + // restore the original setting. + if ((cfg->Mirror != NULL) && !nats_IsStringEmpty(cfg->Mirror->Domain)) + { + _destroyExternalStream(cfg->Mirror->External); + cfg->Mirror->External = NULL; + } + for (i=0; iSourcesLen; i++) + { + jsStreamSource *src = cfg->Sources[i]; + if ((src != NULL) && !nats_IsStringEmpty(src->Domain)) + { + _destroyExternalStream(src->External); + src->External = NULL; + } + } +} + +static natsStatus +_convertDomain(jsStreamSource *src) +{ + jsExternalStream *e = NULL; + + e = (jsExternalStream*) NATS_CALLOC(1, sizeof(jsExternalStream)); + if (e == NULL) + return nats_setDefaultError(NATS_NO_MEMORY); + + if (nats_asprintf((char**) &(e->APIPrefix), jsExtDomainT, src->Domain) < 0) + { + NATS_FREE(e); + return nats_setDefaultError(NATS_NO_MEMORY); + } + src->External = e; + return NATS_OK; +} + +static natsStatus +_convertMirrorAndSourcesDomain(bool *converted, jsStreamConfig *cfg) +{ + natsStatus s = NATS_OK; + bool cm = false; + bool cs = false; + int i; + + *converted = false; + + if ((cfg->Mirror != NULL) && !nats_IsStringEmpty(cfg->Mirror->Domain)) + { + if (cfg->Mirror->External != NULL) + return nats_setError(NATS_INVALID_ARG, "%s", "mirror's domain and external are both set"); + cm = true; + } + for (i=0; iSourcesLen; i++) + { + jsStreamSource *src = cfg->Sources[i]; + if ((src != NULL) && !nats_IsStringEmpty(src->Domain)) + { + if (src->External != NULL) + return nats_setError(NATS_INVALID_ARG, "%s", "source's domain and external are both set"); + cs = true; + } + } + if (!cm && !cs) + return NATS_OK; + + if (cm) + s = _convertDomain(cfg->Mirror); + if ((s == NATS_OK) && cs) + { + for (i=0; (s == NATS_OK) && (iSourcesLen); i++) + { + jsStreamSource *src = cfg->Sources[i]; + if ((src != NULL) && !nats_IsStringEmpty(src->Domain)) + s = _convertDomain(src); + } + } + if (s == NATS_OK) + *converted = true; + else + _restoreMirrorAndSourcesExternal(cfg); + return NATS_UPDATE_ERR_STACK(s); +} + static natsStatus _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamConfig *cfg, jsOptions *opts, jsErrCode *errCode) { @@ -1071,6 +1163,7 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo natsConnection *nc = NULL; const char *apiT = NULL; bool freePfx = false; + bool msc = false; jsOptions o; if (errCode != NULL) @@ -1101,6 +1194,8 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo if (freePfx) NATS_FREE((char*) o.Prefix); } + if ((s == NATS_OK) && (action == jsStreamActionCreate)) + s = _convertMirrorAndSourcesDomain(&msc, cfg); // Marshal the stream create/update request IFOK(s, js_marshalStreamConfig(&buf, cfg)); @@ -1111,6 +1206,12 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo // If we got a response, check for error or return the stream info result. IFOK(s, _unmarshalStreamCreateResp(new_si, NULL, resp, errCode)); + // If mirror and/or sources were converted for the domain, then we need + // to restore the original values (which will free the memory that was + // allocated for the conversion). + if (msc) + _restoreMirrorAndSourcesExternal(cfg); + natsBuf_Destroy(buf); natsMsg_Destroy(resp); NATS_FREE(subj); diff --git a/src/kv.c b/src/kv.c index 9a798a538..a82826876 100644 --- a/src/kv.c +++ b/src/kv.c @@ -21,9 +21,11 @@ #include "conn.h" #include "sub.h" -static const char *kvBucketNameTmpl = "KV_%s"; -static const char *kvSubjectsTmpl = "$KV.%s.>"; -static const char *kvSubjectsPreTmpl = "$KV.%s."; +static const char *kvBucketNamePre = "KV_"; +static const char *kvBucketNameTmpl = "KV_%s"; +static const char *kvSubjectsTmpl = "$KV.%s.>"; +static const char *kvSubjectsPreTmpl = "$KV.%s."; +static const char *kvSubjectsPreDomainTmpl = "%s.$KV.%s."; #define KV_WATCH_FOR_EVER (int64_t)(0x7FFFFFFFFFFFFFFF) @@ -34,14 +36,17 @@ natsBuffer buf; #define USE_JS_PREFIX true #define KEY_NAME_ONLY false -#define BUILD_SUBJECT(p) \ +#define FOR_A_PUT true +#define NOT_FOR_A_PUT false + +#define BUILD_SUBJECT(p, fp) \ s = natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); \ if ((p) && kv->useJSPrefix) \ { \ IFOK(s, natsBuf_Append(&buf, kv->js->opts.Prefix, -1)); \ IFOK(s, natsBuf_AppendByte(&buf, '.')); \ } \ -IFOK(s, natsBuf_Append(&buf, kv->pre, -1)); \ +IFOK(s, natsBuf_Append(&buf, ((fp) ? (kv->usePutPre ? kv->putPre : kv->pre) : kv->pre), -1)); \ IFOK(s, natsBuf_Append(&buf, key, -1)); \ IFOK(s, natsBuf_AppendByte(&buf, 0)); @@ -119,6 +124,7 @@ _freeKV(kvStore *kv) NATS_FREE(kv->bucket); NATS_FREE(kv->stream); NATS_FREE(kv->pre); + NATS_FREE(kv->putPre); natsMutex_Destroy(kv->mu); NATS_FREE(kv); js_release(js); @@ -188,52 +194,38 @@ _createKV(kvStore **new_kv, jsCtx *js, const char *bucket) return NATS_UPDATE_ERR_STACK(s); } -static bool -_sameStrings(const char *s1, const char *s2) +static natsStatus +_changePutPrefixIfMirrorPresent(kvStore *kv, jsStreamInfo *si) { - bool s1Empty = nats_IsStringEmpty(s1); - bool s2Empty = nats_IsStringEmpty(s2); + natsStatus s = NATS_OK; + const char *bucket = NULL; + jsStreamSource *m = si->Config->Mirror; - // Same if both empty. - if (s1Empty && s2Empty) - return true; + if (m == NULL) + return NATS_OK; - // Not same if one is empty while other is not. - if ((s1Empty && !s2Empty) || (!s1Empty && s2Empty)) - return false; + bucket = m->Name; + if (strstr(m->Name, kvBucketNamePre) == m->Name) + bucket = m->Name + strlen(kvBucketNamePre); - // Return result of comparison of s1 and s2 - return (strcmp(s1, s2) == 0 ? true : false); -} + if ((m->External != NULL) && !nats_IsStringEmpty(m->External->APIPrefix)) + { + kv->useJSPrefix = false; -static bool -_sameStreamCfg(jsStreamConfig *oc, jsStreamConfig *nc) -{ - // Check some of the stream's configuration properties only, - // the ones that we set when creating a KV stream. - if (!_sameStrings(oc->Description, nc->Description)) - return false; - if (oc->SubjectsLen != nc->SubjectsLen) - return false; - if (!_sameStrings(oc->Subjects[0], nc->Subjects[0])) - return false; - if (oc->MaxMsgsPerSubject != nc->MaxMsgsPerSubject) - return false; - if (oc->MaxBytes != nc->MaxBytes) - return false; - if (oc->MaxAge != nc->MaxAge) - return false; - if (oc->MaxMsgSize != nc->MaxMsgSize) - return false; - if (oc->Storage != nc->Storage) - return false; - if (oc->Replicas != nc->Replicas) - return false; - if (oc->AllowRollup != nc->AllowRollup) - return false; - if (oc->DenyDelete != nc->DenyDelete) - return false; - return true; + NATS_FREE(kv->pre); + kv->pre = NULL; + if (nats_asprintf(&(kv->pre), kvSubjectsPreTmpl, bucket) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + else if (nats_asprintf(&(kv->putPre), kvSubjectsPreDomainTmpl, m->External->APIPrefix, bucket) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + } + else if (nats_asprintf(&(kv->putPre), kvSubjectsPreTmpl, bucket) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + + if (s == NATS_OK) + kv->usePutPre = true; + + return NATS_UPDATE_ERR_STACK(s); } natsStatus @@ -245,6 +237,8 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg) kvStore *kv = NULL; char *subject= NULL; jsStreamInfo *si = NULL; + const char *omn = NULL; + const char **osn = NULL; jsStreamConfig sc; if ((new_kv == NULL) || (js == NULL) || (cfg == NULL)) @@ -274,12 +268,11 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg) int64_t maxBytes = (cfg->MaxBytes == 0 ? -1 : cfg->MaxBytes); int32_t maxMsgSize = (cfg->MaxValueSize == 0 ? -1 : cfg->MaxValueSize); jsErrCode jerr = 0; + const char **subjects = (const char*[1]){subject}; jsStreamConfig_Init(&sc); sc.Name = kv->stream; sc.Description = cfg->Description; - sc.Subjects = (const char*[1]){subject}; - sc.SubjectsLen = 1; sc.MaxMsgsPerSubject = history; sc.MaxBytes = maxBytes; sc.MaxAge = cfg->TTL; @@ -291,15 +284,106 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg) sc.AllowDirect = true; sc.RePublish = cfg->RePublish; + if (cfg->Mirror != NULL) + { + jsStreamSource *m = cfg->Mirror; + + if (!nats_IsStringEmpty(m->Name) + && (strstr(m->Name, kvBucketNamePre) != m->Name)) + { + char *newName = NULL; + if (nats_asprintf(&newName, kvBucketNameTmpl, m->Name) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + { + omn = m->Name; + m->Name = newName; + } + } + sc.Mirror = m; + sc.MirrorDirect = true; + } + else if (cfg->SourcesLen > 0) + { + osn = (const char**) NATS_CALLOC(cfg->SourcesLen, sizeof(char*)); + if (osn == NULL) + s = nats_setDefaultError(NATS_NO_MEMORY); + + if (s == NATS_OK) + { + int i; + + for (i=0; iSourcesLen; i++) + { + jsStreamSource *ss = cfg->Sources[i]; + + if (ss == NULL) + continue; + + // Set this regardless of error in the loop. We need it for + // proper cleanup at the end. + osn[i] = ss->Name; + + if ((s == NATS_OK) && !nats_IsStringEmpty(ss->Name) + && (strstr(ss->Name, kvBucketNamePre) != ss->Name)) + { + char *newName = NULL; + + if (nats_asprintf(&newName, kvBucketNameTmpl, ss->Name) < 0) + s = nats_setDefaultError(NATS_NO_MEMORY); + else + ss->Name = newName; + } + } + if (s == NATS_OK) + { + sc.Sources = cfg->Sources; + sc.SourcesLen = cfg->SourcesLen; + } + } + } + else + { + sc.Subjects = subjects; + sc.SubjectsLen = 1; + } + // If connecting to a v2.7.2+, create with discard new policy if (natsConn_srvVersionAtLeast(kv->js->nc, 2, 7, 2)) sc.Discard = js_DiscardNew; s = js_AddStream(&si, js, &sc, NULL, &jerr); - // If the stream allow direct get message calls, then we will do so. if (s == NATS_OK) + { + // If the stream allow direct get message calls, then we will do so. kv->useDirect = si->Config->AllowDirect; + + s = _changePutPrefixIfMirrorPresent(kv, si); + } jsStreamInfo_Destroy(si); + + // Restore original mirror/source names + if (omn != NULL) + { + NATS_FREE((char*) cfg->Mirror->Name); + cfg->Mirror->Name = omn; + } + if (osn != NULL) + { + int i; + + for (i=0; iSourcesLen; i++) + { + jsStreamSource *ss = cfg->Sources[i]; + + if ((ss != NULL) && (ss->Name != osn[i])) + { + NATS_FREE((char*) ss->Name); + ss->Name = osn[i]; + } + } + NATS_FREE((char**) osn); + } } if (s == NATS_OK) *new_kv = kv; @@ -336,6 +420,8 @@ js_KeyValue(kvStore **new_kv, jsCtx *js, const char *bucket) if (si->Config->MaxMsgsPerSubject < 1) s = nats_setError(NATS_INVALID_ARG, "%s", kvErrBadBucket); + IFOK(s, _changePutPrefixIfMirrorPresent(kv, si)); + jsStreamInfo_Destroy(si); } @@ -457,7 +543,7 @@ _getEntry(kvEntry **new_entry, bool *deleted, kvStore *kv, const char *key, uint if (!validKey(key)) return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey); - BUILD_SUBJECT(KEY_NAME_ONLY); + BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); if (kv->useDirect) { @@ -577,7 +663,7 @@ _putEntry(uint64_t *rev, kvStore *kv, jsPubOptions *po, const char *key, const v if (!validKey(key)) return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey); - BUILD_SUBJECT(USE_JS_PREFIX); + BUILD_SUBJECT(USE_JS_PREFIX, FOR_A_PUT); IFOK(s, js_Publish(ppa, kv->js, natsBuf_Data(&buf), data, len, po, NULL)); if ((s == NATS_OK) && (rev != NULL)) @@ -680,7 +766,7 @@ _delete(kvStore *kv, const char *key, bool purge, kvPurgeOptions *opts) if (!validKey(key)) return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey); - BUILD_SUBJECT(USE_JS_PREFIX); + BUILD_SUBJECT(USE_JS_PREFIX, FOR_A_PUT); IFOK(s, natsMsg_Create(&msg, natsBuf_Data(&buf), NULL, NULL, 0)); if (s == NATS_OK) { @@ -793,6 +879,7 @@ kvStore_PurgeDeletes(kvStore *kv, kvPurgeOptions *opts) for (; h != NULL; ) { natsBuf_Reset(&buf); + // Use kv->pre here, always. IFOK(s, natsBuf_Append(&buf, kv->pre, -1)); IFOK(s, natsBuf_Append(&buf, h->key, -1)); IFOK(s, natsBuf_AppendByte(&buf, '\0')); @@ -899,6 +986,7 @@ kvWatcher_Next(kvEntry **new_entry, kvWatcher *w, int64_t timeout) } w->refs--; + // Use kv->pre here, always. if ((s == NATS_OK) && (strlen(msg->subject) <= strlen(w->kv->pre))) s = nats_setError(NATS_ERR, "invalid update's subject '%s'", msg->subject); @@ -988,7 +1076,7 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti w->kv = kv; w->refs = 1; - BUILD_SUBJECT(KEY_NAME_ONLY); + BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); IFOK(s, natsMutex_Create(&(w->mu))); if (s == NATS_OK) { @@ -1005,6 +1093,9 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti if (opts->IgnoreDeletes) w->ignoreDel = true; } + // Need to explicitly bind to the stream here because the subject + // we construct may not help find the stream when using mirrors. + so.Stream = kv->stream; s = js_SubscribeSync(&(w->sub), kv->js, natsBuf_Data(&buf), NULL, &so, NULL); IFOK(s, natsSubscription_SetPendingLimits(w->sub, -1, -1)); if (s == NATS_OK) diff --git a/src/nats.h b/src/nats.h index 00112540e..bfb1c90cd 100644 --- a/src/nats.h +++ b/src/nats.h @@ -375,6 +375,10 @@ typedef struct jsStreamSource int64_t OptStartTime; ///< UTC time expressed as number of nanoseconds since epoch. const char *FilterSubject; jsExternalStream *External; + // Domain and External are mutually exclusive. + // If Domain is set, an External value will be created with + // the APIPrefix constructed based on the Domain value. + const char *Domain; } jsStreamSource; @@ -1189,6 +1193,9 @@ typedef struct kvConfig jsStorageType StorageType; int Replicas; jsRePublish *RePublish; + jsStreamSource *Mirror; + jsStreamSource **Sources; + int SourcesLen; } kvConfig; diff --git a/src/natsp.h b/src/natsp.h index 8385e4844..fb6dde262 100644 --- a/src/natsp.h +++ b/src/natsp.h @@ -433,6 +433,8 @@ struct __kvStore char *bucket; char *stream; char *pre; + char *putPre; + bool usePutPre; bool useJSPrefix; bool useDirect; diff --git a/test/list.txt b/test/list.txt index 38f5138ef..83690fdd8 100644 --- a/test/list.txt +++ b/test/list.txt @@ -253,6 +253,7 @@ KeyValueCrossAccount KeyValueDiscardOldToNew KeyValueRePublish KeyValueMirrorDirectGet +KeyValueMirrorCrossDomains StanPBufAllocator StanConnOptions StanSubOptions diff --git a/test/test.c b/test/test.c index c1834a502..36c117ce9 100644 --- a/test/test.c +++ b/test/test.c @@ -5509,7 +5509,8 @@ _stopServer(natsPid pid) CloseHandle(pid->hThread); natsMutex_Lock(slMu); - natsHash_Remove(slMap, (int64_t) pid); + if (slMap != NULL) + natsHash_Remove(slMap, (int64_t) pid); natsMutex_Unlock(slMu); free(pid); @@ -5614,7 +5615,8 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts } natsMutex_Lock(slMu); - natsHash_Set(slMap, (int64_t) pid, NULL, NULL); + if (slMap != NULL) + natsHash_Set(slMap, (int64_t) pid, NULL, NULL); natsMutex_Unlock(slMu); return (natsPid) pid; @@ -5644,7 +5646,8 @@ _stopServer(natsPid pid) waitpid(pid, &status, 0); natsMutex_Lock(slMu); - natsHash_Remove(slMap, (int64_t) pid); + if (slMap != NULL) + natsHash_Remove(slMap, (int64_t) pid); natsMutex_Unlock(slMu); } @@ -5720,7 +5723,8 @@ _startServerImpl(const char *serverExe, const char *url, const char *cmdLineOpts } natsMutex_Lock(slMu); - natsHash_Set(slMap, (int64_t) pid, NULL, NULL); + if (slMap != NULL) + natsHash_Set(slMap, (int64_t) pid, NULL, NULL); natsMutex_Unlock(slMu); // parent, return the child's PID back. @@ -22683,6 +22687,8 @@ test_JetStreamMgtStreams(void) jsStreamInfoList *siList = NULL; jsStreamNamesList *snList = NULL; int count = 0; + jsStreamSource ss; + jsExternalStream se; jsOptions o; int i; @@ -23107,6 +23113,7 @@ test_JetStreamMgtStreams(void) natsMsg_GetData(resp), natsMsg_GetDataLength(resp)) == 0)); jsStreamInfo_Destroy(si); + si = NULL; natsMsg_Destroy(resp); resp = NULL; @@ -23150,6 +23157,7 @@ test_JetStreamMgtStreams(void) && (strcmp(si->Config->Subjects[0], "foo.>") == 0) && (strcmp(si->Config->Subjects[1], "bar.*") == 0)); jsStreamInfo_Destroy(si); + si = NULL; test("List stream infos (bad args): "); s = js_Streams(NULL, js, NULL, NULL); @@ -23286,6 +23294,35 @@ test_JetStreamMgtStreams(void) s = js_StreamNames(&snList, js, &o, &jerr); testCond((s == NATS_NOT_FOUND) && (snList == NULL)); + test("Mirror domain and external set error: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "MDESET"; + jsStreamSource_Init(&ss); + ss.Domain = "Domain"; + jsExternalStream_Init(&se); + se.DeliverPrefix = "some.prefix"; + ss.External = &se; + cfg.Mirror = &ss; + s = js_AddStream(&si, js, &cfg, NULL, NULL); + testCond((s == NATS_INVALID_ARG) && (si == NULL) + && (strstr(nats_GetLastError(NULL), "domain and external are both set") != NULL)); + nats_clearLastError(); + + test("Source domain and external set error: "); + jsStreamConfig_Init(&cfg); + cfg.Name = "SDESET"; + jsStreamSource_Init(&ss); + ss.Domain = "Domain"; + jsExternalStream_Init(&se); + se.DeliverPrefix = "some.prefix"; + ss.External = &se; + cfg.Sources = (jsStreamSource*[1]){&ss}; + cfg.SourcesLen = 1; + s = js_AddStream(&si, js, &cfg, NULL, NULL); + testCond((s == NATS_INVALID_ARG) && (si == NULL) + && (strstr(nats_GetLastError(NULL), "domain and external are both set") != NULL)); + nats_clearLastError(); + JS_TEARDOWN; } @@ -31222,6 +31259,356 @@ test_KeyValueMirrorDirectGet(void) JS_TEARDOWN; } +static natsStatus +_connectToHubAndCheckLeaf(natsConnection **hub, natsConnection *lnc) +{ + natsStatus s = NATS_OK; + natsConnection *nc = NULL; + natsSubscription *sub = NULL; + int i; + + s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL); + IFOK(s, natsConnection_SubscribeSync(&sub, nc, "check")); + IFOK(s, natsConnection_Flush(nc)); + if (s == NATS_OK) + { + for (i=0; i<10; i++) + { + s = natsConnection_PublishString(lnc, "check", "hello"); + if (s == NATS_OK) + { + natsMsg *msg = NULL; + s = natsSubscription_NextMsg(&msg, sub, 500); + natsMsg_Destroy(msg); + if (s == NATS_OK) + break; + } + } + } + natsSubscription_Destroy(sub); + if (s == NATS_OK) + *hub = nc; + else + natsConnection_Destroy(nc); + return s; +} + +static void +test_KeyValueMirrorCrossDomains(void) +{ + natsStatus s; + natsConnection *nc = NULL; + natsConnection *lnc= NULL; + jsCtx *js = NULL; + jsCtx *ljs= NULL; + jsCtx *rjs= NULL; + natsPid pid = NATS_INVALID_PID; + natsPid pid2= NATS_INVALID_PID; + jsOptions o; + jsErrCode jerr = 0; + char datastore[256] = {'\0'}; + char datastore2[256] = {'\0'}; + char cmdLine[1024] = {'\0'}; + char confFile[256] = {'\0'}; + char lconfFile[256] = {'\0'}; + kvStore *kv = NULL; + kvStore *lkv = NULL; + kvStore *mkv = NULL; + kvStore *rkv = NULL; + kvEntry *e = NULL; + jsStreamInfo *si = NULL; + kvWatcher *w = NULL; + int ok = 0; + kvPurgeOptions po; + kvConfig kvc; + jsStreamSource src; + int i; + + ENSURE_JS_VERSION(2, 9, 0); + + _makeUniqueDir(datastore, sizeof(datastore), "datastore_"); + _createConfFile(confFile, sizeof(confFile), + "server_name: HUB\n"\ + "listen: 127.0.0.1:4222\n"\ + "jetstream: { domain: HUB }\n"\ + "leafnodes { listen: 127.0.0.1:7422 }\n"); + + test("Start hub: "); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -c %s", datastore, confFile); + pid = _startServer("nats://127.0.0.1:4222", cmdLine, true); + CHECK_SERVER_STARTED(pid); + testCond(true); + + _makeUniqueDir(datastore2, sizeof(datastore2), "datastore_"); + _createConfFile(lconfFile, sizeof(lconfFile), + "server_name: LEAF\n"\ + "listen: 127.0.0.1:4223\n"\ + "jetstream: { domain: LEAF }\n"\ + "leafnodes {\n"\ + " remotes = [ { url: leaf://127.0.0.1:7422 } ]\n"\ + "}\n"); + + test("Start leaf: "); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -c %s", datastore, lconfFile); + pid2 = _startServer("nats://127.0.0.1:4223", cmdLine, true); + CHECK_SERVER_STARTED(pid2); + testCond(true); + + test("Connect to leaf: "); + s = natsConnection_ConnectTo(&lnc, "nats://127.0.0.1:4223"); + testCond(s == NATS_OK); + + test("Get context: "); + s = natsConnection_JetStream(&ljs, lnc, NULL); + testCond(s == NATS_OK); + + test("Connect to hub and check connectivity through leaf: "); + s = _connectToHubAndCheckLeaf(&nc, lnc); + testCond(s == NATS_OK); + + test("Get context: "); + s = natsConnection_JetStream(&js, nc, NULL); + testCond(s == NATS_OK); + + test("Create KV value: "); + kvConfig_Init(&kvc); + kvc.Bucket = "TEST"; + s = js_CreateKeyValue(&kv, js, &kvc); + testCond(s == NATS_OK); + + test("Put keys: "); + s = kvStore_PutString(NULL, kv, "name", "derek"); + IFOK(s, kvStore_PutString(NULL, kv, "age", "22")); + testCond(s == NATS_OK); + + test("Create KV: "); + kvConfig_Init(&kvc); + kvc.Bucket = "MIRROR"; + jsStreamSource_Init(&src); + src.Name = "TEST"; + src.Domain = "HUB"; + kvc.Mirror = &src; + s = js_CreateKeyValue(&lkv, ljs, &kvc); + testCond(s == NATS_OK); + + test("Check config not changed: "); + testCond((strcmp(kvc.Bucket, "MIRROR") == 0) + && (kvc.Mirror != NULL) + && (strcmp(kvc.Mirror->Name, "TEST") == 0) + && (strcmp(kvc.Mirror->Domain, "HUB") == 0) + && (kvc.Mirror->External == NULL)); + + test("Get stream info: "); + s = js_GetStreamInfo(&si, ljs, "KV_MIRROR", NULL, &jerr); + testCond((s == NATS_OK) && (si != NULL) && (jerr == 0)); + + test("Check mirror direct: "); + testCond(si->Config->MirrorDirect); + jsStreamInfo_Destroy(si); + si = NULL; + + test("Check mirror syncs: "); + for (i=0; i<10; i++) + { + s = js_GetStreamInfo(&si, ljs, "KV_MIRROR", NULL, NULL); + if (s != NATS_OK) + break; + + if (si->State.Msgs != 2) + s = NATS_ERR; + + jsStreamInfo_Destroy(si); + si = NULL; + if (s == NATS_OK) + break; + nats_Sleep(250); + } + testCond(s == NATS_OK); + + // Bind locally from leafnode and make sure both get and put work. + test("Leaf KV: "); + s = js_KeyValue(&mkv, ljs, "MIRROR"); + testCond(s == NATS_OK); + + test("Put key: "); + s = kvStore_PutString(NULL, mkv, "name", "rip"); + testCond(s == NATS_OK); + + test("Get key: "); + s = kvStore_Get(&e, mkv, "name"); + if ((s == NATS_OK) && (e != NULL)) + s = (strcmp(kvEntry_ValueString(e), "rip") == 0 ? NATS_OK : NATS_ERR); + testCond(s == NATS_OK); + kvEntry_Destroy(e); + e = NULL; + + test("Get context for HUB: "); + jsOptions_Init(&o); + o.Domain = "HUB"; + s = natsConnection_JetStream(&rjs, lnc, &o); + testCond(s == NATS_OK); + + test("Get KV: "); + s = js_KeyValue(&rkv, rjs, "TEST"); + testCond(s == NATS_OK); + + test("Put key: "); + s = kvStore_PutString(NULL, rkv, "name", "ivan"); + testCond(s == NATS_OK); + + test("Get key: "); + s = kvStore_Get(&e, rkv, "name"); + if ((s == NATS_OK) && (e != NULL)) + s = (strcmp(kvEntry_ValueString(e), "ivan") == 0 ? NATS_OK : NATS_ERR); + testCond(s == NATS_OK); + kvEntry_Destroy(e); + e = NULL; + + test("Shutdown hub: "); + jsCtx_Destroy(js); + kvStore_Destroy(kv); + natsConnection_Destroy(nc); + nc = NULL; + _stopServer(pid); + pid = NATS_INVALID_PID; + testCond(true); + nats_Sleep(500); + + test("Get key: "); + // Use mkv here, not rkv. + s = kvStore_Get(&e, mkv, "name"); + if ((s == NATS_OK) && (e != NULL)) + s = (strcmp(kvEntry_ValueString(e), "ivan") == 0 ? NATS_OK : NATS_ERR); + testCond(s == NATS_OK); + kvEntry_Destroy(e); + e = NULL; + + test("Create watcher (name): "); + s = kvStore_Watch(&w, mkv, "name", NULL); + testCond(s == NATS_OK); + + test("Check watcher: "); + s = kvWatcher_Next(&e, w, 1000); + if (s == NATS_OK) + { + if ((strcmp(kvEntry_Key(e), "name") != 0) || (strcmp(kvEntry_ValueString(e), "ivan") != 0)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + IFOK(s, kvWatcher_Next(&e, w, 1000)); + if (s == NATS_OK) + { + if ((kvEntry_Key(e) != NULL) || (kvEntry_ValueString(e) != NULL)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + testCond(s == NATS_OK); + + test("No more: "); + s = kvWatcher_Next(&e, w, 250); + testCond((s == NATS_TIMEOUT) && (e == NULL)); + nats_clearLastError(); + + kvWatcher_Destroy(w); + w = NULL; + + test("Create watcher (all): ") + s = kvStore_WatchAll(&w, mkv, NULL); + testCond((s == NATS_OK) && (w != NULL)); + + test("Check watcher: "); + s = kvWatcher_Next(&e, w, 1000); + if (s == NATS_OK) + { + if ((strcmp(kvEntry_Key(e), "age") != 0) || (strcmp(kvEntry_ValueString(e), "22") != 0)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + IFOK(s, kvWatcher_Next(&e, w, 1000)); + if (s == NATS_OK) + { + if ((strcmp(kvEntry_Key(e), "name") != 0) || (strcmp(kvEntry_ValueString(e), "ivan") != 0)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + IFOK(s, kvWatcher_Next(&e, w, 1000)); + if (s == NATS_OK) + { + if ((kvEntry_Key(e) != NULL) || (kvEntry_ValueString(e) != NULL)) + s = NATS_ERR; + kvEntry_Destroy(e); + e = NULL; + } + testCond(s == NATS_OK); + + test("No more: "); + s = kvWatcher_Next(&e, w, 250); + testCond((s == NATS_TIMEOUT) && (e == NULL)); + nats_clearLastError(); + + test("Restart hub: "); + snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -c %s", datastore, confFile); + pid = _startServer("nats://127.0.0.1:4222", cmdLine, true); + CHECK_SERVER_STARTED(pid); + testCond(true); + + test("Connect to hub and check connectivity through leaf: "); + s = _connectToHubAndCheckLeaf(&nc, lnc); + testCond(s == NATS_OK); + + test("Delete keys: "); + s = kvStore_Delete(mkv, "age"); + IFOK(s, kvStore_Delete(mkv, "name")); + testCond(s == NATS_OK); + + test("Check mirror syncs: "); + for (i=0; (ok != 2) && (i < 10); i++) + { + if (kvWatcher_Next(&e, w, 1000) == NATS_OK) + { + if (((strcmp(kvEntry_Key(e), "age") == 0) || (strcmp(kvEntry_Key(e), "name") == 0)) + && (kvEntry_Operation(e) == kvOp_Delete)) + { + ok++; + } + kvEntry_Destroy(e); + e = NULL; + } + } + testCond((s == NATS_OK) && (ok == 2)); + + test("Purge deletes: "); + kvPurgeOptions_Init(&po); + po.DeleteMarkersOlderThan = -1; + s = kvStore_PurgeDeletes(mkv, &po); + testCond(s == NATS_OK); + + nats_clearLastError(); + test("Check stream: "); + s = js_GetStreamInfo(&si, ljs, "KV_MIRROR", NULL, NULL); + testCond((s == NATS_OK) && (si != NULL) && (si->State.Msgs == 0)); + jsStreamInfo_Destroy(si); + + kvWatcher_Destroy(w); + natsConnection_Destroy(nc); + kvStore_Destroy(rkv); + kvStore_Destroy(mkv); + kvStore_Destroy(lkv); + jsCtx_Destroy(rjs); + jsCtx_Destroy(ljs); + natsConnection_Destroy(lnc); + _stopServer(pid2); + rmtree(datastore2); + _stopServer(pid); + rmtree(datastore); + remove(confFile); + remove(lconfFile); +} + #if defined(NATS_HAS_STREAMING) static int @@ -33690,6 +34077,7 @@ static testInfo allTests[] = {"KeyValueDiscardOldToNew", test_KeyValueDiscardOldToNew}, {"KeyValueRePublish", test_KeyValueRePublish}, {"KeyValueMirrorDirectGet", test_KeyValueMirrorDirectGet}, + {"KeyValueMirrorCrossDomains", test_KeyValueMirrorCrossDomains}, #if defined(NATS_HAS_STREAMING) {"StanPBufAllocator", test_StanPBufAllocator}, @@ -33855,16 +34243,34 @@ int main(int argc, char **argv) // Shutdown servers that are still running likely due to failed test { + natsHash *pids = NULL; natsHashIter iter; int64_t key; - natsMutex_Lock(slMu); - natsHashIter_Init(&iter, slMap); + if (natsHash_Create(&pids, 16) == NATS_OK) + { + natsMutex_Lock(slMu); + natsHashIter_Init(&iter, slMap); + while (natsHashIter_Next(&iter, &key, NULL)) + { + natsHash_Set(pids, key, NULL, NULL); + natsHashIter_RemoveCurrent(&iter); + } + natsHashIter_Done(&iter); + natsHash_Destroy(slMap); + slMap = NULL; + natsMutex_Unlock(slMu); - while (natsHashIter_Next(&iter, &key, NULL)) - _stopServer((natsPid) key); + natsHashIter_Init(&iter, pids); + while (natsHashIter_Next(&iter, &key, NULL)) + _stopServer((natsPid) key); - natsHash_Destroy(slMap); + natsHash_Destroy(pids); + } + else + { + natsHash_Destroy(slMap); + } natsMutex_Destroy(slMu); }