"""GitHub repository management and Pages deployment.""" import shutil import time from pathlib import Path import git from github import Github, GithubException from shared.config import settings from shared.logger import setup_logger from shared.utils import sanitize_repo_name logger = setup_logger(__name__) class GitHubManager: """Manage GitHub repository creation, pushing, and Pages deployment.""" def __init__(self) -> None: """Initialize GitHub manager.""" self.github = Github(settings.github_token) self.user = self.github.get_user() logger.info(f"Initialized GitHubManager for user {self.user.login}") def create_and_deploy( self, task_id: str, code_dir: Path ) -> tuple[str, str, str]: """Create repo, push code, enable Pages, and return URLs. Args: task_id: Unique task identifier code_dir: Directory containing generated code Returns: Tuple of (repo_url, commit_sha, pages_url) """ repo_name = sanitize_repo_name(task_id) logger.info(f"Creating and deploying repository: {repo_name}") # Create GitHub repository repo_url = self._create_repo(repo_name) # Initialize git and push commit_sha = self._push_code(repo_name, code_dir) # Enable GitHub Pages pages_url = self._enable_pages(repo_name) # Wait for Pages to be ready self._wait_for_pages(pages_url) logger.info( f"Successfully deployed {repo_name}: " f"repo={repo_url}, commit={commit_sha}, pages={pages_url}" ) return repo_url, commit_sha, pages_url def update_and_redeploy( self, repo_name: str, code_dir: Path ) -> tuple[str, str]: """Update existing repo with new code and return new commit. Args: repo_name: Repository name code_dir: Directory containing updated code Returns: Tuple of (repo_url, commit_sha) """ logger.info(f"Updating repository: {repo_name}") try: repo = self.user.get_repo(repo_name) repo_url = repo.html_url # Clone, update, and push commit_sha = self._update_code(repo_name, code_dir) logger.info(f"Successfully updated {repo_name}: commit={commit_sha}") return repo_url, commit_sha except GithubException as e: logger.error(f"Failed to update repo {repo_name}: {e}") raise def _create_repo(self, repo_name: str) -> str: """Create a new GitHub repository. Args: repo_name: Repository name Returns: Repository URL """ try: # Check if repo already exists try: existing_repo = self.user.get_repo(repo_name) logger.warning(f"Repository {repo_name} already exists, using it") return existing_repo.html_url except GithubException: pass # Repo doesn't exist, create it # Create new repo repo = self.user.create_repo( name=repo_name, description=f"Auto-generated repository for task {repo_name}", private=False, # Must be public for Pages auto_init=False, ) logger.info(f"Created repository: {repo.html_url}") return repo.html_url except GithubException as e: logger.error(f"Failed to create repository {repo_name}: {e}") raise def _push_code(self, repo_name: str, code_dir: Path) -> str: """Initialize git repo and push code. Args: repo_name: Repository name code_dir: Directory containing code Returns: Commit SHA """ try: # Ensure LICENSE file exists self._ensure_license(code_dir) # Initialize git repo repo = git.Repo.init(code_dir) # Configure git with repo.config_writer() as config: if not config.has_option("user", "name"): config.set_value("user", "name", settings.github_username) if not config.has_option("user", "email"): config.set_value("user", "email", settings.student_email) # Add all files repo.index.add("*") # Create .gitignore if needed gitignore_path = code_dir / ".gitignore" if not gitignore_path.exists(): gitignore_path.write_text("*.pyc\n__pycache__/\n.DS_Store\n") repo.index.add([".gitignore"]) # Commit commit = repo.index.commit("Initial commit: Generated application") commit_sha = commit.hexsha # Rename master to main if needed try: repo.git.branch("-M", "main") except Exception: pass # Already on main or error, continue # Add remote remote_url = f"https://{settings.github_token}@github.com/{settings.github_username}/{repo_name}.git" try: origin = repo.remote("origin") origin.set_url(remote_url) except ValueError: origin = repo.create_remote("origin", remote_url) # Push origin.push(refspec="main:main", force=True) logger.info(f"Pushed code to {repo_name}: commit {commit_sha}") return commit_sha except Exception as e: logger.error(f"Failed to push code to {repo_name}: {e}") raise def _update_code(self, repo_name: str, code_dir: Path) -> str: """Update existing repository with new code. Args: repo_name: Repository name code_dir: Directory containing updated code Returns: New commit SHA """ try: # Clone the repo temp_dir = settings.generated_repos_dir / f"{repo_name}_temp" if temp_dir.exists(): shutil.rmtree(temp_dir) clone_url = f"https://{settings.github_token}@github.com/{settings.github_username}/{repo_name}.git" repo = git.Repo.clone_from(clone_url, temp_dir) # Copy new files (except .git) for item in code_dir.iterdir(): if item.name == ".git": continue dest = temp_dir / item.name if item.is_file(): shutil.copy2(item, dest) elif item.is_dir(): if dest.exists(): shutil.rmtree(dest) shutil.copytree(item, dest) # Ensure LICENSE self._ensure_license(temp_dir) # Commit and push repo.index.add("*") commit = repo.index.commit("Update: Revised application") commit_sha = commit.hexsha origin = repo.remote("origin") origin.push() # Cleanup shutil.rmtree(temp_dir) logger.info(f"Updated code in {repo_name}: commit {commit_sha}") return commit_sha except Exception as e: logger.error(f"Failed to update code in {repo_name}: {e}") raise def _ensure_license(self, code_dir: Path) -> None: """Ensure MIT LICENSE file exists in code directory. Args: code_dir: Directory to add license to """ license_path = code_dir / "LICENSE" if not license_path.exists(): year = time.strftime("%Y") license_text = f"""MIT License Copyright (c) {year} {settings.github_username} Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ license_path.write_text(license_text) logger.debug(f"Created MIT LICENSE in {code_dir}") def _enable_pages(self, repo_name: str) -> str: """Enable GitHub Pages for repository. Args: repo_name: Repository name Returns: Pages URL """ import requests try: # Use GitHub REST API to enable Pages api_url = f"https://api.github.com/repos/{settings.github_username}/{repo_name}/pages" headers = { "Accept": "application/vnd.github+json", "Authorization": f"Bearer {settings.github_token}", "X-GitHub-Api-Version": "2022-11-28" } data = { "source": { "branch": "main", "path": "/" } } response = requests.post(api_url, headers=headers, json=data) if response.status_code == 201: logger.info(f"Enabled GitHub Pages for {repo_name}") elif response.status_code == 409: logger.info(f"GitHub Pages already enabled for {repo_name}") else: logger.warning(f"Pages enable response: {response.status_code} - {response.text}") # Construct Pages URL pages_url = f"https://{settings.github_username}.github.io/{repo_name}/" return pages_url except Exception as e: logger.error(f"Failed to enable Pages for {repo_name}: {e}") # Return expected URL even if enabling fails return f"https://{settings.github_username}.github.io/{repo_name}/" def _wait_for_pages(self, pages_url: str, max_wait: int = 60) -> bool: """Wait for GitHub Pages to be ready. Args: pages_url: Pages URL to check max_wait: Maximum wait time in seconds Returns: True if ready, False if timeout """ import requests logger.info(f"Waiting for Pages to be ready: {pages_url}") for i in range(max_wait): try: response = requests.get(pages_url, timeout=5) if response.status_code == 200: logger.info(f"Pages ready after {i+1} seconds") return True except requests.RequestException: pass time.sleep(1) logger.warning(f"Pages not ready after {max_wait} seconds") return False