Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Qubole] - Adding support to process Quantum query types. #4066

Merged
merged 14 commits into from
Aug 29, 2019
49 changes: 34 additions & 15 deletions redash/query_runner/qubole.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
try:
import qds_sdk
from qds_sdk.qubole import Qubole as qbol
from qds_sdk.commands import Command, HiveCommand, PrestoCommand
from qds_sdk.commands import Command, HiveCommand
from qds_sdk.commands import SqlCommand, PrestoCommand
arikfr marked this conversation as resolved.
Show resolved Hide resolved
enabled = True
except ImportError:
enabled = False
Expand All @@ -24,6 +25,11 @@ def configuration_schema(cls):
return {
"type": "object",
"properties": {
"query_type": {
"type": "string",
"title": "Query Type (quantum / presto / hive)",
"default": "hive"
},
"endpoint": {
"type": "string",
"title": "API Endpoint",
Expand All @@ -37,18 +43,21 @@ def configuration_schema(cls):
"type": "string",
"title": "Cluster Label",
"default": "default"
},
"query_type": {
"type": "string",
"title": "Query Type (hive or presto)",
"default": "hive"
}
},
"order": ["endpoint", "token", "cluster"],
"required": ["endpoint", "token", "cluster"],
"order": ["query_type", "endpoint", "token", "cluster"],
"required": ["endpoint", "token"],
"secret": ["token"]
}

@classmethod
def type(cls):
return "qubole"

@classmethod
def name(cls):
return "Qubole"

@classmethod
def enabled(cls):
return enabled
Expand All @@ -59,16 +68,26 @@ def annotate_query(cls):

def test_connection(self):
headers = self._get_header()
r = requests.head("%s/api/latest/users" % self.configuration['endpoint'], headers=headers)
r = requests.head("%s/api/latest/users" % self.configuration.get('endpoint'), headers=headers)
r.status_code == 200

def run_query(self, query, user):
qbol.configure(api_token=self.configuration['token'],
api_url='%s/api' % self.configuration['endpoint'])
qbol.configure(api_token=self.configuration.get('token'),
api_url='%s/api' % self.configuration.get('endpoint'))

try:
cls = PrestoCommand if(self.configuration['query_type'] == 'presto') else HiveCommand
cmd = cls.create(query=query, label=self.configuration['cluster'])
query_type = self.configuration.get('query_type', 'hive')

if query_type == 'quantum':
cmd = SqlCommand.create(query=query)
elif query_type == 'hive':
cmd = HiveCommand.create(query=query, label=self.configuration.get('cluster'))
elif query_type == 'presto':
cmd = PrestoCommand.create(query=query, label=self.configuration.get('cluster'))
else:
raise Exception("Invalid Query Type:%s.\
It must be : hive / presto / quantum." % self.configuration.get('query_type'))

logging.info("Qubole command created with Id: %s and Status: %s", cmd.id, cmd.status)

while not Command.is_done(cmd.status):
Expand Down Expand Up @@ -106,7 +125,7 @@ def get_schema(self, get_stats=False):
try:
headers = self._get_header()
content = requests.get("%s/api/latest/hive?describe=true&per_page=10000" %
self.configuration['endpoint'], headers=headers)
self.configuration.get('endpoint'), headers=headers)
data = content.json()

for schema in data['schemas']:
Expand All @@ -127,7 +146,7 @@ def get_schema(self, get_stats=False):

def _get_header(self):
return {"Content-type": "application/json", "Accept": "application/json",
"X-AUTH-TOKEN": self.configuration['token']}
"X-AUTH-TOKEN": self.configuration.get('token')}


register(Qubole)