# app.py
"""Unified Online/Offline AI Quiz Game with Friends, Chat, Presence, Invites.

- Offline: uses local JSON files under ./data/
- Online: uses Firebase Realtime Database when configured (optional)
- Put your Firebase service account JSON next to this file and name it
  serviceAccountKey.json (or point FIREBASE_CREDENTIALS at it)
- FIREBASE_DB_URL defaults to the project's Realtime Database URL and can be
  overridden through the environment
"""
import json
import os
import random
import time
import uuid
from datetime import datetime, timedelta

import pandas as pd
import plotly.express as px
import requests
import streamlit as st
from groq import Groq
from streamlit.components.v1 import html

# Try to import firebase-admin (optional). If unavailable, app will run Offline.
try:
    import firebase_admin
    from firebase_admin import credentials, db
    FIREBASE_AVAILABLE = True
except Exception:
    FIREBASE_AVAILABLE = False

# ---------------- Page config ----------------
st.set_page_config(page_title="AI Quiz Game — Online/Offline", layout="wide")

# ---------------- Configuration ----------------
DATA_DIR = "data"
os.makedirs(DATA_DIR, exist_ok=True)

# Local filenames
GAMES_FILE = os.path.join(DATA_DIR, "games.json")
PLAYERS_FILE = os.path.join(DATA_DIR, "players.json")
MESSAGES_FILE = os.path.join(DATA_DIR, "messages.json")
SESSIONS_FILE = os.path.join(DATA_DIR, "sessions.json")
LEADERBOARD_FILE = os.path.join(DATA_DIR, "leaderboard.csv")
FRIENDS_FILE = os.path.join(DATA_DIR, "friends.json")
INBOX_FILE = os.path.join(DATA_DIR, "inbox.json")  # friend requests & invitations

# Firebase defaults (overridable through environment variables)
FIREBASE_CREDENTIALS = os.getenv("FIREBASE_CREDENTIALS", "serviceAccountKey.json")
FIREBASE_DB_URL = os.getenv(
    "FIREBASE_DB_URL",
    "https://real-time-database-fe632-default-rtdb.firebaseio.com/",
)

# Heartbeat threshold (seconds) to consider a session active.
# NOTE(review): this name is assigned a second time further down the file,
# and that later value is the one functions see at call time.
HEARTBEAT_THRESHOLD_SECONDS = 40

# ---------------- Example Questions DB (expand as needed) ----------------
# Each entry is a tuple: (question text, list of options, correct option).
questions_db = {
    "Geography": [
        ("What is the capital of France?", ["Paris", "London", "Berlin", "Madrid"], "Paris"),
        ("Largest country by area?", ["Canada", "USA", "Russia", "China"], "Russia"),
        ("River through Egypt?", ["Nile", "Amazon", "Ganges", "Yangtze"], "Nile"),
        ("Mount Everest is in which country?", ["Nepal", "India", "China", "Bhutan"], "Nepal"),
        ("Which ocean is the largest?", ["Atlantic", "Pacific", "Indian", "Arctic"], "Pacific"),
        # NOTE(review): population rankings change over time — confirm this answer.
        ("Which country has the most population?", ["China", "India", "USA", "Indonesia"], "China"),
        ("Capital of Japan?", ["Tokyo", "Kyoto", "Osaka", "Hiroshima"], "Tokyo"),
        ("Longest river in the world?", ["Nile", "Amazon", "Yangtze", "Mississippi"], "Nile"),
        ("Which desert is the largest?", ["Sahara", "Gobi", "Kalahari", "Arctic"], "Sahara"),
        ("Capital of Australia?", ["Sydney", "Melbourne", "Canberra", "Brisbane"], "Canberra"),
    ],
    "Math": [
        ("5 * 12?", ["50", "60", "55", "70"], "60"),
        ("sqrt(64)?", ["6", "7", "8", "9"], "8"),
        ("What is 15 + 25?", ["35", "40", "45", "50"], "40"),
        ("100 ÷ 4?", ["20", "25", "30", "24"], "25"),
        ("If x+5=12, x=?", ["5", "6", "7", "8"], "7"),
        ("Area of a circle with radius 7?", ["154", "144", "160", "150"], "154"),
        ("7^2 = ?", ["49", "42", "56", "36"], "49"),
        ("10% of 200?", ["10", "20", "15", "25"], "20"),
        ("Solve: 3x = 15, x = ?", ["4", "5", "6", "7"], "5"),
        ("What is 9 * 8?", ["72", "81", "64", "69"], "72"),
    ],
    "Science": [
        ("H2O is?", ["Water", "CO2", "O2", "H2"], "Water"),
        ("Who developed relativity?", ["Newton", "Einstein", "Tesla", "Curie"], "Einstein"),
        ("Sun is a?", ["Star", "Planet", "Moon", "Asteroid"], "Star"),
        # The third distractor used to be "3x10^5 km/s", which equals the
        # correct answer (3x10^8 m/s); it is now a genuinely wrong value.
        ("Light speed is approximately?", ["3x10^8 m/s", "3x10^6 m/s", "3x10^4 km/s", "3x10^7 km/s"], "3x10^8 m/s"),
        ("Which gas do plants absorb?", ["Oxygen", "CO2", "Nitrogen", "Helium"], "CO2"),
        ("The human brain weighs about?", ["1kg", "1.4kg", "2kg", "2.5kg"], "1.4kg"),
        ("Chemical symbol for Gold?", ["Au", "Ag", "Go", "Gd"], "Au"),
        ("Which planet is called Red Planet?", ["Mars", "Venus", "Jupiter", "Mercury"], "Mars"),
        ("Which part of the plant conducts photosynthesis?", ["Root", "Stem", "Leaf", "Flower"], "Leaf"),
        ("What is the boiling point of water?", ["90°C", "100°C", "120°C", "80°C"], "100°C"),
    ],
    "IPL": [
        ("2020 IPL winner?", ["Mumbai Indians", "Delhi Capitals", "RCB", "CSK"], "Mumbai Indians"),
        ("Which team is called Yellow Army?", ["CSK", "MI", "DC", "SRH"], "CSK"),
        ("Who won the Orange Cap in 2021 IPL?", ["KL Rahul", "Faf du Plessis", "Ruturaj Gaikwad", "Shikhar Dhawan"], "Ruturaj Gaikwad"),
        ("Which team has won the most IPL titles?", ["MI", "CSK", "RCB", "KKR"], "MI"),
        # NOTE(review): confirm the expected answer for this nickname.
        ("Who is known as Mr. IPL?", ["MS Dhoni", "Rohit Sharma", "Virat Kohli", "AB de Villiers"], "MS Dhoni"),
        ("First IPL season was in?", ["2007", "2008", "2009", "2010"], "2008"),
        ("Which team represents Bangalore?", ["RCB", "MI", "KKR", "SRH"], "RCB"),
        ("Purple Cap is for?", ["Highest scorer", "Best bowler", "Best fielder", "Best captain"], "Best bowler"),
        ("Which team plays at Eden Gardens?", ["KKR", "CSK", "MI", "RCB"], "KKR"),
        # NOTE(review): records change — confirm the expected answer.
        ("Who scored fastest 50 in IPL?", ["KL Rahul", "Chris Gayle", "Andre Russell", "AB de Villiers"], "Chris Gayle"),
    ],
    "History": [
        ("Who was the first President of the USA?", ["George Washington", "Abraham Lincoln", "Thomas Jefferson", "John Adams"], "George Washington"),
        ("In which year did India gain independence?", ["1945", "1947", "1950", "1952"], "1947"),
        ("The Great Wall is in which country?", ["China", "Japan", "Korea", "Mongolia"], "China"),
        ("Who discovered America?", ["Columbus", "Magellan", "Vasco da Gama", "Cook"], "Columbus"),
        ("French Revolution started in?", ["1789", "1776", "1800", "1799"], "1789"),
        ("First man on the moon?", ["Neil Armstrong", "Buzz Aldrin", "Yuri Gagarin", "John Glenn"], "Neil Armstrong"),
        ("Who invented the printing press?", ["Gutenberg", "Edison", "Tesla", "Newton"], "Gutenberg"),
        ("World War II ended in?", ["1943", "1945", "1947", "1950"], "1945"),
        ("Who was known as Iron Man of India?", ["Sardar Patel", "Nehru", "Gandhi", "Tilak"], "Sardar Patel"),
        ("Which empire built the Colosseum?", ["Roman Empire", "Greek Empire", "Egyptian Empire", "Persian Empire"], "Roman Empire"),
    ],
    "Technology": [
        ("Who is the founder of Microsoft?", ["Steve Jobs", "Bill Gates", "Elon Musk", "Mark Zuckerberg"], "Bill Gates"),
        ("HTML stands for?", ["Hyper Text Markup Language", "High Text Markup Language", "Hyperlinks Text Mark Language", "None"], "Hyper Text Markup Language"),
        ("Python is a type of?", ["Snake", "Programming Language", "Car", "Game"], "Programming Language"),
        ("CPU stands for?", ["Central Process Unit", "Central Processing Unit", "Control Processing Unit", "Computer Processing Unit"], "Central Processing Unit"),
        ("Java is a?", ["Programming Language", "Coffee", "Operating System", "Browser"], "Programming Language"),
        ("WWW stands for?", ["World Wide Web", "Wide World Web", "Web World Wide", "None"], "World Wide Web"),
        ("What is 1 Gigabyte in MB?", ["512MB", "1024MB", "2048MB", "1000MB"], "1024MB"),
        ("Google was founded in?", ["1996", "1998", "2000", "2002"], "1998"),
        ("First computer virus was?", ["Creeper", "ILOVEYOU", "Michelangelo", "Morris"], "Creeper"),
        ("Linux OS was developed by?", ["Linus Torvalds", "Bill Gates", "Steve Jobs", "Tim Berners-Lee"], "Linus Torvalds"),
    ],
    "Sports": [
        ("How many players in a football team?", ["9", "10", "11", "12"], "11"),
        ("Olympics are held every?", ["2 years", "4 years", "3 years", "5 years"], "4 years"),
        ("Tennis player known as ‘Federer’?", ["Roger Federer", "Rafael Nadal", "Novak Djokovic", "Andy Murray"], "Roger Federer"),
        ("Cricket World Cup held every?", ["2", "4", "5", "3"], "4"),
        ("Which country won the first football World Cup?", ["Brazil", "Uruguay", "Germany", "Italy"], "Uruguay"),
        ("Formula 1 world champion 2020?", ["Hamilton", "Verstappen", "Vettel", "Leclerc"], "Hamilton"),
        ("Number of players in a basketball team?", ["5", "6", "7", "8"], "5"),
        ("Which country hosts Wimbledon?", ["USA", "UK", "France", "Australia"], "UK"),
        ("Who holds most Olympic golds?", ["Michael Phelps", "Usain Bolt", "Carl Lewis", "Mark Spitz"], "Michael Phelps"),
        ("Which sport uses the term 'love'?", ["Tennis", "Badminton", "Squash", "Golf"], "Tennis"),
    ],
}
# ----------------- AI question generation -----------------
client = Groq(api_key=st.secrets["GROQ_API_KEY"].strip())
MODEL_NAME = "openai/gpt-oss-20b"


