guohanghui commited on
Commit
8a37b74
·
verified ·
1 Parent(s): 2733aaf

Update BioSPPy/mcp_output/mcp_plugin/mcp_service.py

Browse files
BioSPPy/mcp_output/mcp_plugin/mcp_service.py CHANGED
@@ -1,103 +1,199 @@
1
  import os
2
  import sys
 
3
 
4
  source_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "source")
5
  if source_path not in sys.path:
6
  sys.path.insert(0, source_path)
7
 
8
  from fastmcp import FastMCP
 
9
 
10
- from setup import UploadCommand
11
- from docs.conf import Mock
12
 
13
- mcp = FastMCP("unknown_service")
14
 
15
 
16
- @mcp.tool(name="uploadcommand", description="UploadCommand class")
17
- def uploadcommand(*args, **kwargs):
18
- """UploadCommand class"""
 
 
 
 
 
 
19
  try:
20
- if UploadCommand is None:
21
- return {"success": False, "result": None, "error": "Class UploadCommand is not available, path may need adjustment"}
22
-
23
- # MCP parameter type conversion
24
- converted_args = []
25
- converted_kwargs = kwargs.copy()
26
-
27
- # Handle position argument type conversion
28
- for arg in args:
29
- if isinstance(arg, str):
30
- # Try to convert to numeric type
31
- try:
32
- if '.' in arg:
33
- converted_args.append(float(arg))
34
- else:
35
- converted_args.append(int(arg))
36
- except ValueError:
37
- converted_args.append(arg)
38
- else:
39
- converted_args.append(arg)
40
-
41
- # Handle keyword argument type conversion
42
- for key, value in converted_kwargs.items():
43
- if isinstance(value, str):
44
- try:
45
- if '.' in value:
46
- converted_kwargs[key] = float(value)
47
- else:
48
- converted_kwargs[key] = int(value)
49
- except ValueError:
50
- pass
51
 
52
- instance = UploadCommand(*converted_args, **converted_kwargs)
53
- return {"success": True, "result": str(instance), "error": None}
 
 
 
 
 
 
 
 
54
  except Exception as e:
55
  return {"success": False, "result": None, "error": str(e)}
56
 
57
- @mcp.tool(name="mock", description="Mock class")
58
- def mock(*args, **kwargs):
59
- """Mock class"""
 
 
 
 
 
 
 
60
  try:
61
- if Mock is None:
62
- return {"success": False, "result": None, "error": "Class Mock is not available, path may need adjustment"}
63
 
64
- # MCP parameter type conversion
65
- converted_args = []
66
- converted_kwargs = kwargs.copy()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
 
68
- # Handle position argument type conversion
69
- for arg in args:
70
- if isinstance(arg, str):
71
- # Try to convert to numeric type
72
- try:
73
- if '.' in arg:
74
- converted_args.append(float(arg))
75
- else:
76
- converted_args.append(int(arg))
77
- except ValueError:
78
- converted_args.append(arg)
79
- else:
80
- converted_args.append(arg)
 
 
 
 
 
 
 
 
 
 
 
81
 
82
- # Handle keyword argument type conversion
83
- for key, value in converted_kwargs.items():
84
- if isinstance(value, str):
85
- try:
86
- if '.' in value:
87
- converted_kwargs[key] = float(value)
88
- else:
89
- converted_kwargs[key] = int(value)
90
- except ValueError:
91
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
 
93
- instance = Mock(*converted_args, **converted_kwargs)
94
- return {"success": True, "result": str(instance), "error": None}
 
 
 
 
 
 
95
  except Exception as e:
96
  return {"success": False, "result": None, "error": str(e)}
97
 
98
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
 
100
  def create_app():
 
 
 
 
 
 
101
  """Create and return FastMCP application instance"""
102
  return mcp
103
 
 
1
  import os
2
  import sys
3
+ from typing import List, Optional
4
 
5
  source_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "source")
6
  if source_path not in sys.path:
7
  sys.path.insert(0, source_path)
8
 
9
  from fastmcp import FastMCP
10
+ import numpy as np
11
 
12
+ # Import BioSPPy modules
13
+ from biosppy.signals import ecg, eda, emg, resp, ppg, eeg, tools
14
 
