import os
import tempfile
import gc
import logging
import streamlit as st
from groq import Groq, APIError
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma
import torch
# ---------------- STREAMLIT UI SETUP ----------------
# Page configuration is issued before anything else touches Streamlit: the
# configuration code below may call st.error / st.warning, and older
# Streamlit releases reject set_page_config unless it is the first
# Streamlit command of the page.
st.set_page_config(
    page_title="PDF Assistant",
    page_icon="📘",
    layout="wide",
    initial_sidebar_state="expanded",
)

# ---------------- CONFIGURATION ----------------
logging.basicConfig(level=logging.INFO)

# Load API key from Streamlit secrets, falling back to the environment.
# Accessing st.secrets raises when no secrets file exists (the exception
# class differs between Streamlit releases, hence the broad catch), which
# would otherwise prevent the environment fallback from ever being used.
try:
    GROQ_API_KEY = st.secrets.get("GROQ_API_KEY")
except Exception:
    GROQ_API_KEY = None
if not GROQ_API_KEY:
    GROQ_API_KEY = os.environ.get("GROQ_API_KEY")

GROQ_MODEL = "openai/gpt-oss-120b"

# Initialize Groq client; `client` stays None when no key is available or
# construction fails, and the rest of the app checks for that.
client = None
if GROQ_API_KEY:
    try:
        client = Groq(api_key=GROQ_API_KEY)
        logging.info("✅ Groq client initialized successfully.")
    except Exception as e:
        logging.exception("Groq client initialization failed")
        st.error(f"❌ Failed to initialize Groq client: {e}")
        client = None
else:
    st.warning("⚠️ GROQ_API_KEY not found. Please add it to Hugging Face secrets.")
