Skip to content
This repository has been archived by the owner on Oct 5, 2023. It is now read-only.

Commit

Permalink
feat: define / undefine
Browse files Browse the repository at this point in the history
  • Loading branch information
eshepelyuk committed Jul 24, 2023
1 parent 0f13432 commit 8207c60
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 136 deletions.
16 changes: 8 additions & 8 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 15 additions & 37 deletions pykli/ksqldb.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import base64
import httpx

from pprint import pformat

class KsqlDBClient:
def __init__(self, url, api_key=None, api_secret=None):
# TODO introduce common headers
self._url = url
self._headers = {"Content-Type": "application/vnd.ksql.v1+json"}
self._client = httpx.Client(base_url=url, http2=True)
self._session_vars = {}

if api_key and api_secret:
b64string = base64.b64encode(bytes(f"{api_key}:{api_secret}"))
Expand All @@ -19,58 +20,35 @@ def url(self):
return self._url


def define(self, name: str, value: str) -> None:
self._session_vars[name] = value


def undefine(self, name: str) -> None:
del self._session_vars[name]


def info(self):
r = self._client.get("/info", headers=self._headers)
r.raise_for_status()
return r.json()["KsqlServerInfo"]


def stmt(self, ksql_str, stream_props={}):
body = {
"ksql": ksql_str,
"streamsProperties": stream_props,
"sessionVariables": {},
}
def stmt(self, ksql_str):
body = {"ksql": ksql_str, "sessionVariables": self._session_vars}
r = self._client.post("/ksql", json=body, headers=self._headers)
r.raise_for_status()
return r.json()

def pull_query(self, ksql_str, stream_props={}):
body = {
"sql": ksql_str,
"streamsProperties": stream_props,
"sessionVariables": {},
}

def pull_query(self, ksql_str):
body = {"sql": ksql_str, "sessionVariables": self._session_vars}
headers = {"Accept": "application/json"}
r = self._client.post("/query-stream", json=body, headers=headers)
r.raise_for_status()
return r.json()


def list_topic_names(self) -> list[str]:
json = self.stmt("show topics;")[0]
return [t["name"] for t in json["topics"]]


def list_stream_names(self) -> list[str]:
json = self.stmt("show streams;")[0]
return [t["name"] for t in json["streams"]]


def list_type_names(self) -> list[str]:
json = self.stmt("show types;")[0]
return json["types"].keys()


def list_table_names(self) -> list[str]:
json = self.stmt("show tables;")[0]
return [t["name"] for t in json["tables"]]


def list_connector_names(self) -> list[str]:
json = self.stmt("show connectors;")[0]
return [t["name"] for t in json["connectors"]]

# async def query_async(self, query_string, stream_properties=None, timeout=10):
# async for x in self.api.query(
# query_string=query_string,
Expand Down
16 changes: 12 additions & 4 deletions pykli/repl_eval.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from . import LOG
from .tokens import Stmt, ErrMsg, StmtResponse, Info, PullQuery, QueryResponse

import httpx
from pprint import pformat

from . import LOG
from .tokens import Stmt, ErrMsg, StmtResponse, Info, PullQuery, QueryResponse, SessionVar


class pykli_eval:
def __init__(self, ksqldb):
self._ksqldb = ksqldb

def __call__(self, token):

def __call__(self, token) -> StmtResponse | QueryResponse | ErrMsg | None:
try:
LOG.debug(f"pykli_eval: token={token}")
match token:
Expand All @@ -25,6 +27,12 @@ def __call__(self, token):
return QueryResponse(resp)
case ErrMsg():
return token
case SessionVar(nm, val):
if val is not None:
self._ksqldb.define(nm, val)
else:
self._ksqldb.undefine(nm)
return None
case _:
return ErrMsg(f"not yet implemented: {token}")
except httpx.HTTPStatusError as e:
Expand Down
13 changes: 10 additions & 3 deletions pykli/repl_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from pprint import pformat
from pathlib import Path
from sqlparse.tokens import DML, DDL, String, Keyword
from sqlparse.sql import Comparison, Identifier
import sqlparse

from . import MONOKAI_STYLE, HISTORY_FILE, LOG
from .completer import pykli_completer
from .keybindgings import pykli_keys
from .tokens import Stmt, ErrMsg, KRunScript, PullQuery
from .tokens import KSQL, Stmt, ErrMsg, KRunScript, PullQuery, SessionVar


class file_prompt:
Expand Down Expand Up @@ -68,8 +69,6 @@ def tokenize_script(stmt):
def tokenize_sql_stmt(stmt):
kw = stmt.token_first()

# stmt._pprint_tree()

