Spaces:
Paused
Paused
| """ | |
| Web Interface for Job Application AI Agent - React Frontend Version | |
| This module provides a Flask web application for the job application AI agent | |
| with a modern React frontend and REST API endpoints. | |
| """ | |
| import os | |
| import logging | |
| import json | |
| import uuid | |
| import time | |
| import sys | |
| import importlib | |
| from datetime import datetime | |
| from pathlib import Path | |
| from flask import Flask, render_template, request, redirect, url_for, flash, send_file, session, jsonify | |
| from flask_cors import CORS | |
| import pandas as pd | |
| import zipfile | |
| import io | |
| from job_apply_ai.cv_modifier.cv_analyzer import CVAnalyzer, CVModifier, batch_process_jobs | |
| from job_apply_ai.utils.helpers import ensure_directory_exists | |
| from CV_Tailor.ai_cv_tailor import AILangGraphTailor | |
| # CV Analyzer Module | |
| from job_apply_ai.analyzer.ats_evaluator import ATSEvaluator | |
| import docx | |
| # Changes made to add the Apify Actor | |
| # Added Dependencies | |
| import urllib.parse | |
| from apify_client import ApifyClient | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Initialize Flask app | |
| app = Flask(__name__, static_folder='static', static_url_path='/static', template_folder='templates') | |
| app.secret_key = os.environ.get('SECRET_KEY', 'dev_key_for_testing') | |
| app.config['SESSION_COOKIE_NAME'] = f"job_apply_ai_session_{int(time.time())}" | |
| app.config['JSON_SORT_KEYS'] = False | |
| # Enable CORS for API endpoints | |
| CORS(app, resources={r"/api/*": {"origins": "*"}}) | |
| # Keep runtime files local to the project unless overridden by env var. | |
| runtime_root = os.environ.get('JOB_APPLY_AI_DATA_DIR', os.path.join(os.getcwd(), '.runtime')) | |
| if not os.path.isabs(runtime_root): | |
| runtime_root = os.path.abspath(runtime_root) | |
| app.config['UPLOAD_FOLDER'] = os.path.join(runtime_root, 'uploads') | |
| ensure_directory_exists(app.config['UPLOAD_FOLDER']) | |
| # Create output directories | |
| app.config['CV_OUTPUT_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], 'cvs') | |
| app.config['JOBS_OUTPUT_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], 'jobs') | |
| app.config['STATE_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], 'session_state') | |
| ensure_directory_exists(app.config['CV_OUTPUT_DIR']) | |
| ensure_directory_exists(app.config['JOBS_OUTPUT_DIR']) | |
| ensure_directory_exists(app.config['STATE_DIR']) | |
| # Define where the system's Jinja ATS template lives | |
| app.config['ATS_TEMPLATE_PATH'] = os.path.join(os.getcwd(), 'data', 'ats_template.docx') | |
| app.config['SESSION_TYPE'] = 'filesystem' | |
| SUPPORTED_TAILORING_MODES = {'local', 'api'} | |
| SUPPORTED_LLM_PROVIDERS = {'ollama', 'groq', 'grok', 'openai'} | |
| def _parse_bool(value, default=False): | |
| if value is None: | |
| return default | |
| if isinstance(value, bool): | |
| return value | |
| return str(value).strip().lower() in {'1', 'true', 'yes', 'on'} | |
| def _session_state_path(state_id): | |
| return os.path.join(app.config['STATE_DIR'], f"{state_id}.json") | |
| def _get_tailoring_mode(): | |
| """Resolve active tailoring mode from session, then environment.""" | |
| mode = (session.get('tailoring_mode') or '').strip().lower() | |
| if mode in SUPPORTED_TAILORING_MODES: | |
| return mode | |
| env_mode = (os.environ.get('CV_TAILORING_MODE', 'local') or 'local').strip().lower() | |
| if env_mode not in SUPPORTED_TAILORING_MODES: | |
| env_mode = 'local' | |
| return env_mode | |
| def _set_tailoring_mode(mode): | |
| mode = (mode or '').strip().lower() | |
| if mode in SUPPORTED_TAILORING_MODES: | |
| session['tailoring_mode'] = mode | |
| return mode | |
| return _get_tailoring_mode() | |
| def _get_llm_provider(): | |
| provider = (session.get('llm_provider') or '').strip().lower() | |
| if provider in SUPPORTED_LLM_PROVIDERS: | |
| return provider | |
| env_provider = (os.environ.get('LLM_PROVIDER', 'ollama') or 'ollama').strip().lower() | |
| if env_provider not in SUPPORTED_LLM_PROVIDERS: | |
| env_provider = 'ollama' | |
| return env_provider | |
| def _set_llm_provider(provider): | |
| provider = (provider or '').strip().lower() | |
| if provider not in SUPPORTED_LLM_PROVIDERS: | |
| provider = _get_llm_provider() | |
| session['llm_provider'] = provider | |
| # Keep compatibility with integrations that read environment variables. | |
| os.environ['LLM_PROVIDER'] = provider | |
| return provider | |
| def _is_summary_tailoring_enabled(): | |
| if 'enable_professional_summary' in session: | |
| return _parse_bool(session.get('enable_professional_summary')) | |
| return _parse_bool(os.environ.get('CV_ENABLE_SUMMARY_TAILORING', '0')) | |
| def _set_summary_tailoring_enabled(enabled): | |
| enabled_bool = _parse_bool(enabled) | |
| session['enable_professional_summary'] = enabled_bool | |
| os.environ['CV_ENABLE_SUMMARY_TAILORING'] = '1' if enabled_bool else '0' | |
| return enabled_bool | |
| def _is_cover_letter_enabled(): | |
| if 'include_cover_letters' in session: | |
| return _parse_bool(session.get('include_cover_letters')) | |
| return _parse_bool(os.environ.get('API_INCLUDE_COVER_LETTERS', '0')) | |
| def _set_cover_letter_enabled(enabled): | |
| enabled_bool = _parse_bool(enabled) | |
| session['include_cover_letters'] = enabled_bool | |
| os.environ['API_INCLUDE_COVER_LETTERS'] = '1' if enabled_bool else '0' | |
| return enabled_bool | |
| def _sanitize_filename(text): | |
| """Sanitize text to be safe for use in filenames.""" | |
| if not text: | |
| return 'file' | |
| # Replace problematic characters with underscores | |
| sanitized = text.replace('/', '_').replace('\\', '_').replace(' ', '_') | |
| # Remove any other special characters that could cause issues | |
| sanitized = ''.join(c for c in sanitized if c.isalnum() or c == '_') | |
| # Limit length | |
| return sanitized[:20] | |
| def _clear_job_context(keep_cv_template=True): | |
| """Clear prior search/CV generation state for a fresh workflow.""" | |
| state_id = session.pop('jobs_state_id', None) | |
| if state_id: | |
| state_path = _session_state_path(state_id) | |
| if os.path.exists(state_path): | |
| try: | |
| os.remove(state_path) | |
| except OSError: | |
| pass | |
| for key in [ | |
| 'jobs_file', 'excel_filename', 'generated_cvs', 'successful_jobs', | |
| 'failed_jobs', 'current_cv', 'current_cv_filename', | |
| ]: | |
| session.pop(key, None) | |
| if not keep_cv_template: | |
| session.pop('cv_template', None) | |
| def _save_processed_jobs(processed_jobs): | |
| state_id = str(uuid.uuid4()) | |
| state_path = _session_state_path(state_id) | |
| with open(state_path, 'w', encoding='utf-8') as f: | |
| json.dump(processed_jobs, f, ensure_ascii=False) | |
| session['jobs_state_id'] = state_id | |
| return state_id | |
| def _load_processed_jobs(): | |
| state_id = session.get('jobs_state_id') | |
| if not state_id: | |
| return [] | |
| state_path = _session_state_path(state_id) | |
| if not os.path.exists(state_path): | |
| return [] | |
| try: | |
| with open(state_path, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| return data if isinstance(data, list) else [] | |
| except Exception as e: | |
| logger.error(f"Failed to load session job state: {str(e)}") | |
| return [] | |
| def _build_professional_summary(job, matched_categories): | |
| """Build a concise, professional summary tailored to role and extracted skills.""" | |
| job_title = (job.get('title') or 'Professional').strip() | |
| company = (job.get('company') or 'your target company').strip() | |
| flat_skills = [] | |
| for skills in (matched_categories or {}).values(): | |
| for s in skills or []: | |
| skill = str(s).strip() | |
| if skill: | |
| flat_skills.append(skill) | |
| deduped = [] | |
| seen = set() | |
| for skill in flat_skills: | |
| key = skill.lower() | |
| if key not in seen: | |
| seen.add(key) | |
| deduped.append(skill) | |
| top_skills = deduped[:5] | |
| if top_skills: | |
| skills_text = ", ".join(top_skills) | |
| return ( | |
| f"{job_title} professional with hands-on experience in {skills_text}. " | |
| f"Delivers reliable, scalable outcomes through strong collaboration, ownership, " | |
| f"and structured problem-solving. Ready to contribute immediate impact at {company}." | |
| ) | |
| return ( | |
| f"{job_title} professional focused on delivering measurable outcomes through " | |
| f"technical execution, collaboration, and continuous improvement. " | |
| f"Motivated to contribute meaningful impact at {company}." | |
| ) | |
| # Addition to optimize CV Analyzer | |
| def _enrich_job_with_skills(job, analyzer=None): | |
| """ | |
| Just-In-Time (JIT) processing: Runs the CVAnalyzer only if skills are missing. | |
| """ | |
| # If it already has categories, skip the heavy NLP processing | |
| if 'matched_categories' in job and job['matched_categories']: | |
| return job | |
| if not analyzer: | |
| analyzer = CVAnalyzer() | |
| description = job.get('description', '') | |
| if description and description != "Description empty.": | |
| logger.info(f"JIT Analysis running for job: {job.get('title')}") | |
| matched_skills, matched_requirements, matched_categories = analyzer.extract_skills_from_description(description) | |
| job['matched_skills'] = matched_skills | |
| job['matched_requirements'] = matched_requirements | |
| job['matched_categories'] = matched_categories | |
| else: | |
| logger.warning(f"Skipping JIT Analysis for {job.get('title')} - No description.") | |
| job['matched_skills'] = [] | |
| job['matched_requirements'] = [] | |
| job['matched_categories'] = {} | |
| return job | |
| # ==================== React Frontend Routes ==================== | |
| def react_app(path=''): | |
| """Serve React app for all routes under /app""" | |
| react_build_path = os.path.join(os.path.dirname(__file__), '../../frontend/dist') | |
| if os.path.exists(react_build_path): | |
| index_path = os.path.join(react_build_path, 'index.html') | |
| if os.path.exists(index_path): | |
| return send_file(index_path) | |
| # Fallback to template if React build doesn't exist | |
| return render_template('index.html') | |
| # @app.route('/') | |
| # def index(): | |
| # """Render the home page.""" | |
| # return render_template('index.html') | |
| def index(): | |
| """Root health check for Hugging Face.""" | |
| return jsonify({ | |
| 'status': 'online', | |
| 'service': 'Job Apply AI API', | |
| 'message': 'Backend is running! Connect via Vercel frontend.' | |
| }), 200 | |
| # ==================== REST API Endpoints for React ==================== | |
| def api_health(): | |
| """Health check endpoint.""" | |
| return jsonify({'status': 'ok', 'version': '2.0.0'}) | |
| def api_config(): | |
| """Get current app configuration.""" | |
| return jsonify({ | |
| 'tailoring_mode': _get_tailoring_mode(), | |
| 'llm_provider': _get_llm_provider(), | |
| 'enable_professional_summary': _is_summary_tailoring_enabled(), | |
| 'include_cover_letters': _is_cover_letter_enabled(), | |
| 'supported_modes': list(SUPPORTED_TAILORING_MODES), | |
| 'supported_providers': list(SUPPORTED_LLM_PROVIDERS), | |
| }) | |
| def api_set_tailoring_mode(): | |
| """Set tailoring mode via API.""" | |
| data = request.get_json() | |
| mode = data.get('mode', 'local') | |
| new_mode = _set_tailoring_mode(mode) | |
| return jsonify({'success': True, 'mode': new_mode}) | |
| def api_save_settings(): | |
| """Save runtime settings from frontend modal.""" | |
| try: | |
| data = request.get_json() or {} | |
| mode = _set_tailoring_mode(data.get('tailoring_mode', _get_tailoring_mode())) | |
| provider = _set_llm_provider(data.get('llm_provider', _get_llm_provider())) | |
| summary_enabled = _set_summary_tailoring_enabled( | |
| data.get('enable_professional_summary', _is_summary_tailoring_enabled()) | |
| ) | |
| cover_letters_enabled = _set_cover_letter_enabled( | |
| data.get('include_cover_letters', _is_cover_letter_enabled()) | |
| ) | |
| return jsonify({ | |
| 'success': True, | |
| 'settings': { | |
| 'tailoring_mode': mode, | |
| 'llm_provider': provider, | |
| 'enable_professional_summary': summary_enabled, | |
| 'include_cover_letters': cover_letters_enabled, | |
| }, | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error saving settings: {str(e)}") | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| def api_upload_cv(): | |
| """Upload CV template via API.""" | |
| try: | |
| if 'file' not in request.files: | |
| return jsonify({'success': False, 'error': 'No file provided'}), 400 | |
| file = request.files['file'] | |
| if file.filename == '': | |
| return jsonify({'success': False, 'error': 'No file selected'}), 400 | |
| if not file.filename.endswith('.docx'): | |
| return jsonify({'success': False, 'error': 'Only .docx files are supported'}), 400 | |
| filename = f"cv_template_{int(time.time())}.docx" | |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
| file.save(filepath) | |
| session['cv_template'] = filepath | |
| logger.info(f"CV template uploaded: {filename}") | |
| return jsonify({ | |
| 'success': True, | |
| 'filename': file.filename, | |
| 'message': 'CV template uploaded successfully' | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error uploading CV: {str(e)}") | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| def api_user_profile(): | |
| """Get or save the user's base profile data into the session.""" | |
| try: | |
| if request.method == 'POST': | |
| data = request.get_json() or {} | |
| # Save the incoming data to the Flask session | |
| session['user_profile'] = { | |
| 'first_name': data.get('first_name', ''), | |
| 'last_name': data.get('last_name', ''), | |
| 'email': data.get('email', ''), | |
| 'phone': data.get('phone', ''), | |
| 'linkedin': data.get('linkedin', ''), | |
| 'github': data.get('github', '') | |
| } | |
| logger.info("User profile saved to session.") | |
| return jsonify({'success': True, 'message': 'Profile saved successfully.'}) | |
| # If GET request, return whatever is currently in the session | |
| profile = session.get('user_profile', {}) | |
| return jsonify({'success': True, 'profile': profile}) | |
| except Exception as e: | |
| logger.error(f"Error handling user profile: {str(e)}") | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| # Added Apify Actor Job Search | |
| def api_apify_search(): | |
| """Standalone endpoint to search for jobs via Apify and save results to JSON.""" | |
| """Standalone endpoint to search via Apify, fetch descriptions locally, and save to JSON.""" | |
| try: | |
| data = request.get_json() | |
| keyword = data.get('keyword', '').strip() | |
| location = data.get('location', '').strip() | |
| max_jobs = int(data.get('max_jobs', 10)) | |
| max_days_old = int(data.get('max_days_old', 14)) | |
| if not keyword or not location: | |
| return jsonify({'success': False, 'error': 'Keyword and location are required'}), 400 | |
| logger.info(f"Starting dedicated Apify search: {keyword} in {location}") | |
| # ========================================== | |
| # 1. URL Construction & Apify Call | |
| # ========================================== | |
| base_url = "https://www.linkedin.com/jobs/search?" | |
| params = { | |
| "keywords": keyword, | |
| "location": location, | |
| } | |
| if max_days_old: | |
| seconds = max_days_old * 24 * 60 * 60 | |
| params["f_TPR"] = f"r{seconds}" | |
| final_url = base_url + urllib.parse.urlencode(params, safe="%2C") | |
| apify_token = os.environ.get("APIFY_API_TOKEN") | |
| if not apify_token: | |
| return jsonify({'success': False, 'error': 'Apify API token missing'}), 500 | |
| client = ApifyClient(apify_token) | |
| # Force Apify to scrape at least 10 to satisfy its internal requirements | |
| apify_count = max(10, max_jobs) | |
| run_input = { | |
| "urls": [final_url], | |
| "count": apify_count | |
| } | |
| app_env = os.environ.get("ENVIOR") | |
| if app_env == "PROD": | |
| logger.info("Running in Production") | |
| logger.info("Executing Apify Actor to get job list...") | |
| run = client.actor("curious_coder/linkedin-jobs-scraper").call(run_input=run_input) | |
| dataset = client.dataset(run["defaultDatasetId"]).list_items().items | |
| if not dataset: | |
| return jsonify({ | |
| 'success': True, | |
| 'jobs': [], | |
| 'message': 'No jobs found on LinkedIn for these parameters.' | |
| }) | |
| # ========================================== | |
| # 2. Map the Data | |
| # ========================================== | |
| jobs = [] | |
| for item in dataset[:max_jobs]: | |
| jobs.append({ | |
| 'id': str(item.get('id')), | |
| 'title': item.get('title', 'Unknown Title'), | |
| 'company': item.get('companyName', item.get('company', 'Unknown Company')), | |
| 'link': item.get('link', ''), | |
| 'apply_url': item.get('applyUrl', ''), | |
| 'description': item.get('descriptionText', 'No Description') | |
| }) | |
| else: | |
| # ========================================== | |
| # Simulate Delay & Load Mock Data | |
| # ========================================== | |
| logger.info("Running in Development") | |
| logger.info("Simulating Apify scraping delay (5 seconds)...") | |
| time.sleep(5) | |
| # Target the specific JSON file you saved previously | |
| mock_file_path = ".runtime/uploads/jobs/apify_jobs_20260424_184439.json" | |
| if not os.path.exists(mock_file_path): | |
| return jsonify({'success': False, 'error': f'Mock file not found: {mock_file_path}'}), 500 | |
| with open(mock_file_path, 'r', encoding='utf-8') as f: | |
| all_mock_jobs = json.load(f) | |
| # Enforce the max_jobs limit requested by the client to mimic actual behavior | |
| jobs = all_mock_jobs[:max_jobs] | |
| # ========================================== | |
| # 3. Save Results to a JSON File | |
| # ========================================== | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"apify_jobs_mocked{timestamp}.json" | |
| filepath = os.path.join(app.config['JOBS_OUTPUT_DIR'], filename) | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| json.dump(jobs, f, indent=4, ensure_ascii=False) | |
| logger.info(f"Successfully saved {len(jobs)} jobs with descriptions to {filename}") | |
| _save_processed_jobs(jobs) | |
| return jsonify({ | |
| 'success': True, | |
| 'jobs': jobs, | |
| 'json_file': filename, | |
| 'message': f'Found and saved {len(jobs)} jobs to JSON.' | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error in Apify search endpoint: {str(e)}") | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| # CV Analyzer | |
| def api_evaluate_fit(): | |
| """Run heavy semantic analysis to get a fit score before generating a CV.""" | |
| try: | |
| data = request.get_json() | |
| job_id = data.get('job_id') | |
| if not job_id: | |
| return jsonify({'success': False, 'error': 'No job ID provided'}), 400 | |
| # 1. Ensure the user has uploaded their base CV | |
| user_resume_path = session.get('cv_template') | |
| if not user_resume_path or not os.path.exists(user_resume_path): | |
| return jsonify({'success': False, 'error': 'Please upload a base CV first.'}), 400 | |
| # 2. Extract raw text from the base CV | |
| try: | |
| doc = docx.Document(user_resume_path) | |
| resume_text = "\n".join([para.text for para in doc.paragraphs if para.text.strip()]) | |
| except Exception as e: | |
| return jsonify({'success': False, 'error': f'Failed to read CV: {str(e)}'}), 500 | |
| # 3. Find the specific Job Description from the session | |
| jobs = _load_processed_jobs() | |
| job = next((j for j in jobs if j.get('id') == job_id or str(jobs.index(j)) == str(job_id)), None) | |
| if not job or not job.get('description'): | |
| return jsonify({'success': False, 'error': 'Job description not found.'}), 404 | |
| # 4. Run the Heavy Evaluator (Lazy Loads SBERT if it's the first time!) | |
| logger.info(f"Running deep semantic evaluation for job: {job.get('title')}") | |
| evaluator = ATSEvaluator() | |
| result = evaluator.evaluate_fit(resume_text, job['description']) | |
| return jsonify({ | |
| 'success': True, | |
| 'match_score': result['match_score'], | |
| 'matched_skills': result['matched_skills'], | |
| 'missing_skills': result['missing_skills'] | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error in evaluate-fit: {str(e)}") | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| def api_search_jobs(): | |
| """Search for jobs via API.""" | |
| try: | |
| data = request.get_json() | |
| keyword = data.get('keyword', '').strip() | |
| location = data.get('location', '').strip() | |
| max_jobs = int(data.get('max_jobs', 10)) | |
| max_days_old = int(data.get('max_days_old', 14)) | |
| if not keyword or not location: | |
| return jsonify({'success': False, 'error': 'Keyword and location are required'}), 400 | |
| _set_tailoring_mode(data.get('tailoring_mode', _get_tailoring_mode())) | |
| _clear_job_context(keep_cv_template=True) | |
| logger.info(f"Searching jobs: {keyword} in {location}") | |
| scraper = LinkedInScraper(headless=True) | |
| jobs = scraper.scrape_job_listings(keyword, location, max_jobs=max_jobs, max_days_old=max_days_old) | |
| if not jobs: | |
| return jsonify({ | |
| 'success': True, | |
| 'jobs': [], | |
| 'message': 'No jobs found' | |
| }) | |
| # Fetch descriptions for each job | |
| for i, job in enumerate(jobs): | |
| try: | |
| logger.info(f"Fetching description {i+1}/{len(jobs)}") | |
| title, company, description = scraper.fetch_job_description(job['link']) | |
| jobs[i]['description'] = description | |
| except Exception as e: | |
| logger.warning(f"Failed to fetch description for {job.get('title')}: {str(e)}") | |
| jobs[i]['description'] = '' | |
| # Save to Excel | |
| today_date = datetime.today().strftime("%Y-%m-%d") | |
| filename = f"linkedin_jobs_{today_date}_{int(time.time())}.xlsx" | |
| filepath = os.path.join(app.config['JOBS_OUTPUT_DIR'], filename) | |
| df = pd.DataFrame(jobs) | |
| df.to_excel(filepath, index=False) | |
| session['jobs_file'] = filepath | |
| session['excel_filename'] = filename | |
| # Extract skills from job descriptions | |
| analyzer = CVAnalyzer() | |
| processed_jobs = [] | |
| for i, job in enumerate(jobs): | |
| if job.get('description'): | |
| try: | |
| matched_skills, matched_requirements, matched_categories = analyzer.extract_skills_from_description(job['description']) | |
| job['matched_skills'] = matched_skills | |
| job['matched_categories'] = matched_categories | |
| # Add unique ID | |
| job['id'] = str(i) | |
| processed_jobs.append(job) | |
| except Exception as e: | |
| logger.warning(f"Failed to analyze skills for {job.get('title')}: {str(e)}") | |
| _save_processed_jobs(processed_jobs) | |
| return jsonify({ | |
| 'success': True, | |
| 'jobs': processed_jobs, | |
| 'excel_file': filename, | |
| 'message': f'Found {len(processed_jobs)} jobs' | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error searching jobs: {str(e)}") | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| def api_get_jobs(): | |
| """Get stored jobs from session.""" | |
| try: | |
| jobs = _load_processed_jobs() | |
| return jsonify({ | |
| 'success': True, | |
| 'jobs': jobs, | |
| 'count': len(jobs) | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error getting jobs: {str(e)}") | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| # @app.route('/api/generate-cv/<job_index>', methods=['POST']) | |
| def api_generate_cv(job_id): | |
| """Generate CV for a single job with JIT Analysis.""" | |
| try: | |
| jobs = _load_processed_jobs() | |
| # job_idx = int(job_index) | |
| # 1. Find the actual job index using the UUID (with legacy fallback) | |
| job_idx = -1 | |
| for i, j in enumerate(jobs): | |
| if j.get('id') == job_id or str(i) == str(job_id): | |
| job_idx = i | |
| break | |
| # if job_idx < 0 or job_idx >= len(jobs): | |
| if job_idx == -1: | |
| return jsonify({'success': False, 'error': 'Invalid job index'}), 400 | |
| job = jobs[job_idx] | |
| # 2. JIT Analysis: Enrich the job with skills if it hasn't been done yet | |
| job = _enrich_job_with_skills(job) | |
| # 3. Save the enriched job back to the session so we don't re-analyze if they click again | |
| jobs[job_idx] = job | |
| _save_processed_jobs(jobs) | |
| # cv_template_path = session.get('cv_template') | |
| # if not cv_template_path or not os.path.exists(cv_template_path): | |
| # return jsonify({'success': False, 'error': 'CV template not found'}), 400 | |
| user_resume_path = session.get('cv_template') | |
| ats_template_path = app.config.get('ATS_TEMPLATE_PATH') | |
| if not user_resume_path or not os.path.exists(user_resume_path): | |
| return jsonify({'success': False, 'error': 'User resume not found. Please upload a resume first.'}), 400 | |
| if not ats_template_path or not os.path.exists(ats_template_path): | |
| return jsonify({'success': False, 'error': 'System ATS template not found on server.'}), 500 | |
| # tailoring_mode = _get_tailoring_mode() | |
| # matched_categories = job.get('matched_categories', {}) | |
| # professional_summary = _build_professional_summary(job, matched_categories) | |
| # Generate output filename | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| company_slug = _sanitize_filename(job.get('company', 'Company')) | |
| title_slug = _sanitize_filename(job.get('title', 'Position')) | |
| output_filename = f"CV_{timestamp}_{company_slug}_{title_slug}.docx" | |
| output_path = os.path.join(app.config['CV_OUTPUT_DIR'], output_filename) | |
| # ========================================== | |
| # EXECUTE LANGGRAPH AI TAILOR | |
| # ========================================== | |
| logger.info(f"Starting LangGraph Tailor for {job.get('title')}") | |
| ai_tailor = AILangGraphTailor() | |
| # Extract raw text from the USER'S uploaded resume | |
| original_resume_text = ai_tailor.extract_text_from_docx(user_resume_path) | |
| # Run the Graph to get the tailored JSON | |
| final_state = ai_tailor.run_pipeline(original_resume_text, job.get('description', '')) | |
| # Map the JSON to the SYSTEM'S ATS Jinja template and save it | |
| ai_tailor.generate_final_docx( | |
| tailored_dict=final_state["tailored_resume"], | |
| template_path=ats_template_path, | |
| output_path=output_path | |
| ) | |
| # Modify CV | |
| # modifier = CVModifier(cv_template_path) | |
| # generated_cover_letter_filename = None | |
| # if tailoring_mode == 'api': | |
| # try: | |
| # generation_result = _generate_cv_with_api_tailoring(job, cv_template_path, output_path) | |
| # provider = generation_result.get('provider', 'unknown') | |
| # generated_cover_letter_filename = generation_result.get('cover_letter_filename') | |
| # logger.info(f"API tailoring completed using {provider}") | |
| # except Exception as e: | |
| # # Fallback to local tailoring | |
| # logger.warning(f"API tailoring failed, falling back to local: {str(e)}") | |
| # if _is_summary_tailoring_enabled(): | |
| # modifier.update_profile_summary(professional_summary) | |
| # modifier.update_skills_section(matched_categories) | |
| # modifier.save_modified_cv(output_path) | |
| # else: | |
| # # Local tailoring mode | |
| # if _is_summary_tailoring_enabled(): | |
| # modifier.update_profile_summary(professional_summary) | |
| # modifier.update_skills_section(matched_categories) | |
| # modifier.save_modified_cv(output_path) | |
| # return jsonify({ | |
| # 'success': True, | |
| # 'filename': output_filename, | |
| # 'cover_letter_filename': generated_cover_letter_filename, | |
| # 'job_title': job.get('title'), | |
| # 'company': job.get('company'), | |
| # 'message': f"CV generated for {job.get('title')} at {job.get('company')}" | |
| # }) | |
| return jsonify({ | |
| 'success': True, | |
| 'filename': output_filename, | |
| 'job_title': job.get('title'), | |
| 'company': job.get('company'), | |
| 'job': job, # Keeps UI updated | |
| 'message': f"CV successfully tailored for {job.get('company')}" | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error generating CV: {str(e)}") | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| def api_generate_all_cvs(): | |
| """Generate CVs for multiple jobs with JIT Analysis.""" | |
| try: | |
| data = request.get_json() | |
| # job_indices = data.get('job_indices', []) | |
| requested_ids = data.get('job_indices', []) | |
| jobs = _load_processed_jobs() | |
| # if not job_indices: | |
| # job_indices = list(range(len(jobs))) | |
| # 1. Map the requested UUIDs back to their actual list indices | |
| job_indices = [] | |
| if requested_ids: | |
| for req_id in requested_ids: | |
| for i, j in enumerate(jobs): | |
| if j.get('id') == req_id or str(i) == str(req_id): | |
| job_indices.append(i) | |
| break | |
| else: | |
| # Fallback if no IDs are passed | |
| job_indices = list(range(len(jobs))) | |
| # if not session.get('cv_template') or not os.path.exists(session.get('cv_template')): | |
| # return jsonify({'success': False, 'error': 'CV template not found'}), 400 | |
| # cv_template_path = session.get('cv_template') | |
| user_resume_path = session.get('cv_template') | |
| ats_template_path = app.config.get('ATS_TEMPLATE_PATH') | |
| if not user_resume_path or not os.path.exists(user_resume_path): | |
| return jsonify({'success': False, 'error': 'User resume not found. Please upload a resume first.'}), 400 | |
| if not ats_template_path or not os.path.exists(ats_template_path): | |
| return jsonify({'success': False, 'error': 'System ATS template not found on server.'}), 500 | |
| tailoring_mode = _get_tailoring_mode() | |
| successful = [] | |
| failed = [] | |
| # Instantiate the Analyzer ONCE for the entire batch to save CPU | |
| analyzer = CVAnalyzer() | |
| jobs_modified = False | |
| for idx in job_indices: | |
| try: | |
| if idx < 0 or idx >= len(jobs): | |
| continue | |
| # JIT Analysis for each job in the batch | |
| job = _enrich_job_with_skills(jobs[idx], analyzer) | |
| jobs[idx] = job | |
| jobs_modified = True | |
| # Now the data is ready for your custom Tailor | |
| matched_categories = job.get('matched_categories', {}) | |
| professional_summary = _build_professional_summary(job, matched_categories) | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| company_slug = _sanitize_filename(job.get('company', 'Company')) | |
| title_slug = _sanitize_filename(job.get('title', 'Position')) | |
| output_filename = f"CV_{timestamp}_{company_slug}_{title_slug}.docx" | |
| output_path = os.path.join(app.config['CV_OUTPUT_DIR'], output_filename) | |
| # ========================================== | |
| # EXECUTE LANGGRAPH AI TAILOR | |
| # ========================================== | |
| logger.info(f"Starting LangGraph Tailor for {job.get('title')}") | |
| ai_tailor = AILangGraphTailor() | |
| # Extract text from the user's uploaded resume | |
| original_resume_text = ai_tailor.extract_text_from_docx(user_resume_path) | |
| # Run the Graph to get the tailored JSON | |
| final_state = ai_tailor.run_pipeline(original_resume_text, job.get('description', '')) | |
| # Map the JSON to the system ATS template and save it | |
| ai_tailor.generate_final_docx( | |
| tailored_dict=final_state["tailored_resume"], | |
| template_path=ats_template_path, | |
| output_path=output_path | |
| ) | |
| # modifier = CVModifier(cv_template_path) | |
| # cover_letter_filename = None | |
| # if tailoring_mode == 'api': | |
| # try: | |
| # generation_result = _generate_cv_with_api_tailoring(job, cv_template_path, output_path) | |
| # cover_letter_filename = generation_result.get('cover_letter_filename') | |
| # except Exception as e: | |
| # if _is_summary_tailoring_enabled(): | |
| # modifier.update_profile_summary(professional_summary) | |
| # modifier.update_skills_section(matched_categories) | |
| # modifier.save_modified_cv(output_path) | |
| # else: | |
| # if _is_summary_tailoring_enabled(): | |
| # modifier.update_profile_summary(professional_summary) | |
| # modifier.update_skills_section(matched_categories) | |
| # modifier.save_modified_cv(output_path) | |
| # successful.append({ | |
| # 'job_index': idx, | |
| # 'filename': output_filename, | |
| # 'cover_letter_filename': cover_letter_filename, | |
| # 'job_title': job.get('title'), | |
| # 'company': job.get('company'), | |
| # }) | |
| successful.append({ | |
| 'job_index': idx, | |
| 'filename': output_filename, | |
| 'cover_letter_filename': None, # Adjust if adding cover letter logic back | |
| 'job_title': job.get('title'), | |
| 'company': job.get('company'), | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error generating CV for job {idx}: {str(e)}") | |
| failed.append({ | |
| 'job_index': idx, | |
| 'error': str(e), | |
| 'job_title': jobs[idx].get('title') if idx < len(jobs) else 'Unknown', | |
| }) | |
| if jobs_modified: | |
| _save_processed_jobs(jobs) | |
| # Create ZIP file | |
| zip_buffer = io.BytesIO() | |
| with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: | |
| for cv in successful: | |
| file_path = os.path.join(app.config['CV_OUTPUT_DIR'], cv['filename']) | |
| if os.path.exists(file_path): | |
| zip_file.write(file_path, arcname=cv['filename']) | |
| cover_letter_filename = cv.get('cover_letter_filename') | |
| if cover_letter_filename: | |
| cover_letter_path = os.path.join(app.config['CV_OUTPUT_DIR'], cover_letter_filename) | |
| if os.path.exists(cover_letter_path): | |
| zip_file.write(cover_letter_path, arcname=cover_letter_filename) | |
| zip_buffer.seek(0) | |
| zip_filename = f"CVs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" | |
| zip_path = os.path.join(app.config['CV_OUTPUT_DIR'], zip_filename) | |
| with open(zip_path, 'wb') as f: | |
| f.write(zip_buffer.getvalue()) | |
| return jsonify({ | |
| 'success': True, | |
| 'successful': successful, | |
| 'failed': failed, | |
| 'zip_filename': zip_filename, | |
| 'total_generated': len(successful), | |
| 'total_failed': len(failed), | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error generating all CVs: {str(e)}") | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| def api_download_file(filename): | |
| """Download a file.""" | |
| try: | |
| import urllib.parse | |
| # Decode URL-encoded filename | |
| decoded_filename = urllib.parse.unquote(filename) | |
| # Security check: prevent directory traversal | |
| if '..' in decoded_filename or '/' in decoded_filename or '\\' in decoded_filename: | |
| return jsonify({'error': 'Invalid filename'}), 400 | |
| file_path = os.path.join(app.config['CV_OUTPUT_DIR'], decoded_filename) | |
| if not os.path.exists(file_path): | |
| file_path = os.path.join(app.config['JOBS_OUTPUT_DIR'], decoded_filename) | |
| if not os.path.exists(file_path): | |
| logger.warning(f"File not found: {decoded_filename}") | |
| return jsonify({'error': 'File not found'}), 404 | |
| logger.info(f"Downloading file: {decoded_filename}") | |
| return send_file(file_path, as_attachment=True, download_name=decoded_filename) | |
| except Exception as e: | |
| logger.error(f"Error downloading file: {str(e)}") | |
| return jsonify({'error': str(e)}), 500 | |
| # Communication endpoint with the Browser Extension (Auto-Apply) | |
| def api_latest_application_data(): | |
| """Endpoint for the Chrome Extension to fetch real user data for autofill.""" | |
| # 1. Grab the user profile from the session | |
| user_profile = session.get('user_profile') | |
| if not user_profile: | |
| return jsonify({ | |
| 'success': False, | |
| 'error': 'No user profile found. Please save your profile in the React app first!' | |
| }), 400 | |
| # 2. Grab the latest generated CV from the session | |
| latest_cv = session.get('latest_cv_filename') | |
| cv_url = None | |
| if latest_cv: | |
| import urllib.parse | |
| # Dynamically build the full download URL based on the server's current host | |
| safe_filename = urllib.parse.quote(latest_cv) | |
| cv_url = f"{request.host_url}api/download/{safe_filename}" | |
| return jsonify({ | |
| 'success': True, | |
| 'user': user_profile, | |
| # 'cv_url': cv_url, | |
| 'message': 'Real session data retrieved successfully' | |
| }) | |
| def _generate_cv_with_api_tailoring(job, cv_template, output_path): | |
| """Use API subproject updater to generate a tailored CV and optional cover letter.""" | |
| api_project_root = os.path.join(os.getcwd(), "Automatic CV and Cover Letter with API") | |
| if not os.path.exists(api_project_root): | |
| raise FileNotFoundError("API subproject folder not found") | |
| if api_project_root not in sys.path: | |
| sys.path.append(api_project_root) | |
| try: | |
| # Ensure integrations that depend on env vars see the active provider. | |
| os.environ['LLM_PROVIDER'] = _get_llm_provider() | |
| APIIntegration = importlib.import_module('src.utils.openai_integration').OpenAIIntegration | |
| DocumentUpdater = importlib.import_module('src.updaters.document_updater').DocumentUpdater | |
| cover_letter_template = os.environ.get( | |
| 'API_COVER_LETTER_TEMPLATE_PATH', | |
| os.path.join(api_project_root, 'data', 'Cover Letter_Imon .docx') | |
| ) | |
| if not os.path.isabs(cover_letter_template): | |
| cover_letter_template = os.path.abspath(cover_letter_template) | |
| if not os.path.exists(cover_letter_template): | |
| if _is_cover_letter_enabled(): | |
| raise FileNotFoundError("Cover letter template not found") | |
| # Cover letters are disabled, so reuse CV template path as a safe placeholder. | |
| cover_letter_template = cv_template | |
| description = (job.get('description') or '').strip() | |
| if not description: | |
| raise ValueError("Job description is empty") | |
| llm_integration = APIIntegration() | |
| provider = getattr(llm_integration, 'provider', 'unknown') | |
| if provider in ('openai', 'grok', 'groq') and not llm_integration.is_api_key_set(): | |
| raise ValueError(f"API key not configured for provider '{provider}'") | |
| updater = DocumentUpdater(cv_template, cover_letter_template, llm_integration) | |
| updater.update_cv(description, output_path) | |
| generated_cover_letter_filename = None | |
| if _is_cover_letter_enabled(): | |
| cv_basename = os.path.basename(output_path) | |
| if cv_basename.startswith('CV_'): | |
| cover_letter_filename = f"CoverLetter_{cv_basename[3:]}" | |
| else: | |
| cover_letter_filename = f"CoverLetter_{cv_basename}" | |
| cover_letter_output_path = os.path.join(os.path.dirname(output_path), cover_letter_filename) | |
| updater.update_cover_letter(description, cover_letter_output_path) | |
| generated_cover_letter_filename = cover_letter_filename | |
| return { | |
| 'provider': provider, | |
| 'cover_letter_filename': generated_cover_letter_filename, | |
| } | |
| except Exception as e: | |
| raise Exception(f"API tailoring error: {str(e)}") | |
| # ==================== Legacy Routes (keep for backward compatibility) ==================== | |
| def search_jobs(): | |
| """Legacy search route - redirects to React app""" | |
| if request.method == 'POST': | |
| keyword = request.form.get('keyword', '') | |
| location = request.form.get('location', '') | |
| max_jobs = int(request.form.get('max_jobs', 10)) | |
| _set_tailoring_mode(request.form.get('tailoring_mode', _get_tailoring_mode())) | |
| if not keyword or not location: | |
| flash('Please enter both job title and location', 'error') | |
| return redirect(url_for('index')) | |
| try: | |
| scraper = LinkedInScraper(headless=True) | |
| jobs = scraper.scrape_job_listings(keyword, location, max_jobs=max_jobs) | |
| if not jobs: | |
| flash('No jobs found. Try different search terms.', 'warning') | |
| return redirect(url_for('index')) | |
| for i, job in enumerate(jobs): | |
| logger.info(f"Fetching description for job {i+1}/{len(jobs)}: {job['title']}") | |
| title, company, description = scraper.fetch_job_description(job['link']) | |
| jobs[i]['description'] = description | |
| today_date = datetime.today().strftime("%Y-%m-%d") | |
| filename = f"linkedin_jobs_{today_date}.xlsx" | |
| filepath = os.path.join(app.config['JOBS_OUTPUT_DIR'], filename) | |
| df = pd.DataFrame(jobs) | |
| df.to_excel(filepath, index=False) | |
| session['jobs_file'] = filepath | |
| session['excel_filename'] = filename | |
| analyzer = CVAnalyzer() | |
| processed_jobs = [] | |
| for job in jobs: | |
| if job.get('description'): | |
| matched_skills, matched_requirements, matched_categories = analyzer.extract_skills_from_description(job['description']) | |
| job['matched_skills'] = matched_skills | |
| job['matched_categories'] = matched_categories | |
| processed_jobs.append(job) | |
| _save_processed_jobs(processed_jobs) | |
| return render_template('job_list.html', | |
| jobs=processed_jobs, | |
| excel_file=filename, | |
| excel_path=filepath, | |
| tailoring_mode=_get_tailoring_mode(), | |
| llm_provider=_get_llm_provider()) | |
| except Exception as e: | |
| logger.error(f"Error during job search: {str(e)}") | |
| flash(f'An error occurred: {str(e)}', 'error') | |
| return redirect(url_for('index')) | |
| return redirect(url_for('index')) | |
| def upload_cv(): | |
| """Legacy CV upload route""" | |
| if request.method == 'POST': | |
| if 'cv_file' not in request.files: | |
| flash('No file part', 'error') | |
| return redirect(request.url) | |
| file = request.files['cv_file'] | |
| if file.filename == '': | |
| flash('No selected file', 'error') | |
| return redirect(request.url) | |
| if not file.filename.endswith('.docx'): | |
| flash('Only .docx files are supported', 'error') | |
| return redirect(request.url) | |
| try: | |
| filename = f"cv_template_{int(time.time())}.docx" | |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
| file.save(filepath) | |
| session['cv_template'] = filepath | |
| flash('CV template uploaded successfully', 'success') | |
| return redirect(url_for('index')) | |
| except Exception as e: | |
| logger.error(f"Error uploading CV: {str(e)}") | |
| flash(f'Error uploading CV: {str(e)}', 'error') | |
| return redirect(request.url) | |
| return render_template('upload_cv.html') | |
| # @app.errorhandler(404) | |
| # def not_found(error): | |
| # """Handle 404 errors""" | |
| # return render_template('404.html'), 404 | |
| # @app.errorhandler(500) | |
| # def internal_error(error): | |
| # """Handle 500 errors""" | |
| # logger.error(f"Internal server error: {str(error)}") | |
| # return render_template('500.html'), 500 | |
| def not_found(error): | |
| """Handle 404 errors""" | |
| return jsonify({'success': False, 'error': 'API endpoint not found'}), 404 | |
| def internal_error(error): | |
| """Handle 500 errors""" | |
| logger.error(f"Internal server error: {str(error)}") | |
| return jsonify({'success': False, 'error': 'Internal server error occurred'}), 500 | |
| if __name__ == '__main__': | |
| port = int(os.environ.get('PORT', 5050)) | |
| debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true' | |
| app.run(host='0.0.0.0', port=port, debug=debug) | |