Skip to content

Commit

Permalink
Add api service to health
Browse files Browse the repository at this point in the history
This commit adds a health API that queries Elasticsearch for the aggregated
health status. The health-aggregation job has been moved to a separate file,
and the Dockerfile has been modified to run main.sh, which can run either
the job or the API (or both).
Improves error reporting and adds support for Elasticsearch 5.
  • Loading branch information
teferi authored and boris-42 committed Nov 23, 2016
1 parent 4c77bb9 commit 641b67d
Show file tree
Hide file tree
Showing 12 changed files with 395 additions and 154 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ RUN pip install -r requirements.txt
WORKDIR /app/health
EXPOSE 5000

ENTRYPOINT ["python"]
CMD ["main.py"]
ENTRYPOINT ["bash"]
CMD ["main.sh"]
Empty file added health/api/__init__.py
Empty file.
Empty file added health/api/v1/__init__.py
Empty file.
135 changes: 135 additions & 0 deletions health/api/v1/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright 2016: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import json
import logging

import flask
import requests


# Blueprint serving aggregated health data; mounted under /health (see
# get_blueprints below).
health = flask.Blueprint("health", __name__)


def get_blueprints():
    """Return [url_prefix, blueprint] pairs exposed by this module."""
    return [["/health", health]]


@health.route("/", defaults={"region": "all"})
@health.route("/<region>")
def get_health(region):
    """Return aggregated health metrics for one region (or all regions).

    Query string:
        period: one of "day" (default), "week", "month" or "year"; anything
            unrecognized falls back to "day".

    Returns JSON with "project_names" (list of service names) and
    "projects" (per-service average FCI plus time series of fci,
    response_time and response_size).

    Raises:
        RuntimeError: when Elasticsearch answers with a non-ok status.
    """
    period = flask.request.args.get("period", "day")

    # Map the requested period to an Elasticsearch date-math lower bound and
    # a matching date_histogram interval.
    periods = {
        "week": ("now-7d/m", "1h"),
        "month": ("now-30d/m", "4h"),
        "year": ("now-365d/m", "8h"),
        "day": ("now-1d/m", "10m"),
    }
    period, interval = periods.get(period, periods["day"])

    query = {
        "size": 0,  # this is a count request
        "query": {
            "bool": {
                "filter": [{
                    "range": {
                        "timestamp": {
                            "gte": period
                        }
                    }
                }]
            }
        },
        "aggs": {
            "projects": {
                "terms": {"field": "service"},

                "aggs": {
                    "avg_fci": {
                        "avg": {
                            "field": "fci"
                        }
                    },
                    "data": {
                        "date_histogram": {
                            "field": "timestamp",
                            "interval": interval,
                            "format": "yyyy-MM-dd'T'hh:mm",
                            "min_doc_count": 0
                        },
                        "aggs": {
                            "fci": {
                                "avg": {"field": "fci"}
                            },
                            # BUG FIX: each sub-aggregation now averages its
                            # own field (response_time / response_size were
                            # previously swapped).
                            "response_size": {
                                "avg": {"field": "response_size.avg"}
                            },
                            "response_time": {
                                "avg": {"field": "response_time.avg"}
                            }
                        }
                    }
                }
            }
        }
    }

    # only match if region is not "all"
    if region != "all":
        query["query"]["bool"]["filter"].append(
            {"match": {"region": region}})

    elastic = flask.current_app.config["backend"]["elastic"]
    r = requests.get("%s/_search" % elastic,
                     data=json.dumps(query))

    if not r.ok:
        # BUG FIX: the format string had three placeholders but only two
        # arguments, which raised IndexError on the error path.
        logging.error("Got {} status when requesting {}. {}".format(
            r.status_code, elastic, r.text))
        raise RuntimeError(r.text)

    result = {
        "project_names": [],
        "projects": {}
    }

    def convert_(data, field):
        # Flatten one date_histogram into [timestamp, value] pairs.
        return [[d["key_as_string"], d[field]["value"]]
                for d in data["buckets"]]

    for project in r.json()["aggregations"]["projects"]["buckets"]:
        result["project_names"].append(project["key"])
        result["projects"][project["key"]] = {
            "fci": project["avg_fci"]["value"],
            "fci_score_data": convert_(project["data"], "fci"),
            "response_time_data": convert_(project["data"], "response_time"),
            "response_size_data": convert_(project["data"], "response_size")
        }

    return flask.jsonify(**result)
