import asyncio
import json
import os
import time
from enum import Enum
from typing import Any, Dict, List, Optional

from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from rich.console import Console, Group
from rich.live import Live
from rich.progress import (
    BarColumn,
    DownloadColumn,
    Progress,
    SpinnerColumn,
    TextColumn,
    TimeElapsedColumn,
    TransferSpeedColumn,
)
from rich.table import Table

import download_channel
| |
|
| | |
# Shared Rich console used for all server-side log output.
console = Console()

# FastAPI application exposing the download-control HTTP API.
app = FastAPI(title="Telegram Channel Downloader API")

# In-memory registry of download tasks keyed by task_id.  Each value is a
# mutable status dict that the owning background task updates in place.
active_downloads: Dict[str, Dict[str, Any]] = {}
| |
|
class FileStatus(str, Enum):
    """Lifecycle states of a file tracked in the download state."""
    PENDING = "pending"          # discovered during scan, not yet downloaded
    DOWNLOADING = "downloading"  # transfer currently in progress
    DOWNLOADED = "downloaded"    # downloaded (and uploaded, when HF_TOKEN is set)
    FAILED = "failed"            # download or upload failed; see ChannelFile.error
| |
|
class ChannelFile(BaseModel):
    """Metadata for a single media file discovered in the channel."""
    message_id: int                        # Telegram message id carrying the media
    filename: str                          # target file name (basename only)
    status: FileStatus                     # current lifecycle state
    size: Optional[int] = None             # size in bytes, if known
    download_time: Optional[float] = None  # seconds spent downloading, once done
    error: Optional[str] = None            # failure reason when status == FAILED
    upload_path: Optional[str] = None      # path inside the HF dataset after upload
| |
|
class DownloadState(BaseModel):
    """Persistent scan/download state, mirrored to the HF dataset.

    Fix: ``last_updated`` previously used ``= time.time()``, which is
    evaluated once at class-definition (import) time, so every new state
    object shared the same stale timestamp.  ``default_factory`` evaluates
    it per instance instead.
    """
    channel: str                           # channel username/id being mirrored
    last_scanned_id: Optional[int] = None  # highest message id seen so far
    files: List[ChannelFile] = Field(default_factory=list)   # tracked files
    current_download: Optional[int] = None                   # message id in flight
    last_updated: float = Field(default_factory=time.time)   # per-instance timestamp
| |
|
class DownloadRequest(BaseModel):
    """Body of ``POST /download``; unset fields fall back to module defaults."""
    channel: Optional[str] = None        # overrides download_channel.CHANNEL when set
    message_limit: Optional[int] = None  # overrides download_channel.MESSAGE_LIMIT when set
| |
|
class DownloadStatus(BaseModel):
    """Response shape for the status endpoints (mirrors the status dicts
    kept in ``active_downloads``)."""
    channel: str                       # channel this task targets
    status: str                        # "running" / "completed" / "failed" / ...
    message_count: int = 0             # files tracked in state when the run began
    downloaded: int = 0                # files completed during this run
    downloading: Optional[str] = None  # filename currently transferring, if any
    error: Optional[str] = None        # fatal error message, if the task failed
| |
|
def create_hf_dataset(token: str) -> bool:
    """Create (or verify) the Hugging Face dataset and seed it with an
    empty state file.

    Returns True only when the repo exists/was created AND the initial
    state file uploaded successfully.

    Fixes:
    - Dropped the unused ``RepoNotFoundError`` import: modern
      ``huggingface_hub`` does not export that name at top level, so the
      old import raised ImportError and the function always failed.
    - The old code fell through to ``return True`` even when the state
      upload failed; the upload result is now returned directly.
    """
    try:
        from huggingface_hub import create_repo
    except ImportError:
        console.print("[red]huggingface_hub not properly installed[/red]")
        return False

    try:
        # Idempotent: exist_ok=True makes this safe to call repeatedly.
        create_repo(
            repo_id=download_channel.HF_REPO_ID,
            token=token,
            repo_type="dataset",
            exist_ok=True
        )
        console.print(f"[green]Created or verified dataset:[/green] {download_channel.HF_REPO_ID}")

        # Seed the dataset with an empty state so later runs can resume.
        initial_state = DownloadState(channel=download_channel.CHANNEL)
        with open(download_channel.STATE_FILE, "w", encoding="utf-8") as f:
            json.dump(initial_state.dict(), f, indent=2, ensure_ascii=False)

        ok = download_channel.upload_file_to_hf(
            download_channel.STATE_FILE,
            download_channel.STATE_FILE,
            token
        )
        if ok:
            console.print("[green]Initialized dataset with empty state file[/green]")
        return ok
    except Exception as e:
        console.print(f"[red]Failed to create dataset:[/red] {str(e)}")
        return False
