Skip to content

Commit

Permalink
Merge pull request teamhephy#34 from jianxiaoguo/dev
Browse files Browse the repository at this point in the history
test(controller): add command unittest
  • Loading branch information
duanhongyi authored Jan 20, 2021
2 parents b6118e4 + 180563a commit 4e1dec1
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 146 deletions.
20 changes: 12 additions & 8 deletions rootfs/api/management/commands/measure_app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import uuid
import time
import logging
from contextlib import closing
from django.utils import timezone
from django.core.management.base import BaseCommand
Expand All @@ -8,9 +9,11 @@
from api.models import App
from api.utils import get_influxdb_client

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""Management command for push data to influxdb"""
"""Management command for push data to manager"""

def _build_query_networks_flux(self, app_map, timestamp):
timestamp = int(timestamp)
Expand Down Expand Up @@ -60,10 +63,10 @@ def _measure_networks(self, app_map, timestamp):
) as records:
for record in records:
app_id = record["namespace"]
user_id = app_map[app_id].user_id
owner_id = app_map[app_id].owner_id
networks.append({
"app_id": app_id,
"user_id": user_id,
"owner_id": owner_id,
"pod_name": record["pod_name"],
"rx_bytes": record["rx_bytes"],
"tx_bytes": record["tx_bytes"],
Expand All @@ -80,11 +83,11 @@ def _measure_instances(self, app_map, timestamp):
) as records:
for record in records:
app_id = record["namespace"]
user_id = app_map[app_id].user_id
owner_id = app_map[app_id].owner_id
container_type = record["container_name"].replace(f"-{app_id}", "", 1)
instances.append({
"app_id": app_id,
"user_id": user_id,
"owner_id": owner_id,
"container_type": container_type,
"container_count": record["_value"],
"timestamp": timestamp
Expand All @@ -95,15 +98,16 @@ def handle(self, *args, **options):
if settings.WORKFLOW_MANAGER_URL is not None:
timestamp = time.time()
task_id = uuid.uuid4().hex
print(f"pushing {task_id} networks to workflow_manager when {timezone.now()}")
logger.info(f"pushing {task_id} networks to workflow_manager when {timezone.now()}")
app_map = {}
for app in App.objects.all():
app_map[app.pk] = app
app_map[app.id] = app
if len(app_map) % 1000 == 0:
self._measure_networks(app_map, timestamp)
self._measure_instances(app_map, timestamp)
app_map = {}
if len(app_map) > 0:
self._measure_networks(app_map, timestamp)
self._measure_instances(app_map, timestamp)
print(f"pushed {task_id} networks to workflow_manager when {timezone.now()}")
logger.info(f"pushed {task_id} networks to workflow_manager when {timezone.now()}")
self.stdout.write("done")
12 changes: 8 additions & 4 deletions rootfs/api/management/commands/measure_config.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
import uuid
import time
import logging
from django.utils import timezone
from django.core.management.base import BaseCommand
from django.conf import settings
from api.models import Config
from api.tasks import measure_config

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""Management command for push data to influxdb"""
"""Management command for push data to manager"""

def handle(self, *args, **options):
if settings.WORKFLOW_MANAGER_URL is not None:
timestamp = time.time()
task_id = uuid.uuid4().hex
print(f"pushing {task_id} limits to workflow_manager when {timezone.now()}")
logger.info(f"pushing {task_id} limits to workflow_manager when {timezone.now()}")
config_list = []
for config in Config.objects.all():
config_list.extend(config.to_measurements(timestamp))
if len(config_list) % 1000 == 0:
measure_config.delay(config_list)
config_list = []
if len(config_list) > 0:
measure_config.delay(*config_list)
print(f"pushed {task_id} limits to workflow_manager when {timezone.now()}")
measure_config.delay(config_list)
logger.info(f"pushed {task_id} limits to workflow_manager when {timezone.now()}")
self.stdout.write("done")
14 changes: 9 additions & 5 deletions rootfs/api/management/commands/measure_resources.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
import uuid
import time
import logging
from django.utils import timezone
from django.core.management.base import BaseCommand
from django.conf import settings
from api.models import Resource
from api.tasks import measure_resources

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""Management command for push data to influxdb"""
"""Management command for push data to manager"""

def handle(self, *args, **options):
if settings.WORKFLOW_MANAGER_URL is not None:
timestamp = time.time()
task_id = uuid.uuid4().hex
print(f"pushing {task_id} resources to workflow_manager when {timezone.now()}")
logger.info(f"pushing {task_id} resources to workflow_manager when {timezone.now()}")
resource_list = []
for resource in Resource.objects.all():
resource_list.extend(resource.to_to_measurements(timestamp))
resource_list.extend(resource.to_measurements(timestamp))
if len(resource_list) % 1000 == 0:
measure_resources.delay(resource_list)
resource_list = []
if len(resource_list) > 0:
measure_resources.delay(*resource_list)
print(f"pushed {task_id} resources to workflow_manager when {timezone.now()}")
measure_resources.delay(resource_list)
logger.info(f"pushed {task_id} resources to workflow_manager when {timezone.now()}")
self.stdout.write("done")
12 changes: 8 additions & 4 deletions rootfs/api/management/commands/measure_volumes.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
import uuid
import time
import logging
from django.utils import timezone
from django.core.management.base import BaseCommand
from django.conf import settings
from api.models import Volume
from api.tasks import measure_volumes

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""Management command for push data to influxdb"""
"""Management command for push data to manager"""

