Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import argparse | |
| import csv | |
| import os | |
| from pathlib import Path | |
| import zipfile | |
| from datasets import load_dataset | |
| from huggingface_hub import hf_hub_download | |
| from tqdm import tqdm | |
| from src.data.eye_extract import ( | |
| extract_sequences_from_video_bytes, | |
| extract_sequences_from_video_path, | |
| ) | |
| HF_DATASET = "bitmind/FaceForensicsC23" | |
| HF_ZIP_FILE = "FaceForensics++_C23.zip" | |
| REAL_PATH_MARKER = "/real/" | |
| FAKE_PATH_MARKER = "/fake/deepfakes/" | |
| VIDEO_SUFFIXES = {".mp4", ".avi", ".mov", ".mkv"} | |
| def _video_id_from_path(repo_path: str) -> str: | |
| """Extract stable video id from zip:// or filesystem paths.""" | |
| normalized = repo_path.replace("\\", "/") | |
| if "://" in normalized: | |
| inner = normalized.split("://", 1)[1].split("::")[0] | |
| return Path(inner).stem | |
| return Path(normalized).stem | |
| def _is_fake_path(path_lower: str) -> bool: | |
| return FAKE_PATH_MARKER in path_lower and "deepfakedetection" not in path_lower | |
| def _is_real_path(path_lower: str) -> bool: | |
| return REAL_PATH_MARKER in path_lower | |
| def _video_path_from_item(item: dict) -> str: | |
| video = item.get("video") | |
| if isinstance(video, dict): | |
| return str(video.get("path", "")) | |
| return str(video or "") | |
| def _video_bytes_from_item(item: dict) -> bytes | None: | |
| import fsspec | |
| video = item.get("video") | |
| if not isinstance(video, dict): | |
| return None | |
| raw = video.get("bytes") | |
| if raw is not None: | |
| return raw | |
| path = video.get("path") | |
| if not path: | |
| return None | |
| if str(path).startswith("zip://"): | |
| with fsspec.open(path, "rb") as handle: | |
| return handle.read() | |
| if os.path.exists(path): | |
| return Path(path).read_bytes() | |
| return None | |
| def iter_hf_stream(num_real: int, num_fake: int): | |
| """ | |
| Stream one HF dataset row at a time (no torchcodec decode). | |
| Each video is loaded into RAM, processed, then discarded. | |
| """ | |
| ds = load_dataset(HF_DATASET, split="train", streaming=True) | |
| ds = ds.decode(False) | |
| if num_real == 0 and num_fake > 0: | |
| print("Skipping first ~1000 rows (DeepFakeDetection) to reach /fake/Deepfakes/ ...") | |
| ds = ds.skip(1000) | |
| elif num_fake == 0 and num_real > 0: | |
| print("Skipping first ~6000 rows to reach /real/ videos ...") | |
| ds = ds.skip(6000) | |
| real_count = 0 | |
| fake_count = 0 | |
| scanned = 0 | |
| for item in ds: | |
| scanned += 1 | |
| if scanned % 250 == 0 and (num_real > 0 or num_fake > 0): | |
| tqdm.write(f" scanned {scanned} rows (real={real_count}, fake={fake_count})") | |
| video_path_str = _video_path_from_item(item) | |
| path_lower = video_path_str.lower() | |
| is_real = _is_real_path(path_lower) and real_count < num_real | |
| is_fake = _is_fake_path(path_lower) and fake_count < num_fake | |
| if not is_real and not is_fake: | |
| continue | |
| label = 0 if is_real else 1 | |
| video_bytes = _video_bytes_from_item(item) | |
| if video_bytes is None: | |
| continue | |
| yield video_path_str, video_bytes, label | |
| if is_real: | |
| real_count += 1 | |
| else: | |
| fake_count += 1 | |
| if real_count >= num_real and fake_count >= num_fake: | |
| break | |
| def _resolve_hub_zip() -> Path: | |
| zip_path = hf_hub_download( | |
| repo_id=HF_DATASET, | |
| filename=HF_ZIP_FILE, | |
| repo_type="dataset", | |
| ) | |
| return Path(zip_path) | |
| def iter_zip_members(num_real: int, num_fake: int, zip_path: Path): | |
| """Read one video at a time from the HF-hosted zip (cached under ~/.cache/huggingface).""" | |
| real_count = 0 | |
| fake_count = 0 | |
| with zipfile.ZipFile(zip_path, "r") as zf: | |
| for member in zf.namelist(): | |
| path = member.replace("\\", "/") | |
| if Path(path).suffix.lower() not in VIDEO_SUFFIXES: | |
| continue | |
| path_lower = path.lower() | |
| is_real = _is_real_path(path_lower) and real_count < num_real | |
| is_fake = _is_fake_path(path_lower) and fake_count < num_fake | |
| if not is_real and not is_fake: | |
| continue | |
| label = 0 if is_real else 1 | |
| with zf.open(member) as handle: | |
| video_bytes = handle.read() | |
| yield path, video_bytes, label | |
| if is_real: | |
| real_count += 1 | |
| else: | |
| fake_count += 1 | |
| if real_count >= num_real and fake_count >= num_fake: | |
| break | |
| def iter_videos(num_real: int, num_fake: int): | |
| """Stream via HF datasets API; fall back to zip cache if needed.""" | |
| try: | |
| yield from iter_hf_stream(num_real, num_fake) | |
| return | |
| except Exception as exc: | |
| print(f"HF streaming failed ({exc}). Trying zip cache (may be slow first time).") | |
| zip_path = _resolve_hub_zip() | |
| yield from iter_zip_members(num_real, num_fake, zip_path) | |
| def _append_metadata_row(metadata_path: Path, row: dict) -> None: | |
| write_header = not metadata_path.exists() | |
| with metadata_path.open("a", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=["npz_path", "label", "video_id", "split"]) | |
| if write_header: | |
| writer.writeheader() | |
| writer.writerow(row) | |
| def stream_and_extract( | |
| out_root: str | Path, | |
| num_real: int = 200, | |
| num_fake: int = 200, | |
| seq_len: int = 16, | |
| metadata_csv: str | Path = "data/metadata.csv", | |
| seed: int = 42, | |
| append_metadata: bool = False, | |
| ) -> None: | |
| out_root = Path(out_root) | |
| out_root.mkdir(parents=True, exist_ok=True) | |
| metadata_path = Path(metadata_csv) | |
| metadata_path.parent.mkdir(parents=True, exist_ok=True) | |
| print( | |
| f"Streaming from HuggingFace ({HF_DATASET}) — one video at a time, " | |
| "no full zip download into project folder." | |
| ) | |
| print("Videos are processed in RAM; only .npz files are saved under data/processed/.") | |
| metadata_rows: list[dict] = [] | |
| real_count = 0 | |
| fake_count = 0 | |
| pbar = tqdm(total=num_real + num_fake, desc="Streaming videos") | |
| for repo_path, video_bytes, label in iter_videos(num_real, num_fake): | |
| video_id = _video_id_from_path(repo_path) | |
| sequences = extract_sequences_from_video_bytes( | |
| video_bytes, label, video_id, seq_len=seq_len | |
| ) | |
| if not sequences: | |
| print(f"Warning: no face/eye sequences in {repo_path}, skipping.") | |
| continue | |
| for seq in sequences: | |
| npz_path = out_root / f"{seq['video_id']}.npz" | |
| np.savez_compressed( | |
| npz_path, | |
| frames=seq["frames"], | |
| ear=seq["ear"], | |
| label=np.array(seq["label"]), | |
| video_id=np.array(seq["video_id"]), | |
| ) | |
| row = { | |
| "npz_path": str(npz_path.resolve()), | |
| "label": label, | |
| "video_id": video_id, | |
| "split": "train", | |
| } | |
| metadata_rows.append(row) | |
| # Write progressively so Ctrl+C doesn't lose completed videos | |
| _append_metadata_row(metadata_path, row) | |
| if label == 0: | |
| real_count += 1 | |
| else: | |
| fake_count += 1 | |
| pbar.update(1) | |
| pbar.close() | |
| if append_metadata and metadata_path.exists(): | |
| import pandas as pd | |
| existing = pd.read_csv(metadata_path).to_dict(orient="records") | |
| metadata_rows = existing + metadata_rows | |
| if not metadata_rows: | |
| raise RuntimeError( | |
| "No sequences extracted. Ensure `hf auth login` is done and paths contain " | |
| "/real/ or /fake/Deepfakes/. For fake-only runs, wait 1–2 min or use --append " | |
| "if adding to an existing metadata.csv." | |
| ) | |
| rng = np.random.default_rng(seed) | |
| unique_ids = sorted({row["video_id"] for row in metadata_rows}) | |
| rng.shuffle(unique_ids) | |
| n = len(unique_ids) | |
| train_ids = set(unique_ids[: int(0.7 * n)]) | |
| val_ids = set(unique_ids[int(0.7 * n) : int(0.85 * n)]) | |
| for row in metadata_rows: | |
| if row["video_id"] in train_ids: | |
| row["split"] = "train" | |
| elif row["video_id"] in val_ids: | |
| row["split"] = "val" | |
| else: | |
| row["split"] = "test" | |
| with metadata_path.open("w", newline="", encoding="utf-8") as f: | |
| writer = csv.DictWriter(f, fieldnames=["npz_path", "label", "video_id", "split"]) | |
| writer.writeheader() | |
| writer.writerows(metadata_rows) | |
| print(f"\nDone! {real_count} real + {fake_count} fake videos processed.") | |
| print(f"Total sequences: {len(metadata_rows)}") | |
| print(f"Metadata written to: {metadata_path}") | |
| print(f"Sequences saved to: {out_root}") | |
| def main() -> None: | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--out-root", default="data/processed") | |
| parser.add_argument("--metadata-csv", default="data/metadata.csv") | |
| parser.add_argument("--num-real", type=int, default=200) | |
| parser.add_argument("--num-fake", type=int, default=200) | |
| parser.add_argument("--seq-len", type=int, default=16) | |
| parser.add_argument("--seed", type=int, default=42) | |
| parser.add_argument( | |
| "--append", | |
| action="store_true", | |
| help="Append new sequences to existing metadata.csv instead of overwriting.", | |
| ) | |
| args = parser.parse_args() | |
| stream_and_extract( | |
| out_root=args.out_root, | |
| num_real=args.num_real, | |
| num_fake=args.num_fake, | |
| seq_len=args.seq_len, | |
| metadata_csv=args.metadata_csv, | |
| seed=args.seed, | |
| append_metadata=args.append, | |
| ) | |
| if __name__ == "__main__": | |
| main() | |