| | import os |
| | import math |
| | import hashlib |
| | import numpy as np |
| | import cv2 |
| | from PIL import Image |
| | from tqdm import tqdm |
| | import gradio as gr |
| | from concurrent.futures import ThreadPoolExecutor, as_completed |
| |
|
| | |
# Token used to pad the final chunk of a file up to CHUNK_SIZE bytes.
PADDING_TOKEN = b"<PAD>"

# Working directory for the PNG frames produced while encoding.
FRAMES_DIR = './frames'

# Frame geometry; every pixel stores 3 bits (one per R/G/B channel).
WIDTH, HEIGHT = 1920, 1080
REQUIRED_LENGTH = WIDTH * HEIGHT

# A frame holds exactly REQUIRED_LENGTH * 3 bits. convert_part_to_image()
# truncates anything longer and zero-fills anything shorter, so a chunk must
# be exactly that many bits or data is lost / padding detection breaks.
# (The previous value, 1000 * 1024 bytes, exceeded the frame capacity.)
CHUNK_SIZE = (REQUIRED_LENGTH * 3) // 8

# Default location of the encoded video.
VIDEO_PATH = './output_video.mp4'

# exist_ok avoids the check-then-create race of `if not exists: makedirs`.
os.makedirs(FRAMES_DIR, exist_ok=True)
| |
|
def encode(file_path):
    """Encode a file as a video.

    The file is split into CHUNK_SIZE-byte parts under ``./parts``, each part
    is rendered to a PNG frame in FRAMES_DIR, and the frames are assembled
    into VIDEO_PATH.

    Args:
        file_path: Path of the file to encode.

    Returns:
        VIDEO_PATH, the path of the video that was written.

    NOTE(review): the final chunk is padded with the first byte of
    PADDING_TOKEN and merge_chunks() strips trailing bytes that occur in
    PADDING_TOKEN, so a file that genuinely ends in one of those bytes loses
    them on decode. A length header would be needed to fix that properly.
    NOTE(review): convert_part_to_image() truncates a chunk to
    REQUIRED_LENGTH * 3 bits, so CHUNK_SIZE must not exceed that capacity.
    """
    parts_dir = './parts'
    os.makedirs(parts_dir, exist_ok=True)
    # FRAMES_DIR is only created at import time and cleanup() removes it.
    os.makedirs(FRAMES_DIR, exist_ok=True)

    # create_video_from_frames() consumes every PNG in FRAMES_DIR, so frames
    # left over from a previous (larger) encode would leak into this video.
    for stale_name in os.listdir(FRAMES_DIR):
        if stale_name.endswith('.png'):
            os.remove(os.path.join(FRAMES_DIR, stale_name))

    file_size = os.path.getsize(file_path)
    num_chunks = math.ceil(file_size / CHUNK_SIZE)

    # The original np.full(..., PADDING_TOKEN, dtype='S1') kept only the first
    # byte of the token; build the same padding explicitly with plain bytes.
    pad_byte = PADDING_TOKEN[:1]

    chunk_files = []
    with open(file_path, 'rb') as f:
        for i in tqdm(range(num_chunks), desc="Splitting file", unit="chunk"):
            chunk = f.read(CHUNK_SIZE)
            if len(chunk) < CHUNK_SIZE:
                # Only the last chunk can be short; pad it to full size.
                chunk += pad_byte * (CHUNK_SIZE - len(chunk))
            chunk_file_path = os.path.join(parts_dir, f"{i}.part")
            with open(chunk_file_path, 'wb') as chunk_file:
                chunk_file.write(chunk)
            chunk_files.append(chunk_file_path)

    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(convert_part_to_image, chunk_file, i)
            for i, chunk_file in enumerate(chunk_files)
        ]
        for future in tqdm(as_completed(futures), desc="Converting chunks to images", unit="chunk", total=len(chunk_files)):
            # result() re-raises any exception from the worker thread.
            future.result()

    create_video_from_frames(FRAMES_DIR, VIDEO_PATH)
    return VIDEO_PATH