def handle(self, *args, **options):
if settings.WORKFLOW_MANAGER_URL is not None:
timestamp = time.time()
task_id = uuid.uuid4().hex
print(f"pushing {task_id} volumes to workflow_manager when {timezone.now()}")
logger.info(f"pushing {task_id} volumes to workflow_manager when {timezone.now()}")
volume_list = []
for volume in Volume.objects.all():
volume_list.extend(volume.to_measurements(timestamp))
if len(volume_list) % 1000 == 0:
measure_volumes.delay(volume_list)
volume_list = []
if len(volume_list) > 0:
measure_volumes.delay(*volume_list)
print(f"pushed {task_id} volumes to workflow_manager when {timezone.now()}")
measure_volumes.delay(volume_list)
logger.info(f"pushed {task_id} volumes to workflow_manager when {timezone.now()}")
self.stdout.write("done")
10 changes: 7 additions & 3 deletions rootfs/api/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
Data models for the Drycc API.
"""
import time
import hashlib
import hmac
import importlib
Expand Down Expand Up @@ -265,14 +266,16 @@ def config_changed_handle(sender, instance=None, created=False, update_fields=No
created or (
update_fields is not None and (
"cpu" in update_fields or "memory" in update_fields))):
measure_config.delay(instance.to_measurements())
timestamp = time.time()
measure_config.delay(instance.to_measurements(timestamp))


@receiver(post_save, sender=Volume)
def volume_changed_handle(sender, instance=None, created=False, update_fields=None, **kwargs):
# measure volumes to workflow manager
if settings.WORKFLOW_MANAGER_URL is not None and created:
measure_volumes.delay(instance.to_measurements())
timestamp = time.time()
measure_volumes.delay(instance.to_measurements(timestamp))


@receiver(post_save, sender=Resource)
Expand All @@ -290,4 +293,5 @@ def resource_changed_handle(sender, instance=None, created=False, update_fields=
update_fields is not None and (
"plan" in update_fields
))):
measure_resources.delay(instance.to_measurements())
timestamp = time.time()
measure_resources.delay(instance.to_measurements(timestamp))
4 changes: 2 additions & 2 deletions rootfs/api/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ def save(self, **kwargs):
def to_measurements(self, timestamp: float):
assert len(set(self.memory.keys()).difference(self.cpu.keys())) == 0
return [{
"app_id": self.app_id,
"user_id": self.user_id,
"app_id": str(self.app_id),
"owner_id": str(self.owner_id),
"container_type": container_type,
"cpu": unit_to_millicpu(self.cpu.get(container_type)),
"memory": unit_to_bytes(self.memory.get(container_type)),
Expand Down
4 changes: 2 additions & 2 deletions rootfs/api/models/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ def detach_resource(self, *args, **kwargs):
def to_measurements(self, timestamp: float):
return [{
"name": self.name,
"app_id": self.app_id,
"user_id": self.user_id,
"app_id": str(self.app_id),
"owner_id": str(self.owner_id),
"plan": self.plan,
"timestamp": "%f" % timestamp
}]
4 changes: 2 additions & 2 deletions rootfs/api/models/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ def log(self, message, level=logging.INFO):
def to_measurements(self, timestamp: float):
return [{
"name": self.name,
"app_id": self.app_id,
"user_id": self.user_id,
"app_id": str(self.app_id),
"owner_id": str(self.owner_id),
"size": unit_to_bytes(self.size),
"timestamp": "%f" % timestamp
}]
22 changes: 11 additions & 11 deletions rootfs/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def retrieve_resource(resource):
else:
resource.detach_resource()
except Resource.DoesNotExist:
logger.info("retrieve task not found resource: {}".format(resource.id)) # noqa
logger.exception("retrieve task not found resource: {}".format(resource.id)) # noqa
finally:
signals.request_finished.send(sender=task_id)

Expand All @@ -44,7 +44,7 @@ def measure_config(config: List[Dict[str, str]]):
[
{
"app_id": "test",
"user_id": "test",
"owner_id": "test",
"container_type": web,
"cpu": "1",
"memory": "2G",
Expand All @@ -64,7 +64,7 @@ def measure_config(config: List[Dict[str, str]]):
data=json.dumps(config)
)
except Exception as e:
logger.info("write influxdb point fail: {}".format(e))
logger.exception("write influxdb point fail: {}".format(e))
finally:
signals.request_finished.send(sender=task_id)

Expand All @@ -76,7 +76,7 @@ def measure_volumes(volumes: List[Dict[str, str]]):
{
"name": "disk",
"app_id": "test",
"user_id": "test",
"owner_id": "test",
"size": "100G",
"timestamp": "1609231998.9103732"
}
Expand All @@ -94,7 +94,7 @@ def measure_volumes(volumes: List[Dict[str, str]]):
data=json.dumps(volumes)
)
except Exception as e:
logger.info("write influxdb point fail: {}".format(e))
logger.exception("write influxdb point fail: {}".format(e))
finally:
signals.request_finished.send(sender=task_id)

Expand All @@ -105,7 +105,7 @@ def measure_networks(networks: List[Dict[str, str]]):
[
{
"app_id": "test",
"user_id": "test",
"owner_id": "test",
"pod_name": "django2test-web-xxxxxx",
"rx_bytes": "10000",
"tx_bytes": "200000",
Expand All @@ -125,7 +125,7 @@ def measure_networks(networks: List[Dict[str, str]]):
data=json.dumps(networks)
)
except Exception as e:
logger.info("write influxdb point fail: {}".format(e))
logger.exception("write influxdb point fail: {}".format(e))
finally:
signals.request_finished.send(sender=task_id)

Expand All @@ -136,7 +136,7 @@ def measure_instances(instances: List[Dict[str, str]]):
[
{
"app_id": "test",
"user_id": "test",
"owner_id": "test",
"container_type": "web",
"container_count": 1,
"timestamp": "1609231998.9103732"
Expand All @@ -155,7 +155,7 @@ def measure_instances(instances: List[Dict[str, str]]):
data=json.dumps(instances)
)
except Exception as e:
logger.info("write influxdb point fail: {}".format(e))
logger.exception("write influxdb point fail: {}".format(e))
finally:
signals.request_finished.send(sender=task_id)

Expand All @@ -167,7 +167,7 @@ def measure_resources(resources: List[Dict[str, str]]):
{
"name": "test1",
"app_id": "redis",
"user_id": "test",
"owener_id": "test",
"plan": "redis:small",
"timestamp": "1609231998.9103732"
}
Expand All @@ -185,6 +185,6 @@ def measure_resources(resources: List[Dict[str, str]]):
data=json.dumps(resources)
)
except Exception as e:
logger.info("write influxdb point fail: {}".format(e))
logger.exception("write influxdb point fail: {}".format(e))
finally:
signals.request_finished.send(sender=task_id)
22 changes: 22 additions & 0 deletions rootfs/api/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,25 @@ def test_config_failures(self, mock_requests):
self.assertEqual(app.release_set.latest().version, 5)
self.assertEqual(app.release_set.latest().config, success_config)
self.assertEqual(app.config_set.count(), 3)

def call_command(self, *args, **kwargs):
from io import StringIO
from django.core.management import call_command
out = StringIO()
call_command(
"measure_config",
*args,
stdout=out,
stderr=StringIO(),
**kwargs,
)
return out.getvalue()

def test_measure_config(self, *args, **kwargs):
# create
app_id = self.create_app()
url = "/v2/apps/{app_id}/config".format(**locals())
body = {'values': json.dumps({'PORT': 5000}), 'cpu': json.dumps({'web': '1000m'})}
response = self.client.post(url, body)
out = self.call_command()
self.assertIn(out, "done\n")
Loading

0 comments on commit 4e1dec1

Please sign in to comment.