| |
|
| | import threading
|
| | from threading import Event
|
| | from pystray import Icon, MenuItem, Menu
|
| | from PIL import Image, ImageDraw, ImageFont
|
| |
|
| | import os
|
| | import sys
|
| | import time
|
| | import psutil
|
| | import pythoncom
|
| | import wmi
|
| |
|
| | import pynvml
|
| | from pynvml import *
|
| |
|
| | from difflib import get_close_matches
|
| | import subprocess
|
| | import tempfile
|
| | import shutil
|
| |
|
# Shared tray state. Every dict is keyed by the icon key string
# (for example "CPU_USAGE" or "<dev>_<part>").
icons: dict = {}            # key -> {"icon": <tray icon>, "label": <str>}
current_colors: dict = {}   # key -> color name most recently requested
last_colors: dict = {}      # key -> color name last applied to the icon
stop_events: dict = {}      # key -> Event that ends that icon's updater loop

# color_lock is taken around current_colors / last_colors accesses in the
# updater paths; icon_lock is held while an icon image is being replaced.
color_lock = threading.Lock()
icon_lock = threading.Lock()
|
| |
|
| |
|
# Named RGBA colors: used directly for the drive-activity icons and as the
# stops of the green -> yellow -> red gradient.
COLOR_MAP = dict(
    gray=(160, 160, 160, 255),
    green=(0, 128, 0, 255),
    red=(128, 0, 0, 255),
    yellow=(220, 220, 0, 255),
)
|
| |
|
| |
|
def managed_thread(target, *args, **kwargs):
    """Run ``target(*args, **kwargs)`` in a loop on a daemon thread.

    The loop repeats until the module-level ``shutdown_event`` is set, with a
    2.1 second pause after each call. The thread is appended to
    ``thread_refs`` before it is started so the quit/restart handlers can
    join it.

    Args:
        target: Callable to invoke repeatedly.
        *args: Positional arguments forwarded to ``target``.
        **kwargs: Keyword arguments forwarded to ``target``.
    """
    def wrapper():
        while not shutdown_event.is_set():
            target(*args, **kwargs)
            # Event.wait() returns as soon as shutdown is signalled, so the
            # worker does not sit out the whole pause while a handler is
            # waiting to join it with an equally long timeout.
            shutdown_event.wait(2.1)

    t = threading.Thread(target=wrapper, daemon=True)
    thread_refs.append(t)
    t.start()
|
| |
|
# Threads started through managed_thread(); the quit/restart handlers join them.
thread_refs: list = []

# Set to ask every monitoring loop to stop.
shutdown_event = Event()

# Set by the quit/restart handlers once their teardown has run.
shutdown_requested = threading.Event()
|
| |
|
| |
|
def resource_path(relative_path):
    """Resolve *relative_path* to a filesystem path for a bundled resource.

    Lookup order:

    1. ``sys._MEIPASS`` when present, otherwise this module's directory.
       The joined path is returned as-is; its existence is only enforced
       when ``sys.frozen`` is set.
    2. ``pkgutil.get_data``: the payload is written to the system temp
       directory under the resource's base name and that path is returned.
    3. A sibling ``your_package_name`` directory next to this module's parent.

    Raises:
        FileNotFoundError: If none of the locations yields the resource.
    """
    try:
        root = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
        candidate = os.path.join(root, relative_path)
        missing_in_bundle = hasattr(sys, 'frozen') and not os.path.exists(candidate)
        if missing_in_bundle:
            raise FileNotFoundError
        return candidate
    except (AttributeError, FileNotFoundError):
        # Fall through to the package-data lookup below.
        try:
            import pkgutil
            payload = pkgutil.get_data(__name__, relative_path)
            if payload:
                extracted = os.path.join(
                    tempfile.gettempdir(), os.path.basename(relative_path)
                )
                with open(extracted, 'wb') as out:
                    out.write(payload)
                return extracted
        except Exception as e:
            print(f"[ERROR] Fallback-Resource-Pfad fehlgeschlagen: {e}")

    package_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "..", "your_package_name")
    )
    sibling = os.path.join(package_dir, relative_path)
    if os.path.exists(sibling):
        return sibling

    raise FileNotFoundError(f"Resource '{relative_path}' not found in any path.")


# Font file used by all icon renderers; resolved once at import time.
font_path = resource_path("DePixelSchmal.otf")
|
| |
|
def round_to_nearest_five(value):
    """Return *value* rounded to a multiple of 5, as an int.

    Uses the built-in ``round`` on ``value / 5``, so exact halves follow
    Python's round-half-to-even rule.
    """
    steps = round(value / 5.0)
    return int(steps * 5)
