Run queries through ad-hoc SSH tunnels #4797

Merged · 9 commits · May 11, 2020
36 changes: 24 additions & 12 deletions client/app/pages/queries/hooks/useDataSourceSchema.js
@@ -1,22 +1,36 @@
import { reduce } from "lodash";
import { useCallback, useEffect, useRef, useState, useMemo } from "react";
import { axios } from "@/services/axios";
import DataSource, { SCHEMA_NOT_SUPPORTED } from "@/services/data-source";
import notification from "@/services/notification";

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

function getSchema(dataSource, refresh = undefined) {
  if (!dataSource) {
    return Promise.resolve([]);
  }

  const fetchSchemaFromJob = (data) => {
    return sleep(1000).then(() => {
[Review comment — @jezdez (Member), May 13, 2020]

This adds a noticeable delay to loading the schema browser, without any indication that the schema is being loaded. Could we add a "Loading... please wait" state or something similar, or reduce the sleep interval to something closer to an unnoticeable level?

Also, in case of a worker failure that prevents the job from ever returning a good result (e.g. Redis key eviction), could this add an upper limit on the number of tries and then show a button that fetches the schema anew with a new job?

      return axios.get(`api/jobs/${data.job.id}`).then((data) => {
        if (data.job.status < 3) {
          return fetchSchemaFromJob(data);
        } else if (data.job.status === 3) {
          return data.job.result;
        } else if (data.job.status === 4 && data.job.error.code === SCHEMA_NOT_SUPPORTED) {
          return [];
        } else {
          return Promise.reject(new Error(data.job.error));
        }
      });
    });
  };

  return DataSource.fetchSchema(dataSource, refresh)
-    .then(data => {
-      if (data.schema) {
-        return data.schema;
-      } else if (data.error.code === SCHEMA_NOT_SUPPORTED) {
-        return [];
-      }
-      return Promise.reject(new Error("Schema refresh failed."));
-    })
+    .then(fetchSchemaFromJob)
    .catch(() => {
      notification.error("Schema refresh failed.", "Please try again later.");
      return Promise.resolve([]);
@@ -34,11 +48,9 @@ export default function useDataSourceSchema(dataSource) {

  const reloadSchema = useCallback(
    (refresh = undefined) => {
-      const refreshToken = Math.random()
-        .toString(36)
-        .substr(2);
+      const refreshToken = Math.random().toString(36).substr(2);
      refreshSchemaTokenRef.current = refreshToken;
-      getSchema(dataSource, refresh).then(data => {
+      getSchema(dataSource, refresh).then((data) => {
        if (refreshSchemaTokenRef.current === refreshToken) {
          setSchema(prepareSchema(data));
        }
6 changes: 3 additions & 3 deletions client/app/services/data-source.js
@@ -8,9 +8,9 @@ const DataSource = {
  query: () => axios.get("api/data_sources"),
  get: ({ id }) => axios.get(`api/data_sources/${id}`),
  types: () => axios.get("api/data_sources/types"),
-  create: data => axios.post(`api/data_sources`, data),
-  save: data => axios.post(`api/data_sources/${data.id}`, data),
-  test: data => axios.post(`api/data_sources/${data.id}/test`),
+  create: (data) => axios.post(`api/data_sources`, data),
+  save: (data) => axios.post(`api/data_sources/${data.id}`, data),
+  test: (data) => axios.post(`api/data_sources/${data.id}/test`),
  delete: ({ id }) => axios.delete(`api/data_sources/${id}`),
  fetchSchema: (data, refresh = false) => {
    const params = {};
33 changes: 16 additions & 17 deletions redash/handlers/data_sources.py
@@ -1,4 +1,5 @@
import logging
import time

from flask import make_response, request
from flask_restful import abort
@@ -20,12 +21,16 @@
)
from redash.utils import filter_none
from redash.utils.configuration import ConfigurationContainer, ValidationError
from redash.tasks.general import test_connection, get_schema
from redash.serializers import serialize_job


class DataSourceTypeListResource(BaseResource):
    @require_admin
    def get(self):
-        return [q.to_dict() for q in sorted(query_runners.values(), key=lambda q: q.name())]
+        return [
+            q.to_dict() for q in sorted(query_runners.values(), key=lambda q: q.name())
+        ]


class DataSourceResource(BaseResource):
@@ -182,19 +187,9 @@ def get(self, data_source_id):
        require_access(data_source, self.current_user, view_only)
        refresh = request.args.get("refresh") is not None

-        response = {}
-
-        try:
-            response["schema"] = data_source.get_schema(refresh)
-        except NotSupported:
-            response["error"] = {
-                "code": 1,
-                "message": "Data source type does not support retrieving schema",
-            }
-        except Exception:
-            response["error"] = {"code": 2, "message": "Error retrieving schema."}
[Review thread — Comment on lines -189 to -195]

Member: We need to keep reporting these error codes. For example, we might show different UI when the schema isn't supported. And we don't want to throw random errors at the user, but rather show "Error retrieving schema." (It might make sense to show some of the errors, but that's beyond the scope of this PR.)

Author (Contributor): I thought so too, but then I realized we aren't really using the error codes defined in the service. The current implementation (returning [] on NotSupported from the backend) mimics the behavior we have today: failing silently on NotSupported and showing a "Schema refresh failed." message when errors occur.

Member: I know we don't really use them anymore in code -- this happened during the conversion to React, but we should someday. It's better to be explicit about this.
+        job = get_schema.delay(data_source.id, refresh)

-        return response
+        return serialize_job(job)


class DataSourcePauseResource(BaseResource):
@@ -245,10 +240,14 @@ def post(self, data_source_id):
        )

        response = {}

-        try:
-            data_source.query_runner.test_connection()
-        except Exception as e:
-            response = {"message": str(e), "ok": False}
+        job = test_connection.delay(data_source.id)
+        while not (job.is_finished or job.is_failed):
+            time.sleep(1)
+            job.refresh()
+
+        if isinstance(job.result, Exception):
+            response = {"message": str(job.result), "ok": False}
        else:
            response = {"message": "success", "ok": True}

12 changes: 11 additions & 1 deletion redash/models/__init__.py
@@ -24,6 +24,7 @@
)
from redash.metrics import database # noqa: F401
from redash.query_runner import (
+    with_ssh_tunnel,
    get_configuration_schema_for_query_runner_type,
    get_query_runner,
    TYPE_BOOLEAN,
@@ -251,9 +252,18 @@ def update_group_permission(self, group, view_only):
        db.session.add(dsg)
        return dsg

+    @property
+    def uses_ssh_tunnel(self):
+        return "ssh_tunnel" in self.options
+
    @property
    def query_runner(self):
-        return get_query_runner(self.type, self.options)
+        query_runner = get_query_runner(self.type, self.options)
+
+        if self.uses_ssh_tunnel:
+            query_runner = with_ssh_tunnel(query_runner, self.options.get("ssh_tunnel"))
+
+        return query_runner
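
For illustration, an assumed example (not from this diff) of a DataSource.options payload that makes uses_ssh_tunnel true; the nested key names follow the lookups with_ssh_tunnel performs (ssh_host, ssh_port, ssh_username):

# Assumed options dict for a host/port-based data source with tunneling enabled.
options = {
    "host": "db.internal",  # remote endpoint, swapped for the local bind address while tunneling
    "port": 5432,
    "ssh_tunnel": {  # presence of this key is what enables wrapping
        "ssh_host": "bastion.example.com",
        "ssh_port": 22,  # optional; with_ssh_tunnel defaults it to 22
        "ssh_username": "redash",
    },
}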

    @classmethod
    def get_by_name(cls, name):
100 changes: 99 additions & 1 deletion redash/query_runner/__init__.py
@@ -1,8 +1,11 @@
import logging

+from contextlib import ExitStack
from dateutil import parser
+from functools import wraps
import requests

+from sshtunnel import open_tunnel
from redash import settings
from redash.utils import json_loads
from rq.timeouts import JobTimeoutException
@@ -70,6 +73,58 @@ def type(cls):
    def enabled(cls):
        return True

    @property
    def host(self):
        """Returns this query runner's configured host.
        This is used primarily for temporarily swapping endpoints when using SSH tunnels to connect to a data source.

        `BaseQueryRunner`'s naïve implementation supports query runner implementations that store endpoints using `host` and `port`
        configuration values. If your query runner uses a different schema (e.g. a web address), you should override this function.
        """
        if "host" in self.configuration:
            return self.configuration["host"]
        else:
            raise NotImplementedError()

    @host.setter
    def host(self, host):
        """Sets this query runner's configured host.
        This is used primarily for temporarily swapping endpoints when using SSH tunnels to connect to a data source.

        `BaseQueryRunner`'s naïve implementation supports query runner implementations that store endpoints using `host` and `port`
        configuration values. If your query runner uses a different schema (e.g. a web address), you should override this function.
        """
        if "host" in self.configuration:
            self.configuration["host"] = host
        else:
            raise NotImplementedError()

    @property
    def port(self):
        """Returns this query runner's configured port.
        This is used primarily for temporarily swapping endpoints when using SSH tunnels to connect to a data source.

        `BaseQueryRunner`'s naïve implementation supports query runner implementations that store endpoints using `host` and `port`
        configuration values. If your query runner uses a different schema (e.g. a web address), you should override this function.
        """
        if "port" in self.configuration:
            return self.configuration["port"]
        else:
            raise NotImplementedError()

    @port.setter
    def port(self, port):
        """Sets this query runner's configured port.
        This is used primarily for temporarily swapping endpoints when using SSH tunnels to connect to a data source.

        `BaseQueryRunner`'s naïve implementation supports query runner implementations that store endpoints using `host` and `port`
        configuration values. If your query runner uses a different schema (e.g. a web address), you should override this function.
        """
        if "port" in self.configuration:
            self.configuration["port"] = port
        else:
            raise NotImplementedError()

    @classmethod
    def configuration_schema(cls):
        return {}
@@ -127,7 +182,7 @@ def to_dict(cls):
"name": cls.name(),
"type": cls.type(),
"configuration_schema": cls.configuration_schema(),
**({ "deprecated": True } if cls.deprecated else {})
**({"deprecated": True} if cls.deprecated else {}),
}


@@ -303,3 +358,46 @@ def guess_type_from_string(string_value):
        pass

    return TYPE_STRING


def with_ssh_tunnel(query_runner, details):
    def tunnel(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                remote_host, remote_port = query_runner.host, query_runner.port
            except NotImplementedError:
                raise NotImplementedError(
                    "SSH tunneling is not implemented for this query runner yet."
                )

            stack = ExitStack()
            try:
                bastion_address = (details["ssh_host"], details.get("ssh_port", 22))
                remote_address = (remote_host, remote_port)
                auth = {
                    "ssh_username": details["ssh_username"],
                    **settings.dynamic_settings.ssh_tunnel_auth(),
                }
                server = stack.enter_context(
                    open_tunnel(
                        bastion_address, remote_bind_address=remote_address, **auth
                    )
                )
            except Exception as error:
                raise type(error)("SSH tunnel: {}".format(str(error)))

            with stack:
                try:
                    query_runner.host, query_runner.port = server.local_bind_address
                    result = f(*args, **kwargs)
                finally:
                    query_runner.host, query_runner.port = remote_host, remote_port

            return result

        return wrapper

    query_runner.run_query = tunnel(query_runner.run_query)

    return query_runner
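
A minimal usage sketch (not part of the diff; the PostgreSQL runner and all configuration values here are illustrative) showing how the wrapper composes with a host/port-based query runner:

# Hypothetical example: wrap a Postgres runner so run_query goes through a bastion.
from redash.query_runner import with_ssh_tunnel
from redash.query_runner.pg import PostgreSQL

runner = PostgreSQL({"host": "db.internal", "port": 5432, "dbname": "app"})
runner = with_ssh_tunnel(
    runner, {"ssh_host": "bastion.example.com", "ssh_username": "redash"}
)

# run_query now opens the tunnel, temporarily points host/port at the tunnel's
# local bind address, executes the query, then restores the original endpoint.
data, error = runner.run_query("SELECT 1", user=None)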
25 changes: 25 additions & 0 deletions redash/query_runner/clickhouse.py
@@ -1,5 +1,6 @@
import logging
import re
+from urllib.parse import urlparse

import requests

@@ -42,6 +43,30 @@ def configuration_schema(cls):
    def type(cls):
        return "clickhouse"

    @property
    def _url(self):
        return urlparse(self.configuration["url"])

    @_url.setter
    def _url(self, url):
        self.configuration["url"] = url.geturl()

    @property
    def host(self):
        return self._url.hostname

    @host.setter
    def host(self, host):
        self._url = self._url._replace(netloc="{}:{}".format(host, self._url.port))

    @property
    def port(self):
        return self._url.port

    @port.setter
    def port(self, port):
        self._url = self._url._replace(netloc="{}:{}".format(self._url.hostname, port))

    def _get_tables(self, schema):
        query = "SELECT database, table, name FROM system.columns WHERE database NOT IN ('system')"

8 changes: 6 additions & 2 deletions redash/serializers/__init__.py
@@ -284,24 +284,28 @@ def serialize_job(job):
        updated_at = 0

    status = STATUSES[job_status]
-    query_result_id = None
+    result = query_result_id = None

    if job.is_cancelled:
        error = "Query cancelled by user."
        status = 4
    elif isinstance(job.result, Exception):
        error = str(job.result)
        status = 4
    elif isinstance(job.result, dict) and "error" in job.result:
        error = job.result["error"]
        status = 4
    else:
        error = ""
-        query_result_id = job.result
+        result = query_result_id = job.result

    return {
        "job": {
            "id": job.id,
            "updated_at": updated_at,
            "status": status,
            "error": error,
+            "result": result,
            "query_result_id": query_result_id,
[Review thread — Comment on lines +308 to 309]

Member: To avoid accumulating tech debt, maybe we can return query_result_id only when it's an execute_query task? (Only if it's a simple check.)

Author (Contributor): If we want to avoid accumulating tech debt, we should probably always return result and ditch query_result_id, no?

Member: But this will break scripts that use /jobs for polling and expect query_result_id... :| We could really use API versioning at this point.

Author (Contributor): I agree; that's why I went for doubling result and query_result_id, as the cheapest thing that won't break existing consumers and won't be too awkward :(

        }
    }
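
For illustration (all values assumed), a successfully finished job would now serialize to something like:

# Assumed example payload; the id and result values are placeholders.
{
    "job": {
        "id": "c39e8409-...",  # RQ job id, placeholder
        "updated_at": 0,
        "status": 3,  # 3 = finished; 4 would carry an error message
        "error": "",
        "result": 42,  # same value mirrored into both fields
        "query_result_id": 42,
    }
}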
12 changes: 12 additions & 0 deletions redash/settings/dynamic_settings.py
@@ -25,3 +25,15 @@ def periodic_jobs():
# This provides the ability to override the way we store QueryResult's data column.
# Reference implementation: redash.models.DBPersistence
QueryResultPersistence = None


def ssh_tunnel_auth():
    """
    To enable data source connections via SSH tunnels, provide your SSH authentication
    pkey here. Return a string pointing at your **private** key's path (which will be used
    to extract the public key), or a `paramiko.pkey.PKey` instance holding your **public** key.
    """
    return {
        # 'ssh_pkey': 'path_to_private_key', # or instance of `paramiko.pkey.PKey`
        # 'ssh_private_key_password': 'optional_passphrase_of_private_key',
    }
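
A hedged sketch of a deployment-specific override (path and passphrase are placeholders); these keys are passed through to sshtunnel.open_tunnel by with_ssh_tunnel:

# Example override in a custom dynamic_settings module; values are placeholders.
def ssh_tunnel_auth():
    return {
        "ssh_pkey": "/home/redash/.ssh/id_rsa",  # private key path
        "ssh_private_key_password": "optional-passphrase",  # omit if the key has none
    }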