def _strip_code_fence(raw):
    """Return *raw* without a surrounding Markdown code fence, if present."""
    text = (raw or "").strip()
    if not text.startswith("```"):
        return text
    parts = text.split("```")
    inner = parts[1].strip() if len(parts) > 1 else ""
    # Drop only a leading language tag; never touch "json" inside the payload.
    if inner[:4].lower() == "json":
        inner = inner[4:]
    return inner.strip()


def _normalize_ai_question(item, topic):
    """Return a clean question dict built from *item*, or None if unusable."""
    if not isinstance(item, dict):
        return None
    question = str(item.get("question", "")).strip()
    options = item.get("options")
    answer = item.get("answer", "")
    if not question or not isinstance(options, list) or not options:
        return None
    options = [str(opt) for opt in options]
    answer = str(answer)
    if answer not in options:
        # A question whose answer is not selectable can never be scored.
        return None
    return {
        "topic": item.get("topic", topic),
        "question": question,
        "options": options,
        "answer": answer,
    }


def _fallback_questions(db, topic, limit):
    """Look *topic* up in the static table *db* and return question dicts."""
    entries = db.get(topic)
    if entries is None:
        wanted = str(topic).casefold()
        entries = next(
            (val for key, val in db.items() if str(key).casefold() == wanted),
            None,
        )
    if entries is None:
        entries = db.get("default", [])

    result = []
    for entry in entries[:limit]:
        if isinstance(entry, dict):
            result.append(entry)
        else:
            # Static entries are (question, options, answer) tuples.
            question, options, answer = entry
            result.append({
                "topic": topic,
                "question": question,
                "options": list(options),
                "answer": answer,
            })
    return result


def _export_questions_to_excel(questions, gid):
    """Best-effort dump of *questions* to ai_questions_excel/<gid>_questions.xlsx."""
    rows = []
    for q in questions:
        opts = (list(q.get("options", [])) + [""] * 4)[:4]  # pad short lists
        rows.append({
            "topic": q.get("topic", ""),
            "question": q.get("question", ""),
            "option_1": opts[0],
            "option_2": opts[1],
            "option_3": opts[2],
            "option_4": opts[3],
            "answer": q.get("answer", ""),
        })
    try:
        os.makedirs("ai_questions_excel", exist_ok=True)
        pd.DataFrame(rows).to_excel(f"ai_questions_excel/{gid}_questions.xlsx", index=False)
    except Exception as exc:
        # The export is a convenience only; it must not discard good questions.
        st.warning(f"Could not save AI questions to Excel: {exc}")


