Skip to content

Commit

Permalink
Add test for watcher/delete/purge deletes
Browse files Browse the repository at this point in the history
Needed to have delete use the put prefix and bind to the stream
when creating the watcher's subscription because in case of mirror
the subject would not allow to find the stream.

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
kozlovic committed Oct 28, 2022
1 parent 2c87273 commit c736d16
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 19 deletions.
37 changes: 18 additions & 19 deletions src/kv.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ natsBuffer buf;
#define USE_JS_PREFIX true
#define KEY_NAME_ONLY false

#define BUILD_SUBJECT(p) \
#define FOR_A_PUT true
#define NOT_FOR_A_PUT false

#define BUILD_SUBJECT(p, fp) \
s = natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); \
if ((p) && kv->useJSPrefix) \
{ \
IFOK(s, natsBuf_Append(&buf, kv->js->opts.Prefix, -1)); \
IFOK(s, natsBuf_AppendByte(&buf, '.')); \
} \
IFOK(s, natsBuf_Append(&buf, kv->pre, -1)); \
IFOK(s, natsBuf_Append(&buf, ((fp) ? (kv->usePutPre ? kv->putPre : kv->pre) : kv->pre), -1)); \
IFOK(s, natsBuf_Append(&buf, key, -1)); \
IFOK(s, natsBuf_AppendByte(&buf, 0));

Expand Down Expand Up @@ -192,7 +195,7 @@ _createKV(kvStore **new_kv, jsCtx *js, const char *bucket)
}

static natsStatus
_changeBucketNameAndPutPrefixIfMirrorPresent(kvStore *kv, jsStreamInfo *si)
_changePutPrefixIfMirrorPresent(kvStore *kv, jsStreamInfo *si)
{
natsStatus s = NATS_OK;
const char *bucket = NULL;
Expand Down Expand Up @@ -355,7 +358,7 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg)
// If the stream allow direct get message calls, then we will do so.
kv->useDirect = si->Config->AllowDirect;

s = _changeBucketNameAndPutPrefixIfMirrorPresent(kv, si);
s = _changePutPrefixIfMirrorPresent(kv, si);
}
jsStreamInfo_Destroy(si);

Expand Down Expand Up @@ -417,7 +420,7 @@ js_KeyValue(kvStore **new_kv, jsCtx *js, const char *bucket)
if (si->Config->MaxMsgsPerSubject < 1)
s = nats_setError(NATS_INVALID_ARG, "%s", kvErrBadBucket);

IFOK(s, _changeBucketNameAndPutPrefixIfMirrorPresent(kv, si));
IFOK(s, _changePutPrefixIfMirrorPresent(kv, si));

jsStreamInfo_Destroy(si);
}
Expand Down Expand Up @@ -540,7 +543,7 @@ _getEntry(kvEntry **new_entry, bool *deleted, kvStore *kv, const char *key, uint
if (!validKey(key))
return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey);

BUILD_SUBJECT(KEY_NAME_ONLY);
BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT);

if (kv->useDirect)
{
Expand Down Expand Up @@ -646,8 +649,7 @@ _putEntry(uint64_t *rev, kvStore *kv, jsPubOptions *po, const char *key, const v
natsStatus s = NATS_OK;
jsPubAck *pa = NULL;
jsPubAck **ppa = NULL;
char buffer[128];
natsBuffer buf;
DEFINE_BUF_FOR_SUBJECT;

if (rev != NULL)
{
Expand All @@ -661,15 +663,7 @@ _putEntry(uint64_t *rev, kvStore *kv, jsPubOptions *po, const char *key, const v
if (!validKey(key))
return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey);

s = natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer));
if (kv->useJSPrefix)
{
IFOK(s, natsBuf_Append(&buf, kv->js->opts.Prefix, -1));
IFOK(s, natsBuf_AppendByte(&buf, '.'));
}
IFOK(s, natsBuf_Append(&buf, (kv->usePutPre ? kv->putPre : kv->pre), -1));
IFOK(s, natsBuf_Append(&buf, key, -1));
IFOK(s, natsBuf_AppendByte(&buf, 0));
BUILD_SUBJECT(USE_JS_PREFIX, FOR_A_PUT);
IFOK(s, js_Publish(ppa, kv->js, natsBuf_Data(&buf), data, len, po, NULL));

if ((s == NATS_OK) && (rev != NULL))
Expand Down Expand Up @@ -772,7 +766,7 @@ _delete(kvStore *kv, const char *key, bool purge, kvPurgeOptions *opts)
if (!validKey(key))
return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey);

