Skip to content

Commit

Permalink
Merge pull request #7 from xxwjj/data_generator
Browse files Browse the repository at this point in the history
add data generator
  • Loading branch information
xing-yang authored Jun 4, 2019
2 parents 3e37d87 + 1a1024f commit 9468904
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 1 deletion.
20 changes: 19 additions & 1 deletion anomaly_detection/cmd/data_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys

from anomaly_detection import log
from anomaly_detection.data_generator.generator import Generator
from anomaly_detection.utils import config as cfg
# need register global_opts
from anomaly_detection.common import options

CONF = cfg.CONF


def main():
pass
CONF(sys.argv[1:])
log.setup(CONF, "anomaly_detection")
generator = Generator()
generator.load_jobs()
generator.run()


if __name__ == '__main__':
sys.exit(main())

212 changes: 212 additions & 0 deletions anomaly_detection/data_generator/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
# Copyright 2019 The OpenSDS Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json

import requests
from keystoneauth1 import identity
from keystoneauth1 import session as ks

from anomaly_detection import log
from anomaly_detection.utils import config as cfg

LOG = log.getLogger(__name__)
CONF = cfg.CONF

auth_opts = [
cfg.StrOpt('auth_url',
default='http://127.0.0.1/identity',
help='Authentication URL'),
cfg.StrOpt('auth_type',
default="password",
help='Authentication type'),
cfg.StrOpt('username',
default="admin",
help='User name'),
cfg.StrOpt('password',
default="opensds@123",
help='User password'),
cfg.StrOpt('project_name',
default='admin',
help='Project name'),
cfg.StrOpt('project_domain_name',
default='Default',
help='Project domain name'),
cfg.StrOpt('project_domain_id',
default='default',
help='Project domain id'),
cfg.StrOpt('user_domain_name',
default="Default",
help='User domain name'),
cfg.StrOpt('user_domain_id',
default="default",
help='User domain id')
]

generator_opts = [
cfg.StrOpt('opensds_endpoint',
default='http://127.0.0.1/:50040',
help='OpenSDS hotpot endpoint URL'),
cfg.StrOpt('opensds_backend_driver_type',
default="lvm",
help='OpenSDS backend driver type'),
cfg.StrOpt('api_version',
default='v1beta',
help='OpenSDS hotpot api version'),
cfg.StrOpt('auth_strategy',
default='keystone',
help='OpenSDS authentication strategy'),
cfg.StrOpt('noauth_tenant_id',
default='e93b4c0934da416eb9c8d120c5d04d96',
help='NoAuth Tenant ID'),
cfg.IntOpt('retries',
default=3,
help='Failed retries number'),
cfg.BoolOpt('http_log_debug',
default=False,
help='Whether enable the log debug printing'),
cfg.BoolOpt('insecure',
default=True,
help='Using insecure http request'),
cfg.IntOpt('timeout',
default=60,
help='Request timeout in seconds'),
]

CONF.register_opts(auth_opts, "keystone_authtoken")
CONF.register_opts(generator_opts, "data_generator")


class KeystoneClient(object):

def __init__(self):
configuration = CONF.keystone_authtoken
auth = identity.Password(auth_url=configuration.auth_url,
username=configuration.username,
password=configuration.password,
project_name=configuration.project_name,
project_domain_id=configuration.project_domain_id,
project_domain_name=configuration.project_domain_name,
user_domain_id=configuration.user_domain_id,
user_domain_name=configuration.user_domain_name)
self.session = ks.Session(auth=auth)

def get_token(self):
return self.session.get_token()

def get_tenant_id(self):
return self.session.get_project_id()


class TelemetryClient(object):
def __init__(self):
self.auth_strategy = CONF.data_generator.auth_strategy
self.default_headers = {
'User-Agent': "python-anomaly-detection-client",
'Accept': 'application/json',
}

self.tenant_id = CONF.data_generator.noauth_tenant_id
if self.auth_strategy == "keystone":
self.keystone_client = KeystoneClient()
self.tenant_id = self.keystone_client.get_tenant_id()

self.api_version = CONF.data_generator.api_version
self.endpoint_url = CONF.data_generator.opensds_endpoint
pieces = [self.endpoint_url, self.api_version, self.tenant_id]
self.base_url = '/'.join(s.strip('/') for s in pieces)+"/"

self.retries = CONF.data_generator.retries
self.http_log_debug = CONF.data_generator.http_log_debug

self.request_options = self._set_request_options(
CONF.data_generator.insecure, CONF.data_generator.timeout)
self.driver_type = CONF.data_generator.opensds_backend_driver_type

def _set_request_options(self, insecure, timeout=None):
options = {'verify': True}
if insecure:
options['verify'] = False

if timeout:
options['timeout'] = timeout

return options

