Skip to content

Commit

Permalink
add cassandra support (#2230)
Browse files Browse the repository at this point in the history
* add cassandra draft

add cassandra integration test

重构 engine 的代码, 改为动态加载

* add more cassandra tests

* fix data dictionary tests

* more tests in cassandra

* add cassandra in readme
  • Loading branch information
LeoQuote authored Aug 16, 2023
1 parent 6018ae8 commit d503b55
Show file tree
Hide file tree
Showing 31 changed files with 726 additions and 278 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/django.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
strategy:
max-parallel: 4
matrix:
python-version: [3.8, 3.9, "3.10"]
python-version: ["3.8", "3.9", "3.10", "3.11"]

# https://github.com/actions/example-services/tree/master/.github/workflows
services:
Expand Down Expand Up @@ -70,7 +70,7 @@ jobs:

- name: Install Dependencies
run: |
sudo apt-get update && sudo apt-get install libsasl2-dev libldap2-dev libssl-dev unixodbc unixodbc-dev
sudo apt-get update && sudo apt-get install libsasl2-dev libkrb5-dev libldap2-dev libssl-dev unixodbc unixodbc-dev
python -m pip install --upgrade pip
pip install codecov coverage flake8 -r requirements.txt
Expand Down
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@
功能清单
====

| 数据库 | 查询 | 审核 | 执行 | 备份 | 数据字典 | 慢日志 | 会话管理 | 账号管理 | 参数管理 | 数据归档 |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| MySQL | √ | √ | √ | √ | √ | √ | √ | √ | √ | √ |
| MsSQL | √ | × | √ | × | √ | × | × | × | × | × |
| Redis | √ | × | √ | × | × | × | × | × | × | × |
| PgSQL | √ | × | √ | × | × | × | × | × | × | × |
| Oracle | √ | √ | √ | √ | √ | × | √ | × | × | × |
| MongoDB | √ | √ | √ | × | × | × | √ | √ | × | × |
| Phoenix | √ | × | √ | × | × | × | × | × | × | × |
| ODPS | √ | × | × | × | × | × | × | × | × | × |
| 数据库 | 查询 | 审核 | 执行 | 备份 | 数据字典 | 慢日志 | 会话管理 | 账号管理 | 参数管理 | 数据归档 |
|------------| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| MySQL | √ | √ | √ | √ | √ | √ | √ | √ | √ | √ |
| MsSQL | √ | × | √ | × | √ | × | × | × | × | × |
| Redis | √ | × | √ | × | × | × | × | × | × | × |
| PgSQL | √ | × | √ | × | × | × | × | × | × | × |
| Oracle | √ | √ | √ | √ | √ | × | √ | × | × | × |
| MongoDB | √ | √ | √ | × | × | × | √ | √ | × | × |
| Phoenix | √ | × | √ | × | × | × | × | × | × | × |
| ODPS | √ | × | × | × | × | × | × | × | × | × |
| ClickHouse | √ | √ | √ | × | × | × | × | × | × | × |
| Cassandra | √ | × | √ | × | × | × | × | × | × | × |



Expand Down
33 changes: 31 additions & 2 deletions archery/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
environ.Env.read_env(os.path.join(BASE_DIR, ".env"))
env = environ.Env(
DEBUG=(bool, False),
ALLOWED_HOSTS=(List[str], ["*"]),
ALLOWED_HOSTS=(list, ["*"]),
SECRET_KEY=(str, "hfusaf2m4ot#7)fkw#di2bu6(cv0@opwmafx5n#6=3d%x^hpl6"),
DATABASE_URL=(str, "mysql://root:@127.0.0.1:3306/archery"),
CACHE_URL=(str, "redis://127.0.0.1:6379/0"),
Expand All @@ -38,6 +38,21 @@
Q_CLUISTER_SYNC=(bool, False), # qcluster 同步模式, debug 时可以调整为 True
# CSRF_TRUSTED_ORIGINS=subdomain.example.com,subdomain.example2.com subdomain.example.com
CSRF_TRUSTED_ORIGINS=(list, []),
ENABLED_ENGINES=(
list,
[
"mysql",
"clickhouse",
"goinception",
"mssql",
"redis",
"pqsql",
"oracle",
"mongo",
"phoenix",
"odps",
],
),
)

# SECURITY WARNING: keep the secret key used in production secret!
Expand All @@ -57,6 +72,21 @@
# 请求限制
DATA_UPLOAD_MAX_MEMORY_SIZE = 15728640

# Registry of every engine that can be loaded. Each "path" has the form
# "<importable module>:<class name>"; sql/engines/__init__.py splits it on ":"
# and imports the class dynamically.
AVAILABLE_ENGINES = {
    "mysql": {"path": "sql.engines.mysql:MysqlEngine"},
    "cassandra": {"path": "sql.engines.cassandra:CassandraEngine"},
    "clickhouse": {"path": "sql.engines.clickhouse:ClickHouseEngine"},
    "goinception": {"path": "sql.engines.goinception:GoInceptionEngine"},
    "mssql": {"path": "sql.engines.mssql:MssqlEngine"},
    "redis": {"path": "sql.engines.redis:RedisEngine"},
    # Engines are looked up by instance.db_type, which is "pgsql" for
    # PostgreSQL instances, so the registry key must be spelled the same way.
    "pgsql": {"path": "sql.engines.pgsql:PgSQLEngine"},
    # Misspelled alias kept so existing ENABLED_ENGINES values still resolve.
    "pqsql": {"path": "sql.engines.pgsql:PgSQLEngine"},
    "oracle": {"path": "sql.engines.oracle:OracleEngine"},
    "mongo": {"path": "sql.engines.mongo:MongoEngine"},
    "phoenix": {"path": "sql.engines.phoenix:PhoenixEngine"},
    "odps": {"path": "sql.engines.odps:ODPSEngine"},
}
# Names of the engines to load at start-up. The historical "pqsql" spelling is
# normalised to "pgsql" so PostgreSQL instances can find their engine.
ENABLED_ENGINES = [
    "pgsql" if engine_name == "pqsql" else engine_name
    for engine_name in env("ENABLED_ENGINES")
]

# Application definition
INSTALLED_APPS = (
"django.contrib.admin",
Expand Down Expand Up @@ -245,7 +275,6 @@
ENABLE_OIDC = env("ENABLE_OIDC", False)
if ENABLE_OIDC:
INSTALLED_APPS += ("mozilla_django_oidc",)
MIDDLEWARE += ("mozilla_django_oidc.middleware.SessionRefresh",)
AUTHENTICATION_BACKENDS = (
"common.authenticate.oidc_auth.OIDCAuthenticationBackend",
"django.contrib.auth.backends.ModelBackend",
Expand Down
1 change: 1 addition & 0 deletions downloads/dictionary/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.html
Empty file added downloads/dictionary/.gitkeep
Empty file.
18 changes: 0 additions & 18 deletions downloads/dictionary/test_instance_test_archery.html

This file was deleted.

5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pyecharts==1.9.1
aliyun-python-sdk-rds==2.1.1
cx-Oracle==7.3.0
supervisor==4.1.0
phoenixdb==0.7
phoenixdb==1.2.1
django-mirage-field==1.4.0
schema-sync==0.9.7
parsedatetime==2.4
Expand All @@ -38,4 +38,5 @@ django-environ==0.8.1
alibabacloud_dysmsapi20170525==2.0.9
tencentcloud-sdk-python==3.0.656
mozilla-django-oidc==3.0.0
django-auth-dingding==0.0.2
django-auth-dingding==0.0.2
cassandra-driver
26 changes: 21 additions & 5 deletions sql/data_dictionary.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ def table_info(request):
)


def get_export_full_path(base_dir: str, instance_name: str, db_name: str) -> str:
    """Build the export file path and verify that it stays inside ``base_dir``.

    The file name is ``{instance_name}_{db_name}.html``. Both name parts may
    come from the request, so the joined path is normalised and then checked
    component-wise against ``base_dir``. A plain string-prefix check is not
    enough: ``/x/dictionary_evil`` starts with ``/x/dictionary`` but is a
    sibling directory, not a child.

    :param base_dir: directory that exported files must be written into
    :param instance_name: instance name used as the first part of the file name
    :param db_name: database name used as the second part of the file name
    :return: the normalised full path, or ``""`` if it escapes ``base_dir``
    """
    base = os.path.normpath(base_dir)
    fullpath = os.path.normpath(
        os.path.join(base, f"{instance_name}_{db_name}.html")
    )
    try:
        is_inside = os.path.commonpath([base, fullpath]) == base
    except ValueError:
        # Raised when the two paths cannot be compared (for example one is
        # absolute and the other relative); treat that as an unsafe path.
        is_inside = False
    return fullpath if is_inside else ""


@permission_required("sql.data_dictionary_export", raise_exception=True)
def export(request):
"""导出数据字典"""
Expand All @@ -111,10 +121,10 @@ def export(request):
elif request.user.is_superuser:
dbs = query_engine.get_all_databases().rows
else:
return JsonResponse({"status": 1, "msg": f"仅管理员可以导出整个实例的字典信息!", "data": []})
return JsonResponse({"status": 1, "msg": "仅管理员可以导出整个实例的字典信息!", "data": []})

# 获取数据,存入目录
path = os.path.join(settings.BASE_DIR, "downloads/dictionary")
path = os.path.join(settings.BASE_DIR, "downloads", "dictionary")
os.makedirs(path, exist_ok=True)
for db in dbs:
table_metas = query_engine.get_tables_metas_data(db_name=db)
Expand All @@ -126,12 +136,18 @@ def export(request):
data = loader.render_to_string(
template_name="dictionaryexport.html", context=context, request=request
)
with open(f"{path}/{instance_name}_{db}.html", "w") as f:
f.write(data)
fullpath = get_export_full_path(path, instance_name, db)
if not fullpath:
return JsonResponse({"status": 1, "msg": "实例名或db名不合法", "data": []})
with open(fullpath, "w", encoding="utf-8") as fp:
fp.write(data)
# 关闭连接
query_engine.close()
if db_name:
response = FileResponse(open(f"{path}/{instance_name}_{db_name}.html", "rb"))
fullpath = get_export_full_path(path, instance_name, db)
if not fullpath:
return JsonResponse({"status": 1, "msg": "实例名或db名不合法", "data": []})
response = FileResponse(open(fullpath, "rb"))
response["Content-Type"] = "application/octet-stream"
response[
"Content-Disposition"
Expand Down
49 changes: 49 additions & 0 deletions sql/engines/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Engine 说明

## Cassandra
当前连接时, 使用参数基本为写死参数, 具体可以参照代码.

如果需要覆盖, 可以自行继承

具体方法为:
1. 在根目录新增一个文件夹 `extras`, 和 `sql`, `sql_api` 等文件夹平级. 可以在 docker 打包时加入, 也可以使用卷挂载的方式
2. 新增一个文件, `mycassandra.py`
```python
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import RoundRobinPolicy
from cassandra.query import tuple_factory

from sql.engines.cassandra import CassandraEngine


class MyCassandraEngine(CassandraEngine):
    """Example engine that overrides how the Cassandra session is created."""

    def get_connection(self, db_name=None):
        """Return the cached session, or connect to the cluster and cache it."""
        db_name = db_name or self.db_name
        if self.conn:
            if db_name:
                self.conn.execute(f"use {db_name}")
            return self.conn
        hosts = self.host.split(",")
        # Change how you obtain the session here.
        auth_provider = PlainTextAuthProvider(
            username=self.user, password=self.password
        )
        cluster = Cluster(
            hosts,
            port=self.port,
            auth_provider=auth_provider,
            load_balancing_policy=RoundRobinPolicy(),
            protocol_version=5,
        )
        self.conn = cluster.connect(keyspace=db_name)
        # It is best to leave the following line unchanged.
        self.conn.row_factory = tuple_factory
        return self.conn
```
3. 修改settings , 加载你刚写的 engine
```python
AVAILABLE_ENGINES = {
"mysql": {"path": "sql.engines.mysql:MysqlEngine"},
# 这里改成你的 engine
"cassandra": {"path": "extras.mycassandra:MyCassandraEngine"},
"clickhouse": {"path": "sql.engines.clickhouse:ClickHouseEngine"},
"goinception": {"path": "sql.engines.goinception:GoInceptionEngine"},
"mssql": {"path": "sql.engines.mssql:MssqlEngine"},
"redis": {"path": "sql.engines.redis:RedisEngine"},
"pqsql": {"path": "sql.engines.pgsql:PgSQLEngine"},
"oracle": {"path": "sql.engines.oracle:OracleEngine"},
"mongo": {"path": "sql.engines.mongo:MongoEngine"},
"phoenix": {"path": "sql.engines.phoenix:PhoenixEngine"},
"odps": {"path": "sql.engines.odps:ODPSEngine"},
}
```
80 changes: 28 additions & 52 deletions sql/engines/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
"""engine base库, 包含一个``EngineBase`` class和一个get_engine函数"""
import importlib
from sql.engines.models import ResultSet, ReviewSet
from sql.utils.ssh_tunnel import SSHConnection
from django.conf import settings


class EngineBase:
"""enginebase 只定义了init函数和若干方法的名字, 具体实现用mysql.py pg.py等实现"""

test_query = None

name = "Base"
info = "base engine"

def __init__(self, instance=None):
self.conn = None
self.thread_id = None
Expand Down Expand Up @@ -77,16 +82,6 @@ def test_connection(self):
"""测试实例链接是否正常"""
return self.query(sql=self.test_query)

@property
def name(self):
"""返回engine名称"""
return "base"

@property
def info(self):
"""返回引擎简介"""
return "Base engine"

def escape_string(self, value: str) -> str:
"""参数转义"""
return value
Expand Down Expand Up @@ -179,7 +174,7 @@ def query(
limit_num=0,
close_conn=True,
parameters=None,
**kwargs
**kwargs,
):
"""实际查询 返回一个ResultSet"""
return ResultSet()
Expand Down Expand Up @@ -213,6 +208,22 @@ def set_variable(self, variable_name, variable_value):
return ResultSet()


def get_engine_map():
    """Resolve every enabled engine name to its engine class.

    Each name in ``settings.ENABLED_ENGINES`` is looked up in
    ``settings.AVAILABLE_ENGINES``. The entry's ``"path"`` value
    (``"<module>:<attribute>"``) is split on ``":"``, the module is imported
    and the attribute is fetched from it.

    :return: dict mapping each enabled engine name to the loaded object
    :raises ValueError: if an enabled name has no (or an empty) entry in
        ``settings.AVAILABLE_ENGINES``
    """
    registry = settings.AVAILABLE_ENGINES
    resolved = {}
    for engine_name in settings.ENABLED_ENGINES:
        engine_config = registry.get(engine_name)
        if not engine_config:
            raise ValueError(f"invalid engine {engine_name}, not found in engine map")
        module_path, attr_name = engine_config["path"].split(":")
        engine_module = importlib.import_module(module_path)
        resolved[engine_name] = getattr(engine_module, attr_name)
    return resolved


# Built once at module level, so the enabled engine modules are imported when
# this module is imported; get_engine_map() raises ValueError here if an
# enabled engine name is missing from settings.AVAILABLE_ENGINES.
engine_map = get_engine_map()


def get_engine(instance=None): # pragma: no cover
"""获取数据库操作engine"""
if instance.db_type == "mysql":
Expand All @@ -222,44 +233,9 @@ def get_engine(instance=None): # pragma: no cover
from .cloud.aliyun_rds import AliyunRDS

return AliyunRDS(instance=instance)
from .mysql import MysqlEngine

return MysqlEngine(instance=instance)
elif instance.db_type == "mssql":
from .mssql import MssqlEngine

return MssqlEngine(instance=instance)
elif instance.db_type == "redis":
from .redis import RedisEngine

return RedisEngine(instance=instance)
elif instance.db_type == "pgsql":
from .pgsql import PgSQLEngine

return PgSQLEngine(instance=instance)
elif instance.db_type == "oracle":
from .oracle import OracleEngine

return OracleEngine(instance=instance)
elif instance.db_type == "mongo":
from .mongo import MongoEngine

return MongoEngine(instance=instance)
elif instance.db_type == "goinception":
from .goinception import GoInceptionEngine

return GoInceptionEngine(instance=instance)
elif instance.db_type == "phoenix":
from .phoenix import PhoenixEngine

return PhoenixEngine(instance=instance)

elif instance.db_type == "odps":
from .odps import ODPSEngine

return ODPSEngine(instance=instance)

elif instance.db_type == "clickhouse":
from .clickhouse import ClickHouseEngine

return ClickHouseEngine(instance=instance)
engine = engine_map.get(instance.db_type)
if not engine:
raise ValueError(
f"engine {instance.db_type} not enabled or not supported, please contact admin"
)
return engine(instance=instance)
Loading

0 comments on commit d503b55

Please sign in to comment.