| |
|
def decode(video_path, output_file_path):
    """Decode a video produced by encode() back into a file.

    Frames are extracted to ``./extracted_frames``, each frame is converted
    back into a chunk file, and the chunks are merged into
    ``output_file_path``.

    Args:
        video_path: Path of the encoded video.
        output_file_path: Path the reconstructed file is written to.

    Returns:
        A tuple ``(output_file_path, message)`` where ``message`` reports the
        SHA-256 hash of the decoded file.
    """
    extracted_frames_dir = './extracted_frames'

    # Every PNG in this directory is decoded below, so frames left over from
    # a previous (longer) video must not survive into this run.
    if os.path.isdir(extracted_frames_dir):
        for stale_name in os.listdir(extracted_frames_dir):
            if stale_name.endswith('.png'):
                os.remove(os.path.join(extracted_frames_dir, stale_name))

    extract_frames_from_video(video_path, extracted_frames_dir)

    def frame_order(path):
        # Sort "frame_10.png" after "frame_2.png": a plain string sort would
        # put it before, scrambling the chunk order.
        stem = os.path.splitext(os.path.basename(path))[0]
        suffix = stem.rsplit('_', 1)[-1]
        if suffix.isdigit():
            return (0, int(suffix), path)
        return (1, 0, path)

    extracted_frame_files = sorted(
        (
            os.path.join(extracted_frames_dir, f)
            for f in os.listdir(extracted_frames_dir)
            if f.endswith('.png')
        ),
        key=frame_order,
    )

    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(convert_frame_to_chunk, frame_file, i)
            for i, frame_file in enumerate(extracted_frame_files)
        ]
        for future in tqdm(as_completed(futures), desc="Converting frames to chunks", unit="frame", total=len(extracted_frame_files)):
            # Drives the progress bar and re-raises worker exceptions early.
            future.result()

    # Collect results in submission order; as_completed() yields in
    # completion order, which would shuffle the chunks.
    chunk_files = [future.result() for future in futures]

    merge_chunks(chunk_files, output_file_path)

    decoded_hash = calculate_sha256(output_file_path)
    return output_file_path, f"SHA-256 hash of the decoded file: {decoded_hash}"
