| """ |
| Learner System - Extracts improvements from experience. |
| Analyzes success/failure patterns and generates actionable insights. |
| """ |
|
|
| import json |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Optional, Any |
| from collections import defaultdict |
| from pathlib import Path |
| import os |
|
|
| from .memory import get_memory |
| from .observer import get_observer |
|
|
|
|
| class ExperienceLearner: |
| """Analyzes experiences and extracts learning insights.""" |
| |
| def __init__(self, memory=None, observer=None): |
| self.memory = memory or get_memory() |
| self.observer = observer or get_observer() |
| |
| self.min_success_for_pattern = 3 |
| self.min_failure_for_pattern = 2 |
| |
| def analyze_task_outcome(self, task_id: str, task_type: str, |
| success: bool, steps: List[Dict], |
| decisions: List[Dict]) -> Dict: |
| """Analyze a task's outcome and extract learnings.""" |
| learnings = [] |
| |
| |
| good_decisions = [d for d in decisions if d.get('rationale')] |
| if success and good_decisions: |
| |
| for decision in good_decisions: |
| self.memory.store_memory( |
| content=f"Task {task_type} succeeded when: {decision.get('choice')} - {decision.get('rationale')}", |
| category='success_pattern', |
| metadata={'task_type': task_type, 'decision': decision.get('choice')} |
| ) |
| learnings.append({ |
| 'type': 'success_pattern', |
| 'content': f"Using {decision.get('choice')} worked well for {task_type}" |
| }) |
| |
| if not success: |
| |
| for decision in decisions: |
| self.memory.store_memory( |
| content=f"Task {task_type} failed when: {decision.get('choice')}", |
| category='failure_pattern', |
| metadata={'task_type': task_type, 'decision': decision.get('choice')} |
| ) |
| learnings.append({ |
| 'type': 'failure_pattern', |
| 'content': f"Avoid {decision.get('choice')} for {task_type}" |
| }) |
| |
| |
| if not success: |
| suggestions = self._generate_improvements(task_type, steps, decisions) |
| for suggestion in suggestions: |
| self.memory.add_improvement( |
| suggestion=suggestion['suggestion'], |
| category=suggestion['category'], |
| priority=suggestion['priority'] |
| ) |
| learnings.append({ |
| 'type': 'improvement_suggestion', |
| 'content': suggestion['suggestion'] |
| }) |
| |
| |
| lesson_title = f"{task_type} task {'success' if success else 'failure'}" |
| lesson_desc = f"Learned from {task_type} task: {len(steps)} steps, {len(decisions)} decisions" |
| |
| |
| lessons = self.memory.get_lessons() |
| existing = [l for l in lessons if task_type in l.get('title', '')] |
| |
| if existing: |
| self.memory.update_lesson_stats(existing[0]['id'], success) |
| else: |
| self.memory.add_lesson( |
| title=lesson_title, |
| description=lesson_desc, |
| pattern=task_type |
| ) |
| |
| return { |
| 'learnings': learnings, |
| 'improvement_suggestions': len([l for l in learnings if l['type'] == 'improvement_suggestion']) |
| } |
| |
| def _generate_improvements(self, task_type: str, steps: List[Dict], |
| decisions: List[Dict]) -> List[Dict]: |
| """Generate improvement suggestions based on failure analysis.""" |
| suggestions = [] |
| |
| |
| step_types = defaultdict(int) |
| for step in steps: |
| step_types[step.get('type', 'unknown')] += 1 |
| |
| |
| if step_types.get('error', 0) > 2: |
| suggestions.append({ |
| 'suggestion': f"Consider error handling improvements for {task_type} tasks", |
| 'category': 'error_handling', |
| 'priority': 8 |
| }) |
| |
| if len(steps) > 10: |
| suggestions.append({ |
| 'suggestion': f"Break down {task_type} tasks into smaller steps", |
| 'category': 'task_decomposition', |
| 'priority': 7 |
| }) |
| |
| |
| if not decisions: |
| suggestions.append({ |
| 'suggestion': f"Add more decision checkpoints for {task_type} tasks", |
| 'category': 'decision_making', |
| 'priority': 6 |
| }) |
| |
| return suggestions |
| |
| def extract_patterns(self, lookback_days: int = 7) -> Dict: |
| """Extract patterns from recent history.""" |
| patterns = { |
| 'successful_approaches': [], |
| 'failure_patterns': [], |
| 'recommended_improvements': [] |
| } |
| |
| |
| memories = self.memory.get_all_memories() |
| |
| |
| success_memories = [m for m in memories |
| if m.get('success_rate', 0) > 0.7 and m.get('use_count', 0) > 2] |
| |
| for mem in success_memories[:10]: |
| patterns['successful_approaches'].append({ |
| 'content': mem['content'], |
| 'success_rate': mem['success_rate'], |
| 'uses': mem['use_count'] |
| }) |
| |
| |
| failure_memories = [m for m in memories |
| if m.get('success_rate', 0) < 0.4] |
| |
| for mem in failure_memories[:10]: |
| patterns['failure_patterns'].append({ |
| 'content': mem['content'], |
| 'success_rate': mem['success_rate'] |
| }) |
| |
| |
| improvements = self.memory.get_pending_improvements() |
| patterns['recommended_improvements'] = [ |
| {'suggestion': i['suggestion'], 'priority': i['priority']} |
| for i in improvements[:10] |
| ] |
| |
| return patterns |
| |
| def analyze_session_improvements(self, session_id: str) -> Dict: |
| """Analyze a specific session and recommend improvements.""" |
| observer_data = self.observer.analyze_reasoning_patterns(session_id) |
| |
| |
| session_file = self.observer.log_dir / f"session_{session_id}.json" |
| if not session_file.exists(): |
| return {'error': 'Session not found'} |
| |
| with open(session_file) as f: |
| session = json.load(f) |
| |
| recommendations = [] |
| |
| |
| task_success = defaultdict(lambda: {'success': 0, 'failed': 0}) |
| for task in session.get('tasks', []): |
| task_type = task.get('task_type', 'unknown') |
| if task.get('success'): |
| task_success[task_type]['success'] += 1 |
| else: |
| task_success[task_type]['failed'] += 1 |
| |
| |
| for task_type, stats in task_success.items(): |
| if stats['failed'] > stats['success']: |
| recommendations.append({ |
| 'category': 'task_improvement', |
| 'task_type': task_type, |
| 'suggestion': f"Focus on improving {task_type} task handling", |
| 'priority': min(10, 5 + stats['failed']) |
| }) |
| |
| return { |
| 'session_id': session_id, |
| 'task_stats': dict(task_success), |
| 'recommendations': recommendations |
| } |
| |
| def generate_learning_report(self) -> Dict: |
| """Generate a comprehensive learning report.""" |
| stats = self.memory.get_stats() |
| patterns = self.extract_patterns() |
| |
| |
| total_tasks = stats.get('total_tasks_completed', 0) + stats.get('total_tasks_failed', 0) |
| if total_tasks > 0: |
| improvement_trend = stats.get('overall_success_rate', 0) |
| else: |
| improvement_trend = 0 |
| |
| return { |
| 'generated_at': datetime.utcnow().isoformat(), |
| 'statistics': stats, |
| 'patterns': patterns, |
| 'improvement_trend': improvement_trend, |
| 'top_recommendations': patterns['recommended_improvements'][:5] |
| } |
| |
| def continuous_learning_cycle(self): |
| """Run a continuous learning cycle - analyze and improve.""" |
| |
| lessons = self.memory.get_lessons() |
| |
| |
| unverified_lessons = [l for l in lessons if not l.get('verified')] |
| |
| |
| for lesson in lessons: |
| total = lesson.get('success_count', 0) + lesson.get('failure_count', 0) |
| if total >= self.min_success_for_pattern: |
| success_rate = lesson.get('success_count', 0) / total |
| if success_rate > 0.7: |
| |
| pass |
| |
| |
| patterns = self.extract_patterns() |
| |
| new_suggestions = [] |
| |
| |
| for failure in patterns.get('failure_patterns', [])[:3]: |
| |
| new_suggestions.append({ |
| 'suggestion': f"Address pattern: {failure.get('content', '')[:100]}", |
| 'category': 'pattern_improvement', |
| 'priority': 6 |
| }) |
| |
| |
| for suggestion in new_suggestions: |
| self.memory.add_improvement( |
| suggestion=suggestion['suggestion'], |
| category=suggestion['category'], |
| priority=suggestion['priority'] |
| ) |
| |
| return { |
| 'verified_lessons': len([l for l in lessons if l.get('verified')]), |
| 'new_suggestions': len(new_suggestions), |
| 'pending_improvements': len(patterns.get('recommended_improvements', [])) |
| } |
|
|
|
|
| |
| _learner_instance = None |
|
|
|
|
| def get_learner() -> ExperienceLearner: |
| """Get or create the global learner instance.""" |
| global _learner_instance |
| if _learner_instance is None: |
| _learner_instance = ExperienceLearner() |
| return _learner_instance |