15
+ mcp = FastMCP("biosppy_service")
16
 
17
 
18
+ @mcp.tool(name="process_ecg", description="Process an ECG signal and extract R-peaks and heart rate")
19
+ def process_ecg(signal: List[float], sampling_rate: float = 1000.0) -> dict:
20
+ """
21
+ Process a raw ECG signal and extract relevant signal features.
22
+
23
+ :param signal: Raw ECG signal as a list of float values.
24
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
25
+ :return: Dictionary containing filtered signal, R-peaks, and heart rate.
26
+ """
27
  try:
28
+ signal_array = np.array(signal)
29
+ result = ecg.ecg(signal=signal_array, sampling_rate=sampling_rate, show=False, interactive=False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
 
31
+ return {
32
+ "success": True,
33
+ "result": {
34
+ "rpeaks": result['rpeaks'].tolist(),
35
+ "heart_rate": result['heart_rate'].tolist() if len(result['heart_rate']) > 0 else [],
36
+ "heart_rate_ts": result['heart_rate_ts'].tolist() if len(result['heart_rate_ts']) > 0 else [],
37
+ "num_beats": len(result['rpeaks']),
38
+ },
39
+ "error": None
40
+ }
41
  except Exception as e:
42
  return {"success": False, "result": None, "error": str(e)}
43
 
44
+
45
+ @mcp.tool(name="process_eda", description="Process an EDA (electrodermal activity) signal")
46
+ def process_eda(signal: List[float], sampling_rate: float = 1000.0) -> dict:
47
+ """
48
+ Process a raw EDA signal and extract relevant features.
49
+
50
+ :param signal: Raw EDA signal as a list of float values.
51
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
52
+ :return: Dictionary containing filtered signal and skin conductance responses.
53
+ """
54
  try:
55
+ signal_array = np.array(signal)
56
+ result = eda.eda(signal=signal_array, sampling_rate=sampling_rate, show=False)
57
 
58
+ return {
59
+ "success": True,
60
+ "result": {
61
+ "filtered": result['filtered'].tolist()[:100] if len(result['filtered']) > 100 else result['filtered'].tolist(),
62
+ "onsets": result['onsets'].tolist() if hasattr(result['onsets'], 'tolist') else list(result['onsets']),
63
+ "peaks": result['peaks'].tolist() if hasattr(result['peaks'], 'tolist') else list(result['peaks']),
64
+ "amplitudes": result['amplitudes'].tolist() if hasattr(result['amplitudes'], 'tolist') else list(result['amplitudes']),
65
+ },
66
+ "error": None
67
+ }
68
+ except Exception as e:
69
+ return {"success": False, "result": None, "error": str(e)}
70
+
71
+
72
+ @mcp.tool(name="process_emg", description="Process an EMG (electromyogram) signal")
73
+ def process_emg(signal: List[float], sampling_rate: float = 1000.0) -> dict:
74
+ """
75
+ Process a raw EMG signal and extract relevant features.
76
+
77
+ :param signal: Raw EMG signal as a list of float values.
78
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
79
+ :return: Dictionary containing filtered signal and onsets.
80
+ """
81
+ try:
82
+ signal_array = np.array(signal)
83
+ result = emg.emg(signal=signal_array, sampling_rate=sampling_rate, show=False)
84
 
85
+ return {
86
+ "success": True,
87
+ "result": {
88
+ "filtered": result['filtered'].tolist()[:100] if len(result['filtered']) > 100 else result['filtered'].tolist(),
89
+ "onsets": result['onsets'].tolist() if hasattr(result['onsets'], 'tolist') else list(result['onsets']),
90
+ },
91
+ "error": None
92
+ }
93
+ except Exception as e:
94
+ return {"success": False, "result": None, "error": str(e)}
95
+
96
+
97
+ @mcp.tool(name="process_resp", description="Process a respiration signal")
98
+ def process_resp(signal: List[float], sampling_rate: float = 1000.0) -> dict:
99
+ """
100
+ Process a raw respiration signal and extract relevant features.
101
+
102
+ :param signal: Raw respiration signal as a list of float values.
103
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
104
+ :return: Dictionary containing filtered signal and respiration rate.
105
+ """
106
+ try:
107
+ signal_array = np.array(signal)
108
+ result = resp.resp(signal=signal_array, sampling_rate=sampling_rate, show=False)
109
 
110
+ return {
111
+ "success": True,
112
+ "result": {
113
+ "filtered": result['filtered'].tolist()[:100] if len(result['filtered']) > 100 else result['filtered'].tolist(),
114
+ "zeros": result['zeros'].tolist() if hasattr(result['zeros'], 'tolist') else list(result['zeros']),
115
+ "resp_rate": result['resp_rate'].tolist() if hasattr(result['resp_rate'], 'tolist') else list(result['resp_rate']),
116
+ },
117
+ "error": None
118
+ }
119
+ except Exception as e:
120
+ return {"success": False, "result": None, "error": str(e)}
121
+
122
+
123
+ @mcp.tool(name="filter_signal", description="Apply a digital filter to a signal")
124
+ def filter_signal(signal: List[float], sampling_rate: float = 1000.0,
125
+ ftype: str = "butter", band: str = "lowpass",
126
+ frequency: float = 45.0, order: int = 4) -> dict:
127
+ """
128
+ Apply a digital filter to a signal.
129
+
130
+ :param signal: Input signal as a list of float values.
131
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
132
+ :param ftype: Filter type - 'butter', 'cheby1', 'cheby2', 'ellip', 'bessel', 'FIR' (default: 'butter').
133
+ :param band: Band type - 'lowpass', 'highpass', 'bandpass', 'bandstop' (default: 'lowpass').
134
+ :param frequency: Cutoff frequency in Hz (default: 45.0).
135
+ :param order: Filter order (default: 4).
136
+ :return: Dictionary containing filtered signal.
137
+ """
138
+ try:
139
+ signal_array = np.array(signal)
140
+ filtered, _, _ = tools.filter_signal(
141
+ signal=signal_array,
142
+ ftype=ftype,
143
+ band=band,
144
+ frequency=frequency,
145
+ order=order,
146
+ sampling_rate=sampling_rate
147
+ )
148
 
149
+ return {
150
+ "success": True,
151
+ "result": {
152
+ "filtered": filtered.tolist()[:100] if len(filtered) > 100 else filtered.tolist(),
153
+ "length": len(filtered),
154
+ },
155
+ "error": None
156
+ }
157
  except Exception as e:
158
  return {"success": False, "result": None, "error": str(e)}
159
 
160
 
161
+ @mcp.tool(name="get_heart_rate", description="Calculate heart rate from R-peak indices")
162
+ def get_heart_rate(rpeaks: List[int], sampling_rate: float = 1000.0) -> dict:
163
+ """
164
+ Calculate heart rate from R-peak indices.
165
+
166
+ :param rpeaks: List of R-peak indices.
167
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
168
+ :return: Dictionary containing heart rate values and statistics.
169
+ """
170
+ try:
171
+ rpeaks_array = np.array(rpeaks)
172
+ hr_idx, hr = tools.get_heart_rate(beats=rpeaks_array, sampling_rate=sampling_rate, smooth=True, size=3)
173
+
174
+ return {
175
+ "success": True,
176
+ "result": {
177
+ "heart_rate": hr.tolist(),
178
+ "heart_rate_indices": hr_idx.tolist(),
179
+ "mean_hr": float(np.mean(hr)) if len(hr) > 0 else None,
180
+ "std_hr": float(np.std(hr)) if len(hr) > 0 else None,
181
+ "min_hr": float(np.min(hr)) if len(hr) > 0 else None,
182
+ "max_hr": float(np.max(hr)) if len(hr) > 0 else None,
183
+ },
184
+ "error": None
185
+ }
186
+ except Exception as e:
187
+ return {"success": False, "result": None, "error": str(e)}
188
+
189
 
190
  def create_app():
191
+ """
192
+ Create and return the FastMCP application instance.
193
+
194
+ :return: The FastMCP application instance.
195
+ """
196
+ return mcp
197
  """Create and return FastMCP application instance"""
198
  return mcp
199