from fastapi import FastAPI, HTTPException, Query
import uvicorn
from pydantic import BaseModel
import requests
from bs4 import BeautifulSoup as bs
import mysql.connector
import os
import google.generativeai as genai
import json
from util.keywordExtract import *
from typing import Optional, List, Dict, Any, Union
import pandas as pd
import torch
from io import StringIO  # needed when passing an HTML string to pandas.read_html
import logging  # added for logging
import time  # added for delays between requests (optional but recommended)
from embedding_module import embed_keywords
from keyword_module import summarize_kobart as summarize, extract_keywords
from pykrx import stock
from functools import lru_cache
from fastapi.middleware.cors import CORSMiddleware
import traceback
from datetime import datetime, timedelta
from googletrans import Translator
from starlette.concurrency import run_in_threadpool
import FinanceDataReader as fdr
app = FastAPI()
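# CORSMiddleware is imported above but never registered in this file; a minimal
# wiring sketch (the allow-all settings below are assumptions, not the original
# configuration):
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)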
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = os.getenv("GEMINI_API_KEY")
if not API_KEY:
    # Raise an error or warn when the API key is missing
    print("❌ GEMINI_API_KEY 환경 변수가 설정되지 않았습니다.")
else:
    genai.configure(api_key=API_KEY)
    logger.info("✅ Gemini API 설정 완료 (환경 변수 사용)")
# ---------------------------------------
# Input/output models
# ---------------------------------------
class NewsRequest(BaseModel):
    url: str
    id: Optional[str] = None


class SummaryInput(BaseModel):
    url: str


class KeywordsInput(BaseModel):
    summary: str


class CompanyInput(BaseModel):
    summary: Optional[str] = None
    keywords: Optional[List[str]] = None


class SentimentInput(BaseModel):
    content: str


class PredictInput(BaseModel):
    keywords: List[Union[str, Dict[str, Any]]]
# ---------------------------------------
# Simple classifier (same as before)
# ---------------------------------------
class SimpleClassifier(torch.nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.net = torch.nn.Sequential(
            torch.nn.Linear(input_dim, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 1),
            torch.nn.Sigmoid()
        )

    def forward(self, x):
        return self.net(x)
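# Shape sketch (illustrative; the 768 dimension is an assumption): the model
# maps a (batch, input_dim) float tensor to a (batch, 1) probability via the
# final Sigmoid, e.g.
#   m = SimpleClassifier(768)
#   m(torch.zeros(1, 768)).shape  # -> torch.Size([1, 1]), values in (0, 1)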
# ---------------------------------------
# Shared utilities: HTML fetching, parsers, thumbnails
# ---------------------------------------
def fetch_html(url: str) -> bs:
    headers = {"User-Agent": "Mozilla/5.0"}
    resp = requests.get(url, headers=headers, timeout=7)
    resp.raise_for_status()
    return bs(resp.text, "html.parser")


def parse_naver(soup: bs):
    title = soup.select_one("h2.media_end_head_headline") or soup.title
    title_text = title.get_text(strip=True) if title else "제목 없음"
    time_tag = soup.select_one("span.media_end_head_info_datestamp_time")
    time_text = time_tag.get_text(strip=True) if time_tag else "시간 없음"
    content_area = soup.find("div", {"id": "newsct_article"}) or soup.find("div", {"id": "dic_area"})
    if content_area:
        paragraphs = content_area.find_all("p")
        content = '\n'.join([p.get_text(strip=True) for p in paragraphs]) if paragraphs else content_area.get_text(strip=True)
    else:
        content = "본문 없음"
    return title_text, time_text, content


def parse_daum(soup: bs):
    title = soup.select_one("h3.tit_view") or soup.title
    title_text = title.get_text(strip=True) if title else "제목 없음"
    time_tag = soup.select_one("span.num_date")
    time_text = time_tag.get_text(strip=True) if time_tag else "시간 없음"
    content_area = soup.find("div", {"class": "article_view"})
    if content_area:
        paragraphs = content_area.find_all("p")
        content = '\n'.join([p.get_text(strip=True) for p in paragraphs]) if paragraphs else content_area.get_text(strip=True)
    else:
        content = "본문 없음"
    return title_text, time_text, content


def extract_thumbnail(soup: bs) -> Optional[str]:
    tag = soup.find("meta", property="og:image")
    return tag["content"] if tag and "content" in tag.attrs else None


def parse_article_all(url: str) -> Dict[str, Any]:
    soup = fetch_html(url)
    if "naver.com" in url:
        title, time_str, content = parse_naver(soup)
    elif "daum.net" in url:
        title, time_str, content = parse_daum(soup)
    else:
        raise HTTPException(status_code=400, detail="지원하지 않는 뉴스 사이트입니다.")
    thumbnail = extract_thumbnail(soup)
    return {
        "title": title,
        "time": time_str,
        "content": content,
        "thumbnail_url": thumbnail,
        "url": url,
    }
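# Usage sketch (the URL is a placeholder, not a real article):
#   article = parse_article_all("https://n.news.naver.com/article/...")
#   article["title"], article["time"], article["content"], article["thumbnail_url"]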
# ---------------------------------------
# Company-name inference (Gemini)
# ---------------------------------------
def gemini_use(text_for_company: str) -> str:
    generation_config = genai.GenerationConfig(temperature=1)
    model = genai.GenerativeModel('gemini-2.0-flash', generation_config=generation_config)
    prompt = f"""
    아래 내용을 참고해서 가장 연관성이 높은 주식 상장 회사 이름 하나만 말해줘.
    다른 설명 없이 회사 이름만 대답해.
    "{text_for_company}"
    """
    response = model.generate_content(prompt)
    try:
        return response.text.strip()
    except AttributeError:
        # Fall back to the first candidate when response.text is unavailable
        return response.candidates[0].content.parts[0].text.strip()
# ---------------------------------------
# 1) Summary step
# ---------------------------------------
def step_summary(inp: SummaryInput):
    meta = parse_article_all(inp.url)
    # To reuse the existing resultKeyword pipeline instead, this one-liner is a drop-in replacement:
    # rk = resultKeyword(meta["content"]); return {**meta, "summary": rk["summary"]}
    summary_text = summarize(meta["content"])
    return {**meta, "summary": summary_text}


# 2) Keyword step
def step_keywords(inp: KeywordsInput):
    print("키워드는 옴")
    try:
        rk = resultKeyword(inp.summary)
        return {"keywords": rk["keyword"]}
    except Exception as e:
        print("❌ 키워드 추출 오류:", e)
        return {"keywords": []}
# 3) Related listed-company step
def step_company(inp: CompanyInput):
    if inp.summary:
        text = inp.summary
    elif inp.keywords:
        text = ", ".join(inp.keywords)
    else:
        raise HTTPException(status_code=400, detail="summary 또는 keywords 중 하나가 필요합니다.")
    company = gemini_use(text)
    return {"company": company}
# 4) Sentiment step
def step_sentiment(inp: SentimentInput):
    s = analyze_sentiment(inp.content)
    pos, neg, neu = s["positive"], s["negative"], s["neutral"]
    # Halve the neutral score and redistribute the remainder proportionally (existing logic)
    reduced_neu = neu / 2
    remaining = neu - reduced_neu
    total_non_neu = neg + pos
    if total_non_neu > 0:
        neg += remaining * (neg / total_non_neu)
        pos += remaining * (pos / total_non_neu)
    else:
        neg += remaining / 2
        pos += remaining / 2
    neu = reduced_neu
    max_label = max([("부정", neg), ("중립", neu), ("긍정", pos)], key=lambda x: x[1])[0]
    if max_label == "긍정":
        if pos >= 0.9: label = f"매우 긍정 ({pos*100:.1f}%)"
        elif pos >= 0.6: label = f"긍정 ({pos*100:.1f}%)"
        else: label = f"약한 긍정 ({pos*100:.1f}%)"
    elif max_label == "부정":
        if neg >= 0.9: label = f"매우 부정 ({neg*100:.1f}%)"
        elif neg >= 0.6: label = f"부정 ({neg*100:.1f}%)"
        else: label = f"약한 부정 ({neg*100:.1f}%)"
    else:
        label = f"중립 ({neu*100:.1f}%)"
    return {
        "raw": {"positive": s["positive"], "negative": s["negative"], "neutral": s["neutral"]},
        "adjusted": {"positive": pos, "negative": neg, "neutral": neu},
        "sentiment": label
    }
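# Worked example of the redistribution above (numbers are illustrative):
# raw pos=0.30, neg=0.10, neu=0.60 -> half of neutral (0.30) stays neutral and
# the other 0.30 is split by the 3:1 pos:neg ratio, giving
# pos = 0.30 + 0.225 = 0.525, neg = 0.10 + 0.075 = 0.175, neu = 0.30,
# so the label becomes "약한 긍정 (52.5%)".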
# 5) Stock price prediction step
def step_predict(inp: PredictInput):
    # Normalize to a list of strings (use the "word" key for dict entries)
    clean_keywords = []
    for kw in inp.keywords:
        if isinstance(kw, str):
            clean_keywords.append(kw)
        elif isinstance(kw, dict) and "word" in kw:
            clean_keywords.append(kw["word"])
    if not clean_keywords:
        raise HTTPException(status_code=400, detail="keywords 리스트가 비어 있습니다.")
    # The logic below is unchanged
    keyword_vec = embed_keywords(clean_keywords)
    input_vec = torch.tensor(keyword_vec, dtype=torch.float32).unsqueeze(0)
    input_dim = input_vec.shape[1]
    model = SimpleClassifier(input_dim)
    model.load_state_dict(torch.load("news_model.pt", map_location="cpu"))
    model.eval()
    with torch.no_grad():
        prob = model(input_vec).item()
    pred_label = '📈 상승 (1)' if prob >= 0.5 else '📉 하락 (0)'
    return {"prediction": pred_label, "prob": prob}
# ---------------------------------------
# Compatibility: original parse-news (single-shot request) - kept
# ---------------------------------------
def parse_news(req: NewsRequest):
    url = req.url.strip()
    try:
        meta = parse_article_all(url)
        # Keywords/summary (using the existing resultKeyword)
        rk = resultKeyword(meta["content"])
        targetCompany = gemini_use(rk)  # the dict is stringified inside the prompt's f-string
        # Sentiment (existing logic)
        s = analyze_sentiment(meta["content"])
        pos, neg, neu = s["positive"], s["negative"], s["neutral"]
        print("부정:", neg)
        print("중립:", neu)
        print("긍정:", pos)
        reduced_neu = neu / 2
        remaining = neu - reduced_neu
        total_non_neu = neg + pos
        if total_non_neu > 0:
            neg += remaining * (neg / total_non_neu)
            pos += remaining * (pos / total_non_neu)
        else:
            neg += remaining / 2
            pos += remaining / 2
        neu = reduced_neu
        max_label = max([("부정", neg), ("중립", neu), ("긍정", pos)], key=lambda x: x[1])[0]
        if max_label == "긍정":
            if pos >= 0.9: sentiment_label = f"매우 긍정 ({pos*100:.1f}%)"
            elif pos >= 0.6: sentiment_label = f"긍정 ({pos*100:.1f}%)"
            else: sentiment_label = f"약한 긍정 ({pos*100:.1f}%)"
        elif max_label == "부정":
            if neg >= 0.9: sentiment_label = f"매우 부정 ({neg*100:.1f}%)"
            elif neg >= 0.6: sentiment_label = f"부정 ({neg*100:.1f}%)"
            else: sentiment_label = f"약한 부정 ({neg*100:.1f}%)"
        else:
            sentiment_label = f"중립 ({neu*100:.1f}%)"
        # Prediction
        summary_text = rk.get("summary") or summarize(meta["content"])
        _, keywords_2nd = extract_keywords(summary_text)
        clean_keywords = [kw for kw, _ in keywords_2nd]
        keyword_vec = embed_keywords(clean_keywords)
        input_vec = torch.tensor(keyword_vec, dtype=torch.float32).unsqueeze(0)
        model = SimpleClassifier(input_vec.shape[1])
        model.load_state_dict(torch.load("news_model.pt", map_location="cpu"))
        model.eval()
        with torch.no_grad():
            prob = model(input_vec).item()
        prediction_label = '📈 상승 (1)' if prob >= 0.5 else '📉 하락 (0)'
        return {
            **meta,
            "message": "뉴스 파싱 및 저장 완료",
            "summary": rk["summary"],
            "keyword": rk["keyword"],
            "company": targetCompany,
            "sentiment": sentiment_label,
            "sentiment_value": sentiment_label,
            "prediction": prediction_label,
            "prob": prob,
        }
    except HTTPException:
        # Let deliberate HTTP errors (e.g., the 400 for unsupported sites)
        # propagate instead of being converted into a 500 below
        raise
    except requests.exceptions.RequestException as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"서버 오류: {e}")
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"서버 오류: {e}")
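# Wiring sketch for the compatibility endpoint above; the "/parse-news" path
# follows the section comment's name, and the POST method is an assumption:
app.post("/parse-news")(parse_news)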
# ---------------------------------------
# Stock price data (kept as-is)
# ---------------------------------------
krx_listings: Optional[pd.DataFrame] = None
us_listings: Optional[pd.DataFrame] = None
translator: Optional[Translator] = None


async def load_initial_data():
    global krx_listings, us_listings, translator
    logger.info("✅ 서버 시작: 초기 데이터 로딩을 시작합니다...")
    try:
        krx_listings = await run_in_threadpool(fdr.StockListing, 'KRX')
        logger.info("📊 한국 상장 기업 목록 로딩 완료.")
        nasdaq = await run_in_threadpool(fdr.StockListing, 'NASDAQ')
        nyse = await run_in_threadpool(fdr.StockListing, 'NYSE')
        amex = await run_in_threadpool(fdr.StockListing, 'AMEX')
        us_listings = pd.concat([nasdaq, nyse, amex], ignore_index=True)
        logger.info("📊 미국 상장 기업 목록 로딩 완료.")
        translator = Translator()
        logger.info("🌐 번역기 초기화 완료.")
        logger.info("✅ 초기 데이터 로딩 성공.")
    except Exception as e:
        logger.error(f"🚨 초기 데이터 로딩 오류: {e}", exc_info=True)
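# load_initial_data is never hooked up in this file; a minimal sketch of
# running it at application startup (the registration itself is an assumption):
app.add_event_handler("startup", load_initial_data)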
def get_stock_info(company_name: str) -> Optional[Dict[str, str]]:
    kr_match = krx_listings[krx_listings['Name'].str.contains(company_name, case=False, na=False)]
    if not kr_match.empty:
        s = kr_match.iloc[0]
        return {"market": "KRX", "symbol": s['Code'], "name": s['Name']}
    try:
        company_name_eng = translator.translate(company_name, src='ko', dest='en').text
        us_match = us_listings[
            us_listings['Name'].str.contains(company_name_eng, case=False, na=False) |
            us_listings['Symbol'].str.fullmatch(company_name_eng, case=False)
        ]
        if not us_match.empty:
            s = us_match.iloc[0]
            return {"market": "US", "symbol": s['Symbol'], "name": s['Name']}
    except Exception as e:
        logger.error(f"번역/미국 주식 검색 오류: {e}")
    return None


def fetch_stock_prices_sync(symbol: str, days: int = 365) -> Optional[pd.DataFrame]:
    end_date = datetime.today()
    start_date = end_date - timedelta(days=days)
    try:
        df = fdr.DataReader(symbol, start=start_date, end=end_date)
        if df.empty:
            return None
        return df
    except Exception as e:
        logger.error(f"'{symbol}' 데이터 조회 오류: {e}", exc_info=True)
        return None


async def get_stock_data_by_name(company_name: str = Query(..., description="조회할 회사명")) -> List[Dict[str, Any]]:
    if not company_name or not company_name.strip():
        raise HTTPException(status_code=400, detail="회사명을 입력해주세요.")
    stock_info = await run_in_threadpool(get_stock_info, company_name.strip())
    if not stock_info:
        raise HTTPException(status_code=404, detail=f"'{company_name}'에 해당하는 종목을 찾을 수 없습니다.")
    prices_df = await run_in_threadpool(fetch_stock_prices_sync, stock_info['symbol'], 365)
    if prices_df is None or prices_df.empty:
        raise HTTPException(status_code=404, detail=f"'{stock_info['name']}'의 시세 데이터를 찾을 수 없습니다.")
    prices_df.index.name = 'Date'
    prices_df.reset_index(inplace=True)
    prices_df['Date'] = prices_df['Date'].dt.strftime('%Y-%m-%d')
    return prices_df.to_dict(orient='records')
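# Wiring sketch for the query endpoint above; its Query(...) parameter suggests
# a GET route, but the path is an assumption:
app.get("/stock-prices")(get_stock_data_by_name)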
# ---------------------------------------
# Entry point
# ---------------------------------------
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)