|
| |
|
def get_active_network_adapters():
    """Return the ``Description`` of every IP-enabled adapter configuration.

    Queries ``Win32_NetworkAdapterConfiguration(IPEnabled=True)`` through a
    fresh WMI connection; entries without a ``Description`` attribute are
    skipped.
    """
    connection = wmi.WMI()
    return [
        config.Description
        for config in connection.Win32_NetworkAdapterConfiguration(IPEnabled=True)
        if hasattr(config, 'Description')
    ]
|
| |
|
def get_adapter_speeds():
    """Return ``{adapter name: speed}`` for enabled adapters reporting a speed.

    Queries ``Win32_NetworkAdapter`` through a fresh WMI connection and keeps
    entries whose ``NetEnabled`` and ``Speed`` are both truthy; ``Speed`` is
    converted to ``int``.
    """
    connection = wmi.WMI()
    return {
        adapter.Name: int(adapter.Speed)
        for adapter in connection.Win32_NetworkAdapter()
        if adapter.NetEnabled and adapter.Speed
    }
|
| |
|
def find_best_match(name, candidates):
    """Return the candidate most similar to *name*, or ``None``.

    Similarity is decided by ``difflib.get_close_matches`` with a cutoff of
    0.6; only the single best match is considered.
    """
    for best in get_close_matches(name, candidates, n=1, cutoff=0.6):
        return best
    return None
|
| |
|
def create_text_icon(text, color=(255, 255, 255, 255), bg_color=(0, 0, 0, 0)):
    """Render *text* centred on a 77x77 RGBA image and return the image.

    Args:
        text: Text to draw; callers in this file pass two-line strings.
        color: Fill color for the text.
        bg_color: Background color of the image.

    The bundled font at ``font_path`` is used at size 29; if it cannot be
    loaded, Pillow's default font is used instead.
    """
    size = 77
    image = Image.new("RGBA", (size, size), bg_color)
    draw = ImageDraw.Draw(image)

    try:
        font = ImageFont.truetype(font_path, 29)
    except Exception:
        font = ImageFont.load_default()

    if hasattr(draw, 'textbbox'):
        bbox = draw.textbbox((0, 0), text, font=font)
    else:
        # Older Pillow without textbbox only reports a (width, height) size.
        # Build a 4-tuple from it so the index arithmetic below works; the
        # previous fallback stored the 2-tuple directly and failed on bbox[2].
        # draw.textsize is used because the text may span several lines.
        width, height = draw.textsize(text, font=font)
        bbox = (0, 0, width, height)

    text_w, text_h = bbox[2] - bbox[0], bbox[3] - bbox[1]
    # Subtract the bbox origin so the drawn glyphs, not the anchor, are centred.
    text_x = (size - text_w) // 2 - bbox[0]
    text_y = (size - text_h) // 2 - bbox[1]

    draw.text((text_x, text_y), text, font=font, fill=color)
    return image
|
| |
|
def format_speed_custom(value_kb):
    """Format a rate given in kB/s as ``"<number>\\n<unit>"``.

    The value is divided by 1024 and moved to the next unit (kB/s, MB/s,
    GB/s) while it is at least 100 and a larger unit exists. Numbers below 10
    are shown with one decimal; larger numbers are shown as an integer capped
    at 99.

    Args:
        value_kb: Transfer rate in kilobytes per second.

    Returns:
        Two-line string: the number, a newline, then the unit.
    """
    units = ('kB/s', 'MB/s', 'GB/s')
    speed = value_kb
    unit_index = 0

    while speed >= 100 and unit_index < len(units) - 1:
        speed /= 1024
        unit_index += 1

    # Decide on the value as it will be displayed: 9.97 rounds to 10.0, which
    # belongs in the integer branch ("10") rather than printing "10.0".
    if round(speed, 1) < 10:
        display = f"{speed:.1f}"
    else:
        display = f"{min(round(speed), 99)}"

    return f"{display}\n{units[unit_index]}"
|
| |
|
| |
|
def get_color(read_active, write_active, read_mb=0, write_mb=0):
    """Map read/write throughput to a color name.

    A direction counts as busy at 2 or more. Neither busy gives "gray", only
    writes "red", only reads "green". With both busy, "yellow" is returned
    when the read/write ratio lies within 1/5..5, otherwise the dominant
    direction's color. ``read_active`` and ``write_active`` are accepted but
    not consulted.
    """
    reading = read_mb >= 2
    writing = write_mb >= 2

    if reading and writing:
        ratio = read_mb / write_mb if write_mb != 0 else float('inf')
        if 1/5 <= ratio <= 5:
            return "yellow"
        return "red" if write_mb > read_mb else "green"

    if writing:
        return "red"
    if reading:
        return "green"
    return "gray"
