Skip to content
This repository has been archived by the owner on Oct 29, 2024. It is now read-only.

Add a client InfluxDBClusterClient to handle a cluster of InfluxDB servers #148

Merged
merged 5 commits into from
Apr 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ Here's a basic example (for more see the examples directory)::

>>> print("Result: {0}".format(result))

If you want to connect to a cluster, you could initialize a ``InfluxDBClusterClient``::

$ python

>>> from influxdb import InfluxDBClusterClient

>>> cc = InfluxDBClusterClient(hosts = [('192.168.0.1', 8086),
('192.168.0.2', 8086),
('192.168.0.3', 8086)],
username='root',
password='root',
database='example')

``InfluxDBClusterClient`` has the same methods as ``InfluxDBClient``, it basically is a proxy to multiple InfluxDBClients.

Testing
=======
Expand Down
2 changes: 2 additions & 0 deletions influxdb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-
from .client import InfluxDBClient
from .client import InfluxDBClusterClient
from .dataframe_client import DataFrameClient
from .helper import SeriesHelper


__all__ = [
'InfluxDBClient',
'InfluxDBClusterClient',
'DataFrameClient',
'SeriesHelper',
]
Expand Down
121 changes: 120 additions & 1 deletion influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
Python client for InfluxDB
"""
from collections import OrderedDict
from functools import wraps
import json
import socket
import random
import requests
import requests.exceptions
from sys import version_info
Expand All @@ -16,7 +18,7 @@
except NameError:
xrange = range

if version_info.major == 3:
if version_info[0] == 3:
from urllib.parse import urlparse
else:
from urlparse import urlparse
Expand All @@ -33,6 +35,12 @@ def __init__(self, content, code):
self.code = code


class InfluxDBServerError(Exception):
"""Raised when server error occurs"""
def __init__(self, content):
super(InfluxDBServerError, self).__init__(content)


class InfluxDBClient(object):

"""
Expand Down Expand Up @@ -451,3 +459,114 @@ def send_packet(self, packet):
data = json.dumps(packet)
byte = data.encode('utf-8')
self.udp_socket.sendto(byte, (self._host, self.udp_port))


class InfluxDBClusterClient(object):
"""
The ``InfluxDBClusterClient`` is the client for connecting to a cluster of
InfluxDB Servers. It basically is a proxy to multiple ``InfluxDBClient``s.

:param hosts: A list of hosts, where a host should be in format
(address, port)
e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]
:param shuffle: If true, queries will hit servers evenly(randomly)
:param client_base_class: In order to support different clients,
default to InfluxDBClient
"""

def __init__(self,
hosts=[('localhost', 8086)],
username='root',
password='root',
database=None,
ssl=False,
verify_ssl=False,
timeout=None,
use_udp=False,
udp_port=4444,
shuffle=True,
client_base_class=InfluxDBClient,
):
self.clients = []
self.bad_clients = [] # Corresponding server has failures in history
self.shuffle = shuffle
for h in hosts:
self.clients.append(client_base_class(host=h[0], port=h[1],
username=username,
password=password,
database=database,
ssl=ssl,
verify_ssl=verify_ssl,
timeout=timeout,
use_udp=use_udp,
udp_port=udp_port))
for method in dir(client_base_class):
if method.startswith('_'):
continue
orig_func = getattr(client_base_class, method)
if not callable(orig_func):
continue
setattr(self, method, self._make_func(orig_func))

@staticmethod
def from_DSN(dsn, client_base_class=InfluxDBClient,
shuffle=True, **kwargs):
"""
Same as InfluxDBClient.from_DSN, and supports multiple servers.

Example DSN:
influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db_name
udp+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db_name
https+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db_name

:param shuffle: If true, queries will hit servers evenly(randomly)
:param client_base_class: In order to support different clients,
default to InfluxDBClient
"""
dsn = dsn.lower()
conn_params = urlparse(dsn)
netlocs = conn_params.netloc.split(',')
cluster_client = InfluxDBClusterClient(
hosts=[],
client_base_class=client_base_class,
shuffle=shuffle,
**kwargs)
for netloc in netlocs:
single_dsn = '%(scheme)s://%(netloc)s%(path)s' % (
{'scheme': conn_params.scheme,
'netloc': netloc,
'path': conn_params.path}
)
cluster_client.clients.append(client_base_class.from_DSN(
single_dsn,
**kwargs))
return cluster_client

def _make_func(self, orig_func):

@wraps(orig_func)
def func(*args, **kwargs):
if self.shuffle:
random.shuffle(self.clients)
clients = self.clients + self.bad_clients
for c in clients:
bad_client = False
try:
return orig_func(c, *args, **kwargs)
except InfluxDBClientError as e:
# Errors caused by user's requests, re-raise
raise e
except Exception as e:
# Errors that might caused by server failure, try another
bad_client = True
if c in self.clients:
self.clients.remove(c)
self.bad_clients.append(c)
finally:
if not bad_client and c in self.bad_clients:
self.bad_clients.remove(c)
self.clients.append(c)

raise InfluxDBServerError("InfluxDB: no viable server!")

return func
Loading