| | import os |
| | from typing import Optional, List |
| | from datetime import datetime |
| | from pathlib import Path |
| |
|
| | from fastapi import FastAPI, Depends, HTTPException, status, Request, Form |
| | from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm |
| | from fastapi.staticfiles import StaticFiles |
| | from fastapi.templating import Jinja2Templates |
| | from fastapi.responses import RedirectResponse |
| | from pydantic import BaseModel, EmailStr |
| | from passlib.context import CryptContext |
| |
|
| | from sqlalchemy import create_engine, Column, Integer, String, Text, ForeignKey, TIMESTAMP |
| | from sqlalchemy.sql import func |
| | from sqlalchemy.orm import sessionmaker, declarative_base, relationship, Session |
| |
|
| | |
| | |
| | |
# Database connection settings.
#
# SECURITY: the connection string is read from the DATABASE_URL environment
# variable. Never embed a real username/password here as a fallback value --
# anything committed to source control must be treated as leaked and rotated.
# When the variable is missing or empty we fall back to a credential-free
# local development URL so the module still imports; create_engine() does not
# connect until the first query.
_LOCAL_DEV_DATABASE_URL = "postgresql://postgres@localhost:5432/postgres"

SQLALCHEMY_DATABASE_URL = os.getenv("DATABASE_URL") or _LOCAL_DEV_DATABASE_URL
if SQLALCHEMY_DATABASE_URL == _LOCAL_DEV_DATABASE_URL:
    print("⚠️  DATABASE_URL is not set; falling back to a local PostgreSQL instance.")

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    pool_pre_ping=True,   # test each pooled connection before handing it out
    pool_size=5,
    max_overflow=10,
    pool_recycle=300,     # replace connections older than 300 seconds
    connect_args={"connect_timeout": 10},  # forwarded to the DB driver's connect()
)

# Session factory: explicit commits, no automatic flush before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class shared by every ORM model in this module.
Base = declarative_base()
| |
|
def get_db():
    """Yield a fresh database session and close it once the consumer is done.

    Written as a generator so it can be used with ``Depends``: the session is
    handed out at the ``yield`` and the ``finally`` clause closes it whether
    or not the code using it raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
| |
|
| | |
| | |
| | |
class User(Base):
    """ORM model for a registered account, stored in the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), nullable=False)
    age = Column(Integer, nullable=False)
    address = Column(Text, nullable=False)
    # Unique and indexed: accounts are looked up by this column.
    email = Column(String(100), unique=True, index=True, nullable=False)
    mobile_number = Column(String(20), nullable=False)
    # Hash produced by get_password_hash(); the plaintext is not stored.
    password_hash = Column(String(255), nullable=False)
    # Populated by the database through a server-side default (current time).
    created_at = Column(TIMESTAMP, server_default=func.now())

    # One-to-many link to this user's uploads; the reverse side is File.owner.
    files = relationship("File", back_populates="owner")
| |
|
class File(Base):
    """ORM model for the metadata of one uploaded file (``files`` table).

    Only metadata lives in the database; the route handlers in this module
    read and write the file contents on disk under ``UPLOAD_DIR``.
    """

    __tablename__ = "files"

    id = Column(Integer, primary_key=True, index=True)
    # Name of the file as stored on disk (handlers join it onto UPLOAD_DIR).
    filename = Column(String(255), nullable=False)
    # Lower-cased extension including the leading dot, e.g. ".pdf".
    file_type = Column(String(50), nullable=False)
    # Owning user; the foreign key is declared with ON DELETE CASCADE.
    user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
    # Populated by the database through a server-side default (current time).
    upload_timestamp = Column(TIMESTAMP, server_default=func.now())

    # Many-to-one link back to the owner; the reverse side is User.files.
    owner = relationship("User", back_populates="files")
| |
|
| | |
| | |
| | |
class UserCreate(BaseModel):
    """Request body accepted by ``POST /api/signup``."""

    name: str
    age: int
    address: str
    email: EmailStr  # validated as an email address by Pydantic
    mobile_number: str
    password: str  # plaintext as submitted; hashed before it is stored
| |
|
class UserLogin(BaseModel):
    """Email/password credential pair.

    NOTE(review): no route in the visible source references this schema (the
    login routes read form fields instead) -- confirm whether it is still
    needed.
    """

    email: EmailStr
    password: str
