# ai-assistant/assistant.py
#!/bin/python3
# Chat with an intelligent assistant in your terminal
import argparse
import re
import sys

import pygments
import pyperclip
from ollama import Client
from pygments.formatters import TerminalFormatter
from pygments.lexers import get_lexer_by_name, guess_lexer
server = 'localhost:11434'
2024-09-03 09:38:36 +00:00
model = 'llama3.1:8b-instruct-q8_0'
temp = 0.2
pattern = r'```[a-z]*\n[\s\S]*?\n```'
line_pattern = r'`[a-z]*[\s\S]*?`'
2024-10-01 18:50:35 +00:00
def parse_commands(text):
# See if user wrote any commands here
tokens = text.split(' ')
match tokens[0]:
case '/save':
print('saving')
case '/clear':
print('clearing context')
case '/exit':
print('exiting')
2024-09-25 18:56:48 +00:00
def highlight_code(language_name, code):
    """Return *code* syntax-highlighted for terminal display.

    language_name: pygments lexer name, or None to detect the language from
    the code itself (a ``` fence tag or a "lang:" line), falling back to
    guessing and finally to the bash lexer.
    """
    lexer_name = language_name
    if lexer_name is None:
        # Check if the language is specified inside the code
        for line in code.split('\n'):
            stripped = line.strip()
            if stripped.startswith('```'):
                lexer_name = stripped.split('```')[1].strip()
                break
            elif stripped.startswith('lang:'):
                lexer_name = stripped.split(':')[1].strip()
                break
    lexer = None
    if lexer_name:
        try:
            # Try to get the lexer by name
            lexer = get_lexer_by_name(lexer_name)
        except ValueError:
            # Unknown name (ClassNotFound subclasses ValueError); guess below.
            lexer = None
    if lexer is None:
        try:
            # Bug fixes vs. original: guess_lexer was never imported, was
            # handed a list instead of a string, and raises ClassNotFound
            # rather than returning None — so the `if not lexer` fallback
            # never ran. (Also removed a debug print that concatenated None.)
            lexer = guess_lexer('\n'.join(code.split('\n')[1:-1]))
        except ValueError:
            # If no lexer can be guessed, default to bash
            lexer = get_lexer_by_name('bash')
    formatter = TerminalFormatter()
    lines = code.split('\n')
    just_code = lines[0]
    # NOTE(review): the tail is always re-appended verbatim after the
    # highlighted portion — preserved as-is from the original; confirm intent.
    newlines = '\n'.join(lines[1:])
    # If code is a complete fenced block, highlight only its interior.
    if (len(lines) > 2) and ('```' in lines[0]) and ('```' in lines[-1]):
        just_code = '\n'.join(lines[1:-1])
    highlighted_code = pygments.highlight(just_code, lexer, formatter)
    return highlighted_code + newlines
def extract_code_block(markdown_text):
    """Collect fenced code blocks (highlighted) from markdown text.

    Falls back to bare inline `code` spans when no fenced block exists.
    """
    code_blocks = []
    # Fenced ``` blocks first, highlighted for terminal display.
    for match in re.finditer(pattern, markdown_text):
        fenced = match.group(0)
        code_blocks.append(highlight_code(None, fenced))
    if not code_blocks:
        # No fenced blocks: fall back to inline spans, backticks stripped.
        for match in re.finditer(line_pattern, markdown_text):
            span = match.group(0)
            code_blocks.append(span[1:-1])
    return code_blocks
def copy_string_to_clipboard(string):
    """Best-effort copy of *string* to the system clipboard.

    Clipboard access can fail (headless session, missing backend); that is
    non-fatal here, so failures are deliberately ignored.
    """
    try:
        pyperclip.copy(string)
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; clipboard errors stay best-effort.
        return