| |
|
def download_state_from_hf(token: str) -> DownloadState:
    """Fetch the persisted download state from the HF dataset.

    Returns a freshly constructed ``DownloadState`` when no token is
    configured or when the state file cannot be fetched/parsed; in that
    case it also tries to bootstrap the dataset so future runs can
    persist their state.
    """
    if not token:
        return DownloadState(channel=download_channel.CHANNEL)

    try:
        cached_path = download_channel.hf_hub_download(
            repo_id=download_channel.HF_REPO_ID,
            filename=download_channel.STATE_FILE,
            repo_type="dataset",
            token=token
        )
        with open(cached_path, "r", encoding="utf-8") as handle:
            payload = json.load(handle)
        return DownloadState(**payload)
    except Exception as exc:
        # No remote state yet (or fetch/parse failed): attempt to create
        # the dataset, then fall back to a clean local state either way.
        console.print(f"[yellow]No existing state found, creating new dataset:[/yellow] {str(exc)}")
        if create_hf_dataset(token):
            console.print("[green]Dataset created successfully![/green]")
        else:
            console.print("[red]Failed to create dataset, using local state only[/red]")
        return DownloadState(channel=download_channel.CHANNEL)
| |
|
async def clean_downloaded_file(file_path: str):
    """Best-effort removal of a local file once its upload succeeded."""
    try:
        os.remove(file_path)
        console.print(f"[blue]Cleaned up:[/blue] {os.path.basename(file_path)}")
    except Exception as exc:
        # Non-fatal: leaving the file behind only wastes disk space.
        console.print(f"[yellow]Warning:[/yellow] Could not clean up {file_path}: {exc}")
| |
|
async def update_and_upload_state(state: DownloadState, token: str) -> bool:
    """Stamp ``state`` with the current time, persist it locally, and push
    it to the HF dataset.

    Returns True when the upload succeeded, False on any failure.
    """
    state.last_updated = time.time()
    try:
        serialized = json.dumps(state.dict(), indent=2, ensure_ascii=False)
        with open(download_channel.STATE_FILE, "w", encoding="utf-8") as handle:
            handle.write(serialized)

        return download_channel.upload_file_to_hf(
            download_channel.STATE_FILE,
            download_channel.STATE_FILE,
            token
        )
    except Exception as exc:
        console.print(f"[red]Failed to update state:[/red] {exc}")
        return False
| |
|
async def process_message(message, state: DownloadState, client) -> Optional[str]:
    """Inspect a Telegram message and, if it carries a .rar file, return
    the local output path it should be downloaded to; otherwise None.

    ``state`` and ``client`` are currently unused but kept so the caller
    interface stays stable.

    Fix: the filename branch was inverted — when the real filename WAS
    known, the old code discarded it and produced ``"<id>_(unknown)"``;
    known filenames are now preserved (prefixed with the message id so
    duplicate names don't clash) and the ``"<id>.rar"`` fallback is used
    only when no name is available.
    """
    if not message.media:
        return None

    # Decide whether this message carries a RAR archive, preferring the
    # explicit filename and falling back to the MIME type.
    is_rar = False
    filename = ""
    if message.file:
        filename = getattr(message.file, 'name', '') or ''
        if filename:
            is_rar = filename.lower().endswith('.rar')
        else:
            mime_type = getattr(message.file, 'mime_type', '') or ''
            is_rar = 'rar' in mime_type.lower() if mime_type else False

    if not is_rar:
        return None

    # Prefix with the message id so identically-named files stay unique.
    if filename:
        suggested = f"{message.id}_{filename}"
    else:
        suggested = f"{message.id}.rar"

    return os.path.join(download_channel.OUTPUT_DIR, suggested)
