import sqlite3
from typing import Optional, Dict, List
from contextlib import contextmanager
from pathlib import Path

from config.config import Config
from .auth_handler import AuthHandler


class DatabaseManager:
    """SQLite database manager for user storage.

    Every public method opens its own short-lived connection through
    :meth:`get_connection`, runs its statements, commits explicitly when it
    writes, and lets the context manager close the connection.
    """

    # Filesystem path of the database file, obtained by stripping the
    # "sqlite:///" scheme from Config.DATABASE_URL.
    DB_PATH = Path(Config.DATABASE_URL.replace("sqlite:///", ""))

    def __init__(self):
        """Set up the password hasher and make sure the DB directory exists."""
        self.auth_handler = AuthHandler()

        # Create database directory if it doesn't exist. parents=True is
        # required so that a nested path such as "data/db/users.db" works
        # when none of the intermediate directories exist yet.
        self.DB_PATH.parent.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def get_connection(self):
        """Context manager for database connections.

        Yields:
            An open ``sqlite3.Connection`` whose rows support access by
            column name. The connection is always closed on exit. It is
            NOT committed automatically; callers that write must call
            ``conn.commit()`` themselves.
        """
        conn = sqlite3.connect(self.DB_PATH)
        conn.row_factory = sqlite3.Row  # Enable column access by name
        try:
            yield conn
        finally:
            conn.close()

    @staticmethod
    def _row_to_user(row) -> Optional[Dict]:
        """Convert a full user row (including password hash) to a dict.

        Returns ``None`` when ``row`` is falsy, i.e. no matching user.
        """
        if not row:
            return None
        return {
            "id": row["id"],
            "username": row["username"],
            "password_hash": row["password_hash"],
            "role": row["role"],
            # Stored as an integer flag in SQLite; expose a real bool.
            "is_active": bool(row["is_active"]),
        }

    def initialize_database(self):
        """Create the users table and its username index if missing."""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL,
                    role TEXT NOT NULL,
                    is_active BOOLEAN NOT NULL DEFAULT 1,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Create index on username for faster lookups
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_username
                ON users(username)
            """)

            conn.commit()
            print("✅ Users table created/verified")

    def seed_users(self):
        """Create default users for testing.

        Iterates ``Config.DEFAULT_USERS`` (each entry is indexed by
        "username", "password" and "role") and inserts every user whose
        username is not already present. All inserts are committed together
        after the loop.
        """
        default_users = Config.DEFAULT_USERS

        with self.get_connection() as conn:
            cursor = conn.cursor()

            for user in default_users:
                # Check if user already exists
                cursor.execute(
                    "SELECT id FROM users WHERE username = ?",
                    (user["username"],),
                )

                if cursor.fetchone() is None:
                    # Hash password and insert user
                    password_hash = self.auth_handler.hash_password(
                        user["password"]
                    )
                    cursor.execute("""
                        INSERT INTO users (username, password_hash, role, is_active)
                        VALUES (?, ?, ?, 1)
                    """, (user["username"], password_hash, user["role"]))
                    print(f"✅ Created user: {user['username']} (role: {user['role']})")
                else:
                    print(f"ℹ️ User already exists: {user['username']}")

            conn.commit()

    def get_user_by_username(self, username: str) -> Optional[Dict]:
        """Retrieve user by username.

        Returns:
            A dict with keys "id", "username", "password_hash", "role" and
            "is_active", or ``None`` if no user has that username.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, username, password_hash, role, is_active
                FROM users
                WHERE username = ?
            """, (username,))
            return self._row_to_user(cursor.fetchone())

    def get_user_by_id(self, user_id: int) -> Optional[Dict]:
        """Retrieve user by ID.

        Returns:
            A dict with keys "id", "username", "password_hash", "role" and
            "is_active", or ``None`` if no user has that ID.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, username, password_hash, role, is_active
                FROM users
                WHERE id = ?
            """, (user_id,))
            return self._row_to_user(cursor.fetchone())

    def get_all_users(self) -> List[Dict]:
        """Get all users (excluding password hashes), newest first.

        Returns:
            A list of dicts with keys "id", "username", "role", "is_active"
            and "created_at" (the latter exactly as returned by SQLite).
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT id, username, role, is_active, created_at
                FROM users
                ORDER BY created_at DESC
            """)
            rows = cursor.fetchall()

            return [
                {
                    "id": row["id"],
                    "username": row["username"],
                    "role": row["role"],
                    "is_active": bool(row["is_active"]),
                    "created_at": row["created_at"],
                }
                for row in rows
            ]

    def update_user_status(self, username: str, is_active: bool) -> bool:
        """Enable or disable user account.

        Returns:
            ``True`` if a row was updated, ``False`` if no such username.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE users
                SET is_active = ?, updated_at = CURRENT_TIMESTAMP
                WHERE username = ?
            """, (is_active, username))
            conn.commit()
            return cursor.rowcount > 0

    def create_user(self, username: str, password: str, role: str) -> bool:
        """Create new user (active by default).

        Returns:
            ``True`` on success. ``False`` when the insert violates a table
            constraint: typically the username already exists, but a NULL
            username/role/hash would hit the NOT NULL constraints and is
            reported the same way.
        """
        try:
            password_hash = self.auth_handler.hash_password(password)

            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO users (username, password_hash, role, is_active)
                    VALUES (?, ?, ?, 1)
                """, (username, password_hash, role))
                conn.commit()
                return True

        except sqlite3.IntegrityError:
            # Username already exists (or another constraint was violated)
            return False

    def update_user_password(self, username: str, new_password: str) -> bool:
        """Update user password.

        The new password is hashed via ``AuthHandler.hash_password`` before
        it is stored.

        Returns:
            ``True`` if a row was updated, ``False`` if no such username.
        """
        password_hash = self.auth_handler.hash_password(new_password)

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE users
                SET password_hash = ?, updated_at = CURRENT_TIMESTAMP
                WHERE username = ?
            """, (password_hash, username))
            conn.commit()
            return cursor.rowcount > 0

    def update_user_role(self, username: str, new_role: str) -> bool:
        """Update user role.

        Returns:
            ``True`` if a row was updated, ``False`` if no such username.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE users
                SET role = ?, updated_at = CURRENT_TIMESTAMP
                WHERE username = ?
            """, (new_role, username))
            conn.commit()
            return cursor.rowcount > 0

    def delete_user(self, username: str) -> bool:
        """Delete user (use with caution).

        Returns:
            ``True`` if a row was deleted, ``False`` if no such username.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                DELETE FROM users WHERE username = ?
            """, (username,))
            conn.commit()
            return cursor.rowcount > 0