code_history = []
history = [
{"role": "system", "content": "You are a helpful, smart, kind, and efficient AI assistant. You always fulfill the user's requests accurately and concisely."},
]
2024-10-01 18:50:35 +00:00
conversation = ""
def chat(message, stream=True):
history.append({"role": "user", "content": message})
completion = client.chat(
model=model,
2024-09-03 09:38:36 +00:00
options={"temperature":temp},
messages=history,
stream=stream
)
result = ''
2024-09-25 18:56:48 +00:00
language = ''
large_chunk = []
for chunk in completion:
if stream:
2024-09-25 18:56:48 +00:00
text = chunk['message']['content']
large_chunk.append(text)
large_text = ''.join(large_chunk)
# update language if entering or leaving code block
2024-09-25 18:56:48 +00:00
if ('\n' in large_text) and ('```' in large_text):
language = large_text.split('```')[1].split('\n')[0]
if language == '':
language = None
2024-09-25 18:56:48 +00:00
print(large_text, end='', flush=True)
large_chunk = []
large_text = ''
# Only print full lines
2024-09-25 18:56:48 +00:00
if '\n' in large_text:
output = large_text
if language:
2024-09-25 18:56:48 +00:00
output = highlight_code(language, output)
print(output, end='', flush=True)
large_chunk = []
result += text
2024-09-03 09:38:36 +00:00
if not stream:
result = completion['message']['content']
if stream:
print(large_text, flush=True)
history.append({"role": 'assistant', 'content': result})
return result
2024-09-25 21:44:54 +00:00
def chat2(args, user_input, stream=True):
    """Dispatch *user_input* to reflection_mode or chat per --reflect,
    logging both sides of the exchange into the module-level transcript.
    """
    # Bug fix: augmented assignment to a module-level name requires the
    # global declaration; without it every call raised UnboundLocalError.
    global conversation
    conversation += 'user: ' + user_input + '\n'
    if args.reflect:
        result = reflection_mode(user_input, stream)
    else:
        result = chat(user_input, stream)
    conversation += 'assistant: ' + result + '\n'
    return result
def highlightify_text(full_text):
    """Return *full_text* with code-fenced regions syntax-highlighted.

    Mirrors the streaming logic in chat(): a ``` fence line is emitted
    verbatim and toggles the current language; other lines are highlighted
    while a language is active.
    """
    result = ''
    language = None
    for line in full_text.split('\n'):
        text = line + '\n'
        if '```' in text:
            # Entering or leaving a code block: record (or clear) the
            # language and emit the fence line itself unhighlighted.
            language = text.split('```')[1].split('\n')[0]
            if language == '':
                language = None
            result += text
            text = ''
        # Bug fix: the emit/reset above must apply only to fence lines;
        # in the pasted original it ran unconditionally, leaving this
        # branch dead so nothing was ever highlighted.
        if '\n' in text:
            output = text
            if language:
                output = highlight_code(language, output)
            result += output
    return result
def parse_args():
    """Define and evaluate the assistant's command-line interface.

    The nargs='?' flags are tri-state: absent -> False, bare flag -> True,
    flag with a value -> that value.
    """
    parser = argparse.ArgumentParser(description='Copy and open a source file in TextEdit')
    parser.add_argument('--follow-up', '-f', nargs='?', const=True, default=False,
                        help='Ask a follow up question when piping in context')
    parser.add_argument('--copy', '-c', action='store_true',
                        help='copy a codeblock if it appears')
    parser.add_argument('--shell', '-s', nargs='?', const=True, default=False,
                        help='output a shell command that does as described')
    parser.add_argument('--model', '-m', nargs='?', const=True, default=False,
                        help='Specify model')
    parser.add_argument('--temp', '-t', nargs='?', const=True, default=False,
                        help='Specify temperature')
    parser.add_argument('--host', nargs='?', const=True, default=False,
                        help='Specify host of ollama server')
    parser.add_argument('--reflect', action='store_true',
                        help='Use reflection prompting style to improve output. May be slower and not work with all models.')
    return parser.parse_args()
