Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Support for (multiple) ConsumerConfig.FilterSubjects #679

Merged
merged 7 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cmake-build*/
install/
html/
!doc/html/
test/datastore_*/

# Emacs
*~
Expand Down
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release" DO_COVERAGE="no"

- name: "NATS server - dev"
- name: "NATS server - main"
compiler: gcc
addons:
apt:
Expand All @@ -49,7 +49,7 @@ jobs:
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=dev
- NATS_TEST_SERVER_VERSION=main
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release" DO_COVERAGE="no"

Expand Down
28 changes: 22 additions & 6 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2201,18 +2201,34 @@ _processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig
jsConsumerConfig *ccfg = info->Config;
const char *dg = NULL;
natsStatus s = NATS_OK;
bool matches = false;
int i;

*dlvSubject = NULL;

// Make sure this new subject matches or is a subset.
if (!nats_IsStringEmpty(subj)
&& !nats_IsStringEmpty(ccfg->FilterSubject)
&& (strcmp(subj, ccfg->FilterSubject) != 0))
if (!nats_IsStringEmpty(subj))
{
return nats_setError(NATS_ERR, "subject '%s' does not match consumer filter subject '%s'",
subj, ccfg->FilterSubject);
if (nats_IsStringEmpty(ccfg->FilterSubject) && (ccfg->FilterSubjectsLen == 0))
{
matches = true;
}
else if (!nats_IsStringEmpty(ccfg->FilterSubject) && nats_HasPrefix(subj, ccfg->FilterSubject))
{
matches = true;
}
else if (ccfg->FilterSubjectsLen > 0)
{
for (i = 0; (i < ccfg->FilterSubjectsLen) && !matches; i++)
{
matches = nats_HasPrefix(subj, ccfg->FilterSubjects[i]);
}
}
if (!matches)
{
return nats_setError(NATS_ERR, "subject '%s' does not match any consumer filter subjects.", subj);
}
}

