Compare commits
No commits in common. "feat/sqlite_conversations" and "main" have entirely different histories.
feat/sqlite_conversations ... main

assistant.py  172
@@ -3,8 +3,6 @@
import argparse
import os
import sys
import sqlite3
import warnings

import json
from ollama import Client
@@ -19,11 +17,6 @@ from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit import PromptSession


default_assistant = "qwen3:14b"
default_topic_llm = "qwen3:1.7b"

warnings.filterwarnings("ignore")

class AIAssistant:
    def __init__(self, server="http://localhost:11434", model="qwen3:14b"):
        self.server = server
@@ -32,94 +25,6 @@ class AIAssistant:
        self.temperature = 0.2
        self.num_ctx = 4096
        self.history = [self.system_prompt()]
        self.db_path = os.path.expanduser("~/.cache/ai-assistant.db")
        self._init_db()

    def _init_db(self):
        """Initialize SQLite database and create the conversations table."""
        if not os.path.exists(self.db_path):
            self._create_db()

    def _create_db(self):
        """Create the conversations table in the SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                topic TEXT,
                history TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
        conn.close()

    def _save_to_db(self, topic):
        """Save the current conversation to the SQLite database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO conversations (topic, history)
            VALUES (?, ?)
        ''', (topic, json.dumps(self.history)))
        conn.commit()
        conn.close()

    def _load_from_db(self, conversation_id):
        """Load a conversation from the SQLite database by ID."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            SELECT history FROM conversations WHERE id = ?
        ''', (conversation_id,))
        result = cursor.fetchone()
        conn.close()
        if result:
            self.history = json.loads(result[0])
        else:
            self.history = [self.system_prompt()]

    def _stringify_history(self):
        s = ""
        for item in self.history:
            s += item["role"] + ":\n" + item["content"]
        return s

    def _strip_thinking_tags(self, text: str):
        start_idx = text.find("<think>")
        if (start_idx < 0):
            return text

        end_idx = text.find("</think>") + len("</think>")
        stripped_text = text[end_idx:]
        stripped_text = stripped_text.strip()
        return stripped_text

    def save_history(self):
        """Save the current conversation to the database with a generated topic."""
        # Only save if this is the first user message
        if len(self.history) == 3:
            # Generate a topic using the AI
            system_prompt = self.system_prompt()
            user_prompt = "/no_think Generate a concise, 5 word descriptive topic for this conversation based on the following content. Do not use markdown, just plaintext. KEEP IT TO 5 WORDS OR LESS.:\n\n"
            user_prompt += self._stringify_history()
            topic = self.client.chat(model=default_topic_llm, messages=[system_prompt, {"role": "user", "content": user_prompt}], stream=False)['message']['content'].strip()
            topic = self._strip_thinking_tags(topic)
            self._save_to_db(topic)
        else:
            # For subsequent messages, we can update the topic in the future
            pass

    def load_history(self, conversation_id=None):
        """Load a conversation from the database by ID. If no ID, start a new one."""
        if conversation_id:
            self._load_from_db(conversation_id)
            print(self._stringify_history())
        else:
            self.history = [self.system_prompt()]



    def set_host(self, host):
        self.server = host
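
For orientation, the persistence layer this branch adds boils down to a JSON-in-SQLite round trip: each conversation is one row holding the serialized message list. A minimal standalone sketch, assuming the same conversations schema shown above; the database path and sample messages below are illustrative, not the branch's defaults.

import json
import sqlite3

db_path = "/tmp/conversations-example.db"  # illustrative; the branch itself uses ~/.cache/ai-assistant.db
conn = sqlite3.connect(db_path)
conn.execute('''
    CREATE TABLE IF NOT EXISTS conversations (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        topic TEXT,
        history TEXT,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
''')

# Save: serialize the message list to JSON, one row per conversation.
history = [{"role": "system", "content": "You are a helpful assistant."},
           {"role": "user", "content": "Hello"}]
conn.execute("INSERT INTO conversations (topic, history) VALUES (?, ?)",
             ("Example greeting", json.dumps(history)))
conn.commit()

# Load: fetch a row by id and decode the JSON back into an in-memory history list.
row = conn.execute("SELECT history FROM conversations WHERE id = ?", (1,)).fetchone()
restored = json.loads(row[0]) if row else []
conn.close()
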
@@ -128,6 +33,19 @@ class AIAssistant:
    def system_prompt(self):
        return {"role": "system", "content": "You are a helpful, smart, kind, and efficient AI assistant. You always fulfill the user's requests accurately and concisely."}

    def load_history(self):
        path = os.environ.get('HOME') + '/.cache/ai-assistant.history'
        try:
            with open(path, 'r') as f:
                self.history = json.load(f)
        except FileNotFoundError:
            pass

    def save_history(self):
        path = os.environ.get('HOME') + '/.cache/ai-assistant.history'
        with open(path, 'w+') as f:
            json.dump(self.history, f)

    def determine_lexer(self, code_block):
        lexer_name = None
        lines = code_block.split('\n')
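
On main, by contrast, persistence is a single JSON file rather than per-conversation database rows, as the load_history and save_history above show. A minimal sketch of that round trip; the path here is illustrative, while main itself writes ~/.cache/ai-assistant.history.

import json

path = "/tmp/ai-assistant-example.history"  # illustrative; main uses ~/.cache/ai-assistant.history

# Save: dump the whole message list to one file, overwriting any previous history.
history = [{"role": "system", "content": "You are a helpful assistant."}]
with open(path, "w+") as f:
    json.dump(history, f)

# Load: a missing file simply means no prior history, mirroring the FileNotFoundError branch above.
try:
    with open(path, "r") as f:
        history = json.load(f)
except FileNotFoundError:
    pass
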
@@ -208,7 +126,7 @@ class CommandLineParser:
        parser.add_argument('--temp', '-t', nargs='?', type=float, const=0.2, default=False, help='Specify temperature')
        parser.add_argument('--context', type=int, default=4096, help='Specify context size')
        parser.add_argument('--reasoning', '-r', action='store_true', help='Use the default reasoning model deepseek-r1:14b')
        parser.add_argument('--resume', action='store_true', help='Resume a previous conversation')
        parser.add_argument('--new', '-n', action='store_true', help='Start a chat with a fresh history')
        parser.add_argument('--follow-up', '-f', nargs='?', const=True, default=False, help='Ask a follow up question when piping in context')
        parser.add_argument('--copy', '-c', action='store_true', help='Copy a codeblock if it appears')
        parser.add_argument('--shell', '-s', nargs='?', const=True, default=False, help='Output a shell command that does as described')
@@ -246,25 +164,15 @@ class InputHandler:
        except:
            return

    def follow_up(self, args, query):
        if type(args.follow_up) is str:
            second_input = args.follow_up
        else:
            second_input = self.improved_input()
        query += f'\n\nUser Question:\n{second_input}'
        return query


    def handle_piped_input(self, args):
        all_input = sys.stdin.read()
        query = f'Use the following context to answer the question. There will be no follow up questions from the user so make sure your answer is complete:\n{all_input}\n'
        if args.copy:
            query += 'Answer the question using a codeblock for any code or shell scripts\n'
        if args.follow_up:
            query = self.follow_up(args, query)
            second_input = self.improved_input()
            query += f'\n{second_input}'
        result = self.assistant.chat(query, stream=False)
        result = self.assistant._strip_thinking_tags(result)
        print(result)
        blocks = self.extract_code_block(result)
        if args.copy and len(blocks):
            self.copy_string_to_clipboard(blocks[0])
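
Both this piped-input path and the follow-up path in the next hunk run model output through _strip_thinking_tags before printing, so reasoning models do not leak their <think> blocks. A tiny sketch of the same logic as the method defined earlier; the sample text is made up for illustration.

# Mirrors AIAssistant._strip_thinking_tags: drop a leading <think>...</think> block, if present.
def strip_thinking_tags(text: str) -> str:
    if text.find("<think>") < 0:
        return text
    end_idx = text.find("</think>") + len("</think>")
    return text[end_idx:].strip()

raw = "<think>Reasoning about the request...</think>\nHere is the answer."
print(strip_thinking_tags(raw))  # -> Here is the answer.
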
@@ -289,14 +197,6 @@ Description:\n
            self.assistant.history = [self.assistant.system_prompt()]
            self.arg_shell(args)
            exit()
        if args.follow_up:
            query = "Provide a complete answer to the user's question, there will be no follow up questions from the user.\n\n"
            query = self.follow_up(args, query)
            result = self.assistant.chat(query, stream=False)
            result = self.assistant._strip_thinking_tags(result)
            print(result)
            exit()


        print("\033[91massistant\033[0m: Type your message (press Ctrl+D to send):")
        while True:
@@ -354,8 +254,7 @@ class CommandParser:
            '/clear': self.handle_clear,
            '/clipboard': None,
            '/exit': self.handle_exit,
            '/copy': self.handle_copy,
            '/list': self.handle_list
            '/copy': self.handle_copy
        }

    def parse_commands(self, text):
@@ -410,36 +309,6 @@ class CommandParser:
        self.input_handler.copy_string_to_clipboard(block)
        return True

    def handle_list(self):
        assistant = self.assistant
        db_path = assistant.db_path
        # Connect to the database and fetch saved conversations
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT id, topic FROM conversations")
        rows = cursor.fetchall()
        conn.close()
        if not rows:
            print("No saved conversations.")
            return
        # Display saved conversations
        print("\nSaved conversations:")
        for row in rows:
            print(f"ID: {row[0]}, Topic: {row[1]}")
        # Prompt user to select an ID
        choice = input("\nEnter conversation ID to load (or 'q' to quit): ").strip()
        if choice.lower() == 'q':
            return
        try:
            conv_id = int(choice)
            assistant.load_history(conversation_id=conv_id)
            print(f"\nLoaded conversation with ID {conv_id}.")
        except ValueError:
            print("Invalid ID. Please enter a number.")
        except Exception as e:
            print(f"Error loading conversation: {e}")


    def handle_exit(self):
        sys.exit(0)
@@ -455,10 +324,11 @@ def main():
    assistant.temperature = args.temp
    if args.context:
        assistant.num_ctx = args.context
    if args.resume:
        assistant.load_history()
    else:
    if args.new:
        assistant.history = [assistant.system_prompt()]
        assistant.save_history()
    else:
        assistant.load_history()

    command_parser = CommandParser()
    input_handler = InputHandler(assistant, command_parser)