knowledge.py
"""
About
=====

Knowledgebase module for filling the missing gaps to access the Grafana API
more efficiently.
"""
from typing import Optional

from grafana_client.model import DatasourceModel


def datasource_factory(datasource: DatasourceModel) -> DatasourceModel:
    """
    Create a payload suitable for creating a Grafana data source item.

    Some data sources need additional configuration beyond the bare minimum
    attributes required by `DatasourceModel`.

    This is by no means a generic implementation. It merely satisfies the use
    case where Docker containers are started on localhost, like the data source
    health check demo program does. Many attributes are hardcoded to specific
    values. Additional parameterization work might make this function more
    generic in the future.

    TODO: Fill the gaps for other databases.
    """
    if datasource.type == "__NEVER__":
        raise NotImplementedError("__NEVER__")
    elif datasource.type == "cratedb":
        datasource.type = "postgres"
        datasource.user = "crate"
        datasource.jsonData = {
            "postgresVersion": 1200,
            "sslmode": "disable",
        }
    elif datasource.type == "elasticsearch":
        datasource.access = "proxy"
        datasource.database = "testdrive"
        datasource.jsonData = {
            "esVersion": "7.10.0",
            "timeField": "@timestamp",
        }
    elif datasource.type in ["influxdb", "influxdb+influxql"]:
        datasource.type = "influxdb"
        datasource.access = "proxy"
        datasource.jsonData = {
            "httpMode": "POST",
            "version": "InfluxQL",
        }
    elif datasource.type in ["influxdb+flux"]:
        datasource.type = "influxdb"
        datasource.access = "proxy"
        datasource.jsonData = {
            "httpMode": "POST",
            "organization": "example",
            "version": "Flux",
        }
        datasource.secureJsonData = {
            "token": "admintoken",
        }
        datasource.secureJsonFields = {
            "token": False,
        }
    elif datasource.type == "postgres":
        datasource.user = "postgres"
        datasource.jsonData = {
            "postgresVersion": 1200,
            "sslmode": "disable",
        }
    elif datasource.type == "prometheus":
        pass
    elif datasource.type == "testdata":
        pass
    else:
        raise NotImplementedError(f"Unknown data source type: {datasource.type}")
    return datasource
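

# A minimal usage sketch for `datasource_factory`, kept as a comment so the
# module stays import-only. Hedged: the keyword arguments and the `asdict()`
# call assume the `DatasourceModel` fields used by the health check demo
# program; the name and URL are hypothetical placeholders. Note how the
# "cratedb" alias is rewritten to Grafana's "postgres" data source type.
#
#   datasource = DatasourceModel(
#       name="probe-cratedb", type="cratedb", url="localhost:4200", access="server"
#   )
#   payload = datasource_factory(datasource).asdict()
#   # `payload` can then be submitted to the Grafana data source API.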


def query_factory(datasource, expression: str):
    """
    Create a payload suitable for running a query against a Grafana data source.

    TODO: Fill the gaps for other databases.
    """
    datasource_type = datasource["type"]
    if datasource_type == "__NEVER__":
        raise NotImplementedError("__NEVER__")
    elif datasource_type == "elasticsearch":
        query = expression
    elif datasource_type == "influxdb":
        dialect = datasource["jsonData"]["version"]
        if dialect == "InfluxQL":
            query = {
                "refId": "test",
                "datasource": {
                    "type": datasource["type"],
                    "uid": datasource.get("uid"),
                },
                "datasourceId": datasource.get("id"),
                "q": expression,
            }
        elif dialect == "Flux":
            query = {
                "refId": "test",
                "datasource": {
                    "type": datasource["type"],
                    "uid": datasource.get("uid"),
                },
                "datasourceId": datasource.get("id"),
                # "intervalMs": 60000,
                "maxDataPoints": 1,
                "query": expression,
            }
        else:
            raise KeyError(f"InfluxDB dialect '{dialect}' unknown")
    elif datasource_type == "postgres":
        query = {
            "refId": "test",
            "datasource": {
                "type": datasource["type"],
                "uid": datasource.get("uid"),
            },
            "datasourceId": datasource.get("id"),
            "format": "table",
            "rawSql": expression,
        }
    elif datasource_type == "prometheus":
        query = {
            "refId": "test",
            "expr": expression,
            "instant": True,
            # "queryType": "timeSeriesQuery",
            # "exemplar": False,
            # "requestId": "0test",
            # "utcOffsetSec": 7200,
            # "legendFormat": "",
            # "interval": "",
            "datasource": {
                "type": datasource["type"],
                "uid": datasource.get("uid"),
            },
            "datasourceId": datasource.get("id"),
            # "intervalMs": 60000,
            "maxDataPoints": 1,
        }
    elif datasource_type == "testdata":
        query = expression
    else:
        raise NotImplementedError(f"Unknown data source type: {datasource_type}")
    return query
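

# A minimal usage sketch for `query_factory`, kept as a comment. Hedged: the
# dictionary below only mimics the fields this function reads ("type", "uid",
# "id", "jsonData"); it is a hypothetical stand-in, not a complete Grafana
# data source record.
#
#   datasource = {
#       "type": "postgres",
#       "uid": "aBcDeF123",
#       "id": 42,
#       "jsonData": {"postgresVersion": 1200, "sslmode": "disable"},
#   }
#   query = query_factory(datasource, expression="SELECT 1;")
#   # => {"refId": "test", "datasource": {...}, "datasourceId": 42,
#   #     "format": "table", "rawSql": "SELECT 1;"}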


# Define health-check status queries for all database types.
# TODO: Fill the gaps for other databases.
HEALTHCHECK_EXPRESSION_MAP = {
    "elasticsearch": "url:///datasources/proxy/{datasource_id}/{database_name}/_mapping",
    "influxdb": "SHOW RETENTION POLICIES on _internal",
    "influxdb+influxql": "SHOW RETENTION POLICIES on _internal",
    "influxdb+flux": "buckets()",
    "postgres": "SELECT 1;",
    "prometheus": "1+1",
    "testdata": "url:///datasources/uid/{datasource_uid}",
}
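
# Note on the "url://" entries: these are apparently not database queries but
# templated Grafana API paths. A consumer of this map would fill in the
# placeholders ({datasource_id}, {database_name}, {datasource_uid}) and issue
# a plain HTTP GET against that path instead of running a query through the
# data source. This reading is inferred from the map's shape, not stated in
# this module.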


def get_healthcheck_expression(datasource_type: str, datasource_dialect: Optional[str] = None) -> str:
    """
    Produce a data source health check query by database type.
    """
    if datasource_type == "influxdb" and datasource_dialect == "Flux":
        datasource_type = "influxdb+flux"
    if datasource_type not in HEALTHCHECK_EXPRESSION_MAP:
        raise NotImplementedError(f"Health check for datasource type {datasource_type} not implemented yet")
    return HEALTHCHECK_EXPRESSION_MAP[datasource_type]
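

# Putting the pieces together (hedged sketch, using only functions from this
# module; the data source dictionary is a hypothetical stand-in for a record
# returned by the Grafana data source API):
#
#   datasource = {"type": "influxdb", "uid": "abc123", "id": 7,
#                 "jsonData": {"version": "Flux"}}
#   expression = get_healthcheck_expression("influxdb", datasource_dialect="Flux")
#   # => "buckets()"
#   query = query_factory(datasource, expression)
#   # The resulting payload can be submitted to Grafana's query endpoint to
#   # verify that the data source responds.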