#!/bin/python # AI Assistant in the terminal import argparse import os import sys import sqlite3 import warnings import json from ollama import Client import re import pyperclip import pygments from pygments.lexers import get_lexer_by_name, guess_lexer from pygments.formatters import TerminalFormatter from prompt_toolkit.key_binding import KeyBindings from prompt_toolkit import PromptSession default_assistant = "qwen3:14b" default_topic_llm = "qwen3:1.7b" warnings.filterwarnings("ignore") class AIAssistant: def __init__(self, server="http://localhost:11434", model="qwen3:14b"): self.server = server self.model = model self.client = Client(host=self.server) self.temperature = 0.2 self.num_ctx = 4096 self.history = [self.system_prompt()] self.db_path = os.path.expanduser("~/.cache/ai-assistant.db") self._init_db() def _init_db(self): """Initialize SQLite database and create the conversations table.""" if not os.path.exists(self.db_path): self._create_db() def _create_db(self): """Create the conversations table in the SQLite database.""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT, history TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() conn.close() def _save_to_db(self, topic): """Save the current conversation to the SQLite database.""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO conversations (topic, history) VALUES (?, ?) ''', (topic, json.dumps(self.history))) conn.commit() conn.close() def _load_from_db(self, conversation_id): """Load a conversation from the SQLite database by ID.""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' SELECT history FROM conversations WHERE id = ? ''', (conversation_id,)) result = cursor.fetchone() conn.close() if result: self.history = json.loads(result[0]) else: self.history = [self.system_prompt()] def _stringify_history(self): s = "" for item in self.history: s += item["role"] + ":\n" + item["content"] return s def _strip_thinking_tags(self, text: str): start_idx = text.find("") if (start_idx < 0): return text end_idx = text.find("") + len("") stripped_text = text[end_idx:] stripped_text = stripped_text.strip() return stripped_text def save_history(self): """Save the current conversation to the database with a generated topic.""" # Only save if this is the first user message if len(self.history) == 3: # Generate a topic using the AI system_prompt = self.system_prompt() user_prompt = "/no_think Generate a concise, 5 word descriptive topic for this conversation based on the following content. Do not use markdown, just plaintext. KEEP IT TO 5 WORDS OR LESS.:\n\n" user_prompt += self._stringify_history() topic = self.client.chat(model=default_topic_llm, messages=[system_prompt, {"role": "user", "content": user_prompt}], stream=False)['message']['content'].strip() topic = self._strip_thinking_tags(topic) self._save_to_db(topic) else: # For subsequent messages, we can update the topic in the future pass def load_history(self, conversation_id=None): """Load a conversation from the database by ID. If no ID, start a new one.""" if conversation_id: self._load_from_db(conversation_id) print(self._stringify_history()) else: self.history = [self.system_prompt()] def set_host(self, host): self.server = host self.client = Client(host=host) def system_prompt(self): return {"role": "system", "content": "You are a helpful, smart, kind, and efficient AI assistant. You always fulfill the user's requests accurately and concisely."} def determine_lexer(self, code_block): lexer_name = None lines = code_block.split('\n') for line in lines: if line.strip().startswith('```'): lexer_part = line.strip().split('```')[1].strip() if lexer_part: lexer_name = lexer_part break elif line.strip().startswith('lang:'): lexer_part = line.strip().split(':')[1].strip() if lexer_part: lexer_name = lexer_part break return lexer_name def highlight_code(self, lexer_name, code): try: lexer = get_lexer_by_name(lexer_name) if lexer_name else guess_lexer(code) except ValueError: lexer = guess_lexer('\n'.join(code.split('\n')[1:-1])) if not lexer: lexer = get_lexer_by_name('bash') formatter = TerminalFormatter() highlighted_code = pygments.highlight(code, lexer, formatter) return highlighted_code def chat(self, message, stream=True): self.history.append({"role": "user", "content": message}) completion = self.client.chat( model=self.model, options={"temperature": self.temperature, "num_ctx": self.num_ctx}, messages=self.history, stream=stream ) result = '' all_chunks = [] large_chunk = [] language = None if stream: for chunk in completion: text = chunk['message']['content'] large_chunk.append(text) all_chunks.append(text) large_text = ''.join(large_chunk) if ('```' in large_text) and ('\n' in large_text.split('```')[1]): language = large_text.split('```')[1].split('\n')[0] large_chunk = [] if language == '': print(large_text, end='', flush=True) language = None if language and ('\n' in large_text) and large_chunk: output = self.highlight_code(language, large_text) print(output, end='', flush=True) large_chunk = [] elif not language or not large_chunk: print(text, end='', flush=True) result = ''.join(all_chunks) else: result = completion['message']['content'] self.history.append({"role": 'assistant', 'content': result}) self.save_history() return result class CommandLineParser: def __init__(self): self.parser = argparse.ArgumentParser(description='Chat with an intelligent assistant') self.add_arguments() def add_arguments(self): parser = self.parser parser.add_argument('--host', nargs='?', const=True, default=False, help='Specify host of Ollama server') parser.add_argument('--model', '-m', nargs='?', const=True, default=False, help='Specify model') parser.add_argument('--temp', '-t', nargs='?', type=float, const=0.2, default=False, help='Specify temperature') parser.add_argument('--context', type=int, default=4096, help='Specify context size') parser.add_argument('--reasoning', '-r', action='store_true', help='Use the default reasoning model deepseek-r1:14b') parser.add_argument('--resume', action='store_true', help='Resume a previous conversation') parser.add_argument('--follow-up', '-f', nargs='?', const=True, default=False, help='Ask a follow up question when piping in context') parser.add_argument('--copy', '-c', action='store_true', help='Copy a codeblock if it appears') parser.add_argument('--shell', '-s', nargs='?', const=True, default=False, help='Output a shell command that does as described') def parse(self): return self.parser.parse_args() # Keybindings bindings = KeyBindings() @bindings.add('c-d') def _(event): event.current_buffer.validate_and_handle() class InputHandler: def __init__(self, assistant, command_parser): self.assistant = assistant self.command_parser = command_parser self.command_parser.assistant = assistant self.command_parser.input_handler = self self.session = PromptSession(multiline=True, prompt_continuation='', key_bindings=bindings) def handle_input(self, args): if not sys.stdin.isatty(): self.handle_piped_input(args) else: self.handle_interactive_input(args) def copy_string_to_clipboard(self, s): try: pyperclip.copy(s) except: return def follow_up(self, args, query): if type(args.follow_up) is str: second_input = args.follow_up else: second_input = self.improved_input() query += f'\n\nUser Question:\n{second_input}' return query def handle_piped_input(self, args): all_input = sys.stdin.read() query = f'Use the following context to answer the question. There will be no follow up questions from the user so make sure your answer is complete:\n{all_input}\n' if args.copy: query += 'Answer the question using a codeblock for any code or shell scripts\n' if args.follow_up: query = self.follow_up(args, query) result = self.assistant.chat(query, stream=False) result = self.assistant._strip_thinking_tags(result) print(result) blocks = self.extract_code_block(result) if args.copy and len(blocks): self.copy_string_to_clipboard(blocks[0]) def arg_shell(self, args): query = ''' Form a shell command based on the following description. Only output a working shell command. Format the command like this: `command` Description:\n ''' if type(args.shell) is str: query += args.shell else: query += self.improved_input() result = self.assistant.chat(query, stream=False) result = blocks[0] if len(blocks := self.extract_code_block(result)) else result print(result) self.copy_string_to_clipboard(result) def handle_interactive_input(self, args): if args.shell: self.assistant.history = [self.assistant.system_prompt()] self.arg_shell(args) exit() if args.follow_up: query = "Provide a complete answer to the user's question, there will be no follow up questions from the user.\n\n" query = self.follow_up(args, query) result = self.assistant.chat(query, stream=False) result = self.assistant._strip_thinking_tags(result) print(result) exit() print("\033[91massistant\033[0m: Type your message (press Ctrl+D to send):") while True: try: full_input = self.improved_input() if full_input is None: break if full_input.strip() == '': continue command_result = self.command_parser.parse_commands(full_input) if type(command_result) is str: self.assistant.chat(command_result) print() except (EOFError, KeyboardInterrupt): print("\nExiting...") break def improved_input(self, prompt="> "): """ Returns the full text (including embedded newlines) when you press Ctrl-D. Arrow keys edit within or across lines automatically. """ try: text = self.session.prompt(prompt) return text except KeyboardInterrupt: print("\nUser aborted input") return None def extract_code_block(self, text, highlight=True): pattern = r'```[a-z]*\n[\s\S]*?\n```' code_blocks = [] matches = re.finditer(pattern, text) for match in matches: code_block = match.group(0) if highlight: lexer_name = self.assistant.determine_lexer(code_block) highlighted_code = self.assistant.highlight_code(lexer_name, code_block) code_blocks.append(highlighted_code) else: code_blocks.append(code_block) if not code_blocks: line_pattern = r'`[a-z]*[\s\S]*?`' line_matches = re.finditer(line_pattern, text) for match in line_matches: code_block = match.group(0) code_blocks.append(code_block[1:-1]) return code_blocks class CommandParser: def __init__(self): self.commands = { '/save': self.handle_save, '/clear': self.handle_clear, '/clipboard': None, '/exit': self.handle_exit, '/copy': self.handle_copy, '/list': self.handle_list } def parse_commands(self, text): """ Parses the given text to check if it contains a recognized command. If the command is standalone returns True. If the command requires passing the text through a string handler (e.g., /clipboard), returns the result of that handler as a string. Otherwise, returns text if no command is recognized. Args: text: The input text to parse. Returns: True if the command is standalone A string containing the result of the command handler if the command is not standalone. """ tokens = text.split(' ') if not tokens: return False command = tokens[0] if command in self.commands: handler = self.commands[command] if len(tokens) > 1 and command == '/clipboard': context_query = '\n\nThe following is context provided by the user:\n' clipboard_content = pyperclip.paste() if clipboard_content: context_query += clipboard_content + '\n' + text return context_query else: return handler() return text def handle_save(self): filename = input('Enter filename to save conversation: ') self.save_conversation(filename) return True def save_conversation(self, filename='conversation.md'): # TODO finish this pass def handle_clear(self): self.assistant.history = [self.assistant.system_prompt()] self.assistant.save_history() return True def handle_copy(self): blocks = self.input_handler.extract_code_block(self.assistant.history[-1]['content'], highlight=False) if len(blocks): block = '\n'.join(blocks[0].split('\n')[1:-1]) self.input_handler.copy_string_to_clipboard(block) return True def handle_list(self): assistant = self.assistant db_path = assistant.db_path # Connect to the database and fetch saved conversations conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute("SELECT id, topic FROM conversations") rows = cursor.fetchall() conn.close() if not rows: print("No saved conversations.") return # Display saved conversations print("\nSaved conversations:") for row in rows: print(f"ID: {row[0]}, Topic: {row[1]}") # Prompt user to select an ID choice = input("\nEnter conversation ID to load (or 'q' to quit): ").strip() if choice.lower() == 'q': return try: conv_id = int(choice) assistant.load_history(conversation_id=conv_id) print(f"\nLoaded conversation with ID {conv_id}.") except ValueError: print("Invalid ID. Please enter a number.") except Exception as e: print(f"Error loading conversation: {e}") def handle_exit(self): sys.exit(0) def main(): args = CommandLineParser().parse() assistant = AIAssistant() if args.host: assistant.set_host(args.host) if args.model: assistant.model = args.model if args.temp: assistant.temperature = args.temp if args.context: assistant.num_ctx = args.context if args.resume: assistant.load_history() else: assistant.history = [assistant.system_prompt()] command_parser = CommandParser() input_handler = InputHandler(assistant, command_parser) input_handler.handle_input(args) if __name__ == '__main__': main()