guohanghui commited on
Commit
360dbbd
·
verified ·
1 Parent(s): 0712baa

Update BioSPPy/mcp_output/mcp_plugin/mcp_service.py

Browse files
BioSPPy/mcp_output/mcp_plugin/mcp_service.py CHANGED
@@ -1,160 +1,201 @@
 
 
 
 
 
 
 
 
1
  from fastmcp import FastMCP
 
 
 
 
2
 
3
- # Create the FastMCP service application
4
  mcp = FastMCP("biosppy_service")
5
 
6
- @mcp.tool(name="process_ecg", description="Process ECG signals and extract features")
7
- def process_ecg(signal: list, sampling_rate: int) -> dict:
8
- """
9
- Process ECG signals and extract features such as heart rate and R-peaks.
10
 
11
- Parameters:
12
- - signal: List of ECG signal values.
13
- - sampling_rate: Sampling rate of the signal in Hz.
 
14
 
15
- Returns:
16
- - dict: Processed ECG features including heart rate and R-peaks.
 
17
  """
18
  try:
19
- from biosppy.signals import ecg
20
-
21
- output = ecg.ecg(signal=signal, sampling_rate=sampling_rate, show=False)
22
 
23
  return {
24
  "success": True,
25
- "heart_rate": output[6],
26
- "rpeaks": output[2].tolist()
 
 
 
 
 
27
  }
28
  except Exception as e:
29
- return {"success": False, "error": str(e)}
30
 
31
- @mcp.tool(name="process_resp", description="Process respiratory signals and extract features")
32
- def process_resp(signal: list, sampling_rate: int) -> dict:
33
- """
34
- Process respiratory signals and extract features.
35
 
36
- Parameters:
37
- - signal: List of respiratory signal values.
38
- - sampling_rate: Sampling rate of the signal in Hz.
 
39
 
40
- Returns:
41
- - dict: Processed respiratory features.
 
42
  """
43
  try:
44
- from biosppy.signals import resp
45
-
46
- output = resp.resp(signal=signal, sampling_rate=sampling_rate, show=False)
47
 
48
  return {
49
  "success": True,
50
- "resp_rate": output[5],
51
- "filtered_signal": output[1].tolist()
 
 
 
 
 
52
  }
53
  except Exception as e:
54
- return {"success": False, "error": str(e)}
55
 
56
- @mcp.tool(name="process_eda", description="Process electrodermal activity (EDA) signals")
57
- def process_eda(signal: list, sampling_rate: int) -> dict:
58
- """
59
- Process electrodermal activity (EDA) signals and extract features.
60
 
61
- Parameters:
62
- - signal: List of EDA signal values.
63
- - sampling_rate: Sampling rate of the signal in Hz.
 
64
 
65
- Returns:
66
- - dict: Processed EDA features.
 
67
  """
68
  try:
69
- from biosppy.signals import eda
70
-
71
- output = eda.eda(signal=signal, sampling_rate=sampling_rate, show=False)
72
 
73
  return {
74
  "success": True,
75
- "filtered_signal": output[1].tolist(),
76
- "onsets": output[2].tolist()
 
 
 
77
  }
78
  except Exception as e:
79
- return {"success": False, "error": str(e)}
80
 
81
- @mcp.tool(name="process_emg", description="Process electromyography (EMG) signals")
82
- def process_emg(signal: list, sampling_rate: int) -> dict:
83
- """
84
- Process electromyography (EMG) signals and extract features.
85
 
86
- Parameters:
87
- - signal: List of EMG signal values.
88
- - sampling_rate: Sampling rate of the signal in Hz.
 
89
 
90
- Returns:
91
- - dict: Processed EMG features.
 
92
  """
93
  try:
94
- from biosppy.signals import emg
95
-
96
- output = emg.emg(signal=signal, sampling_rate=sampling_rate, show=False)
97
 
98
  return {
99
  "success": True,
100
- "filtered_signal": output[1].tolist()
 
 
 
 
 
101
  }
102
  except Exception as e:
103
- return {"success": False, "error": str(e)}
104
 
105
- @mcp.tool(name="process_pcg", description="Process phonocardiogram (PCG) signals")
106
- def process_pcg(signal: list, sampling_rate: int) -> dict:
107
- """
108
- Process phonocardiogram (PCG) signals and extract features.
109
-
110
- Parameters:
111
- - signal: List of PCG signal values.
112
- - sampling_rate: Sampling rate of the signal in Hz.
113
 
114
- Returns:
115
- - dict: Processed PCG features.
 
 
 
 
 
 
 
 
 
 
 
 
116
  """
117
  try:
118
- from biosppy.signals import pcg
119
-
120
- output = pcg.pcg(signal=signal, sampling_rate=sampling_rate, show=False)
 
 
 
 
 
 
121
 
122
  return {
123
  "success": True,
124
- "filtered_signal": output[1].tolist()
 
 
 
 
125
  }
126
  except Exception as e:
127
- return {"success": False, "error": str(e)}
128
 