30 changes: 17 additions & 13 deletions health/drivers/tcp/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import copy
import json
import logging

import requests

Expand Down Expand Up @@ -48,17 +49,14 @@


AGG_REQUEST = {
"size": 0, # this is a count request
"query": {
"filtered": {
"filter": {
"and": {
"filters": [
{"exists": {"field": "http_method"}},
{"exists": {"field": "http_status"}},
{"exists": {"field": "http_response_time"}}
]
}
}
"bool": {
"filter": [
{"exists": {"field": "http_method"}},
{"exists": {"field": "http_status"}},
{"exists": {"field": "http_response_time"}},
]
}
},
"aggs": {
Expand Down Expand Up @@ -99,7 +97,7 @@

def get_request(ts_range):
query = copy.deepcopy(AGG_REQUEST)
query["query"]["filtered"]["filter"]["and"]["filters"].append({
query["query"]["bool"]["filter"].append({
"range": {"Timestamp": ts_range}
})
return query
Expand Down Expand Up @@ -154,8 +152,14 @@ def main(es, latest_aggregated_ts=None):

for interval in intervals:
body = get_request(interval)
resp = requests.post("%s/_search?search_type=count" % es,
data=json.dumps(body)).json()
resp = requests.post("%s/_search" % es,
data=json.dumps(body))

if not resp.ok:
logging.error("Got a non-ok response for interval {}\n{}".format(
interval, resp.text))
continue
resp = resp.json()

r = []
for bucket in resp["aggregations"]["per_minute"]["buckets"]:
Expand Down
10 changes: 8 additions & 2 deletions health/drivers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import datetime
import json
import logging

import requests

Expand All @@ -34,12 +35,17 @@ def get_min_max_timestamps(es, field):

r_min = requests.get(
url, data=json.dumps({"sort": {field: {"order": "asc"}}}))
r_max = requests.get(
url, data=json.dumps({"sort": {field: {"order": "desc"}}}))

if not r_min.ok:
logging.error("Got {} status when requesting {}: {}".format(
r_min.status_code, url, r_min.text))
return [None, None]
if r_min.json()["hits"]["total"] == 0:
return [None, None]

r_max = requests.get(
url, data=json.dumps({"sort": {field: {"order": "desc"}}}))

return [el.json()["hits"]["hits"][0]["_source"][field]
for el in [r_min, r_max]]

Expand Down
159 changes: 159 additions & 0 deletions health/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Copyright 2016: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import json
import logging
import sys
import time
import traceback

import jsonschema
import requests
import schedule

from health.drivers.tcp import driver as tcp_driver
from health.drivers import utils
from health.mapping import es


# Log format used for all job output; configured once at import time.
LOGGING_FORMAT = '[%(asctime)s] %(levelname)s in %(module)s: %(message)s'
logging.basicConfig(format=LOGGING_FORMAT, level=logging.INFO)


# Location of the JSON configuration file read by main().
CONF_PATH = "/etc/health/config.json"
# JSON Schema (draft-04) the configuration must satisfy:
#   sources: list of {region, driver:{type, elastic_src}} to aggregate from
#   backend: {elastic: <url>} where aggregated data is written
#   config:  optional {run_every_minutes: int >= 1} job period
CONF_SCHEMA = {
    "type": "object",
    "$schema": "http://json-schema.org/draft-04/schema",
    "properties": {
        "sources": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "region": {
                        "type": "string"
                    },
                    "driver": {
                        "type": "object",
                        "properties": {
                            "type": {"type": "string"},
                            "elastic_src": {"type": "string"}
                        },
                        "required": ["type", "elastic_src"]
                    }
                },
                "required": ["region", "driver"]
            }
        },
        "backend": {
            "type": "object",
            "properties": {
                "elastic": {
                    "type": "string"
                }
            },
            "required": ["elastic"]
        },
        "config": {
            "type": "object",
            "properties": {
                "run_every_minutes": {
                    "type": "integer",
                    "minimum": 1
                }
            }
        }
    },
    "additionalProperties": False
}


# Parsed configuration; populated by main() before job() runs.
CONF = None


def job():
    """Aggregate health data from every configured source into the backend.

    Reads the global CONF (set by main()): pulls aggregated data from each
    source's Elasticsearch via the tcp driver, tags it with the source's
    region and bulk-indexes it into the backend's ms_health/service index.
    """
    started_at = time.time()
    logging.info("Starting Syncing Job")

    backend_url = "%s/ms_health/service" % CONF["backend"]["elastic"]

    # Only data newer than the latest already-aggregated record is fetched.
    min_ts, max_ts = utils.get_min_max_timestamps(backend_url, "timestamp")

    for src in CONF["sources"]:
        # TODO(boris-42): Make this actually pluggable
        data_generator = tcp_driver.main(src["driver"]["elastic_src"],
                                         latest_aggregated_ts=max_ts)

        logging.info("Start syncing %s region" % src["region"])

        for i, data_interval in enumerate(data_generator):
            if not data_interval:
                logging.info("Fetched data from %s region, chunk %s"
                             % (src["region"], i))
                continue

            req_data = []
            for d in data_interval:
                d["region"] = src["region"]
                # TODO(boris-42): Data is validated only by ES, which is bad
                req_data.append('{"index": {}}')
                req_data.append(json.dumps(d))
            req_data = "\n".join(req_data)
            logging.info("Sending data chunk {} to elastic".format(i))

            r = requests.post("%s/_bulk" % backend_url, data=req_data)
            # Check the bulk response: a failed bulk request previously went
            # unnoticed (and r.json() could itself raise on an error body).
            if not r.ok:
                logging.error("Bulk indexing of chunk {} failed: {}".format(
                    i, r.text))
            else:
                logging.debug(r.json())

    logging.info("Syncing Job: Completed in %.3f seconds"
                 % (time.time() - started_at))


def main():
    """Load and validate the configuration, then run the sync job forever.

    Runs job() once immediately, then every run_every_minutes (default 1).
    Exits the process with status 1 when the configuration file cannot be
    read, is not valid JSON, or does not match CONF_SCHEMA.
    """
    global CONF
    try:
        with open(CONF_PATH) as f:
            CONF = json.load(f)
        jsonschema.validate(CONF, CONF_SCHEMA)

    except (OSError, IOError):
        logging.error("Sorry, couldn't open configuration file: {}".format(
            CONF_PATH))
        traceback.print_exc()
        sys.exit(1)
    except ValueError as e:
        # json.load raises ValueError (JSONDecodeError) on malformed JSON;
        # previously this crashed with an unhandled traceback.
        logging.error("Configuration file {} is not valid JSON: {}".format(
            CONF_PATH, e))
        sys.exit(1)
    except jsonschema.ValidationError as e:
        logging.error(e.message)
        sys.exit(1)
    except jsonschema.SchemaError as e:
        logging.error(e)
        sys.exit(1)
    else:
        # Init Elastic index in backend
        es.init_elastic(CONF["backend"]["elastic"])

        # Setup periodic job that does aggregation magic
        run_every_min = CONF.get("config", {}).get("run_every_minutes", 1)
        schedule.every(run_every_min).minutes.do(job)

        # Run once immediately, then poll the scheduler forever.
        job()

        while True:
            schedule.run_pending()
            time.sleep(1)


if __name__ == "__main__":
    main()
Loading

0 comments on commit 641b67d

Please sign in to comment.