guohanghui commited on
Commit
0712baa
·
verified ·
1 Parent(s): 8a37b74

Update BioSPPy/mcp_output/mcp_plugin/mcp_service.py

Browse files
BioSPPy/mcp_output/mcp_plugin/mcp_service.py CHANGED
@@ -1,201 +1,160 @@
1
- import os
2
- import sys
3
- from typing import List, Optional
4
-
5
- source_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))), "source")
6
- if source_path not in sys.path:
7
- sys.path.insert(0, source_path)
8
-
9
  from fastmcp import FastMCP
10
- import numpy as np
11
-
12
- # Import BioSPPy modules
13
- from biosppy.signals import ecg, eda, emg, resp, ppg, eeg, tools
14
 
 
15
  mcp = FastMCP("biosppy_service")
16
 
17
-
18
- @mcp.tool(name="process_ecg", description="Process an ECG signal and extract R-peaks and heart rate")
19
- def process_ecg(signal: List[float], sampling_rate: float = 1000.0) -> dict:
20
  """
21
- Process a raw ECG signal and extract relevant signal features.
 
 
 
 
22
 
23
- :param signal: Raw ECG signal as a list of float values.
24
- :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
25
- :return: Dictionary containing filtered signal, R-peaks, and heart rate.
26
  """
27
  try:
28
- signal_array = np.array(signal)
29
- result = ecg.ecg(signal=signal_array, sampling_rate=sampling_rate, show=False, interactive=False)
 
30
 
31
  return {
32
  "success": True,
33
- "result": {
34
- "rpeaks": result['rpeaks'].tolist(),
35
- "heart_rate": result['heart_rate'].tolist() if len(result['heart_rate']) > 0 else [],
36
- "heart_rate_ts": result['heart_rate_ts'].tolist() if len(result['heart_rate_ts']) > 0 else [],
37
- "num_beats": len(result['rpeaks']),
38
- },
39
- "error": None
40
  }
41
  except Exception as e:
42
- return {"success": False, "result": None, "error": str(e)}
43
 
44
-
45
- @mcp.tool(name="process_eda", description="Process an EDA (electrodermal activity) signal")
46
- def process_eda(signal: List[float], sampling_rate: float = 1000.0) -> dict:
47
  """
48
- Process a raw EDA signal and extract relevant features.
 
 
 
 
49
 
50
- :param signal: Raw EDA signal as a list of float values.
51
- :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
52
- :return: Dictionary containing filtered signal and skin conductance responses.
53
  """
54
  try:
55
- signal_array = np.array(signal)
56
- result = eda.eda(signal=signal_array, sampling_rate=sampling_rate, show=False)
 
57
 
58
  return {
59
  "success": True,
60
- "result": {
61
- "filtered": result['filtered'].tolist()[:100] if len(result['filtered']) > 100 else result['filtered'].tolist(),
62
- "onsets": result['onsets'].tolist() if hasattr(result['onsets'], 'tolist') else list(result['onsets']),
63
- "peaks": result['peaks'].tolist() if hasattr(result['peaks'], 'tolist') else list(result['peaks']),
64
- "amplitudes": result['amplitudes'].tolist() if hasattr(result['amplitudes'], 'tolist') else list(result['amplitudes']),
65
- },
66
- "error": None
67
  }
68
  except Exception as e:
69
- return {"success": False, "result": None, "error": str(e)}
70
 
71
-
72
- @mcp.tool(name="process_emg", description="Process an EMG (electromyogram) signal")
73
- def process_emg(signal: List[float], sampling_rate: float = 1000.0) -> dict:
74
  """
75
- Process a raw EMG signal and extract relevant features.
 
 
 
 
76
 
77
- :param signal: Raw EMG signal as a list of float values.
78
- :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
79
- :return: Dictionary containing filtered signal and onsets.
80
  """
81
  try:
82
- signal_array = np.array(signal)
83
- result = emg.emg(signal=signal_array, sampling_rate=sampling_rate, show=False)
 
84
 
85
  return {
86
  "success": True,
87
- "result": {
88
- "filtered": result['filtered'].tolist()[:100] if len(result['filtered']) > 100 else result['filtered'].tolist(),
89
- "onsets": result['onsets'].tolist() if hasattr(result['onsets'], 'tolist') else list(result['onsets']),
90
- },
91
- "error": None
92
  }
93
  except Exception as e:
94
- return {"success": False, "result": None, "error": str(e)}
95
 
96
-
97
- @mcp.tool(name="process_resp", description="Process a respiration signal")
98
- def process_resp(signal: List[float], sampling_rate: float = 1000.0) -> dict:
99
  """
100
- Process a raw respiration signal and extract relevant features.
 
 
 
 
101
 
102
- :param signal: Raw respiration signal as a list of float values.
103
- :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
104
- :return: Dictionary containing filtered signal and respiration rate.
105
  """
106
  try:
107
- signal_array = np.array(signal)
108
- result = resp.resp(signal=signal_array, sampling_rate=sampling_rate, show=False)
 