def generate_ai_questions(topic, num_questions=5, gid=None, questions_db=None):
    """Generate multiple-choice questions for *topic* with the Groq API.

    Args:
        topic: Topic name sent to the model and used for the fallback lookup.
        num_questions: Maximum number of questions to return.
        gid: Optional game id; when given, the generated questions are also
            written to an Excel file named after it.
        questions_db: Optional static table used when generation fails.
            When omitted, the module-level ``questions_db`` is used.

    Returns:
        A list of dicts with the keys "topic", "question", "options" and
        "answer". The list is empty when neither the model nor the fallback
        table yields anything.
    """
    if questions_db is None:
        # The parameter shadows the module-level table, so fetch it by name.
        questions_db = globals().get("questions_db") or {}

    prompt = f"""
Generate {num_questions} high-quality MCQ questions for the topic "{topic}".

STRICT FORMAT:
[
  {{
    "topic": "{topic}",
    "question": "Question text",
    "options": ["Option 1", "Option 2", "Option 3", "Option 4"],
    "answer": "Correct option text"
  }}
]

RULES:
- ONLY valid JSON array.
- No extra text.
- No markdown.
- No explanations.
"""

    try:
        st.info(f"Generating AI questions for topic '{topic}'...")
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.2,
            max_tokens=800,
        )
        raw = response.choices[0].message.content or ""
        data = json.loads(_strip_code_fence(raw))
        if not isinstance(data, list):
            raise ValueError("AI response is not a JSON array")
        questions = [
            q for q in (_normalize_ai_question(item, topic) for item in data)
            if q is not None
        ]
        if not questions:
            raise ValueError("AI response contained no usable questions")
    except Exception as e:
        st.warning(f"AI generation failed for topic '{topic}': {e}")
        st.info("Using fallback questions from questions_db if available.")
        return _fallback_questions(questions_db, topic, num_questions)

    if gid:
        _export_questions_to_excel(questions, gid)
    return questions[:num_questions]
# ----------------- JSON helpers -----------------
def load_json(path, default):
    """Return the parsed JSON stored at *path*, or *default* if unavailable.

    *default* is returned when the file does not exist, cannot be read, or
    does not contain valid JSON.
    """
    if not os.path.exists(path):
        return default
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return default


def save_json(path, data):
    """Serialise *data* as pretty-printed UTF-8 JSON at *path*.

    The payload is written to a temporary sibling file first and then moved
    into place, so an interrupted write cannot leave a truncated file behind.
    """
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    os.replace(tmp_path, path)


# Ensure base files exist (local)
base_defaults = {
    GAMES_FILE: {},
    PLAYERS_FILE: {},
    MESSAGES_FILE: {},
    SESSIONS_FILE: {},
    FRIENDS_FILE: {},
    INBOX_FILE: {},
}
for p, d in base_defaults.items():
    if not os.path.exists(p):
        save_json(p, d)

if not os.path.exists(LEADERBOARD_FILE):
    pd.DataFrame(
        columns=[
            'name', 'score', 'game_id', 'topics', 'timestamp',
            'avatar', 'questions', 'answers', 'correct_flags',
        ]
    ).to_csv(LEADERBOARD_FILE, index=False)


# ----------------- Firebase init -----------------
def init_firebase_if_needed():
    """Initialise the Firebase Admin app once, if it can be initialised.

    Returns:
        A ``(ok, message)`` tuple. ``ok`` is False when firebase-admin is not
        installed, the database URL is empty, the credentials file is
        missing, or initialisation raises.
    """
    if not FIREBASE_AVAILABLE:
        return False, "firebase-admin not installed"
    if not FIREBASE_DB_URL:
        return False, "FIREBASE_DB_URL not set"
    if not os.path.exists(FIREBASE_CREDENTIALS):
        return False, f"Service account file not found at {FIREBASE_CREDENTIALS}"
    try:
        if not firebase_admin._apps:  # no app registered yet
            cred = credentials.Certificate(FIREBASE_CREDENTIALS)
            firebase_admin.initialize_app(cred, {"databaseURL": FIREBASE_DB_URL})
        return True, "Firebase initialized"
    except Exception as e:
        return False, f"Firebase init error: {e}"


# Firebase helpers
def fb_get(path):
    """Return the value stored at database *path*, or None on any error."""
    try:
        return db.reference(path).get()
    except Exception:
        # Reads are best-effort: callers treat None as "no data".
        return None


def fb_set(path, value):
    """Overwrite the value stored at database *path* with *value*."""
    db.reference(path).set(value)


def fb_push(path, value):
    """Append *value* as a new child of *path* and return the generated key."""
    ref = db.reference(path).push()
    ref.set(value)
    return ref.key


# ----------------- Unified DB API (Online or Offline) -----------------
def local_get(collection):
    """Return the locally stored content of *collection*.

    JSON-backed collections yield a dict, "leaderboard" yields a list of row
    dicts read from the CSV file, and unknown names yield an empty dict.
    """
    json_files = {
        "games": GAMES_FILE,
        "players": PLAYERS_FILE,
        "messages": MESSAGES_FILE,
        "sessions": SESSIONS_FILE,
        "friends": FRIENDS_FILE,
        "inbox": INBOX_FILE,
    }
    if collection in json_files:
        return load_json(json_files[collection], {})
    if collection == "leaderboard":
        try:
            return pd.read_csv(LEADERBOARD_FILE).to_dict(orient="records")
        except (OSError, ValueError):
            # pandas parser / empty-data errors derive from ValueError.
            return []
    return {}
def local_set(collection, value):
    """Persist *value* as the local content of *collection*.

    JSON-backed collections are written with ``save_json``; "leaderboard" is
    written to the CSV file. Unknown collection names are ignored.
    """
    json_files = {
        "games": GAMES_FILE,
        "players": PLAYERS_FILE,
        "messages": MESSAGES_FILE,
        "sessions": SESSIONS_FILE,
        "friends": FRIENDS_FILE,
        "inbox": INBOX_FILE,
    }
    if collection in json_files:
        save_json(json_files[collection], value)
    elif collection == "leaderboard":
        try:
            pd.DataFrame(value).to_csv(LEADERBOARD_FILE, index=False)
        except Exception as exc:
            # Still best-effort, but no longer silent.
            st.warning(f"Could not save leaderboard locally: {exc}")


def unified_get(collection):
    """Read *collection* from Firebase (Online) or local files (Offline).

    Falls back to the local store when Online mode is selected but Firebase
    cannot be initialised. Unknown collections yield an empty dict in both
    modes.
    """
    mode = st.session_state.get("mode_selection", "Offline")
    if mode != "Online":
        return local_get(collection)

    ok, _ = init_firebase_if_needed()
    if not ok:
        return local_get(collection)

    if collection == "leaderboard":
        raw = fb_get("/leaderboard") or {}
        # Pushed rows come back keyed by their generated id.
        return list(raw.values()) if isinstance(raw, dict) else raw

    # map collection to firebase path
    firebase_paths = {
        "games": "/games",
        "players": "/players",
        "messages": "/messages",
        "sessions": "/active_sessions",
        "friends": "/friends",
        "inbox": "/inbox",
    }
    path = firebase_paths.get(collection)
    if path is None:
        return {}
    return fb_get(path) or {}


