Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Support for (multiple) ConsumerConfig.FilterSubjects #679

Merged
merged 7 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ cmake-build*/
install/
html/
!doc/html/
test/datastore_*/

# Emacs
*~
Expand Down
36 changes: 32 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
language: cpp
dist: bionic
dist: focal
os: linux

cache:
Expand All @@ -25,6 +25,34 @@ env:
jobs:
include:

- name: "NATS server - latest release"
compiler: gcc
addons:
apt:
sources:
- ubuntu-toolchain-r-test
- sourceline: ppa:ubuntu-toolchain-r/test
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=latest
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release" DO_COVERAGE="no"

- name: "NATS server - dev"
compiler: gcc
addons:
apt:
sources:
- ubuntu-toolchain-r-test
- sourceline: ppa:ubuntu-toolchain-r/test
packages:
- g++-9
env:
- NATS_TEST_SERVER_VERSION=dev
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release" DO_COVERAGE="no"

- name: "gcc-9 - TLS OFF"
compiler: gcc
addons:
Expand Down Expand Up @@ -62,7 +90,7 @@ jobs:
- g++-9
env:
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

- name: "gcc-9 - Lib msg delivery - sanitize address"
compiler: gcc
Expand All @@ -75,7 +103,7 @@ jobs:
- g++-9
env:
- MATRIX_EVAL="CC=gcc-9"
- NATS_DEFAULT_TO_LIB_MSG_DELIVERY=yes BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"
- NATS_DEFAULT_TO_LIB_MSG_DELIVERY=yes BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_FLAGS=-fsanitize=address" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

- name: "gcc-9 - Write deadline - sanitize address"
compiler: gcc
Expand All @@ -101,7 +129,7 @@ jobs:
- g++-9
env:
- MATRIX_EVAL="CC=gcc-9"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=thread" DO_COVERAGE="no"
- BUILD_OPT="-DNATS_BUILD_ARCH=64 -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS=-fsanitize=thread" NATS_TEST_VALGRIND=yes DO_COVERAGE="no"

- name: "clang-8 - TLS OFF"
compiler: clang
Expand Down
21 changes: 20 additions & 1 deletion buildOnTravis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,23 @@ echo "coverage = " $2
echo "build opts = " $3
echo "test opts = " $4

