Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simple service toy prototype #303

Merged
merged 66 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from 61 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
20c989a
+ http server example
shadeofblue Mar 23, 2021
948fd65
add another service example
shadeofblue Mar 24, 2021
9880630
simple service debug session etc ...
shadeofblue Mar 29, 2021
982d7df
add `WorkContext.send_bytes` command
shadeofblue Mar 29, 2021
69bcdfc
Merge branch 'blue/send_bytes' into blue/services-prototype
shadeofblue Mar 29, 2021
0665865
fix the PollingBatch with the current version of yagna
shadeofblue Mar 29, 2021
adc5972
Merge branch 'blue/polling-batch-timeout-argument' into blue/services…
shadeofblue Mar 29, 2021
3e4b0a8
black, additional comment
shadeofblue Mar 29, 2021
3524e5b
- ","
shadeofblue Mar 29, 2021
cab9f94
use aiohttp's general `_request_timeout`
shadeofblue Mar 30, 2021
f92b448
Merge branch 'blue/polling-batch-timeout-argument' into blue/services…
shadeofblue Mar 30, 2021
5a776a6
Merge branch 'master' into blue/polling-batch-timeout-argument
shadeofblue Mar 30, 2021
f851124
black...
shadeofblue Mar 30, 2021
942f46b
Merge branch 'blue/polling-batch-timeout-argument' of github.com:gole…
shadeofblue Mar 30, 2021
32ae650
black...
shadeofblue Mar 30, 2021
4a2081d
bring back the "proper" version of the service
shadeofblue Mar 30, 2021
ccb307e
.
shadeofblue Mar 30, 2021
eec0510
a more fleshed-out example
shadeofblue Mar 31, 2021
ccedf05
`WorkContext.download_bytes` and `WorkContext.download_json` methods
shadeofblue Mar 31, 2021
f8eeeda
`WorkContext.download_bytes` and `WorkContext.download_json` methods
shadeofblue Mar 31, 2021
9854fc4
refine the simple_service.py
shadeofblue Apr 2, 2021
8f1065e
+ tests, comments
shadeofblue Apr 6, 2021
a55f667
black
shadeofblue Apr 6, 2021
338eceb
make the `on_download` callback async
shadeofblue Apr 7, 2021
6d171a3
fix assert message
shadeofblue Apr 7, 2021
e6563fd
Merge branch 'master' into blue/download-bytes
shadeofblue Apr 7, 2021
d6e5a35
Set maximum client-side timeout to 5 seconds, continue on client-side…
filipgolem Apr 7, 2021
10de416
mypy
shadeofblue Apr 7, 2021
474565b
Decrease timeout
filipgolem Apr 7, 2021
554abcc
Merge branch 'master' into blue/polling-batch-timeout-argument
shadeofblue Apr 7, 2021
b05ccc6
Merge branch 'blue/download-bytes' into blue/services-prototype
shadeofblue Apr 7, 2021
9afefd5
make callbacks async in `simple_service.py`
shadeofblue Apr 7, 2021
e1fdffe
Merge branch 'blue/polling-batch-timeout-argument' into blue/services…
shadeofblue Apr 7, 2021
ac1e43a
Merge branch 'master' into blue/services-prototype
shadeofblue Apr 8, 2021
8f46907
remove the non-working http server example
shadeofblue Apr 8, 2021
1ff0c4b
some comments
shadeofblue Apr 8, 2021
4467388
comment to the exception handler
shadeofblue Apr 13, 2021
cbf48af
Merge branch 'master' into blue/services-prototype
shadeofblue Apr 14, 2021
27ffa59
Merge branch 'master' into blue/services-prototype
shadeofblue Apr 26, 2021
2e7d5f6
limit workers to 1
shadeofblue Apr 26, 2021
9f71db2
Merge branch 'blue/services-api' into blue/services-prototype
shadeofblue May 19, 2021
2a2717d
mv examples/service examples/simple-service-poc
shadeofblue May 19, 2021
3861b3a
Merge branch 'blue/services-api' into blue/services-prototype
shadeofblue May 19, 2021
f809854
if the user issues "start" or "deploy" explictly, disable "implicit i…
shadeofblue May 19, 2021
73ba1fd
Merge branch 'blue/services-api' into blue/services-prototype
shadeofblue May 19, 2021
abf2211
update the toy example with the HL Services API
shadeofblue May 19, 2021
9eee66f
.
shadeofblue May 19, 2021
b821a55
- deprecation warning
shadeofblue May 19, 2021
2e5f575
make the log unique
shadeofblue May 19, 2021
99a7845
fix messages
shadeofblue May 19, 2021
e053519
ARGH ...
shadeofblue May 19, 2021
0ee6c70
make `ctx` and `cluster` private on `Service`
shadeofblue May 20, 2021
cafd93c
Merge branch 'blue/services-api' into blue/services-prototype
shadeofblue May 20, 2021
290a305
Merge branch 'blue/services-api' into blue/services-prototype
shadeofblue May 20, 2021
033e5db
fix plot downloading in simple_service
shadeofblue May 20, 2021
b7b197a
Merge branch 'blue/services-api' into blue/services-prototype
shadeofblue May 20, 2021
43c0146
Merge branch 'blue/services-api' into blue/services-prototype
shadeofblue May 20, 2021
cdadb30
optimize the example to eliminate the callbacks
shadeofblue May 20, 2021
8037417
Merge branch 'blue/services-api' into blue/services-prototype
shadeofblue May 20, 2021
a9f4915
Merge branch 'blue/services-api' into blue/services-prototype
shadeofblue May 24, 2021
c7cbd5b
Merge branch 'blue/services-api' into blue/services-prototype
azawlocki May 26, 2021
0ed9598
more sensible example behavior (waiting for start, etc)
shadeofblue May 27, 2021
ae7b8c0
add README.md to the Docker image directory, add appropriate annotati…
shadeofblue May 27, 2021
57f2c48
prevent the example from finishing before we've established whether t…
shadeofblue May 27, 2021
ab9d278
some comments to clarify the service example
shadeofblue May 27, 2021
44f826e
make the ctl script a constant, add some comments to the handlers
shadeofblue May 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions examples/simple-service-poc/simple_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""
the requestor agent controlling and interacting with the "simple service"
"""
import asyncio
from datetime import datetime, timedelta, timezone
import json
import pathlib
import queue
import random
import string
import sys



from yapapi import (
NoPaymentAccountError,
__version__ as yapapi_version,
windows_event_loop_fix,
)
from yapapi.executor import Golem
from yapapi.executor.services import Service

from yapapi.log import enable_default_logger, log_summary, log_event_repr, pluralize # noqa
from yapapi.payload import vm

examples_dir = pathlib.Path(__file__).resolve().parent.parent
sys.path.append(str(examples_dir))

from utils import (
build_parser,
TEXT_COLOR_CYAN,
TEXT_COLOR_DEFAULT,
TEXT_COLOR_RED,
TEXT_COLOR_YELLOW,
)

NUM_INSTANCES = 1


class SimpleService(Service):
STATS_PATH = "/golem/out/stats"
PLOT_INFO_PATH = "/golem/out/plot"
SIMPLE_SERVICE = "/golem/run/simple_service.py"

@staticmethod
async def get_payload():
return await vm.repo(
image_hash="8b11df59f84358d47fc6776d0bb7290b0054c15ded2d6f54cf634488",
min_mem_gib=0.5,
min_storage_gib=2.0,
)

async def start(self):
self._ctx.run("/golem/run/simulate_observations_ctl.py", "--start")
azawlocki marked this conversation as resolved.
Show resolved Hide resolved
yield self._ctx.commit()

async def run(self):
while True:
await asyncio.sleep(10)
self._ctx.run(self.SIMPLE_SERVICE, "--stats") # idx 0
self._ctx.run(self.SIMPLE_SERVICE, "--plot", "dist") # idx 1

future_results = yield self._ctx.commit()
results = await future_results
stats = results[0].stdout.strip()
plot = results[1].stdout.strip().strip('"')

print(f"{TEXT_COLOR_CYAN}stats: {stats}{TEXT_COLOR_DEFAULT}")

plot_filename = (
"".join(random.choice(string.ascii_letters) for _ in
range(10)) + ".png"
)
print(f"{TEXT_COLOR_CYAN}downloading plot: {plot} to {plot_filename}{TEXT_COLOR_DEFAULT}")
self._ctx.download_file(plot, str(pathlib.Path(__file__).resolve().parent / plot_filename))

steps = self._ctx.commit()
yield steps

async def shutdown(self):
self._ctx.run("/golem/run/simulate_observations_ctl.py", "--stop")
yield self._ctx.commit()


async def main(subnet_tag, driver=None, network=None):
async with Golem(
budget=1.0,
subnet_tag=subnet_tag,
driver=driver,
network=network,
event_consumer=log_summary(log_event_repr),
) as golem:

print(
f"yapapi version: {TEXT_COLOR_YELLOW}{yapapi_version}{TEXT_COLOR_DEFAULT}\n"
f"Using subnet: {TEXT_COLOR_YELLOW}{subnet_tag}{TEXT_COLOR_DEFAULT}, "
f"payment driver: {TEXT_COLOR_YELLOW}{golem.driver}{TEXT_COLOR_DEFAULT}, "
f"and network: {TEXT_COLOR_YELLOW}{golem.network}{TEXT_COLOR_DEFAULT}\n"
)

start_time = datetime.now()

print(f"{TEXT_COLOR_YELLOW}starting {pluralize(NUM_INSTANCES, 'instance')}{TEXT_COLOR_DEFAULT}")

cluster = await golem.run_service(
SimpleService,
num_instances=NUM_INSTANCES,
expiration=datetime.now(timezone.utc) + timedelta(minutes=15))

def instances():
return [(s.provider_name, s.state.value) for s in cluster.instances]

def still_running():
return any([s for s in cluster.instances if s.is_available])

Comment on lines +118 to +123
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the example would be more readable if we moved these out of main()? That would require adding a c: Cluster parameter.

Or maybe even have a print_instances(cluster) function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some comments, I wouldn't do pull those out

while datetime.now() < start_time + timedelta(minutes=2):
print(f"instances: {instances()}")
await asyncio.sleep(5)

print(f"{TEXT_COLOR_YELLOW}stopping instances{TEXT_COLOR_DEFAULT}")
cluster.stop()

cnt = 0
while cnt < 10 and still_running():
print(f"instances: {instances()}")
await asyncio.sleep(5)

print(f"instances: {instances()}")


if __name__ == "__main__":
parser = build_parser("Test http")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is "Test http" an accurate description?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haha, no :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done in 57f2c48

now = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
parser.set_defaults(log_file=f"simple-service-yapapi-{now}.log")
args = parser.parse_args()

# This is only required when running on Windows with Python prior to 3.8:
windows_event_loop_fix()

enable_default_logger(
log_file=args.log_file,
debug_activity_api=True,
debug_market_api=True,
debug_payment_api=True,
)

loop = asyncio.get_event_loop()
task = loop.create_task(
main(subnet_tag=args.subnet_tag, driver=args.driver, network=args.network)
)

try:
loop.run_until_complete(task)
except NoPaymentAccountError as e:
handbook_url = (
"https://handbook.golem.network/requestor-tutorials/"
"flash-tutorial-of-requestor-development"
)
print(
f"{TEXT_COLOR_RED}"
f"No payment account initialized for driver `{e.required_driver}` "
f"and network `{e.required_network}`.\n\n"
f"See {handbook_url} on how to initialize payment accounts for a requestor node."
f"{TEXT_COLOR_DEFAULT}"
)
except KeyboardInterrupt:
print(
f"{TEXT_COLOR_YELLOW}"
"Shutting down gracefully, please wait a short while "
"or press Ctrl+C to exit immediately..."
f"{TEXT_COLOR_DEFAULT}"
)
task.cancel()
try:
loop.run_until_complete(task)
print(
f"{TEXT_COLOR_YELLOW}Shutdown completed, thank you for waiting!{TEXT_COLOR_DEFAULT}"
)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM python:3.8-slim
VOLUME /golem/in /golem/out
COPY simple_service.py /golem/run/simple_service.py
COPY simulate_observations.py /golem/run/simulate_observations.py
COPY simulate_observations_ctl.py /golem/run/simulate_observations_ctl.py
RUN pip install numpy matplotlib
RUN chmod +x /golem/run/*
RUN /golem/run/simple_service.py --init
ENTRYPOINT ["sh"]
129 changes: 129 additions & 0 deletions examples/simple-service-poc/simple_service/simple_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#!/usr/local/bin/python
"""
azawlocki marked this conversation as resolved.
Show resolved Hide resolved
a very basic "stub" that exposes a few commands of an imagined, very simple CLI-based
service that is able to accumulate some linear, time-based values and present it stats
(characteristics of the statistical distribution of the data collected so far) or provide
distribution and time-series plots of the collected data
"""
import argparse
from datetime import datetime
import enum
import contextlib
import json
import matplotlib.pyplot as plt
import numpy
import random
import sqlite3
import string
from pathlib import Path

DB_PATH = Path(__file__).absolute().parent / "service.db"
PLOT_PATH = Path("/golem/out").absolute()


class PlotType(enum.Enum):
time = "time"
dist = "dist"


@contextlib.contextmanager
def _connect_db():
db = sqlite3.connect(DB_PATH)
db.row_factory = sqlite3.Row
try:
yield db.cursor()
db.commit()
finally:
db.close()


def init():
with _connect_db() as db:
db.execute(
"create table observations("
"id integer primary key autoincrement not null, "
"val float not null,"
"time_added timestamp default current_timestamp not null"
")"
)


def add(val):
with _connect_db() as db:
db.execute("insert into observations (val) values (?)", [val])


def plot(plot_type):
data = _get_data()

if not data:
print(json.dumps(""))
return

y = [r["val"] for r in data]

if plot_type == PlotType.dist.value:
plt.hist(y)
elif plot_type == PlotType.time.value:
x = [datetime.strptime(r["time_added"], "%Y-%m-%d %H:%M:%S") for r in data]
plt.plot(x, y)

plot_filename = PLOT_PATH / (
"".join(random.choice(string.ascii_letters) for _ in range(10)) + ".png"
)
plt.savefig(plot_filename)
print(json.dumps(str(plot_filename)))


def dump():
print(json.dumps(_get_data()))


def _get_data():
with _connect_db() as db:
db.execute("select val, time_added from observations order by time_added asc")
return list(map(dict, db.fetchall()))


def _get_stats(data=None):
data = data or [r["val"] for r in _get_data()]
return {
"min": min(data) if data else None,
"max": max(data) if data else None,
"median": numpy.median(data) if data else None,
"mean": numpy.mean(data) if data else None,
"variance": numpy.var(data) if data else None,
"std dev": numpy.std(data) if data else None,
"size": len(data),
}


def stats():
print(json.dumps(_get_stats()))


def get_arg_parser():
parser = argparse.ArgumentParser(description="simple service")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--add", type=float)
group.add_argument("--init", action="store_true")
group.add_argument("--plot", choices=[pt.value for pt in list(PlotType)])
group.add_argument("--dump", action="store_true")
group.add_argument("--stats", action="store_true")
return parser


if __name__ == "__main__":
arg_parser = get_arg_parser()
args = arg_parser.parse_args()

if args.init:
init()
elif args.add:
add(args.add)
elif args.plot:
plot(args.plot)
elif args.dump:
dump()
elif args.stats:
stats()
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/local/bin/python
"""
the "hello world" service here just adds randomized numbers with normal distribution

