Skip to content

Commit

Permalink
Subject Transforms in Mirror/Info and Source/Info (#982)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Sep 28, 2023
1 parent 731ae40 commit f60a689
Show file tree
Hide file tree
Showing 19 changed files with 335 additions and 110 deletions.
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/StatisticsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public interface StatisticsCollector extends Statistics {

/**
* Increment the total number of message bytes that have gone out of this connection.
* @param bytes the number of bytes going out
*/
void incrementOutBytes(long bytes);

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/api/MirrorInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ static MirrorInfo optionalInstance(JsonValue vMirror) {

@Override
public String toString() {
return "Mirror" + super.toString();
return "MirrorInfo " + super.toString();
}
}
40 changes: 32 additions & 8 deletions src/main/java/io/nats/client/api/SourceBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,33 @@
import io.nats.client.support.JsonValueUtils;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import static io.nats.client.JetStreamOptions.convertDomainToPrefix;
import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonUtils.beginJson;
import static io.nats.client.support.JsonUtils.endJson;
import static io.nats.client.support.JsonValueUtils.readValue;
import static io.nats.client.support.Validator.listsAreEquivalent;

abstract class SourceBase implements JsonSerializable {
public abstract class SourceBase implements JsonSerializable {
private final String name;
private final long startSeq;
private final ZonedDateTime startTime;
private final String filterSubject;
private final External external;
private final List<SubjectTransform> subjectTransforms;

SourceBase(JsonValue jv) {
name = JsonValueUtils.readString(jv, NAME);
startSeq = JsonValueUtils.readLong(jv, OPT_START_SEQ, 0);
startTime = JsonValueUtils.readDate(jv, OPT_START_TIME);
filterSubject = JsonValueUtils.readString(jv, FILTER_SUBJECT);
external = External.optionalInstance(readValue(jv, EXTERNAL));
subjectTransforms = SubjectTransform.optionalListOf(readValue(jv, SUBJECT_TRANSFORMS));
}

@SuppressWarnings("rawtypes") // Don't need the type of the builder to get its vars
Expand All @@ -48,6 +55,7 @@ abstract class SourceBase implements JsonSerializable {
this.startTime = b.startTime;
this.filterSubject = b.filterSubject;
this.external = b.external;
this.subjectTransforms = b.subjectTransforms;
}

/**
Expand All @@ -57,12 +65,11 @@ abstract class SourceBase implements JsonSerializable {
public String toJson() {
StringBuilder sb = beginJson();
JsonUtils.addField(sb, NAME, name);
if (startSeq > 0) {
JsonUtils.addField(sb, OPT_START_SEQ, startSeq);
}
JsonUtils.addFieldWhenGreaterThan(sb, OPT_START_SEQ, startSeq, 0);
JsonUtils.addField(sb, OPT_START_TIME, startTime);
JsonUtils.addField(sb, FILTER_SUBJECT, filterSubject);
JsonUtils.addField(sb, EXTERNAL, external);
JsonUtils.addJsons(sb, SUBJECT_TRANSFORMS, subjectTransforms);
return endJson(sb).toString();
}

Expand Down Expand Up @@ -98,6 +105,10 @@ public External getExternal() {
return external;
}

public List<SubjectTransform> getSubjectTransforms() {
return subjectTransforms;
}

@Override
public String toString() {
return JsonUtils.toKey(getClass()) + toJson();
Expand All @@ -109,6 +120,7 @@ public abstract static class SourceBaseBuilder<T> {
ZonedDateTime startTime;
String filterSubject;
External external;
List<SubjectTransform> subjectTransforms = new ArrayList<>();

abstract T getThis();

Expand Down Expand Up @@ -157,6 +169,16 @@ public T domain(String domain) {
external = prefix == null ? null : External.builder().api(prefix).build();
return getThis();
}

public T subjectTransforms(SubjectTransform... subjectTransforms) {
this.subjectTransforms = subjectTransforms == null ? null : Arrays.asList(subjectTransforms);
return getThis();
}

public T subjectTransforms(List<SubjectTransform> subjectTransforms) {
this.subjectTransforms = subjectTransforms;
return getThis();
}
}

@Override
Expand All @@ -167,11 +189,12 @@ public boolean equals(Object o) {
SourceBase that = (SourceBase) o;

if (startSeq != that.startSeq) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (startTime != null ? !startTime.equals(that.startTime) : that.startTime != null) return false;
if (filterSubject != null ? !filterSubject.equals(that.filterSubject) : that.filterSubject != null)
if (!Objects.equals(name, that.name)) return false;
if (!Objects.equals(startTime, that.startTime)) return false;
if (!Objects.equals(filterSubject, that.filterSubject))
return false;
return external != null ? external.equals(that.external) : that.external == null;
if (!Objects.equals(external, that.external)) return false;
return listsAreEquivalent(subjectTransforms, that.subjectTransforms);
}

@Override
Expand All @@ -181,6 +204,7 @@ public int hashCode() {
result = 31 * result + (startTime != null ? startTime.hashCode() : 0);
result = 31 * result + (filterSubject != null ? filterSubject.hashCode() : 0);
result = 31 * result + (external != null ? external.hashCode() : 0);
result = 31 * result + (subjectTransforms != null ? subjectTransforms.hashCode() : 0);
return result;
}
}
5 changes: 5 additions & 0 deletions src/main/java/io/nats/client/api/SourceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,9 @@ static List<SourceInfo> optionalListOf(JsonValue vSourceInfos) {
SourceInfo(JsonValue vSourceInfo) {
super(vSourceInfo);
}

@Override
public String toString() {
return "SourceInfo " + jv;
}
}
24 changes: 13 additions & 11 deletions src/main/java/io/nats/client/api/SourceInfoBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,27 @@
import io.nats.client.support.JsonValue;

import java.time.Duration;
import java.util.List;

import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonValueUtils.*;

abstract class SourceInfoBase {
protected JsonValue jv;
protected final String name;
protected final long lag;
protected final Duration active;
protected final External external;
protected final List<SubjectTransform> subjectTransforms;
protected final Error error;

SourceInfoBase(JsonValue vSourceInfo) {
jv = vSourceInfo;
name = readString(vSourceInfo, NAME);
lag = readLong(vSourceInfo, LAG, 0);
active = readNanos(vSourceInfo, ACTIVE, Duration.ZERO);
external = External.optionalInstance(readValue(vSourceInfo, EXTERNAL));
subjectTransforms = SubjectTransform.optionalListOf(readValue(vSourceInfo, SUBJECT_TRANSFORMS));
error = Error.optionalInstance(readValue(vSourceInfo, ERROR));
}

Expand Down Expand Up @@ -67,22 +72,19 @@ public External getExternal() {
return external;
}

/**
* The list of subject transforms, if any
* @return the list of subject transforms
*/
public List<SubjectTransform> getSubjectTransforms() {
return subjectTransforms;
}

/**
* The last error
* @return the error
*/
public Error getError() {
return error;
}

@Override
public String toString() {
return "Mirror{" +
"name='" + getName() + '\'' +
", lag=" + getLag() +
", active=" + getActive() +
", " + external +
", " + error +
'}';
}
}
35 changes: 2 additions & 33 deletions src/main/java/io/nats/client/api/StreamConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ public String toJson() {
JsonUtils.addField(sb, DESCRIPTION, description);
addStrings(sb, SUBJECTS, subjects);
addField(sb, RETENTION, retentionPolicy.toString());
if (compressionOption != CompressionOption.None) {
addField(sb, COMPRESSION, compressionOption.toString());
}
addEnumWhenNot(sb, COMPRESSION, compressionOption, CompressionOption.None);
addField(sb, MAX_CONSUMERS, maxConsumers);
addField(sb, MAX_MSGS, maxMsgs);
addField(sb, MAX_MSGS_PER_SUB, maxMsgsPerSubject);
Expand Down Expand Up @@ -451,36 +449,7 @@ public long getFirstSequence() {

@Override
public String toString() {
return "StreamConfiguration{" +
"name='" + name + '\'' +
", description='" + description + '\'' +
", subjects=" + subjects +
", retentionPolicy=" + retentionPolicy +
", compressionOption=" + compressionOption +
", maxConsumers=" + maxConsumers +
", maxMsgs=" + maxMsgs +
", maxMsgsPerSubject=" + maxMsgsPerSubject +
", maxBytes=" + maxBytes +
", maxAge=" + maxAge +
", maxMsgSize=" + maxMsgSize +
", storageType=" + storageType +
", replicas=" + replicas +
", noAck=" + noAck +
", template='" + templateOwner + '\'' +
", discardPolicy=" + discardPolicy +
", duplicateWindow=" + duplicateWindow +
", allowRollup=" + allowRollup +
", allowDirect=" + allowDirect +
", mirrorDirect=" + mirrorDirect +
", denyDelete=" + denyDelete +
", denyPurge=" + denyPurge +
", discardNewPerSubject=" + discardNewPerSubject +
", firstSequence=" + firstSequence +
", " + mirror +
", " + placement +
", sources=" + sources +
", metadata=" + metadata +
'}';
return toJson();
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/nats/client/api/StreamInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,9 @@ public StreamConfiguration getConfig() {
public ZonedDateTime getTimestamp() {
return timestamp;
}

@Override
public String toString() {
return "StreamInfo " + jv;
}
}
26 changes: 26 additions & 0 deletions src/main/java/io/nats/client/api/SubjectTransform.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

import io.nats.client.support.JsonSerializable;
import io.nats.client.support.JsonValue;
import io.nats.client.support.JsonValueUtils;

import java.util.List;
import java.util.Objects;

import static io.nats.client.support.ApiConstants.DEST;
import static io.nats.client.support.ApiConstants.SRC;
Expand All @@ -32,6 +36,10 @@ static SubjectTransform optionalInstance(JsonValue vSubjectTransform) {
return vSubjectTransform == null ? null : new SubjectTransform(vSubjectTransform);
}

static List<SubjectTransform> optionalListOf(JsonValue vSubjectTransforms) {
return JsonValueUtils.optionalListOf(vSubjectTransforms, SubjectTransform::new);
}

SubjectTransform(JsonValue vSubjectTransform) {
source = readString(vSubjectTransform, SRC);
destination = readString(vSubjectTransform, DEST);
Expand Down Expand Up @@ -112,4 +120,22 @@ public SubjectTransform build() {
return new SubjectTransform(source, destination);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SubjectTransform that = (SubjectTransform) o;

if (!Objects.equals(source, that.source)) return false;
return Objects.equals(destination, that.destination);
}

@Override
public int hashCode() {
int result = source != null ? source.hashCode() : 0;
result = 31 * result + (destination != null ? destination.hashCode() : 0);
return result;
}
}
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public interface ApiConstants {
String STREAMS = "streams";
String SUBJECT = "subject";
String SUBJECT_TRANSFORM = "subject_transform";
String SUBJECT_TRANSFORMS = "subject_transforms";
String SUBJECTS = "subjects";
String SUBJECTS_FILTER = "subjects_filter";
String SUCCESS = "success";
Expand Down
13 changes: 10 additions & 3 deletions src/main/java/io/nats/client/support/JsonUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,14 @@ public static void addField(StringBuilder sb, String fname, Map<String, String>
}
}

interface ListAdder<T> {
@SuppressWarnings("rawtypes")
public static void addEnumWhenNot(StringBuilder sb, String fname, Enum e, Enum dontAddIfThis) {
if (e != null && e != dontAddIfThis) {
addField(sb, fname, e.toString());
}
}

public interface ListAdder<T> {
void append(StringBuilder sb, T t);
}

Expand All @@ -306,7 +313,7 @@ interface ListAdder<T> {
* @param sb string builder
* @param fname fieldname
* @param list value list
* @param adder implementation to add value, including it's quotes if required
* @param adder implementation to add value, including its quotes if required
*/
public static <T> void _addList(StringBuilder sb, String fname, List<T> list, ListAdder<T> adder) {
sb.append(Q);
Expand Down Expand Up @@ -360,7 +367,7 @@ private static void _addStrings(StringBuilder sb, String fname, List<String> str
* @param jsons field value
*/
public static void addJsons(StringBuilder sb, String fname, List<? extends JsonSerializable> jsons) {
if (jsons != null && jsons.size() > 0) {
if (jsons != null && !jsons.isEmpty()) {
_addList(sb, fname, jsons, (sbs, s) -> sbs.append(s.toJson()));
}
}
Expand Down
Loading

0 comments on commit f60a689

Please sign in to comment.