if [ "$NATS_TEST_SERVER_VERSION" != "" ]; then
rel=$NATS_TEST_SERVER_VERSION
mkdir -p $HOME/nats-server-$rel
if [ "$rel" = "latest" ]; then
rel=$(curl -s https://api.github.com/repos/nats-io/nats-server/releases/latest | jq -r '.tag_name')
fi

if [ "$rel" != "${rel#v}" ] && wget https://github.com/nats-io/nats-server/releases/download/$rel/nats-server-$rel-linux-amd64.tar.gz; then
tar -xzf nats-server-$rel-linux-amd64.tar.gz
mv nats-server-$rel-linux-amd64 $HOME/nats-server-$rel
else
curl -sf "https://binaries.nats.dev/nats-io/nats-server/v2@$rel" | PREFIX=. sh
mv nats-server $HOME/nats-server-$rel
fi
PATH=$HOME/nats-server-$rel:$PATH
fi

if [ "$1" != "gcc" ]; then
if [ "$2" = "coverage" ]; then
# only coverage for gcc compiler
Expand All @@ -35,8 +52,10 @@ res=$?
if [ $res -ne 0 ]; then
exit $res
fi
export NATS_TEST_SERVER_VERSION="$(nats-server -v)"

export NATS_TEST_TRAVIS=yes
export NATS_TEST_SERVER_VERSION="$(nats-server -v)"
echo "Using NATS server version: $NATS_TEST_SERVER_VERSION"
ctest --timeout 60 --output-on-failure $4
res=$?
if [ $res -ne 0 ]; then
Expand Down
28 changes: 22 additions & 6 deletions src/js.c
Original file line number Diff line number Diff line change
Expand Up @@ -2201,18 +2201,34 @@ _processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig
jsConsumerConfig *ccfg = info->Config;
const char *dg = NULL;
natsStatus s = NATS_OK;
bool matches = false;
int i;

*dlvSubject = NULL;

// Make sure this new subject matches or is a subset.
if (!nats_IsStringEmpty(subj)
&& !nats_IsStringEmpty(ccfg->FilterSubject)
&& (strcmp(subj, ccfg->FilterSubject) != 0))
if (!nats_IsStringEmpty(subj))
{
return nats_setError(NATS_ERR, "subject '%s' does not match consumer filter subject '%s'",
subj, ccfg->FilterSubject);
if (nats_IsStringEmpty(ccfg->FilterSubject) && (ccfg->FilterSubjectsLen == 0))
{
matches = true;
}
else if (!nats_IsStringEmpty(ccfg->FilterSubject) && nats_HasPrefix(subj, ccfg->FilterSubject))
{
matches = true;
}
else if (ccfg->FilterSubjectsLen > 0)
{
for (i = 0; (i < ccfg->FilterSubjectsLen) && !matches; i++)
{
matches = nats_HasPrefix(subj, ccfg->FilterSubjects[i]);
}
}
if (!matches)
{
return nats_setError(NATS_ERR, "subject '%s' does not match any consumer filter subjects.", subj);
}
}

// Check that if user wants to create a queue sub,
// the consumer has no HB nor FC.
queue = (nats_IsStringEmpty(queue) ? NULL : queue);
Expand Down
38 changes: 37 additions & 1 deletion src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2756,6 +2756,22 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
IFOK(s, natsBuf_Append(buf, cfg->FilterSubject, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}
if ((s == NATS_OK) && (cfg->FilterSubjectsLen > 0))
{
int i;

s = natsBuf_Append(buf, ",\"filter_subjects\":[", -1);
for (i = 0; (s == NATS_OK) && (i < cfg->FilterSubjectsLen); i++)
{
if (i > 0)
s = natsBuf_AppendByte(buf, ',');
IFOK(s, natsBuf_AppendByte(buf, '"'));
IFOK(s, natsBuf_Append(buf, cfg->FilterSubjects[i], -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
}

IFOK(s, natsBuf_AppendByte(buf, ']'));
}
IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy))
if ((s == NATS_OK) && (cfg->RateLimit > 0))
s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit);
Expand Down Expand Up @@ -2815,6 +2831,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
void
js_destroyConsumerConfig(jsConsumerConfig *cc)
{
int i;

if (cc == NULL)
return;

Expand All @@ -2824,7 +2842,10 @@ js_destroyConsumerConfig(jsConsumerConfig *cc)
NATS_FREE((char*) cc->DeliverSubject);
NATS_FREE((char*) cc->DeliverGroup);
NATS_FREE((char*) cc->FilterSubject);
NATS_FREE((char*) cc->SampleFrequency);
for (i = 0; i < cc->FilterSubjectsLen; i++)
NATS_FREE((char *)cc->FilterSubjects[i]);
NATS_FREE((char *)cc->FilterSubjects);
NATS_FREE((char *)cc->SampleFrequency);
NATS_FREE(cc->BackOff);
NATS_FREE(cc);
}
Expand Down Expand Up @@ -2931,6 +2952,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi
IFOK(s, nats_JSONGetLong(cjson, "ack_wait", &(cc->AckWait)));
IFOK(s, nats_JSONGetLong(cjson, "max_deliver", &(cc->MaxDeliver)));
IFOK(s, nats_JSONGetStr(cjson, "filter_subject", (char**) &(cc->FilterSubject)));
IFOK(s, nats_JSONGetArrayStr(cjson, "filter_subjects", (char ***)&(cc->FilterSubjects), &(cc->FilterSubjectsLen)));
IFOK(s, _unmarshalReplayPolicy(cjson, "replay_policy", &(cc->ReplayPolicy)));
IFOK(s, nats_JSONGetULong(cjson, "rate_limit_bps", &(cc->RateLimit)));
IFOK(s, nats_JSONGetStr(cjson, "sample_freq", (char**) &(cc->SampleFrequency)));
Expand Down Expand Up @@ -3633,6 +3655,8 @@ js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone)
c->Description = NULL;
c->BackOff = NULL;
c->FilterSubject = NULL;
c->FilterSubjects = NULL;
c->FilterSubjectsLen = 0;
c->SampleFrequency = NULL;
c->DeliverSubject = NULL;
c->DeliverGroup = NULL;
Expand All @@ -3652,6 +3676,18 @@ js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone)
else
memcpy(c->BackOff, org->BackOff, org->BackOffLen*sizeof(int64_t));
}
if ((s == NATS_OK) && (org->FilterSubjects != NULL) && (org->FilterSubjectsLen > 0))
{
c->FilterSubjects = (const char **)NATS_CALLOC(org->FilterSubjectsLen, sizeof(const char *));
if (c->FilterSubjects == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

for (int i = 0; (s == NATS_OK) && (i < org->FilterSubjectsLen); i++)
{
IF_OK_DUP_STRING(s, c->FilterSubjects[i], org->FilterSubjects[i]);
}
c->FilterSubjectsLen = org->FilterSubjectsLen;
}
if (s == NATS_OK)
*clone = c;
else
Expand Down
2 changes: 2 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,8 @@ typedef struct jsConsumerConfig
int64_t *BackOff; ///< Redelivery durations expressed in nanoseconds
int BackOffLen;
const char *FilterSubject;
const char **FilterSubjects; // Multiple filter subjects introduced in 2.10
int FilterSubjectsLen;
jsReplayPolicy ReplayPolicy;
uint64_t RateLimit;
const char *SampleFrequency;
Expand Down
1 change: 1 addition & 0 deletions src/natsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
#define MAX_FRAMES (50)

#define nats_IsStringEmpty(s) ((((s) == NULL) || ((s)[0] == '\0')) ? true : false)
#define nats_HasPrefix(_s, _prefix) (nats_IsStringEmpty(_s) ? nats_IsStringEmpty(_prefix) : (strncmp((_s), (_prefix), strlen(_prefix)) == 0))

#define DUP_STRING(s, s1, s2) \
{ \
Expand Down
Loading