Spaces:
Sleeping
Sleeping
"""
Task Manager MCP Server - A comprehensive example for learning MCP development

This server provides task management functionality with multiple tools:
- Create, read, update, delete tasks
- List tasks with filtering
- Mark tasks as complete/incomplete
- Add tags and priorities
- Search functionality

Perfect for learning MCP concepts and extending with new features!
"""
import json
import os
import sys
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel
# Initialize the FastMCP server instance; "task-manager" is the name handed to FastMCP.
# main() later starts it with mcp.run(transport="stdio").
mcp = FastMCP("task-manager")
# Data models
class Priority(str, Enum):
    """Priority levels a task can have.

    Mixes in ``str``, so every member is also its lowercase string value
    (e.g. ``Priority("high")`` looks a member up by that value).
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    URGENT = "urgent"
class TaskStatus(str, Enum):
    """Lifecycle states of a task.

    Mixes in ``str``, so every member is also its lowercase string value
    (e.g. ``TaskStatus("in_progress")`` looks a member up by that value).
    """

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    CANCELLED = "cancelled"
class Task(BaseModel):
    """A single task record, stored in ``tasks_db`` and serialized to DATA_FILE."""

    id: int  # also the key under which the task is stored in tasks_db
    title: str
    description: str = ""
    status: TaskStatus = TaskStatus.PENDING
    priority: Priority = Priority.MEDIUM
    tags: List[str] = []
    created_at: str  # timestamp string produced by get_current_time()
    updated_at: str  # timestamp string; refreshed by update_task on a successful update
    due_date: Optional[str] = None  # date string checked with datetime.fromisoformat, or None
    completed_at: Optional[str] = None  # set by update_task when status becomes completed
# In-memory storage (in a real app, you'd use a database)
tasks_db: Dict[int, Task] = {}  # task id -> Task
next_task_id = 1  # id given to the next created task; create_task increments it

# File-based persistence
DATA_FILE = "tasks.json"  # relative path: resolved against the current working directory
def load_tasks():
    """Load tasks from DATA_FILE into memory, if the file exists.

    On success this replaces the module-level ``tasks_db`` and
    ``next_task_id``. Loading is best-effort: if the file cannot be read,
    parsed, or validated, the in-memory state is left untouched and the error
    is reported on stderr.
    """
    global tasks_db, next_task_id

    if not os.path.exists(DATA_FILE):
        return

    try:
        with open(DATA_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        loaded = {int(k): Task(**v) for k, v in data.get("tasks", {}).items()}
        stored_next_id = int(data.get("next_id", 1))
    except Exception as e:
        # Deliberately broad: a damaged data file must not stop the server
        # from starting. Report on stderr because stdout carries the MCP
        # protocol stream when the server runs over stdio.
        print(f"Error loading tasks: {e}", file=sys.stderr)
        return

    tasks_db = loaded
    # Guard against a stale or missing counter: never hand out an id that an
    # already-loaded task is using.
    next_task_id = max(stored_next_id, max(loaded, default=0) + 1)
def save_tasks():
    """Persist all tasks and the id counter to DATA_FILE as JSON.

    The data is written to a temporary sibling file first and then moved over
    DATA_FILE with ``os.replace``, so a failure part-way through writing does
    not leave a truncated data file behind. Saving is best-effort: errors are
    reported on stderr and not raised.
    """
    tmp_path = f"{DATA_FILE}.tmp"
    try:
        data = {
            "tasks": {str(task_id): task.model_dump() for task_id, task in tasks_db.items()},
            "next_id": next_task_id,
        }
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, DATA_FILE)
    except Exception as e:
        # Deliberately broad: a failed save should not crash the tool call.
        # Report on stderr because stdout carries the MCP protocol stream when
        # the server runs over stdio.
        print(f"Error saving tasks: {e}", file=sys.stderr)
def get_current_time() -> str:
    """Return the current time, as given by ``datetime.now()``, in ISO format."""
    now = datetime.now()
    return now.isoformat()
# Load existing tasks on startup. This is a module-level call, so it runs once
# when the module is executed or imported.
load_tasks()
@mcp.tool()
def create_task(
    title: str,
    description: str = "",
    priority: str = "medium",
    tags: str = "",
    due_date: str = "",
) -> str:
    """Create a new task.

    Args:
        title: Task title (required, must not be blank)
        description: Detailed description of the task
        priority: Task priority (low, medium, high, urgent); case-insensitive
        tags: Comma-separated list of tags
        due_date: Due date in YYYY-MM-DD format

    Returns:
        A confirmation message with the new task's id, or an error message
        when the title, priority, or due date is invalid (nothing is created
        in that case).
    """
    global next_task_id

    title = title.strip()
    if not title:
        return "Invalid title. Title must not be empty"

    # Parse priority
    try:
        task_priority = Priority(priority.strip().lower())
    except ValueError:
        return f"Invalid priority '{priority}'. Use: low, medium, high, urgent"

    # Parse tags, dropping empty entries such as those from "a,,b" or a trailing comma
    task_tags = [tag.strip() for tag in tags.split(",") if tag.strip()]

    # Validate due date if provided; the string itself is what gets stored
    due_date = due_date.strip()
    task_due_date = None
    if due_date:
        try:
            datetime.fromisoformat(due_date)
        except ValueError:
            return "Invalid due date format. Use YYYY-MM-DD"
        task_due_date = due_date

    # Create task
    current_time = get_current_time()
    task = Task(
        id=next_task_id,
        title=title,
        description=description,
        priority=task_priority,
        tags=task_tags,
        created_at=current_time,
        updated_at=current_time,
        due_date=task_due_date,
    )
    tasks_db[task.id] = task
    next_task_id += 1
    save_tasks()
    return f"✅ Created task #{task.id}: {task.title}"
@mcp.tool()
def list_tasks(
    status: str = "all",
    priority: str = "all",
    tag: str = "",
    limit: int = 10,
) -> str:
    """List tasks with optional filtering, newest first.

    Args:
        status: Filter by status (all, pending, in_progress, completed, cancelled); case-insensitive
        priority: Filter by priority (all, low, medium, high, urgent); case-insensitive
        tag: Filter by tag (exact match)
        limit: Maximum number of tasks to return (must be at least 1)

    Returns:
        A formatted listing of the matching tasks, or a message explaining
        that there are no tasks, no matches, or that an argument is invalid.
    """
    if not tasks_db:
        return "📋 No tasks found. Create your first task!"
    if limit < 1:
        # A zero or negative limit would otherwise slice the list in surprising ways.
        return f"Invalid limit '{limit}'. Use a positive number"

    filtered_tasks = list(tasks_db.values())

    # Filter by status
    status_key = status.strip().lower()
    if status_key != "all":
        try:
            status_filter = TaskStatus(status_key)
        except ValueError:
            return f"Invalid status '{status}'. Use: all, pending, in_progress, completed, cancelled"
        filtered_tasks = [t for t in filtered_tasks if t.status == status_filter]

    # Filter by priority
    priority_key = priority.strip().lower()
    if priority_key != "all":
        try:
            priority_filter = Priority(priority_key)
        except ValueError:
            return f"Invalid priority '{priority}'. Use: all, low, medium, high, urgent"
        filtered_tasks = [t for t in filtered_tasks if t.priority == priority_filter]

    # Filter by tag
    if tag:
        filtered_tasks = [t for t in filtered_tasks if tag in t.tags]

    # Sort by creation date (newest first), then apply the limit
    filtered_tasks.sort(key=lambda t: t.created_at, reverse=True)
    filtered_tasks = filtered_tasks[:limit]
    if not filtered_tasks:
        return "📋 No tasks match your filters."

    status_emoji = {
        TaskStatus.PENDING: "⏳",
        TaskStatus.IN_PROGRESS: "🔄",
        TaskStatus.COMPLETED: "✅",
        TaskStatus.CANCELLED: "❌",
    }
    priority_emoji = {
        Priority.LOW: "🟢",
        Priority.MEDIUM: "🟡",
        Priority.HIGH: "🟠",
        Priority.URGENT: "🔴",
    }

    # Format output: header, blank line, then one blank-line-separated entry per task
    lines = [f"📋 Found {len(filtered_tasks)} task(s):", ""]
    for task in filtered_tasks:
        tags_str = f" #{' #'.join(task.tags)}" if task.tags else ""
        due_str = f" (Due: {task.due_date})" if task.due_date else ""
        lines.append(f"{status_emoji[task.status]} #{task.id} - {task.title}")
        lines.append(
            f" {priority_emoji[task.priority]} {task.priority.value.title()}{due_str}{tags_str}"
        )
        if task.description:
            lines.append(f" 📝 {task.description}")
        lines.append("")
    return "\n".join(lines).strip()
@mcp.tool()
def get_task(task_id: int) -> str:
    """Get detailed information about a specific task.

    Args:
        task_id: The ID of the task to retrieve

    Returns:
        A multi-line description of the task (optional fields are only listed
        when set), or a not-found message.
    """
    task = tasks_db.get(task_id)
    if task is None:
        return f"❌ Task #{task_id} not found."

    status_emoji = {
        TaskStatus.PENDING: "⏳",
        TaskStatus.IN_PROGRESS: "🔄",
        TaskStatus.COMPLETED: "✅",
        TaskStatus.CANCELLED: "❌",
    }
    priority_emoji = {
        Priority.LOW: "🟢",
        Priority.MEDIUM: "🟡",
        Priority.HIGH: "🟠",
        Priority.URGENT: "🔴",
    }

    lines = [
        f"📋 Task #{task.id}",
        f"Title: {task.title}",
        f"Status: {status_emoji[task.status]} {task.status.value.replace('_', ' ').title()}",
        f"Priority: {priority_emoji[task.priority]} {task.priority.value.title()}",
    ]
    if task.description:
        lines.append(f"Description: {task.description}")
    if task.tags:
        lines.append(f"Tags: #{' #'.join(task.tags)}")
    if task.due_date:
        lines.append(f"Due Date: {task.due_date}")
    lines.append(f"Created: {task.created_at}")
    lines.append(f"Updated: {task.updated_at}")
    if task.completed_at:
        lines.append(f"Completed: {task.completed_at}")
    # Every line, including the last, is newline-terminated.
    return "\n".join(lines) + "\n"
@mcp.tool()
def update_task(
    task_id: int,
    title: str = "",
    description: str = "",
    status: str = "",
    priority: str = "",
    tags: str = "",
    due_date: str = "",
) -> str:
    """Update an existing task. Only provided (non-empty) fields will be updated.

    All inputs are validated before anything is changed, so an invalid value
    never leaves the task partially updated.

    Args:
        task_id: The ID of the task to update
        title: New task title
        description: New task description
        status: New status (pending, in_progress, completed, cancelled); case-insensitive
        priority: New priority (low, medium, high, urgent); case-insensitive
        tags: New comma-separated list of tags (use 'none' to clear)
        due_date: New due date in YYYY-MM-DD format (use 'none' to clear)

    Returns:
        A confirmation listing the updated fields, or an error message when
        the task does not exist, a value is invalid, or no field was provided.
    """
    task = tasks_db.get(task_id)
    if task is None:
        return f"❌ Task #{task_id} not found."

    # --- Phase 1: validate every provided value; nothing is mutated yet ---
    title = title.strip()
    status_key = status.strip().lower()
    priority_key = priority.strip().lower()
    tags = tags.strip()
    due_date = due_date.strip()

    new_status = None
    if status_key:
        try:
            new_status = TaskStatus(status_key)
        except ValueError:
            return f"Invalid status '{status}'. Use: pending, in_progress, completed, cancelled"

    new_priority = None
    if priority_key:
        try:
            new_priority = Priority(priority_key)
        except ValueError:
            return f"Invalid priority '{priority}'. Use: low, medium, high, urgent"

    clear_due_date = due_date.lower() == "none"
    if due_date and not clear_due_date:
        try:
            datetime.fromisoformat(due_date)
        except ValueError:
            return "Invalid due date format. Use YYYY-MM-DD or 'none' to clear"

    # --- Phase 2: apply the changes ---
    updated_fields = []

    if title:
        task.title = title
        updated_fields.append("title")

    if description:
        task.description = description
        updated_fields.append("description")

    if new_status is not None:
        task.status = new_status
        updated_fields.append("status")
        if new_status == TaskStatus.COMPLETED:
            # Keep the original completion time if the task was already completed
            if not task.completed_at:
                task.completed_at = get_current_time()
        else:
            # Leaving the completed state clears the completion time
            task.completed_at = None

    if new_priority is not None:
        task.priority = new_priority
        updated_fields.append("priority")

    if tags:
        if tags.lower() == "none":
            task.tags = []
        else:
            task.tags = [tag.strip() for tag in tags.split(",") if tag.strip()]
        updated_fields.append("tags")

    if due_date:
        task.due_date = None if clear_due_date else due_date
        updated_fields.append("due_date")

    if not updated_fields:
        return f"❌ No fields provided to update for task #{task_id}."

    # Update timestamp and persist
    task.updated_at = get_current_time()
    save_tasks()
    return f"✅ Updated task #{task_id}: {', '.join(updated_fields)}"
@mcp.tool()
def delete_task(task_id: int) -> str:
    """Delete a task permanently.

    Args:
        task_id: The ID of the task to delete

    Returns:
        A confirmation naming the deleted task, or a not-found message.
    """
    # pop() removes and returns the task in a single lookup
    task = tasks_db.pop(task_id, None)
    if task is None:
        return f"❌ Task #{task_id} not found."
    save_tasks()
    return f"🗑️ Deleted task #{task_id}: {task.title}"
@mcp.tool()
def search_tasks(query: str, limit: int = 10) -> str:
    """Search tasks by title, description, or tags (case-insensitive substring match).

    Args:
        query: Search query (searches in title, description, and tags; must not be blank)
        limit: Maximum number of results to return (must be at least 1)

    Returns:
        A formatted list of matching tasks ordered by relevance, or a message
        explaining that nothing matched or that an argument is invalid.
    """
    if not tasks_db:
        return "🔍 No tasks to search."

    query_lower = query.strip().lower()
    if not query_lower:
        # An empty string is a substring of everything and would match every task.
        return "Invalid query. Provide a non-empty search term"
    if limit < 1:
        # A zero or negative limit would otherwise slice the results in surprising ways.
        return f"Invalid limit '{limit}'. Use a positive number"

    def relevance_score(task) -> int:
        """Score a task: title match 3, description match 2, tag match 1 (summed)."""
        score = 0
        if query_lower in task.title.lower():
            score += 3
        if query_lower in task.description.lower():
            score += 2
        if any(query_lower in tag.lower() for tag in task.tags):
            score += 1
        return score

    # A task matches exactly when its score is non-zero, so score once and filter on it.
    scored = [(relevance_score(task), task) for task in tasks_db.values()]
    matches = [(score, task) for score, task in scored if score > 0]
    if not matches:
        return f"🔍 No tasks found matching '{query}'."

    # Stable sort: tasks with equal scores keep their storage order
    matches.sort(key=lambda pair: pair[0], reverse=True)
    matching_tasks = [task for _, task in matches[:limit]]

    status_emoji = {
        TaskStatus.PENDING: "⏳",
        TaskStatus.IN_PROGRESS: "🔄",
        TaskStatus.COMPLETED: "✅",
        TaskStatus.CANCELLED: "❌",
    }

    lines = [f"🔍 Found {len(matching_tasks)} task(s) matching '{query}':", ""]
    for task in matching_tasks:
        lines.append(f"{status_emoji[task.status]} #{task.id} - {task.title}")
        if task.description:
            # Show at most the first 100 characters of the description
            ellipsis = "..." if len(task.description) > 100 else ""
            lines.append(f" 📝 {task.description[:100]}{ellipsis}")
        lines.append("")
    return "\n".join(lines).strip()
@mcp.tool()
def get_stats() -> str:
    """Get task statistics and summary.

    Returns:
        A formatted report with the total task count, counts per status and
        per priority, the number of overdue tasks (only when non-zero), and
        the five most used tags (only when any tags exist). If there are no
        tasks, a short hint message is returned instead.
    """
    if not tasks_db:
        return "📊 No tasks yet. Create your first task to see statistics!"

    today = datetime.now().date()
    # Tasks in these states are finished and therefore never overdue.
    closed_statuses = (TaskStatus.COMPLETED, TaskStatus.CANCELLED)

    def is_overdue(task) -> bool:
        """True when an open task has a parseable due date before today."""
        if not task.due_date or task.status in closed_statuses:
            return False
        try:
            due = datetime.fromisoformat(task.due_date).date()
        except ValueError:
            # Task.due_date is an unvalidated string, so a hand-edited data
            # file can contain anything; skip it rather than fail the report.
            return False
        return due < today

    # Single pass over all tasks; every enum member starts at 0 so it is
    # always listed in the report.
    status_counts = {status: 0 for status in TaskStatus}
    priority_counts = {priority: 0 for priority in Priority}
    tag_counts = {}
    overdue_count = 0
    for task in tasks_db.values():
        status_counts[task.status] += 1
        priority_counts[task.priority] += 1
        for tag in task.tags:
            tag_counts[tag] = tag_counts.get(tag, 0) + 1
        if is_overdue(task):
            overdue_count += 1

    # Most common tags (stable sort: ties keep first-seen order)
    top_tags = sorted(tag_counts.items(), key=lambda item: item[1], reverse=True)[:5]

    status_emoji = {"pending": "⏳", "in_progress": "🔄", "completed": "✅", "cancelled": "❌"}
    priority_emoji = {"low": "🟢", "medium": "🟡", "high": "🟠", "urgent": "🔴"}

    lines = [
        "📊 Task Statistics",
        "",
        f"Total Tasks: {len(tasks_db)}",
        "",
        "📈 By Status:",
    ]
    for status, count in status_counts.items():
        label = status.value.replace("_", " ").title()
        lines.append(f" {status_emoji.get(status.value, '•')} {label}: {count}")

    lines += ["", "🎯 By Priority:"]
    for priority, count in priority_counts.items():
        lines.append(
            f" {priority_emoji.get(priority.value, '•')} {priority.value.title()}: {count}"
        )

    if overdue_count > 0:
        lines += ["", f"⚠️ Overdue Tasks: {overdue_count}"]

    if top_tags:
        lines += ["", "🏷️ Top Tags:"]
        lines.extend(f" #{tag}: {count}" for tag, count in top_tags)

    # Every line, including the last, is newline-terminated.
    return "\n".join(lines) + "\n"
def main():
    """Initialize and run the MCP server over the stdio transport."""
    # With transport="stdio", stdout carries the protocol messages exchanged
    # with the client, so startup diagnostics must go to stderr instead.
    print("Starting Task Manager MCP Server...", file=sys.stderr)
    print(f"Data file: {os.path.abspath(DATA_FILE)}", file=sys.stderr)
    print(f"Loaded {len(tasks_db)} existing tasks", file=sys.stderr)
    mcp.run(transport="stdio")
# Start the server only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()