109
 
110
  return {
111
  "success": True,
112
- "result": {
113
- "filtered": result['filtered'].tolist()[:100] if len(result['filtered']) > 100 else result['filtered'].tolist(),
114
- "zeros": result['zeros'].tolist() if hasattr(result['zeros'], 'tolist') else list(result['zeros']),
115
- "resp_rate": result['resp_rate'].tolist() if hasattr(result['resp_rate'], 'tolist') else list(result['resp_rate']),
116
- },
117
- "error": None
118
  }
119
  except Exception as e:
120
- return {"success": False, "result": None, "error": str(e)}
121
 
122
-
123
- @mcp.tool(name="filter_signal", description="Apply a digital filter to a signal")
124
- def filter_signal(signal: List[float], sampling_rate: float = 1000.0,
125
- ftype: str = "butter", band: str = "lowpass",
126
- frequency: float = 45.0, order: int = 4) -> dict:
127
  """
128
- Apply a digital filter to a signal.
129
-
130
- :param signal: Input signal as a list of float values.
131
- :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
132
- :param ftype: Filter type - 'butter', 'cheby1', 'cheby2', 'ellip', 'bessel', 'FIR' (default: 'butter').
133
- :param band: Band type - 'lowpass', 'highpass', 'bandpass', 'bandstop' (default: 'lowpass').
134
- :param frequency: Cutoff frequency in Hz (default: 45.0).
135
- :param order: Filter order (default: 4).
136
- :return: Dictionary containing filtered signal.
137
  """
138
  try:
139
- signal_array = np.array(signal)
140
- filtered, _, _ = tools.filter_signal(
141
- signal=signal_array,
142
- ftype=ftype,
143
- band=band,
144
- frequency=frequency,
145
- order=order,
146
- sampling_rate=sampling_rate
147
- )
148
 
149
  return {
150
  "success": True,
151
- "result": {
152
- "filtered": filtered.tolist()[:100] if len(filtered) > 100 else filtered.tolist(),
153
- "length": len(filtered),
154
- },
155
- "error": None
156
  }
157
  except Exception as e:
158
- return {"success": False, "result": None, "error": str(e)}
159
-
160
 
161
- @mcp.tool(name="get_heart_rate", description="Calculate heart rate from R-peak indices")
162
- def get_heart_rate(rpeaks: List[int], sampling_rate: float = 1000.0) -> dict:
163
  """
164
- Calculate heart rate from R-peak indices.
 
 
 
 
165
 
166
- :param rpeaks: List of R-peak indices.
167
- :param sampling_rate: Sampling frequency in Hz (default: 1000.0).
168
- :return: Dictionary containing heart rate values and statistics.
169
  """
170
  try:
171
- rpeaks_array = np.array(rpeaks)
172
- hr_idx, hr = tools.get_heart_rate(beats=rpeaks_array, sampling_rate=sampling_rate, smooth=True, size=3)
 
173
 
174
  return {
175
  "success": True,
176
- "result": {
177
- "heart_rate": hr.tolist(),
178
- "heart_rate_indices": hr_idx.tolist(),
179
- "mean_hr": float(np.mean(hr)) if len(hr) > 0 else None,
180
- "std_hr": float(np.std(hr)) if len(hr) > 0 else None,
181
- "min_hr": float(np.min(hr)) if len(hr) > 0 else None,
182
- "max_hr": float(np.max(hr)) if len(hr) > 0 else None,
183
- },
184
- "error": None
185
  }
186
  except Exception as e:
187
- return {"success": False, "result": None, "error": str(e)}
188
-
189
 
190
- def create_app():
191
  """
192
  Create and return the FastMCP application instance.
193
 
194
- :return: The FastMCP application instance.
 
195
  """
196
- return mcp
197
- """Create and return FastMCP application instance"""
198
- return mcp
199
-
200
- if __name__ == "__main__":
201
- mcp.run(transport="http", host="0.0.0.0", port=8000)
 
 
 
 
 
 
 
 
 
1
  from fastmcp import FastMCP
 
 
 
 
2
 
3
+ # Create the FastMCP service application
4
  mcp = FastMCP("biosppy_service")
5
 
6
+ @mcp.tool(name="process_ecg", description="Process ECG signals and extract features")
7
+ def process_ecg(signal: list, sampling_rate: int) -> dict:
 
8
  """
9
+ Process ECG signals and extract features such as heart rate and R-peaks.
10
+
11
+ Parameters:
12
+ - signal: List of ECG signal values.
13
+ - sampling_rate: Sampling rate of the signal in Hz.
14
 
15
+ Returns:
16
+ - dict: Processed ECG features including heart rate and R-peaks.
 
17
  """
18
  try:
19
+ from biosppy.signals import ecg
20
+
21
+ output = ecg.ecg(signal=signal, sampling_rate=sampling_rate, show=False)
22
 
23
  return {
24
  "success": True,
25
+ "heart_rate": output[6],
26
+ "rpeaks": output[2].tolist()
 
 
 
 
 
27
  }