| |
|
async def run_download(channel: Optional[str], message_limit: Optional[int], task_id: str):
    """Background task: scan the channel for new .rar files, download each
    pending file, optionally upload it to the HF dataset, and keep both
    ``active_downloads[task_id]`` and the persisted state in sync.

    Parameters:
        channel: optional channel override (falls back to download_channel.CHANNEL)
        message_limit: optional cap on how many messages to scan
        task_id: key under which progress is published in ``active_downloads``

    Fixes vs. the previous version:
    - The two nested ``Live`` displays raised ``rich.live.LiveError``
      ("Only one live display may be active at once"), aborting every run
      before any file was downloaded.  Both progress renderables are now
      stacked inside a single ``Live`` via ``Group``.
    - The per-file progress callback wrote a per-file percentage into the
      *overall* task (whose total is the pending-file count); the overall
      bar now advances once per finished file.
    - After a ``FloodWaitError`` the file was left in DOWNLOADING state
      forever (the pending filter only selects PENDING); it is reset to
      PENDING so a later run retries it.
    - File sizes are read from ``message.file`` (where Telethon exposes
      ``size`` for documents) instead of ``message.media``, which lacks a
      ``size`` attribute and always yielded 0.
    - Early exits (channel resolution failure, nothing to download) now
      update the published status instead of leaving it "running" forever.
    """
    try:
        # Apply per-request overrides to the module-level configuration.
        if channel:
            download_channel.CHANNEL = channel
        if message_limit is not None:
            download_channel.MESSAGE_LIMIT = message_limit

        # Pull the persisted state (or build a fresh one).
        state = download_state_from_hf(download_channel.HF_TOKEN)

        # Publish a live status dict; mutated in place throughout the run.
        status = {
            "channel": state.channel,
            "status": "running",
            "message_count": len(state.files),
            "downloaded": len([f for f in state.files if f.status == FileStatus.DOWNLOADED]),
            "downloading": None,
            "error": None
        }
        active_downloads[task_id] = status

        # Per-file transfer bar.
        progress = Progress(
            SpinnerColumn(),
            TextColumn("[bold blue]{task.fields[filename]}", justify="right"),
            BarColumn(bar_width=40),
            "[progress.percentage]{task.percentage:>3.1f}%",
            "•",
            DownloadColumn(),
            "•",
            TransferSpeedColumn(),
            "•",
            TimeElapsedColumn(),
        )

        # Whole-session bar (one tick per finished file).
        overall_progress = Progress(
            TextColumn("[bold yellow]{task.description}", justify="right"),
            BarColumn(bar_width=40),
            "[progress.percentage]{task.percentage:>3.1f}%",
            "•",
            TextColumn("[bold green]{task.fields[stats]}")
        )

        client = download_channel.TelegramClient(
            download_channel.SESSION_FILE,
            download_channel.API_ID,
            download_channel.API_HASH
        )

        async with client:
            try:
                entity = await client.get_entity(download_channel.CHANNEL)
            except Exception as e:
                console.print(f"[red]Failed to resolve channel:[/red] {e}")
                status["status"] = "failed"
                status["error"] = f"Failed to resolve channel: {e}"
                return 1

            console.print(f"[green]Starting download from:[/green] {entity.title if hasattr(entity, 'title') else download_channel.CHANNEL}")

            # ---- Phase 1: scan the channel for new .rar files ----
            scan_count = 0
            last_message_id = state.last_scanned_id

            try:
                async for message in client.iter_messages(entity, limit=download_channel.MESSAGE_LIMIT or None):
                    scan_count += 1

                    # Track the highest message id seen for incremental scans.
                    if last_message_id is None or message.id > last_message_id:
                        last_message_id = message.id

                    # Skip messages we already track.
                    if any(f.message_id == message.id for f in state.files):
                        continue

                    out_path = await process_message(message, state, client)
                    if out_path:
                        state.files.append(ChannelFile(
                            message_id=message.id,
                            filename=os.path.basename(out_path),
                            status=FileStatus.PENDING,
                            # Telethon exposes the document size on
                            # message.file, not on message.media.
                            size=(getattr(message.file, 'size', 0) or 0) if message.file else 0
                        ))

                state.last_scanned_id = last_message_id
                if download_channel.HF_TOKEN:
                    await update_and_upload_state(state, download_channel.HF_TOKEN)

                console.print(f"[green]Channel scan complete:[/green] Found {scan_count} messages")

            except Exception as e:
                console.print(f"[red]Error during channel scan:[/red] {e}")

            # ---- Phase 2: download every pending file ----
            pending_files = [f for f in state.files if f.status == FileStatus.PENDING]
            total_pending = len(pending_files)

            if total_pending == 0:
                console.print("[green]No new files to download![/green]")
                status["status"] = "completed"
                return 0

            console.print(f"[green]Starting downloads:[/green] {total_pending} files pending")

            # A single Live hosts both bars: rich allows only one active
            # Live display at a time.
            with Live(Group(overall_progress, progress)):
                overall_task = overall_progress.add_task(
                    f"Channel: {download_channel.CHANNEL}",
                    total=total_pending,
                    stats=f"Pending: {total_pending}"
                )

                for file_info in pending_files:
                    try:
                        # Mark in-flight and persist so a crash is visible.
                        file_info.status = FileStatus.DOWNLOADING
                        state.current_download = file_info.message_id
                        if download_channel.HF_TOKEN:
                            await update_and_upload_state(state, download_channel.HF_TOKEN)

                        status["downloading"] = file_info.filename

                        # Re-fetch the message; it may have been deleted.
                        message = await client.get_messages(entity, ids=file_info.message_id)
                        if not message or not message.media:
                            file_info.status = FileStatus.FAILED
                            file_info.error = "Message not found or no media"
                            continue

                        out_path = os.path.join(download_channel.OUTPUT_DIR, file_info.filename)
                        file_task = progress.add_task(
                            "download",
                            total=file_info.size or 100,
                            filename=file_info.filename
                        )

                        start_time = time.time()
                        try:
                            def progress_callback(current, total):
                                # Only drive the per-file bar here; the
                                # overall bar advances per finished file.
                                progress.update(file_task, completed=current)

                            await client.download_media(
                                message,
                                file=out_path,
                                progress_callback=progress_callback
                            )

                            # Mirror to the HF dataset, then drop the local copy.
                            if download_channel.HF_TOKEN:
                                console.print(f"[yellow]Uploading to HF:[/yellow] {file_info.filename}")
                                path_in_repo = f"files/{file_info.filename}"
                                ok = download_channel.upload_file_to_hf(
                                    out_path,
                                    path_in_repo,
                                    download_channel.HF_TOKEN
                                )
                                if ok:
                                    console.print(f"[green]Uploaded:[/green] {file_info.filename}")
                                    await clean_downloaded_file(out_path)
                                    file_info.upload_path = path_in_repo
                                else:
                                    console.print(f"[red]Upload failed:[/red] {file_info.filename}")
                                    file_info.error = "Upload to dataset failed"
                                    file_info.status = FileStatus.FAILED
                                    continue

                            file_info.status = FileStatus.DOWNLOADED
                            file_info.download_time = time.time() - start_time

                            if download_channel.HF_TOKEN:
                                await update_and_upload_state(state, download_channel.HF_TOKEN)

                            status["downloaded"] += 1
                            done = len([f for f in state.files if f.status == FileStatus.DOWNLOADED])
                            overall_progress.update(
                                overall_task,
                                advance=1,
                                stats=f"Downloaded: {done}"
                            )
                            # Brief pause between files to be gentle on the API.
                            await asyncio.sleep(0.2)

                        except download_channel.errors.FloodWaitError as fw:
                            wait = int(fw.seconds) if fw.seconds else 60
                            console.print(f"[yellow]FloodWait:[/yellow] Sleeping {wait}s")
                            # Re-queue the file so a later run retries it.
                            file_info.status = FileStatus.PENDING
                            await asyncio.sleep(wait + 1)
                            continue

                        except Exception as e:
                            console.print(f"[red]Error:[/red] {str(e)}")
                            file_info.status = FileStatus.FAILED
                            file_info.error = str(e)
                            if download_channel.HF_TOKEN:
                                await update_and_upload_state(state, download_channel.HF_TOKEN)

                        finally:
                            # Keep the transfer display bounded.
                            progress.remove_task(file_task)

                    except Exception as e:
                        console.print(f"[red]Fatal error processing {file_info.filename}:[/red] {str(e)}")
                        continue

            # Session finished: clear the in-flight marker and persist.
            state.current_download = None
            if download_channel.HF_TOKEN:
                await update_and_upload_state(state, download_channel.HF_TOKEN)

            console.print("[green]Download session completed![/green]")
            status["status"] = "completed"
            status["downloading"] = None

    except Exception as e:
        console.print(f"[red]Fatal error:[/red] {str(e)}")
        if "status" in locals():
            status["status"] = "failed"
            status["error"] = str(e)

    return 0
