"""Request handlers for Nepali text AI/Human classification.

Provides bearer-token verification, text extraction from uploaded files, and
document-level / sentence-level classification built on ``classify_text``.
"""

import asyncio
import hashlib
import logging
import random
import re
import secrets
from io import BytesIO

from fastapi import Depends, HTTPException, UploadFile, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from config import Config
from features.nepali_text_classifier.inferencer import classify_text
# NOTE(review): wildcard import kept for compatibility. This module relies on
# it for end_symbol_for_NP_text, parse_docx, parse_pdf and parse_txt; other
# names may be re-exported through it as well.
from features.nepali_text_classifier.preprocess import *

security = HTTPBearer()

# Maximum accepted input size, in characters.
_MAX_TEXT_CHARS = 50000
# Danda character used as the sentence terminator when splitting text.
_DANDA = "।"
_DOCX_MIME = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"


def parse_selected_models(models: str | None) -> list[str] | None:
    """Parse a comma-separated model list.

    Returns at most the first two non-empty, whitespace-stripped names, or
    ``None`` when *models* is empty/``None`` or contains no usable name.
    """
    if not models:
        return None
    parsed = [m.strip() for m in models.split(",") if m.strip()]
    return parsed[:2] if parsed else None


def contains_english(text: str) -> bool:
    """Return True if *text* contains at least one ASCII letter (a-z, A-Z)."""
    # Newlines and tabs are not letters, so no pre-cleaning is needed for the
    # search to give the same answer.
    return re.search(r'[a-zA-Z]', text) is not None


def _clamp(value: float, lower: float, upper: float) -> float:
    """Restrict *value* to the closed interval [lower, upper]."""
    return max(lower, min(upper, value))


def _raw_ai_score(label: str, confidence: float) -> float:
    """Convert a (label, confidence) pair to an AI score in [0, 100].

    The confidence is used directly when the label is ``"AI"`` and mirrored
    (``100 - confidence``) for any other label.
    """
    conf = _clamp(float(confidence), 0.0, 100.0)
    return conf if label == "AI" else (100.0 - conf)


def _sentence_bias_strength(overall_confidence: float) -> float:
    """Return the blend weight used to pull sentence scores toward the document label."""
    # Equation: beta = min(0.15, 0.05 + 0.10 * (C_doc / 100))
    return min(0.15, 0.05 + 0.10 * (_clamp(overall_confidence, 0.0, 100.0) / 100.0))


def _deterministic_jitter(seed_text: str, max_jitter: float = 3.0) -> float:
    """Return a value in [-max_jitter, max_jitter] that depends only on *seed_text*.

    The first 8 bytes of the SHA-256 digest seed a private ``random.Random``,
    so the same text always yields the same jitter.
    """
    digest = hashlib.sha256(seed_text.encode("utf-8")).digest()
    seed_value = int.from_bytes(digest[:8], byteorder="big", signed=False)
    rng = random.Random(seed_value)
    return rng.uniform(-max_jitter, max_jitter)


def _add_likelihood_randomness(likelihood: float, seed_text: str, max_jitter: float = 3.0) -> float:
    """Add deterministic jitter to *likelihood* and clamp the result to [50.0, 99.95]."""
    jitter = _deterministic_jitter(seed_text=seed_text, max_jitter=max_jitter)
    return _clamp(likelihood + jitter, 50.0, 99.95)


def _biased_sentence_result(
    sentence_result: dict,
    overall_confidence: float,
    target_label: str = "Human",
    seed_text: str = "",
) -> dict:
    """Blend a sentence-level result toward the document-level label.

    Args:
        sentence_result: Mapping with ``"label"`` and ``"confidence"`` keys.
        overall_confidence: Document-level confidence (clamped to 0-100 when
            computing the bias strength).
        target_label: Document-level label the sentence is biased toward.
        seed_text: Text used to seed the deterministic jitter.

    Returns:
        Dict with ``"biased_label"`` (always *target_label*) and
        ``"biased_confidence"`` (rounded to 2 decimals, within [50.0, 99.95]).
    """
    raw_ai = _raw_ai_score(sentence_result["label"], float(sentence_result["confidence"]))
    target_ai = 100.0 if target_label == "AI" else 0.0
    beta = _sentence_bias_strength(overall_confidence)

    # Equation: S_biased = (1 - beta) * S_raw + beta * T
    biased_ai = _clamp((1.0 - beta) * raw_ai + beta * target_ai, 0.0, 100.0)

    # Force final label toward overall target to ensure overall bias is applied.
    biased_label = target_label
    biased_confidence = biased_ai if target_label == "AI" else (100.0 - biased_ai)
    biased_confidence = _add_likelihood_randomness(
        biased_confidence,
        seed_text=f"{seed_text}|{target_label}|{round(overall_confidence, 2)}",
    )

    return {
        "biased_label": biased_label,
        "biased_confidence": round(biased_confidence, 2),
    }