|
| |
|
| |
|
def _set_icon_color(key, color):
    """Redraw the icon registered under *key* in *color*.

    Does nothing when no icon is registered for *key*. Unknown color names
    fall back to (128, 128, 128). ``last_colors[key]`` is updated whether or
    not assigning the new image succeeded.
    """
    with icon_lock:
        entry = icons.get(key)
        if not entry:
            return

        tray_icon, label = entry["icon"], entry["label"]
        rendered = create_icon(COLOR_MAP.get(color, (128, 128, 128)), label)
        try:
            tray_icon.icon = rendered
        except Exception as e:
            print(f"[WARN] Could not update icon for {key}: {e}")
        finally:
            with color_lock:
                last_colors[key] = color
|
| |
|
| |
|
def create_icon(color_rgb, label):
    """Draw a filled circle with *label* centred on it and return the image.

    Args:
        color_rgb: Fill color of the circle as an RGB or RGBA tuple.
        label: Text drawn on top of the circle.

    The image is 77x77 RGBA with a transparent background. The bundled font
    at ``font_path`` is used at size 55, falling back to Pillow's default
    font. The label is black on bright fills and white on dark ones.
    """
    size = 77
    image = Image.new("RGBA", (size, size), (0, 0, 0, 0))
    draw = ImageDraw.Draw(image)

    draw.ellipse((0, 0, size, size), fill=color_rgb)

    try:
        font = ImageFont.truetype(font_path, 55)
    except Exception:
        font = ImageFont.load_default()

    if hasattr(draw, 'textbbox'):
        bbox = draw.textbbox((0, 0), label, font=font)
    else:
        width, height = font.getsize(label)
        bbox = (0, 0, width, height)

    text_w = bbox[2] - bbox[0]
    text_h = bbox[3] - bbox[1]
    # Compensate for the bbox origin on both axes so the glyphs themselves
    # are centred (the horizontal offset was previously ignored).
    text_x = (size - text_w) / 2 - bbox[0]
    text_y = (size - text_h) / 2 - bbox[1]

    # Average only the color channels; including an alpha of 255 would
    # inflate the brightness and pick the wrong text color for dark fills.
    brightness = sum(color_rgb[:3]) / 3
    text_color = (0, 0, 0, 255) if brightness > 130 else (255, 255, 255, 255)

    draw.text((text_x, text_y), label, font=font, fill=text_color)
    return image
|
| |
|
def update_tray_color(key, color):
    """Record *color* as the requested color for *key* if it changed.

    Only ``current_colors`` is written, under ``color_lock``; the icon image
    itself is not touched here.
    """
    with color_lock:
        if current_colors.get(key) != color:
            current_colors[key] = color
|
| |
|
| |
|
| |
|
def _icon_updater(key, stop_event):
    """Keep the icon for *key* in sync with its requested color.

    Every 0.2 seconds, until *stop_event* is set, the requested color
    (default "gray") is compared with the last applied one and the icon is
    redrawn when they differ. Redraw failures are logged and do not end the
    loop.
    """
    while not stop_event.is_set():
        with color_lock:
            wanted = current_colors.get(key, "gray")
            shown = last_colors.get(key)

        if wanted != shown:
            try:
                _set_icon_color(key, wanted)
            except Exception as e:
                print(f"[WARN] Icon update failed for {key}: {e}")

        stop_event.wait(0.2)
|
| |
|
| |
|
| |
|
| |
|
def get_gradient_color(percent):
    """Return the RGBA gradient color for *percent*.

    The gradient interpolates linearly between the ``COLOR_MAP`` entries:
    green at 0, yellow at 50, red at 100. Alpha is always 255.
    """
    if percent <= 50:
        start, end = COLOR_MAP["green"], COLOR_MAP["yellow"]
        ratio = percent / 50.0
    else:
        start, end = COLOR_MAP["yellow"], COLOR_MAP["red"]
        ratio = (percent - 50) / 50.0

    # Interpolate the three color channels; alpha is fixed below.
    r, g, b = (
        int(lo + ratio * (hi - lo))
        for lo, hi in zip(start[:3], end[:3])
    )
    return (r, g, b, 255)
|
| |
|
| |
|
def create_bar_icon(percent, label, color=None):
    """Draw a vertical bar filled to *percent* with *label* in the top-left.

    Args:
        percent: Fill level; the bar height is ``percent / 100`` of the
            77 pixel image height.
        label: Text drawn in white at position (2, 2).
        color: Accepted for compatibility; not used by the drawing code.

    Returns:
        A 77x77 RGBA image. The bar is drawn one row at a time from the
        bottom, each row colored by ``get_gradient_color`` so the bottom row
        corresponds to 0% and the top of the bar approaches *percent*.
    """
    size = 77
    margin = 4
    image = Image.new("RGBA", (size, size), (0, 0, 0, 0))
    draw = ImageDraw.Draw(image)

    bar_height = int((percent / 100.0) * size)
    bar_x0 = margin
    bar_x1 = size - margin
    bar_y_bottom = size - 1

    for i in range(bar_height):
        rel_percent = (i / bar_height) * percent
        line_color = get_gradient_color(rel_percent)
        y = bar_y_bottom - i
        draw.line([(bar_x0, y), (bar_x1, y)], fill=line_color)

    try:
        font = ImageFont.truetype(font_path, 30)
    except Exception:
        # Same fallback as the other icon renderers; a bare ``except`` would
        # also swallow KeyboardInterrupt/SystemExit.
        font = ImageFont.load_default()

    draw.text((2, 2), label, font=font, fill=(255, 255, 255, 255))

    return image