129
- @mcp.tool(name="process_ecg_synthesis", description="Synthesize ECG signals")
130
- def process_ecg_synthesis(duration: float, sampling_rate: int) -> dict:
131
- """
132
- Synthesize ECG signals for a given duration and sampling rate.
133
 
134
- Parameters:
135
- - duration: Duration of the synthesized signal in seconds.
136
- - sampling_rate: Sampling rate of the signal in Hz.
 
137
 
138
- Returns:
139
- - dict: Synthesized ECG signal.
 
140
  """
141
  try:
142
- from biosppy.synthesizers import ecg
143
-
144
- signal = ecg.ecg(duration=duration, sampling_rate=sampling_rate)
145
 
146
  return {
147
  "success": True,
148
- "synthesized_signal": signal.tolist()
 
 
 
 
 
 
 
 
149
  }
150
  except Exception as e:
151
- return {"success": False, "error": str(e)}
 
152
 
153
- def create_app() -> FastMCP:
154
  """
155
  Create and return the FastMCP application instance.
156
 
157
- Returns:
158
- - FastMCP: The FastMCP application instance.
159
  """
160
- return mcp
 
 
 
 
 
 
1
+ import os
2
+ import sys
3
+ from typing import List, Optional
4
+
5
+ source_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "source")
6
+ if source_path not in sys.path:
7
+ sys.path.insert(0, source_path)
8
+
9
  from fastmcp import FastMCP
10
+ import numpy as np
11
+
12
+ # Import BioSPPy modules
13
+ from biosppy.signals import ecg, eda, emg, resp, ppg, eeg, tools
14
 
 
15
  mcp = FastMCP("biosppy_service")
16
 
 
 
 
 
17
 
18
+ @mcp.tool(name="process_ecg", description="Process an ECG signal and extract R-peaks and heart rate")
19
+ def process_ecg(signal: List[float], sampling_rate: float = 1000.0) -> dict:
20
+ """
21
+ Process a raw ECG signal and extract relevant signal features.
22
 
23
+ :param signal: Raw ECG signal as a list of float values.
24
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
25
+ :return: Dictionary containing filtered signal, R-peaks, and heart rate.
26
  """
27
  try:
28
+ signal_array = np.array(signal)
29
+ result = ecg.ecg(signal=signal_array, sampling_rate=sampling_rate, show=False, interactive=False)
 
30
 
31
  return {
32
  "success": True,
33
+ "result": {
34
+ "rpeaks": result['rpeaks'].tolist(),
35
+ "heart_rate": result['heart_rate'].tolist() if len(result['heart_rate']) > 0 else [],
36
+ "heart_rate_ts": result['heart_rate_ts'].tolist() if len(result['heart_rate_ts']) > 0 else [],
37
+ "num_beats": len(result['rpeaks']),
38
+ },
39
+ "error": None
40
  }
41
  except Exception as e:
42
+ return {"success": False, "result": None, "error": str(e)}
43
 
 
 
 
 
44
 
45
+ @mcp.tool(name="process_eda", description="Process an EDA (electrodermal activity) signal")
46
+ def process_eda(signal: List[float], sampling_rate: float = 1000.0) -> dict:
47
+ """
48
+ Process a raw EDA signal and extract relevant features.
49
 
50
+ :param signal: Raw EDA signal as a list of float values.
51
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
52
+ :return: Dictionary containing filtered signal and skin conductance responses.
53
  """
54
  try:
55
+ signal_array = np.array(signal)
56
+ result = eda.eda(signal=signal_array, sampling_rate=sampling_rate, show=False)
 
57
 
58
  return {
59
  "success": True,
60
+ "result": {
61
+ "filtered": result['filtered'].tolist()[:100] if len(result['filtered']) > 100 else result['filtered'].tolist(),
62
+ "onsets": result['onsets'].tolist() if hasattr(result['onsets'], 'tolist') else list(result['onsets']),
63
+ "peaks": result['peaks'].tolist() if hasattr(result['peaks'], 'tolist') else list(result['peaks']),
64
+ "amplitudes": result['amplitudes'].tolist() if hasattr(result['amplitudes'], 'tolist') else list(result['amplitudes']),
65
+ },
66
+ "error": None
67
  }
68
  except Exception as e:
69
+ return {"success": False, "result": None, "error": str(e)}
70
 
 
 
 
 
71
 
72
+ @mcp.tool(name="process_emg", description="Process an EMG (electromyogram) signal")
73
+ def process_emg(signal: List[float], sampling_rate: float = 1000.0) -> dict:
74
+ """
75
+ Process a raw EMG signal and extract relevant features.
76
 
77
+ :param signal: Raw EMG signal as a list of float values.
78
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
79
+ :return: Dictionary containing filtered signal and onsets.
80
  """
81
  try:
82
+ signal_array = np.array(signal)
83
+ result = emg.emg(signal=signal_array, sampling_rate=sampling_rate, show=False)
 
84
 