in a real-world example, this could be e.g. a thermometer connected to the provider's
machine providing its inputs into the database or some other piece of information
from some external source that changes over time and which can be expressed as a
singular value
"""
import os
from pathlib import Path
import random
import time

MU = 14
SIGMA = 3

SERVICE_PATH = Path(__file__).absolute().parent / "simple_service.py"


while True:
v = random.normalvariate(MU, SIGMA)
os.system(f"{SERVICE_PATH} --add {v}")
time.sleep(1)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#!/usr/local/bin/python
"""
a helper, control script that starts and stops our example `simulate_observations` service
"""
import argparse
import os
import subprocess
import signal

PIDFILE = "/var/run/simulate_observations.pid"
SCRIPT_FILE = "/golem/run/simulate_observations.py"

parser = argparse.ArgumentParser("start/stop simulation")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--start", action="store_true")
group.add_argument("--stop", action="store_true")

args = parser.parse_args()

if args.start:
if os.path.exists(PIDFILE):
raise Exception(f"Cannot start process, {PIDFILE} exists.")
p = subprocess.Popen([SCRIPT_FILE])
with open(PIDFILE, "w") as pidfile:
pidfile.write(str(p.pid))
elif args.stop:
if not os.path.exists(PIDFILE):
raise Exception(f"Could not find pidfile: {PIDFILE}.")
with open(PIDFILE, "r") as pidfile:
pid = int(pidfile.read())

os.kill(pid, signal.SIGKILL)
os.remove(PIDFILE)
2 changes: 2 additions & 0 deletions yapapi/executor/ctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,11 @@ def begin(self):
pass

def deploy(self):
self._implicit_init = False
self._pending_steps.append(_Deploy())

def start(self):
self._implicit_init = False
self._pending_steps.append(_Start())

def send_json(self, json_path: str, data: dict):
Expand Down
Loading