Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Added] More v2.10 related changes #682

Merged
merged 11 commits into from
Oct 12, 2023
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ install/
html/
!doc/html/
test/datastore_*/
test/conf_*

# Emacs
*~
Expand Down
3 changes: 3 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ extern const int64_t jsDefaultRequestWait;
#define jsStorageTypeFileStr "file"
#define jsStorageTypeMemStr "memory"

#define jsStorageCompressionNoneStr "none"
#define jsStorageCompressionS2Str "s2"

#define jsDeliverAllStr "all"
#define jsDeliverLastStr "last"
#define jsDeliverNewStr "new"
Expand Down
200 changes: 193 additions & 7 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ _destroyRePublish(jsRePublish *rp)
NATS_FREE(rp);
}

void
js_destroyStreamConfig(jsStreamConfig *cfg)
void js_destroyStreamConfig(jsStreamConfig *cfg)
{
int i;

Expand All @@ -136,6 +135,9 @@ js_destroyStreamConfig(jsStreamConfig *cfg)
_destroyStreamSource(cfg->Sources[i]);
NATS_FREE(cfg->Sources);
_destroyRePublish(cfg->RePublish);
nats_freeMetadata(&(cfg->Metadata));
NATS_FREE((char *)cfg->SubjectTransform.Source);
NATS_FREE((char *)cfg->SubjectTransform.Destination);
NATS_FREE(cfg);
}

