samuelcolvin_aicli.py
#!/usr/bin/env python3
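"""aicli: a small OpenAI-powered command-line assistant.

Sends prompts to the OpenAI chat API and renders the markdown responses in the
terminal with rich, either as a one-shot command or as an interactive prompt
with history and streamed output.
"""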
import argparse
import os
import sys
from datetime import datetime, timezone
from pathlib import Path

import openai
from prompt_toolkit import PromptSession
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.history import FileHistory
from rich.console import Console, ConsoleOptions, RenderResult
from rich.live import Live
from rich.markdown import CodeBlock, Markdown
from rich.status import Status
from rich.syntax import Syntax
from rich.text import Text

__version__ = '0.8.0'


class SimpleCodeBlock(CodeBlock):
    """Render fenced code blocks as plain syntax-highlighted text, bracketed by the language name."""

    def __rich_console__(self, console: Console, options: ConsoleOptions) -> RenderResult:
        code = str(self.text).rstrip()
        yield Text(self.lexer_name, style='dim')
        yield Syntax(code, self.lexer_name, theme=self.theme, background_color='default', word_wrap=True)
        yield Text(f'/{self.lexer_name}', style='dim')


# use the simplified renderer above for all fenced code blocks in markdown output
Markdown.elements['fence'] = SimpleCodeBlock


def cli() -> int:
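    """Parse command-line arguments and run either a one-shot prompt or the interactive prompt loop."""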
    parser = argparse.ArgumentParser(
        prog='aicli',
        description=f"""\
OpenAI powered AI CLI v{__version__}

Special prompts:
* `show-markdown` - show the markdown output from the previous response
* `multiline` - toggle multiline mode
""",
    )
    parser.add_argument('prompt', nargs='?', help='AI Prompt, if omitted fall into interactive mode')
    # allows you to disable streaming responses if they get annoying or are more expensive.
    parser.add_argument('--no-stream', action='store_true', help='Whether to stream responses from OpenAI')
    parser.add_argument('--version', action='store_true', help='Show version and exit')

    args = parser.parse_args()

    console = Console()
    console.print(f'aicli - OpenAI powered AI CLI v{__version__}', style='green bold', highlight=False)
    if args.version:
        return 0

    try:
        openai_api_key = os.environ['OPENAI_API_KEY']
    except KeyError:
        console.print('You must set the OPENAI_API_KEY environment variable', style='red')
        return 1

    client = openai.OpenAI(api_key=openai_api_key)

    now_utc = datetime.now(timezone.utc)
    setup = f"""\
Help the user by responding to their request, the output should be concise and always written in markdown.
The current date and time is {datetime.now()} {now_utc.astimezone().tzinfo.tzname(now_utc)}.
The user is running {sys.platform}."""

    stream = not args.no_stream
    messages = [{'role': 'system', 'content': setup}]

    if args.prompt:
        messages.append({'role': 'user', 'content': args.prompt})
        try:
            ask_openai(client, messages, stream, console)
        except KeyboardInterrupt:
            pass
        return 0
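
    # no positional prompt was given: fall into interactive mode with persistent prompt history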
    history = Path().home() / '.openai-prompt-history.txt'
    session = PromptSession(history=FileHistory(str(history)))
    multiline = False
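
    # interactive loop: read input, handle the special `show-markdown` / `multiline` prompts,
    # otherwise send the conversation so far to OpenAI and record the reply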
    while True:
        try:
            text = session.prompt('aicli ➤ ', auto_suggest=AutoSuggestFromHistory(), multiline=multiline)
        except (KeyboardInterrupt, EOFError):
            return 0

        if not text.strip():
            continue

        ident_prompt = text.lower().strip(' ').replace(' ', '-')
        if ident_prompt == 'show-markdown':
            last_content = messages[-1]['content']
            console.print('[dim]Last markdown output of last question:[/dim]\n')
            console.print(Syntax(last_content, lexer='markdown', background_color='default'))
            continue
        elif ident_prompt == 'multiline':
            multiline = not multiline
            if multiline:
                console.print(
                    'Enabling multiline mode. '
                    '[dim]Press [Meta+Enter] or [Esc] followed by [Enter] to accept input.[/dim]'
                )
            else:
                console.print('Disabling multiline mode.')
            continue

        messages.append({'role': 'user', 'content': text})
        try:
            content = ask_openai(client, messages, stream, console)
        except KeyboardInterrupt:
            return 0
        messages.append({'role': 'assistant', 'content': content})


def ask_openai(client: openai.OpenAI, messages: list[dict[str, str]], stream: bool, console: Console) -> str:
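    """Send the conversation to OpenAI and render the markdown reply, streaming it live if requested."""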
    with Status('[dim]Working on it…[/dim]', console=console):
        response = client.chat.completions.create(model='gpt-4o', messages=messages, stream=stream)

    console.print('\nResponse:', style='green')
    if stream:
        content = ''
        interrupted = False
        with Live('', refresh_per_second=15, console=console) as live:
            try:
                for chunk in response:
                    if chunk.choices[0].finish_reason is not None:
                        break
                    # delta.content can be None (e.g. on the first chunk), so guard before concatenating
                    chunk_text = chunk.choices[0].delta.content or ''
                    content += chunk_text
                    live.update(Markdown(content))
            except KeyboardInterrupt:
                interrupted = True
        if interrupted:
            console.print('[dim]Interrupted[/dim]')
    else:
        content = response.choices[0].message.content
        console.print(Markdown(content))

    return content


if __name__ == '__main__':
    sys.exit(cli())
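
# Example invocations (assuming OPENAI_API_KEY is set in the environment; prompts are illustrative):
#   python samuelcolvin_aicli.py 'explain list comprehensions'    # one-shot prompt
#   python samuelcolvin_aicli.py                                  # interactive mode
#   python samuelcolvin_aicli.py --no-stream 'some prompt'        # disable streamed output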