File size: 3,328 Bytes
d23039a
 
fba30db
d23039a
 
 
 
 
 
780a87a
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fba30db
 
 
 
 
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
780a87a
 
 
d23039a
 
 
 
 
 
 
 
 
 
 
 
 
fba30db
d23039a
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from __future__ import annotations

import asyncio
import io
import os
import tempfile
from typing import Iterable

from fastapi import HTTPException, UploadFile, status
from config import settings

IMAGE_MAGIC_BYTES: dict[bytes, str] = {
    b"\xff\xd8\xff": "image/jpeg",
    b"\x89PNG\r\n\x1a\n": "image/png",
    b"RIFF": "image/webp",  # partial; WEBP has 'RIFF....WEBP'
}


def _detect_mime_by_magic(head: bytes) -> str | None:
    for sig, mime in IMAGE_MAGIC_BYTES.items():
        if head.startswith(sig):
            if mime == "image/webp" and b"WEBP" not in head[:16]:
                continue
            return mime
    return None


async def read_upload_bytes(
    file: UploadFile,
    allowed_mimes: Iterable[str],
    max_size_mb: int,
) -> tuple[bytes, str]:
    """Read an UploadFile into memory after validating type and size.

    The MIME type is taken from the file's magic bytes when they match a
    known image signature; otherwise the client-declared content type is
    used (lower-cased).

    Args:
        file: The incoming upload.
        allowed_mimes: MIME types the caller accepts.
        max_size_mb: Maximum accepted size in mebibytes.

    Returns:
        A ``(raw_bytes, mime)`` tuple.

    Raises:
        HTTPException: 422 if the upload is empty, 413 if it exceeds
            ``max_size_mb``, 415 if the type is not allowed or the content
            does not match a declared image type.
    """
    # Materialise once: an Iterable may be a one-shot iterator, which the
    # membership test below would exhaust before the error message is built.
    allowed = tuple(allowed_mimes)
    max_bytes = int(max_size_mb * 1024 * 1024)

    # Read at most one byte past the limit so an oversized upload is detected
    # without loading all of it into memory.
    data = await file.read(max_bytes + 1)
    if not data:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Empty file — no bytes received",
        )
    if len(data) > max_bytes:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=f"File too large (> {max_size_mb} MB)",
        )

    detected = _detect_mime_by_magic(data[:16])
    claimed = (file.content_type or "").lower()
    # The declared type is client-controlled. If it names a format we can
    # verify by signature and the bytes do not carry that signature, the
    # declaration is false; accepting it would bypass the magic-byte check.
    if detected is None and claimed in IMAGE_MAGIC_BYTES.values():
        raise HTTPException(
            status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
            detail=f"File content does not match declared type '{claimed}'",
        )

    mime = detected or claimed
    if mime not in allowed:
        raise HTTPException(
            status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
            detail=f"Unsupported type '{mime}'. Allowed: {list(allowed)}",
        )
    return data, mime


def bytes_to_buffer(data: bytes) -> io.BytesIO:
    """Wrap raw bytes in an in-memory binary stream."""
    buffer = io.BytesIO(data)
    return buffer


async def save_upload_to_tempfile(
    file: UploadFile,
    allowed_mimes: Iterable[str],
    max_size_mb: int,
    suffix: str = ".mp4",
) -> tuple[str, str]:
    """Stream an UploadFile to a temp file on disk.

    MIME is taken from the client's content_type (no magic-byte check for
    videos). The caller is responsible for deleting the temp file.

    Args:
        file: The incoming upload.
        allowed_mimes: MIME types the caller accepts.
        max_size_mb: Maximum accepted size in mebibytes.
        suffix: File-name suffix for the temp file.

    Returns:
        A ``(path, mime)`` tuple; ``path`` is inside ``settings.UPLOAD_DIR``.

    Raises:
        HTTPException: 415 if the declared type is not allowed, 413 if the
            upload exceeds ``max_size_mb``.
    """
    # Materialise once: an Iterable may be a one-shot iterator, which the
    # membership test below would exhaust before the error message is built.
    allowed = tuple(allowed_mimes)
    mime = (file.content_type or "").lower()
    if mime not in allowed:
        raise HTTPException(
            status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
            detail=f"Unsupported type '{mime}'. Allowed: {list(allowed)}",
        )

    max_bytes = max_size_mb * 1024 * 1024
    upload_dir = os.path.abspath(settings.UPLOAD_DIR)
    os.makedirs(upload_dir, exist_ok=True)
    fd, path = tempfile.mkstemp(suffix=suffix, prefix="ds_vid_", dir=upload_dir)
    written = 0
    try:
        with os.fdopen(fd, "wb") as out:
            # Copy in 1 MiB chunks so the upload is never fully in memory.
            while chunk := await file.read(1024 * 1024):
                written += len(chunk)
                if written > max_bytes:
                    raise HTTPException(
                        status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                        detail=f"File too large (> {max_size_mb} MB)",
                    )
                # Run the blocking disk write off the event loop.
                await asyncio.to_thread(out.write, chunk)
    except BaseException:
        # BaseException, not Exception: asyncio.CancelledError (raised when
        # the request task is cancelled mid-upload) does not derive from
        # Exception, and the partial file must be removed in that case too.
        try:
            os.unlink(path)
        except OSError:
            # Best-effort cleanup; the original error is what matters.
            pass
        raise
    return path, mime