def unified_set(collection, data):
    """Write *data* to *collection* in Firebase (Online) or local files."""
    mode = st.session_state.get("mode_selection", "Offline")
    if mode != "Online":
        local_set(collection, data)
        return

    ok, _ = init_firebase_if_needed()
    if not ok:
        local_set(collection, data)
        return

    if collection == "leaderboard":
        # The online leaderboard is append-only (see fb_push callers), so a
        # bulk write only refreshes the local copy.
        local_set(collection, data)
        return

    firebase_paths = {
        "games": "/games",
        "players": "/players",
        "messages": "/messages",
        "sessions": "/active_sessions",
        "friends": "/friends",
        "inbox": "/inbox",
    }
    path = firebase_paths.get(collection)
    if path is not None:
        fb_set(path, data)
def unified_push_message(game_id, msg_obj):
    """Append *msg_obj* to the chat of *game_id*.

    Online the message is pushed under ``/messages/<game_id>``. Otherwise it
    is appended to the stored list, which is capped at the latest 500 entries.
    """
    mode = st.session_state.get("mode_selection", "Offline")
    if mode == "Online":
        ok, _ = init_firebase_if_needed()
        if ok:
            fb_push(f"/messages/{game_id}", msg_obj)
            return

    all_msgs = unified_get("messages") or {}
    game_msgs = all_msgs.get(game_id, [])
    game_msgs.append(msg_obj)
    if len(game_msgs) > 500:
        game_msgs = game_msgs[-500:]
    all_msgs[game_id] = game_msgs
    unified_set("messages", all_msgs)


def _append_leaderboard_csv(row):
    """Append one leaderboard *row* (a dict) to the local CSV file."""
    try:
        df = pd.read_csv(LEADERBOARD_FILE)
    except Exception:
        # Missing or unreadable file: start a fresh table.
        df = pd.DataFrame(columns=list(row.keys()))
    df = pd.concat([df, pd.DataFrame([row])], ignore_index=True)
    df.to_csv(LEADERBOARD_FILE, index=False)


def unified_push_leaderboard(row):
    """Record a leaderboard *row*.

    Online the row is pushed to ``/leaderboard``; in every mode it is also
    appended to the local CSV, which doubles as a backup.
    """
    mode = st.session_state.get("mode_selection", "Offline")
    if mode == "Online":
        ok, _ = init_firebase_if_needed()
        if ok:
            fb_push("/leaderboard", row)
    _append_leaderboard_csv(row)


def get_weekly_leaderboard(limit=10):
    """Return the top *limit* leaderboard rows played since Monday 00:00 UTC.

    Rows without a parseable "timestamp" are skipped. The result is sorted by
    "score", highest first.
    """
    mode = st.session_state.get("mode_selection", "Offline")
    rows = None
    if mode == "Online":
        ok, _ = init_firebase_if_needed()
        if ok:
            raw = fb_get("/leaderboard") or []
            # Pushed rows arrive as {push_id: row}; iterate the rows, not ids.
            rows = list(raw.values()) if isinstance(raw, dict) else list(raw)
    if rows is None:
        try:
            rows = pd.read_csv(LEADERBOARD_FILE).to_dict("records")
        except Exception:
            rows = []
    if not rows:
        return []

    # Start of current week: Monday at midnight, not "now minus N days".
    now = datetime.utcnow()
    start_of_week = (now - timedelta(days=now.weekday())).replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    def score_of(row):
        try:
            value = float(row.get("score", 0) or 0)
        except (TypeError, ValueError):
            return 0.0
        return 0.0 if value != value else value  # NaN sorts as zero

    weekly = []
    for r in rows:
        if not isinstance(r, dict):
            continue
        ts = r.get("timestamp")
        if not ts:
            continue
        try:
            if datetime.fromisoformat(ts) >= start_of_week:
                weekly.append(r)
        except (TypeError, ValueError):
            # Not a string, not ISO formatted, or timezone-aware.
            continue

    weekly.sort(key=score_of, reverse=True)
    return weekly[:limit]
# ----------------- Session & Presence helpers -----------------
def now_iso():
    """Return the current UTC time as a naive ISO-8601 string."""
    return datetime.utcnow().isoformat()


def parse_iso(s):
    """Parse an ISO-8601 string into a datetime; return None if impossible."""
    try:
        return datetime.fromisoformat(s)
    except (TypeError, ValueError):
        return None


def ensure_session_ids():
    """Give this browser session a stable ``uid`` and ``session_id``."""
    for key in ("uid", "session_id"):
        if key not in st.session_state:
            st.session_state[key] = str(uuid.uuid4())


ensure_session_ids()


def claim_session_unified(game_id, username):
    """Claim *username* in *game_id* for the current browser session.

    Returns:
        ``(True, "")`` when the name was free, already owned by this session,
        or owned by a session whose heartbeat is older than
        ``HEARTBEAT_THRESHOLD_SECONDS``. ``(False, message)`` when another
        session holds the name and is still active.
    """
    sessions = unified_get("sessions") or {}
    game_sessions = sessions.get(game_id, {})
    rec = game_sessions.get(username)
    my_session = st.session_state['session_id']
    now = now_iso()

    owned_by_me = rec is not None and rec.get("session_id") == my_session
    if rec is not None and not owned_by_me:
        last = parse_iso(rec.get("last_heartbeat"))
        if last and (datetime.utcnow() - last) < timedelta(seconds=HEARTBEAT_THRESHOLD_SECONDS):
            return False, "You are active in another tab/device. Return to that tab or wait."

    if owned_by_me:
        # Refresh our own record, keeping any extra fields it carries.
        rec["last_heartbeat"] = now
        game_sessions[username] = rec
    else:
        # First claim, or takeover of a stale record.
        game_sessions[username] = {"session_id": my_session, "last_heartbeat": now}

    sessions[game_id] = game_sessions
    unified_set("sessions", sessions)
    return True, ""
def heartbeat_unified(game_id, username):
    """Stamp *username*'s session record in *game_id* with the current time."""
    sessions = unified_get("sessions") or {}
    game_sessions = sessions.get(game_id, {})
    rec = game_sessions.get(username, {})
    rec["session_id"] = st.session_state['session_id']
    rec["last_heartbeat"] = now_iso()
    game_sessions[username] = rec
    sessions[game_id] = game_sessions
    unified_set("sessions", sessions)


# ----------------- Friends & Inbox helpers -----------------
def get_friends_map():
    """Return the stored friends collection (empty dict if none)."""
    return unified_get("friends") or {}


def save_friends_map(m):
    """Persist the friends collection *m*."""
    unified_set("friends", m)


def get_inbox():
    """Return the stored inbox collection (empty dict if none)."""
    return unified_get("inbox") or {}


def save_inbox(i):
    """Persist the inbox collection *i*."""
    unified_set("inbox", i)


