Skip to content

Commit

Permalink
Merge pull request #608 from nats-io/kv_mirror_source_cross_domains
Browse files Browse the repository at this point in the history
[ADDED] KeyValue: Support for Mirror and Sources
  • Loading branch information
kozlovic authored Oct 31, 2022
2 parents 3a1250f + 4613e1f commit ef3a454
Show file tree
Hide file tree
Showing 7 changed files with 675 additions and 64 deletions.
3 changes: 3 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ extern const int64_t jsDefaultRequestWait;
#define jsAckInProgress "+WPI"
#define jsAckTerm "+TERM"

// jsExtDomainT is used to create a StreamSource External APIPrefix
#define jsExtDomainT "$JS.%s.API"

// jsApiAccountInfo is for obtaining general information about JetStream.
#define jsApiAccountInfo "%.*s.INFO"

Expand Down
105 changes: 103 additions & 2 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,11 @@ _marshalExternalStream(jsExternalStream *external, const char *fieldName, natsBu
IFOK(s, natsBuf_Append(buf, fieldName, -1));
IFOK(s, natsBuf_Append(buf, "\":{\"api\":\"", -1));
IFOK(s, natsBuf_Append(buf, external->APIPrefix, -1));
IFOK(s, natsBuf_Append(buf, "\",\"deliver\":\"", -1));
IFOK(s, natsBuf_Append(buf, external->DeliverPrefix, -1));
if ((s == NATS_OK) && !nats_IsStringEmpty(external->DeliverPrefix))
{
IFOK(s, natsBuf_Append(buf, "\",\"deliver\":\"", -1));
IFOK(s, natsBuf_Append(buf, external->DeliverPrefix, -1));
}
IFOK(s, natsBuf_Append(buf, "\"}", -1));

return NATS_UPDATE_ERR_STACK(s);
Expand Down Expand Up @@ -1061,6 +1064,95 @@ jsStreamConfig_Init(jsStreamConfig *cfg)
return NATS_OK;
}

static void
_restoreMirrorAndSourcesExternal(jsStreamConfig *cfg)
{
int i;

// We are guaranteed that if a source's Domain is set, there was originally
// no External value. So free any External value and reset to NULL to
// restore the original setting.
if ((cfg->Mirror != NULL) && !nats_IsStringEmpty(cfg->Mirror->Domain))
{
_destroyExternalStream(cfg->Mirror->External);
cfg->Mirror->External = NULL;
}
for (i=0; i<cfg->SourcesLen; i++)
{
jsStreamSource *src = cfg->Sources[i];
if ((src != NULL) && !nats_IsStringEmpty(src->Domain))
{
_destroyExternalStream(src->External);
src->External = NULL;
}
}
}

static natsStatus
_convertDomain(jsStreamSource *src)
{
jsExternalStream *e = NULL;

e = (jsExternalStream*) NATS_CALLOC(1, sizeof(jsExternalStream));
if (e == NULL)
return nats_setDefaultError(NATS_NO_MEMORY);

if (nats_asprintf((char**) &(e->APIPrefix), jsExtDomainT, src->Domain) < 0)
{
NATS_FREE(e);
return nats_setDefaultError(NATS_NO_MEMORY);
}
src->External = e;
return NATS_OK;
}

static natsStatus
_convertMirrorAndSourcesDomain(bool *converted, jsStreamConfig *cfg)
{
natsStatus s = NATS_OK;
bool cm = false;
bool cs = false;
int i;

*converted = false;

if ((cfg->Mirror != NULL) && !nats_IsStringEmpty(cfg->Mirror->Domain))
{
if (cfg->Mirror->External != NULL)
return nats_setError(NATS_INVALID_ARG, "%s", "mirror's domain and external are both set");
cm = true;
}
for (i=0; i<cfg->SourcesLen; i++)
{
jsStreamSource *src = cfg->Sources[i];
if ((src != NULL) && !nats_IsStringEmpty(src->Domain))
{
if (src->External != NULL)
return nats_setError(NATS_INVALID_ARG, "%s", "source's domain and external are both set");
cs = true;
}
}
if (!cm && !cs)
return NATS_OK;

if (cm)
s = _convertDomain(cfg->Mirror);
if ((s == NATS_OK) && cs)
{
for (i=0; (s == NATS_OK) && (i<cfg->SourcesLen); i++)
{
jsStreamSource *src = cfg->Sources[i];
if ((src != NULL) && !nats_IsStringEmpty(src->Domain))
s = _convertDomain(src);
}
}
if (s == NATS_OK)
*converted = true;
else
_restoreMirrorAndSourcesExternal(cfg);
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamConfig *cfg, jsOptions *opts, jsErrCode *errCode)
{
Expand All @@ -1071,6 +1163,7 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
natsConnection *nc = NULL;
const char *apiT = NULL;
bool freePfx = false;
bool msc = false;
jsOptions o;

if (errCode != NULL)
Expand Down Expand Up @@ -1101,6 +1194,8 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
if (freePfx)
NATS_FREE((char*) o.Prefix);
}
if ((s == NATS_OK) && (action == jsStreamActionCreate))
s = _convertMirrorAndSourcesDomain(&msc, cfg);

// Marshal the stream create/update request
IFOK(s, js_marshalStreamConfig(&buf, cfg));
Expand All @@ -1111,6 +1206,12 @@ _addOrUpdate(jsStreamInfo **new_si, jsStreamAction action, jsCtx *js, jsStreamCo
// If we got a response, check for error or return the stream info result.
IFOK(s, _unmarshalStreamCreateResp(new_si, NULL, resp, errCode));

// If mirror and/or sources were converted for the domain, then we need
// to restore the original values (which will free the memory that was
// allocated for the conversion).
if (msc)
_restoreMirrorAndSourcesExternal(cfg);

natsBuf_Destroy(buf);
natsMsg_Destroy(resp);
NATS_FREE(subj);
Expand Down
Loading

0 comments on commit ef3a454

Please sign in to comment.