[Z Design Doc] DNS based service discovery and implementation notes

Richard Hightower edited this page Oct 6, 2015 · 13 revisions

Both SkyDNS/etcd and Consul provide DNS support. QBit supports discovery via pushed JSON files (Chef, Consul templating, etc.) and by interacting directly with Consul.

To implement a ServiceDiscovery provider, one has to implement this interface:

ServiceDiscoveryProvider

package io.advantageous.qbit.service.discovery.spi;

import io.advantageous.qbit.service.discovery.EndpointDefinition;
import io.advantageous.qbit.service.discovery.impl.ServiceHealthCheckIn;
import io.advantageous.qbit.util.ConcurrentHashSet;

import java.util.Collections;
import java.util.List;
import java.util.Queue;

/**
 * Service Discovery Provider.
 * created by rhightower on 3/24/15.
 */
public interface ServiceDiscoveryProvider {

    default void registerServices(Queue<EndpointDefinition> registerQueue) {
    }

    default void checkIn(Queue<ServiceHealthCheckIn> checkInsQueue) {
    }

    default List<EndpointDefinition> loadServices(String serviceName) {
        return Collections.emptyList();
    }

    default void unregisterServices(ConcurrentHashSet<EndpointDefinition> endpointDefinitions) {
    }
}

In the file-system-based approach, checkIn, registerServices, and unregisterServices are not used, because the services are read from JSON files. In the Consul implementation, checkIn, registerServices, and unregisterServices delegate to Consul; checkIn is the service checking in with Consul before its Consul TTL runs out.
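To make the TTL check-in concrete, here is a minimal sketch of scheduling a periodic check-in so that it always lands before the TTL expires. The class TtlCheckInSketch and the choice of checking in at half the TTL are assumptions for illustration; they are not taken from the QBit Consul provider.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of QBit. */
final class TtlCheckInSketch {

    /** Check in at half the TTL (an assumed safety margin), but at least once per second. */
    static long checkInIntervalSeconds(final long timeToLiveSeconds) {
        return Math.max(1L, timeToLiveSeconds / 2);
    }

    /** Repeats the given check-in action often enough that the TTL should not run out. */
    static ScheduledFuture<?> scheduleCheckIn(final ScheduledExecutorService scheduler,
                                              final long timeToLiveSeconds,
                                              final Runnable checkIn) {
        final long interval = checkInIntervalSeconds(timeToLiveSeconds);
        return scheduler.scheduleAtFixedRate(checkIn, 0, interval, TimeUnit.SECONDS);
    }
}
```

With a TTL of 20 seconds this would check in every 10 seconds. The Runnable would typically wrap something like a call to checkInOk(serviceId) on the ServiceDiscovery instance.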

The SkyDNS model could at first just be a DNS model that is similar to the ServiceDiscoveryFileSystemProvider push JSON file approach, in that it only needs to provide loadServices. Later integration can be done with etcd to listen to changes, and then trigger a re-loadServices.

The ServiceDiscoveryProvider interface is used by the ServiceDiscovery implementation, of which there is only one at this point: ServiceDiscoveryImpl. ServiceDiscoveryImpl takes a primary and a secondary ServiceDiscoveryProvider, so if the primary has an outage it can use the secondary. This is the primary mechanism for falling back to JSON files via the ServiceDiscoveryFileSystemProvider if, for example, Consul is down. In this way, a Consul outage does not cause a complete outage. (Consul is reliable, but new infrastructure that uses Consul is prone to non-optimal Consul setups. In my experience this is more a learning curve than any inherent problem with Consul.)
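The primary/secondary idea can be sketched in a few lines. This is not the ServiceDiscoveryImpl code; ProviderSketch is a stand-in for ServiceDiscoveryProvider reduced to loadServices, endpoints are plain strings, and treating only a thrown exception as an outage is an assumption of the sketch.

```java
import java.util.List;

/** Stand-in for ServiceDiscoveryProvider, reduced to the one method this sketch needs. */
@FunctionalInterface
interface ProviderSketch {
    List<String> loadServices(String serviceName);
}

/** Hypothetical helper, not part of QBit. */
final class FallbackSketch {

    /**
     * Ask the primary provider first; if it throws, ask the secondary.
     * A real implementation may also want to fall back on timeouts or empty results.
     */
    static List<String> loadWithFallback(final ProviderSketch primary,
                                         final ProviderSketch secondary,
                                         final String serviceName) {
        try {
            return primary.loadServices(serviceName);
        } catch (RuntimeException primaryFailure) {
            return secondary.loadServices(serviceName);
        }
    }
}
```

A DNS-backed provider would fit either slot, since it only has to supply loadServices.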

I propose initially to implement a ServiceDiscoveryDNSProvider which will expect DNS entries in the SkyDNS format so to speak.

The ServiceDiscovery interface is as follows:

/**
 * Service Discovery
 * created by rhightower on 3/23/15.
 */
public interface ServiceDiscovery extends Startable, Stoppable {

    static String uniqueString(int port) {
        try {
            return port + "-" + InetAddress.getLocalHost().getHostName().replace('.', '-');
        } catch (UnknownHostException e) {
            return port + "-" + UUID.randomUUID().toString();
        }
    }

    default EndpointDefinition register(
            final String serviceName,
            final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceName + "." + uniqueString(port),
                serviceName, null, port);
    }

    default EndpointDefinition registerWithTTL(
            final String serviceName,
            final int port,
            final int timeToLiveSeconds) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceName + "." + uniqueString(port),
                serviceName, null, port, timeToLiveSeconds);
    }

    @SuppressWarnings("UnusedReturnValue")
    default EndpointDefinition registerWithIdAndTimeToLive(
            final String serviceName, final String serviceId, final int port, final int timeToLiveSeconds) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceId,
                serviceName, null, port, timeToLiveSeconds);
    }

    default EndpointDefinition registerWithId(final String serviceName, final String serviceId, final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceId,
                serviceName, null, port);
    }


    void watch(String serviceName);

    default void checkIn(String serviceId, HealthStatus healthStatus) {

    }


    default void checkInOk(String serviceId) {

    }

    default List<EndpointDefinition> loadServices(final String serviceName) {

        return Collections.emptyList();
    }

    default List<EndpointDefinition> loadServicesNow(final String serviceName) {

        return Collections.emptyList();
    }

    default void start() {
    }

    default void stop() {
    }


    default Set<EndpointDefinition> localDefinitions() {
        return Collections.emptySet();
    }
}

OK, rather than explaining what each method does, I added comments.

ServiceDiscovery with comments

package io.advantageous.qbit.service.discovery;

import io.advantageous.qbit.service.Startable;
import io.advantageous.qbit.service.Stoppable;
import io.advantageous.qbit.service.health.HealthStatus;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;

/**
 * Service Discovery
 * created by rhightower on 3/23/15.
 */
public interface ServiceDiscovery extends Startable, Stoppable {


    /**
     * Generates a unique string, used for generating unique service ids
     * @param port port
     * @return unique id incorporating host name if possible.
     */
    static String uniqueString(int port) {
        try {
            return port + "-" + InetAddress.getLocalHost().getHostName().replace('.', '-');
        } catch (UnknownHostException e) {
            return port + "-" + UUID.randomUUID().toString();
        }
    }

    /**
     * Register the service with the service discovery service if applicable.
     * @param serviceName serviceName
     * @param port port
     * @return EndpointDefinition
     */
    default EndpointDefinition register(
            final String serviceName,
            final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceName + "." + uniqueString(port),
                serviceName, null, port);
    }

    /**
     * Register with the service discovery system and specify a TTL so that if
     * the service does not send a checkIn that it is marked down.
     * TTL is time to live.
     * @param serviceName service name
     * @param port port
     * @param timeToLiveSeconds ttl
     * @return EndpointDefinition
     */
    default EndpointDefinition registerWithTTL(
            final String serviceName,
            final int port,
            final int timeToLiveSeconds) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceName + "." + uniqueString(port),
                serviceName, null, port, timeToLiveSeconds);
    }

    /**
     * Register an end point given an id, and a TTL.
     * This gets used if you want to be specific about what you call the service.
     * @param serviceName service name
     * @param serviceId service id
     * @param port port
     * @param timeToLiveSeconds ttl
     * @return EndpointDefinition
     */
    @SuppressWarnings("UnusedReturnValue")
    default EndpointDefinition registerWithIdAndTimeToLive(
            final String serviceName, final String serviceId, final int port, final int timeToLiveSeconds) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceId,
                serviceName, null, port, timeToLiveSeconds);
    }

    /**
     * Register with id. Specify a unique id that is not autogenerated
     * @param serviceName service name
     * @param serviceId service id
     * @param port port
     * @return EndpointDefinition
     */
    default EndpointDefinition registerWithId(final String serviceName, final String serviceId, final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceId,
                serviceName, null, port);
    }


    /**
     * Watch for changes in this service name and send change events if the service changes.
     * @param serviceName
     */
    void watch(String serviceName);

    /**
     * CheckIn with the service discovery mechanism. The service may be marked as down if it does
     * not check in within the amount of time specified by the ttl, if the service discovery provider supports
     * ttl and checkin (Consul does).
     * @param serviceId
     * @param healthStatus
     */
    default void checkIn(String serviceId, HealthStatus healthStatus) {

    }


    /** This is like `checkIn` but does a HealthStatus.PASS if applicable.
     *
     * @param serviceId serviceId
     */
    default void checkInOk(String serviceId) {

    }

    /**
     * Load the services.
     *
     * Depending on the underlying implementation the services are either periodically loaded
     * or loaded whenever a change is detected.
     *
     * This version of the method is based on the last event change and / or the last poll of the
     * services from the remote system (i.e., Consul) if applicable.
     *
     * This may also trigger a remote call, but it will always return right away.
     * @param serviceName service name
     * @return list of EndpointDefinition
     */
    default List<EndpointDefinition> loadServices(final String serviceName) {

        return Collections.emptyList();
    }

    /**
     * See `loadServices` this is like `loadServices` except it forces a remote call.
     * This is a blocking call to loadServices.
     * @param serviceName service name.
     * @return list of EndpointDefinition
     */
    default List<EndpointDefinition> loadServicesNow(final String serviceName) {

        return Collections.emptyList();
    }

    /**
     * Start the service discovery system if applicable.
     */
    default void start() {
    }

    /**
     * Stop the service discovery system if applicable.
     */
    default void stop() {
    }


    /**
     * This just loads the end points that were registered locally.
     * These are the endpoints that this JVM and this ServiceDiscovery is managing.
     * @return set of EndpointDefinitions
     */
    default Set<EndpointDefinition> localDefinitions() {
        return Collections.emptySet();
    }
}
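As a quick illustration of what the default register methods produce, the sketch below repeats the id logic from uniqueString and register with the host name passed in as a parameter, so the result does not depend on the machine it runs on. ServiceIdSketch and the host name host1.example.com are made up for the example.

```java
/** Hypothetical helper that mirrors the id logic shown in ServiceDiscovery. */
final class ServiceIdSketch {

    /** Same shape as ServiceDiscovery.uniqueString, but with the host name supplied by the caller. */
    static String uniqueString(final int port, final String hostName) {
        return port + "-" + hostName.replace('.', '-');
    }

    /** Mirrors the id built by register(serviceName, port): serviceName + "." + uniqueString(port). */
    static String serviceId(final String serviceName, final int port, final String hostName) {
        return serviceName + "." + uniqueString(port, hostName);
    }
}
```

For a service named eventBus on port 8080 of host1.example.com, serviceId returns eventBus.8080-host1-example-com.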

To take this discussion further we have EndpointDefinition and HealthStatus.

EndpointDefinition

package io.advantageous.qbit.service.discovery;

import io.advantageous.boon.core.Lists;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.service.health.HealthStatus;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

import static io.advantageous.qbit.service.discovery.ServiceDiscovery.uniqueString;

/**
 * Service Definition
 * Contains a healthStatus, unique id, name, host, port and a timeToLive in seconds.
 * This describes all parts of a service as far as something like a ServiceDiscovery system like
 * [Consul](https://consul.io/) is concerned.
 *
 * The `timeToLive` field is for ttl checkins if the underlying system supports it.
 *
 * The `HealthStatus` represents the current state of this system as returned from the remote
 * service discovery system.
 *
 * created by rhightower on 3/23/15.
 */
public class EndpointDefinition {


    /**
     * Current health status.
     */
    private final HealthStatus healthStatus;

    /**
     * Unique id of the system.
     */
    private final String id;

    /**
     * Name of the service, i.e., EventBus, StatsEngine, etc.
     */
    private final String name;

    /**
     * Host name.
     */
    private final String host;

    /**
     * Port of the service.
     */
    private final int port;

    /**
     * Time to live: how long until this service has to check in with the remote service discovery
     * system if applicable. Whether this is used or needed depends on the underlying service discovery system.
     */
    private final long timeToLive;


    /**
     * Find host
     * @return hostname
     */
    static String findHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new IllegalStateException("unable to find host name");
        }
    }


    /**
     * Create a new one with default TTL of 20 seconds.
     * @param healthStatus healthStatus
     * @param id id
     * @param name name
     * @param host host
     * @param port port
     */
    public EndpointDefinition(
            final HealthStatus healthStatus,
            final String id,
            final String name,
            final String host,
            final int port) {
        this.healthStatus = healthStatus;
        this.id = id;
        this.name = name;
        this.host = host;
        this.port = port;
        this.timeToLive = Sys.sysProp(EndpointDefinition.class.getName()+".timeToLive", 20L);
    }


    /**
     * Create a new one with an explicit TTL.
     * @param healthStatus healthStatus
     * @param id id
     * @param name name
     * @param host host
     * @param port port
     * @param timeToLive time to live in seconds
     */
    public EndpointDefinition(
            final HealthStatus healthStatus,
            final String id,
            final String name,
            final String host,
            final int port,
            final long timeToLive) {
        this.healthStatus = healthStatus;
        this.id = id;
        this.name = name;
        this.host = host;
        this.port = port;
        this.timeToLive = timeToLive;
    }

    /**
     * Creates a list of service definitions.
     * @param endpointDefinitions vararg array of service definitions
     * @return list of service definitions
     */
    public static List<EndpointDefinition> serviceDefinitions(final EndpointDefinition... endpointDefinitions) {
        return Lists.list(endpointDefinitions);
    }

    /**
     * Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param name name
     * @return serviceDefinition
     */
    public static EndpointDefinition serviceDefinition(final String name) {

        return new EndpointDefinition(HealthStatus.PASS,
                name + "-" + uniqueString(0), name, findHostName(), 0);
    }

    /**
     * Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param name service name
     * @param port port
     * @return serviceDefinition
     */
    public static EndpointDefinition serviceDefinition(final String name, int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                name + "-" + uniqueString(port), name, findHostName(), port);
    }

    /**
     * Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param id id
     * @param name name
     * @param host host
     * @param port port
     * @return EndpointDefinition
     */
    public static EndpointDefinition serviceDefinition(
            final String id,
            final String name,
            final String host,
            final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                id, name, host, port);
    }

    /**
     *  Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param name name
     * @param host host
     * @param port port
     * @return serviceDefinition
     */
    public static EndpointDefinition serviceDefinition(
            final String name,
            final String host,
            final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                name + "-" + uniqueString(port), name, host, port);
    }


    /**
     * Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param name name
     * @param host host
     * @return serviceDefinition
     */
    public static EndpointDefinition serviceDefinition(
            final String name,
            final String host
    ) {

        return new EndpointDefinition(HealthStatus.PASS,
                name + "-" + uniqueString(0), name, host, 0);
    }



    /**
     * Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param id id
     * @param name name
     * @param host host
     * @return EndpointDefinition
     */
    public static EndpointDefinition serviceDefinitionWithId(
            final String name,
            final String host,
            final String id) {

        return new EndpointDefinition(HealthStatus.PASS,
                id, name, host, 0);
    }

    public HealthStatus getHealthStatus() {
        return healthStatus;
    }

    public String getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    @SuppressWarnings("SimplifiableIfStatement")
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof EndpointDefinition)) return false;

        EndpointDefinition that = (EndpointDefinition) o;

        if (port != that.port) return false;
        if (healthStatus != that.healthStatus) return false;
        if (host != null ? !host.equals(that.host) : that.host != null) return false;
        if (id != null ? !id.equals(that.id) : that.id != null) return false;
        return !(name != null ? !name.equals(that.name) : that.name != null);

    }

    @Override
    public int hashCode() {
        int result = healthStatus != null ? healthStatus.hashCode() : 0;
        result = 31 * result + (id != null ? id.hashCode() : 0);
        result = 31 * result + (name != null ? name.hashCode() : 0);
        result = 31 * result + (host != null ? host.hashCode() : 0);
        result = 31 * result + port;
        return result;
    }

    @Override
    public String toString() {
        return "ServiceDefinition{" +
                "status=" + healthStatus +
                ", id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", host='" + host + '\'' +
                ", port=" + port +
                '}';
    }

    public long getTimeToLive() {

        return timeToLive;
    }
}

etcd and DNS support.

We can reuse the register concept with etcd, but as far as I know we do not yet have a use for TTL with etcd, so checkIn would be a no-op. The first part of this exercise is to read the DNS-style entries that SkyDNS serves from etcd.

Setting up etcd and sky-dns for dev

Follow instructions at https://github.com/skynetservices/skydns.

First we download and install etcd.

etcd is written in Go and uses the Raft consensus algorithm to manage a highly-available replicated log. See etcdctl for a simple command line client. --https://github.com/coreos/etcd

$ mkdir skydns
$ cd skydns

#Download and run etcd
$ curl -L  https://github.com/coreos/etcd/releases/download/v2.2.0/etcd-v2.2.0-darwin-amd64.zip -o etcd-v2.2.0-darwin-amd64.zip
$ unzip etcd-v2.2.0-darwin-amd64.zip
$ cd etcd-v2.2.0-darwin-amd64
$ ./etcd

#Download and run skydns
$ export GOPATH=`pwd`
$ go get github.com/skynetservices/skydns
$ cd $GOPATH/src/github.com/skynetservices/skydns
$ go build -v

To point SkyDNS at different etcd servers, set ETCD_MACHINES:

export ETCD_MACHINES='http://192.168.0.1:4001,http://192.168.0.2:4001'

To run skydns

$ pwd
/.../skydns/src/github.com/skynetservices/skydns
$ ./skydns -addr="127.0.0.1:5354"

By default it will use port 4001 on localhost.

To configure SkyDNS, we PUT a key/value pair to etcd.

curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/config \
    -d value='{"dns_addr":"127.0.0.1:5354","ttl":3600, "nameservers": ["8.8.8.8:53","8.8.4.4:53"]}'

Output from setting up sky dns

{"action":"set","node":{"key":"/skydns/config",
"value":"{\"dns_addr\":\"127.0.0.1:5354\",
\"ttl\":3600, \"nameservers\": 
[\"8.8.8.8:53\",\"8.8.4.4:53\"]}",
"modifiedIndex":5,"createdIndex":5}}

SkyDNS Service announcement

Service Announcements: Announce your service by submitting JSON over HTTP to etcd with information about your service. This information will then be available for queries via DNS. We use the directory /skydns to anchor all names. ... Adding the service can thus be done with:

Service Announcement curl call

curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/local/skydns/east/production/rails \
    -d value='{"host":"service5.example.com","priority":20}'

Output

{"action":"set",
"node":{"key":"/skydns/local/skydns/east/production/rails",
"value":"{\"host\":\"service5.example.com\",\"priority\":20}",
"modifiedIndex":6,"createdIndex":6}}

Now let's add some more for testing.

curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/local/skydns/east/production/rails/1 \
    -d value='{"host":"service1.example.com","port":8080}'
curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/local/skydns/west/production/rails/2 \
    -d value='{"host":"service2.example.com","port":8080}'
curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/local/skydns/east/staging/rails/4 \
    -d value='{"host":"10.0.1.125","port":8080}'
curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/local/skydns/east/staging/rails/6 \
    -d value='{"host":"2003::8:1","port":8080}'
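The key paths in the curl calls above are the DNS name with its labels reversed and anchored under /skydns. A small sketch of that mapping follows; SkyDnsKeySketch and etcdKeyFor are hypothetical names, and the result still needs the /v2/keys prefix and the etcd host to form the full URL.

```java
/** Hypothetical helper: turns a DNS name into the etcd key that SkyDNS reads it from. */
final class SkyDnsKeySketch {

    static String etcdKeyFor(final String dnsName) {
        // Drop the trailing dot of a fully qualified name, if present.
        final String trimmed = dnsName.endsWith(".")
                ? dnsName.substring(0, dnsName.length() - 1)
                : dnsName;
        final String[] labels = trimmed.split("\\.");

        // Walk the labels right to left: local, skydns, east, ...
        final StringBuilder key = new StringBuilder("/skydns");
        for (int i = labels.length - 1; i >= 0; i--) {
            key.append('/').append(labels[i]);
        }
        return key.toString();
    }
}
```

For example, 4.rails.staging.east.skydns.local. maps to /skydns/local/skydns/east/staging/rails/4, which is the key used in the third curl call.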

Now when we use dig, we can query the dns entries.

Using dig to query sky dns

$ dig @localhost -p 5354 SRV east.skydns.local

; <<>> DiG 9.8.3-P1 <<>> @localhost -p 5354 SRV east.skydns.local
; (2 servers found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 23032
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 3, AUTHORITY: 0, ADDITIONAL: 2

;; QUESTION SECTION:
;east.skydns.local.		IN	SRV

;; ANSWER SECTION:
east.skydns.local.	3600	IN	SRV	20 100 0 service5.example.com.
east.skydns.local.	3600	IN	SRV	10 50 8080 4.rails.staging.east.skydns.local.
east.skydns.local.	3600	IN	SRV	10 50 8080 6.rails.staging.east.skydns.local.

;; ADDITIONAL SECTION:
4.rails.staging.east.skydns.local. 3600	IN A	10.0.1.125
6.rails.staging.east.skydns.local. 3600	IN AAAA	2003::8:1

;; Query time: 111 msec
;; SERVER: 127.0.0.1#5354(127.0.0.1)
;; WHEN: Mon Sep 28 21:55:42 2015
;; MSG SIZE  rcvd: 243
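Each line in the ANSWER SECTION above has the form name, TTL, class, type, and then the SRV data in the order priority, weight, port, target. The sketch below splits one such line into fields, which is roughly the information a DNS-based provider needs to build an EndpointDefinition. SrvAnswerSketch is a hypothetical holder for illustration, not a QBit or Vert.x type.

```java
/** Hypothetical holder for one SRV answer line as printed by dig. */
final class SrvAnswerSketch {
    final String name;
    final long ttlSeconds;
    final int priority;
    final int weight;
    final int port;
    final String target;

    private SrvAnswerSketch(final String name, final long ttlSeconds, final int priority,
                            final int weight, final int port, final String target) {
        this.name = name;
        this.ttlSeconds = ttlSeconds;
        this.priority = priority;
        this.weight = weight;
        this.port = port;
        this.target = target;
    }

    /** Parses a whitespace-separated line: name ttl IN SRV priority weight port target. */
    static SrvAnswerSketch parse(final String answerLine) {
        final String[] f = answerLine.trim().split("\\s+");
        if (f.length != 8 || !"SRV".equals(f[3])) {
            throw new IllegalArgumentException("not an SRV answer line: " + answerLine);
        }
        return new SrvAnswerSketch(f[0], Long.parseLong(f[1]),
                Integer.parseInt(f[4]), Integer.parseInt(f[5]),
                Integer.parseInt(f[6]), f[7]);
    }
}
```

Parsing the second answer line gives priority 10, weight 50, port 8080, and target 4.rails.staging.east.skydns.local., whose address is in the ADDITIONAL SECTION.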

What is dig?

dig (domain information groper) is a flexible tool for interrogating DNS name servers. It performs DNS lookups and displays the answers that are returned from the name server(s) that were queried. Most DNS administrators use dig to troubleshoot DNS problems because of its flexibility, ease of use and clarity of output. Other lookup tools tend to have less functionality than dig. --https://developer.apple.com/library/mac/documentation/Darwin/Reference/ManPages/man1/dig.1.html

Another example:

dig @localhost -p 5354 SRV production.east.skydns.local

; <<>> DiG 9.8.3-P1 <<>> @localhost -p 5354 SRV production.east.skydns.local
; (2 servers found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 50200
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;production.east.skydns.local.	IN	SRV

;; ANSWER SECTION:
production.east.skydns.local. 3600 IN	SRV	20 100 0 service5.example.com.

;; Query time: 109 msec
;; SERVER: 127.0.0.1#5354(127.0.0.1)
;; WHEN: Mon Sep 28 22:05:05 2015
;; MSG SIZE  rcvd: 86

Reading some common A records.

$ dig @localhost -p 5354 A  apple.com

; <<>> DiG 9.8.3-P1 <<>> @localhost -p 5354 A apple.com
; (2 servers found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 63436
;; flags: qr rd ra; QUERY: 1, ANSWER: 3, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;apple.com.			IN	A

;; ANSWER SECTION:
apple.com.		2136	IN	A	17.172.224.47
apple.com.		2136	IN	A	17.142.160.59
apple.com.		2136	IN	A	17.178.96.59

;; Query time: 28 msec
;; SERVER: 127.0.0.1#5354(127.0.0.1)
;; WHEN: Mon Sep 28 22:11:56 2015
;; MSG SIZE  rcvd: 75

$ dig @localhost -p 5354 A  google.com

; <<>> DiG 9.8.3-P1 <<>> @localhost -p 5354 A google.com
; (2 servers found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 62950
;; flags: qr rd ra; QUERY: 1, ANSWER: 11, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;google.com.			IN	A

;; ANSWER SECTION:
google.com.		82	IN	A	74.125.239.35
google.com.		82	IN	A	74.125.239.46
google.com.		82	IN	A	74.125.239.37
google.com.		82	IN	A	74.125.239.40
google.com.		82	IN	A	74.125.239.41
google.com.		82	IN	A	74.125.239.36
google.com.		82	IN	A	74.125.239.39
google.com.		82	IN	A	74.125.239.38
google.com.		82	IN	A	74.125.239.32
google.com.		82	IN	A	74.125.239.33
google.com.		82	IN	A	74.125.239.34

;; Query time: 19 msec
;; SERVER: 127.0.0.1#5354(127.0.0.1)
;; WHEN: Mon Sep 28 22:11:31 2015
;; MSG SIZE  rcvd: 204

Setting up something to read from....

Setup some servers

curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/local/skydns/db/x1 -d \
    value='{"host":"127.0.0.1"}'
curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/local/skydns/db/x2 -d \
    value='{"host": "127.0.0.2"}'
curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/local/skydns/db/x3 -d \
    value='{"host": "127.0.0.3"}'

Read them.

$ dig @127.0.0.1 -p 5354 db.skydns.local. A


; <<>> DiG 9.8.3-P1 <<>> @127.0.0.1 -p 5354 db.skydns.local. A
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 46079
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 3, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;db.skydns.local.		IN	A

;; ANSWER SECTION:
db.skydns.local.	3600	IN	A	127.0.0.2
db.skydns.local.	3600	IN	A	127.0.0.3
db.skydns.local.	3600	IN	A	127.0.0.1

;; Query time: 0 msec
;; SERVER: 127.0.0.1#5354(127.0.0.1)
;; WHEN: Mon Sep 28 22:20:06 2015
;; MSG SIZE  rcvd: 81

Everything I currently have registered under skydns.local:

$ dig @127.0.0.1 -p 5354 skydns.local. A

; <<>> DiG 9.8.3-P1 <<>> @127.0.0.1 -p 5354 skydns.local. A
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 37695
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 9, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;skydns.local.			IN	A

;; ANSWER SECTION:
skydns.local.		3600	IN	A	127.0.0.1
skydns.local.		3600	IN	A	192.0.2.1
skydns.local.		3600	IN	A	192.168.0.1
skydns.local.		3600	IN	A	10.0.2.17
bar.skydns.local.	3600	IN	A	192.168.0.1
skydns.local.		3600	IN	A	127.0.0.3
skydns.local.		3600	IN	CNAME	bar.skydns.local.
skydns.local.		3600	IN	A	10.0.1.125
skydns.local.		3600	IN	A	127.0.0.2

;; Query time: 85 msec
;; SERVER: 127.0.0.1#5354(127.0.0.1)
;; WHEN: Mon Sep 28 22:22:50 2015
;; MSG SIZE  rcvd: 176

$ dig @127.0.0.1 -p 5354 skydns.local. SRV

; <<>> DiG 9.8.3-P1 <<>> @127.0.0.1 -p 5354 skydns.local. SRV
; (1 server found)
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 50677
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 11, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;skydns.local.			IN	SRV

;; ANSWER SECTION:
skydns.local.		3600	IN	SRV	10 10 8080 service2.example.com.
skydns.local.		3600	IN	SRV	10 10 8080 service1.skydns.local.
skydns.local.		3600	IN	SRV	10 10 0 public.addresses.local.skydns.local.
skydns.local.		3600	IN	SRV	10 10 0 x1.db.skydns.local.
skydns.local.		3600	IN	SRV	10 10 0 x2.db.skydns.local.
skydns.local.		3600	IN	SRV	10 10 0 x3.db.skydns.local.
skydns.local.		3600	IN	SRV	10 10 80 x1.bar.skydns.local.
skydns.local.		3600	IN	SRV	10 10 443 bar.skydns.local.
skydns.local.		3600	IN	SRV	20 100 0 service5.example.com.
skydns.local.		3600	IN	SRV	10 10 8080 4.rails.staging.east.skydns.local.
skydns.local.		3600	IN	SRV	10 10 8080 6.rails.staging.east.skydns.local.

;; Query time: 165 msec
;; SERVER: 127.0.0.1#5354(127.0.0.1)
;; WHEN: Mon Sep 28 22:24:10 2015
;; MSG SIZE  rcvd: 501

First cut

I was able to implement a first cut of this:

https://github.com/advantageous/qbit/pull/420/files

package io.advantageous.qbit.service.discovery.dns;

import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.service.discovery.EndpointDefinition;
import io.vertx.core.dns.DnsClient;
import io.vertx.core.dns.SrvRecord;

import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;


/**
 * DNS Support for service discovery.
 * This looks up DNS entries for a given domain name.
 *
 * It has two main methods.
 *
     * One method allows you to look things up by URL, e.g., db.skydns.local.
 *
 * The other method allows you to look things up by QBit service name e.g., dbService.
 */
public class DnsSupport {


    /**
     * Holds mappings from DNS names to service names.
     */
    private final Map<String, String> dnsServiceNameToServiceName;

    /**
     * Holds mappings from service names to dns names.
     */
    private final Map<String, String> serviceNameToDNSName;

    /**
     * Holds a postfixURL to hold what URL comes after the service name.
     *
     * Example: db.skydns.local.
     * In the above db is the service and skydns.local. is the postfix URL.
     *
     * The postfixURL equates to the name in the SRV DNS record.
     */
    private final String postfixURL;


    /**
     * Class that knows how to create an instance of DnsClient.
     */
    private final Supplier<DnsClient> dnsClientProvider;

    /**
     *
     * @param dnsClientProvider dnsClientProvider
     * @param dnsServiceNameToServiceName dnsServiceNameToServiceName
     * @param postFixURL postFixURL
     */
    public DnsSupport(final Supplier<DnsClient> dnsClientProvider,
                      final Map<String, String> dnsServiceNameToServiceName,
                      final String postFixURL) {

        this.dnsClientProvider = dnsClientProvider;
        this.postfixURL = postFixURL;
        this.dnsServiceNameToServiceName = dnsServiceNameToServiceName;
        this.serviceNameToDNSName = new HashMap<>(dnsServiceNameToServiceName.size());

        /*
         * Build serviceNameToDNSName by reversing the dnsServiceNameToServiceName mappings.
         */
        dnsServiceNameToServiceName.entrySet().forEach(entry -> serviceNameToDNSName.put(entry.getValue(), entry.getKey()));
    }


    /**
     * Looks up a service name based on its dns service name. The service part of the SRV DNS Record.
     * @param dnsServiceName dnsServiceName
     * @return serviceName
     */
    public String findServiceName(final String dnsServiceName) {
        final String serviceName = dnsServiceNameToServiceName.get(dnsServiceName);
        return serviceName == null ? dnsServiceName : serviceName;
    }

    /**
     * Looks up a dns service name (SRV DNS RECORD).
     * @param serviceName serviceName
     * @return DNS service name (server field + name of SRV DNS Record).
     */
    public String findDndServiceName(final String serviceName) {
        final String dnsServiceName = serviceNameToDNSName.get(serviceName);
        return (dnsServiceName == null ? serviceName : dnsServiceName) + postfixURL;
    }


    /**
     * Load the service nodes based on the internal service name.
     * DB, Ingester, RadarAggregator, etc.
     * @param callback callback
     * @param serviceName serviceName
     */
    public void loadServiceEndpointsByServiceName(final Callback<List<EndpointDefinition>> callback,
                                                  final String serviceName) {

        loadServiceEndpointsByDNSService(callback, findDndServiceName(serviceName));
    }

    /**
     * Load the services nodes by its "${SRV.service}${SRV.name}".
     * @param callback callback
     * @param serviceURL serviceURL
     */
    public void loadServiceEndpointsByDNSService(final Callback<List<EndpointDefinition>> callback,
                                                 final String serviceURL) {
        final DnsClient dnsClient = dnsClientProvider.get();
        dnsClient.resolveSRV(serviceURL, event ->
                {
                    if (event.succeeded()) {
                        callback.returnThis(convertEndpoints(event.result()));
                    } else {
                        callback.onError(event.cause());
                    }
                }
        );
    }

    /**
     * Converts list of SrvRecord(s) to list of EndpointDefinition(s).
     * @param results of SrvRecord to convert to EndpointDefinition(s)
     * @return list of EndpointDefinition
     */
    private List<EndpointDefinition> convertEndpoints(final List<SrvRecord> results) {
        return results.stream().map(this::convertSrvRecordToEndpointDefinition
        ).collect(Collectors.<EndpointDefinition>toList());
    }

    /**
     * Convert a single srvRecord into an EndpointDefinition.
     * @param srvRecord srvRecord
     * @return EndpointDefinition from srvRecord
     */
    private EndpointDefinition convertSrvRecordToEndpointDefinition(final SrvRecord srvRecord) {
        return new EndpointDefinition(findServiceName(srvRecord.service()), srvRecord.target(),
                srvRecord.port());
    }


}
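The name mapping in DnsSupport can be tried without a DNS server. The sketch below repeats only the two lookup methods with plain maps; DnsNameMappingSketch is a hypothetical class, and the dbService mapping is an example value. Note that the lookup concatenates the DNS service name and the postfix directly, so in this sketch the postfix carries its own leading dot.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-alone copy of the name mapping logic in DnsSupport. */
final class DnsNameMappingSketch {

    private final Map<String, String> dnsServiceNameToServiceName;
    private final Map<String, String> serviceNameToDNSName = new HashMap<>();
    private final String postfixURL;

    DnsNameMappingSketch(final Map<String, String> dnsServiceNameToServiceName,
                         final String postfixURL) {
        this.dnsServiceNameToServiceName = dnsServiceNameToServiceName;
        this.postfixURL = postfixURL;
        // Build the reverse mapping, service name to DNS name.
        dnsServiceNameToServiceName.forEach(
                (dnsName, serviceName) -> serviceNameToDNSName.put(serviceName, dnsName));
    }

    /** DNS service name to QBit service name; unmapped names pass through unchanged. */
    String findServiceName(final String dnsServiceName) {
        return dnsServiceNameToServiceName.getOrDefault(dnsServiceName, dnsServiceName);
    }

    /** QBit service name to the full DNS name that would be queried for SRV records. */
    String findDnsServiceName(final String serviceName) {
        return serviceNameToDNSName.getOrDefault(serviceName, serviceName) + postfixURL;
    }
}
```

With the mapping db to dbService and the postfix .skydns.local., looking up dbService yields db.skydns.local., and an unmapped name such as cache yields cache.skydns.local.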