def send_friend_request(from_user, to_user):
    """Queue a friend request from *from_user* in *to_user*'s inbox.

    Requests to oneself and requests that are already pending are ignored so
    the inbox does not fill with duplicates.
    """
    if not to_user or from_user == to_user:
        return
    inbox = get_inbox()
    user_inbox = inbox.get(to_user, [])
    already_pending = any(
        e.get("type") == "friend_request" and e.get("from") == from_user
        for e in user_inbox
    )
    if already_pending:
        return
    user_inbox.append({"type": "friend_request", "from": from_user, "ts": now_iso()})
    inbox[to_user] = user_inbox
    save_inbox(inbox)


def accept_friend_request(current_user, from_user):
    """Make the two users friends and drop the matching inbox request(s)."""
    friends = get_friends_map()
    friends.setdefault(current_user, [])
    friends.setdefault(from_user, [])
    if from_user not in friends[current_user]:
        friends[current_user].append(from_user)
    if current_user not in friends[from_user]:
        friends[from_user].append(current_user)
    save_friends_map(friends)

    # remove request
    inbox = get_inbox()
    inbox[current_user] = [
        e for e in inbox.get(current_user, [])
        if not (e.get('type') == 'friend_request' and e.get('from') == from_user)
    ]
    save_inbox(inbox)


def send_game_invite(from_user, to_user, game_id):
    """Queue an invitation to *game_id* in *to_user*'s inbox."""
    inbox = get_inbox()
    user_inbox = inbox.get(to_user, [])
    user_inbox.append({"type": "invite", "from": from_user, "game_id": game_id, "ts": now_iso()})
    inbox[to_user] = user_inbox
    save_inbox(inbox)


# ----------------- Game helpers -----------------
def compute_winners(game_id):
    """Return up to three leaderboard rows for *game_id*, best first.

    Rows are ordered by score (descending) and, for ties, by timestamp
    (ascending). An empty list is returned when nothing matches.
    """
    rows = unified_get("leaderboard") or []
    df = pd.DataFrame(rows)
    if df.empty or "game_id" not in df.columns:
        return []
    df = df[df["game_id"] == game_id]
    if df.empty:
        return []
    # Sort only by the columns that are actually present.
    sort_cols = [c for c in ("score", "timestamp") if c in df.columns]
    if sort_cols:
        df = df.sort_values(by=sort_cols, ascending=[c != "score" for c in sort_cols])
    return df.head(3).to_dict(orient="records")
# ------ Total players --------------------
def get_total_online_players():
    """Return the number of player records summed over all games."""
    players = unified_get("players") or {}
    return sum(len(game_players or {}) for game_players in players.values())


# ----------------- Create Game Page -----------------
def create_game_page():
    """Render the "Create Game" form and create the game on submit."""
    st.header("Create Game")
    host = st.text_input(
        "Host name",
        value=st.session_state.get("username", "")
    )

    # Normal topics (static)
    topics = st.multiselect(
        "Topics",
        list(questions_db.keys())
    )

    # AI topic input
    ai_topic = st.text_input(
        "AI Topic (optional)",
        placeholder="Eg: IPL 2024, Space, Python"
    )

    num_questions = st.number_input("Number of Questions", min_value=1, max_value=20, value=5)
    auto_close = st.checkbox("Auto-close game after submission?", value=True)

    if not st.button("Create Game"):
        return

    host = host.strip()
    ai_topic = ai_topic.strip()
    if not host:
        st.warning("Please enter your name.")
        return
    if not topics and not ai_topic:
        st.warning("Please select at least one topic or enter an AI topic.")
        return

    # Pass the entered host explicitly; otherwise the game would be stored
    # under whatever name happened to be in the session.
    gid = create_game(
        host=host,
        topics=topics,
        num_questions=int(num_questions),
        auto_close=auto_close,
        ai_topic=ai_topic,
    )
    st.success(f"Game created! Game ID: {gid}")
    st.session_state['game_id'] = gid
    st.session_state['username'] = host

    if ai_topic:
        st.info(f"AI questions will be generated for: {ai_topic}")
def join_game(game_id, username, avatar):
    """Register *username* as a player of *game_id*.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, reason)`` when the game
        does not exist, is closed, or the name is active in another session.
    """
    games = unified_get("games") or {}
    if game_id not in games:
        return False, "Invalid Game ID"
    if games[game_id].get("closed"):
        return False, "Game is closed"

    # Claim the name first so a rejected join leaves the player record alone.
    ok, msg = claim_session_unified(game_id, username)
    if not ok:
        return ok, msg

    players = unified_get("players") or {}
    game_players = players.get(game_id) or {}
    stamp = now_iso()
    if username in game_players:
        game_players[username]['avatar'] = avatar
        game_players[username]['last_joined'] = stamp
    else:
        game_players[username] = {"avatar": avatar, "joined_at": stamp, "submitted": False}
    game_players[username]['last_heartbeat'] = now_iso()
    players[game_id] = game_players
    unified_set("players", players)
    return ok, msg


def compute_score(questions, answers, times=None):
    """Compute the total score and per-question correctness flags.

    Args:
        questions (list): Question dicts; each is expected to carry an
            'answer' key.
        answers (list): The user's answers, positionally matching *questions*.
            Missing trailing answers count as wrong.
        times (list, optional): Seconds taken per question. Accepted for
            interface compatibility; it does not affect the score.

    Returns:
        tuple: ``(score, flags)`` where ``score`` is 15 points per correct
        answer and ``flags`` holds one bool per question.
    """
    score_per_question = 15
    score = 0
    flags = []
    for idx, q in enumerate(questions):
        correct_ans = q.get("answer")
        user_ans = answers[idx] if idx < len(answers) else None
        # A question with no stored answer can never be "correct"; without
        # this guard an unanswered slot (None or "") would match it.
        is_correct = correct_ans not in (None, "") and user_ans == correct_ans
        if is_correct:
            score += score_per_question
        flags.append(is_correct)
    return score, flags
# ----------------- UI -----------------
st.sidebar.title("Mode & Profile")
mode_choice = st.sidebar.selectbox("Mode", ["Offline (local JSON)", "Online (Firebase)"], index=0)
st.session_state['mode_selection'] = "Online" if mode_choice.startswith("Online") else "Offline"

# If user chose Online, attempt to init and show feedback
if st.session_state['mode_selection'] == "Online":
    ok, msg = init_firebase_if_needed()
    if not ok:
        st.sidebar.error(f"Online init failed: {msg}. Working Offline.")
        st.session_state['mode_selection'] = "Offline"
    else:
        st.sidebar.success("Online (Firebase) ready.")

# Sidebar inputs
st.sidebar.markdown("### You")
username = st.sidebar.text_input(
    "Your name",
    value=st.session_state.get("username", ""),
    key="sidebar_username",
)
avatar = st.sidebar.selectbox(
    "Avatar",
    ["🎮", "🐱", "🐶", "🦄", "👽", "🎩"],
    index=0,
    key="sidebar_avatar",
)

# Save to session_state (an empty sidebar value keeps the previous one)
st.session_state['username'] = username or st.session_state.get("username", "")
st.session_state['avatar'] = avatar or st.session_state.get("avatar", "🎮")

