skillsync-cli / app_new.py
Mr-Haseeb786
fixes
5b0bd16
Raw
History Blame Contribute Delete
48 kB
"""
Web Interface for Job Application AI Agent - React Frontend Version
This module provides a Flask web application for the job application AI agent
with a modern React frontend and REST API endpoints.
"""
import os
import logging
import json
import uuid
import time
import sys
import importlib
from datetime import datetime
from pathlib import Path
from flask import Flask, render_template, request, redirect, url_for, flash, send_file, session, jsonify
from flask_cors import CORS
import pandas as pd
import zipfile
import io
from job_apply_ai.cv_modifier.cv_analyzer import CVAnalyzer, CVModifier, batch_process_jobs
from job_apply_ai.utils.helpers import ensure_directory_exists
from CV_Tailor.ai_cv_tailor import AILangGraphTailor
# CV Analyzer Module
from job_apply_ai.analyzer.ats_evaluator import ATSEvaluator
import docx
# Changes made to add the Apify Actor
# Added Dependencies
import urllib.parse
from apify_client import ApifyClient
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Initialize Flask app
app = Flask(__name__, static_folder='static', static_url_path='/static', template_folder='templates')
app.secret_key = os.environ.get('SECRET_KEY', 'dev_key_for_testing')
app.config['SESSION_COOKIE_NAME'] = f"job_apply_ai_session_{int(time.time())}"
app.config['JSON_SORT_KEYS'] = False
# Enable CORS for API endpoints
CORS(app, resources={r"/api/*": {"origins": "*"}})
# Keep runtime files local to the project unless overridden by env var.
runtime_root = os.environ.get('JOB_APPLY_AI_DATA_DIR', os.path.join(os.getcwd(), '.runtime'))
if not os.path.isabs(runtime_root):
runtime_root = os.path.abspath(runtime_root)
app.config['UPLOAD_FOLDER'] = os.path.join(runtime_root, 'uploads')
ensure_directory_exists(app.config['UPLOAD_FOLDER'])
# Create output directories
app.config['CV_OUTPUT_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], 'cvs')
app.config['JOBS_OUTPUT_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], 'jobs')
app.config['STATE_DIR'] = os.path.join(app.config['UPLOAD_FOLDER'], 'session_state')
ensure_directory_exists(app.config['CV_OUTPUT_DIR'])
ensure_directory_exists(app.config['JOBS_OUTPUT_DIR'])
ensure_directory_exists(app.config['STATE_DIR'])
# Define where the system's Jinja ATS template lives
app.config['ATS_TEMPLATE_PATH'] = os.path.join(os.getcwd(), 'data', 'ats_template.docx')
app.config['SESSION_TYPE'] = 'filesystem'
SUPPORTED_TAILORING_MODES = {'local', 'api'}
SUPPORTED_LLM_PROVIDERS = {'ollama', 'groq', 'grok', 'openai'}
def _parse_bool(value, default=False):
if value is None:
return default
if isinstance(value, bool):
return value
return str(value).strip().lower() in {'1', 'true', 'yes', 'on'}
def _session_state_path(state_id):
return os.path.join(app.config['STATE_DIR'], f"{state_id}.json")
def _get_tailoring_mode():
"""Resolve active tailoring mode from session, then environment."""
mode = (session.get('tailoring_mode') or '').strip().lower()
if mode in SUPPORTED_TAILORING_MODES:
return mode
env_mode = (os.environ.get('CV_TAILORING_MODE', 'local') or 'local').strip().lower()
if env_mode not in SUPPORTED_TAILORING_MODES:
env_mode = 'local'
return env_mode
def _set_tailoring_mode(mode):
mode = (mode or '').strip().lower()
if mode in SUPPORTED_TAILORING_MODES:
session['tailoring_mode'] = mode
return mode
return _get_tailoring_mode()
def _get_llm_provider():
provider = (session.get('llm_provider') or '').strip().lower()
if provider in SUPPORTED_LLM_PROVIDERS:
return provider
env_provider = (os.environ.get('LLM_PROVIDER', 'ollama') or 'ollama').strip().lower()
if env_provider not in SUPPORTED_LLM_PROVIDERS:
env_provider = 'ollama'
return env_provider
def _set_llm_provider(provider):
provider = (provider or '').strip().lower()
if provider not in SUPPORTED_LLM_PROVIDERS:
provider = _get_llm_provider()
session['llm_provider'] = provider
# Keep compatibility with integrations that read environment variables.
os.environ['LLM_PROVIDER'] = provider
return provider
def _is_summary_tailoring_enabled():
if 'enable_professional_summary' in session:
return _parse_bool(session.get('enable_professional_summary'))
return _parse_bool(os.environ.get('CV_ENABLE_SUMMARY_TAILORING', '0'))
def _set_summary_tailoring_enabled(enabled):
enabled_bool = _parse_bool(enabled)
session['enable_professional_summary'] = enabled_bool
os.environ['CV_ENABLE_SUMMARY_TAILORING'] = '1' if enabled_bool else '0'
return enabled_bool
def _is_cover_letter_enabled():
if 'include_cover_letters' in session:
return _parse_bool(session.get('include_cover_letters'))
return _parse_bool(os.environ.get('API_INCLUDE_COVER_LETTERS', '0'))
def _set_cover_letter_enabled(enabled):
enabled_bool = _parse_bool(enabled)
session['include_cover_letters'] = enabled_bool
os.environ['API_INCLUDE_COVER_LETTERS'] = '1' if enabled_bool else '0'
return enabled_bool
def _sanitize_filename(text):
"""Sanitize text to be safe for use in filenames."""
if not text:
return 'file'
# Replace problematic characters with underscores
sanitized = text.replace('/', '_').replace('\\', '_').replace(' ', '_')
# Remove any other special characters that could cause issues
sanitized = ''.join(c for c in sanitized if c.isalnum() or c == '_')
# Limit length
return sanitized[:20]
def _clear_job_context(keep_cv_template=True):
"""Clear prior search/CV generation state for a fresh workflow."""
state_id = session.pop('jobs_state_id', None)
if state_id:
state_path = _session_state_path(state_id)
if os.path.exists(state_path):
try:
os.remove(state_path)
except OSError:
pass
for key in [
'jobs_file', 'excel_filename', 'generated_cvs', 'successful_jobs',
'failed_jobs', 'current_cv', 'current_cv_filename',
]:
session.pop(key, None)
if not keep_cv_template:
session.pop('cv_template', None)
def _save_processed_jobs(processed_jobs):
state_id = str(uuid.uuid4())
state_path = _session_state_path(state_id)
with open(state_path, 'w', encoding='utf-8') as f:
json.dump(processed_jobs, f, ensure_ascii=False)
session['jobs_state_id'] = state_id
return state_id
def _load_processed_jobs():
state_id = session.get('jobs_state_id')
if not state_id:
return []
state_path = _session_state_path(state_id)
if not os.path.exists(state_path):
return []
try:
with open(state_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data if isinstance(data, list) else []
except Exception as e:
logger.error(f"Failed to load session job state: {str(e)}")
return []
def _build_professional_summary(job, matched_categories):
"""Build a concise, professional summary tailored to role and extracted skills."""
job_title = (job.get('title') or 'Professional').strip()
company = (job.get('company') or 'your target company').strip()
flat_skills = []
for skills in (matched_categories or {}).values():
for s in skills or []:
skill = str(s).strip()
if skill:
flat_skills.append(skill)
deduped = []
seen = set()
for skill in flat_skills:
key = skill.lower()
if key not in seen:
seen.add(key)
deduped.append(skill)
top_skills = deduped[:5]
if top_skills:
skills_text = ", ".join(top_skills)
return (
f"{job_title} professional with hands-on experience in {skills_text}. "
f"Delivers reliable, scalable outcomes through strong collaboration, ownership, "
f"and structured problem-solving. Ready to contribute immediate impact at {company}."
)
return (
f"{job_title} professional focused on delivering measurable outcomes through "
f"technical execution, collaboration, and continuous improvement. "
f"Motivated to contribute meaningful impact at {company}."
)
# Addition to optimize CV Analyzer
def _enrich_job_with_skills(job, analyzer=None):
"""
Just-In-Time (JIT) processing: Runs the CVAnalyzer only if skills are missing.
"""
# If it already has categories, skip the heavy NLP processing
if 'matched_categories' in job and job['matched_categories']:
return job
if not analyzer:
analyzer = CVAnalyzer()
description = job.get('description', '')
if description and description != "Description empty.":
logger.info(f"JIT Analysis running for job: {job.get('title')}")
matched_skills, matched_requirements, matched_categories = analyzer.extract_skills_from_description(description)
job['matched_skills'] = matched_skills
job['matched_requirements'] = matched_requirements
job['matched_categories'] = matched_categories
else:
logger.warning(f"Skipping JIT Analysis for {job.get('title')} - No description.")
job['matched_skills'] = []
job['matched_requirements'] = []
job['matched_categories'] = {}
return job
# ==================== React Frontend Routes ====================
@app.route('/app')
@app.route('/app/<path:path>')
def react_app(path=''):
"""Serve React app for all routes under /app"""
react_build_path = os.path.join(os.path.dirname(__file__), '../../frontend/dist')
if os.path.exists(react_build_path):
index_path = os.path.join(react_build_path, 'index.html')
if os.path.exists(index_path):
return send_file(index_path)
# Fallback to template if React build doesn't exist
return render_template('index.html')
# @app.route('/')
# def index():
# """Render the home page."""
# return render_template('index.html')
@app.route('/')
def index():
"""Root health check for Hugging Face."""
return jsonify({
'status': 'online',
'service': 'Job Apply AI API',
'message': 'Backend is running! Connect via Vercel frontend.'
}), 200
# ==================== REST API Endpoints for React ====================
@app.route('/api/health', methods=['GET'])
def api_health():
"""Health check endpoint."""
return jsonify({'status': 'ok', 'version': '2.0.0'})
@app.route('/api/config', methods=['GET'])
def api_config():
"""Get current app configuration."""
return jsonify({
'tailoring_mode': _get_tailoring_mode(),
'llm_provider': _get_llm_provider(),
'enable_professional_summary': _is_summary_tailoring_enabled(),
'include_cover_letters': _is_cover_letter_enabled(),
'supported_modes': list(SUPPORTED_TAILORING_MODES),
'supported_providers': list(SUPPORTED_LLM_PROVIDERS),
})
@app.route('/api/set-tailoring-mode', methods=['POST'])
def api_set_tailoring_mode():
"""Set tailoring mode via API."""
data = request.get_json()
mode = data.get('mode', 'local')
new_mode = _set_tailoring_mode(mode)
return jsonify({'success': True, 'mode': new_mode})
@app.route('/api/settings', methods=['POST'])
def api_save_settings():
"""Save runtime settings from frontend modal."""
try:
data = request.get_json() or {}
mode = _set_tailoring_mode(data.get('tailoring_mode', _get_tailoring_mode()))
provider = _set_llm_provider(data.get('llm_provider', _get_llm_provider()))
summary_enabled = _set_summary_tailoring_enabled(
data.get('enable_professional_summary', _is_summary_tailoring_enabled())
)
cover_letters_enabled = _set_cover_letter_enabled(
data.get('include_cover_letters', _is_cover_letter_enabled())
)
return jsonify({
'success': True,
'settings': {
'tailoring_mode': mode,
'llm_provider': provider,
'enable_professional_summary': summary_enabled,
'include_cover_letters': cover_letters_enabled,
},
})
except Exception as e:
logger.error(f"Error saving settings: {str(e)}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/upload-cv', methods=['POST'])
def api_upload_cv():
"""Upload CV template via API."""
try:
if 'file' not in request.files:
return jsonify({'success': False, 'error': 'No file provided'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'success': False, 'error': 'No file selected'}), 400
if not file.filename.endswith('.docx'):
return jsonify({'success': False, 'error': 'Only .docx files are supported'}), 400
filename = f"cv_template_{int(time.time())}.docx"
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
session['cv_template'] = filepath
logger.info(f"CV template uploaded: {filename}")
return jsonify({
'success': True,
'filename': file.filename,
'message': 'CV template uploaded successfully'
})
except Exception as e:
logger.error(f"Error uploading CV: {str(e)}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/user-profile', methods=['GET', 'POST'])
def api_user_profile():
"""Get or save the user's base profile data into the session."""
try:
if request.method == 'POST':
data = request.get_json() or {}
# Save the incoming data to the Flask session
session['user_profile'] = {
'first_name': data.get('first_name', ''),
'last_name': data.get('last_name', ''),
'email': data.get('email', ''),
'phone': data.get('phone', ''),
'linkedin': data.get('linkedin', ''),
'github': data.get('github', '')
}
logger.info("User profile saved to session.")
return jsonify({'success': True, 'message': 'Profile saved successfully.'})
# If GET request, return whatever is currently in the session
profile = session.get('user_profile', {})
return jsonify({'success': True, 'profile': profile})
except Exception as e:
logger.error(f"Error handling user profile: {str(e)}")
return jsonify({'success': False, 'error': str(e)}), 500
# Added Apify Actor Job Search
@app.route('/api/apify-search', methods=['POST'])
def api_apify_search():
"""Standalone endpoint to search for jobs via Apify and save results to JSON."""
"""Standalone endpoint to search via Apify, fetch descriptions locally, and save to JSON."""
try:
data = request.get_json()
keyword = data.get('keyword', '').strip()
location = data.get('location', '').strip()
max_jobs = int(data.get('max_jobs', 10))
max_days_old = int(data.get('max_days_old', 14))
if not keyword or not location:
return jsonify({'success': False, 'error': 'Keyword and location are required'}), 400
logger.info(f"Starting dedicated Apify search: {keyword} in {location}")
# ==========================================
# 1. URL Construction & Apify Call
# ==========================================
base_url = "https://www.linkedin.com/jobs/search?"
params = {
"keywords": keyword,
"location": location,
}
if max_days_old:
seconds = max_days_old * 24 * 60 * 60
params["f_TPR"] = f"r{seconds}"
final_url = base_url + urllib.parse.urlencode(params, safe="%2C")
apify_token = os.environ.get("APIFY_API_TOKEN")
if not apify_token:
return jsonify({'success': False, 'error': 'Apify API token missing'}), 500
client = ApifyClient(apify_token)
# Force Apify to scrape at least 10 to satisfy its internal requirements
apify_count = max(10, max_jobs)
run_input = {
"urls": [final_url],
"count": apify_count
}
app_env = os.environ.get("ENVIOR")
if app_env == "PROD":
logger.info("Running in Production")
logger.info("Executing Apify Actor to get job list...")
run = client.actor("curious_coder/linkedin-jobs-scraper").call(run_input=run_input)
dataset = client.dataset(run["defaultDatasetId"]).list_items().items
if not dataset:
return jsonify({
'success': True,
'jobs': [],
'message': 'No jobs found on LinkedIn for these parameters.'
})
# ==========================================
# 2. Map the Data
# ==========================================
jobs = []
for item in dataset[:max_jobs]:
jobs.append({
'id': str(item.get('id')),
'title': item.get('title', 'Unknown Title'),
'company': item.get('companyName', item.get('company', 'Unknown Company')),
'link': item.get('link', ''),
'apply_url': item.get('applyUrl', ''),
'description': item.get('descriptionText', 'No Description')
})
else:
# ==========================================
# Simulate Delay & Load Mock Data
# ==========================================
logger.info("Running in Development")
logger.info("Simulating Apify scraping delay (5 seconds)...")
time.sleep(5)
# Target the specific JSON file you saved previously
mock_file_path = ".runtime/uploads/jobs/apify_jobs_20260424_184439.json"
if not os.path.exists(mock_file_path):
return jsonify({'success': False, 'error': f'Mock file not found: {mock_file_path}'}), 500
with open(mock_file_path, 'r', encoding='utf-8') as f:
all_mock_jobs = json.load(f)
# Enforce the max_jobs limit requested by the client to mimic actual behavior
jobs = all_mock_jobs[:max_jobs]
# ==========================================
# 3. Save Results to a JSON File
# ==========================================
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"apify_jobs_mocked{timestamp}.json"
filepath = os.path.join(app.config['JOBS_OUTPUT_DIR'], filename)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(jobs, f, indent=4, ensure_ascii=False)
logger.info(f"Successfully saved {len(jobs)} jobs with descriptions to {filename}")
_save_processed_jobs(jobs)
return jsonify({
'success': True,
'jobs': jobs,
'json_file': filename,
'message': f'Found and saved {len(jobs)} jobs to JSON.'
})
except Exception as e:
logger.error(f"Error in Apify search endpoint: {str(e)}")
return jsonify({'success': False, 'error': str(e)}), 500
# CV Analyzer
@app.route('/api/evaluate-fit', methods=['POST'])
def api_evaluate_fit():
"""Run heavy semantic analysis to get a fit score before generating a CV."""
try:
data = request.get_json()
job_id = data.get('job_id')
if not job_id:
return jsonify({'success': False, 'error': 'No job ID provided'}), 400
# 1. Ensure the user has uploaded their base CV
user_resume_path = session.get('cv_template')
if not user_resume_path or not os.path.exists(user_resume_path):
return jsonify({'success': False, 'error': 'Please upload a base CV first.'}), 400
# 2. Extract raw text from the base CV
try:
doc = docx.Document(user_resume_path)
resume_text = "\n".join([para.text for para in doc.paragraphs if para.text.strip()])
except Exception as e:
return jsonify({'success': False, 'error': f'Failed to read CV: {str(e)}'}), 500
# 3. Find the specific Job Description from the session
jobs = _load_processed_jobs()
job = next((j for j in jobs if j.get('id') == job_id or str(jobs.index(j)) == str(job_id)), None)
if not job or not job.get('description'):
return jsonify({'success': False, 'error': 'Job description not found.'}), 404
# 4. Run the Heavy Evaluator (Lazy Loads SBERT if it's the first time!)
logger.info(f"Running deep semantic evaluation for job: {job.get('title')}")
evaluator = ATSEvaluator()
result = evaluator.evaluate_fit(resume_text, job['description'])
return jsonify({
'success': True,
'match_score': result['match_score'],
'matched_skills': result['matched_skills'],
'missing_skills': result['missing_skills']
})
except Exception as e:
logger.error(f"Error in evaluate-fit: {str(e)}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/search', methods=['POST'])
def api_search_jobs():
"""Search for jobs via API."""
try:
data = request.get_json()
keyword = data.get('keyword', '').strip()
location = data.get('location', '').strip()
max_jobs = int(data.get('max_jobs', 10))
max_days_old = int(data.get('max_days_old', 14))
if not keyword or not location:
return jsonify({'success': False, 'error': 'Keyword and location are required'}), 400
_set_tailoring_mode(data.get('tailoring_mode', _get_tailoring_mode()))
_clear_job_context(keep_cv_template=True)
logger.info(f"Searching jobs: {keyword} in {location}")
scraper = LinkedInScraper(headless=True)
jobs = scraper.scrape_job_listings(keyword, location, max_jobs=max_jobs, max_days_old=max_days_old)
if not jobs:
return jsonify({
'success': True,
'jobs': [],
'message': 'No jobs found'
})
# Fetch descriptions for each job
for i, job in enumerate(jobs):
try:
logger.info(f"Fetching description {i+1}/{len(jobs)}")
title, company, description = scraper.fetch_job_description(job['link'])
jobs[i]['description'] = description
except Exception as e:
logger.warning(f"Failed to fetch description for {job.get('title')}: {str(e)}")
jobs[i]['description'] = ''
# Save to Excel
today_date = datetime.today().strftime("%Y-%m-%d")
filename = f"linkedin_jobs_{today_date}_{int(time.time())}.xlsx"
filepath = os.path.join(app.config['JOBS_OUTPUT_DIR'], filename)
df = pd.DataFrame(jobs)
df.to_excel(filepath, index=False)
session['jobs_file'] = filepath
session['excel_filename'] = filename
# Extract skills from job descriptions
analyzer = CVAnalyzer()
processed_jobs = []
for i, job in enumerate(jobs):
if job.get('description'):
try:
matched_skills, matched_requirements, matched_categories = analyzer.extract_skills_from_description(job['description'])
job['matched_skills'] = matched_skills
job['matched_categories'] = matched_categories
# Add unique ID
job['id'] = str(i)
processed_jobs.append(job)
except Exception as e:
logger.warning(f"Failed to analyze skills for {job.get('title')}: {str(e)}")
_save_processed_jobs(processed_jobs)
return jsonify({
'success': True,
'jobs': processed_jobs,
'excel_file': filename,
'message': f'Found {len(processed_jobs)} jobs'
})
except Exception as e:
logger.error(f"Error searching jobs: {str(e)}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/jobs', methods=['GET'])
def api_get_jobs():
"""Get stored jobs from session."""
try:
jobs = _load_processed_jobs()
return jsonify({
'success': True,
'jobs': jobs,
'count': len(jobs)
})
except Exception as e:
logger.error(f"Error getting jobs: {str(e)}")
return jsonify({'success': False, 'error': str(e)}), 500
# @app.route('/api/generate-cv/<job_index>', methods=['POST'])
@app.route('/api/generate-cv/<job_id>', methods=['POST'])
def api_generate_cv(job_id):
"""Generate CV for a single job with JIT Analysis."""
try:
jobs = _load_processed_jobs()
# job_idx = int(job_index)
# 1. Find the actual job index using the UUID (with legacy fallback)
job_idx = -1
for i, j in enumerate(jobs):
if j.get('id') == job_id or str(i) == str(job_id):
job_idx = i
break
# if job_idx < 0 or job_idx >= len(jobs):
if job_idx == -1:
return jsonify({'success': False, 'error': 'Invalid job index'}), 400
job = jobs[job_idx]
# 2. JIT Analysis: Enrich the job with skills if it hasn't been done yet
job = _enrich_job_with_skills(job)
# 3. Save the enriched job back to the session so we don't re-analyze if they click again
jobs[job_idx] = job
_save_processed_jobs(jobs)
# cv_template_path = session.get('cv_template')
# if not cv_template_path or not os.path.exists(cv_template_path):
# return jsonify({'success': False, 'error': 'CV template not found'}), 400
user_resume_path = session.get('cv_template')
ats_template_path = app.config.get('ATS_TEMPLATE_PATH')
if not user_resume_path or not os.path.exists(user_resume_path):
return jsonify({'success': False, 'error': 'User resume not found. Please upload a resume first.'}), 400
if not ats_template_path or not os.path.exists(ats_template_path):
return jsonify({'success': False, 'error': 'System ATS template not found on server.'}), 500
# tailoring_mode = _get_tailoring_mode()
# matched_categories = job.get('matched_categories', {})
# professional_summary = _build_professional_summary(job, matched_categories)
# Generate output filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
company_slug = _sanitize_filename(job.get('company', 'Company'))
title_slug = _sanitize_filename(job.get('title', 'Position'))
output_filename = f"CV_{timestamp}_{company_slug}_{title_slug}.docx"
output_path = os.path.join(app.config['CV_OUTPUT_DIR'], output_filename)
# ==========================================
# EXECUTE LANGGRAPH AI TAILOR
# ==========================================
logger.info(f"Starting LangGraph Tailor for {job.get('title')}")
ai_tailor = AILangGraphTailor()
# Extract raw text from the USER'S uploaded resume
original_resume_text = ai_tailor.extract_text_from_docx(user_resume_path)
# Run the Graph to get the tailored JSON
final_state = ai_tailor.run_pipeline(original_resume_text, job.get('description', ''))
# Map the JSON to the SYSTEM'S ATS Jinja template and save it
ai_tailor.generate_final_docx(
tailored_dict=final_state["tailored_resume"],
template_path=ats_template_path,
output_path=output_path
)
# Modify CV
# modifier = CVModifier(cv_template_path)
# generated_cover_letter_filename = None
# if tailoring_mode == 'api':
# try:
# generation_result = _generate_cv_with_api_tailoring(job, cv_template_path, output_path)
# provider = generation_result.get('provider', 'unknown')
# generated_cover_letter_filename = generation_result.get('cover_letter_filename')
# logger.info(f"API tailoring completed using {provider}")
# except Exception as e:
# # Fallback to local tailoring
# logger.warning(f"API tailoring failed, falling back to local: {str(e)}")
# if _is_summary_tailoring_enabled():
# modifier.update_profile_summary(professional_summary)
# modifier.update_skills_section(matched_categories)
# modifier.save_modified_cv(output_path)
# else:
# # Local tailoring mode
# if _is_summary_tailoring_enabled():
# modifier.update_profile_summary(professional_summary)
# modifier.update_skills_section(matched_categories)
# modifier.save_modified_cv(output_path)
# return jsonify({
# 'success': True,
# 'filename': output_filename,
# 'cover_letter_filename': generated_cover_letter_filename,
# 'job_title': job.get('title'),
# 'company': job.get('company'),
# 'message': f"CV generated for {job.get('title')} at {job.get('company')}"
# })
return jsonify({
'success': True,
'filename': output_filename,
'job_title': job.get('title'),
'company': job.get('company'),
'job': job, # Keeps UI updated
'message': f"CV successfully tailored for {job.get('company')}"
})
except Exception as e:
logger.error(f"Error generating CV: {str(e)}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/generate-all-cvs', methods=['POST'])
def api_generate_all_cvs():
"""Generate CVs for multiple jobs with JIT Analysis."""
try:
data = request.get_json()
# job_indices = data.get('job_indices', [])
requested_ids = data.get('job_indices', [])
jobs = _load_processed_jobs()
# if not job_indices:
# job_indices = list(range(len(jobs)))
# 1. Map the requested UUIDs back to their actual list indices
job_indices = []
if requested_ids:
for req_id in requested_ids:
for i, j in enumerate(jobs):
if j.get('id') == req_id or str(i) == str(req_id):
job_indices.append(i)
break
else:
# Fallback if no IDs are passed
job_indices = list(range(len(jobs)))
# if not session.get('cv_template') or not os.path.exists(session.get('cv_template')):
# return jsonify({'success': False, 'error': 'CV template not found'}), 400
# cv_template_path = session.get('cv_template')
user_resume_path = session.get('cv_template')
ats_template_path = app.config.get('ATS_TEMPLATE_PATH')
if not user_resume_path or not os.path.exists(user_resume_path):
return jsonify({'success': False, 'error': 'User resume not found. Please upload a resume first.'}), 400
if not ats_template_path or not os.path.exists(ats_template_path):
return jsonify({'success': False, 'error': 'System ATS template not found on server.'}), 500
tailoring_mode = _get_tailoring_mode()
successful = []
failed = []
# Instantiate the Analyzer ONCE for the entire batch to save CPU
analyzer = CVAnalyzer()
jobs_modified = False
for idx in job_indices:
try:
if idx < 0 or idx >= len(jobs):
continue
# JIT Analysis for each job in the batch
job = _enrich_job_with_skills(jobs[idx], analyzer)
jobs[idx] = job
jobs_modified = True
# Now the data is ready for your custom Tailor
matched_categories = job.get('matched_categories', {})
professional_summary = _build_professional_summary(job, matched_categories)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
company_slug = _sanitize_filename(job.get('company', 'Company'))
title_slug = _sanitize_filename(job.get('title', 'Position'))
output_filename = f"CV_{timestamp}_{company_slug}_{title_slug}.docx"
output_path = os.path.join(app.config['CV_OUTPUT_DIR'], output_filename)
# ==========================================
# EXECUTE LANGGRAPH AI TAILOR
# ==========================================
logger.info(f"Starting LangGraph Tailor for {job.get('title')}")
ai_tailor = AILangGraphTailor()
# Extract text from the user's uploaded resume
original_resume_text = ai_tailor.extract_text_from_docx(user_resume_path)
# Run the Graph to get the tailored JSON
final_state = ai_tailor.run_pipeline(original_resume_text, job.get('description', ''))
# Map the JSON to the system ATS template and save it
ai_tailor.generate_final_docx(
tailored_dict=final_state["tailored_resume"],
template_path=ats_template_path,
output_path=output_path
)
# modifier = CVModifier(cv_template_path)
# cover_letter_filename = None
# if tailoring_mode == 'api':
# try:
# generation_result = _generate_cv_with_api_tailoring(job, cv_template_path, output_path)
# cover_letter_filename = generation_result.get('cover_letter_filename')
# except Exception as e:
# if _is_summary_tailoring_enabled():
# modifier.update_profile_summary(professional_summary)
# modifier.update_skills_section(matched_categories)
# modifier.save_modified_cv(output_path)
# else:
# if _is_summary_tailoring_enabled():
# modifier.update_profile_summary(professional_summary)
# modifier.update_skills_section(matched_categories)
# modifier.save_modified_cv(output_path)
# successful.append({
# 'job_index': idx,
# 'filename': output_filename,
# 'cover_letter_filename': cover_letter_filename,
# 'job_title': job.get('title'),
# 'company': job.get('company'),
# })
successful.append({
'job_index': idx,
'filename': output_filename,
'cover_letter_filename': None, # Adjust if adding cover letter logic back
'job_title': job.get('title'),
'company': job.get('company'),
})
except Exception as e:
logger.error(f"Error generating CV for job {idx}: {str(e)}")
failed.append({
'job_index': idx,
'error': str(e),
'job_title': jobs[idx].get('title') if idx < len(jobs) else 'Unknown',
})
if jobs_modified:
_save_processed_jobs(jobs)
# Create ZIP file
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
for cv in successful:
file_path = os.path.join(app.config['CV_OUTPUT_DIR'], cv['filename'])
if os.path.exists(file_path):
zip_file.write(file_path, arcname=cv['filename'])
cover_letter_filename = cv.get('cover_letter_filename')
if cover_letter_filename:
cover_letter_path = os.path.join(app.config['CV_OUTPUT_DIR'], cover_letter_filename)
if os.path.exists(cover_letter_path):
zip_file.write(cover_letter_path, arcname=cover_letter_filename)
zip_buffer.seek(0)
zip_filename = f"CVs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
zip_path = os.path.join(app.config['CV_OUTPUT_DIR'], zip_filename)
with open(zip_path, 'wb') as f:
f.write(zip_buffer.getvalue())
return jsonify({
'success': True,
'successful': successful,
'failed': failed,
'zip_filename': zip_filename,
'total_generated': len(successful),
'total_failed': len(failed),
})
except Exception as e:
logger.error(f"Error generating all CVs: {str(e)}")
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/download/<filename>', methods=['GET'])
def api_download_file(filename):
"""Download a file."""
try:
import urllib.parse
# Decode URL-encoded filename
decoded_filename = urllib.parse.unquote(filename)
# Security check: prevent directory traversal
if '..' in decoded_filename or '/' in decoded_filename or '\\' in decoded_filename:
return jsonify({'error': 'Invalid filename'}), 400
file_path = os.path.join(app.config['CV_OUTPUT_DIR'], decoded_filename)
if not os.path.exists(file_path):
file_path = os.path.join(app.config['JOBS_OUTPUT_DIR'], decoded_filename)
if not os.path.exists(file_path):
logger.warning(f"File not found: {decoded_filename}")
return jsonify({'error': 'File not found'}), 404
logger.info(f"Downloading file: {decoded_filename}")
return send_file(file_path, as_attachment=True, download_name=decoded_filename)
except Exception as e:
logger.error(f"Error downloading file: {str(e)}")
return jsonify({'error': str(e)}), 500
# Communication endpoint with the Browser Extension (Auto-Apply)
@app.route('/api/latest-application-data', methods=['GET'])
def api_latest_application_data():
"""Endpoint for the Chrome Extension to fetch real user data for autofill."""
# 1. Grab the user profile from the session
user_profile = session.get('user_profile')
if not user_profile:
return jsonify({
'success': False,
'error': 'No user profile found. Please save your profile in the React app first!'
}), 400
# 2. Grab the latest generated CV from the session
latest_cv = session.get('latest_cv_filename')
cv_url = None
if latest_cv:
import urllib.parse
# Dynamically build the full download URL based on the server's current host
safe_filename = urllib.parse.quote(latest_cv)
cv_url = f"{request.host_url}api/download/{safe_filename}"
return jsonify({
'success': True,
'user': user_profile,
# 'cv_url': cv_url,
'message': 'Real session data retrieved successfully'
})
def _generate_cv_with_api_tailoring(job, cv_template, output_path):
"""Use API subproject updater to generate a tailored CV and optional cover letter."""
api_project_root = os.path.join(os.getcwd(), "Automatic CV and Cover Letter with API")
if not os.path.exists(api_project_root):
raise FileNotFoundError("API subproject folder not found")
if api_project_root not in sys.path:
sys.path.append(api_project_root)
try:
# Ensure integrations that depend on env vars see the active provider.
os.environ['LLM_PROVIDER'] = _get_llm_provider()
APIIntegration = importlib.import_module('src.utils.openai_integration').OpenAIIntegration
DocumentUpdater = importlib.import_module('src.updaters.document_updater').DocumentUpdater
cover_letter_template = os.environ.get(
'API_COVER_LETTER_TEMPLATE_PATH',
os.path.join(api_project_root, 'data', 'Cover Letter_Imon .docx')
)
if not os.path.isabs(cover_letter_template):
cover_letter_template = os.path.abspath(cover_letter_template)
if not os.path.exists(cover_letter_template):
if _is_cover_letter_enabled():
raise FileNotFoundError("Cover letter template not found")
# Cover letters are disabled, so reuse CV template path as a safe placeholder.
cover_letter_template = cv_template
description = (job.get('description') or '').strip()
if not description:
raise ValueError("Job description is empty")
llm_integration = APIIntegration()
provider = getattr(llm_integration, 'provider', 'unknown')
if provider in ('openai', 'grok', 'groq') and not llm_integration.is_api_key_set():
raise ValueError(f"API key not configured for provider '{provider}'")
updater = DocumentUpdater(cv_template, cover_letter_template, llm_integration)
updater.update_cv(description, output_path)
generated_cover_letter_filename = None
if _is_cover_letter_enabled():
cv_basename = os.path.basename(output_path)
if cv_basename.startswith('CV_'):
cover_letter_filename = f"CoverLetter_{cv_basename[3:]}"
else:
cover_letter_filename = f"CoverLetter_{cv_basename}"
cover_letter_output_path = os.path.join(os.path.dirname(output_path), cover_letter_filename)
updater.update_cover_letter(description, cover_letter_output_path)
generated_cover_letter_filename = cover_letter_filename
return {
'provider': provider,
'cover_letter_filename': generated_cover_letter_filename,
}
except Exception as e:
raise Exception(f"API tailoring error: {str(e)}")
# ==================== Legacy Routes (keep for backward compatibility) ====================
@app.route('/search', methods=['GET', 'POST'])
def search_jobs():
"""Legacy search route - redirects to React app"""
if request.method == 'POST':
keyword = request.form.get('keyword', '')
location = request.form.get('location', '')
max_jobs = int(request.form.get('max_jobs', 10))
_set_tailoring_mode(request.form.get('tailoring_mode', _get_tailoring_mode()))
if not keyword or not location:
flash('Please enter both job title and location', 'error')
return redirect(url_for('index'))
try:
scraper = LinkedInScraper(headless=True)
jobs = scraper.scrape_job_listings(keyword, location, max_jobs=max_jobs)
if not jobs:
flash('No jobs found. Try different search terms.', 'warning')
return redirect(url_for('index'))
for i, job in enumerate(jobs):
logger.info(f"Fetching description for job {i+1}/{len(jobs)}: {job['title']}")
title, company, description = scraper.fetch_job_description(job['link'])
jobs[i]['description'] = description
today_date = datetime.today().strftime("%Y-%m-%d")
filename = f"linkedin_jobs_{today_date}.xlsx"
filepath = os.path.join(app.config['JOBS_OUTPUT_DIR'], filename)
df = pd.DataFrame(jobs)
df.to_excel(filepath, index=False)
session['jobs_file'] = filepath
session['excel_filename'] = filename
analyzer = CVAnalyzer()
processed_jobs = []
for job in jobs:
if job.get('description'):
matched_skills, matched_requirements, matched_categories = analyzer.extract_skills_from_description(job['description'])
job['matched_skills'] = matched_skills
job['matched_categories'] = matched_categories
processed_jobs.append(job)
_save_processed_jobs(processed_jobs)
return render_template('job_list.html',
jobs=processed_jobs,
excel_file=filename,
excel_path=filepath,
tailoring_mode=_get_tailoring_mode(),
llm_provider=_get_llm_provider())
except Exception as e:
logger.error(f"Error during job search: {str(e)}")
flash(f'An error occurred: {str(e)}', 'error')
return redirect(url_for('index'))
return redirect(url_for('index'))
@app.route('/upload_cv', methods=['GET', 'POST'])
def upload_cv():
"""Legacy CV upload route"""
if request.method == 'POST':
if 'cv_file' not in request.files:
flash('No file part', 'error')
return redirect(request.url)
file = request.files['cv_file']
if file.filename == '':
flash('No selected file', 'error')
return redirect(request.url)
if not file.filename.endswith('.docx'):
flash('Only .docx files are supported', 'error')
return redirect(request.url)
try:
filename = f"cv_template_{int(time.time())}.docx"
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
session['cv_template'] = filepath
flash('CV template uploaded successfully', 'success')
return redirect(url_for('index'))
except Exception as e:
logger.error(f"Error uploading CV: {str(e)}")
flash(f'Error uploading CV: {str(e)}', 'error')
return redirect(request.url)
return render_template('upload_cv.html')
# @app.errorhandler(404)
# def not_found(error):
# """Handle 404 errors"""
# return render_template('404.html'), 404
# @app.errorhandler(500)
# def internal_error(error):
# """Handle 500 errors"""
# logger.error(f"Internal server error: {str(error)}")
# return render_template('500.html'), 500
@app.errorhandler(404)
def not_found(error):
"""Handle 404 errors"""
return jsonify({'success': False, 'error': 'API endpoint not found'}), 404
@app.errorhandler(500)
def internal_error(error):
"""Handle 500 errors"""
logger.error(f"Internal server error: {str(error)}")
return jsonify({'success': False, 'error': 'Internal server error occurred'}), 500
if __name__ == '__main__':
port = int(os.environ.get('PORT', 5050))
debug = os.environ.get('FLASK_DEBUG', 'False').lower() == 'true'
app.run(host='0.0.0.0', port=port, debug=debug)