diff --git a/Longhelp.md b/Longhelp.md index c762f6f7..4bc70dd5 100644 --- a/Longhelp.md +++ b/Longhelp.md @@ -25,6 +25,8 @@ usage: marathon_lb.py [-h] [--longhelp] [--marathon MARATHON [MARATHON ...]] [--lru-cache-capacity LRU_CACHE_CAPACITY] [--dont-bind-http-https] [--ssl-certs SSL_CERTS] [--skip-validation] [--dry] + [--min-serv-port-ip-per-task MIN_SERV_PORT_IP_PER_TASK] + [--max-serv-port-ip-per-task MAX_SERV_PORT_IP_PER_TASK] [--syslog-socket SYSLOG_SOCKET] [--log-format LOG_FORMAT] [--marathon-auth-credential-file MARATHON_AUTH_CREDENTIAL_FILE] @@ -73,9 +75,15 @@ optional arguments: /etc/ssl/mesosphere.com.pem) --skip-validation Skip haproxy config file validation (default: False) --dry, -d Only print configuration to console (default: False) + --min-serv-port-ip-per-task MIN_SERV_PORT_IP_PER_TASK + Minimum port number to use when auto-assigning service + ports for IP-per-task applications (default: 10050) + --max-serv-port-ip-per-task MAX_SERV_PORT_IP_PER_TASK + Maximum port number to use when auto-assigning service + ports for IP-per-task applications (default: 10100) --syslog-socket SYSLOG_SOCKET Socket to write syslog messages to. Use '/dev/null' to - disable logging to syslog (default: /var/run/syslog) + disable logging to syslog (default: /dev/log) --log-format LOG_FORMAT Set log message format (default: %(name)s: %(message)s) diff --git a/README.md b/README.md index b4e06066..3cb724c6 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,17 @@ the [templates section](Longhelp.md#templates) You can access the HAProxy statistics via `:9090/haproxy?stats`, and you can retrieve the current HAProxy config from the `:9090/_haproxy_getconfig` endpoint. +Marathon LB supports load balancing for applications that use the Mesos IP-per-task +feature, whereby each task is assigned unique, accessible, IP addresses. For these +tasks services are directly accessible via the configured discovery ports and there +is no host port mapping. Note, that due to limitations with Marathon (see +[mesosphere/marathon#3636](https://github.com/mesosphere/marathon/issues/3636)) +configured service ports are not exposed to Marathon LB for IP-per-task apps. +For these apps, where the service ports are missing from the Marathon app data, +Marathon LB will automatically assign port values from a configurable range. The range +is configured using the `--min-serv-port-ip-per-task` and `--max-serv-port-ip-per-task` +options. + ## Deployment The package is currently available [from the universe](https://github.com/mesosphere/universe). To deploy marathon-lb on the public slaves in your DCOS cluster, diff --git a/marathon_lb.py b/marathon_lb.py index a192114d..2511b400 100755 --- a/marathon_lb.py +++ b/marathon_lb.py @@ -29,6 +29,7 @@ from common import * from config import * from lrucache import * +from utils import * import argparse import json @@ -41,7 +42,6 @@ import shlex import subprocess import sys -import socket import time import dateutil.parser import threading @@ -50,6 +50,7 @@ import hashlib logger = logging.getLogger('marathon_lb') +SERVICE_PORT_ASSIGNER = ServicePortAssigner() class MarathonBackend(object): @@ -262,22 +263,6 @@ def has_group(groups, app_groups): return False -ip_cache = dict() - - -def resolve_ip(host): - cached_ip = ip_cache.get(host, None) - if cached_ip: - return cached_ip - else: - try: - logger.debug("trying to resolve ip address for host %s", host) - ip = socket.gethostbyname(host) - ip_cache[host] = ip - return ip - except socket.gaierror: - return None - def config(apps, groups, bind_http_https, ssl_certs, templater): logger.info("generating config") @@ -946,55 +931,6 @@ def get_health_check(app, portIndex): healthCheckResultCache = LRUCache() -def is_ip_per_task(app): - """ - Return whether the application is using IP-per-task. - :param app: The application to check. - :return: True if using IP per task, False otherwise. - """ - return app.get('ipAddress') is not None - - -def get_task_ip_and_ports(app, task): - """ - Return the IP address and list of ports used to access a task. For a - task using IP-per-task, this is the IP address of the task, and the ports - exposed by the task services. Otherwise, this is the IP address of the - host and the ports exposed by the host. - :param app: The application owning the task. - :param task: The task. - :return: Tuple of (ip address, [ports]). Returns (None, None) if no IP - address could be resolved or found for the task. - """ - # If the app ipAddress field is present and not None then this app is using - # IP per task. The ipAddress may be an empty dictionary though, in which - # case there are no discovery ports. At the moment, Mesos only supports a - # single IP address, so just take the first IP in the list. - if is_ip_per_task(app): - logger.debug("Using IP per container") - task_ip_addresses = task.get('ipAddresses') - if not task_ip_addresses: - logger.warning("Task %s does not yet have an ip address allocated", - task['id']) - return None, None - task_ip = task_ip_addresses[0]['ipAddress'] - - discovery = app['ipAddress'].get('discovery', {}) - task_ports = [int(port['number']) - for port in discovery.get('ports', [])] - else: - logger.debug("Using host port mapping") - task_ports = task.get('ports', []) - task_ip = resolve_ip(task['host']) - if not task_ip: - logger.warning("Could not resolve ip for host %s, ignoring", - task['host']) - return None, None - - logger.debug("Returning: %r, %r", task_ip, task_ports) - return task_ip, task_ports - - def get_apps(marathon): apps = marathon.list() logger.debug("got apps %s", [app["id"] for app in apps]) @@ -1084,6 +1020,13 @@ def get_apps(marathon): processed_apps.extend(deployment_groups.values()) + # Reset the service port assigner. This forces the port assigner to + # re-assign ports for IP-per-task applications. The upshot is that + # the service port for a particular app may change dynamically, but + # the service port will be deterministic and identical across all + # instances of the marathon-lb. + SERVICE_PORT_ASSIGNER.reset() + for app in processed_apps: appId = app['id'] if appId[1:] == os.environ.get("FRAMEWORK_NAME"): @@ -1096,9 +1039,8 @@ def get_apps(marathon): marathon_app.app['labels']['HAPROXY_GROUP'].split(',') marathon_apps.append(marathon_app) - service_ports = app['ports'] - for i in range(len(service_ports)): - servicePort = service_ports[i] + service_ports = SERVICE_PORT_ASSIGNER.get_service_ports(app) + for i, servicePort in enumerate(service_ports): service = MarathonService( appId, servicePort, get_health_check(app, i)) @@ -1305,6 +1247,14 @@ def get_arg_parser(): parser.add_argument("--dry", "-d", help="Only print configuration to console", action="store_true") + parser.add_argument("--min-serv-port-ip-per-task", + help="Minimum port number to use when auto-assigning " + "service ports for IP-per-task applications", + type=int, default=10050) + parser.add_argument("--max-serv-port-ip-per-task", + help="Maximum port number to use when auto-assigning " + "service ports for IP-per-task applications", + type=int, default=10100) parser = set_logging_args(parser) parser = set_marathon_auth_args(parser) return parser @@ -1397,10 +1347,21 @@ def process_sse_events(marathon, config_file, groups, if args.sse and args.listening: arg_parser.error( 'cannot use --listening and --sse at the same time') + if bool(args.min_serv_port_ip_per_task) != \ + bool(args.max_serv_port_ip_per_task): + arg_parser.error( + 'either specify both --min-serv-port-ip-per-task ' + 'and --max-serv-port-ip-per-task or neither (set both to zero ' + 'to disable auto assignment)') if len(args.group) == 0: arg_parser.error('argument --group is required: please' + 'specify at least one group name') + # Configure the service port assigner if min/max ports have been specified. + if args.min_serv_port_ip_per_task and args.max_serv_port_ip_per_task: + SERVICE_PORT_ASSIGNER.set_ports(int(args.min_serv_port_ip_per_task), + int(args.max_serv_port_ip_per_task)) + # Set request retries s = requests.Session() a = requests.adapters.HTTPAdapter(max_retries=3) diff --git a/tests/test_marathon_lb.py b/tests/test_marathon_lb.py index 051af943..1b2b9620 100644 --- a/tests/test_marathon_lb.py +++ b/tests/test_marathon_lb.py @@ -1,6 +1,5 @@ import unittest import json -from mock import patch import marathon_lb @@ -1684,66 +1683,3 @@ def test_config_simple_app_sticky(self): server agent1_1_1_1_1_1024 1.1.1.1:1024 check cookie d6ad48c81f ''' self.assertMultiLineEqual(config, expected) - - def test_get_task_ip_and_ports_ip_per_task(self): - app = { - "ipAddress": { - "discovery": { - "ports": [{"number": 123}, {"number": 234}] - } - }, - } - task = { - "id": "testtaskid", - "ipAddresses": [{"ipAddress": "1.2.3.4"}] - } - - result = marathon_lb.get_task_ip_and_ports(app, task) - expected = ("1.2.3.4", [123, 234]) - - self.assertEquals(result, expected) - - def test_get_task_ip_and_ports_ip_per_task_no_ip(self): - app = { - "ipAddress": { - "discovery": { - "ports": [{"number": 123}, {"number": 234}] - } - }, - } - task = { - "id": "testtaskid" - } - - result = marathon_lb.get_task_ip_and_ports(app, task) - expected = (None, None) - - self.assertEquals(result, expected) - - def test_get_task_ip_and_ports_port_map(self): - app = {} - task = { - "id": "testtaskid", - "ports": [234, 345, 567], - "host": "agent1" - } - - with patch("marathon_lb.resolve_ip", return_value="1.2.3.4"): - result = marathon_lb.get_task_ip_and_ports(app, task) - expected = ("1.2.3.4", [234, 345, 567]) - - self.assertEquals(result, expected) - - def test_get_task_ip_and_ports_port_map_no_ip(self): - app = {} - task = { - "id": "testtaskid", - "ports": [234, 345, 567], - "host": "agent1" - } - - with patch("marathon_lb.resolve_ip", return_value=None): - result = marathon_lb.get_task_ip_and_ports(app, task) - expected = (None, None) - - self.assertEquals(result, expected) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..69513b14 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,189 @@ +import unittest +import utils +from mock import Mock, patch +from utils import ServicePortAssigner + + +class TestUtils(unittest.TestCase): + + def test_get_task_ip_and_ports_ip_per_task(self): + app = { + "ipAddress": { + "discovery": { + "ports": [{"number": 123}, {"number": 234}] + } + }, + } + task = { + "id": "testtaskid", + "ipAddresses": [{"ipAddress": "1.2.3.4"}] + } + + result = utils.get_task_ip_and_ports(app, task) + expected = ("1.2.3.4", [123, 234]) + + self.assertEquals(result, expected) + + def test_get_task_ip_and_ports_ip_per_task_no_ip(self): + app = { + "ipAddress": { + "discovery": { + "ports": [{"number": 123}, {"number": 234}] + } + }, + } + task = { + "id": "testtaskid" + } + + result = utils.get_task_ip_and_ports(app, task) + expected = (None, None) + + self.assertEquals(result, expected) + + def test_get_task_ip_and_ports_port_map(self): + app = {} + task = { + "id": "testtaskid", + "ports": [234, 345, 567], + "host": "agent1" + } + + with patch("utils.resolve_ip", return_value="1.2.3.4"): + result = utils.get_task_ip_and_ports(app, task) + expected = ("1.2.3.4", [234, 345, 567]) + + self.assertEquals(result, expected) + + def test_get_task_ip_and_ports_port_map_no_ip(self): + app = {} + task = { + "id": "testtaskid", + "ports": [234, 345, 567], + "host": "agent1" + } + + with patch("utils.resolve_ip", return_value=None): + result = utils.get_task_ip_and_ports(app, task) + expected = (None, None) + + self.assertEquals(result, expected) + + +class TestServicePortAssigner(unittest.TestCase): + + def setUp(self): + self.assigner = ServicePortAssigner() + self.assigner.set_ports(10000, 10020) + + def test_not_ip_per_task(self): + """ + Test a non-IP-per-task app returns the service ports defined in the + app data. + """ + app = _get_app(ip_per_task=False, inc_service_ports=True) + self.assertEquals(self.assigner.get_service_ports(app), + [100, 101, 102]) + + def test_ip_per_task_with_ports(self): + """ + Test an IP-per-task app returns the service ports defined in the + app data. + """ + app = _get_app(ip_per_task=True, inc_service_ports=True) + self.assertEquals(self.assigner.get_service_ports(app), + [100, 101, 102]) + + def test_ip_per_task_no_clash(self): + """ + Check that the same ports are assigned are assigned for task-per-IP + apps and are based on the number of host ports but not the actual + ports themselves. + """ + # When assigning a single port for apps with index 1 and 2 there are + # no clashes. + app1 = _get_app(idx=1, num_ports=1, num_tasks=1) + app2 = _get_app(idx=2, num_ports=1, num_tasks=1) + + # Store the ports assigned for app1 and app2 + ports1 = self.assigner.get_service_ports(app1) + ports2 = self.assigner.get_service_ports(app2) + + # Check we get returned the same ports. + self.assertEquals(ports2, self.assigner.get_service_ports(app2)) + self.assertEquals(ports1, self.assigner.get_service_ports(app1)) + + # Now reset the assigner, and assign in a different order. Check the + # ports are still the same. + self.assigner.reset() + self.assertEquals(ports2, self.assigner.get_service_ports(app2)) + self.assertEquals(ports1, self.assigner.get_service_ports(app1)) + + def test_ip_per_task_clash(self): + """ + Check that the same ports will not be assigned if there are clashes + and we assign in a different order. + """ + # When assigning 5 ports for apps with index 1 and 3 there are + # clashes. + app1 = _get_app(idx=1, num_ports=5, num_tasks=1) + app2 = _get_app(idx=3, num_ports=5, num_tasks=1) + + # Store the ports assigned for app1 and app2 + ports1 = self.assigner.get_service_ports(app1) + ports2 = self.assigner.get_service_ports(app2) + + # Check we get returned the same ports. + self.assertEquals(ports2, self.assigner.get_service_ports(app2)) + self.assertEquals(ports1, self.assigner.get_service_ports(app1)) + + # Now reset the assigner, and assign in a different order. Check the + # ports are not the same. + self.assigner.reset() + self.assertNotEquals(ports2, self.assigner.get_service_ports(app2)) + self.assertNotEquals(ports1, self.assigner.get_service_ports(app1)) + + def test_ip_per_task_max_clash(self): + """ + Check that ports are assigned by linear scan when we max out the + clashes. + """ + app = _get_app(idx=1, num_ports=10, num_tasks=1) + + # Mock out the hashlib functions so that all hashes return 0. + sha1 = Mock() + sha1.hexdigest.return_value = "0" * 64 + with patch("hashlib.sha1", return_value=sha1): + ports = self.assigner.get_service_ports(app) + self.assertEquals(ports, list(range(10000, 10010))) + + +def _get_app(idx=1, num_ports=3, num_tasks=1, ip_per_task=True, + inc_service_ports=False): + app = { + "id": "app-%d" % idx + } + if inc_service_ports: + app["ports"] = list(range(100, 100 + num_ports)) + else: + app["ports"] = [] + + if ip_per_task: + app["ipAddress"] = { + "discovery": { + "ports": [ + {"number": port} for port in range(500, 500 + num_ports) + ] + } + } + + app["tasks"] = [_get_task(idx*10, num_ports) for idx in range(num_tasks)] + + return app + + +def _get_task(idx, num_ports): + return { + "id": "task-%d" % idx, + "ipAddresses": [{"ipAddress": "1.2.3.4"}] + } diff --git a/utils.py b/utils.py new file mode 100644 index 00000000..06c98da6 --- /dev/null +++ b/utils.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python3 + +import hashlib +import struct +import logging +import socket + +logger = logging.getLogger('utils') + +# The maximum number of clashes to allow when assigning a port. +MAX_CLASHES = 50 + + +class ServicePortAssigner(object): + """ + Helper class to assign service ports. + + Ordinarily Marathon should assign the service ports, but Marathon issue + https://github.com/mesosphere/marathon/issues/3636 means that service + ports are not returned for applications using IP-per-task. We work around + that here by assigning deterministic ports from a configurable range when + required. + + Note that auto-assigning ports is only useful when using vhost: the ports + that we assign here are not exposed to the client. + + The LB command line options --min-serv-port-ip-per-task and + --max-serv-port-ip-per-task specify the allowed range of ports to + auto-assign from. The range of ports used for auto-assignment should be + selected to ensure no clashes with the exposed LB ports and the + Marathon-assigned services ports. + + The service port assigner provides a mechanism to auto assign service ports + using the application name to generate service port (while preventing + clashes when the port is already claimed by another app). The assigner + provides a deterministic set of ports for a given ordered set of port + requests. + """ + def __init__(self): + self.min_port = None + self.max_port = None + self.max_ports = None + self.can_assign = False + self.next_port = None + self.ports_by_app = {} + + def _assign_new_service_port(self, app, task_port): + assert self.can_assign + + remaining_ports = self.max_ports - len(self.ports_by_app) + assert remaining_ports, "Service ports are exhausted" + + # We don't want to be searching forever, so limit the number of times + # we clash to the number of remaining ports. + ports = self.ports_by_app.values() + port = None + for i in range(MAX_CLASHES): + hash_str = "%s-%s-%s" % (app['id'], task_port, i) + hash_val = hashlib.sha1(hash_str.encode("utf-8")).hexdigest() + hash_int = int(hash_val[:8], 16) + trial_port = self.min_port + (hash_int % self.max_ports) + if trial_port not in ports: + port = trial_port + break + if port is None: + for port in range(self.min_port, self.max_port + 1): + if port not in ports: + break + + # We must have assigned a unique port by now since we know there were + # some available. + assert port and port not in ports + + logger.debug("Assigned new port: %d", port) + return port + + def _get_service_port(self, app, task_port): + key = (app['id'], task_port) + port = (self.ports_by_app.get(key) or + self._assign_new_service_port(app, task_port)) + self.ports_by_app[key] = port + return port + + def set_ports(self, min_port, max_port): + """ + Set the range of ports that we can use for auto-assignment of + service ports - just for IP-per-task apps. + :param min_port: The minimum port value + :param max_port: The maximum port value + """ + self.min_port = min_port + self.max_port = max_port + self.max_ports = max_port - min_port + 1 + self.can_assign = self.min_port and self.max_port + assert not self.ports_by_app + assert self.max_ports > 1 + + def reset(self): + """ + Reset the assigner so that ports are newly assigned. + :return: + """ + self.ports_by_app = {} + + def get_service_ports(self, app): + ports = app['ports'] + if not ports and is_ip_per_task(app) and self.can_assign: + logger.warning("Auto assigning service port for " + "IP-per-container task") + task = app['tasks'][0] + _, task_ports = get_task_ip_and_ports(app, task) + ports = [self._get_service_port(app, task_port) + for task_port in task_ports] + logger.debug("Service ports: %r", ports) + return ports + + +def resolve_ip(host): + cached_ip = ip_cache.get(host, None) + if cached_ip: + return cached_ip + else: + try: + logger.debug("trying to resolve ip address for host %s", host) + ip = socket.gethostbyname(host) + ip_cache[host] = ip + return ip + except socket.gaierror: + return None +ip_cache = dict() + + +def is_ip_per_task(app): + """ + Return whether the application is using IP-per-task. + :param app: The application to check. + :return: True if using IP per task, False otherwise. + """ + return app.get('ipAddress') is not None + + +def get_task_ip_and_ports(app, task): + """ + Return the IP address and list of ports used to access a task. For a + task using IP-per-task, this is the IP address of the task, and the ports + exposed by the task services. Otherwise, this is the IP address of the + host and the ports exposed by the host. + :param app: The application owning the task. + :param task: The task. + :return: Tuple of (ip address, [ports]). Returns (None, None) if no IP + address could be resolved or found for the task. + """ + # If the app ipAddress field is present and not None then this app is using + # IP per task. The ipAddress may be an empty dictionary though, in which + # case there are no discovery ports. At the moment, Mesos only supports a + # single IP address, so just take the first IP in the list. + if is_ip_per_task(app): + logger.debug("Using IP per container") + task_ip_addresses = task.get('ipAddresses') + if not task_ip_addresses: + logger.warning("Task %s does not yet have an ip address allocated", + task['id']) + return None, None + task_ip = task_ip_addresses[0]['ipAddress'] + + discovery = app['ipAddress'].get('discovery', {}) + task_ports = [int(port['number']) + for port in discovery.get('ports', [])] + else: + logger.debug("Using host port mapping") + task_ports = task.get('ports', []) + task_ip = resolve_ip(task['host']) + if not task_ip: + logger.warning("Could not resolve ip for host %s, ignoring", + task['host']) + return None, None + + logger.debug("Returning: %r, %r", task_ip, task_ports) + return task_ip, task_ports