From 3d5c9260720ee97b76c8d45796388bb08be0c454 Mon Sep 17 00:00:00 2001 From: Paras-Soni Date: Wed, 23 Oct 2024 21:56:32 +0530 Subject: [PATCH] vertx meta data changes final (#114) * vertx meta data changes * vertx meta data changes --- pom.xml | 15 - .../java/io/vertx/ext/consul/Service.java | 65 ++- .../io/vertx/ext/consul/ServiceOptions.java | 45 ++ .../io/vertx/ext/consul/TxnOperation.java | 3 +- .../io/vertx/ext/consul/TxnOperationType.java | 6 +- .../java/io/vertx/ext/consul/TxnRequest.java | 18 + .../java/io/vertx/ext/consul/TxnResponse.java | 10 + .../java/io/vertx/ext/consul/TxnResult.java | 3 +- .../vertx/ext/consul/TxnServiceOperation.java | 109 +++++ .../io/vertx/ext/consul/TxnServiceVerb.java | 52 +++ .../ext/consul/impl/TxnResponseParser.java | 3 + .../ext/consul/tests/ConsulTestBase.java | 4 + .../ext/consul/tests/suite/Transactions.java | 398 +++++++++++++++++- 13 files changed, 707 insertions(+), 24 deletions(-) create mode 100644 src/main/java/io/vertx/ext/consul/TxnServiceOperation.java create mode 100644 src/main/java/io/vertx/ext/consul/TxnServiceVerb.java diff --git a/pom.xml b/pom.xml index 1212ab24..89bfd35b 100644 --- a/pom.xml +++ b/pom.xml @@ -52,21 +52,6 @@ true - - io.vertx - vertx-codegen-api - true - - - io.vertx - vertx-codegen-json - true - - - io.vertx - vertx-docgen-api - true - io.vertx vertx-web-client diff --git a/src/main/java/io/vertx/ext/consul/Service.java b/src/main/java/io/vertx/ext/consul/Service.java index 02422d52..0abb22e0 100644 --- a/src/main/java/io/vertx/ext/consul/Service.java +++ b/src/main/java/io/vertx/ext/consul/Service.java @@ -34,7 +34,7 @@ * @author Ruslan Sennov */ @DataObject -public class Service { +public class Service implements TxnResult { private static final String NODE = "Node"; private static final String ADDRESS = "Address"; @@ -44,6 +44,8 @@ public class Service { private static final String SERVICE_ADDRESS = "ServiceAddress"; private static final String SERVICE_META = "ServiceMeta"; private static final String SERVICE_PORT = "ServicePort"; + private static final String CREATE_INDEX = "CreateIndex"; + private static final String MODIFY_INDEX = "ModifyIndex"; private String node; private String nodeAddress; @@ -53,6 +55,8 @@ public class Service { private String address; private Map meta; private int port; + private long createIndex; + private long modifyIndex; /** * Default constructor @@ -74,6 +78,8 @@ public Service(Service other) { this.address = other.address; this.meta = other.meta; this.port = other.port; + this.createIndex = other.createIndex; + this.modifyIndex = other.modifyIndex; } /** @@ -90,6 +96,8 @@ public Service(JsonObject service) { this.address = service.getString(SERVICE_ADDRESS); this.meta = mapStringString(service.getJsonObject(SERVICE_META)); this.port = service.getInteger(SERVICE_PORT, 0); + this.createIndex = service.getLong(CREATE_INDEX, 0l); + this.modifyIndex = service.getLong(MODIFY_INDEX, 0l); } /** @@ -123,6 +131,12 @@ public JsonObject toJson() { if (port != 0) { jsonObject.put(SERVICE_PORT, port); } + if (createIndex != 0l) { + jsonObject.put(CREATE_INDEX, createIndex); + } + if (modifyIndex != 0l) { + jsonObject.put(MODIFY_INDEX, modifyIndex); + } return jsonObject; } @@ -285,6 +299,51 @@ public Service setPort(int port) { return this; } + /** + * Get the internal index value that represents when the entry was created. + * + * @return the internal index value that represents when the entry was created. + */ + public long getCreateIndex() { + return createIndex; + } + + /** + * Set the internal index value that represents when the entry was created. + * + * @param createIndex the internal index value that represents when the entry was created. + * @return reference to this, for fluency + */ + public Service setCreateIndex(long createIndex) { + this.createIndex = createIndex; + return this; + } + + /** + * Get the last index that modified this key. + * + * @return the last index that modified this key. + */ + public long getModifyIndex() { + return modifyIndex; + } + + /** + * Set the last index that modified this key. + * + * @param modifyIndex the last index that modified this key. + * @return reference to this, for fluency + */ + public Service setModifyIndex(long modifyIndex) { + this.modifyIndex = modifyIndex; + return this; + } + + @Override + public TxnOperationType getOperationType() { + return TxnOperationType.SERVICE; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -292,6 +351,8 @@ public boolean equals(Object o) { Service service = (Service) o; + if (createIndex != service.createIndex) return false; + if (modifyIndex != service.modifyIndex) return false; if (port != service.port) return false; if (node != null ? !node.equals(service.node) : service.node != null) return false; if (nodeAddress != null ? !nodeAddress.equals(service.nodeAddress) : service.nodeAddress != null) return false; @@ -312,6 +373,8 @@ public int hashCode() { result = 31 * result + (address != null ? address.hashCode() : 0); result = 31 * result + (meta != null ? meta.hashCode() : 0); result = 31 * result + port; + result = 31 * result + (int) (createIndex ^ (createIndex >>> 32)); + result = 31 * result + (int) (modifyIndex ^ (modifyIndex >>> 32)); return result; } diff --git a/src/main/java/io/vertx/ext/consul/ServiceOptions.java b/src/main/java/io/vertx/ext/consul/ServiceOptions.java index 687ab1b6..18a75c21 100644 --- a/src/main/java/io/vertx/ext/consul/ServiceOptions.java +++ b/src/main/java/io/vertx/ext/consul/ServiceOptions.java @@ -40,6 +40,8 @@ public class ServiceOptions { private int port; private CheckOptions checkOptions; private List checkListOptions; + private long createIndex; + private long modifyIndex; /** * Default constructor @@ -61,6 +63,8 @@ public ServiceOptions(ServiceOptions options) { this.port = options.port; this.checkOptions = options.checkOptions; this.checkListOptions = options.checkListOptions; + this.createIndex = options.createIndex; + this.modifyIndex = options.modifyIndex; } /** @@ -242,4 +246,45 @@ public ServiceOptions setCheckListOptions(List checkListOptions) { this.checkListOptions = checkListOptions; return this; } + + /** + * Get the internal index value that represents when the entry was created. + * + * @return the internal index value that represents when the entry was created. + */ + public long getCreateIndex() { + return createIndex; + } + + /** + * Set the internal index value that represents when the entry was created. + * + * @param createIndex the internal index value that represents when the entry was created. + * @return reference to this, for fluency + */ + public ServiceOptions setCreateIndex(long createIndex) { + this.createIndex = createIndex; + return this; + } + + /** + * Get the last index that modified this key. + * + * @return the last index that modified this key. + */ + public long getModifyIndex() { + return modifyIndex; + } + + /** + * Set the last index that modified this key. + * + * @param modifyIndex the last index that modified this key. + * @return reference to this, for fluency + */ + public ServiceOptions setModifyIndex(long modifyIndex) { + this.modifyIndex = modifyIndex; + return this; + } + } diff --git a/src/main/java/io/vertx/ext/consul/TxnOperation.java b/src/main/java/io/vertx/ext/consul/TxnOperation.java index 0bd0bc69..6a027dbb 100644 --- a/src/main/java/io/vertx/ext/consul/TxnOperation.java +++ b/src/main/java/io/vertx/ext/consul/TxnOperation.java @@ -16,8 +16,7 @@ package io.vertx.ext.consul; /** - * Represents operation in transaction. Key/Value is the only available operation type, - * though other types of operations may be added in future versions of Consul to be mixed with key/value operations + * Represents operation in transaction. The available operation types are KV and Service * * @author Ruslan Sennov */ diff --git a/src/main/java/io/vertx/ext/consul/TxnOperationType.java b/src/main/java/io/vertx/ext/consul/TxnOperationType.java index b0ac05e1..510ac486 100644 --- a/src/main/java/io/vertx/ext/consul/TxnOperationType.java +++ b/src/main/java/io/vertx/ext/consul/TxnOperationType.java @@ -3,13 +3,13 @@ import io.vertx.codegen.annotations.VertxGen; /** - * Represents the type of operation in a transaction. KV is the only available operation type, - * though other types of operations may be added in future versions of Consul to be mixed with key/value operations + * Represents the type of operation in a transaction. The available operation types are KV and Service * * @author Ruslan Sennov * @see /v1/txn endpoint */ @VertxGen public enum TxnOperationType { - KV + KV, + SERVICE } diff --git a/src/main/java/io/vertx/ext/consul/TxnRequest.java b/src/main/java/io/vertx/ext/consul/TxnRequest.java index 5b68af80..b9f2ca4a 100644 --- a/src/main/java/io/vertx/ext/consul/TxnRequest.java +++ b/src/main/java/io/vertx/ext/consul/TxnRequest.java @@ -59,6 +59,14 @@ public TxnRequest(JsonObject json) { .setIndex(txn.getLong("Index")) .setSession(txn.getString("Session")) .setType(TxnKVVerb.ofVerb(txn.getString("Verb")))); + } else if (obj.containsKey("Service")) { + JsonObject txn = obj.getJsonObject("Service"); + ServiceOptions serviceOptions = new ServiceOptions(txn.getJsonObject("Service")); + serviceOptions.setName(txn.getJsonObject("Service").getString("Service")); + operations.add(new TxnServiceOperation() + .setServiceOptions(serviceOptions) + .setNode(txn.getString("Node")) + .setType(TxnServiceVerb.ofVerb(txn.getString("Verb")))); } }); } @@ -82,6 +90,16 @@ public JsonObject toJson() { .put("Index", kvOp.getIndex()) .put("Session", kvOp.getSession()); arr.add(new JsonObject().put("KV", obj)); + } else if (op instanceof TxnServiceOperation) { + TxnServiceOperation serviceOp = (TxnServiceOperation) op; + JsonObject serviceObj = serviceOp.getServiceOptions().toJson(); + serviceObj.put("Service", serviceObj.getValue("name")); + serviceObj.remove("name"); + JsonObject obj = new JsonObject() + .put("Verb", serviceOp.getType().getVerb()) + .put("Service", serviceObj) + .put("Node", serviceOp.getNode()); + arr.add(new JsonObject().put("Service", obj)); } }); return new JsonObject().put("operations", arr); diff --git a/src/main/java/io/vertx/ext/consul/TxnResponse.java b/src/main/java/io/vertx/ext/consul/TxnResponse.java index 4fe2a684..4303d0c6 100644 --- a/src/main/java/io/vertx/ext/consul/TxnResponse.java +++ b/src/main/java/io/vertx/ext/consul/TxnResponse.java @@ -1,3 +1,4 @@ + /* * Copyright (c) 2016 The original author or authors * @@ -50,6 +51,10 @@ public TxnResponse(JsonObject json) { JsonObject obj = (JsonObject) entry; if (obj.containsKey("KV")) { results.add(new KeyValue(obj.getJsonObject("KV"))); + } else if (obj.containsKey("Service")) { + Service service = new Service(obj.getJsonObject("Service")); + service.setName(obj.getJsonObject("Service").getString("Service")); + results.add(service); } }); } @@ -68,6 +73,11 @@ public JsonObject toJson() { results.forEach(op -> { if (op instanceof KeyValue) { jsonResults.add(new JsonObject().put("KV", ((KeyValue) op).toJson())); + } else if (op instanceof Service) { + JsonObject jsonObject = ((Service) op).toJson(); + jsonObject.put("Service", jsonObject.getString("ServiceName")); + jsonObject.remove("ServiceName"); + jsonResults.add(new JsonObject().put("Service", jsonObject)); } }); JsonArray jsonErrors = new JsonArray(); diff --git a/src/main/java/io/vertx/ext/consul/TxnResult.java b/src/main/java/io/vertx/ext/consul/TxnResult.java index 928e7952..c727f5d8 100644 --- a/src/main/java/io/vertx/ext/consul/TxnResult.java +++ b/src/main/java/io/vertx/ext/consul/TxnResult.java @@ -16,8 +16,7 @@ package io.vertx.ext.consul; /** - * Represents result of operation. Key/Value is the only available result type, - * though other types of results may be added in future versions of Consul to be mixed with key/value operations + * Represents result of operation. The available operation types are KV and Service * * @author Ruslan Sennov */ diff --git a/src/main/java/io/vertx/ext/consul/TxnServiceOperation.java b/src/main/java/io/vertx/ext/consul/TxnServiceOperation.java new file mode 100644 index 00000000..8cbc57b3 --- /dev/null +++ b/src/main/java/io/vertx/ext/consul/TxnServiceOperation.java @@ -0,0 +1,109 @@ +package io.vertx.ext.consul; + +import io.vertx.codegen.annotations.DataObject; +import io.vertx.codegen.annotations.GenIgnore; +import io.vertx.core.json.JsonObject; +import io.vertx.codegen.json.annotations.JsonGen; + +/** + * Holds the operation to apply to the service inside a transaction + */ +@DataObject +@JsonGen(publicConverter = false) +public class TxnServiceOperation implements TxnOperation { + + private TxnServiceVerb type; + private String node; + private ServiceOptions serviceOptions; + + /** + * Default constructor + */ + public + TxnServiceOperation() { + + } + + /** + * Constructor from JSON + * + * @param json the JSON + */ + public TxnServiceOperation(JsonObject json) { + TxnServiceOperationConverter.fromJson(json, this); + } + + /** + * Convert to JSON + * + * @return the JSON + */ + public JsonObject toJson() { + JsonObject jsonObject = new JsonObject(); + TxnServiceOperationConverter.toJson(this, jsonObject); + return jsonObject; + } + + /** + * Get the type of operation to perform + * + * @return the type of operation to perform + */ + public TxnServiceVerb getType() { + return type; + } + + /** + * Get the node + * + * @return the node name + */ + public String getNode() { return node; } + + /** + * Get the service + * + * @return the service + */ + public ServiceOptions getServiceOptions() { return serviceOptions; } + + /** + * Set the type of operation to perform + * + * @param type the type of operation to perform + * @return reference to this, for fluency + */ + public TxnServiceOperation setType(TxnServiceVerb type) { + this.type = type; + return this; + } + + /** + * Set the node + * + * @param node + * @return reference to this, for fluency + */ + public TxnServiceOperation setNode(String node) { + this.node = node; + return this; + } + + /** + * Set the service + * + * @param serviceOptions + * @return reference to this, for fluency + */ + public TxnServiceOperation setServiceOptions(ServiceOptions serviceOptions) { + this.serviceOptions = serviceOptions; + return this; + } + + @GenIgnore + @Override + public TxnOperationType getOperationType() { + return TxnOperationType.SERVICE; + } + +} diff --git a/src/main/java/io/vertx/ext/consul/TxnServiceVerb.java b/src/main/java/io/vertx/ext/consul/TxnServiceVerb.java new file mode 100644 index 00000000..517dde9b --- /dev/null +++ b/src/main/java/io/vertx/ext/consul/TxnServiceVerb.java @@ -0,0 +1,52 @@ +package io.vertx.ext.consul; + +/** + * Holds the type of Service operation in transaction + */ +public enum TxnServiceVerb { + + /** + * Sets the service to the given state + */ + SET("set"), + + /** + * Sets, but with CAS semantics using the given ModifyIndex + */ + CAS("cas"), + + /** + * Get the service, fails if it does not exist + */ + GET("get"), + + /** + * Delete the service + */ + DELETE("delete"), + + /** + * Delete, but with CAS semantics + */ + DELETE_CAS("delete-cas"); + + public static TxnServiceVerb ofVerb(String verb) { + for (TxnServiceVerb type : values()) { + if (type.getVerb().equals(verb)) { + return type; + } + } + return null; + } + + private final String verb; + + TxnServiceVerb(String verb) { + this.verb = verb; + } + + public String getVerb() { + return verb; + } + +} diff --git a/src/main/java/io/vertx/ext/consul/impl/TxnResponseParser.java b/src/main/java/io/vertx/ext/consul/impl/TxnResponseParser.java index 4d8eb2e1..a09039c4 100644 --- a/src/main/java/io/vertx/ext/consul/impl/TxnResponseParser.java +++ b/src/main/java/io/vertx/ext/consul/impl/TxnResponseParser.java @@ -33,6 +33,9 @@ static TxnResponse parse(JsonObject json) { if (obj.containsKey("KV")) { response.addResult(KVParser.parse(obj.getJsonObject("KV"))); } + else if (obj.containsKey("Service")) { + response.addResult(ServiceParser.parseAgentInfo(obj.getJsonObject("Service"))); + } }); } if (json.getValue("Errors") instanceof JsonArray) { diff --git a/src/test/java/io/vertx/ext/consul/tests/ConsulTestBase.java b/src/test/java/io/vertx/ext/consul/tests/ConsulTestBase.java index d410549f..1302b21a 100644 --- a/src/test/java/io/vertx/ext/consul/tests/ConsulTestBase.java +++ b/src/test/java/io/vertx/ext/consul/tests/ConsulTestBase.java @@ -68,6 +68,10 @@ public void setUp() throws Exception { readClient = consul.createClient(vertx, consul.dc().readToken()); } + public String getNodeName() { + return consul.getConfig("node_name"); + } + public String createAclToken(String name, String rules) { AclPolicy policy = new AclPolicy() .setName(name) diff --git a/src/test/java/io/vertx/ext/consul/tests/suite/Transactions.java b/src/test/java/io/vertx/ext/consul/tests/suite/Transactions.java index 9522731c..077c87c7 100644 --- a/src/test/java/io/vertx/ext/consul/tests/suite/Transactions.java +++ b/src/test/java/io/vertx/ext/consul/tests/suite/Transactions.java @@ -16,14 +16,29 @@ package io.vertx.ext.consul.tests.suite; import io.vertx.core.Handler; -import io.vertx.ext.consul.*; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.consul.KeyValue; +import io.vertx.ext.consul.Service; +import io.vertx.ext.consul.ServiceOptions; +import io.vertx.ext.consul.TxnError; +import io.vertx.ext.consul.TxnKVOperation; +import io.vertx.ext.consul.TxnKVVerb; +import io.vertx.ext.consul.TxnOperationType; +import io.vertx.ext.consul.TxnRequest; +import io.vertx.ext.consul.TxnResponse; +import io.vertx.ext.consul.TxnServiceOperation; +import io.vertx.ext.consul.TxnServiceVerb; import io.vertx.ext.consul.tests.ConsulTestBase; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import org.junit.Test; import org.junit.runner.RunWith; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; import java.util.stream.Collectors; /** @@ -32,6 +47,8 @@ @RunWith(VertxUnitRunner.class) public class Transactions extends ConsulTestBase { + private final String SERVICE_NAME = "service1"; + @Test public void kvSet(TestContext tc) { TxnRequest request = new TxnRequest() @@ -82,6 +99,326 @@ public void kvCas(TestContext tc) { }); } + @Test + public void serviceSet(TestContext tc) { + ServiceOptions serviceOptions1 = createServiceOptions("id1", SERVICE_NAME, "10.10.10.10", 8080); + ServiceOptions serviceOptions2 = createServiceOptions("id2", SERVICE_NAME, "10.10.10.10", 8081); + TxnRequest request = new TxnRequest() + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions1).setType(TxnServiceVerb.SET)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions2).setType(TxnServiceVerb.SET)); + writeClient.transaction(request).onComplete(tc.asyncAssertSuccess(response -> { + tc.assertEquals(0, response.getErrorsSize()); + tc.assertEquals(2, response.getResultsSize()); + List services = getServices(response); + checkService(tc, services, serviceOptions1); + checkService(tc, services, serviceOptions2); + getRegistrations(tc, SERVICE_NAME, registrations -> { + checkService(tc, registrations, serviceOptions1); + checkService(tc, registrations, serviceOptions2); + writeClient.deregisterService(serviceOptions1.getId()).onComplete(tc.asyncAssertSuccess()); + writeClient.deregisterService(serviceOptions2.getId()).onComplete(tc.asyncAssertSuccess()); + }); + })); + } + + @Test + public void serviceGet(TestContext tc) { + ServiceOptions serviceOptions1 = createServiceOptions("id1", SERVICE_NAME, "10.10.10.11", 8080); + ServiceOptions serviceOptions2 = createServiceOptions("id2", SERVICE_NAME, "10.10.10.11", 8081); + TxnRequest getRequest = new TxnRequest() + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions1).setType(TxnServiceVerb.GET)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions2).setType(TxnServiceVerb.GET)); + writeClient.transaction(getRequest).onComplete(tc.asyncAssertSuccess(resp1 -> { + tc.assertEquals(2, resp1.getErrorsSize()); + tc.assertEquals(0, resp1.getResultsSize()); + registerServiceAndGet(tc, serviceOptions1, getRequest, resp2 -> { + tc.assertEquals(1, resp2.getErrorsSize()); + tc.assertEquals(0, resp2.getResultsSize()); + registerServiceAndGet(tc, serviceOptions2, getRequest, resp3 -> { + tc.assertEquals(0, resp3.getErrorsSize()); + tc.assertEquals(2, resp3.getResultsSize()); + List services = getServices(resp3); + checkService(tc, services, serviceOptions1); + checkService(tc, services, serviceOptions2); + writeClient.deregisterCatalogService(getNodeName(), serviceOptions1.getId()).onComplete(tc.asyncAssertSuccess()); + writeClient.deregisterCatalogService(getNodeName(), serviceOptions2.getId()).onComplete(tc.asyncAssertSuccess()); + }); + }); + })); + } + + @Test + public void serviceDelete(TestContext tc) { + ServiceOptions serviceOptions1 = createServiceOptions("id1", SERVICE_NAME, "10.10.10.10", 8080); + ServiceOptions serviceOptions2 = createServiceOptions("id2", SERVICE_NAME, "10.10.10.10", 8081); + TxnRequest deleteRequest = new TxnRequest() + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions1).setType(TxnServiceVerb.DELETE)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions2).setType(TxnServiceVerb.DELETE)); + writeClient.transaction(deleteRequest).onComplete(tc.asyncAssertSuccess(resp1 -> { + tc.assertEquals(0, resp1.getErrorsSize()); + tc.assertEquals(0, resp1.getResultsSize()); + writeClient.registerService(serviceOptions1).onComplete(tc.asyncAssertSuccess(resp2 -> { + getRegistrations(tc, SERVICE_NAME, registrations1 -> { + checkService(tc, registrations1, serviceOptions1); + writeClient.transaction(deleteRequest).onComplete(tc.asyncAssertSuccess(resp3 -> { + tc.assertEquals(0, resp1.getErrorsSize()); + tc.assertEquals(0, resp1.getResultsSize()); + getRegistrations(tc, SERVICE_NAME, registrations2 -> { + tc.assertEquals(0, registrations2.size()); + }); + })); + }); + })); + })); + } + + @Test + public void serviceCas(TestContext tc) { + ServiceOptions serviceOptions1 = createServiceOptions("id1", SERVICE_NAME, "10.10.10.10", 8080); + ServiceOptions serviceOptions2 = createServiceOptions("id2", SERVICE_NAME, "10.10.10.10", 8081); + Map meta = new HashMap<>(); + meta.put("test1", "value2"); + writeClient.registerService(serviceOptions1).onComplete(tc.asyncAssertSuccess(resp1 -> { + writeClient.registerService(serviceOptions2).onComplete(tc.asyncAssertSuccess(resp2 -> { + getRegistrations(tc, SERVICE_NAME, registrations1 -> { + long idx1 = getModifyIndex(tc, registrations1, serviceOptions1); + long idx2 = getModifyIndex(tc, registrations1, serviceOptions2); + TxnRequest requestWithStaleIndex = new TxnRequest() + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setType(TxnServiceVerb.CAS).setServiceOptions( + serviceOptions1.setMeta(meta).setModifyIndex(idx1 - 1))) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setType(TxnServiceVerb.CAS).setServiceOptions( + serviceOptions2.setMeta(meta).setModifyIndex(idx2))); + writeClient.transaction(requestWithStaleIndex).onComplete(tc.asyncAssertSuccess(resp3 -> { + tc.assertEquals(1, resp3.getErrorsSize()); + tc.assertEquals(0, resp3.getResultsSize()); + TxnRequest request = new TxnRequest() + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setType(TxnServiceVerb.CAS).setServiceOptions( + serviceOptions1.setMeta(meta).setModifyIndex(idx1))) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setType(TxnServiceVerb.CAS).setServiceOptions( + serviceOptions2.setMeta(meta).setModifyIndex(idx2))); + writeClient.transaction(request).onComplete(tc.asyncAssertSuccess(resp4 -> { + tc.assertEquals(0, resp4.getErrorsSize()); + tc.assertEquals(2, resp4.getResultsSize()); + getRegistrations(tc, SERVICE_NAME, registrations2 -> { + checkService(tc, registrations2, serviceOptions1); + checkService(tc, registrations2, serviceOptions2); + writeClient.deregisterCatalogService(getNodeName(), serviceOptions1.getId()).onComplete(tc.asyncAssertSuccess()); + writeClient.deregisterCatalogService(getNodeName(), serviceOptions2.getId()).onComplete(tc.asyncAssertSuccess()); + }); + })); + })); + }); + })); + })); + } + + @Test + public void serviceMultiOperationSuccess(TestContext tc) { + ServiceOptions serviceOptions1 = createServiceOptions("id1", SERVICE_NAME, "10.10.10.10", 8080); + ServiceOptions serviceOptions2 = createServiceOptions("id2", SERVICE_NAME, "10.10.10.10", 8081); + ServiceOptions serviceOptions3 = createServiceOptions("id3", SERVICE_NAME, "10.10.10.10", 8082); + writeClient.registerService(serviceOptions1).onComplete(tc.asyncAssertSuccess(resp1 -> { + writeClient.registerService(serviceOptions2).onComplete(tc.asyncAssertSuccess(resp2 -> { + getRegistrations(tc, SERVICE_NAME, registrations1 -> { + tc.assertEquals(2, registrations1.size()); + Map meta = new HashMap<>(); + meta.put("test2", "value2"); + serviceOptions2.setMeta(meta); + TxnRequest multiOpRequest = new TxnRequest() + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions1).setType(TxnServiceVerb.DELETE)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions2).setType(TxnServiceVerb.SET)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions3).setType(TxnServiceVerb.SET)); + writeClient.transaction(multiOpRequest).onComplete(tc.asyncAssertSuccess(resp3 -> { + tc.assertEquals(0, resp3.getErrorsSize()); + tc.assertEquals(2, resp3.getResultsSize()); + getRegistrations(tc, SERVICE_NAME, registrations2 -> { + tc.assertEquals(2, registrations2.size()); + checkService(tc, registrations2, serviceOptions2); + checkService(tc, registrations2, serviceOptions3); + writeClient.deregisterCatalogService(getNodeName(), serviceOptions2.getId()).onComplete(tc.asyncAssertSuccess()); + writeClient.deregisterCatalogService(getNodeName(), serviceOptions3.getId()).onComplete(tc.asyncAssertSuccess()); + }); + })); + }); + })); + })); + } + + @Test + public void serviceMultiOperationError(TestContext tc) { + ServiceOptions serviceOptions1 = createServiceOptions("id1", SERVICE_NAME, "10.10.12.10", 8080); + ServiceOptions serviceOptions2 = createServiceOptions("id2", SERVICE_NAME, "10.10.12.10", 8081); + ServiceOptions serviceOptions3 = createServiceOptions("id3", "", "10.10.12.10", 8082); + writeClient.registerService(serviceOptions1).onComplete(tc.asyncAssertSuccess(resp1 -> { + writeClient.registerService(serviceOptions2).onComplete(tc.asyncAssertSuccess(resp2 -> { + getRegistrations(tc, SERVICE_NAME, registrations1 -> { + checkService(tc, registrations1, serviceOptions1); + checkService(tc, registrations1, serviceOptions2); + TxnRequest multiOpRequest = new TxnRequest() + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions1).setType(TxnServiceVerb.DELETE)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions2).setType(TxnServiceVerb.GET)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions3).setType(TxnServiceVerb.SET)); + writeClient.transaction(multiOpRequest).onComplete(tc.asyncAssertSuccess(resp3 -> { + tc.assertEquals(1, resp3.getErrorsSize()); + tc.assertEquals(2, resp3.getError(0).getOpIndex()); + tc.assertEquals(0, resp3.getResultsSize()); + getRegistrations(tc, SERVICE_NAME, registrations2 -> { + tc.assertEquals(2, registrations2.size()); + checkService(tc, registrations2, serviceOptions1); + checkService(tc, registrations2, serviceOptions2); + writeClient.deregisterCatalogService(getNodeName(), serviceOptions1.getId()).onComplete(tc.asyncAssertSuccess()); + writeClient.deregisterCatalogService(getNodeName(), serviceOptions2.getId()).onComplete(tc.asyncAssertSuccess()); + }); + })); + }); + })); + })); + } + + @Test + public void kvAndServiceSet(TestContext tc) { + ServiceOptions serviceOptions = createServiceOptions("id1", SERVICE_NAME, "10.10.10.10", 8080); + TxnRequest request = new TxnRequest() + .addOperation(new TxnKVOperation().setKey("foo/bar/t1").setValue("val1").setType(TxnKVVerb.SET)) + .addOperation(new TxnKVOperation().setKey("foo/bar/t2").setValue("val2").setType(TxnKVVerb.SET)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions).setType(TxnServiceVerb.SET)); + writeClient.transaction(request).onComplete(tc.asyncAssertSuccess(response -> { + tc.assertEquals(0, response.getErrorsSize()); + tc.assertEquals(3, response.getResultsSize()); + List keys = getKeys(response); + tc.assertTrue(keys.contains("foo/bar/t1")); + tc.assertTrue(keys.contains("foo/bar/t2")); + List services = getServices(response); + checkService(tc, services, serviceOptions); + getEntries(tc, "foo/bar/t", entries -> { + tc.assertTrue(entries.contains("foo/bar/t1/val1")); + tc.assertTrue(entries.contains("foo/bar/t2/val2")); + writeClient.deleteValues("foo/bar/t").onComplete(tc.asyncAssertSuccess()); + getRegistrations(tc, SERVICE_NAME, registrations2 -> { + checkService(tc, registrations2, serviceOptions);; + writeClient.deregisterCatalogService(getNodeName(), serviceOptions.getId()).onComplete(tc.asyncAssertSuccess()); + }); + }); + })); + } + + @Test + public void kvAndServiceError(TestContext tc) { + ServiceOptions serviceOptions = createServiceOptions("id1", SERVICE_NAME, "10.10.10.10", 8080); + TxnRequest request = new TxnRequest() + .addOperation(new TxnKVOperation().setKey("foo/bar/t1").setValue("val1").setType(TxnKVVerb.SET)) + .addOperation(new TxnKVOperation().setKey("foo/bar/t2").setValue("val2").setType(TxnKVVerb.SET)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions).setType(TxnServiceVerb.GET)); + writeClient.transaction(request).onComplete(tc.asyncAssertSuccess(response -> { + tc.assertEquals(1, response.getErrorsSize()); + tc.assertEquals(2, response.getError(0).getOpIndex()); + tc.assertEquals(0, response.getResultsSize()); + List keys = getKeys(response); + tc.assertTrue(keys.isEmpty()); + List services = getServices(response); + tc.assertTrue(services.isEmpty()); + readClient.getValues("foo/bar/t").onComplete(tc.asyncAssertSuccess(entries -> { + tc.assertTrue(entries.getList() == null); + })); + })); + } + + @Test + public void testJsonToTxnRequest(TestContext tc) { + ServiceOptions serviceOptions = createServiceOptions("id1", SERVICE_NAME, "10.10.10.10", 8080); + Map meta = new HashMap<>(); + meta.put("test1", "value2"); + List tags = new ArrayList<>(); + tags.add("tag1"); + serviceOptions.setMeta(meta); + serviceOptions.setTags(tags); + TxnRequest txnRequest = new TxnRequest() + .addOperation(new TxnKVOperation().setKey("foo/bar/t1").setValue("val1").setType(TxnKVVerb.SET)) + .addOperation(new TxnServiceOperation().setNode(getNodeName()).setServiceOptions(serviceOptions).setType(TxnServiceVerb.SET)); + JsonObject jsonRequest = txnRequest.toJson(); + List jsonOperations = jsonRequest.getJsonArray("operations").getList(); + tc.assertEquals(serviceOptions.getName(), jsonOperations.get(1).getJsonObject("Service").getJsonObject("Service").getString("Service")); + TxnRequest deserializedTxnRequest = new TxnRequest(jsonRequest); + tc.assertTrue(deserializedTxnRequest.getOperations().get(0) instanceof TxnKVOperation); + TxnKVOperation deserializedTxnKVOperation = (TxnKVOperation) deserializedTxnRequest.getOperations().get(0); + tc.assertEquals("foo/bar/t1", deserializedTxnKVOperation.getKey()); + tc.assertEquals("val1", deserializedTxnKVOperation.getValue()); + tc.assertEquals(TxnKVVerb.SET, deserializedTxnKVOperation.getType()); + tc.assertEquals(TxnOperationType.KV, deserializedTxnKVOperation.getOperationType()); + tc.assertTrue(deserializedTxnRequest.getOperations().get(1) instanceof TxnServiceOperation); + TxnServiceOperation deserializedTxnServiceOperation = (TxnServiceOperation) deserializedTxnRequest.getOperations().get(1); + tc.assertEquals(getNodeName(), deserializedTxnServiceOperation.getNode()); + tc.assertEquals(TxnServiceVerb.SET, deserializedTxnServiceOperation.getType()); + tc.assertEquals(TxnOperationType.SERVICE, deserializedTxnServiceOperation.getOperationType()); + ServiceOptions deserializedServiceOptions = deserializedTxnServiceOperation.getServiceOptions(); + tc.assertEquals(serviceOptions.getId(), deserializedServiceOptions.getId()); + tc.assertEquals(serviceOptions.getName(), deserializedServiceOptions.getName()); + tc.assertEquals(serviceOptions.getAddress(), deserializedServiceOptions.getAddress()); + tc.assertEquals(serviceOptions.getPort(), deserializedServiceOptions.getPort()); + tc.assertEquals(serviceOptions.getTags(), deserializedServiceOptions.getTags()); + tc.assertEquals(serviceOptions.getMeta(), deserializedServiceOptions.getMeta()); + tc.assertEquals(serviceOptions.getCreateIndex(), deserializedServiceOptions.getCreateIndex()); + tc.assertEquals(serviceOptions.getModifyIndex(), deserializedServiceOptions.getModifyIndex()); + } + + @Test + public void testJsonToTxnResponse(TestContext tc) { + List tags = new ArrayList<>(); + tags.add("tag1"); + Map meta = new HashMap<>(); + meta.put("test1", "value2"); + Service service = new Service() + .setId("id1") + .setName(SERVICE_NAME) + .setAddress("10.10.10.10") + .setPort(8080) + .setMeta(meta) + .setTags(tags); + KeyValue keyValue = new KeyValue() + .setKey("foo/bar/t1") + .setValue("val1"); + TxnError txnError = new TxnError() + .setOpIndex(0) + .setWhat("Missing node registration"); + TxnResponse txnResponse = new TxnResponse() + .addResult(keyValue) + .addResult(service) + .addError(txnError); + JsonObject jsonObject = txnResponse.toJson(); + List jsonResults = jsonObject.getJsonArray("Results").getList(); + tc.assertEquals(service.getName(), jsonResults.get(1).getJsonObject("Service").getString("Service")); + TxnResponse deserializedTxnResponse = new TxnResponse(jsonObject); + tc.assertEquals(deserializedTxnResponse.getResultsSize(), 2); + tc.assertTrue(deserializedTxnResponse.getResults().get(0) instanceof KeyValue); + KeyValue deserializedKeyValue = (KeyValue) deserializedTxnResponse.getResults().get(0); + tc.assertEquals(keyValue.getKey(), deserializedKeyValue.getKey()); + tc.assertEquals(keyValue.getValue(), deserializedKeyValue.getValue()); + tc.assertTrue(deserializedTxnResponse.getResults().get(1) instanceof Service); + Service deserializedService = (Service) deserializedTxnResponse.getResults().get(1); + tc.assertEquals(service.getId(), deserializedService.getId()); + tc.assertEquals(service.getName(), deserializedService.getName()); + tc.assertEquals(service.getAddress(), deserializedService.getAddress()); + tc.assertEquals(service.getPort(), deserializedService.getPort()); + tc.assertEquals(service.getTags(), deserializedService.getTags()); + tc.assertEquals(service.getMeta(), deserializedService.getMeta()); + tc.assertEquals(service.getCreateIndex(), deserializedService.getCreateIndex()); + tc.assertEquals(service.getModifyIndex(), deserializedService.getModifyIndex()); + tc.assertEquals(deserializedTxnResponse.getErrorsSize(), 1); + tc.assertTrue(deserializedTxnResponse.getErrors().get(0) instanceof TxnError); + TxnError deserializedTxnError = deserializedTxnResponse.getErrors().get(0); + tc.assertEquals(txnError.getOpIndex(), deserializedTxnError.getOpIndex()); + tc.assertEquals(txnError.getWhat(), deserializedTxnError.getWhat()); + } + + @Test + public void testEmptyJsonToTxnResponse(TestContext tc) { + TxnResponse txnResponse = new TxnResponse(); + JsonObject jsonObject = txnResponse.toJson(); + TxnResponse deserializedTxnResponse = new TxnResponse(jsonObject); + tc.assertEquals(deserializedTxnResponse.getResultsSize(), 0); + tc.assertEquals(deserializedTxnResponse.getErrorsSize(), 0); + } + private void getEntries(TestContext tc, String prefix, Handler> resultHandler) { readClient.getValues(prefix).onComplete(tc.asyncAssertSuccess(list -> { resultHandler.handle(list.getList().stream() @@ -91,6 +428,7 @@ private void getEntries(TestContext tc, String prefix, Handler> res private List getKeys(TxnResponse resp) { return resp.getResults().stream() + .filter(o -> o instanceof KeyValue) .map(kv -> ((KeyValue) kv).getKey()).collect(Collectors.toList()); } @@ -102,4 +440,62 @@ private void createKV(TestContext tc, String key, String value, Handler re })); })); } + + private void getRegistrations(TestContext tc, String serviceName, Handler> resultHandler) { + readClient.catalogServiceNodes(serviceName).onComplete(tc.asyncAssertSuccess(list -> { + resultHandler.handle(list.getList()); + })); + } + + private List getServices(TxnResponse resp) { + return resp.getResults().stream() + .filter(o -> o instanceof Service) + .map(s -> ((Service) s)).collect(Collectors.toList()); + } + + private ServiceOptions createServiceOptions(String id, String name, String address, int port) { + Map meta = new HashMap<>(); + meta.put("test1", "value1"); + List tags = new ArrayList<>(); + tags.add("TAG_1"); + tags.add("TAG_2"); + return new ServiceOptions() + .setId(id) + .setName(name) + .setAddress(address) + .setPort(port) + .setMeta(meta) + .setTags(tags); + } + + private static void checkService(TestContext tc, List list, ServiceOptions options) { + Service s = list.stream() + .filter(i -> i.getId().equals(options.getId())) + .findFirst() + .orElseThrow(NoSuchElementException::new); + tc.assertEquals(s.getName(), options.getName()); + tc.assertEquals(s.getTags(), options.getTags()); + tc.assertEquals(s.getAddress(), options.getAddress()); + tc.assertEquals(s.getPort(), options.getPort()); + for (Map.Entry entry : options.getMeta().entrySet()) { + tc.assertEquals(s.getMeta().get(entry.getKey()), entry.getValue()); + } + } + + private long getModifyIndex(TestContext tc, List list, ServiceOptions options) { + Service s = list.stream() + .filter(i -> i.getId().equals(options.getId())) + .findFirst() + .orElseThrow(NoSuchElementException::new); + return s.getModifyIndex(); + } + + private void registerServiceAndGet(TestContext tc, ServiceOptions serviceOptions, TxnRequest txnRequest, Handler resultsHandler) { + writeClient.registerService(serviceOptions).onComplete(tc.asyncAssertSuccess(resp1 -> { + writeClient.transaction(txnRequest).onComplete(tc.asyncAssertSuccess(resp2 -> { + resultsHandler.handle(resp2); + })); + })); + } + }