Skip to content
This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Commit

Permalink
Auto-assign service ports for IP-per-container tasks when Marathon do…
Browse files Browse the repository at this point in the history
…es not
  • Loading branch information
Rob Brockbank committed Apr 13, 2016
1 parent d53db7d commit 0e9c66f
Show file tree
Hide file tree
Showing 6 changed files with 418 additions and 134 deletions.
10 changes: 9 additions & 1 deletion Longhelp.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ usage: marathon_lb.py [-h] [--longhelp] [--marathon MARATHON [MARATHON ...]]
[--lru-cache-capacity LRU_CACHE_CAPACITY]
[--dont-bind-http-https] [--ssl-certs SSL_CERTS]
[--skip-validation] [--dry]
[--min-serv-port-ip-per-task MIN_SERV_PORT_IP_PER_TASK]
[--max-serv-port-ip-per-task MAX_SERV_PORT_IP_PER_TASK]
[--syslog-socket SYSLOG_SOCKET]
[--log-format LOG_FORMAT]
[--marathon-auth-credential-file MARATHON_AUTH_CREDENTIAL_FILE]
Expand Down Expand Up @@ -73,9 +75,15 @@ optional arguments:
/etc/ssl/mesosphere.com.pem)
--skip-validation Skip haproxy config file validation (default: False)
--dry, -d Only print configuration to console (default: False)
--min-serv-port-ip-per-task MIN_SERV_PORT_IP_PER_TASK
Minimum port number to use when auto-assigning service
ports for IP-per-task applications (default: 10050)
--max-serv-port-ip-per-task MAX_SERV_PORT_IP_PER_TASK
Maximum port number to use when auto-assigning service
ports for IP-per-task applications (default: 10100)
--syslog-socket SYSLOG_SOCKET
Socket to write syslog messages to. Use '/dev/null' to
disable logging to syslog (default: /var/run/syslog)
disable logging to syslog (default: /dev/log)
--log-format LOG_FORMAT
Set log message format (default: %(name)s:
%(message)s)
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ the [templates section](Longhelp.md#templates)
You can access the HAProxy statistics via `:9090/haproxy?stats`, and you can
retrieve the current HAProxy config from the `:9090/_haproxy_getconfig` endpoint.

Marathon LB supports load balancing for applications that use the Mesos IP-per-task
feature, whereby each task is assigned unique, accessible, IP addresses. For these
tasks services are directly accessible via the configured discovery ports and there
is no host port mapping. Note, that due to limitations with Marathon (see
[mesosphere/marathon#3636](https://github.com/mesosphere/marathon/issues/3636))
configured service ports are not exposed to Marathon LB for IP-per-task apps.
For these apps, where the service ports are missing from the Marathon app data,
Marathon LB will automatically assign port values from a configurable range. The range
is configured using the `--min-serv-port-ip-per-task` and `--max-serv-port-ip-per-task`
options.

## Deployment
The package is currently available [from the universe](https://github.com/mesosphere/universe).
To deploy marathon-lb on the public slaves in your DCOS cluster,
Expand Down
99 changes: 30 additions & 69 deletions marathon_lb.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from common import *
from config import *
from lrucache import *
from utils import *

import argparse
import json
Expand All @@ -41,7 +42,6 @@
import shlex
import subprocess
import sys
import socket
import time
import dateutil.parser
import threading
Expand All @@ -50,6 +50,7 @@
import hashlib

logger = logging.getLogger('marathon_lb')
SERVICE_PORT_ASSIGNER = ServicePortAssigner()


class MarathonBackend(object):
Expand Down Expand Up @@ -262,22 +263,6 @@ def has_group(groups, app_groups):

return False

ip_cache = dict()


def resolve_ip(host):
cached_ip = ip_cache.get(host, None)
if cached_ip:
return cached_ip
else:
try:
logger.debug("trying to resolve ip address for host %s", host)
ip = socket.gethostbyname(host)
ip_cache[host] = ip
return ip
except socket.gaierror:
return None


def config(apps, groups, bind_http_https, ssl_certs, templater):
logger.info("generating config")
Expand Down Expand Up @@ -946,55 +931,6 @@ def get_health_check(app, portIndex):
healthCheckResultCache = LRUCache()


def is_ip_per_task(app):
"""
Return whether the application is using IP-per-task.
:param app: The application to check.
:return: True if using IP per task, False otherwise.
"""
return app.get('ipAddress') is not None


def get_task_ip_and_ports(app, task):
"""
Return the IP address and list of ports used to access a task. For a
task using IP-per-task, this is the IP address of the task, and the ports
exposed by the task services. Otherwise, this is the IP address of the
host and the ports exposed by the host.
:param app: The application owning the task.
:param task: The task.
:return: Tuple of (ip address, [ports]). Returns (None, None) if no IP
address could be resolved or found for the task.
"""
# If the app ipAddress field is present and not None then this app is using
# IP per task. The ipAddress may be an empty dictionary though, in which
# case there are no discovery ports. At the moment, Mesos only supports a
# single IP address, so just take the first IP in the list.
if is_ip_per_task(app):
logger.debug("Using IP per container")
task_ip_addresses = task.get('ipAddresses')
if not task_ip_addresses:
logger.warning("Task %s does not yet have an ip address allocated",
task['id'])
return None, None
task_ip = task_ip_addresses[0]['ipAddress']

discovery = app['ipAddress'].get('discovery', {})
task_ports = [int(port['number'])
for port in discovery.get('ports', [])]
else:
logger.debug("Using host port mapping")
task_ports = task.get('ports', [])
task_ip = resolve_ip(task['host'])
if not task_ip:
logger.warning("Could not resolve ip for host %s, ignoring",
task['host'])
return None, None

logger.debug("Returning: %r, %r", task_ip, task_ports)
return task_ip, task_ports


def get_apps(marathon):
apps = marathon.list()
logger.debug("got apps %s", [app["id"] for app in apps])
Expand Down Expand Up @@ -1084,6 +1020,13 @@ def get_apps(marathon):

processed_apps.extend(deployment_groups.values())

# Reset the service port assigner. This forces the port assigner to
# re-assign ports for IP-per-task applications. The upshot is that
# the service port for a particular app may change dynamically, but
# the service port will be deterministic and identical across all
# instances of the marathon-lb.
SERVICE_PORT_ASSIGNER.reset()

for app in processed_apps:
appId = app['id']
if appId[1:] == os.environ.get("FRAMEWORK_NAME"):
Expand All @@ -1096,9 +1039,8 @@ def get_apps(marathon):
marathon_app.app['labels']['HAPROXY_GROUP'].split(',')
marathon_apps.append(marathon_app)

service_ports = app['ports']
for i in range(len(service_ports)):
servicePort = service_ports[i]
service_ports = SERVICE_PORT_ASSIGNER.get_service_ports(app)
for i, servicePort in enumerate(service_ports):
service = MarathonService(
appId, servicePort, get_health_check(app, i))

Expand Down Expand Up @@ -1305,6 +1247,14 @@ def get_arg_parser():
parser.add_argument("--dry", "-d",
help="Only print configuration to console",
action="store_true")
parser.add_argument("--min-serv-port-ip-per-task",
help="Minimum port number to use when auto-assigning "
"service ports for IP-per-task applications",
type=int, default=10050)
parser.add_argument("--max-serv-port-ip-per-task",
help="Maximum port number to use when auto-assigning "
"service ports for IP-per-task applications",
type=int, default=10100)
parser = set_logging_args(parser)
parser = set_marathon_auth_args(parser)
return parser
Expand Down Expand Up @@ -1397,10 +1347,21 @@ def process_sse_events(marathon, config_file, groups,
if args.sse and args.listening:
arg_parser.error(
'cannot use --listening and --sse at the same time')
if bool(args.min_serv_port_ip_per_task) != \
bool(args.max_serv_port_ip_per_task):
arg_parser.error(
'either specify both --min-serv-port-ip-per-task '
'and --max-serv-port-ip-per-task or neither (set both to zero '
'to disable auto assignment)')
if len(args.group) == 0:
arg_parser.error('argument --group is required: please' +
'specify at least one group name')

# Configure the service port assigner if min/max ports have been specified.
if args.min_serv_port_ip_per_task and args.max_serv_port_ip_per_task:
SERVICE_PORT_ASSIGNER.set_ports(int(args.min_serv_port_ip_per_task),
int(args.max_serv_port_ip_per_task))

# Set request retries
s = requests.Session()
a = requests.adapters.HTTPAdapter(max_retries=3)
Expand Down
64 changes: 0 additions & 64 deletions tests/test_marathon_lb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import unittest
import json
from mock import patch
import marathon_lb


Expand Down Expand Up @@ -1684,66 +1683,3 @@ def test_config_simple_app_sticky(self):
server agent1_1_1_1_1_1024 1.1.1.1:1024 check cookie d6ad48c81f
'''
self.assertMultiLineEqual(config, expected)

def test_get_task_ip_and_ports_ip_per_task(self):
app = {
"ipAddress": {
"discovery": {
"ports": [{"number": 123}, {"number": 234}]
}
},
}
task = {
"id": "testtaskid",
"ipAddresses": [{"ipAddress": "1.2.3.4"}]
}

result = marathon_lb.get_task_ip_and_ports(app, task)
expected = ("1.2.3.4", [123, 234])

self.assertEquals(result, expected)

def test_get_task_ip_and_ports_ip_per_task_no_ip(self):
app = {
"ipAddress": {
"discovery": {
"ports": [{"number": 123}, {"number": 234}]
}
},
}
task = {
"id": "testtaskid"
}

result = marathon_lb.get_task_ip_and_ports(app, task)
expected = (None, None)

self.assertEquals(result, expected)

def test_get_task_ip_and_ports_port_map(self):
app = {}
task = {
"id": "testtaskid",
"ports": [234, 345, 567],
"host": "agent1"
}

with patch("marathon_lb.resolve_ip", return_value="1.2.3.4"):
result = marathon_lb.get_task_ip_and_ports(app, task)
expected = ("1.2.3.4", [234, 345, 567])

self.assertEquals(result, expected)

def test_get_task_ip_and_ports_port_map_no_ip(self):
app = {}
task = {
"id": "testtaskid",
"ports": [234, 345, 567],
"host": "agent1"
}

with patch("marathon_lb.resolve_ip", return_value=None):
result = marathon_lb.get_task_ip_and_ports(app, task)
expected = (None, None)

self.assertEquals(result, expected)
Loading

0 comments on commit 0e9c66f

Please sign in to comment.