|
| |
|
| |
|
| |
|
def _on_quit(icon_inst=None, item=None):
    """Tray menu handler for "Exit": tear everything down and signal exit.

    Steps, in order: set ``shutdown_event``, join every thread in
    ``thread_refs`` (2.1 s timeout each), stop all tray icons, shut down
    NVML, then set ``shutdown_requested``. Both parameters are accepted for
    the menu callback signature and are not used.
    """
    print("[INFO] Beenden eingeleitet...")

    # Ask the monitoring loops to stop and give each one a moment to finish.
    shutdown_event.set()
    for worker in thread_refs:
        worker.join(timeout=2.1)
    print("[INFO] Alle Threads beendet.")

    stop_all_tray_icons()
    print("[INFO] Alle Trays beendet.")

    try:
        pynvml.nvmlShutdown()
    except Exception:
        # Best effort: NVML may never have been initialised.
        pass
    print("[INFO] Nvidia shut down.")

    time.sleep(0.1)
    shutdown_requested.set()
|
| |
|
| |
|
def _on_restart(icon_inst=None, item=None):
    """Tray menu handler for "Restart": tear down, relaunch, signal exit.

    Teardown mirrors the quit handler (set ``shutdown_event``, join worker
    threads, stop tray icons, shut down NVML). The executable to relaunch is
    read from the first line of ``startdir.txt`` next to this module and
    started with ``subprocess.Popen`` in its own directory, with
    ``RESTART_COUNT`` incremented and ``PYINSTALLER_RESET_ENVIRONMENT=1`` in
    the environment.

    ``shutdown_requested`` is set on every path out of this function. By the
    time a failure can occur the workers and icons are already stopped, so
    returning without signalling would leave a process with no tray icon.

    Args:
        icon_inst: Icon that triggered the menu action, if any.
        item: Menu item that was clicked; not used.
    """
    print("[INFO] Neustart eingeleitet...")

    shutdown_event.set()
    for t in thread_refs:
        t.join(timeout=2.1)
    print("[INFO] Alle Threads beendet.")

    stop_all_tray_icons()

    try:
        pynvml.nvmlShutdown()
    except Exception:
        # Best effort: NVML may never have been initialised.
        pass

    time.sleep(0.1)

    base_dir = os.path.dirname(os.path.abspath(__file__))
    startdir_path = os.path.join(base_dir, "startdir.txt")
    print("[INFO] Start Folder lesen")
    try:
        with open(startdir_path, "r", encoding="utf-8") as f:
            executable_path = f.readline().strip()
        if not os.path.isfile(executable_path):
            raise FileNotFoundError(f"EXE nicht gefunden: {executable_path}")
    except Exception as e:
        print(f"[ERROR] Fehler beim Lesen der startdir.txt: {e}")
        if icon_inst:
            icon_inst.stop()
        # Teardown already happened; let the process exit instead of lingering.
        shutdown_requested.set()
        return

    exe_dir = os.path.dirname(executable_path)

    time.sleep(0.1)
    try:
        print(f"[INFO] Starte EXE erneut (via subprocess.Popen): {executable_path}")

        env = os.environ.copy()
        env["RESTART_COUNT"] = str(int(env.get("RESTART_COUNT", "0")) + 1)
        env["PYINSTALLER_RESET_ENVIRONMENT"] = "1"

        subprocess.Popen(
            [executable_path],
            # A bare file name has no directory part; None keeps the current
            # working directory instead of passing an empty string.
            cwd=exe_dir or None,
            env=env,
            close_fds=True,
            shell=False
        )

        print("[INFO] EXE gestartet!")
    except Exception as e:
        print(f"[ERROR] Fehler beim Start via Popen: {e}")
        # Same reasoning as above: nothing is left running in this instance.
        shutdown_requested.set()
        return
    print("[INFO] Old Instance EXIT")
    time.sleep(0.5)
    shutdown_requested.set()