async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the bearer token against ``Config.SECRET_TOKEN``.

    Returns:
        The presented token when it matches.

    Raises:
        HTTPException: 403 when the token does not match, or when the
            configured token is not a string.
    """
    token = credentials.credentials
    expected_token = Config.SECRET_TOKEN
    # Constant-time comparison avoids leaking how much of the token matched.
    # Bytes are compared because compare_digest rejects non-ASCII str input.
    token_ok = isinstance(expected_token, str) and secrets.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    )
    if not token_ok:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or expired token"
        )
    return token


def _ensure_within_length(text: str) -> None:
    """Raise HTTP 413 when *text* is longer than ``_MAX_TEXT_CHARS`` characters."""
    if len(text) > _MAX_TEXT_CHARS:
        raise HTTPException(status_code=413, detail="Text must be less than 50,000 characters")


async def _classify_in_thread(text: str, selected_models: list[str] | None):
    """Run ``classify_text`` in a worker thread and return its result."""
    # The third positional argument (2) is passed through as in every call
    # site of this module; its meaning is defined by classify_text.
    return await asyncio.to_thread(classify_text, text, selected_models, 2)


async def _analyze_sentences(text: str, selected_models: list[str] | None) -> dict:
    """Classify *text* as a whole, then each danda-terminated sentence.

    Each sentence result is biased toward the document-level label via
    ``_biased_sentence_result``.

    Returns:
        ``{"analysis": [{"text", "result", "likelihood"}, ...]}`` with one
        entry per non-empty sentence, in document order.
    """
    # Split on the danda and re-append it so every sentence is terminated.
    sentences = [part.strip() + _DANDA for part in text.split(_DANDA) if part.strip()]

    overall = await _classify_in_thread(text, selected_models)
    overall_label = overall["label"]
    overall_confidence = float(overall["confidence"])

    results = []
    for sentence in sentences:
        # Return value intentionally unused, as elsewhere in this module;
        # presumably this validates the sentence ending — TODO confirm.
        end_symbol_for_NP_text(sentence)
        result = await _classify_in_thread(sentence, selected_models)
        biased = _biased_sentence_result(
            result,
            overall_confidence,
            target_label=overall_label,
            seed_text=sentence,
        )
        results.append({
            "text": sentence,
            "result": biased["biased_label"],
            "likelihood": biased["biased_confidence"],
        })

    return {"analysis": results}


async def nepali_text_analysis(text: str, models: str | None = None):
    """Classify *text* at document level.

    Raises:
        HTTPException: 400 when the text has fewer than 10 whitespace-separated
            words; 413 when it is longer than 50,000 characters.
    """
    # Return value unused; presumably validates the text ending — TODO confirm.
    end_symbol_for_NP_text(text)
    words = text.split()
    if len(words) < 10:
        raise HTTPException(status_code=400, detail="Text must contain at least 10 words")
    _ensure_within_length(text)

    selected_models = parse_selected_models(models)
    return await _classify_in_thread(text, selected_models)


async def extract_file_contents(file: UploadFile) -> str:
    """Extract text from an uploaded .docx, .pdf or .txt file.

    The parser is chosen from ``file.content_type``.

    Raises:
        HTTPException: 415 when the content type is not one of the three
            supported types.
    """
    parsers = {
        _DOCX_MIME: parse_docx,
        "application/pdf": parse_pdf,
        "text/plain": parse_txt,
    }
    parser = parsers.get(file.content_type)
    if parser is None:
        # Reject before reading so an unsupported upload is never buffered.
        raise HTTPException(status_code=415, detail="Invalid file type. Only .docx,.pdf and .txt are allowed")

    content = await file.read()
    return parser(BytesIO(content))


async def handle_file_upload(file: UploadFile, models: str | None = None):
    """Extract text from *file* and classify it at document level.

    Raises:
        HTTPException: 415 for an unsupported file type, 413 when the
            extracted text is longer than 50,000 characters, 404 when it is
            empty after whitespace cleanup, 500 for any unexpected failure.
    """
    try:
        file_contents = await extract_file_contents(file)
        # Return value unused; presumably validates the text ending — TODO confirm.
        end_symbol_for_NP_text(file_contents)
        _ensure_within_length(file_contents)

        cleaned_text = file_contents.replace("\n", " ").replace("\t", " ").strip()
        if not cleaned_text:
            raise HTTPException(status_code=404, detail="The file is empty or only contains whitespace.")

        selected_models = parse_selected_models(models)
        return await _classify_in_thread(cleaned_text, selected_models)
    except HTTPException:
        # Deliberate client-facing errors must keep their status code instead
        # of being rewritten to a generic 500 below.
        raise
    except Exception as e:
        logging.exception("Error processing file: %s", e)
        raise HTTPException(status_code=500, detail="Error processing the file") from e


async def handle_sentence_level_analysis(text: str, models: str | None = None):
    """Classify *text* sentence by sentence, biased toward the overall label.

    Raises:
        HTTPException: 413 when the stripped text is longer than 50,000
            characters.
    """
    text = text.strip()
    _ensure_within_length(text)

    # Return value unused; presumably validates the text ending — TODO confirm.
    end_symbol_for_NP_text(text)

    selected_models = parse_selected_models(models)
    return await _analyze_sentences(text, selected_models)


async def handle_file_sentence(file: UploadFile, models: str | None = None):
    """Extract text from *file* and classify it sentence by sentence.

    Raises:
        HTTPException: 415 for an unsupported file type, 413 when the
            extracted text is longer than 50,000 characters, 404 when it is
            empty after whitespace cleanup, 500 for any unexpected failure.
    """
    try:
        file_contents = await extract_file_contents(file)
        _ensure_within_length(file_contents)

        cleaned_text = file_contents.replace("\n", " ").replace("\t", " ").strip()
        if not cleaned_text:
            raise HTTPException(status_code=404, detail="The file is empty or only contains whitespace.")

        selected_models = parse_selected_models(models)
        return await _analyze_sentences(cleaned_text, selected_models)
    except HTTPException:
        # Deliberate client-facing errors must keep their status code instead
        # of being rewritten to a generic 500 below.
        raise
    except Exception as e:
        logging.exception("Error processing file: %s", e)
        raise HTTPException(status_code=500, detail="Error processing the file") from e


def classify(text: str, models: str | None = None):
    """Synchronously classify *text* with the models named in *models*."""
    selected_models = parse_selected_models(models)
    return classify_text(text, selected_models, 2)