-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathCommand.py
125 lines (106 loc) · 4.45 KB
/
Command.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import logging
import traceback
import asyncio
import textwrap
from .logger import log
from .Check import FailedCheck
from .lib import SafeCancellation, flag_parser
class Command(object):
def __init__(self, name, func, module, **kwargs):
self.name = name
self.func = func
self.module = module
self.handle_edits = kwargs.pop("handle_edits", True)
self.aliases = kwargs.pop("aliases", [])
self.flags = kwargs.pop("flags", [])
self.hidden = kwargs.pop("hidden", False)
self.short_help = kwargs.pop('short_help', None)
self.long_help = self.parse_help()
self.__dict__.update(kwargs)
async def run(self, ctx):
"""
Safely execute this command with the current context.
Respond and log any exceptions that arise.
"""
try:
task = asyncio.ensure_future(self.exec_wrapper(ctx))
ctx.tasks.append(task)
await task
except FailedCheck as e:
log("Command failed check: {}".format(e.check.name),
context="mid:{}".format(ctx.msg.id),
level=logging.DEBUG)
if e.check.msg:
await ctx.error_reply(e.check.msg)
except SafeCancellation as e:
log("Caught a safe command cancellation: {}: {}".format(e.__class__.__name__, e.details),
context="mid:{}".format(ctx.msg.id),
level=logging.DEBUG)
if e.msg is not None:
await ctx.error_reply(e.msg)
except asyncio.TimeoutError:
log("Caught an unhandled TimeoutError", context="mid:{}".format(ctx.msg.id), level=logging.WARNING)
await ctx.error_reply("Operation timed out.")
except asyncio.CancelledError:
log("Command was cancelled, probably due to a message edit.",
context="mid:{}".format(ctx.msg.id),
level=logging.DEBUG)
except Exception as e:
full_traceback = traceback.format_exc()
only_error = "".join(traceback.TracebackException.from_exception(e).format_exception_only())
log("Caught the following exception while running command:\n{}".format(full_traceback),
context="mid:{}".format(ctx.msg.id),
level=logging.ERROR)
await ctx.reply(
("An unexpected internal error occurred while running your command! "
"Please report the following error to the developer:\n`{}`").format(only_error)
)
else:
log("Command completed execution without error.",
context="mid:{}".format(ctx.msg.id),
level=logging.DEBUG)
async def exec_wrapper(self, ctx):
"""
Execute the command in the current context.
May raise an exception if not handled by the module on_exception handler.
"""
try:
await self.module.pre_command(ctx)
if self.flags:
flags, ctx.args = flag_parser(ctx.arg_str, self.flags)
await self.func(ctx, flags=flags)
else:
await self.func(ctx)
await self.module.post_command(ctx)
except Exception as e:
await self.module.on_exception(ctx, e)
def parse_help(self):
"""
Convert the docstring of the command function into a list of (fieldname, fieldcontent) tuples.
"""
if not self.func.__doc__:
return []
# Split the docstring into lines
lines = textwrap.dedent(self.func.__doc__).strip().splitlines()
help_fields = []
field_name = ""
field_content = []
for line in lines:
if line.endswith(':'):
# New field!
if field_content:
# Add the previous field to the table
field = textwrap.dedent("\n".join(field_content))
help_fields.append((field_name, field))
# Initialise the new field
field_name = line[:-1].strip()
field_content = []
else:
# Add the line to the current field content
field_content.append(line)
# Add the last field to the table if it exists
if field_content:
# Add the previous field to the table
field = textwrap.dedent("\n".join(field_content))
help_fields.append((field_name, field))
return help_fields