| |
|
@app.on_event("startup")
async def start_initial_download():
    """Kick off a download run automatically when the server starts.

    Fix: the task returned by ``asyncio.create_task`` was discarded; the
    event loop keeps only weak references to tasks, so the background run
    could be garbage-collected mid-flight.  A strong reference is now
    held on ``app.state``.
    """
    task_id = "initial_download"

    # HF_TOKEN is required to persist state/files; bail out early without it.
    if not download_channel.HF_TOKEN:
        console.print("[red]ERROR: HF_TOKEN not set. Please set your Hugging Face token.[/red]")
        return

    console.print("[yellow]Checking Hugging Face dataset...[/yellow]")
    try:
        state = download_state_from_hf(download_channel.HF_TOKEN)
        console.print(f"[green]Using channel:[/green] {state.channel}")

        # Make sure the local download directory exists.
        os.makedirs(download_channel.OUTPUT_DIR, exist_ok=True)

        # Hold a strong reference so the task isn't garbage-collected.
        app.state.initial_download_task = asyncio.create_task(run_download(
            channel=None,
            message_limit=None,
            task_id=task_id
        ))
        console.print(f"[green]Started initial download task:[/green] {task_id}")

    except Exception as e:
        console.print(f"[red]Failed to initialize:[/red] {str(e)}")