| |
|
def convert_frame_to_chunk(frame_file, i):
    """Convert one frame image back into a chunk file under ``./parts``.

    Args:
        frame_file: Path of the PNG frame to decode.
        i: Index of the chunk; the output is written to ``./parts/<i>.part``.

    Returns:
        The path of the chunk file that was written.
    """
    binary_string = hd_rgb_image_to_binary(frame_file)

    # Only whole bytes can be written; drop any trailing partial byte so that
    # to_bytes() cannot overflow, and handle an empty bit string explicitly
    # (int("", 2) raises ValueError).
    usable_bits = len(binary_string) - len(binary_string) % 8
    if usable_bits:
        payload = int(binary_string[:usable_bits], 2).to_bytes(usable_bits // 8, byteorder='big')
    else:
        payload = b""

    # decode() may run in a process where encode() never created ./parts.
    os.makedirs('./parts', exist_ok=True)
    chunk_file_path = os.path.join('./parts', f"{i}.part")
    with open(chunk_file_path, 'wb') as chunk_file:
        chunk_file.write(payload)
    return chunk_file_path
| |
|
def binary_string_to_rgb_image(binary_string, width=1920, height=1080, output_path='output_rgb.png'):
    """Convert a binary string to an RGB image and save it.

    Each pixel consumes three consecutive bits (R, G, B) in row-major order;
    a ``'1'`` bit becomes channel value 255 and a ``'0'`` bit becomes 0.

    Args:
        binary_string: String of ``'0'``/``'1'`` characters, exactly
            ``width * height * 3`` long.
        width: Image width in pixels.
        height: Image height in pixels.
        output_path: Where the image is saved.

    Raises:
        ValueError: If the string has the wrong length or contains characters
            other than ``'0'`` and ``'1'``.
    """
    # Validate against the requested geometry rather than the module-level
    # default, so non-default width/height are checked correctly.
    expected_bits = width * height * 3
    if len(binary_string) != expected_bits:
        raise ValueError(f"Binary string must be exactly {expected_bits} bits for {width}x{height} resolution")

    # Non-ASCII input raises UnicodeEncodeError, which is a ValueError.
    codes = np.frombuffer(binary_string.encode('ascii'), dtype=np.uint8)
    ones = codes == ord('1')
    if not np.all(ones | (codes == ord('0'))):
        raise ValueError("Binary string may only contain the characters '0' and '1'")

    # Row-major (height, width, 3) matches index = (y * width + x) * 3 + channel.
    pixels = (ones.astype(np.uint8) * 255).reshape(height, width, 3)
    Image.fromarray(pixels).save(output_path)
| |
|
def hd_rgb_image_to_binary(image_path):
    """Convert an RGB image back to a binary string.

    Pixels are read in row-major order and each contributes three bits
    (R, G, B). A channel decodes to ``'1'`` when its value is at least 128
    and to ``'0'`` otherwise.

    Args:
        image_path: Path of the image to decode.

    Returns:
        A string of ``'0'``/``'1'`` characters, ``width * height * 3`` long.
    """
    # Close the file handle deterministically once the pixels are loaded.
    with Image.open(image_path) as source:
        pixels = np.asarray(source.convert('RGB'), dtype=np.uint8)

    # The encoder writes only 0 or 255. Thresholding at the midpoint still
    # decodes exact frames identically, but also tolerates the small value
    # drift a video codec may introduce (an exact `== 255` test does not).
    bits = pixels >= 128

    # Map False/True to ASCII '0'/'1' in one pass instead of building the
    # string with millions of `+=` concatenations.
    return (bits.astype(np.uint8) + ord('0')).tobytes().decode('ascii')
| |
|
def convert_part_to_image(chunk_file_path, part_index, width=1920, height=1080):
    """Render one chunk file as ``frame_<part_index>.png`` inside FRAMES_DIR.

    The chunk's bytes are expanded to a bit string that is cut or
    zero-filled to REQUIRED_LENGTH * 3 bits before being drawn.
    """
    with open(chunk_file_path, 'rb') as source:
        raw_bytes = source.read()

    # Eight characters ('0'/'1') per byte, most significant bit first.
    bits = ''.join(format(value, '08b') for value in raw_bytes)

    total_bits = REQUIRED_LENGTH * 3
    bits = bits[:total_bits]
    missing = total_bits - len(bits)
    if missing > 0:
        bits = bits + '0' * missing

    target_path = os.path.join(FRAMES_DIR, f"frame_{part_index}.png")
    binary_string_to_rgb_image(bits, width, height, target_path)
| |
|
def merge_chunks(chunk_files, output_file):
    """Merge chunk files into a single file, removing padding.

    Args:
        chunk_files: Chunk file paths, in the order they must be written.
        output_file: Path of the merged output file.

    NOTE(review): padding removal strips every trailing byte that occurs in
    PADDING_TOKEN, so payloads that genuinely end in one of those bytes are
    still shortened; an explicit length header would be needed to avoid that.
    """
    chunk_files = list(chunk_files)
    last_index = len(chunk_files) - 1

    with open(output_file, 'wb') as f:
        for index, chunk_file in enumerate(tqdm(chunk_files, desc="Merging chunks", unit="chunk")):
            with open(chunk_file, 'rb') as chunk:
                data = chunk.read()

            # Only the final chunk is ever padded by encode(); stripping the
            # earlier ones would delete real data ending in those bytes.
            if index == last_index:
                data = data.rstrip(PADDING_TOKEN)
            f.write(data)
| |
|
def calculate_sha256(file_path):
    """Return the hexadecimal SHA-256 digest of the file at ``file_path``."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        # Read in 4 KiB blocks so the whole file is never held in memory.
        while True:
            block = stream.read(4096)
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
| |
|
def create_video_from_frames(frame_folder, video_path, width=1920, height=1080, fps=30):
    """Create a video from a folder of image frames.

    Args:
        frame_folder: Directory containing the ``.png`` frames.
        video_path: Path of the video to write.
        width: Frame width passed to the video writer.
        height: Frame height passed to the video writer.
        fps: Frames per second of the output video.

    Raises:
        ValueError: If a frame image cannot be read.

    NOTE(review): the 'H264' codec is presumably lossy; confirm the decoder
    tolerates altered pixel values or switch to a lossless codec.
    """
    def frame_order(path):
        # Order by the numeric suffix so "frame_10.png" follows
        # "frame_2.png"; a plain string sort would put it before.
        stem = os.path.splitext(os.path.basename(path))[0]
        suffix = stem.rsplit('_', 1)[-1]
        if suffix.isdigit():
            return (0, int(suffix), path)
        return (1, 0, path)

    frame_files = sorted(
        (os.path.join(frame_folder, f) for f in os.listdir(frame_folder) if f.endswith('.png')),
        key=frame_order,
    )

    fourcc = cv2.VideoWriter_fourcc(*'H264')
    video_writer = cv2.VideoWriter(video_path, fourcc, fps, (width, height))
    try:
        for frame_file in tqdm(frame_files, desc="Creating video", unit="frame"):
            frame = cv2.imread(frame_file)
            if frame is None:
                # cv2.imread signals failure by returning None.
                raise ValueError(f"Could not read frame image: {frame_file}")
            video_writer.write(frame)
    finally:
        # Always finalize the container, even if a frame failed.
        video_writer.release()
| |
|
def extract_frames_from_video(video_path, output_folder):
    """Extract frames from a video and save them to the output folder.

    Frames are written as ``frame_<n>.png`` with ``n`` counting from 0 in
    the order they are read from the video.

    Args:
        video_path: Path of the video to read.
        output_folder: Directory the frames are written to (created if
            missing).

    Raises:
        OSError: If a frame cannot be written to disk.
    """
    os.makedirs(output_folder, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    try:
        frame_index = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_path = os.path.join(output_folder, f"frame_{frame_index}.png")
            # cv2.imwrite reports failure through its return value; ignoring
            # it would silently drop a chunk of the decoded file.
            if not cv2.imwrite(frame_path, frame):
                raise OSError(f"Could not write frame image: {frame_path}")
            frame_index += 1
    finally:
        # Release the capture even if reading or writing failed.
        cap.release()
| |
|
def cleanup():
    """Clean up temporary files and directories.

    Removes ``./parts``, ``./frames`` and ``./extracted_frames`` together
    with their contents. Deletion is best-effort: failures are printed and
    the remaining entries are still processed.
    """
    import shutil  # local import: only needed for recursive removal

    directories_to_remove = ['./parts', './frames', './extracted_frames']
    for directory in directories_to_remove:
        if not os.path.isdir(directory):
            continue

        for file_name in os.listdir(directory):
            file_path = os.path.join(directory, file_name)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)
                elif os.path.isdir(file_path):
                    # os.rmdir only works on empty directories.
                    shutil.rmtree(file_path)
            except OSError as e:
                print(f"Failed to delete {file_path}. Reason: {e}")

        # If an entry could not be deleted the directory is not empty;
        # report that instead of aborting the rest of the cleanup.
        try:
            os.rmdir(directory)
        except OSError as e:
            print(f"Failed to delete {directory}. Reason: {e}")
| |
|
def encode_interface(file):
    """Gradio callback: encode the uploaded file and return the video path.

    Args:
        file: The value of the upload component, either a filesystem path
            (``str`` / ``os.PathLike``) or an object exposing the path as
            ``.name``.

    Returns:
        The path of the encoded video.

    Raises:
        ValueError: If no file was uploaded.
    """
    if file is None:
        raise ValueError("No file was provided to encode.")
    # Depending on the Gradio version/configuration the component hands over
    # either a plain path or a temp-file wrapper with a ``name`` attribute.
    file_path = file if isinstance(file, (str, os.PathLike)) else file.name
    return encode(file_path)
| |
|
def decode_interface(video, output_file):
    """Gradio callback: decode the uploaded video into the given output file.

    Args:
        video: The uploaded video, either a filesystem path
            (``str`` / ``os.PathLike``) or an object exposing the path as
            ``.name``.
        output_file: The file whose path receives the decoded data, in the
            same form as ``video``.

    Returns:
        A tuple ``(output_file_path, result)`` as returned by decode().

    Raises:
        ValueError: If the video or the output file is missing.
    """
    if video is None or output_file is None:
        raise ValueError("Both a video and an output file are required to decode.")
    # Accept both plain paths and temp-file wrappers with a ``name`` attribute.
    video_path = video if isinstance(video, (str, os.PathLike)) else video.name
    target_path = output_file if isinstance(output_file, (str, os.PathLike)) else output_file.name
    return decode(video_path, target_path)
| |
|
| | |
# Gradio user interface: one tab for encoding, one for decoding.
with gr.Blocks() as demo:
    gr.Markdown("# File Encoding/Decoding Tool")

    with gr.Tab("Encode"):
        # Instructions, rendered as one Markdown component per line.
        for encode_help_line in (
            "### Steps to Encode a File",
            "1. Upload the file you want to encode.",
            "2. Click the 'Encode' button.",
            "3. Download the encoded video.",
        ):
            gr.Markdown(encode_help_line)

        file_input = gr.File(label="Upload file to encode")
        encode_button = gr.Button("Encode")
        encode_output = gr.Video(label="Encoded Video")
        encode_button.click(
            encode_interface,
            inputs=file_input,
            outputs=encode_output,
        )

    with gr.Tab("Decode"):
        # Instructions, rendered as one Markdown component per line.
        for decode_help_line in (
            "### Steps to Decode a Video",
            "1. Upload the video you want to decode.",
            "2. Specify the output file path.",
            "3. Click the 'Decode' button.",
            "4. Download the decoded file.",
            "5. View the decoding result.",
        ):
            gr.Markdown(decode_help_line)

        video_input = gr.File(label="Upload video to decode")
        output_file_input = gr.File(label="Output file path")
        decode_button = gr.Button("Decode")
        decode_output = gr.File(label="Decoded File", interactive=True)
        decode_result = gr.Textbox(label="Decoding Result")
        decode_button.click(
            decode_interface,
            inputs=[video_input, output_file_input],
            outputs=[decode_output, decode_result],
        )

# Start the web server for the interface defined above.
demo.launch()