if st.sidebar.button("Refresh"):
    st.rerun()

page = st.sidebar.selectbox(
    "Page",
    ["Home", "Create Game", "Join Game", "Play", "Friends", "Inbox", "Leaderboard"],
    index=0,
)
st.title("AI Quiz Game — Online/Offline (Friends & Chat)")


def render_copy_button(val, key):
    """Render a small button that copies *val* to the clipboard.

    Args:
        val: Value to copy; it is converted to a string.
        key: Used to derive a unique element id for the button.

    NOTE(review): the markup below is a reconstruction — the template in this
    function was empty. Clipboard access from the component iframe depends on
    browser permissions; confirm in the target environment.
    """
    # json.dumps yields a valid JS string literal; the extra replacements keep
    # it inert inside an HTML attribute.
    js_value = json.dumps(str(val))
    for char, escaped in (("&", "\\u0026"), ("<", "\\u003c"), (">", "\\u003e"), ("'", "\\u0027")):
        js_value = js_value.replace(char, escaped)
    element_id = "copy_" + "".join(ch if ch.isalnum() else "_" for ch in str(key))
    copy_html = f'''
<button id="{element_id}"
        onclick='navigator.clipboard.writeText({js_value}); this.innerText="Copied!";'>
  Copy
</button>
'''
    html(copy_html, height=45)
def get_top3_and_player_count(game_id):
    """Return ``(top3_rows, distinct_player_count)`` for *game_id*.

    Rows come from the leaderboard, ordered by score (descending) and then
    timestamp (ascending). ``([], 0)`` is returned when nothing matches.
    """
    rows = unified_get("leaderboard") or []
    if not rows:
        return [], 0
    df = pd.DataFrame(rows)
    if df.empty or "game_id" not in df.columns:
        return [], 0
    df = df[df["game_id"] == game_id]
    if df.empty:
        return [], 0

    sort_cols = [c for c in ("score", "timestamp") if c in df.columns]
    if sort_cols:
        # Earlier timestamp wins ties, which keeps the winner stable.
        df = df.sort_values(by=sort_cols, ascending=[c != "score" for c in sort_cols])

    top3 = df.head(3).to_dict(orient="records")
    total_players = df["name"].nunique() if "name" in df.columns else 0
    return top3, total_players


# Home page
# NOTE(review): this reassigns the threshold configured near the top of the
# file; the value below is the one in effect once the module has loaded.
HEARTBEAT_THRESHOLD_SECONDS = 60  # adjust if needed


def home_page():
    """Render the home page: recent games, invite actions, weekly leaderboard."""
    st.header("Home")
    st.write("Create games, invite friends, play and climb the leaderboard.")

    # ---------------- TOTAL ONLINE PLAYERS ----------------
    # NOTE(review): this counts every stored player record, not only players
    # with a recent heartbeat.
    players_map = unified_get("players") or {}
    total_online = sum(len(v or {}) for v in players_map.values())
    st.metric("🟢 Players Online", total_online)

    # Show last score
    if st.session_state.get('last_score') is not None:
        st.success(
            f"Your last score: {st.session_state['last_score']} "
            f"(Game {st.session_state.get('last_game')})"
        )

    # Load games
    games = unified_get("games") or {}
    st.subheader("Recent games")
    recent_games = sorted(
        games.values(), key=lambda x: x.get("created_at", ""), reverse=True
    )[:10]

    for g in recent_games:
        gid = g.get("game_id")
        players_here = players_map.get(gid, {}) or {}

        st.markdown(f"### 🎮 Game: **{gid}** {'(Closed)' if g.get('closed') else ''}")
        st.write(f"Host: {g.get('host')} — Topics: {', '.join(g.get('topics') or [])}")
        st.write(f"Created: {g.get('created_at')}")

        # ---------------- GAME STATS ----------------
        joined = len(players_here)
        submitted_players = [
            (u, d) for u, d in players_here.items() if d.get("submitted")
        ]
        submitted = len(submitted_players)
        st.write(f"Players joined: **{joined}**")
        st.write(f"Submitted: **{submitted} / {joined}**")

        # ---------------- TOP 3 PLAYERS ----------------
        submitted_players.sort(key=lambda x: x[1].get("score", 0), reverse=True)
        if submitted_players:
            st.markdown("🏆 **Top 3 Players**")
            for i, (uname_p, info) in enumerate(submitted_players[:3], start=1):
                st.write(
                    f"{i}. {info.get('avatar','🎮')} **{uname_p}** — "
                    f"{info.get('score',0)} pts ({info.get('percentage',0)}%)"
                )

        # ---------------- PLAYER STATUS ----------------
        if players_here:
            st.markdown("**👥 Player Status**")
            for uname_p, info in players_here.items():
                status = "✅ Submitted" if info.get("submitted") else "⏳ Playing"
                st.write(f"{info.get('avatar','🎮')} **{uname_p}** — {status}")

        st.markdown("---")

        # ---------------- INVITE & CHALLENGE ----------------
        if g.get("closed"):
            continue
        st.info(f"Share this Game ID: {gid}")
        render_copy_button(gid, gid)

        current_user = st.session_state.get("username")
        if not current_user:
            continue

        if st.button(f"Invite your friends to {gid}", key=f"invite_{gid}"):
            friends = get_friends_map().get(current_user, [])
            if not friends:
                st.warning("No friends to invite.")
            else:
                for f in friends:
                    send_game_invite(current_user, f, gid)
                st.success("Invites sent to friends.")

        if st.button(f"Challenge friends with a new game like {gid}", key=f"challenge_{gid}"):
            new_id = create_game(
                current_user,
                g.get("topics", []),
                num_questions=len(g.get("questions", [])) or 5,
            )
            st.session_state["active_game_id"] = new_id
            # Re-read the store: the `games` snapshot above predates new_id,
            # so looking it up there would wipe the freshly built questions.
            fresh_games = unified_get("games") or {}
            st.session_state["game_questions"] = fresh_games.get(new_id, {}).get("questions", [])
            st.success(f"Challenge created: {new_id}")
            st.rerun()

    # Weekly leaderboard
    st.subheader("🏆 Weekly Leaderboard (Top 10)")
    weekly = get_weekly_leaderboard()
    if not weekly:
        st.info("No scores yet this week.")
    else:
        for i, r in enumerate(weekly, 1):
            st.write(
                f"{i}. {r.get('avatar','🎮')} **{r.get('name')}** "
                f"(Game {r.get('game_id')}) — {r.get('score')} pts"
            )


# ----------------------------
# Play Page bootstrap: remember the first stored game as 'active_game'.
if 'active_game' not in st.session_state:
    games = unified_get("games") or {}
    if games:
        first_game = next(iter(games))
        st.session_state['active_game'] = first_game
    else:
        st.warning("No games exist yet. Please create a game first.")
if 'username' not in st.session_state:
    st.session_state['username'] = "Guest"  # temporary default username