LOG.debug(f"tokenize_sql_stmt: stmt=<{stmt}>, first_token={pformat(kw)}")
if kw.ttype == Keyword or kw.ttype == DDL:
yield Stmt(stmt.value)
Expand All @@ -79,6 +78,14 @@ def tokenize_sql_stmt(stmt):
yield PullQuery(stmt.value)
elif kw.ttype is KRunScript:
yield from tokenize_script(stmt)
elif kw.ttype is KSQL.Define:
_, cmp = stmt.token_next(stmt.token_index(kw))
if isinstance(cmp, Comparison) and cmp.right.ttype is String.Single:
yield SessionVar(cmp.left.value, cmp.right.value.strip("'"))
elif kw.ttype is KSQL.Undefine:
_, id = stmt.token_next(stmt.token_index(kw))
if isinstance(id, Identifier):
yield SessionVar(id.value, None)
else:
yield stmt

Expand Down
17 changes: 15 additions & 2 deletions pykli/tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,39 @@

from . import LOG


class Stmt(NamedTuple):
ksql: str


class PullQuery(NamedTuple):
ksql: str


class QueryResponse(NamedTuple):
val: dict


class StmtResponse(NamedTuple):
val: dict


class Info(NamedTuple):
srv: str


class ErrMsg(NamedTuple):
msg: str


class SessionVar(NamedTuple):
name: str
val: str | None


KRunScript = Keyword.KRunScript

KDefine = Keyword.KDefine
KSQL = Keyword.KSQL

def initialize_sqlparse():
lex = Lexer.get_default_instance()
Expand All @@ -39,6 +51,7 @@ def initialize_sqlparse():
lex.add_keywords(keywords.KEYWORDS_COMMON)
lex.add_keywords(keywords.KEYWORDS)

lex.add_keywords({"DEFINE": Keyword.KDefine})
lex.add_keywords({"DEFINE": KSQL.Define})
lex.add_keywords({"UNDEFINE": KSQL.Undefine})

LOG.info("initialize_sqlparse done")
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pykli = "pykli.__main__:main"

[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
pytest-mock = "^3.11.1"

[build-system]
requires = ["poetry-core"]
Expand Down
54 changes: 54 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest
import httpx
from time import sleep

from pykli.ksqldb import KsqlDBClient

@pytest.fixture(scope="module")
def ksqldb_url():
return "http://localhost:28088"


@pytest.fixture(scope="module")
def ksqldb(ksqldb_url):
ksqldb = KsqlDBClient(ksqldb_url)
for i in range(5):
try:
ksqldb.stmt("""
drop stream if exists pykli_stream_json delete topic;
drop type if exists pykli_type;
drop connector if exists pykli_connector;
""")
return ksqldb
except httpx.HTTPError:
print(f"Waiting for KsqlDB #{i}, {i * 10} sec")
sleep(10)
raise RuntimeError(f"{ksqldb_url} unavailable")


def list_type_names(ksqldb) -> list[str]:
json = ksqldb.stmt("show types;")[0]
return json["types"].keys()


def list_topic_names(ksqldb) -> list[str]:
json = ksqldb.stmt("show topics;")[0]
return [t["name"] for t in json["topics"]]


def list_stream_names(ksqldb) -> list[str]:
json = ksqldb.stmt("show streams;")[0]
return [t["name"] for t in json["streams"]]



def list_table_names(ksqldb) -> list[str]:
json = ksqldb.stmt("show tables;")[0]
return [t["name"] for t in json["tables"]]


def list_connector_names(ksqldb) -> list[str]:
json = ksqldb.stmt("show connectors;")[0]
return [t["name"] for t in json["connectors"]]


29 changes: 29 additions & 0 deletions tests/test_ksqldb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import pytest
import httpx


@pytest.mark.e2e
def test_define_stmt(ksqldb):
ksqldb.define("ff", "TO_JSON_STRING")
r = ksqldb.stmt("describe function ${ff};")
assert len(r) == 1
assert r[0]["@type"] == "describe_function"

ksqldb.undefine("ff")
with pytest.raises(httpx.HTTPStatusError) as ex:
ksqldb.stmt("describe function ${ff};")
assert ex.value.response.json()["message"] == "Can't find any functions with the name '${ff}'"


@pytest.mark.e2e
def test_define_pull_query(ksqldb):
ksqldb.define("ff", "KSQL_PROCESSING_LOG")
r = ksqldb.pull_query("select * from ${ff};")
assert len(r) == 1
assert "columnNames" in r[0]

ksqldb.undefine("ff")
with pytest.raises(httpx.HTTPStatusError) as ex:
ksqldb.pull_query("select * from ${ff};")
assert ex.value.response.json()["statementText"] == "${FF}"

Loading

0 comments on commit 8207c60

Please sign in to comment.