diff --git a/qbit/boon/src/main/java/io/advantageous/qbit/boon/BoonQBitFactory.java b/qbit/boon/src/main/java/io/advantageous/qbit/boon/BoonQBitFactory.java index 63498d40..e6340784 100644 --- a/qbit/boon/src/main/java/io/advantageous/qbit/boon/BoonQBitFactory.java +++ b/qbit/boon/src/main/java/io/advantageous/qbit/boon/BoonQBitFactory.java @@ -27,17 +27,12 @@ import io.advantageous.qbit.events.EventManager; import io.advantageous.qbit.events.EventManagerBuilder; import io.advantageous.qbit.events.impl.BoonEventBusProxyCreator; -import io.advantageous.qbit.events.spi.EventConnector; import io.advantageous.qbit.http.HttpTransport; import io.advantageous.qbit.http.client.HttpClient; -import io.advantageous.qbit.http.config.HttpServerOptions; -import io.advantageous.qbit.http.server.HttpServer; import io.advantageous.qbit.json.JsonMapper; import io.advantageous.qbit.message.MethodCall; import io.advantageous.qbit.message.MethodCallBuilder; import io.advantageous.qbit.message.Request; -import io.advantageous.qbit.message.Response; -import io.advantageous.qbit.queue.Queue; import io.advantageous.qbit.queue.QueueBuilder; import io.advantageous.qbit.sender.Sender; import io.advantageous.qbit.sender.SenderEndPoint; @@ -49,7 +44,6 @@ import io.advantageous.qbit.service.impl.BoonServiceMethodCallHandler; import io.advantageous.qbit.service.impl.CallbackManager; import io.advantageous.qbit.service.impl.ServiceBundleImpl; -import io.advantageous.qbit.service.impl.ServiceQueueImpl; import io.advantageous.qbit.service.stats.StatsCollector; import io.advantageous.qbit.spi.*; import io.advantageous.qbit.system.QBitSystemManager; diff --git a/qbit/boon/src/main/java/io/advantageous/qbit/boon/BoonServiceProxyFactory.java b/qbit/boon/src/main/java/io/advantageous/qbit/service/BoonServiceProxyFactory.java similarity index 99% rename from qbit/boon/src/main/java/io/advantageous/qbit/boon/BoonServiceProxyFactory.java rename to qbit/boon/src/main/java/io/advantageous/qbit/service/BoonServiceProxyFactory.java index 69ab8668..f2d8f3b8 100644 --- a/qbit/boon/src/main/java/io/advantageous/qbit/boon/BoonServiceProxyFactory.java +++ b/qbit/boon/src/main/java/io/advantageous/qbit/service/BoonServiceProxyFactory.java @@ -16,7 +16,7 @@ * QBit - The Microservice lib for Java : JSON, WebSocket, REST. Be The Web! */ -package io.advantageous.qbit.boon; +package io.advantageous.qbit.service; import io.advantageous.boon.core.Str; import io.advantageous.boon.primitive.CharBuf; diff --git a/qbit/boon/src/test/java/io/advantageous/qbit/boon/BoonJSONServiceFactoryTest.java b/qbit/boon/src/test/java/io/advantageous/qbit/service/BoonJSONServiceFactoryTest.java similarity index 96% rename from qbit/boon/src/test/java/io/advantageous/qbit/boon/BoonJSONServiceFactoryTest.java rename to qbit/boon/src/test/java/io/advantageous/qbit/service/BoonJSONServiceFactoryTest.java index bd287430..cb57844c 100644 --- a/qbit/boon/src/test/java/io/advantageous/qbit/boon/BoonJSONServiceFactoryTest.java +++ b/qbit/boon/src/test/java/io/advantageous/qbit/service/BoonJSONServiceFactoryTest.java @@ -16,12 +16,11 @@ * QBit - The Microservice lib for Java : JSON, WebSocket, REST. Be The Web! */ -package io.advantageous.qbit.boon; +package io.advantageous.qbit.service; import io.advantageous.qbit.QBit; import io.advantageous.qbit.client.ServiceProxyFactory; import io.advantageous.qbit.message.MethodCall; -import io.advantageous.qbit.service.EndPoint; import org.junit.Before; import org.junit.Test; diff --git a/qbit/build.gradle b/qbit/build.gradle index f5fb4442..4af286f4 100644 --- a/qbit/build.gradle +++ b/qbit/build.gradle @@ -236,6 +236,32 @@ project('vertx') { } +project('service-discovery') { + + + ext { + vertxVersion = '3.0.0' + } + + dependencies { + compile project(':qbit:boon') + compile project(':qbit:core') + compile group: 'io.vertx', name: 'vertx-core', version: vertxVersion + compile group: 'io.vertx', name: 'vertx-web', version: vertxVersion + testCompile project(':qbit:test-support') + } + + uploadArchives { + repositories { + mavenDeployer { + pom.project { + description 'Vertx support for qbit' + } + } + } + } +} + project('consul-client') { diff --git a/qbit/core/src/main/java/io/advantageous/qbit/events/EventManagerBuilder.java b/qbit/core/src/main/java/io/advantageous/qbit/events/EventManagerBuilder.java index 3b7a0a63..1efd7fda 100644 --- a/qbit/core/src/main/java/io/advantageous/qbit/events/EventManagerBuilder.java +++ b/qbit/core/src/main/java/io/advantageous/qbit/events/EventManagerBuilder.java @@ -26,10 +26,8 @@ public class EventManagerBuilder { private StatsCollector statsCollector; public static EventConnector DEFAULT_NO_EVENT_CONNECTOR = event -> {}; public static StatsCollector DEFAULT_NO_STATS_COLLECTOR = new StatsCollector() {}; - private final Logger logger = LoggerFactory.getLogger(EventManagerBuilder.class); - - Factory factory; + private Factory factory; public static EventManagerBuilder eventManagerBuilder() { diff --git a/qbit/core/src/main/java/io/advantageous/qbit/reactive/CallbackBuilder.java b/qbit/core/src/main/java/io/advantageous/qbit/reactive/CallbackBuilder.java index d5d970ea..7d19e6e8 100644 --- a/qbit/core/src/main/java/io/advantageous/qbit/reactive/CallbackBuilder.java +++ b/qbit/core/src/main/java/io/advantageous/qbit/reactive/CallbackBuilder.java @@ -1,5 +1,9 @@ package io.advantageous.qbit.reactive; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -52,6 +56,31 @@ public CallbackBuilder setCallback(Class returnType, Callback callback } + public CallbackBuilder setCallbackReturnsList(Class componentClass, Callback> callback) { + this.callback = callback; + return this; + } + + public CallbackBuilder setCallbackReturnsSet(Class componentClass, Callback> callback) { + this.callback = callback; + return this; + } + + public CallbackBuilder setCallbackReturnsCollection(Class componentClass, Callback> callback) { + this.callback = callback; + return this; + } + + public CallbackBuilder setCallbackReturnsMap(Class keyClass, Class valueClass, + Callback> callback) { + this.callback = callback; + return this; + } + + + + + public Runnable getOnTimeout() { return onTimeout; } diff --git a/qbit/core/src/main/java/io/advantageous/qbit/service/discovery/EndpointDefinition.java b/qbit/core/src/main/java/io/advantageous/qbit/service/discovery/EndpointDefinition.java index 3ec21f59..05a9b6b1 100644 --- a/qbit/core/src/main/java/io/advantageous/qbit/service/discovery/EndpointDefinition.java +++ b/qbit/core/src/main/java/io/advantageous/qbit/service/discovery/EndpointDefinition.java @@ -1,25 +1,84 @@ package io.advantageous.qbit.service.discovery; import io.advantageous.boon.core.Lists; +import io.advantageous.boon.core.Sys; import io.advantageous.qbit.service.health.HealthStatus; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.List; import static io.advantageous.qbit.service.discovery.ServiceDiscovery.uniqueString; /** * Service Definition + * Contains a healthStatus, unique id, name, host, port and a timeToLive in seconds. + * This describes all parts of a service as far as something like a ServiceDiscovery system like + * [Consul](https://consul.io/) is concerned. + * + * The `timeToLive` field is for ttl checkins if the underlying system supports it. + * + * The `HealthStatus` represents the current state of this system as returned from the remote + * service discovery system. + * * created by rhightower on 3/23/15. */ public class EndpointDefinition { + + /** + * Current health status. + */ private final HealthStatus healthStatus; + + /** + * Unique id of the system. + */ private final String id; + + /** + * Name of the service, i.e., EventBus, StatsEngine, etc. + */ private final String name; + + /** + * Host name. + */ private final String host; + + /** + * Port of the service. + */ private final int port; + + /** + * Time to live: how long until this service has to check in with the remote service discovery + * system if applicable. Whether this is used or needed depends on the underlying service discovery system. + */ private final long timeToLive; + + /** + * Find host + * @return hostname + */ + static String findHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new IllegalStateException("unable to find host name"); + } + } + + + /** + * Create a new one with default TTL of 20 seconds. + * @param healthStatus healthStatus + * @param id id + * @param name name + * @param host post + * @param port port + */ public EndpointDefinition( final HealthStatus healthStatus, final String id, @@ -31,9 +90,36 @@ public EndpointDefinition( this.name = name; this.host = host; this.port = port; - this.timeToLive = 20L; + this.timeToLive = Sys.sysProp(EndpointDefinition.class.getName()+".timeToLive", 20L); + } + + /** + * Create a new one with default TTL of 20 seconds. + * @param name name + * @param host post + * @param port port + */ + public EndpointDefinition( + final String name, + final String host, + final int port) { + this.healthStatus = HealthStatus.PASS; + this.id = name + "-" + port + "-" + host.replace('.', '-'); + this.name = name; + this.host = host; + this.port = port; + this.timeToLive = Sys.sysProp(EndpointDefinition.class.getName()+".timeToLive", 20L); } + + /** + * Create a new one with default TTL of 20 seconds. + * @param healthStatus healthStatus + * @param id id + * @param name name + * @param host post + * @param port port + */ public EndpointDefinition( final HealthStatus healthStatus, final String id, @@ -49,10 +135,46 @@ public EndpointDefinition( this.timeToLive = timeToLive; } + /** + * Creates a list of service definitions. + * @param endpointDefinitions vararg array of service definitions + * @return list of service definitions + */ public static List serviceDefinitions(final EndpointDefinition... endpointDefinitions) { return Lists.list(endpointDefinitions); } + /** + * Creates a EndpointDefinition for a service, i.e., a serviceDefinition. + * @param name name + * @return serviceDefinition + */ + public static EndpointDefinition serviceDefinition(final String name) { + + return new EndpointDefinition(HealthStatus.PASS, + name + "-" + uniqueString(0), name, findHostName(), 0); + } + + /** + * Creates a EndpointDefinition for a service, i.e., a serviceDefinition. + * @param name service name + * @param port port + * @return serviceDefinition + */ + public static EndpointDefinition serviceDefinition(final String name, int port) { + + return new EndpointDefinition(HealthStatus.PASS, + name + "-" + uniqueString(port), name, findHostName(), 0); + } + + /** + * Creates a EndpointDefinition for a service, i.e., a serviceDefinition. + * @param id id + * @param name name + * @param host host + * @param port port + * @return EndpointDefinition + */ public static EndpointDefinition serviceDefinition( final String id, final String name, @@ -63,6 +185,13 @@ public static EndpointDefinition serviceDefinition( id, name, host, port); } + /** + * Creates a EndpointDefinition for a service, i.e., a serviceDefinition. + * @param name name + * @param host host + * @param port port + * @return serviceDefinition + */ public static EndpointDefinition serviceDefinition( final String name, final String host, @@ -73,6 +202,12 @@ public static EndpointDefinition serviceDefinition( } + /** + * Creates a EndpointDefinition for a service, i.e., a serviceDefinition. + * @param name name + * @param host host + * @return serviceDefinition + */ public static EndpointDefinition serviceDefinition( final String name, final String host @@ -83,11 +218,18 @@ public static EndpointDefinition serviceDefinition( } + + /** + * Creates a EndpointDefinition for a service, i.e., a serviceDefinition. + * @param id id + * @param name name + * @param host host + * @return EndpointDefinition + */ public static EndpointDefinition serviceDefinitionWithId( final String name, final String host, - final String id - ) { + final String id) { return new EndpointDefinition(HealthStatus.PASS, id, name, host, 0); diff --git a/qbit/core/src/main/java/io/advantageous/qbit/service/discovery/ServiceDiscovery.java b/qbit/core/src/main/java/io/advantageous/qbit/service/discovery/ServiceDiscovery.java index f882b617..1b233a68 100644 --- a/qbit/core/src/main/java/io/advantageous/qbit/service/discovery/ServiceDiscovery.java +++ b/qbit/core/src/main/java/io/advantageous/qbit/service/discovery/ServiceDiscovery.java @@ -17,6 +17,12 @@ */ public interface ServiceDiscovery extends Startable, Stoppable { + + /** + * Generates a unique string, used for generating unique service ids + * @param port port + * @return unique id incorporating host name if possible. + */ static String uniqueString(int port) { try { return port + "-" + InetAddress.getLocalHost().getHostName().replace('.', '-'); @@ -25,6 +31,12 @@ static String uniqueString(int port) { } } + /** + * Register the service with the service discovery service if applicable. + * @param serviceName serviceName + * @param port port + * @return EndpointDefinition + */ default EndpointDefinition register( final String serviceName, final int port) { @@ -34,6 +46,15 @@ default EndpointDefinition register( serviceName, null, port); } + /** + * Register with the service discovery system and specify a TTL so that if + * the service does not send a checkIn that it is marked down. + * TTL is time to live. + * @param serviceName service name + * @param port port + * @param timeToLiveSeconds ttl + * @return EndpointDefinition + */ default EndpointDefinition registerWithTTL( final String serviceName, final int port, @@ -44,6 +65,15 @@ default EndpointDefinition registerWithTTL( serviceName, null, port, timeToLiveSeconds); } + /** + * Register an end point given an id, and a TTL. + * This gets used if you want to be specific about what you call the service. + * @param serviceName service name + * @param serviceId service id + * @param port port + * @param timeToLiveSeconds ttl + * @return EndpointDefinition + */ @SuppressWarnings("UnusedReturnValue") default EndpointDefinition registerWithIdAndTimeToLive( final String serviceName, final String serviceId, final int port, final int timeToLiveSeconds) { @@ -53,6 +83,13 @@ default EndpointDefinition registerWithIdAndTimeToLive( serviceName, null, port, timeToLiveSeconds); } + /** + * Register with id. Specify a unique id that is not autogenerated + * @param serviceName service name + * @param serviceId service id + * @param port port + * @return EndpointDefinition + */ default EndpointDefinition registerWithId(final String serviceName, final String serviceId, final int port) { return new EndpointDefinition(HealthStatus.PASS, @@ -61,34 +98,79 @@ default EndpointDefinition registerWithId(final String serviceName, final String } + /** + * Watch for changes in this service name and send change events if the service changes. + * @param serviceName + */ void watch(String serviceName); + /** + * CheckIn with the service discovery mechanism. The service may be marked as down if it does + * not check in, in the amount of time specified by the ttl if the service disovery provider supports + * ttl and checkin (Consul does). + * @param serviceId + * @param healthStatus + */ default void checkIn(String serviceId, HealthStatus healthStatus) { } + /** This is like `checkIn` but does an HealthStatus.SUCCESS if applicable. + * + * @param serviceId serviceId + */ default void checkInOk(String serviceId) { } + /** + * Load the services. + * + * Depending on the underlying implementation the services are either periodically loaded + * or loaded whenever a change is detected. + * + * This version of the method is based on the last event change and / or the last poll of the + * services from the remote system (i.e., Consul) if applicable. + * + * This may also may trigger a remote call, but it will always return right away. + * @param serviceName service name + * @return list of EndpointDefinition + */ default List loadServices(final String serviceName) { return Collections.emptyList(); } + /** + * See `loadServices` this is like `loadServices` except it forces a remote call. + * This is a blocking call to loadServices. + * @param serviceName service name. + * @return list of EndpointDefinition + */ default List loadServicesNow(final String serviceName) { return Collections.emptyList(); } + /** + * Start the service discovery system if applicable. + */ default void start() { } + /** + * Stop the service discovery system if applicable. + */ default void stop() { } + /** + * This just loads the end points that were registered locally. + * This are the endpoints that this JVM and this ServiceDiscovery is managing. + * @return set of EndpointDefinitions + */ default Set localDefinitions() { return Collections.emptySet(); } diff --git a/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsClientProvider.java b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsClientProvider.java new file mode 100644 index 00000000..b957f8ae --- /dev/null +++ b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsClientProvider.java @@ -0,0 +1,49 @@ +package io.advantageous.qbit.service.discovery.dns; + +import io.vertx.core.Vertx; +import io.vertx.core.dns.DnsClient; + +import java.util.function.Supplier; + +/** + * Provider abstracts how the DNS Client is created so we can unit test it. + */ +public class DnsClientProvider implements Supplier { + + + /** + * Vertx instance. Vertx is used to build dns client. + */ + private final Vertx vertx; + + /** + * port of DNS server. + */ + private final int port; + + /** + * Host of DNS server. + */ + private final String host; + + /** + * DnsClientProvider constructor. + * @param vertx vertx + * @param host host + * @param port port + */ + public DnsClientProvider(final Vertx vertx, final String host, final int port) { + this.vertx = vertx; + this.port = port; + this.host = host; + } + + /** + * Supply an instance of DnsClient. + * @return DnsClient. + */ + @Override + public DnsClient get() { + return vertx.createDnsClient(port, host); + } +} diff --git a/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsServiceDiscoveryProvider.java b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsServiceDiscoveryProvider.java new file mode 100644 index 00000000..0d26719e --- /dev/null +++ b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsServiceDiscoveryProvider.java @@ -0,0 +1,85 @@ +package io.advantageous.qbit.service.discovery.dns; + +import io.advantageous.qbit.reactive.CallbackBuilder; +import io.advantageous.qbit.service.discovery.EndpointDefinition; +import io.advantageous.qbit.service.discovery.spi.ServiceDiscoveryProvider; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Wraps DnsSupport to fit in as a ServiceDiscoveryProvider. + */ +public class DnsServiceDiscoveryProvider implements ServiceDiscoveryProvider { + + /** + * Dns support talks to DNS server to get EndpointDefinitions. + */ + private final DnsSupport dnsSupport; + + /** + * Timeout for DNS call. + */ + private final int timeout; + + /** + * Time unit for timeout. + */ + private final TimeUnit timeUnit; + + /** + * New DnsServiceDiscoveryProvider. + * @param dnsSupport dnsSupport + * @param timeout timeout + * @param timeUnit timeUnit + */ + public DnsServiceDiscoveryProvider(final DnsSupport dnsSupport, + final int timeout, + final TimeUnit timeUnit) { + this.dnsSupport = dnsSupport; + this.timeout = timeout; + this.timeUnit = timeUnit; + } + + /** + * Load the services. + * @param serviceName serviceName + * @return list of EndpointDefinition + */ + @Override + public List loadServices(final String serviceName) { + + + final CountDownLatch countDownLatch = new CountDownLatch(1); + + final AtomicReference> endPointsRef = new AtomicReference<>(); + + final AtomicReference exceptionAtomicReference = new AtomicReference<>(); + + dnsSupport.loadServiceEndpointsByServiceName(CallbackBuilder.callbackBuilder() + .setCallbackReturnsList(EndpointDefinition.class, + endpointDefinitions -> + { + + endPointsRef.set(endpointDefinitions); + countDownLatch.countDown(); + + }).setOnError(exceptionAtomicReference::set) + .build(), serviceName); + + try { + countDownLatch.await(timeout, timeUnit); + } catch (InterruptedException e) { + throw new IllegalStateException("DNS Timeout", e); + } + if (exceptionAtomicReference.get()!=null) { + throw new IllegalStateException("Unable to read from DNS", exceptionAtomicReference.get()); + } else { + return endPointsRef.get(); + } + + } + +} diff --git a/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsServiceDiscoveryProviderBuilder.java b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsServiceDiscoveryProviderBuilder.java new file mode 100644 index 00000000..12036236 --- /dev/null +++ b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsServiceDiscoveryProviderBuilder.java @@ -0,0 +1,51 @@ +package io.advantageous.qbit.service.discovery.dns; + +import io.advantageous.boon.core.Sys; + +import java.util.concurrent.TimeUnit; + +/** + * Builds a DnsServiceProvider. + */ +public class DnsServiceDiscoveryProviderBuilder { + + private DnsSupport dnsSupport; + private int timeout = Sys.sysProp(DnsServiceDiscoveryProviderBuilder.class.getName() + ".timeout", 30); + private TimeUnit timeUnit = TimeUnit.SECONDS; + + public DnsSupport getDnsSupport() { + return dnsSupport; + } + + public DnsServiceDiscoveryProviderBuilder setDnsSupport(DnsSupport dnsSupport) { + this.dnsSupport = dnsSupport; + return this; + } + + public int getTimeout() { + return timeout; + } + + public DnsServiceDiscoveryProviderBuilder setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public TimeUnit getTimeUnit() { + return timeUnit; + } + + public DnsServiceDiscoveryProviderBuilder setTimeUnit(TimeUnit timeUnit) { + this.timeUnit = timeUnit; + return this; + } + + public DnsServiceDiscoveryProvider build() { + return new DnsServiceDiscoveryProvider(getDnsSupport(), getTimeout(), getTimeUnit()); + } + + + public static DnsServiceDiscoveryProviderBuilder dnsServiceDiscoveryProviderBuilder() { + return new DnsServiceDiscoveryProviderBuilder(); + } +} diff --git a/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsSupport.java b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsSupport.java new file mode 100644 index 00000000..65aa4631 --- /dev/null +++ b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsSupport.java @@ -0,0 +1,147 @@ +package io.advantageous.qbit.service.discovery.dns; + +import io.advantageous.qbit.reactive.Callback; +import io.advantageous.qbit.service.discovery.EndpointDefinition; +import io.vertx.core.dns.DnsClient; +import io.vertx.core.dns.SrvRecord; + +import java.util.*; +import java.util.function.Supplier; +import java.util.stream.Collectors; + + +/** + * DNS Support for service discovery. + * This looks up DNS entries for a given domain name. + * + * It has two main methods. + * + * One method allow you to look up things by URL e.g., db.skydns.local. . + * + * The other method allows you to look things up by QBit service name e.g., dbService. + */ +public class DnsSupport { + + + /** + * Holds mappings from DNS names to service names. + */ + private final Map dnsServiceNameToServiceName; + + /** + * Holds mappings from service names to dns names. + */ + private final Map serviceNameToDNSName; + + /** + * Holds a postfixURL to hold what URL comes after the service name. + * + * Example: db.skydns.local. + * In the above db is the service and skydns.local. is the postfix URL. + * + * The postfixURL equates to the name in the SRV DNS record. + */ + private final String postfixURL; + + + /** + * Class that knows how to create an instance of DnsClient. + */ + private final Supplier dnsClientProvider; + + /** + * + * @param dnsClientProvider dnsClientProvider + * @param dnsServiceNameToServiceName dnsServiceNameToServiceName + * @param postFixURL postFixURL + */ + public DnsSupport(final Supplier dnsClientProvider, + final Map dnsServiceNameToServiceName, + final String postFixURL) { + + this.dnsClientProvider = dnsClientProvider; + this.postfixURL = postFixURL; + this.dnsServiceNameToServiceName = dnsServiceNameToServiceName; + this.serviceNameToDNSName = new HashMap<>(dnsServiceNameToServiceName.size()); + + /* + * Build serviceNameToDNSName by reversing the dnsServiceNameToServiceName mappings. + */ + dnsServiceNameToServiceName.entrySet().forEach(entry -> serviceNameToDNSName.put(entry.getValue(), entry.getKey())); + } + + + /** + * Looks up a service name based on its dns service name. The service part of the SRV DNS Record. + * @param dnsServiceName dnsServiceName + * @return serviceName + */ + public String findServiceName(final String dnsServiceName) { + final String serviceName = dnsServiceNameToServiceName.get(dnsServiceName); + return serviceName == null ? dnsServiceName : serviceName; + } + + /** + * Looks up a dns service name (SRV DNS RECORD). + * @param serviceName serviceName + * @return DNS service name (server field + name of SRV DNS Record). + */ + public String findDndServiceName(final String serviceName) { + final String dnsServiceName = serviceNameToDNSName.get(serviceName); + return (dnsServiceName == null ? serviceName : dnsServiceName) + postfixURL; + } + + + /** + * Load the service nodes based on the internal service name. + * DB, Ingester, RadarAggregator, etc. + * @param callback callback + * @param serviceName serviceName + */ + public void loadServiceEndpointsByServiceName(final Callback> callback, + final String serviceName) { + + loadServiceEndpointsByDNSService(callback, findDndServiceName(serviceName)); + } + + /** + * Load the services nodes by its "${SRV.service}${SRV.name}". + * @param callback callback + * @param serviceURL serviceURL + */ + public void loadServiceEndpointsByDNSService(final Callback> callback, + final String serviceURL) { + final DnsClient dnsClient = dnsClientProvider.get(); + dnsClient.resolveSRV(serviceURL, event -> + { + if (event.succeeded()) { + callback.returnThis(convertEndpoints(event.result())); + } else { + callback.onError(event.cause()); + } + } + ); + } + + /** + * Converts list of SrvRecord(s) to list of EndpointDefinition(s). + * @param results of SrvRecord to convert to EndpointDefinition(s) + * @return list of EndpointDefinition + */ + private List convertEndpoints(final List results) { + return results.stream().map(this::convertSrvRecordToEndpointDefinition + ).collect(Collectors.toList()); + } + + /** + * Convert a single srvRecord into an EndpointDefinition. + * @param srvRecord srvRecord + * @return EndpointDefinition from srvRecord + */ + private EndpointDefinition convertSrvRecordToEndpointDefinition(final SrvRecord srvRecord) { + return new EndpointDefinition(findServiceName(srvRecord.service()), srvRecord.target(), + srvRecord.port()); + } + + +} diff --git a/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsSupportBuilder.java b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsSupportBuilder.java new file mode 100644 index 00000000..100bf448 --- /dev/null +++ b/qbit/service-discovery/src/main/java/io/advantageous/qbit/service/discovery/dns/DnsSupportBuilder.java @@ -0,0 +1,99 @@ +package io.advantageous.qbit.service.discovery.dns; + +import io.vertx.core.Vertx; +import io.vertx.core.dns.DnsClient; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.function.Supplier; + +/** + * DnsSupportBuilder knows how to build a DnsSupport class. + */ +public class DnsSupportBuilder { + + private final Vertx vertx; + private int port = -1; + private String host = null; + private Supplier dnsClientProvider; + private Map dnsServiceNameToServiceName; + private String postfixURL; + + public DnsSupportBuilder(Vertx vertx) { + this.vertx = vertx; + } + + + public String getPostfixURL() { + return postfixURL; + } + + public DnsSupportBuilder setPostfixURL(String postfixURL) { + this.postfixURL = postfixURL; + return this; + } + + public Vertx getVertx() { + return vertx; + } + + public int getPort() { + return port; + } + + public DnsSupportBuilder setPort(int port) { + this.port = port; + return this; + } + + public String getHost() { + return host; + } + + public DnsSupportBuilder setHost(String host) { + this.host = host; + return this; + } + + public Supplier getDnsClientProvider() { + if (dnsClientProvider == null) { + dnsClientProvider = new DnsClientProvider(getVertx(), getHost(), getPort()); + } + return dnsClientProvider; + } + + public DnsSupportBuilder setDnsClientProvider(Supplier dnsClientProvider) { + this.dnsClientProvider = dnsClientProvider; + return this; + } + + public Map getDnsServiceNameToServiceName() { + if (dnsServiceNameToServiceName == null) { + dnsServiceNameToServiceName = new LinkedHashMap<>(); + } + return dnsServiceNameToServiceName; + } + + + public DnsSupportBuilder addDnsServerToServiceNameMapping( + final String dnsName, final String serviceName) { + getDnsServiceNameToServiceName().put(dnsName, serviceName); + return this; + } + + + public DnsSupportBuilder setDnsServiceNameToServiceName(Map dnsServiceNameToServiceName) { + this.dnsServiceNameToServiceName = dnsServiceNameToServiceName; + return this; + } + + public DnsSupport build() { + return new DnsSupport(getDnsClientProvider(), + getDnsServiceNameToServiceName(), + getPostfixURL()); + } + + public static DnsSupportBuilder dnsSupportFactory(final Vertx vertx) { + return new DnsSupportBuilder(vertx); + } +} diff --git a/qbit/service-discovery/src/test/java/io/advantageous/qbit/service/discovery/dns/DnsSupportTest.java b/qbit/service-discovery/src/test/java/io/advantageous/qbit/service/discovery/dns/DnsSupportTest.java new file mode 100644 index 00000000..6b234d60 --- /dev/null +++ b/qbit/service-discovery/src/test/java/io/advantageous/qbit/service/discovery/dns/DnsSupportTest.java @@ -0,0 +1,238 @@ +package io.advantageous.qbit.service.discovery.dns; + +import io.advantageous.boon.core.Lists; +import io.advantageous.boon.core.Maps; +import io.advantageous.qbit.reactive.CallbackBuilder; +import io.advantageous.qbit.service.discovery.EndpointDefinition; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.dns.DnsClient; +import io.vertx.core.dns.MxRecord; +import io.vertx.core.dns.SrvRecord; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.*; + +public class DnsSupportTest { + + public static TestSrvRecord srv(String service, String name, String target) { + return new TestSrvRecord(service, name, "", 1, 1, target, 1); + } + + + public static class TestSrvRecord implements SrvRecord { + + + private final String service; + private final String name; + private final String protocol; + + private final int priority; + private final int weight; + + private final String target; + private final int port; + + public TestSrvRecord(final String service, final String name, final String protocol, + final int priority, final int weight, final String target, final int port) { + this.service = service; + this.name = name; + this.protocol = protocol; + this.priority = priority; + this.weight = weight; + this.target = target; + this.port = port; + } + + + @Override + public int priority() { + return priority; + } + + @Override + public int weight() { + return weight; + } + + @Override + public int port() { + return port; + } + + @Override + public String name() { + return name; + } + + @Override + public String protocol() { + return protocol; + } + + @Override + public String service() { + return service; + } + + @Override + public String target() { + return target; + } + } + + @Test + public void testLoadServicesByServiceName() throws Exception { + + /** DNS data. */ + final Map> dnsData = Maps.map( + "db.skydns.local." , Lists.list( + srv("db", "skydns.local", "server1.db.skydns.local"), + srv("db", "skydns.local", "server2.db.skydns.local"), + srv("db", "skydns.local", "server3.db.skydns.local") + ) + ); + + final DnsClient client = new DnsClient() { + @Override + public DnsClient lookup(String name, Handler> handler) { + return null; + } + + @Override + public DnsClient lookup4(String name, Handler> handler) { + return null; + } + + @Override + public DnsClient lookup6(String name, Handler> handler) { + return null; + } + + @Override + public DnsClient resolveA(String name, Handler>> handler) { + return null; + } + + @Override + public DnsClient resolveAAAA(String name, Handler>> handler) { + return null; + } + + @Override + public DnsClient resolveCNAME(String name, Handler>> handler) { + return null; + } + + @Override + public DnsClient resolveMX(String name, Handler>> handler) { + return null; + } + + @Override + public DnsClient resolveTXT(String name, Handler>> handler) { + return null; + } + + @Override + public DnsClient resolvePTR(String name, Handler> handler) { + return null; + } + + @Override + public DnsClient resolveNS(String name, Handler>> handler) { + return null; + } + + @Override + public DnsClient resolveSRV(final String name, final Handler>> handler) { + AsyncResult> result = new AsyncResult>(){ + @Override + public List result() { + return dnsData.get(name); + } + + @Override + public Throwable cause() { + return null; + } + + @Override + public boolean succeeded() { + return true; + } + + @Override + public boolean failed() { + return false; + } + }; + + handler.handle(result); + return this; + } + + @Override + public DnsClient reverseLookup(String ipaddress, Handler> handler) { + return null; + } + }; + + + + final DnsSupport dnsSupport = DnsSupportBuilder + .dnsSupportFactory(null) + .setDnsClientProvider(() -> client) + .addDnsServerToServiceNameMapping("db", "dbService") + .setPostfixURL(".skydns.local.") + .build(); + + + final CountDownLatch countDownLatch = new CountDownLatch(1); + + final AtomicReference> endPointsRef = new AtomicReference<>(); + + dnsSupport.loadServiceEndpointsByServiceName( + CallbackBuilder.callbackBuilder().setCallbackReturnsList(EndpointDefinition.class, + endpointDefinitions -> + { + + endPointsRef.set(endpointDefinitions); + countDownLatch.countDown(); + + endpointDefinitions.forEach(endpointDefinition -> + System.out.printf("%s %s %s \n", endpointDefinition.getPort(), + endpointDefinition.getHost(), + endpointDefinition.getId())); + }).setOnError(Throwable::printStackTrace) + .build(), "dbService"); + + final List endpointDefinitionList = endPointsRef.get(); + + final Map map = Maps.toMap(String.class, "host", endpointDefinitionList); + + assertNotNull(map.get("server1.db.skydns.local")); + assertNotNull(map.get("server2.db.skydns.local")); + assertNotNull(map.get("server3.db.skydns.local")); + assertEquals("dbService-1-server1-db-skydns-local", map.get("server1.db.skydns.local").getId()); + + + final DnsServiceDiscoveryProvider serviceDiscoveryProvider = DnsServiceDiscoveryProviderBuilder.dnsServiceDiscoveryProviderBuilder().setDnsSupport(dnsSupport).build(); + + final List endpointDefinitionList2 = serviceDiscoveryProvider.loadServices("dbService"); + + final Map map2 = Maps.toMap(String.class, "host", endpointDefinitionList2); + + assertNotNull(map2.get("server1.db.skydns.local")); + assertNotNull(map2.get("server2.db.skydns.local")); + assertNotNull(map2.get("server3.db.skydns.local")); + assertEquals("dbService-1-server1-db-skydns-local", map2.get("server1.db.skydns.local").getId()); + + + } +} \ No newline at end of file diff --git a/qbit/service-discovery/src/test/java/io/advantageous/qbit/service/discovery/dns/IntegrationTestWithSkyDNS.java b/qbit/service-discovery/src/test/java/io/advantageous/qbit/service/discovery/dns/IntegrationTestWithSkyDNS.java new file mode 100644 index 00000000..bed9e66a --- /dev/null +++ b/qbit/service-discovery/src/test/java/io/advantageous/qbit/service/discovery/dns/IntegrationTestWithSkyDNS.java @@ -0,0 +1,28 @@ +package io.advantageous.qbit.service.discovery.dns; + +import io.advantageous.boon.core.Maps; +import io.advantageous.qbit.reactive.CallbackBuilder; +import io.advantageous.qbit.service.discovery.EndpointDefinition; +import io.vertx.core.Vertx; + +public class IntegrationTestWithSkyDNS { + + + public static void main(String... args) { + final Vertx vertx = Vertx.vertx(); + final DnsSupport dnsSupport = new DnsSupport(new DnsClientProvider(vertx, "localhost", 5354), + Maps.map("db", "dbService"), ".skydns.local"); + + dnsSupport.loadServiceEndpointsByServiceName( + CallbackBuilder.callbackBuilder().setCallbackReturnsList(EndpointDefinition.class, + endpointDefinitions -> + { + endpointDefinitions.forEach(endpointDefinition -> + System.out.printf("%s %s %s \n", endpointDefinition.getPort(), + endpointDefinition.getHost(), + endpointDefinition.getId())); + }).setOnError(Throwable::printStackTrace) + .build(), "dbService"); + + } +} diff --git a/settings.gradle b/settings.gradle index 8969b22d..6cc30a63 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,7 +19,7 @@ rootProject.name = 'qbit-bundle' include 'qbit' include 'qbit:boon', 'qbit:core', 'qbit:test-support', 'qbit:spring' -include 'qbit:servlet', 'qbit:vertx', 'qbit:scala' +include 'qbit:servlet', 'qbit:vertx', 'qbit:scala', "qbit:service-discovery" include 'qbit:consul-client', 'qbit:eventbus-replicator', 'qbit:admin' include 'examples:standalone'