| |
|
class UserResponse(BaseModel):
    """Public representation of a user returned by the API.

    Exposes only ``id``, ``name`` and ``email``; the password hash and the
    remaining profile columns are deliberately left out.
    """

    # Pydantic v2 configuration. ``from_attributes`` lets the model be built
    # from objects that expose the fields as attributes (the ORM ``User``
    # rows returned by the route handlers). This replaces the class-based
    # ``Config``, which Pydantic v2 accepts only with a deprecation warning.
    model_config = {"from_attributes": True}

    id: int
    name: str
    email: str
| |
|
class Token(BaseModel):
    """Response body of ``POST /api/login``."""

    access_token: str
    token_type: str  # the login route always sets this to "bearer"
| |
|
class TokenData(BaseModel):
    """Holder for the email carried by a token.

    NOTE(review): not referenced anywhere else in the visible source --
    confirm whether it is still needed.
    """

    email: Optional[str] = None
| |
|
| | |
| | |
| | |
| |
|
# The ASGI application object; every route below is registered on it.
app = FastAPI(title="Task 1 API")
| |
|
@app.on_event("startup")
def on_startup():
    """Make sure the mapped tables exist when the application boots.

    Any failure is reported on stdout instead of being propagated, so the
    server still comes up when the database cannot be reached.
    """
    try:
        Base.metadata.create_all(bind=engine)
        print("✅ Database tables created / verified.")
    except Exception as exc:
        # Deliberately broad: startup must not abort on a database problem.
        print(f"⚠️  Could not connect to database on startup: {exc}")
        print("   The app will start anyway; DB operations will fail until the DB is reachable.")
| |
|
| | |
# Directory containing this module; static assets and templates are resolved
# relative to it rather than to the process working directory.
BASE_DIR = Path(__file__).resolve().parent
app.mount("/static", StaticFiles(directory=str(BASE_DIR / "static")), name="static")
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))

# Password hashing context (bcrypt) used by verify_password / get_password_hash.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Extracts the bearer token from the Authorization header for the /api routes;
# tokenUrl points at the login endpoint that issues tokens.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/login")
| |
|
| | |
def verify_password(plain_password, hashed_password):
    """Return True when ``plain_password`` matches ``hashed_password``.

    bcrypt only considers the first 72 *bytes* of a password, and recent
    bcrypt releases reject longer input outright. The limit is therefore
    applied to the UTF-8 encoding rather than to the character count, so that
    passwords containing multi-byte characters cannot exceed it.

    Args:
        plain_password: Candidate password as ``str`` (or already-encoded
            ``bytes``).
        hashed_password: Stored hash to check against.
    """
    if isinstance(plain_password, str):
        plain_password = plain_password.encode("utf-8")
    return pwd_context.verify(plain_password[:72], hashed_password)
| |
|
def get_password_hash(password):
    """Return the bcrypt hash of ``password``.

    bcrypt only considers the first 72 *bytes* of a password, and recent
    bcrypt releases reject longer input outright. The limit is therefore
    applied to the UTF-8 encoding rather than to the character count, so that
    passwords containing multi-byte characters cannot exceed it.

    Args:
        password: Plaintext password as ``str`` (or already-encoded ``bytes``).
    """
    if isinstance(password, str):
        password = password.encode("utf-8")
    return pwd_context.hash(password[:72])
| |
|
def get_user_by_email(db: Session, email: str):
    """Look up a user by email address.

    Returns the first ``User`` row whose ``email`` column equals ``email``,
    or ``None`` when no row matches.
    """
    matching_users = db.query(User).filter(User.email == email)
    return matching_users.first()
| |
|
| | |
| | |
| | |
@app.post("/api/signup", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def signup(user: UserCreate, db: Session = Depends(get_db)):
    """Register a new account and return its public representation.

    Raises:
        HTTPException: 400 when the email address is already registered,
            whether detected by the up-front lookup or by the database's
            unique constraint at commit time.
    """
    from sqlalchemy.exc import IntegrityError

    if get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="Email already registered")

    new_user = User(
        name=user.name,
        age=user.age,
        address=user.address,
        email=user.email,
        mobile_number=user.mobile_number,
        password_hash=get_password_hash(user.password),
    )
    db.add(new_user)

    try:
        db.commit()
    except IntegrityError:
        # Two concurrent signups can both pass the lookup above; the unique
        # constraint on users.email then rejects the second insert. Report it
        # as the same client error instead of an unhandled 500.
        db.rollback()
        raise HTTPException(status_code=400, detail="Email already registered") from None

    db.refresh(new_user)
    return new_user
