import streamlit as st
import os, gc, shutil, re, time, threading, queue
from itertools import islice
from llama_cpp import Llama
from llama_cpp.llama_speculative import LlamaPromptLookupDecoding
from huggingface_hub import hf_hub_download
from duckduckgo_search import DDGS
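
# Assumed runtime dependencies (versions not pinned here): streamlit, llama-cpp-python,
# huggingface_hub, duckduckgo_search.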
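# ----- Session state -----
# Holds the chat history, the loaded llm and its model_name, and a pending_response
# flag that guards against submitting a new message while a reply is still streaming.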
| | if "chat_history" not in st.session_state: |
| | st.session_state.chat_history = [] |
| | if "pending_response" not in st.session_state: |
| | st.session_state.pending_response = False |
| | if "model_name" not in st.session_state: |
| | st.session_state.model_name = None |
| | if "llm" not in st.session_state: |
| | st.session_state.llm = None |
| |
|
| | |
| | |
| | |
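# ----- Page styling -----
# Inline CSS for the chat bubbles and timestamps rendered further down.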
| | st.markdown(""" |
| | <style> |
| | .chat-container { margin: 1em 0; } |
| | .chat-assistant { background-color: #eef7ff; padding: 1em; border-radius: 10px; margin-bottom: 1em; } |
| | .chat-user { background-color: #e6ffe6; padding: 1em; border-radius: 10px; margin-bottom: 1em; } |
| | .message-time { font-size: 0.8em; color: #555; text-align: right; } |
| | .loading-spinner { font-size: 1.1em; color: #ff6600; } |
| | </style> |
| | """, unsafe_allow_html=True) |
| |
|
| | |
| | |
| | |
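# ----- Model catalog -----
# REQUIRED_SPACE_BYTES is the free-disk-space threshold checked before a download.
# Each MODELS entry maps a display name to the Hugging Face repo_id and GGUF filename
# passed to hf_hub_download, plus a description shown in the page title.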
REQUIRED_SPACE_BYTES = 5 * 1024 ** 3  # 5 GiB

MODELS = {
    "Qwen2.5-0.5B-Instruct (Q4_K_M)": {
        "repo_id": "Qwen/Qwen2.5-0.5B-Instruct-GGUF",
        "filename": "qwen2.5-0.5b-instruct-q4_k_m.gguf",
        "description": "Qwen2.5-0.5B-Instruct (Q4_K_M)"
    },
    "Gemma-3.1B-it (Q4_K_M)": {
        "repo_id": "unsloth/gemma-3-1b-it-GGUF",
        "filename": "gemma-3-1b-it-Q4_K_M.gguf",
        "description": "Gemma-3.1B-it (Q4_K_M)"
    },
    "Qwen2.5-1.5B-Instruct (Q4_K_M)": {
        "repo_id": "Qwen/Qwen2.5-1.5B-Instruct-GGUF",
        "filename": "qwen2.5-1.5b-instruct-q4_k_m.gguf",
        "description": "Qwen2.5-1.5B-Instruct (Q4_K_M)"
    },
    "Qwen2.5-3B-Instruct (Q4_K_M)": {
        "repo_id": "Qwen/Qwen2.5-3B-Instruct-GGUF",
        "filename": "qwen2.5-3b-instruct-q4_k_m.gguf",
        "description": "Qwen2.5-3B-Instruct (Q4_K_M)"
    },
    "Qwen2.5-7B-Instruct (Q2_K)": {
        "repo_id": "Qwen/Qwen2.5-7B-Instruct-GGUF",
        "filename": "qwen2.5-7b-instruct-q2_k.gguf",
        "description": "Qwen2.5-7B Instruct (Q2_K)"
    },
    "Gemma-3-4B-IT (Q4_K_M)": {
        "repo_id": "unsloth/gemma-3-4b-it-GGUF",
        "filename": "gemma-3-4b-it-Q4_K_M.gguf",
        "description": "Gemma 3 4B IT (Q4_K_M)"
    },
    "Phi-4-mini-Instruct (Q4_K_M)": {
        "repo_id": "unsloth/Phi-4-mini-instruct-GGUF",
        "filename": "Phi-4-mini-instruct-Q4_K_M.gguf",
        "description": "Phi-4 Mini Instruct (Q4_K_M)"
    },
    "Meta-Llama-3.1-8B-Instruct (Q2_K)": {
        "repo_id": "MaziyarPanahi/Meta-Llama-3.1-8B-Instruct-GGUF",
        "filename": "Meta-Llama-3.1-8B-Instruct.Q2_K.gguf",
        "description": "Meta-Llama-3.1-8B-Instruct (Q2_K)"
    },
    "DeepSeek-R1-Distill-Llama-8B (Q2_K)": {
        "repo_id": "unsloth/DeepSeek-R1-Distill-Llama-8B-GGUF",
        "filename": "DeepSeek-R1-Distill-Llama-8B-Q2_K.gguf",
        "description": "DeepSeek-R1-Distill-Llama-8B (Q2_K)"
    },
    "Mistral-7B-Instruct-v0.3 (IQ3_XS)": {
        "repo_id": "MaziyarPanahi/Mistral-7B-Instruct-v0.3-GGUF",
        "filename": "Mistral-7B-Instruct-v0.3.IQ3_XS.gguf",
        "description": "Mistral-7B-Instruct-v0.3 (IQ3_XS)"
    },
    "Qwen2.5-Coder-7B-Instruct (Q2_K)": {
        "repo_id": "Qwen/Qwen2.5-Coder-7B-Instruct-GGUF",
        "filename": "qwen2.5-coder-7b-instruct-q2_k.gguf",
        "description": "Qwen2.5-Coder-7B-Instruct (Q2_K)"
    },
}
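
# To add another model, append an entry with the same three keys. The repo and
# filename below are hypothetical placeholders, not a real download target:
# "My-Model-2B-Instruct (Q4_K_M)": {
#     "repo_id": "some-user/My-Model-2B-Instruct-GGUF",
#     "filename": "my-model-2b-instruct-q4_k_m.gguf",
#     "description": "My-Model-2B-Instruct (Q4_K_M)"
# },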
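# ----- Helpers: web retrieval, model download/load, response streaming -----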
def retrieve_context(query, max_results=6, max_chars_per_result=600):
    """Retrieve web search context using DuckDuckGo."""
    try:
        with DDGS() as ddgs:
            # timelimit="y" restricts results to roughly the past year.
            results = list(islice(ddgs.text(query, region="wt-wt", safesearch="off", timelimit="y"), max_results))
        context = ""
        for i, result in enumerate(results, start=1):
            title = result.get("title", "No Title")
            snippet = result.get("body", "")[:max_chars_per_result]
            context += f"Result {i}:\nTitle: {title}\nSnippet: {snippet}\n\n"
        return context.strip()
    except Exception as e:
        st.error(f"Error during web retrieval: {e}")
        return ""

def try_load_model(model_path):
    """Attempt to initialize the model; return the Llama object or an error string."""
    try:
        return Llama(
            model_path=model_path,
            n_ctx=4096,
            n_threads=2,
            n_threads_batch=1,
            n_batch=256,
            n_gpu_layers=0,  # CPU-only inference
            use_mlock=True,
            use_mmap=True,
            verbose=False,
            logits_all=True,
            # Prompt-lookup speculative decoding: drafts candidate tokens from the prompt
            # itself, num_pred_tokens per step.
            draft_model=LlamaPromptLookupDecoding(num_pred_tokens=2),
        )
    except Exception as e:
        return str(e)

def download_model(selected_model):
    """Download the model file from the Hugging Face Hub into ./models."""
    with st.spinner(f"Downloading {selected_model['filename']}..."):
        hf_hub_download(
            repo_id=selected_model["repo_id"],
            filename=selected_model["filename"],
            local_dir="./models",
            local_dir_use_symlinks=False,  # deprecated and ignored by recent huggingface_hub versions
        )

def validate_or_download_model(selected_model):
    """Ensure the model is available and loads properly; download (or re-download) if necessary."""
    model_path = os.path.join("models", selected_model["filename"])
    os.makedirs("models", exist_ok=True)
    if not os.path.exists(model_path):
        if shutil.disk_usage(".").free < REQUIRED_SPACE_BYTES:
            # Note: this only warns; the download below still proceeds.
            st.info("Insufficient storage space. Consider cleaning up old models.")
        download_model(selected_model)
    result = try_load_model(model_path)
    if isinstance(result, str):
        st.warning(f"Initial model load failed: {result}\nAttempting re-download...")
        try:
            os.remove(model_path)
        except Exception:
            pass
        download_model(selected_model)
        result = try_load_model(model_path)
        if isinstance(result, str):
            st.error(f"Model failed to load after re-download: {result}")
            st.stop()
    return result

def stream_response(llm, messages, max_tokens, temperature, top_k, top_p, repeat_penalty, response_queue):
    """Stream the model response token-by-token."""
    final_text = ""
    try:
        stream = llm.create_chat_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
            repeat_penalty=repeat_penalty,
            stream=True,
        )
        for chunk in stream:
            if "choices" in chunk:
                delta = chunk["choices"][0]["delta"].get("content", "")
                final_text += delta
                response_queue.put(delta)
                if chunk["choices"][0].get("finish_reason", ""):
                    break
    except Exception as e:
        response_queue.put(f"\nError: {e}")
    response_queue.put(None)
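
# stream_response runs in a daemon thread (started further down); it pushes text deltas onto
# response_queue and a final None sentinel so the UI loop knows when generation is done.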
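# ----- Sidebar: model selection, system prompt, and generation settings -----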
with st.sidebar:
    st.header("⚙️ Settings")

    selected_model_name = st.selectbox("Select Model", list(MODELS.keys()),
                                       help="Choose from the available model configurations.")
    system_prompt_base = st.text_area("System Prompt",
                                      value="You are a helpful assistant.",
                                      height=80,
                                      help="Define the base context for the AI's responses.")

    st.subheader("Generation Parameters")
    max_tokens = st.slider("Max Tokens", 64, 1024, 256, step=32,
                           help="The maximum number of tokens the assistant can generate.")
    temperature = st.slider("Temperature", 0.1, 2.0, 0.7,
                            help="Controls randomness. Lower values are more deterministic.")
    top_k = st.slider("Top-K", 1, 100, 40,
                      help="Limits the token candidates to the top-k tokens.")
    top_p = st.slider("Top-P", 0.1, 1.0, 0.95,
                      help="Nucleus sampling parameter; restricts sampling to a cumulative probability mass.")
    repeat_penalty = st.slider("Repetition Penalty", 1.0, 2.0, 1.1,
                               help="Penalizes token repetition to improve output variety.")

    with st.expander("Web Search Settings"):
        enable_search = st.checkbox("Enable Web Search", value=False,
                                    help="Include recent web search context to augment the prompt.")
        max_results = st.number_input("Max Results for Context", min_value=1, max_value=20, value=6, step=1,
                                      help="How many search results to use.")
        max_chars_per_result = st.number_input("Max Chars per Result", min_value=100, max_value=2000, value=600, step=50,
                                               help="Max characters to extract from each search result.")

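# ----- Load (or switch) the selected model -----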
selected_model = MODELS[selected_model_name]
if st.session_state.model_name != selected_model_name:
    with st.spinner("Loading selected model..."):
        # Free the previously loaded model before loading a new one.
        if st.session_state.llm is not None:
            del st.session_state.llm
            gc.collect()
        st.session_state.llm = validate_or_download_model(selected_model)
        st.session_state.model_name = selected_model_name

llm = st.session_state.llm

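# ----- Main page: title, caption, and previous chat turns -----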
st.title(f"🧠 {selected_model['description']} (Streamlit + GGUF)")
st.caption(f"Powered by `llama.cpp` | Model: {selected_model['filename']}")

# Render the chat history accumulated so far.
for chat in st.session_state.chat_history:
    role = chat["role"]
    content = chat["content"]
    if role == "assistant":
        st.markdown(f"<div class='chat-assistant'>{content}</div>", unsafe_allow_html=True)
    else:
        st.markdown(f"<div class='chat-user'>{content}</div>", unsafe_allow_html=True)

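# ----- Chat input and response generation -----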
user_input = st.chat_input("Your message...")
if user_input:
    if st.session_state.pending_response:
        st.warning("Please wait until the current response is finished.")
    else:
        # Record and display the user message with a timestamp.
        timestamp = time.strftime("%H:%M")
        st.session_state.chat_history.append({"role": "user", "content": f"{user_input}\n\n<span class='message-time'>{timestamp}</span>"})
        with st.chat_message("user"):
            st.markdown(f"<div class='chat-user'>{user_input}</div>", unsafe_allow_html=True)

        st.session_state.pending_response = True

        # Optionally retrieve web context and show it in the sidebar.
        retrieved_context = ""
        if enable_search:
            retrieved_context = retrieve_context(user_input, max_results=max_results, max_chars_per_result=max_chars_per_result)
            with st.sidebar:
                st.markdown("### Retrieved Context")
                st.text_area("Retrieved Context", value=retrieved_context or "No context found.", height=150, label_visibility="collapsed")

        # Build the prompt: prepend the system prompt and, when available, the web context.
        if enable_search and retrieved_context:
            augmented_user_input = (
                f"{system_prompt_base.strip()}\n\n"
                f"Use the following recent web search context to help answer the query:\n\n"
                f"{retrieved_context}\n\n"
                f"User Query: {user_input}"
            )
        else:
            augmented_user_input = f"{system_prompt_base.strip()}\n\nUser Query: {user_input}"

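        # Keep only the last MAX_TURNS exchanges to bound the context length, and replace
        # the latest user turn with the augmented prompt built above.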
        MAX_TURNS = 2
        trimmed_history = st.session_state.chat_history[-(MAX_TURNS * 2):]
        if trimmed_history and trimmed_history[-1]["role"] == "user":
            messages = trimmed_history[:-1] + [{"role": "user", "content": augmented_user_input}]
        else:
            messages = trimmed_history + [{"role": "user", "content": augmented_user_input}]

        # Placeholder for the streamed answer, a simple progress indicator, and the token queue.
        visible_placeholder = st.empty()
        progress_bar = st.progress(0)
        response_queue = queue.Queue()

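        # Generate in a background daemon thread so this script can keep draining the queue
        # and updating the UI as tokens arrive.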
        stream_thread = threading.Thread(
            target=stream_response,
            args=(llm, messages, max_tokens, temperature, top_k, top_p, repeat_penalty, response_queue),
            daemon=True
        )
        stream_thread.start()

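        # Consume queued deltas until the None sentinel arrives; the timeout only trips after
        # 300 s with no new tokens, since start_time is reset on every update.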
| | final_response = "" |
| | timeout = 300 |
| | start_time = time.time() |
| | progress = 0 |
| | while True: |
| | try: |
| | update = response_queue.get(timeout=0.1) |
| | if update is None: |
| | break |
| | final_response += update |
| | |
| | visible_response = re.sub(r"<think>.*?</think>", "", final_response, flags=re.DOTALL) |
| | visible_placeholder.markdown(f"<div class='chat-assistant'>{visible_response}</div>", unsafe_allow_html=True) |
| | progress = min(progress + 1, 100) |
| | progress_bar.progress(progress) |
| | start_time = time.time() |
| | except queue.Empty: |
| | if time.time() - start_time > timeout: |
| | st.error("Response generation timed out.") |
| | break |
| |
|
| | |
| | timestamp = time.strftime("%H:%M") |
| | st.session_state.chat_history.append({"role": "assistant", "content": f"{final_response}\n\n<span class='message-time'>{timestamp}</span>"}) |
| | st.session_state.pending_response = False |
| | progress_bar.empty() |
| | gc.collect() |
| |
|