Skip to content

Commit

Permalink
feat: add database and rp implement and usecases
Browse files Browse the repository at this point in the history
Signed-off-by: heiliuchao <[email protected]>
Co-authored-by: ZhangJian He <[email protected]>
  • Loading branch information
heiliuchao authored and shoothzj committed Mar 29, 2024
1 parent 5ae13d2 commit 1100340
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 14 deletions.
69 changes: 57 additions & 12 deletions opengemini_client/client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def request(self, method, server_url, url_path, headers=None, body=None, params=
prepared = req.prepare()
resp = self.session.send(prepared)
if not 200 <= resp.status_code < 300:
raise HTTPError(f"HTTP error: {resp.status_code}, Response: {resp.text}")
raise HTTPError(f"error resp, code: {resp.status_code}, resp: {resp.text}")
return resp

def exec_http_request_by_index(self, idx, method, url_path, headers=None, body=None) -> requests.Response:
Expand All @@ -137,7 +137,7 @@ def exec_http_request_by_index(self, idx, method, url_path, headers=None, body=N
def ping(self, idx: int):
resp = self.exec_http_request_by_index(idx, 'GET', UrlConst.PING)
if resp.status_code != HTTPStatus.NO_CONTENT:
raise HTTPError(f"ping openGeminiDB status is {resp.status_code}")
raise HTTPError(f"ping failed code: {resp.status_code}, body: {resp.text}")

def query(self, query: Query) -> QueryResult:
server_url = self.get_server_url()
Expand All @@ -146,7 +146,7 @@ def query(self, query: Query) -> QueryResult:
resp = self.request(method='GET', server_url=server_url, url_path=UrlConst.QUERY, params=params)
if resp.status_code == HTTPStatus.OK:
return resolve_query_body(resp)
raise HTTPError(f"Query error_code: {resp.status_code}, error_msg: {resp.text}")
raise HTTPError(f"Query code: {resp.status_code}, body: {resp.text}")

def _query_post(self, query: Query) -> QueryResult:
server_url = self.get_server_url()
Expand All @@ -155,25 +155,70 @@ def _query_post(self, query: Query) -> QueryResult:
resp = self.request(method='POST', server_url=server_url, url_path=UrlConst.QUERY, params=params)
if resp.status_code == HTTPStatus.OK:
return resolve_query_body(resp)
raise HTTPError(f"Query error_code: {resp.status_code}, error_msg: {resp.text}")
raise HTTPError(f"Query code: {resp.status_code}, body: {resp.text}")

def write_batch_points(self, database: str, batch_points: BatchPoints):
return

def create_database(self, database: str, rp: RpConfig = None):
pass
if not database:
raise ValueError("empty database name")
query_string = f"CREATE DATABASE {database}"
if rp:
query_string += f" WITH DURATION {rp.duration} REPLICATION 1"
if rp.shard_group_duration:
query_string += f" SHARD DURATION {rp.shard_group_duration}"
if rp.index_duration:
query_string += f" INDEX DURATION {rp.index_duration}"
if rp.name:
query_string += f" NAME {rp.name}"
return self._query_post(Query(database=database, command=query_string, retention_policy=''))

def show_databases(self) -> List[str]:
pass
query_string = "SHOW DATABASES"
qr = self.query(Query(database='', command=query_string, retention_policy=''))
if not qr.results or not qr.results[0].series:
return []
return [val[0] for val in qr.results[0].series[0].values if val]

def drop_database(self, database: str):
pass
if not database:
raise ValueError("empty database name")
query_string = f"DROP DATABASE {database}"
return self._query_post(Query(database=database, command=query_string, retention_policy=''))

def create_retention_policy(self, dbname, rp_config: RpConfig, is_default: bool):
pass

def show_retention_policies(self, dbname):
pass
if not dbname:
raise ValueError("empty database name")
if not rp_config:
raise ValueError("rp_config is required")

query_string = (f"CREATE RETENTION POLICY {rp_config.name} ON {dbname} DURATION {rp_config.duration}"
f" REPLICATION 1")
if rp_config.shard_group_duration:
query_string += f" SHARD DURATION {rp_config.shard_group_duration}"
if rp_config.index_duration:
query_string += f" INDEX DURATION {rp_config.index_duration}"
if is_default:
query_string += " DEFAULT"

return self._query_post(Query(database=dbname, command=query_string, retention_policy=''))

def show_retention_policies(self, dbname: str):
if not dbname:
raise ValueError("empty database name")

query_string = f"SHOW RETENTION POLICIES ON {dbname}"
qr = self.query(Query(database=dbname, command=query_string, retention_policy=''))
if not qr.results or not qr.results[0].series:
return []
return [val for val in qr.results[0].series[0].values if val]

def drop_retention_policy(self, dbname, retention_policy: str):
pass
if not dbname:
raise ValueError("empty database name")
if not retention_policy:
raise ValueError("empty retention policy name")

query_string = f"DROP RETENTION POLICY {retention_policy} ON {dbname}"
return self._query_post(Query(database=dbname, command=query_string, retention_policy=retention_policy))
43 changes: 43 additions & 0 deletions opengemini_client/database_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import inspect
import unittest

from opengemini_client import test_utils


class DatabaseTest(unittest.TestCase):

def test_create_databases_success(self):
dbname = inspect.currentframe().f_code.co_name
with test_utils.get_test_default_client() as cli:
qr = cli.create_database(database=dbname)
self.assertEqual(qr.error, None)
qr = cli.drop_database(database=dbname)
self.assertEqual(qr.error, None)

def test_show_databases_success(self):
dbname = inspect.currentframe().f_code.co_name
with test_utils.get_test_default_client() as cli:
qr = cli.create_database(database=dbname)
self.assertEqual(qr.error, None)
new_db_list = cli.show_databases()
self.assertTrue(dbname in new_db_list)
qr = cli.drop_database(database=dbname)
self.assertEqual(qr.error, None)

def test_create_databases_with_empty_db(self):
with test_utils.get_test_default_client() as cli:
with self.assertRaises(ValueError):
cli.create_database(database='')

def test_drop_databases_success(self):
dbname = inspect.currentframe().f_code.co_name
with test_utils.get_test_default_client() as cli:
qr = cli.create_database(database=dbname)
self.assertEqual(qr.error, None)
qr = cli.drop_database(database=dbname)
self.assertEqual(qr.error, None)

def test_drop_databases_with_empty_db(self):
with test_utils.get_test_default_client() as cli:
with self.assertRaises(ValueError):
cli.drop_database(database='')
3 changes: 1 addition & 2 deletions opengemini_client/query_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import time
import unittest

from opengemini_client import client_impl
Expand All @@ -21,5 +22,3 @@ def test_query_no_db_in_opengemini(self):
self.assertEqual(len(results), 1)
result = results[0]
self.assertEqual(result.error, None)
series = result.series
self.assertEqual(len(series), 0)
74 changes: 74 additions & 0 deletions opengemini_client/retention_policy_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import inspect
import unittest

from opengemini_client import models
from opengemini_client import test_utils


class RetentionPolicyTest(unittest.TestCase):

def test_create_retention_policy_empty_db_parameter(self):
dbname = inspect.currentframe().f_code.co_name
rp_config = models.RpConfig(dbname, '2h', '2h', '2h')
with test_utils.get_test_default_client() as cli:
with self.assertRaises(ValueError):
cli.create_retention_policy(dbname='', rp_config=rp_config, is_default=True)

def test_create_retention_policy_success(self):
dbname = inspect.currentframe().f_code.co_name
rp_config = models.RpConfig(dbname, '2h', '2h', '2h')
with test_utils.get_test_default_client() as cli:
cli.create_database(dbname)
qr = cli.create_retention_policy(dbname=dbname, rp_config=rp_config, is_default=True)
self.assertEqual(qr.error, None)
cli.drop_retention_policy(dbname=dbname, retention_policy=rp_config.name)
cli.drop_database(dbname)

def test_show_retention_policy_empty_db_parameter(self):
with test_utils.get_test_default_client() as cli:
with self.assertRaises(ValueError):
cli.show_retention_policies('')

def test_show_retention_policies_success(self):
dbname = inspect.currentframe().f_code.co_name
rp_config = models.RpConfig(dbname, '2h', '2h', '2h')
with test_utils.get_test_default_client() as cli:
cli.create_database(dbname)
cli.create_retention_policy(dbname=dbname, rp_config=rp_config, is_default=True)
retention_policies = cli.show_retention_policies(dbname=dbname)
rp_config_name = [rp[0] for rp in retention_policies]
self.assertTrue(rp_config.name in rp_config_name)
cli.drop_retention_policy(dbname=dbname, retention_policy=rp_config.name)
cli.drop_database(dbname)

def test_drop_retention_policy_success(self):
dbname = inspect.currentframe().f_code.co_name
rp_config = models.RpConfig(dbname, '2h', '2h', '2h')
with test_utils.get_test_default_client() as cli:
cli.create_database(dbname)
cli.create_retention_policy(dbname=dbname, rp_config=rp_config, is_default=True)
qr = cli.drop_retention_policy(dbname=dbname, retention_policy=rp_config.name)
self.assertEqual(qr.error, None)
cli.drop_database(dbname)

def test_drop_retention_policy_no_db_parameters(self):
dbname = inspect.currentframe().f_code.co_name
rp_config = models.RpConfig(dbname, '2h', '2h', '2h')
with test_utils.get_test_default_client() as cli:
cli.create_database(dbname)
cli.create_retention_policy(dbname=dbname, rp_config=rp_config, is_default=True)
with self.assertRaises(ValueError):
cli.drop_retention_policy('', retention_policy=rp_config.name)
cli.drop_retention_policy(dbname=dbname, retention_policy=rp_config.name)
cli.drop_database(dbname)

def test_drop_retention_policy_no_rp_parameters(self):
dbname = inspect.currentframe().f_code.co_name
rp_config = models.RpConfig(dbname, '2h', '2h', '2h')
with test_utils.get_test_default_client() as cli:
cli.create_database(dbname)
cli.create_retention_policy(dbname=dbname, rp_config=rp_config, is_default=True)
with self.assertRaises(ValueError):
cli.drop_retention_policy(dbname, retention_policy='')
cli.drop_retention_policy(dbname=dbname, retention_policy=rp_config.name)
cli.drop_database(dbname)

0 comments on commit 1100340

Please sign in to comment.