| """ |
| Web Interface for Job Application AI Agent - React Frontend Version |
| |
| This module provides a Flask web application for the job application AI agent |
| with a modern React frontend and REST API endpoints. |
| """ |
|
|
| import os |
| import logging |
| import json |
| import uuid |
| import time |
| import sys |
| import importlib |
| from datetime import datetime |
| from pathlib import Path |
| from flask import Flask, render_template, request, redirect, url_for, flash, send_file, session, jsonify |
| from flask_cors import CORS |
| import pandas as pd |
| import zipfile |
| import io |
|
|
| from job_apply_ai.scraper.linkedin import LinkedInScraper |
| from job_apply_ai.cv_modifier.cv_analyzer import CVAnalyzer, CVModifier, batch_process_jobs |
| from job_apply_ai.utils.helpers import ensure_directory_exists |
| from CV_Tailor.ai_cv_tailor import AILangGraphTailor |
|
|
| |
| from job_apply_ai.analyzer.ats_evaluator import ATSEvaluator |
| import docx |
|
|
| |
| |
| import urllib.parse |
| from apify_client import ApifyClient |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| |
| app = Flask(__name__, static_folder='static', static_url_path='/static', template_folder='templates') |
| app.secret_key = os.environ.get('SECRET_KEY', 'dev_key_for_testing') |
| app.config['SESSION_COOKIE_NAME'] = f"job_apply_ai_session_{int(time.time())}" |
| app.config['JSON_SORT_KEYS'] = False |
|
|
| |
| CORS(app, resources={r"/api/*": {"origins": "*"}}) |
|
|
| |
| runtime_root = os.environ.get('JOB_APPLY_AI_DATA_DIR', os.path.join(os.getcwd(), '.runtime')) |
| if not os.path.isabs(runtime_root): |
| runtime_root = os.path.abspath(runtime_root) |
| app.config['UPLOAD_FOLDER'] = os.path.join(runtime_root, 'uploads') |
| ensure_directory_exists(app.config['UPLOAD_FOLDER']) |
|
|
| |
| app.config['CV_OUTPUT_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], 'cvs') |
| app.config['JOBS_OUTPUT_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], 'jobs') |
| app.config['STATE_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], 'session_state') |
| ensure_directory_exists(app.config['CV_OUTPUT_DIR']) |
| ensure_directory_exists(app.config['JOBS_OUTPUT_DIR']) |
| ensure_directory_exists(app.config['STATE_DIR']) |
|
|
| |
| app.config['ATS_TEMPLATE_PATH'] = os.path.join(os.getcwd(), 'data', 'ats_template.docx') |
|
|
| app.config['SESSION_TYPE'] = 'filesystem' |
|
|
| SUPPORTED_TAILORING_MODES = {'local', 'api'} |
| SUPPORTED_LLM_PROVIDERS = {'ollama', 'groq', 'grok', 'openai'} |
|
|
|
|
| def _parse_bool(value, default=False): |
| if value is None: |
| return default |
| if isinstance(value, bool): |
| return value |
| return str(value).strip().lower() in {'1', 'true', 'yes', 'on'} |
|
|
| def _session_state_path(state_id): |
| return os.path.join(app.config['STATE_DIR'], f"{state_id}.json") |
|
|
| def _get_tailoring_mode(): |
| """Resolve active tailoring mode from session, then environment.""" |
| mode = (session.get('tailoring_mode') or '').strip().lower() |
| if mode in SUPPORTED_TAILORING_MODES: |
| return mode |
| env_mode = (os.environ.get('CV_TAILORING_MODE', 'local') or 'local').strip().lower() |
| if env_mode not in SUPPORTED_TAILORING_MODES: |
| env_mode = 'local' |
| return env_mode |
|
|
| def _set_tailoring_mode(mode): |
| mode = (mode or '').strip().lower() |
| if mode in SUPPORTED_TAILORING_MODES: |
| session['tailoring_mode'] = mode |
| return mode |
| return _get_tailoring_mode() |
|
|
|
|
| def _get_llm_provider(): |
| provider = (session.get('llm_provider') or '').strip().lower() |
| if provider in SUPPORTED_LLM_PROVIDERS: |
| return provider |
| env_provider = (os.environ.get('LLM_PROVIDER', 'ollama') or 'ollama').strip().lower() |
| if env_provider not in SUPPORTED_LLM_PROVIDERS: |
| env_provider = 'ollama' |
| return env_provider |
|
|
|
|
| def _set_llm_provider(provider): |
| provider = (provider or '').strip().lower() |
| if provider not in SUPPORTED_LLM_PROVIDERS: |
| provider = _get_llm_provider() |
| session['llm_provider'] = provider |
| |
| os.environ['LLM_PROVIDER'] = provider |
| return provider |
|
|
|
|
| def _is_summary_tailoring_enabled(): |
| if 'enable_professional_summary' in session: |
| return _parse_bool(session.get('enable_professional_summary')) |
| return _parse_bool(os.environ.get('CV_ENABLE_SUMMARY_TAILORING', '0')) |
|
|
|
|
| def _set_summary_tailoring_enabled(enabled): |
| enabled_bool = _parse_bool(enabled) |
| session['enable_professional_summary'] = enabled_bool |
| os.environ['CV_ENABLE_SUMMARY_TAILORING'] = '1' if enabled_bool else '0' |
| return enabled_bool |
|
|
|
|
| def _is_cover_letter_enabled(): |
| if 'include_cover_letters' in session: |
| return _parse_bool(session.get('include_cover_letters')) |
| return _parse_bool(os.environ.get('API_INCLUDE_COVER_LETTERS', '0')) |
|
|
|
|
| def _set_cover_letter_enabled(enabled): |
| enabled_bool = _parse_bool(enabled) |
| session['include_cover_letters'] = enabled_bool |
| os.environ['API_INCLUDE_COVER_LETTERS'] = '1' if enabled_bool else '0' |
| return enabled_bool |
|
|
| def _sanitize_filename(text): |
| """Sanitize text to be safe for use in filenames.""" |
| if not text: |
| return 'file' |
| |
| sanitized = text.replace('/', '_').replace('\\', '_').replace(' ', '_') |
| |
| sanitized = ''.join(c for c in sanitized if c.isalnum() or c == '_') |
| |
| return sanitized[:20] |
|
|
| def _clear_job_context(keep_cv_template=True): |
| """Clear prior search/CV generation state for a fresh workflow.""" |
| state_id = session.pop('jobs_state_id', None) |
| if state_id: |
| state_path = _session_state_path(state_id) |
| if os.path.exists(state_path): |
| try: |
| os.remove(state_path) |
| except OSError: |
| pass |
|
|
| for key in [ |
| 'jobs_file', 'excel_filename', 'generated_cvs', 'successful_jobs', |
| 'failed_jobs', 'current_cv', 'current_cv_filename', |
| ]: |
| session.pop(key, None) |
|
|
| if not keep_cv_template: |
| session.pop('cv_template', None) |
|
|
| def _save_processed_jobs(processed_jobs): |
| state_id = str(uuid.uuid4()) |
| state_path = _session_state_path(state_id) |
| with open(state_path, 'w', encoding='utf-8') as f: |
| json.dump(processed_jobs, f, ensure_ascii=False) |
| session['jobs_state_id'] = state_id |
| return state_id |
|
|
| def _load_processed_jobs(): |
| state_id = session.get('jobs_state_id') |
| if not state_id: |
| return [] |
| state_path = _session_state_path(state_id) |
| if not os.path.exists(state_path): |
| return [] |
| try: |
| with open(state_path, 'r', encoding='utf-8') as f: |
| data = json.load(f) |
| return data if isinstance(data, list) else [] |
| except Exception as e: |
| logger.error(f"Failed to load session job state: {str(e)}") |
| return [] |
|
|
| def _build_professional_summary(job, matched_categories): |
| """Build a concise, professional summary tailored to role and extracted skills.""" |
| job_title = (job.get('title') or 'Professional').strip() |
| company = (job.get('company') or 'your target company').strip() |
| flat_skills = [] |
| for skills in (matched_categories or {}).values(): |
| for s in skills or []: |
| skill = str(s).strip() |
| if skill: |
| flat_skills.append(skill) |
| deduped = [] |
| seen = set() |
| for skill in flat_skills: |
| key = skill.lower() |
| if key not in seen: |
| seen.add(key) |
| deduped.append(skill) |
| top_skills = deduped[:5] |
| if top_skills: |
| skills_text = ", ".join(top_skills) |
| return ( |
| f"{job_title} professional with hands-on experience in {skills_text}. " |
| f"Delivers reliable, scalable outcomes through strong collaboration, ownership, " |
| f"and structured problem-solving. Ready to contribute immediate impact at {company}." |
| ) |
| return ( |
| f"{job_title} professional focused on delivering measurable outcomes through " |
| f"technical execution, collaboration, and continuous improvement. " |
| f"Motivated to contribute meaningful impact at {company}." |
| ) |
|
|
| |
| def _enrich_job_with_skills(job, analyzer=None): |
| """ |
| Just-In-Time (JIT) processing: Runs the CVAnalyzer only if skills are missing. |
| """ |
| |
| if 'matched_categories' in job and job['matched_categories']: |
| return job |
| |
| if not analyzer: |
| analyzer = CVAnalyzer() |
| |
| description = job.get('description', '') |
| if description and description != "Description empty.": |
| logger.info(f"JIT Analysis running for job: {job.get('title')}") |
| matched_skills, matched_requirements, matched_categories = analyzer.extract_skills_from_description(description) |
| job['matched_skills'] = matched_skills |
| job['matched_requirements'] = matched_requirements |
| job['matched_categories'] = matched_categories |
| else: |
| logger.warning(f"Skipping JIT Analysis for {job.get('title')} - No description.") |
| job['matched_skills'] = [] |
| job['matched_requirements'] = [] |
| job['matched_categories'] = {} |
| |
| return job |
|
|
| |
|
|
| @app.route('/app') |
| @app.route('/app/<path:path>') |
| def react_app(path=''): |
| """Serve React app for all routes under /app""" |
| react_build_path = os.path.join(os.path.dirname(__file__), '../../frontend/dist') |
| if os.path.exists(react_build_path): |
| index_path = os.path.join(react_build_path, 'index.html') |
| if os.path.exists(index_path): |
| return send_file(index_path) |
| |
| return render_template('index.html') |
|
|
| @app.route('/') |
| def index(): |
| """Render the home page.""" |
| return render_template('index.html') |
|
|
| |
|
|
| @app.route('/api/health', methods=['GET']) |
| def api_health(): |
| """Health check endpoint.""" |
| return jsonify({'status': 'ok', 'version': '2.0.0'}) |
|
|
| @app.route('/api/config', methods=['GET']) |
| def api_config(): |
| """Get current app configuration.""" |
| return jsonify({ |
| 'tailoring_mode': _get_tailoring_mode(), |
| 'llm_provider': _get_llm_provider(), |
| 'enable_professional_summary': _is_summary_tailoring_enabled(), |
| 'include_cover_letters': _is_cover_letter_enabled(), |
| 'supported_modes': list(SUPPORTED_TAILORING_MODES), |
| 'supported_providers': list(SUPPORTED_LLM_PROVIDERS), |
| }) |
|
|
| @app.route('/api/set-tailoring-mode', methods=['POST']) |
| def api_set_tailoring_mode(): |
| """Set tailoring mode via API.""" |
| data = request.get_json() |
| mode = data.get('mode', 'local') |
| new_mode = _set_tailoring_mode(mode) |
| return jsonify({'success': True, 'mode': new_mode}) |
|
|
|
|
| @app.route('/api/settings', methods=['POST']) |
| def api_save_settings(): |
| """Save runtime settings from frontend modal.""" |
| try: |
| data = request.get_json() or {} |
|
|
| mode = _set_tailoring_mode(data.get('tailoring_mode', _get_tailoring_mode())) |
| provider = _set_llm_provider(data.get('llm_provider', _get_llm_provider())) |
| summary_enabled = _set_summary_tailoring_enabled( |
| data.get('enable_professional_summary', _is_summary_tailoring_enabled()) |
| ) |
| cover_letters_enabled = _set_cover_letter_enabled( |
| data.get('include_cover_letters', _is_cover_letter_enabled()) |
| ) |
|
|
| return jsonify({ |
| 'success': True, |
| 'settings': { |
| 'tailoring_mode': mode, |
| 'llm_provider': provider, |
| 'enable_professional_summary': summary_enabled, |
| 'include_cover_letters': cover_letters_enabled, |
| }, |
| }) |
| except Exception as e: |
| logger.error(f"Error saving settings: {str(e)}") |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
| @app.route('/api/upload-cv', methods=['POST']) |
| def api_upload_cv(): |
| """Upload CV template via API.""" |
| try: |
| if 'file' not in request.files: |
| return jsonify({'success': False, 'error': 'No file provided'}), 400 |
| |
| file = request.files['file'] |
| if file.filename == '': |
| return jsonify({'success': False, 'error': 'No file selected'}), 400 |
| |
| if not file.filename.endswith('.docx'): |
| return jsonify({'success': False, 'error': 'Only .docx files are supported'}), 400 |
| |
| filename = f"cv_template_{int(time.time())}.docx" |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
| file.save(filepath) |
| |
| session['cv_template'] = filepath |
| logger.info(f"CV template uploaded: {filename}") |
| |
| return jsonify({ |
| 'success': True, |
| 'filename': file.filename, |
| 'message': 'CV template uploaded successfully' |
| }) |
| except Exception as e: |
| logger.error(f"Error uploading CV: {str(e)}") |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
| @app.route('/api/user-profile', methods=['GET', 'POST']) |
| def api_user_profile(): |
| """Get or save the user's base profile data into the session.""" |
| try: |
| if request.method == 'POST': |
| data = request.get_json() or {} |
| |
| session['user_profile'] = { |
| 'first_name': data.get('first_name', ''), |
| 'last_name': data.get('last_name', ''), |
| 'email': data.get('email', ''), |
| 'phone': data.get('phone', ''), |
| 'linkedin': data.get('linkedin', ''), |
| 'github': data.get('github', '') |
| } |
| logger.info("User profile saved to session.") |
| return jsonify({'success': True, 'message': 'Profile saved successfully.'}) |
| |
| |
| profile = session.get('user_profile', {}) |
| return jsonify({'success': True, 'profile': profile}) |
| |
| except Exception as e: |
| logger.error(f"Error handling user profile: {str(e)}") |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
| |
| @app.route('/api/apify-search', methods=['POST']) |
| def api_apify_search(): |
| """Standalone endpoint to search for jobs via Apify and save results to JSON.""" |
| """Standalone endpoint to search via Apify, fetch descriptions locally, and save to JSON.""" |
| try: |
| data = request.get_json() |
| keyword = data.get('keyword', '').strip() |
| location = data.get('location', '').strip() |
| max_jobs = int(data.get('max_jobs', 10)) |
| max_days_old = int(data.get('max_days_old', 14)) |
| |
| if not keyword or not location: |
| return jsonify({'success': False, 'error': 'Keyword and location are required'}), 400 |
| |
| logger.info(f"Starting dedicated Apify search: {keyword} in {location}") |
| |
| |
| |
| |
| base_url = "https://www.linkedin.com/jobs/search?" |
| params = { |
| "keywords": keyword, |
| "location": location, |
| } |
|
|
| if max_days_old: |
| seconds = max_days_old * 24 * 60 * 60 |
| params["f_TPR"] = f"r{seconds}" |
|
|
| final_url = base_url + urllib.parse.urlencode(params, safe="%2C") |
| |
| apify_token = os.environ.get("APIFY_API_TOKEN") |
| if not apify_token: |
| return jsonify({'success': False, 'error': 'Apify API token missing'}), 500 |
|
|
| client = ApifyClient(apify_token) |
| |
| |
| apify_count = max(10, max_jobs) |
| run_input = { |
| "urls": [final_url], |
| "count": apify_count |
| } |
|
|
| app_env = os.environ.get("ENVIOR") |
|
|
| if app_env == "PROD": |
|
|
| logger.info("Running in Production") |
| logger.info("Executing Apify Actor to get job list...") |
| run = client.actor("curious_coder/linkedin-jobs-scraper").call(run_input=run_input) |
| dataset = client.dataset(run["defaultDatasetId"]).list_items().items |
|
|
| |
| if not dataset: |
| return jsonify({ |
| 'success': True, |
| 'jobs': [], |
| 'message': 'No jobs found on LinkedIn for these parameters.' |
| }) |
|
|
| |
| |
| |
| jobs = [] |
| for item in dataset[:max_jobs]: |
| jobs.append({ |
| 'id': str(item.get('id')), |
| 'title': item.get('title', 'Unknown Title'), |
| 'company': item.get('companyName', item.get('company', 'Unknown Company')), |
| 'link': item.get('link', ''), |
| 'apply_url': item.get('applyUrl', ''), |
| 'description': item.get('descriptionText', 'No Description') |
| }) |
| else: |
| |
| |
| |
| logger.info("Running in Development") |
| logger.info("Simulating Apify scraping delay (5 seconds)...") |
| time.sleep(5) |
| |
| |
| mock_file_path = ".runtime/uploads/jobs/apify_jobs_20260424_184439.json" |
| |
| if not os.path.exists(mock_file_path): |
| return jsonify({'success': False, 'error': f'Mock file not found: {mock_file_path}'}), 500 |
| |
| with open(mock_file_path, 'r', encoding='utf-8') as f: |
| all_mock_jobs = json.load(f) |
| |
| |
| jobs = all_mock_jobs[:max_jobs] |
|
|
| |
| |
| |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| filename = f"apify_jobs_mocked{timestamp}.json" |
| |
| filepath = os.path.join(app.config['JOBS_OUTPUT_DIR'], filename) |
|
|
| with open(filepath, 'w', encoding='utf-8') as f: |
| json.dump(jobs, f, indent=4, ensure_ascii=False) |
| |
| logger.info(f"Successfully saved {len(jobs)} jobs with descriptions to {filename}") |
|
|
| _save_processed_jobs(jobs) |
| |
| return jsonify({ |
| 'success': True, |
| 'jobs': jobs, |
| 'json_file': filename, |
| 'message': f'Found and saved {len(jobs)} jobs to JSON.' |
| }) |
| |
| except Exception as e: |
| logger.error(f"Error in Apify search endpoint: {str(e)}") |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
|
|
| |
| @app.route('/api/evaluate-fit', methods=['POST']) |
| def api_evaluate_fit(): |
| """Run heavy semantic analysis to get a fit score before generating a CV.""" |
| try: |
| data = request.get_json() |
| job_id = data.get('job_id') |
| |
| if not job_id: |
| return jsonify({'success': False, 'error': 'No job ID provided'}), 400 |
| |
| |
| user_resume_path = session.get('cv_template') |
| if not user_resume_path or not os.path.exists(user_resume_path): |
| return jsonify({'success': False, 'error': 'Please upload a base CV first.'}), 400 |
| |
| |
| try: |
| doc = docx.Document(user_resume_path) |
| resume_text = "\n".join([para.text for para in doc.paragraphs if para.text.strip()]) |
| except Exception as e: |
| return jsonify({'success': False, 'error': f'Failed to read CV: {str(e)}'}), 500 |
| |
| |
| jobs = _load_processed_jobs() |
| job = next((j for j in jobs if j.get('id') == job_id or str(jobs.index(j)) == str(job_id)), None) |
| |
| if not job or not job.get('description'): |
| return jsonify({'success': False, 'error': 'Job description not found.'}), 404 |
| |
| |
| logger.info(f"Running deep semantic evaluation for job: {job.get('title')}") |
| evaluator = ATSEvaluator() |
| result = evaluator.evaluate_fit(resume_text, job['description']) |
| |
| return jsonify({ |
| 'success': True, |
| 'match_score': result['match_score'], |
| 'matched_skills': result['matched_skills'], |
| 'missing_skills': result['missing_skills'] |
| }) |
| |
| except Exception as e: |
| logger.error(f"Error in evaluate-fit: {str(e)}") |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
| @app.route('/api/search', methods=['POST']) |
| def api_search_jobs(): |
| """Search for jobs via API.""" |
| try: |
| data = request.get_json() |
| keyword = data.get('keyword', '').strip() |
| location = data.get('location', '').strip() |
| max_jobs = int(data.get('max_jobs', 10)) |
| max_days_old = int(data.get('max_days_old', 14)) |
| |
| if not keyword or not location: |
| return jsonify({'success': False, 'error': 'Keyword and location are required'}), 400 |
| |
| _set_tailoring_mode(data.get('tailoring_mode', _get_tailoring_mode())) |
| _clear_job_context(keep_cv_template=True) |
| |
| logger.info(f"Searching jobs: {keyword} in {location}") |
| scraper = LinkedInScraper(headless=True) |
| jobs = scraper.scrape_job_listings(keyword, location, max_jobs=max_jobs, max_days_old=max_days_old) |
| |
| if not jobs: |
| return jsonify({ |
| 'success': True, |
| 'jobs': [], |
| 'message': 'No jobs found' |
| }) |
| |
| |
| for i, job in enumerate(jobs): |
| try: |
| logger.info(f"Fetching description {i+1}/{len(jobs)}") |
| title, company, description = scraper.fetch_job_description(job['link']) |
| jobs[i]['description'] = description |
| except Exception as e: |
| logger.warning(f"Failed to fetch description for {job.get('title')}: {str(e)}") |
| jobs[i]['description'] = '' |
| |
| |
| today_date = datetime.today().strftime("%Y-%m-%d") |
| filename = f"linkedin_jobs_{today_date}_{int(time.time())}.xlsx" |
| filepath = os.path.join(app.config['JOBS_OUTPUT_DIR'], filename) |
| df = pd.DataFrame(jobs) |
| df.to_excel(filepath, index=False) |
| session['jobs_file'] = filepath |
| session['excel_filename'] = filename |
| |
| |
| analyzer = CVAnalyzer() |
| processed_jobs = [] |
| |
| for i, job in enumerate(jobs): |
| if job.get('description'): |
| try: |
| matched_skills, matched_requirements, matched_categories = analyzer.extract_skills_from_description(job['description']) |
| job['matched_skills'] = matched_skills |
| job['matched_categories'] = matched_categories |
| |
| job['id'] = str(i) |
| processed_jobs.append(job) |
| except Exception as e: |
| logger.warning(f"Failed to analyze skills for {job.get('title')}: {str(e)}") |
| |
| _save_processed_jobs(processed_jobs) |
| |
| return jsonify({ |
| 'success': True, |
| 'jobs': processed_jobs, |
| 'excel_file': filename, |
| 'message': f'Found {len(processed_jobs)} jobs' |
| }) |
| except Exception as e: |
| logger.error(f"Error searching jobs: {str(e)}") |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
| @app.route('/api/jobs', methods=['GET']) |
| def api_get_jobs(): |
| """Get stored jobs from session.""" |
| try: |
| jobs = _load_processed_jobs() |
| return jsonify({ |
| 'success': True, |
| 'jobs': jobs, |
| 'count': len(jobs) |
| }) |
| except Exception as e: |
| logger.error(f"Error getting jobs: {str(e)}") |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
| |
| @app.route('/api/generate-cv/<job_id>', methods=['POST']) |
| def api_generate_cv(job_id): |
| """Generate CV for a single job with JIT Analysis.""" |
| try: |
| jobs = _load_processed_jobs() |
|
|
| |
|
|
| |
| job_idx = -1 |
| for i, j in enumerate(jobs): |
| if j.get('id') == job_id or str(i) == str(job_id): |
| job_idx = i |
| break |
| |
| |
| if job_idx == -1: |
| return jsonify({'success': False, 'error': 'Invalid job index'}), 400 |
| |
| job = jobs[job_idx] |
|
|
| |
| job = _enrich_job_with_skills(job) |
|
|
| |
| jobs[job_idx] = job |
| _save_processed_jobs(jobs) |
|
|
| |
| |
| |
| |
|
|
| user_resume_path = session.get('cv_template') |
| ats_template_path = app.config.get('ATS_TEMPLATE_PATH') |
| |
| if not user_resume_path or not os.path.exists(user_resume_path): |
| return jsonify({'success': False, 'error': 'User resume not found. Please upload a resume first.'}), 400 |
| |
| if not ats_template_path or not os.path.exists(ats_template_path): |
| return jsonify({'success': False, 'error': 'System ATS template not found on server.'}), 500 |
| |
| |
| |
| |
| |
| |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| company_slug = _sanitize_filename(job.get('company', 'Company')) |
| title_slug = _sanitize_filename(job.get('title', 'Position')) |
| output_filename = f"CV_{timestamp}_{company_slug}_{title_slug}.docx" |
| output_path = os.path.join(app.config['CV_OUTPUT_DIR'], output_filename) |
|
|
| |
| |
| |
| logger.info(f"Starting LangGraph Tailor for {job.get('title')}") |
| ai_tailor = AILangGraphTailor() |
| |
| |
| original_resume_text = ai_tailor.extract_text_from_docx(user_resume_path) |
| |
| |
| final_state = ai_tailor.run_pipeline(original_resume_text, job.get('description', '')) |
| |
| |
| ai_tailor.generate_final_docx( |
| tailored_dict=final_state["tailored_resume"], |
| template_path=ats_template_path, |
| output_path=output_path |
| ) |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| return jsonify({ |
| 'success': True, |
| 'filename': output_filename, |
| 'job_title': job.get('title'), |
| 'company': job.get('company'), |
| 'job': job, |
| 'message': f"CV successfully tailored for {job.get('company')}" |
| }) |
| except Exception as e: |
| logger.error(f"Error generating CV: {str(e)}") |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
| @app.route('/api/generate-all-cvs', methods=['POST']) |
| def api_generate_all_cvs(): |
| """Generate CVs for multiple jobs with JIT Analysis.""" |
| try: |
| data = request.get_json() |
| |
| requested_ids = data.get('job_indices', []) |
| jobs = _load_processed_jobs() |
| |
| |
| |
|
|
| |
| job_indices = [] |
| if requested_ids: |
| for req_id in requested_ids: |
| for i, j in enumerate(jobs): |
| if j.get('id') == req_id or str(i) == str(req_id): |
| job_indices.append(i) |
| break |
| else: |
| |
| job_indices = list(range(len(jobs))) |
| |
| |
| |
| |
| |
|
|
| user_resume_path = session.get('cv_template') |
| ats_template_path = app.config.get('ATS_TEMPLATE_PATH') |
| |
| if not user_resume_path or not os.path.exists(user_resume_path): |
| return jsonify({'success': False, 'error': 'User resume not found. Please upload a resume first.'}), 400 |
| |
| if not ats_template_path or not os.path.exists(ats_template_path): |
| return jsonify({'success': False, 'error': 'System ATS template not found on server.'}), 500 |
|
|
| tailoring_mode = _get_tailoring_mode() |
| successful = [] |
| failed = [] |
|
|
| |
| analyzer = CVAnalyzer() |
| jobs_modified = False |
| |
| for idx in job_indices: |
| try: |
| if idx < 0 or idx >= len(jobs): |
| continue |
|
|
| |
| job = _enrich_job_with_skills(jobs[idx], analyzer) |
| jobs[idx] = job |
| jobs_modified = True |
|
|
| |
| matched_categories = job.get('matched_categories', {}) |
| professional_summary = _build_professional_summary(job, matched_categories) |
| |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| company_slug = _sanitize_filename(job.get('company', 'Company')) |
| title_slug = _sanitize_filename(job.get('title', 'Position')) |
| output_filename = f"CV_{timestamp}_{company_slug}_{title_slug}.docx" |
| output_path = os.path.join(app.config['CV_OUTPUT_DIR'], output_filename) |
|
|
| |
| |
| |
| logger.info(f"Starting LangGraph Tailor for {job.get('title')}") |
| ai_tailor = AILangGraphTailor() |
| |
| |
| original_resume_text = ai_tailor.extract_text_from_docx(user_resume_path) |
| |
| |
| final_state = ai_tailor.run_pipeline(original_resume_text, job.get('description', '')) |
| |
| |
| ai_tailor.generate_final_docx( |
| tailored_dict=final_state["tailored_resume"], |
| template_path=ats_template_path, |
| output_path=output_path |
| ) |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| successful.append({ |
| 'job_index': idx, |
| 'filename': output_filename, |
| 'cover_letter_filename': None, |
| 'job_title': job.get('title'), |
| 'company': job.get('company'), |
| }) |
| except Exception as e: |
| logger.error(f"Error generating CV for job {idx}: {str(e)}") |
| failed.append({ |
| 'job_index': idx, |
| 'error': str(e), |
| 'job_title': jobs[idx].get('title') if idx < len(jobs) else 'Unknown', |
| }) |
|
|
| if jobs_modified: |
| _save_processed_jobs(jobs) |
| |
| |
| zip_buffer = io.BytesIO() |
| with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: |
| for cv in successful: |
| file_path = os.path.join(app.config['CV_OUTPUT_DIR'], cv['filename']) |
| if os.path.exists(file_path): |
| zip_file.write(file_path, arcname=cv['filename']) |
| cover_letter_filename = cv.get('cover_letter_filename') |
| if cover_letter_filename: |
| cover_letter_path = os.path.join(app.config['CV_OUTPUT_DIR'], cover_letter_filename) |
| if os.path.exists(cover_letter_path): |
| zip_file.write(cover_letter_path, arcname=cover_letter_filename) |
| |
| zip_buffer.seek(0) |
| zip_filename = f"CVs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip" |
| zip_path = os.path.join(app.config['CV_OUTPUT_DIR'], zip_filename) |
| |
| with open(zip_path, 'wb') as f: |
| f.write(zip_buffer.getvalue()) |
| |
| return jsonify({ |
| 'success': True, |
| 'successful': successful, |
| 'failed': failed, |
| 'zip_filename': zip_filename, |
| 'total_generated': len(successful), |
| 'total_failed': len(failed), |
| }) |
| except Exception as e: |
| logger.error(f"Error generating all CVs: {str(e)}") |
| return jsonify({'success': False, 'error': str(e)}), 500 |
|
|
| @app.route('/api/download/<filename>', methods=['GET']) |
| def api_download_file(filename): |
| """Download a file.""" |
| try: |
| import urllib.parse |
| |
| decoded_filename = urllib.parse.unquote(filename) |
| |
| |
| if '..' in decoded_filename or '/' in decoded_filename or '\\' in decoded_filename: |
| return jsonify({'error': 'Invalid filename'}), 400 |
| |
| file_path = os.path.join(app.config['CV_OUTPUT_DIR'], decoded_filename) |
| if not os.path.exists(file_path): |
| file_path = os.path.join(app.config['JOBS_OUTPUT_DIR'], decoded_filename) |
| |
| if not os.path.exists(file_path): |
| logger.warning(f"File not found: {decoded_filename}") |
| return jsonify({'error': 'File not found'}), 404 |
| |
| logger.info(f"Downloading file: {decoded_filename}") |
| return send_file(file_path, as_attachment=True, download_name=decoded_filename) |
| except Exception as e: |
| logger.error(f"Error downloading file: {str(e)}") |
| return jsonify({'error': str(e)}), 500 |
|
|
| |
| @app.route('/api/latest-application-data', methods=['GET']) |
| def api_latest_application_data(): |
| """Endpoint for the Chrome Extension to fetch real user data for autofill.""" |
| |
| |
| user_profile = session.get('user_profile') |
| |
| if not user_profile: |
| return jsonify({ |
| 'success': False, |
| 'error': 'No user profile found. Please save your profile in the React app first!' |
| }), 400 |
|
|
| |
| latest_cv = session.get('latest_cv_filename') |
| cv_url = None |
| |
| if latest_cv: |
| import urllib.parse |
| |
| safe_filename = urllib.parse.quote(latest_cv) |
| cv_url = f"{request.host_url}api/download/{safe_filename}" |
|
|
| return jsonify({ |
| 'success': True, |
| 'user': user_profile, |
| |
| 'message': 'Real session data retrieved successfully' |
| }) |
|
|
| def _generate_cv_with_api_tailoring(job, cv_template, output_path): |
| """Use API subproject updater to generate a tailored CV and optional cover letter.""" |
| api_project_root = os.path.join(os.getcwd(), "Automatic CV and Cover Letter with API") |
| if not os.path.exists(api_project_root): |
| raise FileNotFoundError("API subproject folder not found") |
|
|
| if api_project_root not in sys.path: |
| sys.path.append(api_project_root) |
|
|
| try: |
| |
| os.environ['LLM_PROVIDER'] = _get_llm_provider() |
| APIIntegration = importlib.import_module('src.utils.openai_integration').OpenAIIntegration |
| DocumentUpdater = importlib.import_module('src.updaters.document_updater').DocumentUpdater |
|
|
| cover_letter_template = os.environ.get( |
| 'API_COVER_LETTER_TEMPLATE_PATH', |
| os.path.join(api_project_root, 'data', 'Cover Letter_Imon .docx') |
| ) |
| if not os.path.isabs(cover_letter_template): |
| cover_letter_template = os.path.abspath(cover_letter_template) |
|
|
| if not os.path.exists(cover_letter_template): |
| if _is_cover_letter_enabled(): |
| raise FileNotFoundError("Cover letter template not found") |
| |
| cover_letter_template = cv_template |
|
|
| description = (job.get('description') or '').strip() |
| if not description: |
| raise ValueError("Job description is empty") |
|
|
| llm_integration = APIIntegration() |
| provider = getattr(llm_integration, 'provider', 'unknown') |
| if provider in ('openai', 'grok', 'groq') and not llm_integration.is_api_key_set(): |
| raise ValueError(f"API key not configured for provider '{provider}'") |
|
|
| updater = DocumentUpdater(cv_template, cover_letter_template, llm_integration) |
| updater.update_cv(description, output_path) |
|
|
| generated_cover_letter_filename = None |
| if _is_cover_letter_enabled(): |
| cv_basename = os.path.basename(output_path) |
| if cv_basename.startswith('CV_'): |
| cover_letter_filename = f"CoverLetter_{cv_basename[3:]}" |
| else: |
| cover_letter_filename = f"CoverLetter_{cv_basename}" |
| cover_letter_output_path = os.path.join(os.path.dirname(output_path), cover_letter_filename) |
| updater.update_cover_letter(description, cover_letter_output_path) |
| generated_cover_letter_filename = cover_letter_filename |
|
|
| return { |
| 'provider': provider, |
| 'cover_letter_filename': generated_cover_letter_filename, |
| } |
| except Exception as e: |
| raise Exception(f"API tailoring error: {str(e)}") |
|
|
| |
|
|
| @app.route('/search', methods=['GET', 'POST']) |
| def search_jobs(): |
| """Legacy search route - redirects to React app""" |
| if request.method == 'POST': |
| keyword = request.form.get('keyword', '') |
| location = request.form.get('location', '') |
| max_jobs = int(request.form.get('max_jobs', 10)) |
| |
| _set_tailoring_mode(request.form.get('tailoring_mode', _get_tailoring_mode())) |
| |
| if not keyword or not location: |
| flash('Please enter both job title and location', 'error') |
| return redirect(url_for('index')) |
| |
| try: |
| scraper = LinkedInScraper(headless=True) |
| jobs = scraper.scrape_job_listings(keyword, location, max_jobs=max_jobs) |
| |
| if not jobs: |
| flash('No jobs found. Try different search terms.', 'warning') |
| return redirect(url_for('index')) |
| |
| for i, job in enumerate(jobs): |
| logger.info(f"Fetching description for job {i+1}/{len(jobs)}: {job['title']}") |
| title, company, description = scraper.fetch_job_description(job['link']) |
| jobs[i]['description'] = description |
| |
| today_date = datetime.today().strftime("%Y-%m-%d") |
| filename = f"linkedin_jobs_{today_date}.xlsx" |
| filepath = os.path.join(app.config['JOBS_OUTPUT_DIR'], filename) |
| |
| df = pd.DataFrame(jobs) |
| df.to_excel(filepath, index=False) |
| session['jobs_file'] = filepath |
| session['excel_filename'] = filename |
| |
| analyzer = CVAnalyzer() |
| processed_jobs = [] |
| |
| for job in jobs: |
| if job.get('description'): |
| matched_skills, matched_requirements, matched_categories = analyzer.extract_skills_from_description(job['description']) |
| job['matched_skills'] = matched_skills |
| job['matched_categories'] = matched_categories |
| processed_jobs.append(job) |
| |
| _save_processed_jobs(processed_jobs) |
| |
| return render_template('job_list.html', |
| jobs=processed_jobs, |
| excel_file=filename, |
| excel_path=filepath, |
| tailoring_mode=_get_tailoring_mode(), |
| llm_provider=_get_llm_provider()) |
| |
| except Exception as e: |
| logger.error(f"Error during job search: {str(e)}") |
| flash(f'An error occurred: {str(e)}', 'error') |
| return redirect(url_for('index')) |
| |
| return redirect(url_for('index')) |
|
|
| @app.route('/upload_cv', methods=['GET', 'POST']) |
| def upload_cv(): |
| """Legacy CV upload route""" |
| if request.method == 'POST': |
| if 'cv_file' not in request.files: |
| flash('No file part', 'error') |
| return redirect(request.url) |
| |
| file = request.files['cv_file'] |
| if file.filename == '': |
| flash('No selected file', 'error') |
| return redirect(request.url) |
| |
| if not file.filename.endswith('.docx'): |
| flash('Only .docx files are supported', 'error') |
| return redirect(request.url) |
| |
| try: |
| filename = f"cv_template_{int(time.time())}.docx" |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
| file.save(filepath) |
| session['cv_template'] = filepath |
| flash('CV template uploaded successfully', 'success') |
| return redirect(url_for('index')) |
| except Exception as e: |
| logger.error(f"Error uploading CV: {str(e)}") |
| flash(f'Error uploading CV: {str(e)}', 'error') |
| return redirect(request.url) |
| |
| return render_template('upload_cv.html') |
|
|
| @app.errorhandler(404) |
| def not_found(error): |
| """Handle 404 errors""" |
| return render_template('404.html'), 404 |
|
|
| @app.errorhandler(500) |
| def internal_error(error): |
| """Handle 500 errors""" |
| logger.error(f"Internal server error: {str(error)}") |
| return render_template('500.html'), 500 |
|
|
| if __name__ == '__main__': |
| port = int(os.environ.get('PORT', 5050)) |
| debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true' |
| app.run(host='0.0.0.0', port=port, debug=debug) |
|
|