| |
|
@app.post("/download", response_model=Dict[str, str])
async def start_download(request: DownloadRequest, background_tasks: BackgroundTasks):
    """Start a new background download task and return its id.

    Fixes: the id was derived from ``len(active_downloads)`` alone, so two
    requests arriving before either background task registered itself got
    the SAME id.  The id is now probed for uniqueness and reserved
    synchronously with a placeholder status — which also makes
    ``GET /status/{task_id}`` answer immediately instead of 404-ing until
    the task actually starts.
    """
    n = len(active_downloads) + 1
    task_id = f"download_{n}"
    # Probe forward until free; entries are never evicted, so this is short.
    while task_id in active_downloads:
        n += 1
        task_id = f"download_{n}"

    # Reserve the id now; run_download replaces this dict once it starts.
    active_downloads[task_id] = {
        "channel": request.channel or download_channel.CHANNEL,
        "status": "queued",
        "message_count": 0,
        "downloaded": 0,
        "downloading": None,
        "error": None,
    }

    background_tasks.add_task(
        run_download,
        channel=request.channel,
        message_limit=request.message_limit,
        task_id=task_id
    )

    return {"task_id": task_id}
| |
|
@app.get("/status/{task_id}", response_model=DownloadStatus)
async def get_status(task_id: str):
    """Return the current status of a single download task."""
    try:
        return active_downloads[task_id]
    except KeyError:
        # Unknown id: surface as a standard 404.
        raise HTTPException(status_code=404, detail="Task not found")
| |
|
@app.get("/active", response_model=Dict[str, DownloadStatus])
async def list_active():
    """List all active or completed downloads"""
    # Entries are never evicted, so this includes finished/failed tasks too.
    return active_downloads
| |
|
if __name__ == "__main__":
    # Development entry point: serve on localhost only.
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)