Skip to content
This repository has been archived by the owner on Oct 29, 2024. It is now read-only.

Add a client InfluxDBClusterClient to handle a cluster of InfluxDB servers #148

Merged
merged 5 commits into from
Apr 24, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ Here's a basic example (for more see the examples directory)::

>>> print("Result: {0}".format(result))

If you want to connect to a cluster, you could initialize a ``InfluxDBClusterClient``::

$ python

>>> from influxdb import InfluxDBClusterClient

>>> cc = InfluxDBClusterClient(hosts = [('192.168.0.1', 8086),
('192.168.0.2', 8086),
('192.168.0.3', 8086)],
username='root',
password='root',
database='example')

``InfluxDBClusterClient`` has the same methods as ``InfluxDBClient``, it basically is a proxy to multiple InfluxDBClients.

Testing
=======
Expand Down
2 changes: 2 additions & 0 deletions influxdb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-
from .client import InfluxDBClient
from .client import InfluxDBClusterClient
from .dataframe_client import DataFrameClient
from .helper import SeriesHelper


__all__ = [
'InfluxDBClient',
'InfluxDBClusterClient',
'DataFrameClient',
'SeriesHelper',
]
Expand Down
88 changes: 88 additions & 0 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections import OrderedDict
import json
import socket
import random
import requests
import requests.exceptions
from sys import version_info
Expand Down Expand Up @@ -33,6 +34,12 @@ def __init__(self, content, code):
self.code = code


class InfluxDBServerError(Exception):
"""Raised when server error occurs"""
def __init__(self, content):
super(InfluxDBServerError, self).__init__(content)


class InfluxDBClient(object):

"""
Expand Down Expand Up @@ -451,3 +458,84 @@ def send_packet(self, packet):
data = json.dumps(packet)
byte = data.encode('utf-8')
self.udp_socket.sendto(byte, (self._host, self.udp_port))


class InfluxDBClusterClient(object):
"""
The ``InfluxDBClusterClient`` is the client for connecting to a cluster of
InfluxDB Servers. It basically is a proxy to multiple ``InfluxDBClient``s.

:param hosts: A list of hosts, where a host should be in format
(address, port)
e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]
"""

def __init__(self,
hosts=[('localhost', 8086)],
username='root',
password='root',
database=None,
ssl=False,
verify_ssl=False,
timeout=None,
use_udp=False,
udp_port=4444,
shuffle=True,
client_base_class=InfluxDBClient,
):
self.clients = []
self.bad_clients = [] # Corresponding server has failures in history
self.shuffle = shuffle # if true, queries will hit servers evenly
self.client_base_class = client_base_class # For simpler test code
for h in hosts:
self.clients.append(client_base_class(host=h[0], port=h[1],
username=username,
password=password,
database=database,
ssl=ssl,
verify_ssl=verify_ssl,
timeout=timeout,
use_udp=use_udp,
udp_port=udp_port))
for method in dir(client_base_class):
if method.startswith('_'):
continue
if not callable(getattr(client_base_class, method)):
continue
setattr(self, method, self._make_func(method))

def _make_func(self, func_name):
orig_func = getattr(self.client_base_class, func_name)

def func(*args, **kwargs):
if self.shuffle:
random.shuffle(self.clients)
clients = self.clients + self.bad_clients
for c in clients:
bad_client = False
try:
return orig_func(c, *args, **kwargs)
except InfluxDBClientError as e:
# Errors caused by user's requests, re-raise
raise e
except Exception as e:
# Errors that might caused by server failure, try another
bad_client = True
finally:
if bad_client:
if c not in self.bad_clients:
self.bad_clients.append(c)
for idx, val in enumerate(self.clients):
if val == c:
del self.clients[idx]
break
else:
if c not in self.clients:
self.clients.append(c)
for idx, val in enumerate(self.bad_clients):
if val == c:
del self.bad_clients[idx]
break
raise InfluxDBServerError("InfluxDB: no viable server!")

return func
97 changes: 96 additions & 1 deletion tests/influxdb/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import warnings
import mock

from influxdb import InfluxDBClient
from influxdb import InfluxDBClient, InfluxDBClusterClient
from influxdb.client import InfluxDBServerError


def _build_response_object(status_code=200, content=""):
Expand Down Expand Up @@ -534,3 +535,97 @@ def connection_error(self, *args, **kwargs):

with self.assertRaises(requests.exceptions.ConnectionError):
cli.write_points(self.dummy_points)


class FakeClient(InfluxDBClient):
fail = False

def query(self,
query,
params={},
expected_response_code=200,
database=None):
if query == 'Fail':
raise Exception("Fail")

if self.fail:
raise Exception("Fail")
else:
return "Success"


class TestInfluxDBClusterClient(unittest.TestCase):

def setUp(self):
# By default, raise exceptions on warnings
warnings.simplefilter('error', FutureWarning)

self.hosts = [('host1', 8086), ('host2', 8086), ('host3', 8086)]

def test_init(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
username='username',
password='password',
database='database',
shuffle=False,
client_base_class=FakeClient)
assert len(cluster.clients) == 3
assert len(cluster.bad_clients) == 0
for idx, client in enumerate(cluster.clients):
assert client._host == self.hosts[idx][0]
assert client._port == self.hosts[idx][1]

def test_one_server_fails(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=False,
client_base_class=FakeClient)
cluster.clients[0].fail = True
assert cluster.query('') == 'Success'
assert len(cluster.clients) == 2
assert len(cluster.bad_clients) == 1

def test_two_servers_fail(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=False,
client_base_class=FakeClient)
cluster.clients[0].fail = True
cluster.clients[1].fail = True
assert cluster.query('') == 'Success'
assert len(cluster.clients) == 1
assert len(cluster.bad_clients) == 2

def test_all_fail(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
client_base_class=FakeClient)
try:
cluster.query('Fail')
except InfluxDBServerError:
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could/should also "assert the whole try/except block" with :

with self.assertRaises(InfluxDBServerError) as ctx:
    cluster.query('Fail')

?
EDIT: the as ctx isn't required/necessary as you don't assert anything specifically on the exception instance itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

assert len(cluster.clients) == 0
assert len(cluster.bad_clients) == 3

def test_all_good(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
client_base_class=FakeClient)
assert cluster.query('') == 'Success'
assert len(cluster.clients) == 3
assert len(cluster.bad_clients) == 0

def test_recovery(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
client_base_class=FakeClient)
try:
cluster.query('Fail')
except InfluxDBServerError:
pass
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here as well

assert cluster.query('') == 'Success'
assert len(cluster.clients) == 1
assert len(cluster.bad_clients) == 2