Skip to content

Commit

Permalink
feat: add write batch points
Browse files Browse the repository at this point in the history
Signed-off-by: yehao <[email protected]>
  • Loading branch information
goyjy committed Apr 22, 2024
1 parent adf2477 commit 36f94b1
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 5 deletions.
22 changes: 17 additions & 5 deletions opengemini_client/client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def request(self, method, server_url, url_path, headers=None, body=None, params=
prepared = req.prepare()
resp = self.session.send(prepared)
if not 200 <= resp.status_code < 300:
raise HTTPError(f"error resp, code: {resp.status_code}, resp: {resp.text}")
raise HTTPError(f"request error resp, code: {resp.status_code}, body: {resp.text}")
return resp

def exec_http_request_by_index(self, idx, method, url_path, headers=None, body=None) -> requests.Response:
Expand All @@ -137,7 +137,7 @@ def exec_http_request_by_index(self, idx, method, url_path, headers=None, body=N
def ping(self, idx: int):
resp = self.exec_http_request_by_index(idx, 'GET', UrlConst.PING)
if resp.status_code != HTTPStatus.NO_CONTENT:
raise HTTPError(f"ping failed code: {resp.status_code}, body: {resp.text}")
raise HTTPError(f"ping error resp, code: {resp.status_code}, body: {resp.text}")

def query(self, query: Query) -> QueryResult:
server_url = self.get_server_url()
Expand All @@ -146,7 +146,7 @@ def query(self, query: Query) -> QueryResult:
resp = self.request(method='GET', server_url=server_url, url_path=UrlConst.QUERY, params=params)
if resp.status_code == HTTPStatus.OK:
return resolve_query_body(resp)
raise HTTPError(f"Query code: {resp.status_code}, body: {resp.text}")
raise HTTPError(f"query error resp, code: {resp.status_code}, body: {resp.text}")

def _query_post(self, query: Query) -> QueryResult:
server_url = self.get_server_url()
Expand All @@ -155,10 +155,22 @@ def _query_post(self, query: Query) -> QueryResult:
resp = self.request(method='POST', server_url=server_url, url_path=UrlConst.QUERY, params=params)
if resp.status_code == HTTPStatus.OK:
return resolve_query_body(resp)
raise HTTPError(f"Query code: {resp.status_code}, body: {resp.text}")
raise HTTPError(f"query_post error resp, code: {resp.status_code}, body: {resp.text}")

def write_batch_points(self, database: str, batch_points: BatchPoints):
return
server_url = self.get_server_url()
params = {'db': database}
with io.StringIO() as writer:
for point in batch_points.points:
if point is None:
continue
writer.write(point.to_string())
writer.write('\n')
body = writer.getvalue().encode()
resp = self.request(method="POST", server_url=server_url, url_path=UrlConst.WRITE, params=params, body=body)
if resp.status_code == HTTPStatus.NO_CONTENT:
return
raise HTTPError(f"write_batch_points error resp, code: {resp.status_code}, body: {resp.text}")

def create_database(self, database: str, rp: RpConfig = None):
if not database:
Expand Down
100 changes: 100 additions & 0 deletions opengemini_client/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import ssl
from dataclasses import field, dataclass
from datetime import datetime, timedelta
Expand Down Expand Up @@ -62,9 +63,46 @@ class RpConfig:
index_duration: str


class Precision(Enum):
PrecisionNanoSecond = 0
PrecisionMicrosecond = 1
PrecisionMillisecond = 2
PrecisionSecond = 3
PrecisionMinute = 4
PrecisionHour = 5


def round_datetime(dt: datetime, round_to: timedelta):
if round_to.seconds == 0:
microseconds = int(dt.timestamp() * 1000 * 1000)
rounding = round(microseconds / round_to.microseconds) * round_to.microseconds
rd = rounding * 1000
elif round_to.seconds == 1:
rd = round(dt.timestamp()) * 1000 * 1000 * 1000
else:
seconds = (dt - dt.min).seconds
rounding = round(seconds / round_to.seconds) * round_to.seconds
dt = datetime(dt.year, dt.month, dt.day) + timedelta(seconds=rounding)
rd = int(dt.timestamp()) * 1000 * 1000 * 1000
return rd


def chars_to_escape(writer: io.StringIO, s: str, escape_str: str):
for i, c in enumerate(s):
need_escape = c in escape_str
need_check_next_char = c == '\\' and i < len(s) - 1
if not need_escape and need_check_next_char:
next_char = s[i + 1]
need_escape = next_char == '\\' or next_char in escape_str
if need_escape:
writer.write('\\')
writer.write(s[i])


@dataclass
class Point:
measurement: str
precision: Precision
fields: Dict[str, Union[str, int, float, bool]]
tags: Dict[str, str] = field(default_factory=dict)
timestamp: Optional[datetime] = None
Expand All @@ -81,6 +119,68 @@ def set_time(self, time: datetime):
def set_measurement(self, name: str):
self.measurement = name

def to_string(self) -> str:
if len(self.measurement) == 0 or len(self.fields) == 0:
return ""
with io.StringIO() as writer:
self.write_measurement(writer)
self.write_tags(writer)
self.write_fields(writer)
self.write_timestamp(writer)
res = writer.getvalue()
return res

def write_measurement(self, writer: io.StringIO):
chars_to_escape(writer, self.measurement, ', ')

def write_tags(self, writer: io.StringIO):
if self.tags is None:
return
for k, v in self.tags.items():
writer.write(',')
chars_to_escape(writer, k, ', =')
writer.write('=')
chars_to_escape(writer, v, ', =')

def write_fields(self, writer: io.StringIO):
sep = ' '
for k, v in self.fields.items():
writer.write(sep)
sep = ','
chars_to_escape(writer, k, ', =')
writer.write('=')
if isinstance(v, int):
writer.write(f"{v}i")
elif isinstance(v, str):
writer.write('"')
chars_to_escape(writer, v, '"')
writer.write('"')
elif isinstance(v, float):
writer.write(f"{v}")
elif isinstance(v, bool):
if v:
writer.write('T')
else:
writer.write('F')

def write_timestamp(self, writer: io.StringIO):
if self.timestamp is None:
return
writer.write(' ')
if self.precision == Precision.PrecisionMicrosecond:
ts_str = str(round_datetime(self.timestamp, timedelta(microseconds=1)))
elif self.precision == Precision.PrecisionMillisecond:
ts_str = str(round_datetime(self.timestamp, timedelta(milliseconds=1)))
elif self.precision == Precision.PrecisionSecond:
ts_str = str(round_datetime(self.timestamp, timedelta(seconds=1)))
elif self.precision == Precision.PrecisionMinute:
ts_str = str(round_datetime(self.timestamp, timedelta(minutes=1)))
elif self.precision == Precision.PrecisionHour:
ts_str = str(round_datetime(self.timestamp, timedelta(hours=1)))
else:
ts_str = str(self.timestamp.timestamp() * 1000 * 1000 * 1000)
writer.write(ts_str)


@dataclass
class BatchPoints:
Expand Down
33 changes: 33 additions & 0 deletions opengemini_client/write_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import time
import unittest
from datetime import datetime

import requests
from opengemini_client import models
from opengemini_client import test_utils


class WriteTest(unittest.TestCase):

def test_write_batch_points_success(self):
with test_utils.get_test_default_client() as cli:
cli.create_database('write_test')
point = models.Point(measurement='write_mm', precision=models.Precision.PrecisionSecond,
fields={'x': 12.0, 'y': 4.0}, tags={'a': 'ax', 'b': 'bx'}, timestamp=datetime.now())
cli.write_batch_points("write_test", models.BatchPoints(points=[point]))
time.sleep(3)
qr = cli.query(models.Query(database='write_test', command='select * from write_mm', retention_policy=''))
print(qr)
self.assertNotEqual(len(qr.results), 0)
result = qr.results[0]
series = result.series
self.assertNotEqual(len(series), 0)
cli.drop_database('write_test')

def test_write_batch_points_fail_with_no_database(self):
with test_utils.get_test_default_client() as cli:
point = models.Point(measurement='write_mm', precision=models.Precision.PrecisionSecond,
fields={'x': 6.0, 'y': 4.0}, timestamp=datetime.now())
with self.assertRaises(requests.exceptions.HTTPError) as context:
cli.write_batch_points("write_test1", models.BatchPoints(points=[point]))
self.assertRegex(str(context.exception), "database not found")

0 comments on commit 36f94b1

Please sign in to comment.