def reflection_mode(query, should_print=False):
    """Answer *query* with a plan/reflect/final-answer prompting style.

    Re-prompts the model until a <final answer> section is present, then
    returns (and optionally prints) just that section.
    """
    reflection_prompt = """
You are a helpful ai assistant that answers every question thoroughly and accurately. You always begin your response with a <planning></planning> section where you lay out your plan for answering the question. It is important that you don't make any assumptions while planning. Then you <reflect></reflect> on your plan to make sure it correctly answers the user's question. Then, if you are confident your plan in correct, you give your <draft answer>, followed by <final reflection> to make sure the answer correctly addresses the user's question. Finally, give a <final answer> with your answer to the user. If there are any ambiguous or unknown requirements, ask the user for more information as your final answer. You must always have a <final answer> no matter what, even if you are asking for clarifying questions. If you do not have the <final answer> tags, the user will not see your response. Additionally, the user can not see your planning or reflecting, they can only see what goes in the <final answer></final answer> tags, so make sure you provide any information you want to tell the user in there.
"""
    result = chat(reflection_prompt + query, stream=False)
    highlighted_result = highlightify_text(result)
    parts = highlighted_result.split('<final answer>')
    while len(parts) < 2:
        # Bug fix: the original re-split the STALE highlighted_result after
        # re-prompting (and overwrote final_answer with a raw string), so a
        # response missing the tag looped forever. Re-highlight and re-split
        # the NEW response instead.
        result = chat('Please put your final answer in <final answer></final answer> tags.', stream=False)
        highlighted_result = highlightify_text(result)
        parts = highlighted_result.split('<final answer>')
    final_answer = parts[1].split('</final answer>')[0]
    if should_print:
        print(final_answer)
    return final_answer
def set_host(host):
    """Point the module-level ``server`` address at a different ollama host."""
    global server
    server = host
def arg_follow_up(args):
    """Return the follow-up question from --follow-up, prompting if it was a bare flag."""
    # Rebind stdin to the terminal so input() still works after piped input.
    sys.stdin = open('/dev/tty')
    if args.follow_up != True:
        # The question was supplied directly on the command line.
        return args.follow_up
    return input('> ')
def arg_shell(args):
    """Ask the model for a shell command, print it, and copy it to the clipboard."""
    query = 'Form a shell command based on the following description. Only output a working shell command .\nDescription: '
    # --shell may carry the description itself, or be a bare flag (True).
    query += args.shell if args.shell != True else input('> ')
    answer = chat2(args, query, False)
    # Prefer the first extracted code block; otherwise use the raw answer.
    code_blocks = extract_code_block(answer)
    if len(code_blocks):
        answer = code_blocks[0]
    print(answer)
    copy_string_to_clipboard(answer)
def handle_piped_input(args):
all_input = sys.stdin.read()
query = 'Use the following context to answer the question. There will be no follow up questions from the user so make sure your answer is complete:\nSTART CONTEXT\n' + all_input + '\nEND CONTEXT\nAfter you answer the question, reflect on your answer and determine if it answers the question correctly.'
if args.copy:
query += 'Answer the question using a codeblock for any code or shell scripts\n'
if args.follow_up:
query += arg_follow_up(args)
query += '\n'
2024-09-25 21:33:14 +00:00
result = chat2(args, query)
blocks = extract_code_block(result)
if args.copy and len(blocks):
copy_string_to_clipboard(blocks[0])
def handle_non_piped_input(args):
    """Interactive entry point: honor --shell / --follow-up, else run a REPL."""
    if args.shell:
        arg_shell(args)
        exit()
    if args.follow_up:
        chat2(args, arg_follow_up(args))
        exit()
    # Simple prompt loop until EOF (Ctrl-D) or Ctrl-C.
    while True:
        try:
            question = input('> ')
        except (EOFError, KeyboardInterrupt):
            print()
            exit()
        else:
            chat2(args, question)
client = None
def main():
args = parse_args()
2024-09-25 19:54:27 +00:00
if args.host:
set_host(args.host)
2024-09-25 20:02:02 +00:00
# Point to the local server
global client
client = Client(host=server)
if args.model:
global model
model = args.model
2024-09-03 09:38:36 +00:00
if args.temp:
global temp
temp = float(args.temp)
if not sys.stdin.isatty():
handle_piped_input(args)
else:
handle_non_piped_input(args)
if __name__ == '__main__':
main()