|
| |
|
| |
|
| |
|
| |
|
def stop_all_tray_icons():
    """Hide and stop every registered tray icon, then stop the updaters.

    Each icon is hidden if visible and then stopped; a failure on one icon is
    logged and does not prevent the others from being stopped. Afterwards
    every event in ``stop_events`` is set.
    """
    # Iterate over snapshots: other threads insert into these dicts, and
    # mutating a dict while it is being iterated raises RuntimeError.
    for entry in list(icons.values()):
        try:
            tray_icon = entry["icon"]
            if tray_icon.visible:
                tray_icon.visible = False
            tray_icon.stop()
        except Exception as e:
            print(f"[WARN] Icon-Stop fehlgeschlagen: {e}")
    for event in list(stop_events.values()):
        event.set()
|
| |
|
def update_tray_tooltip(key, tooltip_text):
    """Set the tooltip of the icon registered under *key*.

    The text is cut to 127 characters. Unknown keys are ignored; a failure to
    assign the title is logged.
    """
    entry = icons.get(key)
    if not entry:
        return

    tray_icon = entry["icon"]
    try:
        tray_icon.title = tooltip_text[:127]
    except Exception as e:
        print(f"[WARN] Tooltip konnte nicht gesetzt werden für {key}: {e}")
|
| |
|
def update_net_icons(adapter_name, send_kb, recv_kb, selected_components):
    """Refresh the upload and download tray icons of one network adapter.

    One short-lived daemon thread is started per direction. Each creates the
    icon on first use (running it detached) or replaces its image, then
    updates the tooltip. Rates below 10 are displayed as 0. Nothing is drawn
    when ``selected_components['network']`` is falsy.
    """
    menu = Menu(
        MenuItem("Restart", lambda icon_inst, item: _on_restart(icon_inst, item)),
        MenuItem("Exit", lambda icon_inst, item: _on_quit(icon_inst, item)),
    )

    def update_icon(direction, value_kb):
        if not selected_components['network']:
            return

        if value_kb < 10:
            value_kb = 0

        is_upload = direction == 'SEND'
        key = f"NET_{adapter_name}, Direction: {direction}"

        formatted = format_speed_custom(value_kb)
        text_with_linebreak = f"{'U ' if is_upload else 'D '}{formatted}"
        speed_parts = formatted.split("\n")
        if len(speed_parts) == 2:
            text_no_linebreak = f"{speed_parts[0]} {speed_parts[1]}"
        else:
            text_no_linebreak = formatted

        image = create_text_icon(text_with_linebreak)
        if key in icons:
            icons[key]["icon"].icon = image
        else:
            icon = Icon(key, image, menu=menu)
            icons[key] = {"icon": icon, "label": key}
            print(f"Created Network icon {key}")
            icon.run_detached()

        tooltip = f"{adapter_name} {'Upload' if is_upload else 'Download'}: {text_no_linebreak}"
        update_tray_tooltip(key, tooltip[:127])

    for direction, rate in (("SEND", send_kb), ("RECV", recv_kb)):
        threading.Thread(target=update_icon, args=(direction, rate), daemon=True).start()
|
| |
|
| |
|
def sort_selected_drives(drive_selections, device_map):
    """Return *drive_selections* sorted by drive letter, descending.

    Args:
        drive_selections: Iterable of ``(device, partition)`` pairs; the
            partition string is compared case-insensitively.
        device_map: Accepted for interface compatibility; not used. The
            earlier implementation built and sorted a throwaway list from it,
            which raised for any mapping whose partitions were not dicts with
            a ``'letter'`` key even though the result was never used.

    Returns:
        A new list; the input is not modified.
    """
    return sorted(drive_selections, key=lambda sel: sel[1].upper(), reverse=True)
|
| |
|
| |
|
def start_drive_icons(hardware_info, stop_all_tray_icons, device_map, drive_selections):
    """Create one gray tray icon per selected drive and start its updater.

    Drives are processed in the order returned by ``sort_selected_drives``.
    For each ``(dev, part)`` pair an icon labelled with the partition (colons
    stripped) is registered under ``"<dev>_<part>"``, run detached, and paired
    with an ``_icon_updater`` daemon thread; there is a 0.2 s pause between
    drives. ``hardware_info`` and ``stop_all_tray_icons`` are accepted but not
    used here.
    """
    print("Starting tray icons for selected drives...")
    menu = Menu(
        MenuItem("Restart", lambda icon_inst, item: _on_restart(icon_inst, item)),
        MenuItem("Exit", lambda icon_inst, item: _on_quit(icon_inst, item)),
    )

    ordered = sort_selected_drives(drive_selections, device_map)

    for position, (dev, part) in enumerate(ordered):
        label = part.strip(":")
        key = f"{dev}_{part}"
        title = f"{position}_SmartTaskTool_{label}"

        tray_icon = Icon(title, create_icon(COLOR_MAP["gray"], label), menu=menu)

        # Register the icon and its color state before it starts running.
        icons[key] = {"icon": tray_icon, "label": label}
        current_colors[key] = "gray"
        last_colors[key] = None
        stopper = threading.Event()
        stop_events[key] = stopper

        tray_icon.run_detached()

        threading.Thread(target=_icon_updater, args=(key, stopper), daemon=True).start()
        time.sleep(0.2)

    print("Started Icons:")
    for key, value in icons.items():
        print(f" {key}: {value}")
