""" MathPulse AI - Event-Driven Automation Engine Processes educational workflows based on a diagnostic-first, risk-driven intervention model. Trigger points: 1. Diagnostic Assessment Completion (highest priority) 2. Quiz / Assessment Submission (continuous) 3. New Student Enrollment 4. External Data Import (teacher action) 5. Admin Content Updates Each event is routed to a dedicated handler that orchestrates classification, quiz generation, notifications and dashboard updates. """ import os import json import math import logging import traceback from typing import List, Optional, Dict, Any, Tuple from datetime import datetime, timedelta from pydantic import BaseModel, Field logger = logging.getLogger("mathpulse.automation") # ─── Constants ────────────────────────────────────────────────── AT_RISK_THRESHOLD = 60 # < 60 % → At Risk WEAK_TOPIC_THRESHOLD = 0.50 # < 50 % accuracy → weak topic HIGH_RISK_RATIO = 0.75 # 75 %+ subjects at risk MEDIUM_RISK_RATIO = 0.50 # 50-75 % REMEDIAL_CONFIG = { "High": {"questions": 15, "dist": {"easy": 60, "medium": 30, "hard": 10}}, "Medium": {"questions": 12, "dist": {"easy": 50, "medium": 35, "hard": 15}}, "Low": {"questions": 10, "dist": {"easy": 40, "medium": 40, "hard": 20}}, } # ─── Request / Response Models ────────────────────────────────── class DiagnosticResult(BaseModel): """Per-subject score from diagnostic assessment.""" subject: str score: float = Field(..., ge=0, le=100) class DiagnosticCompletionPayload(BaseModel): """Payload sent when a student completes the diagnostic.""" studentId: str results: List[DiagnosticResult] gradeLevel: str = "Grade 10" questionBreakdown: Optional[Dict[str, list]] = None # topic → [{correct: bool, …}] class QuizSubmissionPayload(BaseModel): """Payload sent on quiz / assessment submission.""" studentId: str quizId: str subject: str score: float = Field(..., ge=0, le=100) totalQuestions: int correctAnswers: int timeSpentSeconds: int answers: Optional[List[Dict[str, Any]]] = None class StudentEnrollmentPayload(BaseModel): """Payload sent when a new student account is created.""" studentId: str name: str email: str gradeLevel: str = "Grade 10" teacherId: Optional[str] = None class DataImportPayload(BaseModel): """Payload sent after a teacher uploads a spreadsheet.""" teacherId: str students: List[Dict[str, Any]] # parsed student rows columnMapping: Dict[str, str] class ContentUpdatePayload(BaseModel): """Payload sent when admin performs CRUD on curriculum.""" adminId: str action: str # create | update | delete contentType: str # lesson | quiz | module | subject contentId: str subjectId: Optional[str] = None details: Optional[str] = None # ─── Risk classification helpers ───────────────────────────────── class SubjectRiskClassification(BaseModel): status: str # "At Risk" | "On Track" score: float confidence: float needsIntervention: bool class AutomationResult(BaseModel): """Standardised result returned by every handler.""" success: bool event: str studentId: Optional[str] = None message: str riskClassifications: Optional[Dict[str, Dict[str, Any]]] = None overallRisk: Optional[str] = None atRiskSubjects: Optional[List[str]] = None weakTopics: Optional[List[Dict[str, Any]]] = None learningPath: Optional[str] = None remedialQuizzesCreated: int = 0 interventions: Optional[str] = None notifications: List[str] = Field(default_factory=list) # ─── Automation Engine ────────────────────────────────────────── class MathPulseAutomationEngine: """ Stateless event-driven automation system. Each ``handle_*`` method is an independent, self-contained handler that receives a validated Pydantic payload and returns an ``AutomationResult``. Firebase / Hugging Face calls are only attempted when available. """ # ──────────────────────────────────────────────────────────── # 1. DIAGNOSTIC COMPLETION (highest-priority) # ──────────────────────────────────────────────────────────── async def handle_diagnostic_completion( self, payload: DiagnosticCompletionPayload ) -> AutomationResult: """ Runs when a student completes the mandatory diagnostic. Steps: 1. Classify per-subject risk 2. Identify weak topics 3. Compute overall risk 4. Generate personalised learning path (AI) 5. Create remedial quiz assignments 6. Generate teacher intervention recommendations (AI) 7. Persist everything & notify """ student_id = payload.studentId logger.info(f"📊 DIAGNOSTIC COMPLETED for {student_id}") notifications: list[str] = [] # 1 — subject-level risk risk_classifications = self._classify_subject_risks(payload.results) # 2 — weak topics weak_topics = self._identify_weak_topics(payload.questionBreakdown) # 3 — overall risk overall_risk = self._calculate_overall_risk(risk_classifications) at_risk_subjects = [ subj for subj, data in risk_classifications.items() if data["status"] == "At Risk" ] # 4 — learning path (AI call) learning_path: Optional[str] = None if at_risk_subjects: learning_path = await self._generate_learning_path( at_risk_subjects, weak_topics, payload.gradeLevel ) # 5 — remedial quizzes remedial_count = 0 remedial_quizzes: list[dict] = [] if at_risk_subjects: remedial_quizzes = self._build_remedial_quiz_configs( student_id, at_risk_subjects, overall_risk, payload.gradeLevel ) remedial_count = len(remedial_quizzes) # 6 — teacher interventions (AI call) interventions: Optional[str] = None if at_risk_subjects: interventions = await self._generate_teacher_interventions( risk_classifications, weak_topics ) # 7 — notification messages if at_risk_subjects: notifications.append( f"Diagnostic complete — {len(at_risk_subjects)} subject(s) flagged At Risk: " + ", ".join(at_risk_subjects) ) else: notifications.append("Diagnostic complete — all subjects On Track!") logger.info( f"✅ DIAGNOSTIC PROCESSING COMPLETE for {student_id} | " f"Overall={overall_risk} | AtRisk={at_risk_subjects}" ) return AutomationResult( success=True, event="diagnostic_completed", studentId=student_id, message=f"Diagnostic processed for {student_id}", riskClassifications=risk_classifications, overallRisk=overall_risk, atRiskSubjects=at_risk_subjects, weakTopics=weak_topics, learningPath=learning_path, remedialQuizzesCreated=remedial_count, interventions=interventions, notifications=notifications, ) # ──────────────────────────────────────────────────────────── # 2. QUIZ SUBMISSION (continuous) # ──────────────────────────────────────────────────────────── async def handle_quiz_submission( self, payload: QuizSubmissionPayload ) -> AutomationResult: """Recalculate risk for a subject after a quiz is submitted.""" student_id = payload.studentId logger.info(f"📝 QUIZ SUBMITTED by {student_id} — {payload.subject} ({payload.score}%)") notifications: list[str] = [] # Determine new status for this subject new_status = "At Risk" if payload.score < AT_RISK_THRESHOLD else "On Track" confidence = ( (AT_RISK_THRESHOLD - payload.score) / AT_RISK_THRESHOLD if new_status == "At Risk" else (payload.score - AT_RISK_THRESHOLD) / (100 - AT_RISK_THRESHOLD) ) risk_classifications = { payload.subject: { "status": new_status, "score": payload.score, "confidence": round(abs(confidence), 2), "needsIntervention": new_status == "At Risk", } } at_risk = [payload.subject] if new_status == "At Risk" else [] if new_status == "At Risk": notifications.append( f"Quiz result: {payload.subject} scored {payload.score}% — status changed to At Risk" ) else: notifications.append( f"Quiz result: {payload.subject} scored {payload.score}% — On Track" ) return AutomationResult( success=True, event="quiz_submitted", studentId=student_id, message=f"Quiz processed for {student_id}", riskClassifications=risk_classifications, overallRisk=None, # single-subject update — overall recalculated on frontend atRiskSubjects=at_risk, notifications=notifications, ) # ──────────────────────────────────────────────────────────── # 3. STUDENT ENROLLMENT # ──────────────────────────────────────────────────────────── async def handle_student_enrollment( self, payload: StudentEnrollmentPayload ) -> AutomationResult: """ Prepare a new student: - Create empty progress record skeleton - Initialise gamification (XP 0, Level 1, no streaks) - Flag as needing diagnostic """ student_id = payload.studentId logger.info(f"🆕 NEW STUDENT ENROLLED: {student_id}") progress_skeleton = { "userId": student_id, "subjects": {}, "lessons": {}, "quizAttempts": [], "totalLessonsCompleted": 0, "totalQuizzesCompleted": 0, "averageScore": 0, } gamification_init = { "level": 1, "currentXP": 0, "totalXP": 0, "streak": 0, "hasTakenDiagnostic": False, "atRiskSubjects": [], } notifications: list[str] = [ f"Welcome {payload.name}! Please complete the diagnostic assessment to personalise your learning path.", ] if payload.teacherId: notifications.append( f"New student {payload.name} enrolled — diagnostic pending." ) return AutomationResult( success=True, event="student_enrolled", studentId=student_id, message=f"Student {payload.name} enrolled and initialised", notifications=notifications, ) # ──────────────────────────────────────────────────────────── # 4. DATA IMPORT (teacher action) # ──────────────────────────────────────────────────────────── async def handle_data_import( self, payload: DataImportPayload ) -> AutomationResult: """ After a teacher uploads a spreadsheet, recalculate risk for every imported student and flag any status changes. """ logger.info(f"📂 DATA IMPORT by teacher {payload.teacherId} — {len(payload.students)} students") notifications: list[str] = [] newly_at_risk: list[str] = [] for student_row in payload.students: name = student_row.get("name", "Unknown") avg_score = float(student_row.get("avgQuizScore", 0)) if avg_score < AT_RISK_THRESHOLD: newly_at_risk.append(name) if newly_at_risk: notifications.append( f"Data import flagged {len(newly_at_risk)} student(s) as At Risk: " + ", ".join(newly_at_risk[:5]) + ("…" if len(newly_at_risk) > 5 else "") ) notifications.append( f"Data import complete — {len(payload.students)} student records processed." ) return AutomationResult( success=True, event="data_imported", studentId=None, message=f"Data import processed for {len(payload.students)} students", atRiskSubjects=None, notifications=notifications, ) # ──────────────────────────────────────────────────────────── # 5. CONTENT UPDATE (admin action) # ──────────────────────────────────────────────────────────── async def handle_content_update( self, payload: ContentUpdatePayload ) -> AutomationResult: """ After admin CRUD on curriculum, log & notify. """ logger.info( f"📚 CONTENT UPDATE by admin {payload.adminId}: " f"{payload.action} {payload.contentType} {payload.contentId}" ) notifications: list[str] = [ f"Curriculum update: {payload.action}d {payload.contentType} " f"({payload.contentId}). Teachers may want to review affected quizzes.", ] return AutomationResult( success=True, event="content_updated", studentId=None, message=f"Content {payload.action} processed for {payload.contentType}", notifications=notifications, ) # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # INTERNAL HELPERS # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # --- risk classification --- @staticmethod def _classify_subject_risks( results: List[DiagnosticResult], ) -> Dict[str, Dict[str, Any]]: """Classify each subject as 'At Risk' or 'On Track'.""" classifications: Dict[str, Dict[str, Any]] = {} for r in results: if r.score < AT_RISK_THRESHOLD: status = "At Risk" confidence = round((AT_RISK_THRESHOLD - r.score) / AT_RISK_THRESHOLD, 2) else: status = "On Track" confidence = round( (r.score - AT_RISK_THRESHOLD) / (100 - AT_RISK_THRESHOLD), 2 ) classifications[r.subject] = { "status": status, "score": r.score, "confidence": confidence, "needsIntervention": status == "At Risk", } return classifications @staticmethod def _identify_weak_topics( question_breakdown: Optional[Dict[str, list]], ) -> List[Dict[str, Any]]: """ Drill into per-topic accuracy from diagnostic question-level data. Returns topics sorted weakest-first. """ if not question_breakdown: return [] weak: list[dict] = [] for topic, questions in question_breakdown.items(): if not questions: continue correct_count = sum(1 for q in questions if q.get("correct")) accuracy = correct_count / len(questions) if accuracy < WEAK_TOPIC_THRESHOLD: weak.append({ "topic": topic, "accuracy": round(accuracy, 2), "questionsAttempted": len(questions), "priority": "high" if accuracy < 0.3 else "medium", }) weak.sort(key=lambda x: x["accuracy"]) return weak @staticmethod def _calculate_overall_risk( classifications: Dict[str, Dict[str, Any]], ) -> str: total = len(classifications) if total == 0: return "Low" at_risk_count = sum( 1 for d in classifications.values() if d["status"] == "At Risk" ) ratio = at_risk_count / total if ratio >= HIGH_RISK_RATIO: return "High" elif ratio >= MEDIUM_RISK_RATIO: return "Medium" return "Low" # --- remedial quiz configs --- @staticmethod def _build_remedial_quiz_configs( student_id: str, at_risk_subjects: List[str], overall_risk: str, grade_level: str, ) -> List[Dict[str, Any]]: """Return list of quiz configuration dicts ready for persistence.""" cfg = REMEDIAL_CONFIG.get(overall_risk, REMEDIAL_CONFIG["Low"]) quizzes: list[dict] = [] for subject in at_risk_subjects: quizzes.append({ "studentId": student_id, "subject": subject, "quizConfig": { "topics": [subject], "gradeLevel": grade_level, "numQuestions": cfg["questions"], "questionTypes": [ "identification", "enumeration", "multiple_choice", "word_problem", ], "difficultyDistribution": cfg["dist"], "bloomLevels": ["remember", "understand", "apply"], "includeGraphs": False, "excludeTopics": [], "purpose": "remedial", "targetStudent": student_id, }, "status": "pending", "autoGenerated": True, "reason": f'Diagnostic identified "{subject}" as At Risk', "priority": "high" if overall_risk == "High" else "medium", "dueInDays": 7, }) return quizzes # --- AI helpers (Hugging Face) --- async def _generate_learning_path( self, at_risk_subjects: List[str], weak_topics: List[Dict[str, Any]], grade_level: str, ) -> Optional[str]: """Generate a personalised learning path via HF Serverless Inference.""" try: from main import call_hf_chat weakness_lines = ", ".join(at_risk_subjects) topic_lines = "\n".join( f" - {t['topic']} ({t['accuracy']*100:.0f}% accuracy)" for t in weak_topics[:5] ) prompt = ( f"Generate a personalised math learning path for a {grade_level} student.\n\n" f"Weak subjects: {weakness_lines}\n" f"Weak topics:\n{topic_lines}\n\n" "Create 5-7 specific activities. For each give:\n" "1. Activity title\n" "2. Brief description (1-2 sentences)\n" "3. Estimated duration\n" "4. Type (video, practice, quiz, reading, interactive)\n\n" "Format as a numbered list. Be specific." ) return call_hf_chat( messages=[ { "role": "system", "content": ( "You are an educational curriculum expert specialising in " "mathematics. Create clear, actionable learning paths." ), }, {"role": "user", "content": prompt}, ], max_tokens=1500, temperature=0.7, ) except Exception as e: logger.warning(f"Learning-path AI call failed: {e}") return None async def _generate_teacher_interventions( self, risk_classifications: Dict[str, Dict[str, Any]], weak_topics: List[Dict[str, Any]], ) -> Optional[str]: """Generate teacher intervention recommendations via HF Serverless Inference.""" try: from main import call_hf_chat at_risk = [ subj for subj, data in risk_classifications.items() if data["status"] == "At Risk" ] topic_lines = "\n".join( f"- {t['topic']} ({t['accuracy']*100:.0f}% accuracy)" for t in weak_topics[:5] ) prompt = ( "You are an educational intervention specialist. A student has completed " "their diagnostic assessment with the following results:\n\n" f"At-Risk Subjects: {', '.join(at_risk)}\n\n" f"Weak Topics Identified:\n{topic_lines}\n\n" "Generate a 'Remedial Path Timeline' with:\n" "1. Prioritised list of topics to address (most critical first)\n" "2. Suggested teaching strategies for each topic\n" "3. Recommended one-on-one intervention activities\n" "4. Timeline for reassessment\n" "5. Warning signs that student needs additional support\n\n" "Keep response under 300 words, structured with clear sections." ) return call_hf_chat( messages=[ { "role": "system", "content": ( "You are an expert educational intervention specialist. " "Provide actionable, structured recommendations for teachers." ), }, {"role": "user", "content": prompt}, ], max_tokens=1000, temperature=0.5, ) except Exception as e: logger.warning(f"Teacher-intervention AI call failed: {e}") return None # Module-level singleton automation_engine = MathPulseAutomationEngine()