def create_game(host=None, topics=None, num_questions=5, auto_close=True, ai_topic=None):
    """Create a game, store it, and prime the session for playing it.

    Args:
        host: Host name; defaults to the session's username, then "Host".
        topics: Static topic names to draw questions from (used when no AI
            questions are produced).
        num_questions: Questions requested from the AI generator, and the
            number taken from each static topic.
        auto_close: Stored on the game; read when a player submits.
        ai_topic: Optional free-form topic for AI-generated questions.

    Returns:
        The id of the newly created game.
    """
    topics = list(topics) if topics else []  # never share a mutable default
    games = unified_get("games") or {}

    # The timestamp id has one-second resolution; add a suffix on collision
    # so an existing game is never overwritten.
    base_gid = f"GAME{int(time.time())}"
    gid = base_gid
    suffix = 1
    while gid in games:
        suffix += 1
        gid = f"{base_gid}_{suffix}"

    host = host or st.session_state.get("username", "Host")
    questions = []

    # 1) AI questions (already dict format)
    if ai_topic:
        ai_questions = generate_ai_questions(ai_topic, num_questions=num_questions)
        for q in ai_questions or []:
            questions.append({
                "question": q.get("question", ""),
                "options": q.get("options", []),
                "answer": q.get("answer", ""),
            })

    # 2) Static fallback (tuple -> dict conversion)
    if not questions:
        for topic in topics:
            for q in questions_db.get(topic, [])[:num_questions]:
                questions.append({
                    "question": q[0],
                    "options": q[1],
                    "answer": q[2],
                })

    # Store game
    games[gid] = {
        "game_id": gid,
        "host": host,
        "topics": topics,
        "questions": questions,
        "created_at": now_iso(),
        "closed": False,
        "auto_close": auto_close,
    }
    unified_set("games", games)

    # Prepare play state
    st.session_state['game_id'] = gid
    st.session_state['active_game_id'] = gid
    st.session_state['game_questions'] = questions
    st.session_state['current_index'] = 0
    st.session_state['answers'] = [""] * len(questions)
    st.session_state['answer_times'] = [None] * len(questions)
    st.session_state['question_started_at'] = None

    st.success(f"Game created: {gid} with {len(questions)} questions.")
    return gid


# -------------------------
# PLAY PAGE
# -------------------------
def play_page():
    """Render the current question of the active game and handle answers.

    Progress is kept in ``st.session_state``. On the last question the player
    submits; the score is stored on the player record and pushed to the
    leaderboard.
    """
    gid = st.session_state.get("active_game_id")
    uname = st.session_state.get("username")
    if not gid or not uname:
        st.error("No active game or username found. Please join or create a game first.")
        return

    # Get game and questions
    games = unified_get("games") or {}
    game = games.get(gid)
    if not game:
        st.error("Game not found.")
        return
    if game.get('closed'):
        st.warning("This game is closed.")
        return

    questions = game.get("questions", [])
    if not questions:
        st.info("No questions loaded for this game.")
        return
    total = len(questions)

    # (Re)build per-question state when missing or sized for another game;
    # after a submit these lists are emptied, so a presence check is not enough.
    if len(st.session_state.get('answers') or []) != total:
        st.session_state['answers'] = [""] * total
    if len(st.session_state.get('answer_times') or []) != total:
        st.session_state['answer_times'] = [0] * total
    if 'current_index' not in st.session_state:
        st.session_state['current_index'] = 0
    # The start time may be stored as None; set it once so the countdown
    # does not restart on every rerun.
    if not st.session_state.get('question_started_at'):
        st.session_state['question_started_at'] = time.time()

    # Clamp a stale index so the submit button always stays reachable.
    idx = min(max(int(st.session_state['current_index']), 0), total - 1)
    is_last = idx == total - 1

    # ---------------- QUESTION UI ----------------
    q = questions[idx]
    st.subheader(f"Question {idx + 1}/{total}")
    st.write(q["question"])

    start_time = st.session_state['question_started_at']
    elapsed = int(time.time() - start_time)
    time_limit = 15  # displayed only; answers are still accepted afterwards
    st.markdown(f"**Time left:** {max(0, time_limit - elapsed)} seconds")

    # Keyed per game and question so selections do not leak between questions.
    choice = st.radio(
        "Choose an answer:",
        q["options"],
        key=f"choice_{gid}_{idx}"
    )

    col1, col2 = st.columns(2)

    # ---------------- NEXT ----------------
    with col1:
        # No "Next" on the last question: advancing past it would leave the
        # player on a page without a submit button.
        if not is_last and st.button("Next", key=f"next_{gid}_{idx}"):
            st.session_state['answers'][idx] = choice
            st.session_state['answer_times'][idx] = time.time() - start_time
            st.session_state['current_index'] = idx + 1
            st.session_state['question_started_at'] = time.time()
            st.rerun()

    # ---------------- SUBMIT ----------------
    with col2:
        if is_last and st.button("Submit All Answers", key=f"submit_{gid}_{idx}"):
            # Record last question
            st.session_state['answers'][idx] = choice
            st.session_state['answer_times'][idx] = time.time() - start_time

            answers = st.session_state['answers']
            times = st.session_state['answer_times']

            # Compute score (15 points per question, matching compute_score)
            score, flags = compute_score(questions, answers, times)
            percentage = int(score / (total * 15) * 100)

            # ---------------- Update players dict ----------------
            players = unified_get("players") or {}
            players.setdefault(gid, {})
            # Merge into the existing record so join metadata is preserved.
            record = dict(players[gid].get(uname) or {})
            record.update({
                "submitted": True,
                "score": score,
                "percentage": percentage,
                "answers": answers,
                "avatar": st.session_state.get("avatar", "🎮"),
                "timestamp": now_iso(),
            })
            players[gid][uname] = record
            unified_set("players", players)

            # ---------------- Push leaderboard ----------------
            row = {
                "name": uname,
                "avatar": st.session_state.get("avatar", "🎮"),
                "score": score,
                "percentage": percentage,
                "game_id": gid,
                "topics": game.get("topics", []),
                "timestamp": now_iso(),
                "questions": total,
                "answers": answers,
                "correct_flags": flags,
            }
            unified_push_leaderboard(row)

            # Auto-close game if needed
            # NOTE(review): this closes the game after the first submission,
            # even if other players are still answering — confirm intent.
            if game.get('auto_close', True):
                games[gid]['closed'] = True
                games[gid]['closed_at'] = now_iso()
                unified_set("games", games)

            # Reset session state
            st.success(f"Submitted! Score: {score} / {total*15} ({percentage}%)")
            st.balloons()
            st.session_state['last_score'] = score
            st.session_state['last_game'] = gid
            st.session_state['current_index'] = 0
            st.session_state['answers'] = []
            st.session_state['answer_times'] = []
            st.session_state['question_started_at'] = None
            st.rerun()