# ---------------- CSS ----------------
# Emits a raw-HTML markdown element intended for page styling.
# NOTE(review): the literal below contains only a newline in this view —
# presumably a <style> block was stripped; confirm against the real file.
st.markdown("""
""", unsafe_allow_html=True)
# ---------------- FIXED HEADER ----------------
# Emits a second raw-HTML markdown element for the page header.
# NOTE(review): this literal is also empty here — presumably the header
# markup is missing; confirm against the real file.
st.markdown("""
""", unsafe_allow_html=True)
# ---------------- SESSION STATE ----------------
# Seed each per-session key once; keys that already exist (i.e. on every
# rerun after the first) keep their stored value.
for _state_key, _initial_value in (
    ("chat", []),                  # list of (role, message) tuples
    ("vectorstore", None),         # Chroma store of the active PDF
    ("retriever", None),           # retriever built on top of the store
    ("uploaded_file_name", None),  # name of the PDF currently indexed
    ("uploader_key", 0),           # bumped to reset the file_uploader widget
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# ---------------- FUNCTIONS ----------------
def clear_chat_history():
    """Discard the conversation by storing a fresh, empty chat list."""
    st.session_state["chat"] = []
def clear_memory():
    """Forget the indexed PDF and reset the upload widget.

    Deletes the Chroma collection behind the current vector store (best
    effort), clears the store/retriever/file-name entries in session state,
    bumps ``uploader_key`` so the file uploader is recreated empty, and
    releases Python and CUDA memory where possible.
    """
    store = st.session_state.vectorstore
    if store is not None:
        # Dropping the Python reference alone leaves the chunks inside the
        # in-process Chroma collection, where a later upload using the same
        # collection name would be mixed with them.
        # NOTE(review): if the collection name is shared with other
        # sessions in this process, deleting it affects them as well —
        # confirm how collections are named at creation time.
        try:
            store.delete_collection()
        except Exception:
            # Cleanup must never block the reset itself.
            logging.exception("Could not delete the Chroma collection")

    st.session_state.vectorstore = None
    st.session_state.retriever = None
    st.session_state.uploaded_file_name = None
    # A new widget key makes Streamlit build a fresh, empty uploader.
    st.session_state.uploader_key += 1

    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
def process_pdf(uploaded_file):
    """Index an uploaded PDF and store the result in session state.

    The upload is written to a temporary ``.pdf`` file, loaded with
    ``PyPDFLoader``, split into overlapping chunks, embedded with
    ``all-MiniLM-L6-v2`` on CPU and stored in a Chroma collection. On
    success ``st.session_state.vectorstore`` and ``.retriever`` are replaced.

    Args:
        uploaded_file: Object exposing ``getvalue()`` that returns the PDF
            bytes (as provided by ``st.file_uploader``).

    Returns:
        The number of chunks indexed, or ``None`` when processing failed or
        the PDF produced no chunks (an error is shown via ``st.error``).
    """
    import uuid  # local import: only this function needs it

    path = None
    previous_store = st.session_state.vectorstore
    try:
        # delete=False so the loader can reopen the file by name after the
        # handle is closed; removal happens in the finally clause.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
            tmp.write(uploaded_file.getvalue())
            path = tmp.name

        docs = PyPDFLoader(path).load()
        splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=60)
        chunks = splitter.split_documents(docs)
        if not chunks:
            # e.g. scanned/image-only PDFs: nothing to embed or retrieve.
            st.error("Error processing PDF: no extractable text was found.")
            return None

        embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2",
            model_kwargs={"device": "cpu"},
            encode_kwargs={"normalize_embeddings": True},
        )
        # A unique collection per upload: with the default collection name,
        # every call in the same process adds to the same in-memory
        # collection, so chunks of earlier PDFs (and of other sessions)
        # would be retrieved alongside this one.
        vectorstore = Chroma.from_documents(
            chunks,
            embeddings,
            collection_name=f"pdf-{uuid.uuid4().hex}",
        )
        retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

        st.session_state.vectorstore = vectorstore
        st.session_state.retriever = retriever

        # The store being replaced is no longer reachable; free it (best effort).
        if previous_store is not None:
            try:
                previous_store.delete_collection()
            except Exception:
                logging.exception("Could not delete the previous Chroma collection")

        return len(chunks)
    except Exception as e:
        logging.exception("PDF processing failed")
        st.error(f"Error processing PDF: {str(e)}")
        return None
    finally:
        # Runs on success and on every failure path, so the temp file
        # is never left behind.
        if path and os.path.exists(path):
            os.unlink(path)