// Check that if user wants to create a queue sub,
// the consumer has no HB nor FC.
queue = (nats_IsStringEmpty(queue) ? NULL : queue);
Expand Down
38 changes: 37 additions & 1 deletion src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2756,6 +2756,22 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
IFOK(s, natsBuf_Append(buf, cfg->FilterSubject, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
if ((s == NATS_OK) && (cfg->FilterSubjectsLen > 0))
{
int i;

s = natsBuf_Append(buf, ",\"filter_subjects\":[", -1);
for (i = 0; (s == NATS_OK) && (i < cfg->FilterSubjectsLen); i++)
{
if (i > 0)
s = natsBuf_AppendByte(buf, ',');
IFOK(s, natsBuf_AppendByte(buf, '"'));
IFOK(s, natsBuf_Append(buf, cfg->FilterSubjects[i], -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}

IFOK(s, natsBuf_AppendByte(buf, ']'));
}
IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy))
if ((s == NATS_OK) && (cfg->RateLimit > 0))
s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit);
Expand Down Expand Up @@ -2815,6 +2831,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
void
js_destroyConsumerConfig(jsConsumerConfig *cc)
{
int i;

if (cc == NULL)
return;

Expand All @@ -2824,7 +2842,10 @@ js_destroyConsumerConfig(jsConsumerConfig *cc)
NATS_FREE((char*) cc->DeliverSubject);
NATS_FREE((char*) cc->DeliverGroup);
NATS_FREE((char*) cc->FilterSubject);
NATS_FREE((char*) cc->SampleFrequency);
for (i = 0; i < cc->FilterSubjectsLen; i++)
NATS_FREE((char *)cc->FilterSubjects[i]);
NATS_FREE((char *)cc->FilterSubjects);
NATS_FREE((char *)cc->SampleFrequency);
NATS_FREE(cc->BackOff);
NATS_FREE(cc);
}
Expand Down Expand Up @@ -2931,6 +2952,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi
IFOK(s, nats_JSONGetLong(cjson, "ack_wait", &(cc->AckWait)));
IFOK(s, nats_JSONGetLong(cjson, "max_deliver", &(cc->MaxDeliver)));
IFOK(s, nats_JSONGetStr(cjson, "filter_subject", (char**) &(cc->FilterSubject)));
IFOK(s, nats_JSONGetArrayStr(cjson, "filter_subjects", (char ***)&(cc->FilterSubjects), &(cc->FilterSubjectsLen)));
IFOK(s, _unmarshalReplayPolicy(cjson, "replay_policy", &(cc->ReplayPolicy)));
IFOK(s, nats_JSONGetULong(cjson, "rate_limit_bps", &(cc->RateLimit)));
IFOK(s, nats_JSONGetStr(cjson, "sample_freq", (char**) &(cc->SampleFrequency)));
Expand Down Expand Up @@ -3633,6 +3655,8 @@ js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone)
c->Description = NULL;
c->BackOff = NULL;
c->FilterSubject = NULL;
c->FilterSubjects = NULL;
c->FilterSubjectsLen = 0;
c->SampleFrequency = NULL;
c->DeliverSubject = NULL;
c->DeliverGroup = NULL;
Expand All @@ -3652,6 +3676,18 @@ js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone)
else
memcpy(c->BackOff, org->BackOff, org->BackOffLen*sizeof(int64_t));
}
if ((s == NATS_OK) && (org->FilterSubjects != NULL) && (org->FilterSubjectsLen > 0))
{
c->FilterSubjects = (const char **)NATS_CALLOC(org->FilterSubjectsLen, sizeof(const char *));
if (c->FilterSubjects == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

for (int i = 0; (s == NATS_OK) && (i < org->FilterSubjectsLen); i++)
{
IF_OK_DUP_STRING(s, c->FilterSubjects[i], org->FilterSubjects[i]);
}
c->FilterSubjectsLen = org->FilterSubjectsLen;
}
if (s == NATS_OK)
*clone = c;
else
Expand Down
2 changes: 2 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,8 @@ typedef struct jsConsumerConfig
int64_t *BackOff; ///< Redelivery durations expressed in nanoseconds
int BackOffLen;
const char *FilterSubject;
const char **FilterSubjects; // Multiple filter subjects introduced in 2.10
int FilterSubjectsLen;
jsReplayPolicy ReplayPolicy;
uint64_t RateLimit;
const char *SampleFrequency;
Expand Down
1 change: 1 addition & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
#define MAX_FRAMES (50)

#define nats_IsStringEmpty(s) ((((s) == NULL) || ((s)[0] == '\0')) ? true : false)
#define nats_HasPrefix(_s, _prefix) (nats_IsStringEmpty(_s) ? nats_IsStringEmpty(_prefix) : (strncmp((_s), (_prefix), strlen(_prefix)) == 0))

#define DUP_STRING(s, s1, s2) \
{ \
Expand Down
105 changes: 99 additions & 6 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -16318,6 +16318,11 @@ test_ServerPoolUpdatedOnClusterUpdate(void)
const char *urls[] = {"127.0.0.1:4222", "127.0.0.1:4223", "127.0.0.1:4224"};
test("Check pool: ");
s = _checkPool(conn, (char**)urls, (int)(sizeof(urls)/sizeof(char*)));
if (s != NATS_OK)
{
nats_Sleep(100);
s = _checkPool(conn, (char **)urls, (int)(sizeof(urls) / sizeof(char *)));
}
testCond(s == NATS_OK);
}

Expand Down Expand Up @@ -23879,6 +23884,7 @@ test_JetStreamMgtConsumers(void)
int count = 0;
natsMsg *msg = NULL;
jsConsumerConfig *cloneCfg = NULL;
const char *multiFilterSubjects[] = {"bar1", "bar2"};

JS_SETUP(2, 9, 0);

Expand Down Expand Up @@ -24061,6 +24067,37 @@ test_JetStreamMgtConsumers(void)
natsMsg_Destroy(resp);
resp = NULL;

if (serverVersionAtLeast(2, 10, 0))
{
test("Add consumer (non durable, filter subjects): ");
cfg.FilterSubject = NULL;
cfg.FilterSubjects = multiFilterSubjects;
cfg.FilterSubjectsLen = 2;
s = js_AddConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
testCond((s = NATS_ERR) && (jerr == JSStreamNotFoundErr) && (ci == NULL));
nats_clearLastError();

test("Verify config: ");
s = natsSubscription_NextMsg(&resp, sub, 1000);
testCond((s == NATS_OK) && (resp != NULL) && (strncmp(natsMsg_GetData(resp), "{\"stream_name\":\"MY_STREAM\","
"\"config\":{\"deliver_policy\":\"last\","
"\"description\":\"MyDescription\","
"\"deliver_subject\":\"foo\","
"\"opt_start_seq\":100,"
"\"opt_start_time\":\"2021-06-23T18:22:00.12345Z\",\"ack_policy\":\"explicit\","
"\"ack_wait\":200,\"max_deliver\":300,\"filter_subjects\":[\"bar1\",\"bar2\"],"
"\"replay_policy\":\"instant\",\"rate_limit_bps\":400,"
"\"sample_freq\":\"60%%\",\"max_waiting\":500,\"max_ack_pending\":600,"
"\"flow_control\":true,\"idle_heartbeat\":700,"
"\"num_replicas\":1,\"mem_storage\":true}}",
natsMsg_GetDataLength(resp)) == 0));
natsMsg_Destroy(resp);
resp = NULL;
cfg.FilterSubjects = NULL;
cfg.FilterSubjectsLen = 0;
cfg.FilterSubject = "bar";
}

test("Create check sub: ");
natsSubscription_Destroy(sub);
sub = NULL;
Expand Down Expand Up @@ -24355,6 +24392,32 @@ test_JetStreamMgtConsumers(void)
jsConsumerInfo_Destroy(ci);
ci = NULL;

if (serverVersionAtLeast(2, 10, 0))
{
test("Update (filter subjects) works ok: ");
cfg.FilterSubject = NULL;
cfg.FilterSubjects = multiFilterSubjects;
cfg.FilterSubjectsLen = 2;
s = js_UpdateConsumer(&ci, js, "MY_STREAM", &cfg, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0) && (ci != NULL) && (ci->Config != NULL)
&& (strcmp(ci->Config->Description, "my description") == 0)
&& (ci->Config->AckWait == NATS_SECONDS_TO_NANOS(2))
&& (ci->Config->MaxDeliver == 1)
&& (strcmp(ci->Config->SampleFrequency, "30") == 0)
&& (ci->Config->MaxAckPending == 10)
&& (ci->Config->HeadersOnly)
&& (ci->Config->FilterSubject == NULL)
&& (ci->Config->FilterSubjectsLen == 2)
&& (ci->Config->FilterSubjects != NULL)
&& (strcmp(ci->Config->FilterSubjects[0], "bar1") == 0)
&& (strcmp(ci->Config->FilterSubjects[1], "bar2") == 0));
jsConsumerInfo_Destroy(ci);
ci = NULL;
cfg.FilterSubject = "bar.bat";
cfg.FilterSubjects = NULL;
cfg.FilterSubjectsLen = 0;
}

