From 36f94b18f1fddc14e63827be9464a764e91919c8 Mon Sep 17 00:00:00 2001 From: yehao <944207160@qq.com> Date: Sat, 20 Apr 2024 22:54:24 +0800 Subject: [PATCH] feat: add write batch points Signed-off-by: yehao <944207160@qq.com> --- opengemini_client/client_impl.py | 22 +++++-- opengemini_client/models.py | 100 +++++++++++++++++++++++++++++++ opengemini_client/write_test.py | 33 ++++++++++ 3 files changed, 150 insertions(+), 5 deletions(-) create mode 100644 opengemini_client/write_test.py diff --git a/opengemini_client/client_impl.py b/opengemini_client/client_impl.py index 89d3208..5290836 100644 --- a/opengemini_client/client_impl.py +++ b/opengemini_client/client_impl.py @@ -126,7 +126,7 @@ def request(self, method, server_url, url_path, headers=None, body=None, params= prepared = req.prepare() resp = self.session.send(prepared) if not 200 <= resp.status_code < 300: - raise HTTPError(f"error resp, code: {resp.status_code}, resp: {resp.text}") + raise HTTPError(f"request error resp, code: {resp.status_code}, body: {resp.text}") return resp def exec_http_request_by_index(self, idx, method, url_path, headers=None, body=None) -> requests.Response: @@ -137,7 +137,7 @@ def exec_http_request_by_index(self, idx, method, url_path, headers=None, body=N def ping(self, idx: int): resp = self.exec_http_request_by_index(idx, 'GET', UrlConst.PING) if resp.status_code != HTTPStatus.NO_CONTENT: - raise HTTPError(f"ping failed code: {resp.status_code}, body: {resp.text}") + raise HTTPError(f"ping error resp, code: {resp.status_code}, body: {resp.text}") def query(self, query: Query) -> QueryResult: server_url = self.get_server_url() @@ -146,7 +146,7 @@ def query(self, query: Query) -> QueryResult: resp = self.request(method='GET', server_url=server_url, url_path=UrlConst.QUERY, params=params) if resp.status_code == HTTPStatus.OK: return resolve_query_body(resp) - raise HTTPError(f"Query code: {resp.status_code}, body: {resp.text}") + raise HTTPError(f"query error resp, code: {resp.status_code}, body: {resp.text}") def _query_post(self, query: Query) -> QueryResult: server_url = self.get_server_url() @@ -155,10 +155,22 @@ def _query_post(self, query: Query) -> QueryResult: resp = self.request(method='POST', server_url=server_url, url_path=UrlConst.QUERY, params=params) if resp.status_code == HTTPStatus.OK: return resolve_query_body(resp) - raise HTTPError(f"Query code: {resp.status_code}, body: {resp.text}") + raise HTTPError(f"query_post error resp, code: {resp.status_code}, body: {resp.text}") def write_batch_points(self, database: str, batch_points: BatchPoints): - return + server_url = self.get_server_url() + params = {'db': database} + with io.StringIO() as writer: + for point in batch_points.points: + if point is None: + continue + writer.write(point.to_string()) + writer.write('\n') + body = writer.getvalue().encode() + resp = self.request(method="POST", server_url=server_url, url_path=UrlConst.WRITE, params=params, body=body) + if resp.status_code == HTTPStatus.NO_CONTENT: + return + raise HTTPError(f"write_batch_points error resp, code: {resp.status_code}, body: {resp.text}") def create_database(self, database: str, rp: RpConfig = None): if not database: diff --git a/opengemini_client/models.py b/opengemini_client/models.py index a13284b..a961f10 100644 --- a/opengemini_client/models.py +++ b/opengemini_client/models.py @@ -1,3 +1,4 @@ +import io import ssl from dataclasses import field, dataclass from datetime import datetime, timedelta @@ -62,9 +63,46 @@ class RpConfig: index_duration: str +class Precision(Enum): + PrecisionNanoSecond = 0 + PrecisionMicrosecond = 1 + PrecisionMillisecond = 2 + PrecisionSecond = 3 + PrecisionMinute = 4 + PrecisionHour = 5 + + +def round_datetime(dt: datetime, round_to: timedelta): + if round_to.seconds == 0: + microseconds = int(dt.timestamp() * 1000 * 1000) + rounding = round(microseconds / round_to.microseconds) * round_to.microseconds + rd = rounding * 1000 + elif round_to.seconds == 1: + rd = round(dt.timestamp()) * 1000 * 1000 * 1000 + else: + seconds = (dt - dt.min).seconds + rounding = round(seconds / round_to.seconds) * round_to.seconds + dt = datetime(dt.year, dt.month, dt.day) + timedelta(seconds=rounding) + rd = int(dt.timestamp()) * 1000 * 1000 * 1000 + return rd + + +def chars_to_escape(writer: io.StringIO, s: str, escape_str: str): + for i, c in enumerate(s): + need_escape = c in escape_str + need_check_next_char = c == '\\' and i < len(s) - 1 + if not need_escape and need_check_next_char: + next_char = s[i + 1] + need_escape = next_char == '\\' or next_char in escape_str + if need_escape: + writer.write('\\') + writer.write(s[i]) + + @dataclass class Point: measurement: str + precision: Precision fields: Dict[str, Union[str, int, float, bool]] tags: Dict[str, str] = field(default_factory=dict) timestamp: Optional[datetime] = None @@ -81,6 +119,68 @@ def set_time(self, time: datetime): def set_measurement(self, name: str): self.measurement = name + def to_string(self) -> str: + if len(self.measurement) == 0 or len(self.fields) == 0: + return "" + with io.StringIO() as writer: + self.write_measurement(writer) + self.write_tags(writer) + self.write_fields(writer) + self.write_timestamp(writer) + res = writer.getvalue() + return res + + def write_measurement(self, writer: io.StringIO): + chars_to_escape(writer, self.measurement, ', ') + + def write_tags(self, writer: io.StringIO): + if self.tags is None: + return + for k, v in self.tags.items(): + writer.write(',') + chars_to_escape(writer, k, ', =') + writer.write('=') + chars_to_escape(writer, v, ', =') + + def write_fields(self, writer: io.StringIO): + sep = ' ' + for k, v in self.fields.items(): + writer.write(sep) + sep = ',' + chars_to_escape(writer, k, ', =') + writer.write('=') + if isinstance(v, int): + writer.write(f"{v}i") + elif isinstance(v, str): + writer.write('"') + chars_to_escape(writer, v, '"') + writer.write('"') + elif isinstance(v, float): + writer.write(f"{v}") + elif isinstance(v, bool): + if v: + writer.write('T') + else: + writer.write('F') + + def write_timestamp(self, writer: io.StringIO): + if self.timestamp is None: + return + writer.write(' ') + if self.precision == Precision.PrecisionMicrosecond: + ts_str = str(round_datetime(self.timestamp, timedelta(microseconds=1))) + elif self.precision == Precision.PrecisionMillisecond: + ts_str = str(round_datetime(self.timestamp, timedelta(milliseconds=1))) + elif self.precision == Precision.PrecisionSecond: + ts_str = str(round_datetime(self.timestamp, timedelta(seconds=1))) + elif self.precision == Precision.PrecisionMinute: + ts_str = str(round_datetime(self.timestamp, timedelta(minutes=1))) + elif self.precision == Precision.PrecisionHour: + ts_str = str(round_datetime(self.timestamp, timedelta(hours=1))) + else: + ts_str = str(self.timestamp.timestamp() * 1000 * 1000 * 1000) + writer.write(ts_str) + @dataclass class BatchPoints: diff --git a/opengemini_client/write_test.py b/opengemini_client/write_test.py new file mode 100644 index 0000000..f50a8b0 --- /dev/null +++ b/opengemini_client/write_test.py @@ -0,0 +1,33 @@ +import time +import unittest +from datetime import datetime + +import requests +from opengemini_client import models +from opengemini_client import test_utils + + +class WriteTest(unittest.TestCase): + + def test_write_batch_points_success(self): + with test_utils.get_test_default_client() as cli: + cli.create_database('write_test') + point = models.Point(measurement='write_mm', precision=models.Precision.PrecisionSecond, + fields={'x': 12.0, 'y': 4.0}, tags={'a': 'ax', 'b': 'bx'}, timestamp=datetime.now()) + cli.write_batch_points("write_test", models.BatchPoints(points=[point])) + time.sleep(3) + qr = cli.query(models.Query(database='write_test', command='select * from write_mm', retention_policy='')) + print(qr) + self.assertNotEqual(len(qr.results), 0) + result = qr.results[0] + series = result.series + self.assertNotEqual(len(series), 0) + cli.drop_database('write_test') + + def test_write_batch_points_fail_with_no_database(self): + with test_utils.get_test_default_client() as cli: + point = models.Point(measurement='write_mm', precision=models.Precision.PrecisionSecond, + fields={'x': 6.0, 'y': 4.0}, timestamp=datetime.now()) + with self.assertRaises(requests.exceptions.HTTPError) as context: + cli.write_batch_points("write_test1", models.BatchPoints(points=[point])) + self.assertRegex(str(context.exception), "database not found")