Skip to content

Commit

Permalink
feat: add write batch points
Browse files Browse the repository at this point in the history
Signed-off-by: yehao <[email protected]>
  • Loading branch information
goyjy committed Apr 20, 2024
1 parent adf2477 commit 88c5f78
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 1 deletion.
14 changes: 13 additions & 1 deletion opengemini_client/client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,19 @@ def _query_post(self, query: Query) -> QueryResult:
raise HTTPError(f"Query code: {resp.status_code}, body: {resp.text}")

def write_batch_points(self, database: str, batch_points: BatchPoints):
return
server_url = self.get_server_url()
params = {'db': database}
with io.StringIO() as writer:
for bp in batch_points.points:
if bp is None:
continue
writer.write(bp.to_string())
writer.write('\n')
body = writer.getvalue().encode()
resp = self.request(method="POST", server_url=server_url, url_path=UrlConst.WRITE, params=params, body=body)
if resp.status_code == HTTPStatus.NO_CONTENT:
return
raise HTTPError(f"Write error: {resp.status_code}, body: {resp.text}")

def create_database(self, database: str, rp: RpConfig = None):
if not database:
Expand Down
98 changes: 98 additions & 0 deletions opengemini_client/models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import io
import ssl
from dataclasses import field, dataclass
from datetime import datetime, timedelta
Expand Down Expand Up @@ -62,9 +63,44 @@ class RpConfig:
index_duration: str


class PrecisionType(Enum):
PrecisionNanoSecond = 0
PrecisionMicrosecond = 1
PrecisionMillisecond = 2
PrecisionSecond = 3
PrecisionMinute = 4
PrecisionHour = 5


def round_datetime(dt: datetime, round_to: timedelta):
if round_to.seconds == 0:
microseconds = int(dt.timestamp() * 1000 * 1000)
rounding = round(microseconds / round_to.microseconds) * round_to.microseconds
rd = rounding * 1000
elif round_to.seconds == 1:
rd = round(dt.timestamp()) * 1000 * 1000 * 1000
else:
seconds = (dt - dt.min).seconds
rounding = round(seconds / round_to.seconds) * round_to.seconds
dt = datetime(dt.year, dt.month, dt.day) + timedelta(seconds=rounding)
rd = int(dt.timestamp()) * 1000 * 1000 * 1000
return rd


def chars_to_escape(writer: io.StringIO, s: str, escape_str: str):
for i, c in enumerate(s):
need_escape = c in escape_str
if need_escape is False and s[i] == '\\' and i < len(s) - 1:
need_escape = s[i + 1] == '\\' or s[i + 1] in escape_str
if need_escape is True:
writer.write('\\')
writer.write(s[i])


@dataclass
class Point:
measurement: str
precision: PrecisionType
fields: Dict[str, Union[str, int, float, bool]]
tags: Dict[str, str] = field(default_factory=dict)
timestamp: Optional[datetime] = None
Expand All @@ -81,6 +117,68 @@ def set_time(self, time: datetime):
def set_measurement(self, name: str):
self.measurement = name

def to_string(self) -> str:
if len(self.measurement) == 0 or len(self.fields) == 0:
return ""
with io.StringIO() as writer:
self.write_measurement(writer)
self.write_tags(writer)
self.write_fields(writer)
self.write_timestamp(writer)
res = writer.getvalue()
return res

def write_measurement(self, writer: io.StringIO):
chars_to_escape(writer, self.measurement, ', ')

def write_tags(self, writer: io.StringIO):
if self.tags is None:
return
for k, v in self.tags.items():
writer.write(',')
chars_to_escape(writer, k, ', =')
writer.write('=')
chars_to_escape(writer, v, ', =')

def write_fields(self, writer: io.StringIO):
sep = ' '
for k, v in self.fields.items():
writer.write(sep)
sep = ','
chars_to_escape(writer, k, ', =')
writer.write('=')
if isinstance(v, int):
writer.write(f"{v}i")
elif isinstance(v, str):
writer.write('"')
chars_to_escape(writer, v, '"')
writer.write('"')
elif isinstance(v, float):
writer.write(f"{v}")
elif isinstance(v, bool):
if v is True:
writer.write('T')
else:
writer.write('F')

def write_timestamp(self, writer: io.StringIO):
if self.timestamp is None:
return
writer.write(' ')
if self.precision == PrecisionType.PrecisionMicrosecond:
ts_str = str(round_datetime(self.timestamp, timedelta(microseconds=1)))
elif self.precision == PrecisionType.PrecisionMillisecond:
ts_str = str(round_datetime(self.timestamp, timedelta(milliseconds=1)))
elif self.precision == PrecisionType.PrecisionSecond:
ts_str = str(round_datetime(self.timestamp, timedelta(seconds=1)))
elif self.precision == PrecisionType.PrecisionMinute:
ts_str = str(round_datetime(self.timestamp, timedelta(minutes=1)))
elif self.precision == PrecisionType.PrecisionHour:
ts_str = str(round_datetime(self.timestamp, timedelta(hours=1)))
else:
ts_str = str(self.timestamp.timestamp() * 1000 * 1000 * 1000)
writer.write(ts_str)


@dataclass
class BatchPoints:
Expand Down
33 changes: 33 additions & 0 deletions opengemini_client/write_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import time
import unittest
from datetime import datetime

import requests
from opengemini_client import models
from opengemini_client import test_utils


class WriteTest(unittest.TestCase):

def test_write_batch_points_success(self):
with test_utils.get_test_default_client() as cli:
cli.create_database('write_test')
point = models.Point(measurement='write_mm', precision=models.PrecisionType.PrecisionSecond,
fields={'x': 12.0, 'y': 4.0}, tags={'a': 'ax', 'b': 'bx'}, timestamp=datetime.now())
cli.write_batch_points("write_test", models.BatchPoints(points=[point]))
time.sleep(3)
qr = cli.query(models.Query(database='write_test', command='select * from write_mm', retention_policy=''))
print(qr)
self.assertNotEqual(len(qr.results), 0)
result = qr.results[0]
series = result.series
self.assertNotEqual(len(series), 0)
cli.drop_database('write_test')

def test_write_batch_points_fail_with_no_database(self):
with test_utils.get_test_default_client() as cli:
point = models.Point(measurement='write_mm', precision=models.PrecisionType.PrecisionSecond,
fields={'x': 6.0, 'y': 4.0}, timestamp=datetime.now())
with self.assertRaises(requests.exceptions.HTTPError) as context:
cli.write_batch_points("write_test1", models.BatchPoints(points=[point]))
self.assertRegex(str(context.exception), "database not found")

0 comments on commit 88c5f78

Please sign in to comment.