28
  except Exception as e:
29
+ return {"success": False, "error": str(e)}
30
 
31
+ @mcp.tool(name="process_resp", description="Process respiratory signals and extract features")
32
+ def process_resp(signal: list, sampling_rate: int) -> dict:
 
33
  """
34
+ Process respiratory signals and extract features.
35
+
36
+ Parameters:
37
+ - signal: List of respiratory signal values.
38
+ - sampling_rate: Sampling rate of the signal in Hz.
39
 
40
+ Returns:
41
+ - dict: Processed respiratory features.
 
42
  """
43
  try:
44
+ from biosppy.signals import resp
45
+
46
+ output = resp.resp(signal=signal, sampling_rate=sampling_rate, show=False)
47
 
48
  return {
49
  "success": True,
50
+ "resp_rate": output[5],
51
+ "filtered_signal": output[1].tolist()
 
 
 
 
 
52
  }
53
  except Exception as e:
54
+ return {"success": False, "error": str(e)}
55
 
56
+ @mcp.tool(name="process_eda", description="Process electrodermal activity (EDA) signals")
57
+ def process_eda(signal: list, sampling_rate: int) -> dict:
 
58
  """
59
+ Process electrodermal activity (EDA) signals and extract features.
60
+
61
+ Parameters:
62
+ - signal: List of EDA signal values.
63
+ - sampling_rate: Sampling rate of the signal in Hz.
64
 
65
+ Returns:
66
+ - dict: Processed EDA features.
 
67
  """
68
  try:
69
+ from biosppy.signals import eda
70
+
71
+ output = eda.eda(signal=signal, sampling_rate=sampling_rate, show=False)
72
 
73
  return {
74
  "success": True,
75
+ "filtered_signal": output[1].tolist(),
76
+ "onsets": output[2].tolist()
 
 
 
77
  }
78
  except Exception as e:
79
+ return {"success": False, "error": str(e)}
80
 
81
+ @mcp.tool(name="process_emg", description="Process electromyography (EMG) signals")
82
+ def process_emg(signal: list, sampling_rate: int) -> dict:
 
83
  """
84
+ Process electromyography (EMG) signals and extract features.
85
+
86
+ Parameters:
87
+ - signal: List of EMG signal values.
88
+ - sampling_rate: Sampling rate of the signal in Hz.
89
 
90
+ Returns:
91
+ - dict: Processed EMG features.
 
92
  """
93
  try:
94
+ from biosppy.signals import emg
95
+
96
+ output = emg.emg(signal=signal, sampling_rate=sampling_rate, show=False)
97
 
98
  return {
99
  "success": True,
100
+ "filtered_signal": output[1].tolist()
 
 
 
 
 
101
  }
102
  except Exception as e:
103
+ return {"success": False, "error": str(e)}
104
 
105
+ @mcp.tool(name="process_pcg", description="Process phonocardiogram (PCG) signals")
106
+ def process_pcg(signal: list, sampling_rate: int) -> dict:
 
 
 
107
  """
108
+ Process phonocardiogram (PCG) signals and extract features.
109
+
110
+ Parameters:
111
+ - signal: List of PCG signal values.
112
+ - sampling_rate: Sampling rate of the signal in Hz.
113
+
114
+ Returns:
115
+ - dict: Processed PCG features.
 
116
  """
117
  try:
118
+ from biosppy.signals import pcg
119
+
120
+ output = pcg.pcg(signal=signal, sampling_rate=sampling_rate, show=False)
 
 
 
 
 
 
121
 
122
  return {
123
  "success": True,
124
+ "filtered_signal": output[1].tolist()
 
 
 
 
125
  }
126
  except Exception as e:
127
+ return {"success": False, "error": str(e)}
 
128
 
129
+ @mcp.tool(name="process_ecg_synthesis", description="Synthesize ECG signals")
130
+ def process_ecg_synthesis(duration: float, sampling_rate: int) -> dict:
131
  """
132
+ Synthesize ECG signals for a given duration and sampling rate.
133
+
134
+ Parameters:
135
+ - duration: Duration of the synthesized signal in seconds.
136
+ - sampling_rate: Sampling rate of the signal in Hz.
137
 
138
+ Returns:
139
+ - dict: Synthesized ECG signal.
 
140
  """
141
  try:
142
+ from biosppy.synthesizers import ecg
143
+
144
+ signal = ecg.ecg(duration=duration, sampling_rate=sampling_rate)
145
 
146
  return {
147
  "success": True,
148
+ "synthesized_signal": signal.tolist()
 
 
 
 
 
 
 
 
149
  }
150
  except Exception as e:
151
+ return {"success": False, "error": str(e)}
 
152
 
153
+ def create_app() -> FastMCP:
154
  """
155
  Create and return the FastMCP application instance.
156
 
157
+ Returns:
158
+ - FastMCP: The FastMCP application instance.
159
  """
160
+ return mcp