From 8207c60e37cbbba770faa4958030707e50727ceb Mon Sep 17 00:00:00 2001 From: Ievgenii Shepeliuk Date: Thu, 13 Jul 2023 17:39:24 +0000 Subject: [PATCH] feat: define / undefine --- poetry.lock | 16 +++---- pykli/ksqldb.py | 52 +++++++-------------- pykli/repl_eval.py | 16 +++++-- pykli/repl_read.py | 13 ++++-- pykli/tokens.py | 17 ++++++- pyproject.toml | 1 + tests/conftest.py | 54 ++++++++++++++++++++++ tests/test_ksqldb.py | 29 ++++++++++++ tests/{test_e2e.py => test_pykli.py} | 56 +++++++---------------- tests/test_repl_eval.py | 30 +++++++++++++ tests/test_repl_read.py | 67 +++++++++++----------------- tests/test_tokens.py | 47 +++++++++++++++++++ 12 files changed, 262 insertions(+), 136 deletions(-) create mode 100644 tests/conftest.py create mode 100644 tests/test_ksqldb.py rename tests/{test_e2e.py => test_pykli.py} (78%) create mode 100644 tests/test_repl_eval.py create mode 100644 tests/test_tokens.py diff --git a/poetry.lock b/poetry.lock index 4e86af3..4d81835 100644 --- a/poetry.lock +++ b/poetry.lock @@ -279,21 +279,21 @@ pluggy = ">=0.12,<2.0" testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] [[package]] -name = "pytest-env" -version = "0.8.2" -description = "py.test plugin that allows you to add environment variables." 
+name = "pytest-mock" +version = "3.11.1" +description = "Thin-wrapper around the mock package for easier use with pytest" optional = false python-versions = ">=3.7" files = [ - {file = "pytest_env-0.8.2-py3-none-any.whl", hash = "sha256:5e533273f4d9e6a41c3a3120e0c7944aae5674fa773b329f00a5eb1f23c53a38"}, - {file = "pytest_env-0.8.2.tar.gz", hash = "sha256:baed9b3b6bae77bd75b9238e0ed1ee6903a42806ae9d6aeffb8754cd5584d4ff"}, + {file = "pytest-mock-3.11.1.tar.gz", hash = "sha256:7f6b125602ac6d743e523ae0bfa71e1a697a2f5534064528c6ff84c2f7c2fc7f"}, + {file = "pytest_mock-3.11.1-py3-none-any.whl", hash = "sha256:21c279fff83d70763b05f8874cc9cfb3fcacd6d354247a976f9529d19f9acf39"}, ] [package.dependencies] -pytest = ">=7.3.1" +pytest = ">=5.0" [package.extras] -test = ["coverage (>=7.2.7)", "pytest-mock (>=3.10)"] +dev = ["pre-commit", "pytest-asyncio", "tox"] [[package]] name = "six" @@ -375,4 +375,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "389fe3daf35aaaf1e6f9f86f6c2b5107e12b35f3eab259b3bd1565d0d847a930" +content-hash = "19f0b589570acd55a5e6beacaacf9c33c3abb3dc4020733c1619a55f6198de9b" diff --git a/pykli/ksqldb.py b/pykli/ksqldb.py index 904c9bb..6763497 100644 --- a/pykli/ksqldb.py +++ b/pykli/ksqldb.py @@ -1,6 +1,6 @@ import base64 import httpx - +from pprint import pformat class KsqlDBClient: def __init__(self, url, api_key=None, api_secret=None): @@ -8,6 +8,7 @@ def __init__(self, url, api_key=None, api_secret=None): self._url = url self._headers = {"Content-Type": "application/vnd.ksql.v1+json"} self._client = httpx.Client(base_url=url, http2=True) + self._session_vars = {} if api_key and api_secret: b64string = base64.b64encode(bytes(f"{api_key}:{api_secret}")) @@ -19,58 +20,35 @@ def url(self): return self._url + def define(self, name: str, value: str) -> None: + self._session_vars[name] = value + + + def undefine(self, name: str) -> None: + del self._session_vars[name] + + def info(self): r = self._client.get("/info", 
headers=self._headers) r.raise_for_status() return r.json()["KsqlServerInfo"] - def stmt(self, ksql_str, stream_props={}): - body = { - "ksql": ksql_str, - "streamsProperties": stream_props, - "sessionVariables": {}, - } + def stmt(self, ksql_str): + body = {"ksql": ksql_str, "sessionVariables": self._session_vars} r = self._client.post("/ksql", json=body, headers=self._headers) r.raise_for_status() return r.json() - def pull_query(self, ksql_str, stream_props={}): - body = { - "sql": ksql_str, - "streamsProperties": stream_props, - "sessionVariables": {}, - } + + def pull_query(self, ksql_str): + body = {"sql": ksql_str, "sessionVariables": self._session_vars} headers = {"Accept": "application/json"} r = self._client.post("/query-stream", json=body, headers=headers) r.raise_for_status() return r.json() - def list_topic_names(self) -> list[str]: - json = self.stmt("show topics;")[0] - return [t["name"] for t in json["topics"]] - - - def list_stream_names(self) -> list[str]: - json = self.stmt("show streams;")[0] - return [t["name"] for t in json["streams"]] - - - def list_type_names(self) -> list[str]: - json = self.stmt("show types;")[0] - return json["types"].keys() - - - def list_table_names(self) -> list[str]: - json = self.stmt("show tables;")[0] - return [t["name"] for t in json["tables"]] - - - def list_connector_names(self) -> list[str]: - json = self.stmt("show connectors;")[0] - return [t["name"] for t in json["connectors"]] - # async def query_async(self, query_string, stream_properties=None, timeout=10): # async for x in self.api.query( # query_string=query_string, diff --git a/pykli/repl_eval.py b/pykli/repl_eval.py index 0f068b8..fb7fa43 100644 --- a/pykli/repl_eval.py +++ b/pykli/repl_eval.py @@ -1,14 +1,16 @@ -from . import LOG -from .tokens import Stmt, ErrMsg, StmtResponse, Info, PullQuery, QueryResponse - import httpx from pprint import pformat +from . 
import LOG +from .tokens import Stmt, ErrMsg, StmtResponse, Info, PullQuery, QueryResponse, SessionVar + + class pykli_eval: def __init__(self, ksqldb): self._ksqldb = ksqldb - def __call__(self, token): + + def __call__(self, token) -> StmtResponse | QueryResponse | ErrMsg | None: try: LOG.debug(f"pykli_eval: token={token}") match token: @@ -25,6 +27,12 @@ def __call__(self, token): return QueryResponse(resp) case ErrMsg(): return token + case SessionVar(nm, val): + if val is not None: + self._ksqldb.define(nm, val) + else: + self._ksqldb.undefine(nm) + return None case _: return ErrMsg(f"not yet implemented: {token}") except httpx.HTTPStatusError as e: diff --git a/pykli/repl_read.py b/pykli/repl_read.py index cd243fd..bd17d68 100644 --- a/pykli/repl_read.py +++ b/pykli/repl_read.py @@ -8,12 +8,13 @@ from pprint import pformat from pathlib import Path from sqlparse.tokens import DML, DDL, String, Keyword +from sqlparse.sql import Comparison, Identifier import sqlparse from . import MONOKAI_STYLE, HISTORY_FILE, LOG from .completer import pykli_completer from .keybindgings import pykli_keys -from .tokens import Stmt, ErrMsg, KRunScript, PullQuery +from .tokens import KSQL, Stmt, ErrMsg, KRunScript, PullQuery, SessionVar class file_prompt: @@ -68,8 +69,6 @@ def tokenize_script(stmt): def tokenize_sql_stmt(stmt): kw = stmt.token_first() - # stmt._pprint_tree() - LOG.debug(f"tokenize_sql_stmt: stmt=<{stmt}>, first_token={pformat(kw)}") if kw.ttype == Keyword or kw.ttype == DDL: yield Stmt(stmt.value) @@ -79,6 +78,14 @@ def tokenize_sql_stmt(stmt): yield PullQuery(stmt.value) elif kw.ttype is KRunScript: yield from tokenize_script(stmt) + elif kw.ttype is KSQL.Define: + _, cmp = stmt.token_next(stmt.token_index(kw)) + if isinstance(cmp, Comparison) and cmp.right.ttype is String.Single: + yield SessionVar(cmp.left.value, cmp.right.value.strip("'")) + elif kw.ttype is KSQL.Undefine: + _, id = stmt.token_next(stmt.token_index(kw)) + if isinstance(id, Identifier): + yield 
SessionVar(id.value, None) else: yield stmt diff --git a/pykli/tokens.py b/pykli/tokens.py index 925cb5e..0c38d58 100644 --- a/pykli/tokens.py +++ b/pykli/tokens.py @@ -6,27 +6,39 @@ from . import LOG + class Stmt(NamedTuple): ksql: str + class PullQuery(NamedTuple): ksql: str + class QueryResponse(NamedTuple): val: dict + class StmtResponse(NamedTuple): val: dict + class Info(NamedTuple): srv: str + class ErrMsg(NamedTuple): msg: str + +class SessionVar(NamedTuple): + name: str + val: str | None + + KRunScript = Keyword.KRunScript -KDefine = Keyword.KDefine +KSQL = Keyword.KSQL def initialize_sqlparse(): lex = Lexer.get_default_instance() @@ -39,6 +51,7 @@ def initialize_sqlparse(): lex.add_keywords(keywords.KEYWORDS_COMMON) lex.add_keywords(keywords.KEYWORDS) - lex.add_keywords({"DEFINE": Keyword.KDefine}) + lex.add_keywords({"DEFINE": KSQL.Define}) + lex.add_keywords({"UNDEFINE": KSQL.Undefine}) LOG.info("initialize_sqlparse done") diff --git a/pyproject.toml b/pyproject.toml index 6778939..d26b9be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ pykli = "pykli.__main__:main" [tool.poetry.group.dev.dependencies] pytest = "^7.3.1" +pytest-mock = "^3.11.1" [build-system] requires = ["poetry-core"] diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..fb7b8b7 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,54 @@ +import pytest +import httpx +from time import sleep + +from pykli.ksqldb import KsqlDBClient + +@pytest.fixture(scope="module") +def ksqldb_url(): + return "http://localhost:28088" + + +@pytest.fixture(scope="module") +def ksqldb(ksqldb_url): + ksqldb = KsqlDBClient(ksqldb_url) + for i in range(5): + try: + ksqldb.stmt(""" +drop stream if exists pykli_stream_json delete topic; +drop type if exists pykli_type; +drop connector if exists pykli_connector; +""") + return ksqldb + except httpx.HTTPError: + print(f"Waiting for KsqlDB #{i}, {i * 10} sec") + sleep(10) + raise RuntimeError(f"{ksqldb_url} 
unavailable") + + +def list_type_names(ksqldb) -> list[str]: + json = ksqldb.stmt("show types;")[0] + return json["types"].keys() + + +def list_topic_names(ksqldb) -> list[str]: + json = ksqldb.stmt("show topics;")[0] + return [t["name"] for t in json["topics"]] + + +def list_stream_names(ksqldb) -> list[str]: + json = ksqldb.stmt("show streams;")[0] + return [t["name"] for t in json["streams"]] + + + +def list_table_names(ksqldb) -> list[str]: + json = ksqldb.stmt("show tables;")[0] + return [t["name"] for t in json["tables"]] + + +def list_connector_names(ksqldb) -> list[str]: + json = ksqldb.stmt("show connectors;")[0] + return [t["name"] for t in json["connectors"]] + + diff --git a/tests/test_ksqldb.py b/tests/test_ksqldb.py new file mode 100644 index 0000000..1d6c69d --- /dev/null +++ b/tests/test_ksqldb.py @@ -0,0 +1,29 @@ +import pytest +import httpx + + +@pytest.mark.e2e +def test_define_stmt(ksqldb): + ksqldb.define("ff", "TO_JSON_STRING") + r = ksqldb.stmt("describe function ${ff};") + assert len(r) == 1 + assert r[0]["@type"] == "describe_function" + + ksqldb.undefine("ff") + with pytest.raises(httpx.HTTPStatusError) as ex: + ksqldb.stmt("describe function ${ff};") + assert ex.value.response.json()["message"] == "Can't find any functions with the name '${ff}'" + + +@pytest.mark.e2e +def test_define_pull_query(ksqldb): + ksqldb.define("ff", "KSQL_PROCESSING_LOG") + r = ksqldb.pull_query("select * from ${ff};") + assert len(r) == 1 + assert "columnNames" in r[0] + + ksqldb.undefine("ff") + with pytest.raises(httpx.HTTPStatusError) as ex: + ksqldb.pull_query("select * from ${ff};") + assert ex.value.response.json()["statementText"] == "${FF}" + diff --git a/tests/test_e2e.py b/tests/test_pykli.py similarity index 78% rename from tests/test_e2e.py rename to tests/test_pykli.py index 1f31917..faed449 100644 --- a/tests/test_e2e.py +++ b/tests/test_pykli.py @@ -1,37 +1,12 @@ -from pprint import pformat, pprint import pytest -import httpx -from time import 
sleep from click.testing import CliRunner -from pykli.ksqldb import KsqlDBClient -from pykli.__main__ import main from prompt_toolkit.application import create_app_session from prompt_toolkit.input import create_pipe_input from prompt_toolkit.output import DummyOutput - -@pytest.fixture(scope="module") -def ksqldb_url(): - return "http://localhost:28088" - - -@pytest.fixture(scope="module") -def ksqldb(ksqldb_url): - ksqldb = KsqlDBClient(ksqldb_url) - for i in range(5): - try: - ksqldb.stmt(""" -drop stream if exists pykli_stream_json delete topic; -drop type if exists pykli_type; -drop connector if exists pykli_connector; -""") - return ksqldb - except httpx.HTTPError: - print(f"Waiting for KsqlDB #{i}, {i * 10} sec") - sleep(10) - raise RuntimeError(f"{ksqldb_url} unavailable") - +from pykli.__main__ import main +from .conftest import list_type_names, list_topic_names, list_stream_names, list_connector_names @pytest.fixture(scope="function") def mock_input(): @@ -42,7 +17,6 @@ def mock_input(): @pytest.mark.e2e def test_streams(mock_input, ksqldb): - mock_input.send_text(""" create or replace stream pykli_stream_json ( id varchar key, `firstName` varchar, "Age" int @@ -75,8 +49,8 @@ def test_streams(mock_input, ksqldb): assert "| Kafka Topic | Partitions | Partition Replicas |" in output assert "| pykli_stream_json | 1 | 1 |" in output - assert "pykli_stream_json" in ksqldb.list_topic_names() - assert "PYKLI_STREAM_JSON" in ksqldb.list_stream_names() + assert "pykli_stream_json" in list_topic_names(ksqldb) + assert "PYKLI_STREAM_JSON" in list_stream_names(ksqldb) mock_input.send_text(""" drop stream pykli_stream_json delete topic; @@ -88,8 +62,8 @@ def test_streams(mock_input, ksqldb): assert r.exit_code == 0 assert "Source `PYKLI_STREAM_JSON` (topic: pykli_stream_json) was dropped." 
in r.output - assert "pykli_stream_json" not in ksqldb.list_topic_names() - assert "PYKLI_STREAM_JSON" not in ksqldb.list_stream_names() + assert "pykli_stream_json" not in list_topic_names(ksqldb) + assert "PYKLI_STREAM_JSON" not in list_stream_names(ksqldb) @pytest.mark.e2e @@ -105,7 +79,7 @@ def test_types(mock_input, ksqldb): assert r.exit_code == 0 assert "Registered custom type with name 'PYKLI_TYPE'" in r.output - assert "PYKLI_TYPE" in ksqldb.list_type_names() + assert "PYKLI_TYPE" in list_type_names(ksqldb) mock_input.send_text(""" drop type if exists pykli_type; @@ -116,7 +90,7 @@ def test_types(mock_input, ksqldb): assert r.exit_code == 0 assert "Dropped type 'PYKLI_TYPE'" in r.output - assert "PYKLI_TYPE" not in ksqldb.list_type_names() + assert "PYKLI_TYPE" not in list_type_names(ksqldb) @pytest.mark.e2e @@ -166,8 +140,8 @@ def test_connectors(mock_input, ksqldb): assert "Task ID" in r.output assert "RUNNING" in r.output - assert "PYKLI_CONNECTOR" in ksqldb.list_connector_names() - assert "pykli_connector" in ksqldb.list_topic_names() + assert "PYKLI_CONNECTOR" in list_connector_names(ksqldb) + assert "pykli_connector" in list_topic_names(ksqldb) mock_input.send_text(""" drop connector if exists pykli_connector; @@ -176,7 +150,7 @@ def test_connectors(mock_input, ksqldb): r = runner.invoke(main, [ksqldb.url]) assert r.exit_code == 0 - assert "PYKLI_CONNECTOR" not in ksqldb.list_connector_names() + assert "PYKLI_CONNECTOR" not in list_connector_names(ksqldb) @pytest.mark.skip @@ -206,8 +180,8 @@ def test_tables(mock_input, ksqldb): assert output.count("| uuid1 | Tom | 20 |") == 1 assert output.count("| uuid2 | Fred | 30 |") == 1 - assert "pykli_json" in ksqldb.list_topic_names() - assert "PYKLI_JSON" in ksqldb.list_stream_names() + assert "pykli_json" in list_topic_names(ksqldb) + assert "PYKLI_JSON" in list_stream_names(ksqldb) mock_input.send_text(""" drop stream pykli_json delete topic; @@ -219,5 +193,5 @@ def test_tables(mock_input, ksqldb): assert 
r.exit_code == 0 assert "Source `PYKLI_JSON` (topic: pykli_json) was dropped." in r.output - assert "pykli_json" not in ksqldb.list_topic_names() - assert "PYKLI_JSON" not in ksqldb.list_stream_names() + assert "pykli_json" not in list_topic_names(ksqldb) + assert "PYKLI_JSON" not in list_stream_names(ksqldb) diff --git a/tests/test_repl_eval.py b/tests/test_repl_eval.py new file mode 100644 index 0000000..50091f6 --- /dev/null +++ b/tests/test_repl_eval.py @@ -0,0 +1,30 @@ +import pytest +from pykli.repl_eval import pykli_eval +from pykli.tokens import Info, SessionVar, StmtResponse + + +@pytest.fixture +def ksqldb_mock(mocker): + return mocker.patch("pykli.ksqldb.KsqlDBClient", autospec=True) + + +def test_eval_info(ksqldb_mock): + ksqldb_mock.info.return_value = {"ksql": "info"} + + req = Info("mys_srv") + resp = pykli_eval(ksqldb_mock)(req) + ksqldb_mock.info.assert_called_once() + assert isinstance(resp, StmtResponse) + assert resp.val[0] == {"@type": "info", "server": req.srv, "ksql": "info"} + + +def test_eval_define(ksqldb_mock): + e = pykli_eval(ksqldb_mock) + + resp = e(SessionVar("var1", "some_value")) + assert resp is None + ksqldb_mock.define.assert_called_once_with("var1", "some_value") + + resp = e(SessionVar("var1", None)) + assert resp is None + ksqldb_mock.undefine.assert_called_once_with("var1") diff --git a/tests/test_repl_read.py b/tests/test_repl_read.py index d6259b7..4519db9 100644 --- a/tests/test_repl_read.py +++ b/tests/test_repl_read.py @@ -1,50 +1,35 @@ -from pykli.repl_print import pok -from pprint import pformat, pprint - -from sqlparse.tokens import String, Keyword import sqlparse +import pytest -from pathlib import Path - - -def test_sqlparse2(): - sql = """ exit ; select * from qwe ; select qwe from asd; update qwe set as=2;""" - stmts = [s.strip() for s in sqlparse.split(sql)] - pok(f"found {len(stmts)} statements:\n") - - for s in stmts: - pok(f"===>>> {s}") - - p = sqlparse.parse(s)[0] - p._pprint_tree() - - t1 =
p.token_first() - pok(t1) - +from pykli.tokens import initialize_sqlparse, SessionVar +from pykli.repl_read import tokenize_sql_stmt -# sql = """RUN SCRIPT '/qwe/asd/asd.ksql';define zzz='zzz';DEFINE ttt='ttt';""" -def test_sqlparse_run_script(): - sql = """ RUN SCRIPT '/qwe/asd/asd.ksql'; run SCRIPT '/tmp/555/file.ksql';""" - stmts = [s.strip() for s in sqlparse.split(sql)] - pok(f"found {len(stmts)} statements:\n") - for s in stmts: - print(f"===>>> {s}") +@pytest.fixture(scope="module", autouse=True) +def ksqldb_mock(): + initialize_sqlparse() - p = sqlparse.parse(s)[0] - p._pprint_tree() - t1 = p.token_first() - _, t2 = p.token_next(0) +@pytest.mark.parametrize("ksql", [ + ("define my_var = 'my_val';"), + (" define my_var = 'my_val';"), +]) +def test_tokenize_sql_stmt_define(ksql): + stmt = sqlparse.parse(ksql)[0] + t = next(tokenize_sql_stmt(stmt)) + assert isinstance(t, SessionVar) + assert t.name == "my_var" + assert t.val == "my_val" - pok(t1) - pok(f"{t1}") - # pok(f"{t1.ttype} -> @{t1.value}@") - # pok(t1.match(Keyword, (r"run\s+script\b",), regex=True)) - # pok(f"{t2.ttype} -> {t2}") - # pok(f"{t2.ttype is String.Single}") - # pok(pformat(Path(t2.value.strip("'")))) - # pok(Path(t2.value.strip("'")).exists()) +@pytest.mark.parametrize("ksql", [ + ("undefine my_var;"), + (" undefine my_var ; "), +]) +def test_tokenize_sql_stmt_undefine(ksql): + stmt = sqlparse.parse(ksql)[0] + t = next(tokenize_sql_stmt(stmt)) + assert isinstance(t, SessionVar) + assert t.name == "my_var" + assert t.val is None -# sql = """RUN SCRIPT '/qwe/asd/asd.ksql';define zzz='zzz';DEFINE ttt='ttt';""" diff --git a/tests/test_tokens.py b/tests/test_tokens.py new file mode 100644 index 0000000..3620818 --- /dev/null +++ b/tests/test_tokens.py @@ -0,0 +1,47 @@ +import pytest +import sqlparse + +from pykli.tokens import KSQL, KRunScript, initialize_sqlparse + +@pytest.fixture(scope="module", autouse=True) +def ksqldb_mock(): + initialize_sqlparse() + + +@pytest.mark.parametrize("ksql", 
[ + (" RUN SCRIPT '/qwe/asd/asd.ksql';"), + ("run SCRIPT '/qwe/asd/asd.ksql'"), +]) +def test_sqlparse_run_script(ksql): + stmts = sqlparse.parse(ksql) + assert len(stmts) == 1 + + t = stmts[0].token_first() + assert t.ttype == KRunScript + + +@pytest.mark.parametrize("ksql", [ + ("define qwe = 'asd;"), + (" define qwe = 'asd "), + ("DEFINE qwe = 'asd;"), + (" DEFINE qwe = 'asd "), +]) +def test_sqlparse_define(ksql): + stmts = sqlparse.parse(ksql) + assert len(stmts) == 1 + + t = stmts[0].token_first() + assert t.ttype == KSQL.Define + + +@pytest.mark.parametrize("ksql", [ + ("undefine qwe;"), + (" undefine qwe "), + (" UNDEFINE qwe "), +]) +def test_sqlparse_undefine(ksql): + stmts = sqlparse.parse(ksql) + assert len(stmts) == 1 + + t = stmts[0].token_first() + assert t.ttype == KSQL.Undefine