Spaces:
Sleeping
Sleeping
File size: 3,936 Bytes
8de87ce 1bdf5bb 8de87ce | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 | from fastapi import FastAPI, Depends, HTTPException, status, File, UploadFile, Response
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from datetime import datetime, timedelta
from io import BytesIO
from PIL import Image, ImageDraw
import numpy as np
from ultralytics import YOLO
# Configuración JWT
SECRET_KEY = "clave-ultra-secreta"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Hash de contraseñas
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Base de datos falsa
fake_users_db = {
"uriel": {
"username": "uriel",
"full_name": "Uriel Escalona",
"email": "uriel@example.com",
"hashed_password": pwd_context.hash("1234"),
"disabled": False
}
}
# Modelos
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
full_name: str
email: str
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
# Autenticación
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
model = YOLO("app/models/yolov8n.pt")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(db, username: str, password: str):
user = get_user(db, username)
if not user or not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=401,
detail="No autorizado",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
user = get_user(fake_users_db, username)
if user is None:
raise credentials_exception
return user
except JWTError:
raise credentials_exception
# Endpoint para login
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=401, detail="Credenciales inválidas")
access_token = create_access_token(data={"sub": user.username}, expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
return {"access_token": access_token, "token_type": "bearer"}
# Endpoint protegido: recibe imagen y devuelve con cajas
@app.post("/detectar/")
async def detectar_yolo(file: UploadFile = File(...), current_user: User = Depends(get_current_user)):
contents = await file.read()
image = Image.open(BytesIO(contents)).convert("RGB")
img_array = np.array(image)
results = model.predict(img_array)
draw = ImageDraw.Draw(image)
for r in results:
for box in r.boxes:
cls = int(box.cls[0])
label = model.names[cls]
bbox = box.xyxy[0].tolist()
draw.rectangle(bbox, outline="red", width=2)
draw.text((bbox[0], bbox[1]), label, fill="white")
output = BytesIO()
image.save(output, format="PNG")
output.seek(0)
return Response(content=output.read(), media_type="image/png")
|