Skip to content

Commit

Permalink
Merge pull request #852 from projectcalico/smc-concurrent-resync
Browse files Browse the repository at this point in the history
[Work in progress] Implement concurrent resync
  • Loading branch information
Peter White committed Nov 17, 2015
2 parents 4d39e23 + 9c4cd35 commit 573af2f
Show file tree
Hide file tree
Showing 48 changed files with 4,678 additions and 1,139 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
[run]
include =
calico/etcddriver/*
calico/felix/*
calico/openstack/*
calico/*.py
omit =
calico/test/*
calico/felix/test/*
calico/openstack/test/*
calico/etcddriver/test/*
branch = True
concurrency = eventlet
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

- Felix now parses the etcd snapshot in parallel with the event stream;
this dramatically increases scale when under load.

## 1.2.0

- Add liveness reporting to Felix. Felix now reports its liveness into
Expand Down
62 changes: 25 additions & 37 deletions calico/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,24 @@
Calico common utilities.
"""
import errno
import gevent
import gevent.local
import itertools
import logging
import logging.handlers
import netaddr
import netaddr.core
import os
import re
import sys
from types import StringTypes

import netaddr
import netaddr.core
from netaddr.strategy import eui48

_log = logging.getLogger(__name__)

AGENT_TYPE_CALICO = 'Calico agent'
FORMAT_STRING = '%(asctime)s [%(levelname)s][%(process)s/%(tid)d] %(name)s %(lineno)d: %(message)s'

FORMAT_STRING = '%(asctime)s [%(levelname)s][%(process)s/%(thread)d] %(name)s %(lineno)d: %(message)s'
# Used "tid", which we swap for the greenlet ID, instead of "thread"
FORMAT_STRING_GEVENT = '%(asctime)s [%(levelname)s][%(process)s/%(tid)d] %(name)s %(lineno)d: %(message)s'

# This format string deliberately uses two different styles of format
# specifier. The %()s form is used by the logging module: the {} form is used
Expand Down Expand Up @@ -89,24 +90,6 @@
VALID_IPAM_POOL_ID_RE = re.compile(r'^[0-9\.:a-fA-F\-]{1,43}$')
EXPECTED_IPAM_POOL_KEYS = set(["cidr", "masquerade"])

tid_storage = gevent.local.local()
tid_counter = itertools.count()
# Ought to do itertools.count(start=1), but python 2.6 does not support it.
tid_counter.next()

def greenlet_id():
"""
Returns an integer greenlet ID.
itertools.count() is atomic, if the internet is correct.
http://stackoverflow.com/questions/23547604/python-counter-atomic-increment
"""
try:
tid = tid_storage.tid
except:
tid = tid_counter.next()
tid_storage.tid = tid
return tid


def validate_port(port):
"""
Expand Down Expand Up @@ -178,18 +161,13 @@ def mkdir_p(path):
except TypeError:
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else: raise

class GreenletFilter(logging.Filter):
def filter(self, record):
record.tid = greenlet_id()
return True


def default_logging():
def default_logging(gevent_in_use=True, syslog_executable_name=None):
"""
Sets up the Calico default logging, with default severities.
Expand All @@ -210,7 +188,7 @@ def default_logging():
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)

executable_name = os.path.basename(sys.argv[0])
executable_name = syslog_executable_name or os.path.basename(sys.argv[0])
syslog_format = SYSLOG_FORMAT_STRING.format(excname=executable_name)
syslog_formatter = logging.Formatter(syslog_format)
if os.path.exists("/dev/log"):
Expand All @@ -223,18 +201,22 @@ def default_logging():

root_logger.addHandler(syslog_handler)

file_formatter = logging.Formatter(FORMAT_STRING)
format_string = FORMAT_STRING_GEVENT if gevent_in_use else FORMAT_STRING
file_formatter = logging.Formatter(format_string)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setLevel(logging.ERROR)
stream_handler.setFormatter(file_formatter)
stream_handler.addFilter(GreenletFilter())
if gevent_in_use:
from geventutils import GreenletFilter
stream_handler.addFilter(GreenletFilter())
root_logger.addHandler(stream_handler)


def complete_logging(logfile=None,
file_level=logging.DEBUG,
syslog_level=logging.ERROR,
stream_level=logging.ERROR):
stream_level=logging.ERROR,
gevent_in_use=True):
"""
Updates the logging configuration based on learned configuration.
Expand Down Expand Up @@ -279,9 +261,13 @@ def complete_logging(logfile=None,
if logfile and file_level is not None:
if not file_handler:
mkdir_p(os.path.dirname(logfile))
formatter = logging.Formatter(FORMAT_STRING)
format_string = (FORMAT_STRING_GEVENT if gevent_in_use
else FORMAT_STRING)
formatter = logging.Formatter(format_string)
file_handler = logging.handlers.WatchedFileHandler(logfile)
file_handler.addFilter(GreenletFilter())
if gevent_in_use:
from geventutils import GreenletFilter
file_handler.addFilter(GreenletFilter())
file_handler.setLevel(file_level)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
Expand Down Expand Up @@ -402,6 +388,7 @@ def validate_endpoint(config, combined_id, endpoint):
if issues:
raise ValidationFailed(" ".join(issues))


def validate_rules(profile_id, rules):
"""
Ensures that the supplied rules are valid. Once this routine has returned
Expand Down Expand Up @@ -588,6 +575,7 @@ def validate_tags(profile_id, tags):
if issues:
raise ValidationFailed(" ".join(issues))


def validate_ipam_pool(pool_id, pool, ip_version):
"""
Validates and canonicalises an IPAM pool dict. Removes any fields that
Expand Down
6 changes: 6 additions & 0 deletions calico/datamodel_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ def path_for_status(self):
def __str__(self):
return self.__class__.__name__ + ("<%s>" % self.endpoint)

def __repr__(self):
return self.__class__.__name__ + ("(%r,%r,%r,%r)" % (self.host,
self.orchestrator,
self.workload,
self.endpoint))

def __eq__(self, other):
if other is self:
return True
Expand Down
Empty file added calico/etcddriver/__init__.py
Empty file.
59 changes: 59 additions & 0 deletions calico/etcddriver/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-

# Copyright (c) 2014, 2015 Metaswitch Networks
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
calico.etcddriver.__main__
~~~~~~~~~~~~~~~~~~~~~~~~~~
Main entry point for the etcd driver, responsible for basic logging config
and starting our threads.
"""

import logging
import os
import socket
import sys

from calico.etcddriver import driver
from calico import common

_log = logging.getLogger(__name__)

last_ppid = os.getppid()
common.default_logging(gevent_in_use=False,
syslog_executable_name="calico-felix-etcd")

felix_sck = socket.socket(socket.AF_UNIX,
socket.SOCK_STREAM)
try:
felix_sck.connect(sys.argv[1])
except:
_log.exception("Failed to connect to Felix")
raise

etcd_driver = driver.EtcdDriver(felix_sck)
etcd_driver.start()

while not etcd_driver.join(timeout=1):
parent_pid = os.getppid()
# Defensive, just in case we don't get a socket error, check if the
# parent PID has changed, indicating that Felix has died.
if parent_pid == 1 or parent_pid != last_ppid:
_log.critical("Process adopted, assuming felix has died")
etcd_driver.stop()
break
_log.critical("Driver shutting down.")
Loading

0 comments on commit 573af2f

Please sign in to comment.