|
| |
|
| |
|
def start_tray_monitoring(hardware_info, selected_components):
    """Validate the selection and start the monitoring threads and icons.

    Args:
        hardware_info: Mapping; only its ``'drive_map'`` entry is read here.
        selected_components: Dict with the boolean keys ``'cpu'``, ``'ram'``,
            ``'gpu'``, ``'network'`` and the key ``'drives'`` holding a list
            of 2-tuples ``(device, partition)``.

    Raises:
        ValueError: If ``selected_components`` is not a dict, or a drives
            entry is not a 2-tuple.
        KeyError: If an expected key is missing.
        TypeError: If a value has the wrong type.

    One managed thread is started per enabled component. Disk and network
    counters share a single WMI polling thread, which is started when either
    the network or at least one drive is selected.
    """
    if not isinstance(selected_components, dict):
        raise ValueError("selected_components muss ein Dictionary sein")

    expected_keys = ['cpu', 'ram', 'gpu', 'network', 'drives']

    for key in expected_keys:
        if key not in selected_components:
            raise KeyError(f"selected_components fehlt: '{key}'")

        value = selected_components[key]

        if key == 'drives':
            if not isinstance(value, list):
                raise TypeError(f"'{key}' muss eine Liste sein.")
            if not all(isinstance(item, tuple) and len(item) == 2 for item in value):
                raise ValueError(f"Jedes Element in '{key}' muss ein Tuple mit zwei Elementen sein.")
        else:
            if not isinstance(value, bool):
                raise TypeError(f"'{key}' muss ein Boolean sein.")

    print("Starting tray monitoring...")

    menu = Menu(
        MenuItem("Restart", lambda icon_inst, item: _on_restart(icon_inst, item)),
        MenuItem("Exit", lambda icon_inst, item: _on_quit(icon_inst, item))
    )

    def update_cpu(percent):
        """Redraw the CPU bar icon from the current per-core usage."""
        if not selected_components['cpu']:
            return

        # The passed-in value is replaced by the rounded per-core average.
        try:
            logical = psutil.cpu_count(logical=True)
            physical = psutil.cpu_count(logical=False)
            cpu_percentages = psutil.cpu_percent(interval=None, percpu=True)

            num_cores = logical if logical else physical
            core_usages = cpu_percentages[:num_cores]

            if not core_usages:
                raise ValueError("Keine CPU-Werte erhalten.")

            avg_cpu_percent = sum(core_usages) / len(core_usages)
            percent = round_to_nearest_five(avg_cpu_percent)
        except Exception as e:
            print(f"[ERROR] CPU-Auswertung fehlgeschlagen: {e}")
            percent = 0
            core_usages = []
            num_cores = 0

        image = create_bar_icon(percent, "CPU")
        key = "CPU_USAGE"
        if key not in icons:
            icon = Icon(key, image, menu=menu)
            icons[key] = {"icon": icon, "label": "CPU"}
            print(f"Created CPU icon")
            threading.Thread(target=icon.run, daemon=True).start()
        else:
            icons[key]["icon"].icon = image

        try:
            cores_str = " | ".join([f"{int(p)}%" for p in core_usages])
            tooltip = f"{num_cores} Cores: {cores_str}"
            tooltip = tooltip[:127]
            update_tray_tooltip(key, tooltip)
        except Exception:
            # Apply the short fallback; it used to be built and then dropped.
            update_tray_tooltip(key, f"CPU {percent}%")

    def update_ram(percent):
        """Redraw the RAM bar icon for *percent* and refresh its tooltip."""
        if not selected_components['ram']:
            return
        percent = round_to_nearest_five(percent)
        image = create_bar_icon(percent, "RAM")
        key = "RAM_USAGE"
        if key not in icons:
            icon = Icon(key, image, menu=menu)
            icons[key] = {"icon": icon, "label": "RAM"}
            print(f"Created RAM icon")
            threading.Thread(target=icon.run, daemon=True).start()
        else:
            icons[key]["icon"].icon = image

        try:
            mem = psutil.virtual_memory()
            used_gb = round(mem.used / (1024 ** 3))
            total_gb = round(mem.total / (1024 ** 3))
            tooltip = f"RAM {used_gb} / {total_gb} GB"
            update_tray_tooltip(key, tooltip)
        except Exception:
            # Apply the short fallback; it used to be built and then dropped.
            update_tray_tooltip(key, f"RAM: {percent}%")

    def update_gpu_vram_temp(idx, util, mem_used, mem_total, temp, max_temp):
        """Redraw the temperature, VRAM and utilisation icons of GPU *idx*."""
        if not selected_components['gpu']:
            return

        # --- temperature: 35 maps to 0 %, max_temp to 100 % ---
        key_temp = f"GPU{idx}_TEMP"
        label_temp = f"T{idx}"
        temp_rounded = round(temp)

        clamped = max(35, min(temp, max_temp))
        temp_span = max_temp - 35
        # A threshold of exactly 35 would otherwise divide by zero.
        pct = round((clamped - 35) / temp_span * 100) if temp_span > 0 else 0
        image_temp = create_bar_icon(pct, label_temp)

        if key_temp not in icons:
            try:
                icon = Icon(key_temp, image_temp, menu=menu)
                icons[key_temp] = {"icon": icon, "label": label_temp}
                print(f"Created GPU{idx} temperature icon.")
                threading.Thread(target=icon.run, daemon=True).start()
            except Exception as e:
                print(f"Error creating GPU{idx} temperature icon: {e}")
        else:
            icons[key_temp]["icon"].icon = image_temp

        tooltip = f"{label_temp}: {temp_rounded} °C / {max_temp} °C"
        tooltip = tooltip[:127]
        update_tray_tooltip(key_temp, tooltip)

        time.sleep(0.01)

        # --- VRAM usage ---
        key_vram = f"VRAM{idx}_USAGE"
        label_vram = f"VR{idx}"
        # Guard against a reported total of 0 instead of raising.
        vram_util = round_to_nearest_five(mem_used / mem_total * 100) if mem_total else 0
        image_vram = create_bar_icon(vram_util, label_vram)

        if key_vram not in icons:
            try:
                icon = Icon(key_vram, image_vram, menu=menu)
                icons[key_vram] = {"icon": icon, "label": label_vram}
                print(f"Created VRAM{idx} usage icon.")
                threading.Thread(target=icon.run, daemon=True).start()
            except Exception as e:
                print(f"Error creating VRAM{idx} usage icon: {e}")
        else:
            icons[key_vram]["icon"].icon = image_vram

        used_gb = round(mem_used / (1024**3))
        total_gb = round(mem_total / (1024**3))

        tooltip = f"{label_vram}: {used_gb}/{total_gb} GB"
        tooltip = tooltip[:127]
        update_tray_tooltip(key_vram, tooltip)

        time.sleep(0.01)

        # --- GPU utilisation ---
        key_gpu = f"GPU{idx}_USAGE"
        label_gpu = f"GPU{idx}"
        image_gpu = create_bar_icon(util, label_gpu)

        if key_gpu not in icons:
            try:
                icon = Icon(key_gpu, image_gpu, menu=menu)
                icons[key_gpu] = {"icon": icon, "label": label_gpu}
                print(f"Created GPU{idx} usage icon.")
                threading.Thread(target=icon.run, daemon=True).start()
            except Exception as e:
                print(f"Error creating GPU{idx} usage icon: {e}")
        else:
            icons[key_gpu]["icon"].icon = image_gpu

        tooltip = f"{label_gpu}: {util}%"
        tooltip = tooltip[:127]
        update_tray_tooltip(key_gpu, tooltip)

    def gpu_monitor():
        """Poll every NVML device twice a second until shutdown."""
        try:
            pynvml.nvmlInit()
            gpu_data = []

            for idx in range(pynvml.nvmlDeviceGetCount()):
                handle = pynvml.nvmlDeviceGetHandleByIndex(idx)
                try:
                    max_temp = pynvml.nvmlDeviceGetTemperatureThreshold(
                        handle, pynvml.NVML_TEMPERATURE_THRESHOLD_GPU_MAX
                    )
                except pynvml.NVMLError:
                    # Threshold not available: fall back to 90.
                    max_temp = 90

                gpu_data.append((idx, handle, max_temp))

            while not shutdown_event.is_set():
                for idx, handle, max_temp in gpu_data:
                    try:
                        util = pynvml.nvmlDeviceGetUtilizationRates(handle).gpu
                        mem = pynvml.nvmlDeviceGetMemoryInfo(handle)
                        temp = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)

                        update_gpu_vram_temp(idx, util, mem.used, mem.total, temp, max_temp)

                    except pynvml.NVMLError as e:
                        print(f"[ERROR] GPU{idx} Fehler: {e}")

                time.sleep(0.5)

        except pynvml.NVMLError as e:
            print(f"[ERROR] NVML Initialisierung fehlgeschlagen: {e}")

    def cpu_monitor():
        """Refresh the CPU icon twice a second until shutdown."""
        while not shutdown_event.is_set():
            try:
                update_cpu(psutil.cpu_percent(interval=None))
            except Exception as e:
                print(f"[ERROR] CPU Monitoring Fehler: {e}")
            time.sleep(0.5)

    def ram_monitor():
        """Refresh the RAM icon twice a second until shutdown."""
        while not shutdown_event.is_set():
            try:
                mem = psutil.virtual_memory()
                update_ram(mem.percent)
            except Exception as e:
                print(f"[ERROR] RAM Monitoring Fehler: {e}")
            time.sleep(0.5)

    def wmi_monitor(poll_interval=2):
        """Poll WMI raw disk and network counters every *poll_interval* seconds.

        Rates are the difference between two consecutive counter readings
        divided by the poll interval.
        """
        pythoncom.CoInitialize()
        try:
            # Setup lives inside the try so COM is uninitialised on failure too.
            c = wmi.WMI(namespace="root\\CIMV2")
            prev_disk_counters = {}
            prev_net_stats = {}
            speeds = get_adapter_speeds()

            active_adapters = get_active_network_adapters()
            adapter_map = {active: find_best_match(active, list(speeds.keys())) for active in active_adapters}

            next_time = time.perf_counter()
            while not shutdown_event.is_set():

                try:
                    # --- disks ---
                    drives = selected_components.get('drives', [])
                    if drives:
                        perf_logical_disks = {
                            disk.Name.upper(): disk
                            for disk in c.Win32_PerfRawData_PerfDisk_LogicalDisk()
                        }

                        for dev, part in drives:
                            # The map is keyed by upper-cased names.
                            perf_disk = perf_logical_disks.get(part.upper())
                            if not perf_disk:
                                continue

                            read_bytes = int(perf_disk.DiskReadBytesPerSec)
                            write_bytes = int(perf_disk.DiskWriteBytesPerSec)
                            # On the first sample there is no previous reading;
                            # use the current one so the delta is 0 rather
                            # than the whole counter value.
                            prev_read, prev_write = prev_disk_counters.get(
                                part, (read_bytes, write_bytes)
                            )
                            read_diff = max(0, read_bytes - prev_read)
                            write_diff = max(0, write_bytes - prev_write)
                            prev_disk_counters[part] = (read_bytes, write_bytes)

                            mb_read = read_diff / 1024 / 1024 / poll_interval
                            mb_write = write_diff / 1024 / 1024 / poll_interval

                            key = f"{dev}_{part}"
                            color = get_color(mb_read > 2.0, mb_write > 2.0, mb_read, mb_write)

                            update_tray_color(key, color)
                            update_tray_tooltip(key, f"{part} R {int(mb_read)} MB/s | W {int(mb_write)} MB/s")

                    # --- network ---
                    if selected_components['network']:
                        perf_net_ifaces = c.Win32_PerfRawData_Tcpip_NetworkInterface()

                        for iface in perf_net_ifaces:
                            iface_name = getattr(iface, 'Name', None)
                            if not iface_name:
                                continue

                            adapter_name = find_best_match(iface_name, list(adapter_map.keys()))
                            if not adapter_name:
                                continue

                            send_raw = int(iface.BytesSentPersec)
                            recv_raw = int(iface.BytesReceivedPerSec)

                            # Same first-sample handling as for the disks.
                            prev_send, prev_recv = prev_net_stats.get(
                                adapter_name, (send_raw, recv_raw)
                            )
                            send_diff = max(0, send_raw - prev_send)
                            recv_diff = max(0, recv_raw - prev_recv)

                            prev_net_stats[adapter_name] = (send_raw, recv_raw)

                            send_kb = send_diff / 1024 / poll_interval
                            recv_kb = recv_diff / 1024 / poll_interval

                            update_net_icons(adapter_name, send_kb, recv_kb, selected_components)

                except Exception as e:
                    print(f"[ERROR] Unified WMI Monitor: {e}")

                # Fixed-rate schedule: sleep until the next tick, not a full
                # interval after the work finished.
                next_time += poll_interval
                time.sleep(max(0, next_time - time.perf_counter()))

        finally:
            pythoncom.CoUninitialize()

    if selected_components['cpu']:
        managed_thread(cpu_monitor)
        time.sleep(0.3)

    if selected_components['ram']:
        managed_thread(ram_monitor)
        time.sleep(0.3)

    if selected_components['gpu']:
        managed_thread(gpu_monitor)
        time.sleep(0.3)

    # The WMI thread feeds both the network icons and the drive colors, so it
    # is needed when either is selected (it used to start for network only,
    # leaving drive icons permanently gray in a drives-only setup).
    if selected_components['network'] or selected_components['drives']:
        managed_thread(wmi_monitor)
        time.sleep(2)

    if selected_components['drives']:
        device_map = hardware_info.get('drive_map', {})
        drive_selections = selected_components['drives']
        start_drive_icons(hardware_info, stop_all_tray_icons, device_map, drive_selections)

    print("All tray monitoring components started.")
|
| |
|
| |
|
| |
|