| |
|
@app.post("/api/login", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Exchange form credentials for a bearer token.

    The form's ``username`` field carries the account's email address.

    NOTE(security): the issued token is just a fixed marker followed by the
    email -- it is not signed and does not expire, so it must be replaced
    with a real signed token before production use.

    Raises:
        HTTPException: 401 when the email is unknown or the password does not
            match.
    """
    account = get_user_by_email(db, email=form_data.username)

    # Unknown email and wrong password intentionally yield the same response.
    if not (account and verify_password(form_data.password, account.password_hash)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return {
        "access_token": f"fake-jwt-token-for-{account.email}",
        "token_type": "bearer",
    }
| |
|
@app.get("/api/users/me", response_model=UserResponse)
def read_users_me(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Return the account identified by the bearer token.

    NOTE(security): the token is only a fixed marker followed by an email;
    nothing here verifies a signature or an expiry.

    Raises:
        HTTPException: 401 when the token lacks the expected marker or names
            an unknown email.
    """
    # Require the marker at the start and strip only that occurrence.
    # ``str.replace`` would also accept a bare email and would mangle an
    # email address that itself contains the marker text.
    token_prefix = "fake-jwt-token-for-"
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")

    user = get_user_by_email(db, email=token[len(token_prefix):])
    if user is None:
        raise HTTPException(status_code=401, detail="Invalid token")
    return user
| |
|
| | |
| | |
| | |
| |
|
# Directory that receives uploaded files. The path is relative, so it resolves
# against the process working directory (unlike static/ and templates/, which
# are anchored at BASE_DIR). Created at import time if it does not exist.
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
# Lower-case extensions (leading dot included) accepted by the upload routes.
ALLOWED_EXTENSIONS = {".pdf", ".png", ".jpeg", ".jpg"}
| |
|
| | from fastapi import UploadFile, File as FastAPIFile |
| | from fastapi.responses import FileResponse |
| | import shutil |
| |
|
@app.post("/api/upload")
async def upload_file(
    file: UploadFile = FastAPIFile(...),
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    """Store an uploaded file on disk and record its metadata.

    The file is saved under ``UPLOAD_DIR`` as
    ``<user id>_<user name>_<original name>`` and a ``File`` row is created
    for it.

    NOTE(security): the token is only a fixed marker followed by an email;
    nothing here verifies a signature or an expiry.

    Returns:
        A dict with a confirmation message, the new record's id and the
        stored filename.

    Raises:
        HTTPException: 401 for an invalid token; 400 when the extension is
            not in ``ALLOWED_EXTENSIONS``.
    """
    # Require the marker at the start and strip only that occurrence;
    # ``str.replace`` would also accept a bare email as a token.
    token_prefix = "fake-jwt-token-for-"
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    # The filename is supplied by the client: keep only its last path
    # component so it cannot steer the write outside UPLOAD_DIR. It may also
    # be missing entirely, in which case the extension check rejects it.
    original_name = os.path.basename((file.filename or "").replace("\\", "/"))
    file_ext = os.path.splitext(original_name)[1].lower()
    if file_ext not in ALLOWED_EXTENSIONS:
        # sorted() keeps the message stable; set iteration order is not.
        raise HTTPException(status_code=400, detail=f"Invalid file type. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}")

    # The user name is user-supplied as well; neutralise path separators.
    owner_tag = user.name.replace(' ', '_').replace('/', '_').replace('\\', '_')
    custom_filename = f"{user.id}_{owner_tag}_{original_name}"
    file_path = os.path.join(UPLOAD_DIR, custom_filename)

    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    db_file = File(
        filename=custom_filename,
        file_type=file_ext,
        user_id=user.id
    )
    db.add(db_file)
    db.commit()
    db.refresh(db_file)

    return {"message": "File uploaded successfully", "file_id": db_file.id, "filename": custom_filename}
| |
|
| |
|
@app.get("/api/download/{file_id}")
async def download_file(
    file_id: int,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    """Send back one of the authenticated user's stored files.

    NOTE(security): the token is only a fixed marker followed by an email;
    nothing here verifies a signature or an expiry.

    Raises:
        HTTPException: 401 for an invalid token; 404 when the record does not
            exist or belongs to someone else, or when the file is no longer
            on disk.
    """
    # Require the marker at the start and strip only that occurrence;
    # ``str.replace`` would also accept a bare email as a token.
    token_prefix = "fake-jwt-token-for-"
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    # Filtering on user_id as well keeps users from reading others' files.
    file_record = db.query(File).filter(File.id == file_id, File.user_id == user.id).first()
    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    file_path = os.path.join(UPLOAD_DIR, file_record.filename)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File is missing on the server")

    return FileResponse(path=file_path, filename=file_record.filename)
| |
|
| |
|
| | |
class FileMetaResponse(BaseModel):
    """Metadata describing one stored upload, as returned by ``GET /api/files``."""

    # Pydantic v2 configuration. ``from_attributes`` lets the model be built
    # from objects that expose the fields as attributes (the ORM ``File``
    # rows returned by the route handler). This replaces the class-based
    # ``Config``, which Pydantic v2 accepts only with a deprecation warning.
    model_config = {"from_attributes": True}

    id: int
    filename: str
    file_type: str
    upload_timestamp: Optional[datetime] = None
| |
|
| |
|
@app.get("/api/files", response_model=List[FileMetaResponse])
async def list_files(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    """List all files belonging to the authenticated user.

    NOTE(security): the token is only a fixed marker followed by an email;
    nothing here verifies a signature or an expiry.

    Raises:
        HTTPException: 401 for an invalid token.
    """
    # Require the marker at the start and strip only that occurrence;
    # ``str.replace`` would also accept a bare email as a token.
    token_prefix = "fake-jwt-token-for-"
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    return db.query(File).filter(File.user_id == user.id).all()
| |
|
| |
|
@app.put("/api/files/{file_id}")
async def update_file(
    file_id: int,
    file: UploadFile = FastAPIFile(...),
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    """Replace an existing file with a new upload.

    The replacement is written and the record committed *before* the previous
    file is removed, so a failed write or commit does not leave the record
    without any file on disk.

    NOTE(security): the token is only a fixed marker followed by an email;
    nothing here verifies a signature or an expiry.

    Returns:
        A dict with a confirmation message, the record id and the new stored
        filename.

    Raises:
        HTTPException: 401 for an invalid token; 404 when the record does not
            exist or belongs to someone else; 400 when the extension is not
            in ``ALLOWED_EXTENSIONS``.
    """
    # Require the marker at the start and strip only that occurrence;
    # ``str.replace`` would also accept a bare email as a token.
    token_prefix = "fake-jwt-token-for-"
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    file_record = db.query(File).filter(File.id == file_id, File.user_id == user.id).first()
    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    # The filename is supplied by the client: keep only its last path
    # component so it cannot steer the write outside UPLOAD_DIR. It may also
    # be missing entirely, in which case the extension check rejects it.
    original_name = os.path.basename((file.filename or "").replace("\\", "/"))
    file_ext = os.path.splitext(original_name)[1].lower()
    if file_ext not in ALLOWED_EXTENSIONS:
        # sorted() keeps the message stable; set iteration order is not.
        raise HTTPException(status_code=400, detail=f"Invalid file type. Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}")

    old_path = os.path.join(UPLOAD_DIR, file_record.filename)

    # The user name is user-supplied as well; neutralise path separators.
    owner_tag = user.name.replace(' ', '_').replace('/', '_').replace('\\', '_')
    custom_filename = f"{user.id}_{owner_tag}_{original_name}"
    new_path = os.path.join(UPLOAD_DIR, custom_filename)
    with open(new_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    file_record.filename = custom_filename
    file_record.file_type = file_ext
    db.commit()
    db.refresh(file_record)

    # Drop the previous file only now, and only if the new upload did not
    # reuse (and therefore already overwrite) the same path.
    if old_path != new_path:
        try:
            os.remove(old_path)
        except FileNotFoundError:
            pass  # already gone; nothing to clean up

    return {"message": "File updated successfully", "file_id": file_record.id, "filename": custom_filename}
| |
|
| |
|
@app.delete("/api/files/{file_id}")
async def delete_file(
    file_id: int,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
):
    """Delete a file record and its physical file.

    NOTE(security): the token is only a fixed marker followed by an email;
    nothing here verifies a signature or an expiry.

    Raises:
        HTTPException: 401 for an invalid token; 404 when the record does not
            exist or belongs to someone else.
    """
    # Require the marker at the start and strip only that occurrence;
    # ``str.replace`` would also accept a bare email as a token.
    token_prefix = "fake-jwt-token-for-"
    if not token.startswith(token_prefix):
        raise HTTPException(status_code=401, detail="Invalid token")
    user = get_user_by_email(db, email=token[len(token_prefix):])
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")

    file_record = db.query(File).filter(File.id == file_id, File.user_id == user.id).first()
    if not file_record:
        raise HTTPException(status_code=404, detail="File not found")

    # Attempt the removal directly instead of checking for existence first:
    # the file may vanish between the check and the removal.
    try:
        os.remove(os.path.join(UPLOAD_DIR, file_record.filename))
    except FileNotFoundError:
        pass  # a missing file must not block deleting the record

    db.delete(file_record)
    db.commit()

    return {"message": "File deleted successfully"}
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _get_current_user_from_cookie(request: Request, db: Session):
    """Read the auth token from a cookie and return the User or None.

    Returns ``None`` when the ``access_token`` cookie is absent, does not
    start with the expected marker, or names an unknown email.

    NOTE(security): the token is only a fixed marker followed by an email;
    nothing here verifies a signature or an expiry.
    """
    token = request.cookies.get("access_token", "")

    # Require the marker at the start and strip only that occurrence.
    # ``str.replace`` would also accept a bare email as a cookie value and
    # would mangle an email address that itself contains the marker text.
    token_prefix = "fake-jwt-token-for-"
    if not token.startswith(token_prefix):
        return None
    return get_user_by_email(db, email=token[len(token_prefix):])
| |
|
| |
|
@app.get("/")
def root_redirect():
    """Send visitors of the site root to the login page (HTTP 302)."""
    login_redirect = RedirectResponse(url="/login", status_code=302)
    return login_redirect
| |
|
| |
|
| | |
@app.get("/signup")
def page_signup(request: Request):
    """Render the empty signup form."""
    context = {"request": request}
    return templates.TemplateResponse("signup.html", context)
| |
|
| |
|
@app.post("/signup")
def page_signup_post(
    request: Request,
    name: str = Form(...),
    age: int = Form(...),
    address: str = Form(...),
    email: str = Form(...),
    mobile_number: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Handle the signup form: create the account and redirect to the login page.

    When the email is already registered -- whether detected by the up-front
    lookup or by the database's unique constraint at commit time -- the
    signup form is rendered again with an error message.
    """
    from sqlalchemy.exc import IntegrityError

    def _email_taken_page():
        # Re-render the form with the duplicate-email error.
        return templates.TemplateResponse("signup.html", {
            "request": request,
            "message": "Email already registered",
            "msg_type": "error",
        })

    if get_user_by_email(db, email=email):
        return _email_taken_page()

    new_user = User(
        name=name, age=age, address=address,
        email=email, mobile_number=mobile_number,
        password_hash=get_password_hash(password),
    )
    db.add(new_user)

    try:
        db.commit()
    except IntegrityError:
        # Two concurrent signups can both pass the lookup above; the unique
        # constraint on users.email then rejects the second insert. Show the
        # form error instead of failing with an unhandled 500.
        db.rollback()
        return _email_taken_page()

    return RedirectResponse(url="/login", status_code=302)
| |
|
| |
|
| | |
@app.get("/login")
def page_login(request: Request):
    """Render the empty login form."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
| |
|
| |
|
@app.post("/login")
def page_login_post(
    request: Request,
    email: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Handle the login form.

    On success the token is stored in an HTTP-only ``access_token`` cookie
    and the browser is redirected to the dashboard; otherwise the login form
    is rendered again with an error message.

    NOTE(security): the cookie value is just a fixed marker followed by the
    email -- it is not signed and does not expire.
    """
    account = get_user_by_email(db, email=email)

    # Unknown email and wrong password intentionally yield the same message.
    if not (account and verify_password(password, account.password_hash)):
        error_context = {
            "request": request,
            "message": "Invalid email or password",
            "msg_type": "error",
        }
        return templates.TemplateResponse("login.html", error_context)

    to_dashboard = RedirectResponse(url="/dashboard", status_code=302)
    to_dashboard.set_cookie(
        key="access_token",
        value=f"fake-jwt-token-for-{account.email}",
        httponly=True,
    )
    return to_dashboard
| |
|
| |
|
| | |
@app.get("/dashboard")
def page_dashboard(request: Request, db: Session = Depends(get_db)):
    """Render the dashboard listing the signed-in user's files.

    Visitors without a valid auth cookie are redirected to the login page.
    """
    current_user = _get_current_user_from_cookie(request, db)
    if not current_user:
        return RedirectResponse(url="/login", status_code=302)

    owned_files = db.query(File).filter(File.user_id == current_user.id).all()
    context = {
        "request": request,
        "user": current_user,
        "files": owned_files,
    }
    return templates.TemplateResponse("dashboard.html", context)
| |
|
| |
|
| | |
@app.post("/upload")
async def page_upload(
    request: Request,
    file1: UploadFile = FastAPIFile(...),
    file2: Optional[UploadFile] = FastAPIFile(None),
    db: Session = Depends(get_db),
):
    """Handle the dashboard upload form (one required file, one optional).

    Each submitted file with an allowed extension is saved under
    ``UPLOAD_DIR`` as ``<user id>_<user name>_<original name>`` and recorded
    in the database. Empty slots and files with other extensions are skipped
    silently. The browser is always redirected back to the dashboard, or to
    the login page when there is no valid auth cookie.
    """
    user = _get_current_user_from_cookie(request, db)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # The user name is user-supplied; neutralise path separators once here.
    owner_tag = user.name.replace(' ', '_').replace('/', '_').replace('\\', '_')

    for f in (file1, file2):
        # An empty form slot arrives as None, or with a missing/empty filename.
        if f is None or not f.filename:
            continue

        # The filename is supplied by the client: keep only its last path
        # component so it cannot steer the write outside UPLOAD_DIR.
        original_name = os.path.basename(f.filename.replace("\\", "/"))
        file_ext = os.path.splitext(original_name)[1].lower()
        if file_ext not in ALLOWED_EXTENSIONS:
            continue

        custom_filename = f"{user.id}_{owner_tag}_{original_name}"
        dest = os.path.join(UPLOAD_DIR, custom_filename)
        with open(dest, "wb") as buf:
            shutil.copyfileobj(f.file, buf)
        db.add(File(filename=custom_filename, file_type=file_ext, user_id=user.id))

    # A single commit covers every record added above.
    db.commit()
    return RedirectResponse(url="/dashboard", status_code=302)
| |
|
| |
|
| | |
@app.get("/download/{file_id}")
async def page_download(file_id: int, request: Request, db: Session = Depends(get_db)):
    """Serve one of the signed-in user's files as a download.

    Visitors without a valid auth cookie go to the login page. When the
    record does not exist, belongs to someone else, or the file is missing on
    disk, the browser is sent back to the dashboard.
    """
    current_user = _get_current_user_from_cookie(request, db)
    if not current_user:
        return RedirectResponse(url="/login", status_code=302)

    record = (
        db.query(File)
        .filter(File.id == file_id, File.user_id == current_user.id)
        .first()
    )
    stored_path = os.path.join(UPLOAD_DIR, record.filename) if record else None
    if stored_path is None or not os.path.exists(stored_path):
        return RedirectResponse(url="/dashboard", status_code=302)

    return FileResponse(path=stored_path, filename=record.filename)
| |
|
| |
|
| | |
@app.get("/delete/{file_id}")
def page_delete(file_id: int, request: Request, db: Session = Depends(get_db)):
    """Delete one of the signed-in user's files, then return to the dashboard.

    Visitors without a valid auth cookie go to the login page. A file id that
    does not exist or belongs to someone else is ignored.

    NOTE(review): this performs a deletion in response to a GET request, so
    any link or image URL pointing here triggers it for a logged-in user;
    consider moving it to POST/DELETE together with the templates.
    """
    user = _get_current_user_from_cookie(request, db)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    file_record = db.query(File).filter(File.id == file_id, File.user_id == user.id).first()
    if file_record:
        # Attempt the removal directly instead of checking for existence
        # first: the file may vanish between the check and the removal.
        try:
            os.remove(os.path.join(UPLOAD_DIR, file_record.filename))
        except FileNotFoundError:
            pass  # a missing file must not block deleting the record
        db.delete(file_record)
        db.commit()

    return RedirectResponse(url="/dashboard", status_code=302)
| |
|
| |
|
| | |
@app.get("/logout")
def page_logout():
    """Clear the ``access_token`` cookie and redirect to the login page."""
    to_login = RedirectResponse(url="/login", status_code=302)
    to_login.delete_cookie("access_token")
    return to_login
| |
|