Expand Down Expand Up @@ -168,10 +170,19 @@ _destroyClusterInfo(jsClusterInfo *cluster)
static void
_destroyStreamSourceInfo(jsStreamSourceInfo *info)
{
int i;

if (info == NULL)
return;

NATS_FREE(info->Name);
NATS_FREE((char*)info->FilterSubject);
for (i=0; i < info->SubjectTransformsLen; i++)
{
NATS_FREE((char *)info->SubjectTransforms[i].Source);
NATS_FREE((char *)info->SubjectTransforms[i].Destination);
}
NATS_FREE(info->SubjectTransforms);
_destroyExternalStream(info->External);
NATS_FREE(info);
}
Expand Down Expand Up @@ -535,6 +546,113 @@ _marshalStorageType(jsStorageType storage, natsBuffer *buf)
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStorageCompression(nats_JSON *json, const char *fieldName, jsStorageCompression *compression)
{
natsStatus s = NATS_OK;
const char *str = NULL;

s = nats_JSONGetStrPtr(json, "compression", &str);
if (str == NULL)
return NATS_UPDATE_ERR_STACK(s);

if (strcmp(str, jsStorageCompressionNoneStr) == 0)
*compression = js_StorageCompressionNone;
else if (strcmp(str, jsStorageCompressionS2Str) == 0)
*compression = js_StorageCompressionS2;
else
s = nats_setError(NATS_ERR, "unable to unmarshal storage compression '%s'", str);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_marshalStorageCompression(jsStorageCompression compression, natsBuffer *buf)
{
natsStatus s;
const char *st = NULL;

s = natsBuf_Append(buf, ",\"compression\":\"", -1);
switch (compression)
{
case js_StorageCompressionNone:
st = jsStorageCompressionNoneStr;
break;
case js_StorageCompressionS2:
st = jsStorageCompressionS2Str;
break;
default:
return nats_setError(NATS_INVALID_ARG, "invalid storage type %d", (int)compression);
}
IFOK(s, natsBuf_Append(buf, st, -1));
IFOK(s, natsBuf_AppendByte(buf, '"'));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalSubjectTransformConfig(nats_JSON *obj, jsSubjectTransformConfig *cfg)
{
natsStatus s = NATS_OK;

memset(cfg, 0, sizeof(jsSubjectTransformConfig));
if (obj == NULL)
{
return NATS_OK;
}

IFOK(s, nats_JSONGetStr(obj, "src", (char **)&(cfg->Source)));
IFOK(s, nats_JSONGetStr(obj, "dest", (char **)&(cfg->Destination)));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_marshalSubjectTransformConfig(jsSubjectTransformConfig *cfg, natsBuffer *buf)
{
natsStatus s;
if (cfg == NULL || (nats_IsStringEmpty(cfg->Source) && nats_IsStringEmpty(cfg->Destination)))
return NATS_OK;

s = natsBuf_Append(buf, ",\"subject_transform\":{", -1);
IFOK(s, natsBuf_Append(buf, "\"src\":\"", -1));
if (cfg->Source != NULL)
IFOK(s, natsBuf_Append(buf, cfg->Source, -1));
IFOK(s, natsBuf_Append(buf, "\",\"dest\":\"", -1));
if (cfg->Destination != NULL)
IFOK(s, natsBuf_Append(buf, cfg->Destination, -1));
IFOK(s, natsBuf_Append(buf, "\"}", -1));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_marshalStreamConsumerLimits(jsStreamConsumerLimits *limits, natsBuffer *buf)
{
natsStatus s;
if (limits == NULL || (limits->InactiveThreshold == 0 && limits->MaxAckPending == 0))
return NATS_OK;

s = natsBuf_Append(buf, ",\"consumer_limits\":{", -1);
IFOK(s, nats_marshalLong(buf, false, "inactive_threshold", limits->InactiveThreshold));
IFOK(s, nats_marshalLong(buf, true, "max_ack_pending", limits->MaxAckPending));
IFOK(s, natsBuf_AppendByte(buf, '}'));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStreamConsumerLimits(nats_JSON *obj, jsStreamConsumerLimits *limits)
{
natsStatus s = NATS_OK;

memset(limits, 0, sizeof(*limits));
if (obj == NULL)
{
return NATS_OK;
}

IFOK(s, nats_JSONGetLong(obj, "inactive_threshold", &limits->InactiveThreshold));
IFOK(s, nats_JSONGetInt(obj, "max_ack_pending", &limits->MaxAckPending));
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalRePublish(nats_JSON *json, const char *fieldName, jsRePublish **new_republish)
{
Expand Down Expand Up @@ -570,6 +688,7 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
jsStreamConfig *cfg = NULL;
nats_JSON **sources = NULL;
int sourcesLen = 0;
nats_JSON *obj = NULL;
natsStatus s;

if (fieldName != NULL)
Expand Down Expand Up @@ -633,6 +752,15 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
IFOK(s, nats_JSONGetBool(jcfg, "mirror_direct", &(cfg->MirrorDirect)));
IFOK(s, nats_JSONGetBool(jcfg, "discard_new_per_subject", &(cfg->DiscardNewPerSubject)));

IFOK(s, nats_unmarshalMetadata(jcfg, "metadata", &(cfg->Metadata)));
IFOK(s, _unmarshalStorageCompression(jcfg, "storage", &(cfg->Compression)));
IFOK(s, nats_JSONGetULong(jcfg, "first_seq", &(cfg->FirstSeq)));
IFOK(s, nats_JSONGetObject(jcfg, "subject_transform", &obj));
IFOK(s, _unmarshalSubjectTransformConfig(obj, &(cfg->SubjectTransform)));
obj = NULL;
IFOK(s, nats_JSONGetObject(jcfg, "consumer_limits", &obj));
IFOK(s, _unmarshalStreamConsumerLimits(obj, &(cfg->ConsumerLimits)));

if (s == NATS_OK)
*new_cfg = cfg;
else
Expand Down Expand Up @@ -754,6 +882,12 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
if ((s == NATS_OK) && cfg->DiscardNewPerSubject)
IFOK(s, natsBuf_Append(buf, ",\"discard_new_per_subject\":true", -1));

IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata));
IFOK(s, _marshalStorageCompression(cfg->Compression, buf));
IFOK(s, nats_marshalULong(buf, true, "first_seq", cfg->FirstSeq));
IFOK(s, _marshalSubjectTransformConfig(&cfg->SubjectTransform, buf));
IFOK(s, _marshalStreamConsumerLimits(&cfg->ConsumerLimits, buf));

IFOK(s, natsBuf_AppendByte(buf, '}'));

if (s == NATS_OK)
Expand Down Expand Up @@ -941,6 +1075,8 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour
nats_JSON *json = NULL;
jsStreamSourceInfo *ssi = NULL;
natsStatus s;
nats_JSON **subjectTransforms = NULL;
int subjectTransformsLen = 0;

if (fieldName != NULL)
{
Expand All @@ -961,6 +1097,27 @@ _unmarshalStreamSourceInfo(nats_JSON *pjson, const char *fieldName, jsStreamSour
IFOK(s, _unmarshalExternalStream(json, "external", &(ssi->External)));
IFOK(s, nats_JSONGetULong(json, "lag", &(ssi->Lag)));
IFOK(s, nats_JSONGetLong(json, "active", &(ssi->Active)));
IFOK(s, nats_JSONGetStr(json, "filter_subject", (char **)&(ssi->FilterSubject)));

// Get the sources and unmarshal if present
IFOK(s, nats_JSONGetArrayObject(json, "subject_transforms", &subjectTransforms, &subjectTransformsLen));
if ((s == NATS_OK) && (subjectTransforms != NULL))
{
int i;

ssi->SubjectTransforms = (jsSubjectTransformConfig *)NATS_CALLOC(subjectTransformsLen, sizeof(jsSubjectTransformConfig));
if (ssi->SubjectTransforms == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);

for (i = 0; (s == NATS_OK) && (i < subjectTransformsLen); i++)
{
s = _unmarshalSubjectTransformConfig(subjectTransforms[i], &(ssi->SubjectTransforms[i]));
if (s == NATS_OK)
ssi->SubjectTransformsLen++;
}
// Free the array of JSON objects that was allocated by nats_JSONGetArrayObject.
NATS_FREE(subjectTransforms);
}

if (s == NATS_OK)
*new_src = ssi;
Expand Down Expand Up @@ -1118,6 +1275,7 @@ jsStreamConfig_Init(jsStreamConfig *cfg)
cfg->Storage = js_FileStorage;
cfg->Discard = js_DiscardOld;
cfg->Replicas = 1;
cfg->Compression = js_StorageCompressionNone;
return NATS_OK;
}

Expand Down Expand Up @@ -1269,6 +1427,22 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
if (msc)
_restoreMirrorAndSourcesExternal(cfg);

// Make sure the 2.10 config fields actually worked, in case the server is
// older.
if ((s == NATS_OK) && (new_si != NULL) && (*new_si != NULL)
&& (cfg->Compression != (*new_si)->Config->Compression)
&& (cfg->FirstSeq != (*new_si)->Config->FirstSeq)
&& (cfg->Metadata.Count != (*new_si)->Config->Metadata.Count)
&& nats_StringEquals(cfg->SubjectTransform.Source, (*new_si)->Config->SubjectTransform.Source)
&& nats_StringEquals(cfg->SubjectTransform.Destination, (*new_si)->Config->SubjectTransform.Destination)
&& (cfg->ConsumerLimits.InactiveThreshold != (*new_si)->Config->ConsumerLimits.InactiveThreshold)
&& (cfg->ConsumerLimits.MaxAckPending != (*new_si)->Config->ConsumerLimits.MaxAckPending)
)
{
// <>/<> wrong error
return nats_setError(NATS_INVALID_ARG, "%s", jsErrStreamConfigRequired);
}

natsBuf_Destroy(buf);
natsMsg_Destroy(resp);
NATS_FREE(subj);
Expand Down Expand Up @@ -2772,6 +2946,7 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo

IFOK(s, natsBuf_AppendByte(buf, ']'));
}
IFOK(s, nats_marshalMetadata(buf, true, "metadata", cfg->Metadata));
IFOK(s, _marshalReplayPolicy(buf, cfg->ReplayPolicy))
if ((s == NATS_OK) && (cfg->RateLimit > 0))
s = nats_marshalULong(buf, true, "rate_limit_bps", cfg->RateLimit);
Expand Down Expand Up @@ -2844,6 +3019,7 @@ js_destroyConsumerConfig(jsConsumerConfig *cc)
NATS_FREE((char*) cc->FilterSubject);
for (i = 0; i < cc->FilterSubjectsLen; i++)
NATS_FREE((char *)cc->FilterSubjects[i]);
nats_freeMetadata(&(cc->Metadata));
NATS_FREE((char *)cc->FilterSubjects);
NATS_FREE((char *)cc->SampleFrequency);
NATS_FREE(cc->BackOff);
Expand Down Expand Up @@ -2968,6 +3144,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi
IFOK(s, nats_JSONGetArrayLong(cjson, "backoff", &(cc->BackOff), &(cc->BackOffLen)));
IFOK(s, nats_JSONGetLong(cjson, "num_replicas", &(cc->Replicas)));
IFOK(s, nats_JSONGetBool(cjson, "mem_storage", &(cc->MemoryStorage)));
IFOK(s, nats_unmarshalMetadata(cjson, "metadata", &(cc->Metadata)));
}

if (s == NATS_OK)
Expand Down Expand Up @@ -3105,14 +3282,14 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js,
{
// No subject filter, use <stream>.<consumer name>
// otherwise, the filter subject goes at the end.
if (nats_IsStringEmpty(cfg->FilterSubject))
res = nats_asprintf(&subj, jsApiConsumerCreateExT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name);
else
if (!nats_IsStringEmpty(cfg->FilterSubject) && (cfg->FilterSubjectsLen == 0))
res = nats_asprintf(&subj, jsApiConsumerCreateExWithFilterT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name, cfg->FilterSubject);
else
res = nats_asprintf(&subj, jsApiConsumerCreateExT,
js_lenWithoutTrailingDot(o.Prefix), o.Prefix,
stream, cfg->Name);
}
else if (nats_IsStringEmpty(cfg->Durable))
res = nats_asprintf(&subj, jsApiConsumerCreateT,
Expand All @@ -3136,6 +3313,14 @@ js_AddConsumer(jsConsumerInfo **new_ci, jsCtx *js,
// If we got a response, check for error or return the consumer info result.
IFOK(s, _unmarshalConsumerCreateOrGetResp(new_ci, resp, errCode));

if ((s == NATS_OK)
&& (new_ci != NULL)
&& (cfg->FilterSubjectsLen > 0)
&& ((*new_ci)->Config->FilterSubjectsLen == 0))
{
s = nats_setError(NATS_INVALID_ARG, "%s", "multiple consumer filter subjects not supported by the server");
}

NATS_FREE(subj);
natsMsg_Destroy(resp);
natsBuf_Destroy(buf);
Expand Down Expand Up @@ -3688,6 +3873,7 @@ js_cloneConsumerConfig(jsConsumerConfig *org, jsConsumerConfig **clone)
}
c->FilterSubjectsLen = org->FilterSubjectsLen;
}
IFOK(s, nats_cloneMetadata(&(c->Metadata), org->Metadata));
if (s == NATS_OK)
*clone = c;
else
Expand Down
Loading