Skip to content
This repository has been archived by the owner on Oct 29, 2024. It is now read-only.

Commit

Permalink
Add client InfluxDBClusterClient to handle a cluster of InfluxDB se…
Browse files Browse the repository at this point in the history
…rvers

Signed-off-by: Can ZHANG <[email protected]>
  • Loading branch information
cannium committed Apr 10, 2015
1 parent 5ea7282 commit e44f74e
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
2 changes: 2 additions & 0 deletions influxdb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# -*- coding: utf-8 -*-
from .client import InfluxDBClient
from .client import InfluxDBClusterClient
from .dataframe_client import DataFrameClient
from .helper import SeriesHelper


__all__ = [
'InfluxDBClient',
'InfluxDBClusterClient',
'DataFrameClient',
'SeriesHelper',
]
Expand Down
88 changes: 88 additions & 0 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections import OrderedDict
import json
import socket
import random
import requests
import requests.exceptions
from sys import version_info
Expand Down Expand Up @@ -33,6 +34,12 @@ def __init__(self, content, code):
self.code = code


class InfluxDBServerError(Exception):
"""Raised when server error occurs"""
def __init__(self, content):
super(InfluxDBServerError, self).__init__(content)


class InfluxDBClient(object):

"""
Expand Down Expand Up @@ -451,3 +458,84 @@ def send_packet(self, packet):
data = json.dumps(packet)
byte = data.encode('utf-8')
self.udp_socket.sendto(byte, (self._host, self.udp_port))


class InfluxDBClusterClient(object):
"""
The ``InfluxDBClusterClient`` is the client for connecting to a cluster of
InfluxDB Servers. It basically is a proxy to multiple ``InfluxDBClient``s.
:param hosts: A list of hosts, where a host should be in format
(address, port)
e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]
"""

def __init__(self,
hosts=[('localhost', 8086)],
username='root',
password='root',
database=None,
ssl=False,
verify_ssl=False,
timeout=None,
use_udp=False,
udp_port=4444,
shuffle=True,
client_base_class=InfluxDBClient,
):
self.clients = []
self.bad_clients = [] # Corresponding server has failures in history
self.shuffle = shuffle # if true, queries will hit servers evenly
self.client_base_class = client_base_class # For simpler test code
for h in hosts:
self.clients.append(client_base_class(host=h[0], port=h[1],
username=username,
password=password,
database=database,
ssl=ssl,
verify_ssl=verify_ssl,
timeout=timeout,
use_udp=use_udp,
udp_port=udp_port))
for method in dir(client_base_class):
if method.startswith('_'):
continue
if not callable(getattr(client_base_class, method)):
continue
setattr(self, method, self._make_func(method))

def _make_func(self, func_name):
orig_func = getattr(self.client_base_class, func_name)

def func(*args, **kwargs):
if self.shuffle:
random.shuffle(self.clients)
clients = self.clients + self.bad_clients
for c in clients:
bad_client = False
try:
return orig_func(c, *args, **kwargs)
except InfluxDBClientError as e:
# Errors caused by user's requests, re-raise
raise e
except Exception as e:
# Errors that might caused by server failure, try another
bad_client = True
finally:
if bad_client:
if c not in self.bad_clients:
self.bad_clients.append(c)
for idx, val in enumerate(self.clients):
if val == c:
del self.clients[idx]
break
else:
if c not in self.clients:
self.clients.append(c)
for idx, val in enumerate(self.bad_clients):
if val == c:
del self.bad_clients[idx]
break
raise InfluxDBServerError("InfluxDB: no viable server!")

return func

0 comments on commit e44f74e

Please sign in to comment.