test("Add pull consumer: ");
jsConsumerConfig_Init(&cfg);
cfg.Durable = "update_pull_consumer";
Expand Down Expand Up @@ -24666,9 +24729,11 @@ test_JetStreamMgtConsumers(void)
cfg.Durable = "B";
cfg.Description = "C";
cfg.FilterSubject = "D";
cfg.SampleFrequency = "E";
cfg.DeliverSubject = "F";
cfg.DeliverGroup = "G";
cfg.FilterSubjects = (const char*[]){"E", "F"};
cfg.FilterSubjectsLen = 2;
cfg.SampleFrequency = "G";
cfg.DeliverSubject = "H";
cfg.DeliverGroup = "I";
cfg.BackOff = (int64_t[]){NATS_MILLIS_TO_NANOS(50), NATS_MILLIS_TO_NANOS(250)};
cfg.BackOffLen = 2;
s = js_cloneConsumerConfig(&cfg, &cloneCfg);
Expand All @@ -24677,9 +24742,14 @@ test_JetStreamMgtConsumers(void)
&& (cloneCfg->Durable != NULL) && (strcmp(cloneCfg->Durable, "B") == 0)
&& (cloneCfg->Description != NULL) && (strcmp(cloneCfg->Description, "C") == 0)
&& (cloneCfg->FilterSubject != NULL) && (strcmp(cloneCfg->FilterSubject, "D") == 0)
&& (cloneCfg->SampleFrequency != NULL) && (strcmp(cloneCfg->SampleFrequency, "E") == 0)
&& (cloneCfg->DeliverSubject != NULL) && (strcmp(cloneCfg->DeliverSubject, "F") == 0)
&& (cloneCfg->DeliverGroup != NULL) && (strcmp(cloneCfg->DeliverGroup, "G") == 0)
&& (cloneCfg->FilterSubject != NULL) && (strcmp(cloneCfg->FilterSubject, "D") == 0)
&& (cloneCfg->FilterSubjectsLen == 2)
&& (cloneCfg->FilterSubjects != NULL)
&& (strcmp(cloneCfg->FilterSubjects[0], "E") == 0)
&& (strcmp(cloneCfg->FilterSubjects[1], "F") == 0)
&& (cloneCfg->SampleFrequency != NULL) && (strcmp(cloneCfg->SampleFrequency, "G") == 0)
&& (cloneCfg->DeliverSubject != NULL) && (strcmp(cloneCfg->DeliverSubject, "H") == 0)
&& (cloneCfg->DeliverGroup != NULL) && (strcmp(cloneCfg->DeliverGroup, "I") == 0)
&& (cloneCfg->BackOffLen == 2)
&& (cloneCfg->BackOff != NULL)
&& (cloneCfg->BackOff[0] == NATS_MILLIS_TO_NANOS(50))
Expand Down Expand Up @@ -26205,6 +26275,29 @@ test_JetStreamSubscribe(void)
&& (strstr(nats_GetLastError(NULL), "filter subject") != NULL));
nats_clearLastError();

if (serverVersionAtLeast(2, 10, 0))
{
test("Create consumer with multiple filters: ");
jsConsumerConfig_Init(&cc);
cc.Durable = "dur-multi-filter";
cc.DeliverSubject = "push.dur.sub.2";
cc.FilterSubjectsLen = 2;
cc.FilterSubjects = (const char *[2]){"sub.1", "sub.2"};
s = js_AddConsumer(NULL, js, "MULTIPLE_SUBJS", &cc, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Subscribe subj != filters: ");
so.Consumer = "dur-multi-filter";
s = js_Subscribe(&sub, js, "foo", _jsMsgHandler, &args, NULL, &so, &jerr);
testCond((s == NATS_ERR) && (sub == NULL)
&& (strstr(nats_GetLastError(NULL), "filter subject") != NULL));
nats_clearLastError();
cc.FilterSubject = "sub.2";
cc.FilterSubjects = NULL;
cc.FilterSubjectsLen = 0;
so.Consumer = "dur";
}

test("Subject not required when binding to stream/consumer: ");
s = js_Subscribe(&sub, js, NULL, _jsMsgHandler, &args, NULL, &so, &jerr);
testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0));
Expand Down