85
  return {
86
  "success": True,
87
+ "result": {
88
+ "filtered": result['filtered'].tolist()[:100] if len(result['filtered']) > 100 else result['filtered'].tolist(),
89
+ "onsets": result['onsets'].tolist() if hasattr(result['onsets'], 'tolist') else list(result['onsets']),
90
+ },
91
+ "error": None
92
  }
93
  except Exception as e:
94
+ return {"success": False, "result": None, "error": str(e)}
95
 
 
 
 
 
96
 
97
+ @mcp.tool(name="process_resp", description="Process a respiration signal")
98
+ def process_resp(signal: List[float], sampling_rate: float = 1000.0) -> dict:
99
+ """
100
+ Process a raw respiration signal and extract relevant features.
101
 
102
+ :param signal: Raw respiration signal as a list of float values.
103
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
104
+ :return: Dictionary containing filtered signal and respiration rate.
105
  """
106
  try:
107
+ signal_array = np.array(signal)
108
+ result = resp.resp(signal=signal_array, sampling_rate=sampling_rate, show=False)
 
109
 
110
  return {
111
  "success": True,
112
+ "result": {
113
+ "filtered": result['filtered'].tolist()[:100] if len(result['filtered']) > 100 else result['filtered'].tolist(),
114
+ "zeros": result['zeros'].tolist() if hasattr(result['zeros'], 'tolist') else list(result['zeros']),
115
+ "resp_rate": result['resp_rate'].tolist() if hasattr(result['resp_rate'], 'tolist') else list(result['resp_rate']),
116
+ },
117
+ "error": None
118
  }
119
  except Exception as e:
120
+ return {"success": False, "result": None, "error": str(e)}
121
 
 
 
 
 
 
 
 
 
122
 
123
+ @mcp.tool(name="filter_signal", description="Apply a digital filter to a signal")
124
+ def filter_signal(signal: List[float], sampling_rate: float = 1000.0,
125
+ ftype: str = "butter", band: str = "lowpass",
126
+ frequency: float = 45.0, order: int = 4) -> dict:
127
+ """
128
+ Apply a digital filter to a signal.
129
+
130
+ :param signal: Input signal as a list of float values.
131
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
132
+ :param ftype: Filter type - 'butter', 'cheby1', 'cheby2', 'ellip', 'bessel', 'FIR' (default: 'butter').
133
+ :param band: Band type - 'lowpass', 'highpass', 'bandpass', 'bandstop' (default: 'lowpass').
134
+ :param frequency: Cutoff frequency in Hz (default: 45.0).
135
+ :param order: Filter order (default: 4).
136
+ :return: Dictionary containing filtered signal.
137
  """
138
  try:
139
+ signal_array = np.array(signal)
140
+ filtered, _, _ = tools.filter_signal(
141
+ signal=signal_array,
142
+ ftype=ftype,
143
+ band=band,
144
+ frequency=frequency,
145
+ order=order,
146
+ sampling_rate=sampling_rate
147
+ )
148
 
149
  return {
150
  "success": True,
151
+ "result": {
152
+ "filtered": filtered.tolist()[:100] if len(filtered) > 100 else filtered.tolist(),
153
+ "length": len(filtered),
154
+ },
155
+ "error": None
156
  }
157
  except Exception as e:
158
+ return {"success": False, "result": None, "error": str(e)}
159
 
 
 
 
 
160
 
161
+ @mcp.tool(name="get_heart_rate", description="Calculate heart rate from R-peak indices")
162
+ def get_heart_rate(rpeaks: List[int], sampling_rate: float = 1000.0) -> dict:
163
+ """
164
+ Calculate heart rate from R-peak indices.
165
 
166
+ :param rpeaks: List of R-peak indices.
167
+ :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
168
+ :return: Dictionary containing heart rate values and statistics.
169
  """
170
  try:
171
+ rpeaks_array = np.array(rpeaks)
172
+ hr_idx, hr = tools.get_heart_rate(beats=rpeaks_array, sampling_rate=sampling_rate, smooth=True, size=3)
 
173
 
174
  return {
175
  "success": True,
176
+ "result": {
177
+ "heart_rate": hr.tolist(),
178
+ "heart_rate_indices": hr_idx.tolist(),
179
+ "mean_hr": float(np.mean(hr)) if len(hr) > 0 else None,
180
+ "std_hr": float(np.std(hr)) if len(hr) > 0 else None,
181
+ "min_hr": float(np.min(hr)) if len(hr) > 0 else None,
182
+ "max_hr": float(np.max(hr)) if len(hr) > 0 else None,
183
+ },
184
+ "error": None
185
  }
186
  except Exception as e:
187
+ return {"success": False, "result": None, "error": str(e)}
188
+
189
 
190
+ def create_app():
191
  """
192
  Create and return the FastMCP application instance.
193
 
194
+ :return: The FastMCP application instance.
 
195
  """
196
+ return mcp
197
+ """Create and return FastMCP application instance"""
198
+ return mcp
199
+
200
+ if __name__ == "__main__":
201
+ mcp.run(transport="http", host="0.0.0.0", port=8000)