TanmayTomar committed
Commit 57a9af9 · verified · 1 Parent(s): 55694e7

Update pmo_func.py

Files changed (1):
  1. pmo_func.py +313 -313
pmo_func.py CHANGED
@@ -1,313 +1,313 @@
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from sentence_transformers.cross_encoder import CrossEncoder
from transformers import pipeline
from PIL import Image, ImageChops, ImageEnhance
import torch
from google.cloud import vision
import os
import io
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from transformers import T5Tokenizer, T5ForConditionalGeneration
from dotenv import load_dotenv
import requests
from bs4 import BeautifulSoup
import trafilatura as tra

DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

class retriver:
    def __init__(self):
        self.retrivermodel = SentenceTransformer('all-MiniLM-L6-v2')

    def build_faiss_idx(self, evidence_corpus):
        embeddings = self.retrivermodel.encode(evidence_corpus)
        index = faiss.IndexFlatIP(embeddings.shape[1])
        index.add(np.array(embeddings, dtype=np.float32))
        faiss.write_index(index, "evidence_index.faiss")
        return index

    def retrieve_evidence(self, claim, index, evidence_corpus, top_k=10):
        claim_embedding = self.retrivermodel.encode([claim])
        distances, indices = index.search(np.array(claim_embedding, dtype=np.float32), top_k)
        retrieved_docs = [evidence_corpus[i] for i in indices[0]]
        return retrieved_docs, indices[0]

class reranker:
    def __init__(self):
        self.reranker_model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', device=DEVICE)

    def rerank_evidendce(self, claim, evidence_list):
        sentance_pairs = [[claim, evidence] for evidence in evidence_list]
        score = self.reranker_model.predict(sentance_pairs)
        scored_evidence = sorted(zip(score, evidence_list), reverse=True)
        return scored_evidence

class Classifier:
    def __init__(self):
        self.model_name = "MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli"
        self.label_names = ["entailment", "neutral", "contradiction"]
        self.device = torch.device(DEVICE)
        print(f"Classifier device: {self.device}")
        self.model = AutoModelForSequenceClassification.from_pretrained(self.model_name).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model.eval()

    def classify(self, claim, top_evidence):
        verdicts = []
        evidences = [e[1] for e in top_evidence]
        if not evidences:
            return "NEUTRAL", []

        inputs = self.tokenizer(evidences, [claim] * len(evidences), return_tensors="pt", padding=True, truncation=True, max_length=512)
        with torch.no_grad():
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            outputs = self.model(**inputs)

        probs = torch.softmax(outputs.logits, dim=-1)
        for i, evidence in enumerate(evidences):
            pred = torch.argmax(probs[i]).item()
            verdicts.append({
                "evidence": evidence,
                "verdict": self.label_names[pred],
                "scores": {name: float(probs[i][j]) for j, name in enumerate(self.label_names)}
            })

        top_verdict_info = verdicts[0]
        if top_verdict_info["verdict"] == "entailment" and top_verdict_info["scores"]["entailment"] > 0.8:
            result = "TRUE"
        elif top_verdict_info["verdict"] == "contradiction" and top_verdict_info["scores"]["contradiction"] > 0.8:
            result = "FALSE"
        else:
            for v in verdicts[1:]:
                if v["verdict"] == "contradiction" and v["scores"]["contradiction"] > 0.9:
                    result = "FALSE"
                    break
            else:
                result = "NEUTRAL"
        return result, verdicts

    def __call__(self, claim, evidences):
        return self.classify(claim, evidences)

class summarizer:
    def __init__(self):
        self.model_name = "google/flan-t5-base"  # Using a smaller model for server efficiency
        self.model = T5ForConditionalGeneration.from_pretrained(self.model_name)
        self.tokenizer = T5Tokenizer.from_pretrained(self.model_name)
        self.device = torch.device(DEVICE)
        self.model.to(self.device)
        self.model.eval()
        print(f"Summarizer device: {self.device}")

    def forward(self, claim, top_evidence, verdict, max_input_len=1024, max_output_len=150):
        evidence_texts = [e[1] for e in top_evidence]
        if not evidence_texts:
            return verdict, "No evidence was provided to generate a summary."

        input_text = f"""Claim: "{claim}"\nVerdict: {verdict}\nEvidence:\n{"\n---\n".join(evidence_texts)}\n\nWrite a short, neutral explanation for why the verdict is {verdict}, based only on the evidence provided."""
        inputs = self.tokenizer(input_text, return_tensors="pt", truncation=True, max_length=max_input_len).to(self.device)

        with torch.no_grad():
            summary_ids = self.model.generate(inputs["input_ids"], max_length=max_output_len, num_beams=4, early_stopping=True)

        summary = self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
        return verdict, summary

    def __call__(self, claim, top_evidence, verdict):
        return self.forward(claim, top_evidence, verdict)

class FactChecker:
    def __init__(self):
        self.factcheck_api = "https://factchecktools.googleapis.com/v1alpha1/claims:search"
        self.google_search = "https://www.google.com/search"
        load_dotenv()
-        self.factcheck_api_key = os.getenv("GOOGLE_FACT_CHECK_API_KEY")
+        self.factcheck_api_key = GOOGLE_FACT_CHECK_API_KEY
        # Lazy load heavy models
        self.reranker = None
        self.classifier = None
        self.summarizer = None

    def check_google_factcheck(self, claim: str, pages: int = 5):
        if not self.factcheck_api_key:
            print("Google FactCheck API key not found in .env file.")
            return None

        params = {'key': self.factcheck_api_key, 'query': claim, 'languageCode': 'en-US', 'pageSize': pages}
        try:
            response = requests.get(self.factcheck_api, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            if 'claims' in data and data['claims']:
                claim_data = data['claims'][0]
                review = claim_data.get('claimReview', [{}])[0]
                return {
                    'claim': claim_data.get('text', claim),
                    'verdict': review.get('textualRating', 'Unknown'),
                    'summary': f"Rated by {review.get('publisher', {}).get('name', 'Unknown')}",
                    'source': [review.get('publisher', {}).get('name', 'Unknown')],
                    'method': 'google_factcheck',
                    'URLs': [review.get('url', '')]
                }
        except Exception as e:
            print(f"FactCheck API error: {e}")
        return None

    def google_news_search(self, query: str, num_pages: int = 1):
        print("Searching the Web...")
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}
        articles_gg = []
        for page in range(num_pages):
            params = {"q": query, "tbm": "nws", 'start': page * 10}
            try:
                res = requests.get(self.google_search, params=params, headers=headers, timeout=15)
                soup = BeautifulSoup(res.text, 'html.parser')
                # Note: This selector is fragile and may break if Google changes its HTML.
                for article_link in soup.select("a.WlydOe"):
                    title_div = article_link.find('div', class_="n0jPhd")
                    source_div = article_link.find('div', class_="MgUUmf")

                    if not (title_div and source_div): continue

                    title = title_div.text
                    a_url = article_link['href']
                    source = source_div.text

                    content = tra.extract(tra.fetch_url(a_url)) if a_url else "No content extracted"
                    articles_gg.append({'title': title, 'url': a_url, 'text': content or "", 'source': source})
            except Exception as e:
                print(f"Error during web search: {e}")

        top_evidences = [d.get('text', '') for d in articles_gg]
        urls = [d.get('url', '') for d in articles_gg]
        return top_evidences, urls, articles_gg

    def search_and_analyze_claim(self, claim: str):
        print("Performing web analysis...")

        if self.reranker is None:
            print("Loading AI models for web analysis...")
            self.reranker = reranker()
            self.classifier = Classifier()
            self.summarizer = summarizer()

        top_evidences, urls, article_list = self.google_news_search(claim)

        if not top_evidences:
            return {'claim': claim, 'verdict': 'Unverifiable', 'summary': 'No relevant sources found.', 'source': [], 'method': 'web_search', 'URLs': []}

        reranked_articles = self.reranker.rerank_evidendce(claim, top_evidences)
        if not reranked_articles:
            return {'claim': claim, 'verdict': 'Unverifiable', 'summary': 'No relevant sources found after reranking.', 'source': [], 'method': 'web_search', 'URLs': []}

        verdict, _ = self.classifier(claim, reranked_articles)
        _, summary = self.summarizer(claim, reranked_articles[:3], verdict)

        return {
            'claim': claim,
            'verdict': verdict,
            'summary': summary,
            'source': [arc.get('source', '') for arc in article_list],
            'method': 'web_analysis',
            'URLs': urls
        }

    def check_claim(self, claim: str):
        """Main function to check a claim using the fallback pipeline."""
        print(f"\n--- Checking claim: '{claim}' ---")
        factcheck_result = self.check_google_factcheck(claim)
        if factcheck_result:
            print("Found result in FactCheck database.")
            return factcheck_result

        print("No FactCheck result, falling back to live web analysis...")
        return self.search_and_analyze_claim(claim)

class img_manipulation:
    def __init__(self):
        self.GEN_AI_IMAGE = pipeline("image-classification", model="umm-maybe/AI-image-detector", device=DEVICE)

    def Gen_AI_IMG(self, img_pth):
        try:
            with Image.open(img_pth) as img:
                img = img.convert('RGB')
                result = self.GEN_AI_IMAGE(img)
                proba = next((item['score'] for item in result if item['label'] == 'artificial'), 0.0)
                return proba * 100
        except Exception as e:
            print(f'AI image detection error: {e}')
            return 0.0

    def generated_image(self, img_pth, quality=90, scale=15):
        try:
            with Image.open(img_pth) as orig_img:
                orig_img = orig_img.convert('RGB')
                temp_path = 'temp_resaved.jpg'
                orig_img.save(temp_path, 'JPEG', quality=quality)
                with Image.open(temp_path) as resaved_img:
                    ela_image = ImageChops.difference(orig_img, resaved_img)
                os.remove(temp_path)
                ela_data = np.array(ela_image)
                mean_intensity = ela_data.mean()
                scaled_score = min(100, (mean_intensity / 25.0) * 100)

                # Save the ELA image and return its path for serving
                ela_path = "ela_result.png"
                enhancer = ImageEnhance.Brightness(ela_image)
                max_diff = max(1, max([ex[1] for ex in ela_image.getextrema()]))
                ela_image_enhanced = enhancer.enhance(scale / max_diff)
                ela_image_enhanced.save(ela_path)
                return scaled_score, ela_path
        except Exception as e:
            print(f'ELA generation error: {e}')
            return 0.0, None

    def run_image_forensics(self, image_path):
        ai_score = self.Gen_AI_IMG(image_path)
        classic_score, ela_path = self.generated_image(image_path)
        return {
            "ai_generated_score_percent": ai_score,
            "classic_edit_score_percent": classic_score,
            "ela_image_path": ela_path
        }

class OCR:
-    def __init__(self, key_path='GOOGLE_VISION_API.json'):
+    def __init__(self, key_path= GOOGLE_VISION_API):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = key_path
        self.client = vision.ImageAnnotatorClient()

    def _get_full_vision_analysis(self, img_pth):
        try:
            with open(img_pth, 'rb') as image_file:
                content = image_file.read()
            image = vision.Image(content=content)
            features = [{'type_': vision.Feature.Type.DOCUMENT_TEXT_DETECTION}, {'type_': vision.Feature.Type.SAFE_SEARCH_DETECTION}, {'type_': vision.Feature.Type.LANDMARK_DETECTION}, {'type_': vision.Feature.Type.LOGO_DETECTION}, {'type_': vision.Feature.Type.WEB_DETECTION}]
            response = self.client.annotate_image({'image': image, 'features': features})
            return response, None
        except Exception as e:
            return None, str(e)

    def get_in_image_anal(self, img_pth):
        response, error = self._get_full_vision_analysis(img_pth)
        if error: return {'error': error}
        report = {}
        if response.full_text_annotation: report['Extracted Text'] = response.full_text_annotation.text
        if response.safe_search_annotation:
            safe = response.safe_search_annotation
            report['Safe Search'] = {'adult': vision.Likelihood(safe.adult).name, 'violence': vision.Likelihood(safe.violence).name}
        entities = []
        if response.landmark_annotations: entities.extend([f'Landmark: {l.description}' for l in response.landmark_annotations])
        if response.logo_annotations: entities.extend([f'Logo: {l.description}' for l in response.logo_annotations])
        if entities: report['Identified Entities'] = entities
        return report

    def rev_img_search(self, img_pth):
        response, error = self._get_full_vision_analysis(img_pth)
        if error: return {'error': error}
        report = {}
        if response.web_detection and response.web_detection.pages_with_matching_images:
            matches = [{'title': p.page_title, 'url': p.url} for p in response.web_detection.pages_with_matching_images[:5]]
            report['Reverse Image Matches'] = matches
        return report
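
For reference, a minimal driver sketch for this module. It is written against the pre-commit interface, where FactChecker reads GOOGLE_FACT_CHECK_API_KEY from a .env file via os.getenv and OCR defaults to a local 'GOOGLE_VISION_API.json' service-account file. After this commit, both values are referenced as bare names instead (GOOGLE_VISION_API is evaluated as a default argument at import time, GOOGLE_FACT_CHECK_API_KEY when FactChecker is constructed), so they do not appear to resolve unless defined at module level elsewhere. The claim string and image path below are placeholder assumptions, not values from the repo.

# Hypothetical driver for pmo_func.py (not part of the commit).
# Assumes: a .env file providing GOOGLE_FACT_CHECK_API_KEY, a Google Cloud
# Vision service-account key file, and a local test image -- all placeholders.
from pmo_func import FactChecker, img_manipulation, OCR

# Claim verification: Google FactCheck API first, live web analysis fallback.
checker = FactChecker()
report = checker.check_claim("The Great Wall of China is visible from space.")
print(report['verdict'])   # "TRUE"/"FALSE"/"NEUTRAL", a publisher rating, or "Unverifiable"
print(report['summary'])
print(report['URLs'])

# Image forensics: AI-generation probability plus an ELA-based edit score.
scores = img_manipulation().run_image_forensics("sample.jpg")
print(scores["ai_generated_score_percent"], scores["classic_edit_score_percent"])

# OCR, safe-search flags, entities, and reverse image matches via Cloud Vision.
ocr = OCR(key_path="GOOGLE_VISION_API.json")
print(ocr.get_in_image_anal("sample.jpg"))
print(ocr.rev_img_search("sample.jpg"))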
 
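The retriver class is not wired into FactChecker; it supports a separate local-corpus path alongside reranker and Classifier. A sketch under the assumption of a small in-memory evidence list (the two sentences are stand-ins):

# Hypothetical local-corpus path (not part of the commit): FAISS retrieval,
# cross-encoder reranking, then NLI classification. Corpus and claim are stand-ins.
from pmo_func import retriver, reranker, Classifier

corpus = [
    "The Eiffel Tower is located in Paris, France.",
    "Mount Everest is the highest mountain above sea level.",
]
claim = "The Eiffel Tower is in Paris."

r = retriver()
index = r.build_faiss_idx(corpus)                   # also writes evidence_index.faiss
docs, ids = r.retrieve_evidence(claim, index, corpus, top_k=2)

ranked = reranker().rerank_evidendce(claim, docs)   # list of (score, text), best first
verdict, per_evidence = Classifier()(claim, ranked)
print(verdict)                                      # "TRUE", "FALSE", or "NEUTRAL"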