| | |
| | |
| | |
| | |
| | |
| | import re |
| | import cv2 |
| | import sox |
| | import wget |
| | import yt_dlp |
| | import ffmpeg |
| | import pickle |
| | import tarfile |
| | import warnings |
| | import numpy as np |
| | import pandas as pd |
| | from tqdm import tqdm |
| | from skimage import transform |
| | from collections import deque |
| | from urllib.error import HTTPError |
| |
|
| |
|
| | def is_empty(path): |
| | return any(path.iterdir()) == False |
| |
|
| |
|
| | def read_txt_file(txt_filepath): |
| | with open(txt_filepath) as fin: |
| | return (line.strip() for line in fin.readlines()) |
| |
|
| |
|
| | def write_txt_file(lines, out_txt_filepath): |
| | with open(out_txt_filepath, "w") as fout: |
| | fout.writelines("\n".join([ln.strip() for ln in lines])) |
| |
|
| |
|
| | def normalize_text(text): |
| | PUNCS = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~؟؛,’‘×÷" |
| | |
| | text = re.sub(r"\([^)]*\)", "", text) |
| | |
| | text = text.translate(str.maketrans("", "", PUNCS)) |
| | |
| | text = text.lower() |
| | return text.strip() |
| |
|
| |
|
| | def download_file(url, download_path): |
| | filename = url.rpartition("/")[-1] |
| | if not (download_path / filename).exists(): |
| | try: |
| | |
| | print(f"Downloading {filename} from {url}") |
| | custom_bar = ( |
| | lambda current, total, width=80: wget.bar_adaptive( |
| | round(current / 1024 / 1024, 2), |
| | round(total / 1024 / 1024, 2), |
| | width, |
| | ) |
| | + " MB" |
| | ) |
| | wget.download(url, out=str(download_path / filename), bar=custom_bar) |
| | except Exception as e: |
| | message = f"Downloading {filename} failed!" |
| | raise HTTPError(e.url, e.code, message, e.hdrs, e.fp) |
| | return True |
| |
|
| |
|
| | def extract_tgz(tgz_filepath, extract_path, out_filename=None): |
| | if not tgz_filepath.exists(): |
| | raise FileNotFoundError(f"{tgz_filepath} is not found!!") |
| | tgz_filename = tgz_filepath.name |
| | tgz_object = tarfile.open(tgz_filepath) |
| | if not out_filename: |
| | out_filename = tgz_object.getnames()[0] |
| | |
| | if not (extract_path / out_filename).exists(): |
| | for mem in tqdm(tgz_object.getmembers(), desc=f"Extracting {tgz_filename}"): |
| | out_filepath = extract_path / mem.get_info()["name"] |
| | if mem.isfile() and not out_filepath.exists(): |
| | tgz_object.extract(mem, path=extract_path) |
| | tgz_object.close() |
| |
|
| |
|
| | def download_extract_file_if_not(url, tgz_filepath, download_filename): |
| | download_path = tgz_filepath.parent |
| | if not tgz_filepath.exists(): |
| | |
| | download_file(url, download_path) |
| | |
| | extract_tgz(tgz_filepath, download_path, download_filename) |
| |
|
| |
|
| | def load_meanface_metadata(metadata_path): |
| | mean_face_filepath = metadata_path / "20words_mean_face.npy" |
| | if not mean_face_filepath.exists(): |
| | download_file( |
| | "https://dl.fbaipublicfiles.com/muavic/metadata/20words_mean_face.npy", |
| | metadata_path, |
| | ) |
| | return np.load(mean_face_filepath) |
| |
|
| |
|
| | def load_video_metadata(filepath): |
| | if not filepath.exists(): |
| | |
| | lang_dir = filepath.parent.parent |
| | lang = lang_dir.name |
| | tgz_filepath = lang_dir.parent / f"{lang}_metadata.tgz" |
| | download_extract_file_if_not( |
| | url=f"https://dl.fbaipublicfiles.com/muavic/metadata/{lang}_metadata.tgz", |
| | tgz_filepath=tgz_filepath, |
| | download_filename=lang |
| | ) |
| | if not filepath.exists(): |
| | |
| | return None |
| | assert filepath.exists(), f"{filepath} should've been downloaded!" |
| | with open(filepath, "rb") as fin: |
| | metadata = pickle.load(fin) |
| | return metadata |
| |
|
| |
|
| | def download_video_from_youtube(download_path, yt_id): |
| | """Downloads a video from YouTube given its id on YouTube""" |
| | video_out_path = download_path / f"{yt_id}.mp4" |
| | if video_out_path.exists(): |
| | downloaded = True |
| | else: |
| | url = f"https://www.youtube.com/watch?v={yt_id}" |
| | |
| | |
| | ydl_opts = {"quiet": True, "format": "mp4", "outtmpl": str(video_out_path)} |
| | with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
| | try: |
| | ydl.download([url]) |
| | downloaded = True |
| | except yt_dlp.utils.DownloadError: |
| | downloaded = False |
| | return downloaded |
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
| | def resize_frames(input_frames, new_size): |
| | resized_frames = [] |
| | for frame in input_frames: |
| | try: |
| | resized_frames.append(cv2.resize(frame, new_size)) |
| | except: |
| | pass |
| | return resized_frames |
| |
|
| |
|
| | def get_audio_duration(audio_filepath): |
| | return sox.file_info.duration(audio_filepath) |
| |
|
| |
|
| | def get_video_duration(video_filepath): |
| | try: |
| | streams = ffmpeg.probe(video_filepath)["streams"] |
| | for stream in streams: |
| | if stream["codec_type"] == "video": |
| | return float(stream["duration"]) |
| | except: |
| | warnings.warn(f"Video file: `{video_filepath}` is corrupted... skipping!!") |
| | return -1 |
| |
|
| |
|
| | def get_video_resolution(video_filepath): |
| | for stream in ffmpeg.probe(video_filepath)["streams"]: |
| | if stream["codec_type"] == "video": |
| | height = int(stream["height"]) |
| | width = int(stream["width"]) |
| | return height, width |
| | raise TypeError(f"Input file: {video_filepath} doesn't have video stream!") |
| |
|
| |
|
| | def get_audio_video_info(audio_path, video_path, fid): |
| | audio_filepath = audio_path / f"{fid}.wav" |
| | video_filepath = video_path / f"{fid}.mp4" |
| | audio_frames = ( |
| | int(get_audio_duration(audio_filepath) * 16_000) |
| | if audio_filepath.exists() |
| | else -1 |
| | ) |
| | video_frames = ( |
| | int(get_video_duration(video_filepath) * 25) if video_filepath.exists() else -1 |
| | ) |
| | return { |
| | "id": fid, |
| | "video": str(video_filepath), |
| | "audio": str(audio_filepath), |
| | "video_frames": video_frames, |
| | "audio_samples": audio_frames, |
| | } |
| |
|
| |
|
| | def split_video_to_frames(video_filepath, fstart=None, fend=None, out_fps=25): |
| | |
| | |
| | width, height = get_video_resolution(video_filepath) |
| | video_stream = ffmpeg.input(str(video_filepath)).video.filter("fps", fps=out_fps) |
| | channels = 3 |
| | try: |
| | if fstart is not None and fend is not None: |
| | process = ( |
| | video_stream.trim(start_frame=fstart, end_frame=fend) |
| | .setpts("PTS-STARTPTS") |
| | .output("pipe:", format="rawvideo", pix_fmt="bgr24") |
| | .run_async(pipe_stdout=True, quiet=True) |
| | ) |
| | frames_counter = 0 |
| | while frames_counter < fend - fstart: |
| | in_bytes = process.stdout.read(width * height * channels) |
| | in_frame = np.frombuffer(in_bytes, np.uint8).reshape( |
| | width, height, channels |
| | ) |
| | yield in_frame |
| | frames_counter += 1 |
| | else: |
| | process = ( |
| | video_stream.setpts("PTS-STARTPTS") |
| | .output("pipe:", format="rawvideo", pix_fmt="bgr24") |
| | .run_async(pipe_stdout=True, quiet=True) |
| | ) |
| | while True: |
| | in_bytes = process.stdout.read(width * height * channels) |
| | if not in_bytes: |
| | break |
| | in_frame = np.frombuffer(in_bytes, np.uint8).reshape( |
| | width, height, channels |
| | ) |
| | yield in_frame |
| |
|
| | finally: |
| | process.stdout.close() |
| | process.wait() |
| |
|
| |
|
| | def save_video(frames, out_filepath, fps, vcodec="libx264"): |
| | if len(frames) == 0: |
| | warnings.warn( |
| | f"Video segment `{out_filepath.stem}` has no metadata..." + |
| | " skipping!!" |
| | ) |
| | return |
| | height, width, _ = frames[0].shape |
| | process = ( |
| | ffmpeg.input( |
| | "pipe:", format="rawvideo", pix_fmt="bgr24", s="{}x{}".format(width, height) |
| | ) |
| | .output(str(out_filepath), pix_fmt="bgr24", vcodec=vcodec, r=fps) |
| | .overwrite_output() |
| | .run_async(pipe_stdin=True, quiet=True) |
| | ) |
| | for _, frame in enumerate(frames): |
| | try: |
| | process.stdin.write(frame.astype(np.uint8).tobytes()) |
| | except: |
| | print(process.stderr.read()) |
| | process.stdin.close() |
| | process.wait() |
| |
|
| |
|
| | def load_video(filename): |
| | cap = cv2.VideoCapture(filename) |
| | while cap.isOpened(): |
| | ret, frame = cap.read() |
| | if ret: |
| | yield frame |
| | else: |
| | break |
| | cap.release() |
| |
|
| |
|
| | def warp_img(src, dst, img, std_size): |
| | tform = transform.estimate_transform( |
| | "similarity", src, dst |
| | ) |
| | warped = transform.warp( |
| | img, inverse_map=tform.inverse, output_shape=std_size |
| | ) |
| | warped = warped * 255 |
| | warped = warped.astype("uint8") |
| | return warped, tform |
| |
|
| |
|
| | def apply_transform(trans, img, std_size): |
| | warped = transform.warp(img, inverse_map=trans.inverse, output_shape=std_size) |
| | warped = warped * 255 |
| | warped = warped.astype("uint8") |
| | return warped |
| |
|
| |
|
| | def cut_patch(img, metadata, height, width, threshold=5): |
| | center_x, center_y = np.mean(metadata, axis=0) |
| | if center_y - height < 0: |
| | center_y = height |
| | if center_y - height < 0 - threshold: |
| | raise Exception("too much bias in height") |
| | if center_x - width < 0: |
| | center_x = width |
| | if center_x - width < 0 - threshold: |
| | raise Exception("too much bias in width") |
| |
|
| | if center_y + height > img.shape[0]: |
| | center_y = img.shape[0] - height |
| | if center_y + height > img.shape[0] + threshold: |
| | raise Exception("too much bias in height") |
| | if center_x + width > img.shape[1]: |
| | center_x = img.shape[1] - width |
| | if center_x + width > img.shape[1] + threshold: |
| | raise Exception("too much bias in width") |
| |
|
| | cutted_img = np.copy( |
| | img[ |
| | int(round(center_y) - round(height)) : int(round(center_y) + round(height)), |
| | int(round(center_x) - round(width)) : int(round(center_x) + round(width)), |
| | ] |
| | ) |
| | return cutted_img |
| |
|
| |
|
| | def crop_patch( |
| | video_frames, |
| | num_frames, |
| | metadata, |
| | mean_face_metadata, |
| | std_size=(256, 256), |
| | window_margin=12, |
| | start_idx=48, |
| | stop_idx=68, |
| | crop_height=96, |
| | crop_width=96, |
| | ): |
| | """Crop mouth patch""" |
| | stablePntsIDs = [33, 36, 39, 42, 45] |
| | margin = min(num_frames, window_margin) |
| | q_frame, q_metadata = deque(), deque() |
| | sequence = [] |
| | for frame_idx, frame in enumerate(video_frames): |
| | if frame_idx >= len(metadata): |
| | break |
| | q_metadata.append(metadata[frame_idx]) |
| | q_frame.append(frame) |
| | if len(q_frame) == margin: |
| | smoothed_metadata = np.mean(q_metadata, axis=0) |
| | cur_metadata = q_metadata.popleft() |
| | cur_frame = q_frame.popleft() |
| | |
| | trans_frame, trans = warp_img( |
| | smoothed_metadata[stablePntsIDs, :], |
| | mean_face_metadata[stablePntsIDs, :], |
| | cur_frame, |
| | std_size, |
| | ) |
| | trans_metadata = trans(cur_metadata) |
| | |
| | sequence.append( |
| | cut_patch( |
| | trans_frame, |
| | trans_metadata[start_idx:stop_idx], |
| | crop_height // 2, |
| | crop_width // 2, |
| | ) |
| | ) |
| |
|
| | while q_frame: |
| | cur_frame = q_frame.popleft() |
| | |
| | trans_frame = apply_transform(trans, cur_frame, std_size) |
| | |
| | trans_metadata = trans(q_metadata.popleft()) |
| | |
| | sequence.append( |
| | cut_patch( |
| | trans_frame, |
| | trans_metadata[start_idx:stop_idx], |
| | crop_height // 2, |
| | crop_width // 2, |
| | ) |
| | ) |
| | return sequence |
| |
|
| |
|
| | def read_av_manifest(tsv_filepath): |
| | with open(tsv_filepath) as fin: |
| | res = [] |
| | for ln in fin.readlines()[1:]: |
| | id_, video, audio, video_frames, audio_samples = ln.strip().split("\t") |
| | res.append( |
| | { |
| | "id": id_, |
| | "video": video, |
| | "audio": audio, |
| | "video_frames": video_frames, |
| | "audio_samples": audio_samples, |
| | } |
| | ) |
| | df = pd.DataFrame(res) |
| | df["video_frames"] = df["video_frames"].astype(int) |
| | df["audio_samples"] = df["audio_samples"].astype(int) |
| | return df |
| |
|
| |
|
| | def write_av_manifest(df, out_filepath): |
| | with open(out_filepath, "w") as fout: |
| | fout.write("/\n") |
| | df.to_csv(out_filepath, sep="\t", header=False, index=False, mode="a") |
| |
|