Score: {score} / {len(questions)*15} ({percentage}%)") st.balloons() st.session_state['last_score'] = score st.session_state['last_game'] = gid st.session_state['current_index'] = 0 st.session_state['answers'] = [] st.session_state['answer_times'] = [] st.rerun() # Join game def join_game_page(): st.header("Join Game") game_id = st.text_input("Enter Game ID") username = st.text_input("Your Name") avatar = st.selectbox("Choose Avatar", ["🎮","🤖","🧩","🛡️"]) if st.button("Join Game"): if not game_id or not username: st.warning("Enter both Game ID and Username") return ok, msg = join_game(game_id, username, avatar) if ok: st.success(f"Joined game {game_id} successfully!") # Load game safely games = unified_get("games") or {} g = games.get(game_id, {}) # ✅ REQUIRED: set questions st.session_state['game_questions'] = g.get('questions', []) # ✅ REQUIRED: set BOTH ids st.session_state['game_id'] = game_id st.session_state['active_game_id'] = game_id # ✅ REQUIRED: reset play state (prevents stored answer bugs) st.session_state['current_index'] = 0 st.session_state['answers'] = [""] * len(st.session_state['game_questions']) st.session_state['answer_times'] = [None] * len(st.session_state['game_questions']) st.session_state['question_started_at'] = None st.session_state['username'] = username st.session_state['avatar'] = avatar st.rerun() else: st.error(msg) # Play page # ----------------- Create Game ----------------- # Friends page def friends_page(): st.header("Friends") user = st.session_state.get('username') if not user: st.info("Enter your name in the sidebar to use Friends.") return friends_map = get_friends_map() your_friends = friends_map.get(user, []) st.subheader("Your friends") if your_friends: for f in your_friends: sessions = unified_get("sessions") or {} status = "offline" for gid, users in (sessions or {}).items(): rec = users.get(f) if rec: last = parse_iso(rec.get('last_heartbeat')) if last and (datetime.utcnow() - last) < 
timedelta(seconds=HEARTBEAT_THRESHOLD_SECONDS): status = "online" break st.write(f"• {f} — **{status}**") if st.button(f"Invite {f} to a game", key=f"invitebtn_{f}"): gid = st.text_input(f"Enter game id to invite {f} (or leave blank to create)", key=f"inviteinput_{f}") if gid: send_game_invite(st.session_state['username'], f, gid) st.success(f"Invite sent to {f} for game {gid}") else: topics = list(questions_db.keys())[:1] new_id = create_game(st.session_state['username'], topics, num_questions=5) send_game_invite(st.session_state['username'], f, new_id) st.success(f"Invite sent to {f} for new game {new_id}") else: st.write("You have no friends yet.") st.markdown("---") st.subheader("Find / Add friends") all_users = set() players = unified_get("players") or {} for gid, users in (players or {}).items(): for u in (users or {}).keys(): all_users.add(u) all_users = all_users.union(set(get_friends_map().keys())) all_users.discard(user) candidate = st.text_input("Search user to add (exact name)", value="") if st.button("Send Friend Request"): if not candidate: st.error("Enter user name") else: send_friend_request(user, candidate) st.success("Friend request sent.") # Inbox page def inbox_page(): st.header("Inbox") user = st.session_state.get('username') if not user: st.info("Enter your name in the sidebar to view Inbox.") return inbox = get_inbox() items = inbox.get(user, []) if not items: st.write("No messages.") return for idx, item in enumerate(items[:50]): t = item.get('type') if t == "friend_request": fr = item.get('from') st.write(f"Friend request from **{fr}** at {item.get('ts')}") if st.button(f"Accept {idx}"): accept_friend_request(user, fr) st.success(f"You are now friends with {fr}") st.rerun() if st.button(f"Reject {idx}"): entries = [it for it in items if not (it.get('type')=='friend_request' and it.get('from')==fr)] inbox[user] = entries save_inbox(inbox) st.success("Rejected") st.rerun() elif t == "invite": fr = item.get('from'); gid = item.get('game_id') 
st.write(f"Invite from **{fr}** to join game **{gid}** at {item.get('ts')}") if st.button(f"Join Invite {idx}"): ok, msg = join_game(gid, user, st.session_state.get('avatar','🎮')) if ok: st.success(f"Joined game {gid}") items = [it for it in items if not (it.get('type')=='invite' and it.get('from')==fr and it.get('game_id')==gid)] inbox[user] = items save_inbox(inbox) st.session_state['game_id'] = gid st.session_state['username'] = user st.rerun() else: st.error(msg) # Leaderboard page # ----------------- Leaderboard Page ----------------- def leaderboard_page(): st.header("🏆 Leaderboard") # Get leaderboard data leaderboard = unified_get("leaderboard") or [] if not leaderboard: st.info("No scores yet.") return # Sort by score descending leaderboard = sorted(leaderboard, key=lambda x: x.get("score", 0), reverse=True) # Display top 10 st.subheader("Top Players") for i, row in enumerate(leaderboard[:10], 1): st.write( f"{i}. {row.get('avatar','🎮')} **{row.get('name','Guest')}** " f"(Game {row.get('game_id','')}) — {row.get('score',0)} pts — {row.get('percentage',0)}%" ) # Weekly leaderboard (optional) st.subheader("Weekly Leaderboard (Top 10)") week_leaderboard = [r for r in leaderboard if datetime.fromisoformat(r.get("timestamp", now_iso())).isocalendar()[1] == datetime.now().isocalendar()[1]] if not week_leaderboard: st.info("No scores yet this week.") else: week_leaderboard = sorted(week_leaderboard, key=lambda x: x.get("score",0), reverse=True) for i, r in enumerate(week_leaderboard[:10], 1): st.write( f"{i}. 
{r.get('avatar','🎮')} **{r.get('name','Guest')}** " f"(Game {r.get('game_id','')}) — {r.get('score',0)} pts" ) def get_weekly_leaderboard(limit=10): rows = unified_get("/leaderboard") or [] one_week_ago = datetime.utcnow() - timedelta(days=7) weekly = [] for r in rows: ts = r.get("timestamp") if not ts: continue try: played_time = datetime.fromisoformat(ts) except Exception: continue if played_time >= one_week_ago: weekly.append(r) # Sort by score DESC weekly.sort(key=lambda x: x.get("score", 0), reverse=True) return weekly[:limit] # Route pages if page == "Home": home_page() elif page == "Create Game": create_game_page() elif page == "Join Game": join_game_page() elif page == "Play": play_page() elif page == "Friends": friends_page() elif page == "Inbox": inbox_page() elif page == "Leaderboard": leaderboard_page() # Resume quick action if st.session_state.get('game_id') and st.session_state.get('username'): players = unified_get("players") or {} info = players.get(st.session_state['game_id'], {}).get(st.session_state['username'], {}) if players.get(st.session_state.get('game_id')) else {} if info and info.get('submitted'): st.info("You already submitted this game.") else: with st.expander("Resume Game"): if st.button("Go to Play"): st.rerun() st.markdown("---") st.write("Notes: Online mode requires firebase-admin and service account JSON named 'serviceAccountKey.json' placed next to app.py. Offline mode stores data in ./data/.")