def do_request(self, url, method, **kwargs):
url = self.base_url+url
headers = copy.deepcopy(self.default_headers)
if self.keystone_client is not None:
headers['X-Auth-Token'] = self.keystone_client.get_token()

headers.update(kwargs.get('headers', {}))
options = copy.deepcopy(self.request_options)

if 'body' in kwargs:
headers['Content-Type'] = 'application/json'
options['data'] = json.dumps(kwargs['body'])

self.log_request(method, url, headers, options.get('data', None))
resp = requests.request(method, url, headers=headers, **options)
self.log_response(resp)
body = None
if resp.text:
try:
body = json.loads(resp.text)
except ValueError:
pass
return resp, body

def request(self, url, method, **kwargs):
retries = self.retries
for index in range(1, retries + 1):
try:
self.do_request(url, method, **kwargs)
except Exception as e:
if index > retries:
LOG.error('%s\nall retry failed, exit.', e)
raise
else:
LOG.error("%s ,retry %d time(s)", e, index)
else:
break

def log_request(self, method, url, headers, data=None):
if not self.http_log_debug:
return

string_parts = ['curl -i', ' -X %s' % method, ' %s' % url]

for element in headers:
header = ' -H "%s: %s"' % (element, headers[element])
string_parts.append(header)

if data:
string_parts.append(" -d '%s'" % data)
LOG.info("\nREQ: %s\n", "".join(string_parts))

def log_response(self, resp):
if not self.http_log_debug:
return
LOG.info(
"RESP: [%(code)s] %(headers)s\nRESP BODY: %(body)s\n", {
'code': resp.status_code,
'headers': resp.headers,
'body': resp.text
})

def collect_metrics(self):
body = {"driverType": self.driver_type}
self.request('metrics', "POST", body=body)
35 changes: 35 additions & 0 deletions anomaly_detection/data_generator/generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2019 The OpenSDS Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from apscheduler.schedulers.blocking import BlockingScheduler

from anomaly_detection.data_generator.jobs import CollectMetricsJob


class Generator(object):
def __init__(self):
self._scheduler = BlockingScheduler()

def add_cron_job(self, job):
values = job.expression.split()
if len(values) != 6:
raise ValueError('Wrong number of fields; got {}, expected 6'.format(len(values)))
self._scheduler.add_job(job, 'cron', second=values[0], minute=values[1], hour=values[2],
day=values[3], month=values[4], day_of_week=values[5], timezone=None)

def load_jobs(self):
self.add_cron_job(CollectMetricsJob())

def run(self):
self._scheduler.start()
61 changes: 61 additions & 0 deletions anomaly_detection/data_generator/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2019 The OpenSDS Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from anomaly_detection import log
from anomaly_detection.data_generator.client import TelemetryClient
from anomaly_detection.utils import config as cfg

LOG = log.getLogger(__name__)
CONF = cfg.CONF

data_parser_opts = [
cfg.StrOpt('cron_expression',
default='*/10 * * * * *',
help='Cron expression')
]

CONF.register_opts(data_parser_opts, "data_generator")


class Job(object):
def __init__(self, name, retries=3):
self._name = name
self._retries = retries

def run(self, *args, **kwargs):
raise NotImplemented

def __call__(self, *args, **kwargs):
retries = self._retries
for index in range(1, retries + 1):
try:
self.run(*args, **kwargs)
except Exception as e:
if index > retries:
LOG.error('%s\nall retry failed, exit.', e)
raise
else:
LOG.error("%s ,retry %d time(s)", e, index)
else:
break


class CollectMetricsJob(Job):
def __init__(self):
super(CollectMetricsJob, self).__init__("collect_metrics")
self._client = TelemetryClient()
self.expression = CONF.data_generator.cron_expression

def run(self, *args, **kwargs):
self._client.collect_metrics()
2 changes: 2 additions & 0 deletions anomaly_detection/data_parser/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def run(self):
raise
else:
LOG.error("%s ,retry %d time(s)", e, index)
else:
break


class Manager(base.Base):
Expand Down
16 changes: 16 additions & 0 deletions etc/anomaly_detection.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,19 @@ receiver_name=kafka
csv_file_name=performance.csv
kafka_topic=telemetry_topic
kafka_bootstrap_servers=127.0.0.1:9092

[keystone_authtoken]
project_domain_name = Default
project_name = admin
user_domain_name = Default
password = opensds@123
username = admin
auth_url = http://127.0.0.1/identity
auth_type = password

[data_generator]
opensds_endpoint = http://127.0.0.1:50040
api_version = v1beta
auth_strategy = keystone
http_log_debug = true
opensds_backend_driver_type = lvm
4 changes: 4 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
requests
keystoneauth1
configparser
apscheduler
kafka-python
werkzeug
matplotlib
Expand Down

0 comments on commit 9468904

Please sign in to comment.