def ask_question(question):
    """Answer *question* from the indexed PDF through the Groq chat API.

    Retrieves chunks with the session's retriever, joins their text into a
    context block, and sends one chat completion request to ``GROQ_MODEL``.

    Args:
        question: The user's question text.

    Returns:
        A ``(answer, source_count, error)`` tuple. On success ``answer`` is
        the stripped model reply, ``source_count`` the number of retrieved
        chunks and ``error`` is ``None``. On any failure ``answer`` is
        ``None``, ``source_count`` is ``0`` and ``error`` is a message.
    """
    if not client:
        return None, 0, "Groq client is not initialized."
    if not st.session_state.retriever:
        return None, 0, "Upload PDF first."
    try:
        docs = st.session_state.retriever.invoke(question)
        context = "\n\n".join(d.page_content for d in docs)
        prompt = f"""You are a strict RAG Q&A assistant.
Use below CONTEXT to answer the below mentioned QUESTION
If the answer is not found, reply: "I cannot find this in the PDF."
CONTEXT = {context}
QUESTION = {question}
Answer on your behalf, write answer in a presentable manner (proper formatting) like point-wise with numbering or bullet points accordingly!"""
        response = client.chat.completions.create(
            model=GROQ_MODEL,
            messages=[
                {"role": "system", "content": "Use only the PDF content."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.1,
        )
        # The reply text may be None or blank; report that as an explicit
        # error instead of returning an empty answer with no error message.
        answer = (response.choices[0].message.content or "").strip()
        if not answer:
            return None, 0, "The model returned an empty response."
        return answer, len(docs), None
    except Exception as e:
        # Boundary handler: the UI shows the message, the log keeps the trace.
        logging.exception("Question answering failed")
        return None, 0, f"Error: {str(e)}"
# ---------------- SIDEBAR ----------------
with st.sidebar:
    st.write("")

    # Maintenance buttons.
    chat_reset_clicked = st.button("🗑️ Clear Chat History", use_container_width=True)
    if chat_reset_clicked:
        clear_chat_history()

    # clear_memory runs as the click callback, i.e. before this rerun's
    # widgets are built, so the uploader below already uses the new key.
    memory_reset_clicked = st.button(
        "🔥 Clear PDF Memory",
        on_click=clear_memory,
        use_container_width=True,
    )
    if memory_reset_clicked:
        st.success("Memory Cleared!")

    st.markdown("---")

    if st.session_state.uploaded_file_name:
        upload_label = "✅ PDF Uploaded!"
    else:
        upload_label = "Upload PDF"
    uploaded = st.file_uploader(
        upload_label,
        type=["pdf"],
        key=st.session_state.uploader_key,
        label_visibility="collapsed",
    )

    if not uploaded:
        st.warning("⬆️ Upload a PDF to start chatting!")
    elif uploaded.name == st.session_state.uploaded_file_name:
        # Same file name as the one already indexed: nothing to redo.
        st.success(f"✅ **Active:** `{uploaded.name}`")
    else:
        # A different file: forget the old name and conversation, then index.
        st.session_state.uploaded_file_name = None
        st.session_state.chat = []
        with st.spinner(f"Processing '{uploaded.name}'..."):
            chunks = process_pdf(uploaded)
        if chunks:
            st.session_state.uploaded_file_name = uploaded.name
            st.success("✅ PDF Processed!")
        else:
            st.error("❌ Failed.")
# ---------------- INPUT AREA ----------------
# Asking is only possible with an indexed PDF and a working Groq client.
disabled_input = st.session_state.uploaded_file_name is None or client is None

# Input Form
with st.form(key='chat_form', clear_on_submit=True):
    col_input, col_btn = st.columns([0.85, 0.15], gap="small")
    with col_input:
        user_question = st.text_input(
            "Ask a question",
            placeholder="Ask a question about the loaded PDF...",
            label_visibility="collapsed",
            disabled=disabled_input,
        )
    with col_btn:
        submit_btn = st.form_submit_button("➤", disabled=disabled_input, use_container_width=True)

# Whitespace-only input is treated as "nothing asked" instead of being
# sent to the model.
user_question = (user_question or "").strip()

if submit_btn and user_question:
    st.session_state.chat.append(("user", user_question))
    with st.spinner("Thinking..."):
        answer, sources, error = ask_question(user_question)
    if answer:
        st.session_state.chat.append(("bot", answer))
    else:
        # `error` can be None when the call succeeded but produced no text;
        # show a readable message rather than the literal "None".
        st.session_state.chat.append(
            ("bot", f"🔴 **Error:** {error or 'No answer was returned.'}")
        )
    # Rerun so the history below renders with the new messages.
    st.rerun()
# ---------------- CHAT HISTORY (REVERSED) ----------------
# Renders the stored (role, message) tuples newest-first, one markdown
# element per message, below a horizontal rule.
# NOTE(review): both f-string literals below break across two physical
# lines, which a single-quoted string cannot do — presumably HTML wrapper
# markup around {msg} is missing from this view; confirm against the real
# file before editing.
# NOTE(review): msg is interpolated as-is while unsafe_allow_html=True, so
# any HTML in a user question or model reply is rendered — confirm whether
# escaping is wanted.
if st.session_state.chat:
st.markdown("---")
for role, msg in reversed(st.session_state.chat):
if role == "user":
st.markdown(f"{msg}
", unsafe_allow_html=True)
else:
st.markdown(f"{msg}
", unsafe_allow_html=True)