BUILD_SUBJECT(USE_JS_PREFIX);
BUILD_SUBJECT(USE_JS_PREFIX, FOR_A_PUT);
IFOK(s, natsMsg_Create(&msg, natsBuf_Data(&buf), NULL, NULL, 0));
if (s == NATS_OK)
{
Expand Down Expand Up @@ -885,6 +879,7 @@ kvStore_PurgeDeletes(kvStore *kv, kvPurgeOptions *opts)
for (; h != NULL; )
{
natsBuf_Reset(&buf);
// Use kv->pre here, always.
IFOK(s, natsBuf_Append(&buf, kv->pre, -1));
IFOK(s, natsBuf_Append(&buf, h->key, -1));
IFOK(s, natsBuf_AppendByte(&buf, '\0'));
Expand Down Expand Up @@ -991,6 +986,7 @@ kvWatcher_Next(kvEntry **new_entry, kvWatcher *w, int64_t timeout)
}
w->refs--;

// Use kv->pre here, always.
if ((s == NATS_OK) && (strlen(msg->subject) <= strlen(w->kv->pre)))
s = nats_setError(NATS_ERR, "invalid update's subject '%s'", msg->subject);

Expand Down Expand Up @@ -1080,7 +1076,7 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti
w->kv = kv;
w->refs = 1;

BUILD_SUBJECT(KEY_NAME_ONLY);
BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT);
IFOK(s, natsMutex_Create(&(w->mu)));
if (s == NATS_OK)
{
Expand All @@ -1097,6 +1093,9 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti
if (opts->IgnoreDeletes)
w->ignoreDel = true;
}
// Need to explicitly bind to the stream here because the subject
// we construct may not help find the stream when using mirrors.
so.Stream = kv->stream;
s = js_SubscribeSync(&(w->sub), kv->js, natsBuf_Data(&buf), NULL, &so, NULL);
IFOK(s, natsSubscription_SetPendingLimits(w->sub, -1, -1));
if (s == NATS_OK)
Expand Down
115 changes: 115 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -31284,6 +31284,10 @@ test_KeyValueMirrorCrossDomains(void)
kvEntry *e = NULL;
jsStreamInfo *si = NULL;
natsSubscription *sub = NULL;
kvWatcher *w = NULL;
int ok = 0;
kvPurgeOptions po;
kvWatchOptions wo;
kvConfig kvc;
jsStreamSource src;
int i;
Expand Down Expand Up @@ -31451,7 +31455,10 @@ test_KeyValueMirrorCrossDomains(void)
test("Shutdown hub: ");
jsCtx_Destroy(js);
kvStore_Destroy(kv);
natsSubscription_Destroy(sub);
sub = NULL;
natsConnection_Destroy(nc);
nc = NULL;
_stopServer(pid);
pid = NATS_INVALID_PID;
testCond(true);
Expand All @@ -31466,7 +31473,115 @@ test_KeyValueMirrorCrossDomains(void)
kvEntry_Destroy(e);
e = NULL;

test("Create watcher: ")
kvWatchOptions_Init(&wo);
s = kvStore_WatchAll(&w, mkv, &wo);
testCond((s == NATS_OK) && (w != NULL));

test("Check watcher: ");
for (i=0; (s == NATS_OK) && (i<3); i++)
{
s = kvWatcher_Next(&e, w, 1000);
if (s == NATS_OK)
{
if (i==0)
{
if ((strcmp(kvEntry_Key(e), "age") != 0)
|| (strcmp(kvEntry_ValueString(e), "22") != 0))
{
s = NATS_ERR;
}
}
else if (i == 1)
{
if ((strcmp(kvEntry_Key(e), "name") != 0)
|| (strcmp(kvEntry_ValueString(e), "ivan") != 0))
{
s = NATS_ERR;
}
}
else if (i == 2)
{
if ((kvEntry_Key(e) != NULL) || (kvEntry_ValueString(e) != NULL))
s = NATS_ERR;
}
kvEntry_Destroy(e);
e = NULL;
}
}
testCond(s == NATS_OK);

test("No more: ");
s = kvWatcher_Next(&e, w, 250);
testCond((s == NATS_TIMEOUT) && (e == NULL));
nats_clearLastError();

test("Restart hub: ");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -c %s", datastore, confFile);
pid = _startServer("nats://127.0.0.1:4222", cmdLine, true);
CHECK_SERVER_STARTED(pid);
testCond(true);

test("Connect to hub: ");
s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);
testCond(s == NATS_OK);

test("Sub to check LF connectivity: ");
s = natsConnection_SubscribeSync(&sub, nc, "check");
IFOK(s, natsConnection_Flush(nc));
testCond(s == NATS_OK);

test("Check connectivity: ");
for (i=0; i<10; i++)
{
s = natsConnection_PublishString(lnc, "check", "hello");
if (s == NATS_OK)
{
natsMsg *msg = NULL;
s = natsSubscription_NextMsg(&msg, sub, 500);
natsMsg_Destroy(msg);
if (s == NATS_OK)
break;
}
}
testCond(s == NATS_OK);

test("Delete keys: ");
s = kvStore_Delete(mkv, "age");
IFOK(s, kvStore_Delete(mkv, "name"));
testCond(s == NATS_OK);

test("Check mirror syncs: ");
for (i=0; (ok != 2) && (i < 10); i++)
{
if (kvWatcher_Next(&e, w, 1000) == NATS_OK)
{
if (((strcmp(kvEntry_Key(e), "age") == 0) || (strcmp(kvEntry_Key(e), "name") == 0))
&& (kvEntry_Operation(e) == kvOp_Delete))
{
ok++;
}
kvEntry_Destroy(e);
e = NULL;
}
}
testCond((s == NATS_OK) && (ok == 2));

test("Purge deletes: ");
kvPurgeOptions_Init(&po);
po.DeleteMarkersOlderThan = -1;
s = kvStore_PurgeDeletes(mkv, &po);
testCond(s == NATS_OK);

nats_clearLastError();
test("Check stream: ");
s = js_GetStreamInfo(&si, ljs, "KV_MIRROR", NULL, NULL);
testCond((s == NATS_OK) && (si != NULL) && (si->State.Msgs == 0));
jsStreamInfo_Destroy(si);

kvWatcher_Destroy(w);
natsSubscription_Destroy(sub);
natsConnection_Destroy(nc);
kvStore_Destroy(rkv);
kvStore_Destroy(mkv);
kvStore_Destroy(lkv);
Expand Down

0 comments on commit c736d16

Please sign in to comment.