| import os |
| import folder_paths |
| import json |
| from server import PromptServer |
| import glob |
| from aiohttp import web |
|
|
|
|
def get_allowed_dirs():
    """Load the mapping of directory names to path patterns.

    The mapping is read from ``text_file_dirs.json`` located in the ``user``
    directory that is a sibling of the directory containing this file.

    Returns:
        The decoded JSON document. The other helpers in this module treat it
        as a dict whose keys are directory names and whose values are path
        patterns.

    Raises:
        OSError: If the configuration file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    user_dir = os.path.abspath(os.path.join(__file__, "../../user"))
    config_path = os.path.join(user_dir, "text_file_dirs.json")
    # JSON is UTF-8 by specification; relying on the locale's default
    # encoding breaks configurations that contain non-ASCII paths.
    with open(config_path, "r", encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
def get_valid_dirs():
    """Return the names of all configured directories.

    The result is the keys view of the mapping produced by
    ``get_allowed_dirs``.
    """
    allowed = get_allowed_dirs()
    return allowed.keys()
|
|
|
|
def get_dir_from_name(name):
    """Resolve a configured directory name to its concrete path pattern.

    The pattern stored under ``name`` may contain the placeholders
    ``$input``, ``$output`` and ``$temp``; each is replaced with the matching
    directory reported by ``folder_paths``.

    Args:
        name: Key to look up in the mapping from ``get_allowed_dirs``.

    Returns:
        The pattern with all placeholders substituted.

    Raises:
        KeyError: If ``name`` is not a configured directory.
    """
    allowed = get_allowed_dirs()
    if name not in allowed:
        raise KeyError(name + " dir not found")

    # Placeholder -> provider, applied in this order.
    substitutions = (
        ("$input", folder_paths.get_input_directory),
        ("$output", folder_paths.get_output_directory),
        ("$temp", folder_paths.get_temp_directory),
    )
    pattern = allowed[name]
    for placeholder, resolve in substitutions:
        pattern = pattern.replace(placeholder, resolve())
    return pattern
|
|
|
|
def is_child_dir(parent_path, child_path):
    """Report whether ``child_path`` lies inside (or equals) ``parent_path``.

    Both paths are made absolute and normalised first, so ``..`` segments in
    ``child_path`` cannot be used to escape ``parent_path``.

    Args:
        parent_path: Directory that must contain the other path.
        child_path: Path to check.

    Returns:
        ``True`` if the longest common sub-path of the two paths is
        ``parent_path`` itself, ``False`` otherwise.
    """
    parent_path = os.path.abspath(parent_path)
    child_path = os.path.abspath(child_path)
    try:
        shared = os.path.commonpath([parent_path, child_path])
    except ValueError:
        # commonpath raises when the paths have nothing comparable in common
        # (e.g. different drives on Windows); such a path is not a child.
        return False
    # Passing the parent through commonpath as well normalises it the same
    # way as the shared prefix before comparing.
    return os.path.commonpath([parent_path]) == shared
|
|
|
|
def get_real_path(dir):
    """Return the base directory of a path pattern.

    Every ``/**/`` segment is collapsed to ``/``, the result is made
    absolute, and its final component (the file-name part of the pattern)
    is dropped.

    Args:
        dir: Path pattern, optionally containing ``/**/`` segments.

    Returns:
        Absolute path of the directory part of the pattern.
    """
    flattened = dir.replace("/**/", "/")
    return os.path.dirname(os.path.abspath(flattened))
|
|
|
|
@PromptServer.instance.routes.get("/pysssss/text-file/{name}")
async def get_files(request):
    """List the files matching a configured directory's pattern.

    The ``name`` URL segment selects the configured directory. Its pattern is
    expanded with ``glob`` (recursively when it contains ``/**/``) and every
    match is reported relative to the pattern's base directory.

    Returns:
        A JSON response holding the list of relative paths, or ``["[none]"]``
        when nothing matched.

    Raises:
        web.HTTPNotFound: If ``name`` is not a configured directory.
    """
    name = request.match_info["name"]
    try:
        pattern = get_dir_from_name(name)
    except KeyError:
        # The name comes straight from the URL; an unknown directory is a
        # client error and should not surface as an unhandled server error.
        raise web.HTTPNotFound(text=name + " dir not found") from None

    recursive = "/**/" in pattern
    base = get_real_path(pattern)

    files = [
        os.path.relpath(match, base)
        for match in glob.glob(pattern, recursive=recursive)
    ]
    if not files:
        # Placeholder entry so the response is never an empty list.
        files = ["[none]"]
    return web.json_response(files)
|
|
|
|
def get_file(root_dir, file):
    """Resolve ``file`` inside the configured directory ``root_dir``.

    Args:
        root_dir: Name of a configured directory (see ``get_dir_from_name``).
        file: Path of the file relative to that directory's base path.

    Returns:
        The joined path of the base directory and ``file``.

    Raises:
        ValueError: If ``file`` is empty, blank, or the ``"[none]"``
            placeholder.
        KeyError: If ``root_dir`` is not a configured directory.
        ReferenceError: If the joined path falls outside the base directory.
    """
    if not file or file == "[none]" or not file.strip():
        raise ValueError("No file")

    base = get_real_path(get_dir_from_name(root_dir))
    candidate = os.path.join(base, file)

    # Refuse anything that escapes the configured directory.
    if is_child_dir(base, candidate):
        return candidate
    raise ReferenceError()
|
|
|
|
class TextFileNode:
    """Common base of the text file nodes: input validation and reading."""

    RETURN_TYPES = ("STRING",)
    CATEGORY = "utils"

    @classmethod
    def VALIDATE_INPUTS(cls, root_dir, file, **kwargs):
        """Check that ``file`` resolves inside ``root_dir``.

        Returns:
            ``True`` when the path resolves; ``get_file`` raises otherwise.
        """
        # This is a classmethod, so the attribute lives on the class and is
        # shared by every node of that class. It is kept only as a fallback
        # for callers that do not pass their own inputs to ``load_text``.
        cls.file = get_file(root_dir, file)
        return True

    def load_text(self, **kwargs):
        """Read the selected text file.

        Args:
            **kwargs: When both ``root_dir`` and ``file`` are present, the
                path is resolved from them. Otherwise the ``file`` attribute
                (instance first, then class) is used.

        Returns:
            A one-element tuple holding the file's contents.
        """
        if "root_dir" in kwargs and "file" in kwargs:
            # Resolve per call: the class-level attribute only remembers the
            # most recently validated node, which is wrong as soon as more
            # than one node of the same class exists.
            path = get_file(kwargs["root_dir"], kwargs["file"])
        else:
            path = self.file
        with open(path, "r") as f:
            return (f.read(),)
|
|
|
|
class LoadText(TextFileNode):
    """Node that reads a text file chosen from a configured directory."""

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        """Return the modification time of the selected file.

        The path is resolved from the ``root_dir`` and ``file`` inputs when
        both are supplied; otherwise the class-level ``file`` attribute set
        during validation is used.
        """
        if "root_dir" in kwargs and "file" in kwargs:
            # The class-level attribute is shared by all nodes of this class
            # and only reflects the last one validated.
            path = get_file(kwargs["root_dir"], kwargs["file"])
        else:
            path = cls.file
        return os.path.getmtime(path)

    @classmethod
    def INPUT_TYPES(s):
        """Describe the node's inputs.

        ``root_dir`` offers the configured directory names. ``file`` starts
        with the ``"[none]"`` placeholder and carries a binding that, when
        ``root_dir`` changes, disables the widget, fetches the file list
        from ``/pysssss/text-file/{root_dir}``, installs it as the option
        values, validates the combo and re-enables the widget.
        """
        return {
            "required": {
                "root_dir": (list(get_valid_dirs()), {}),
                "file": (["[none]"], {
                    "pysssss.binding": [{
                        "source": "root_dir",
                        "callback": [{
                            "type": "set",
                            "target": "$this.disabled",
                            "value": True
                        }, {
                            "type": "fetch",
                            "url": "/pysssss/text-file/{$source.value}",
                            "then": [{
                                "type": "set",
                                "target": "$this.options.values",
                                "value": "$result"
                            }, {
                                "type": "validate-combo"
                            }, {
                                "type": "set",
                                "target": "$this.disabled",
                                "value": False
                            }]
                        }],
                    }]
                })
            },
        }

    FUNCTION = "load_text"
|
|
|
|
class SaveText(TextFileNode):
    """Node that writes text to a file inside a configured directory."""

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        """Return NaN, which never compares equal to a previous value."""
        return float("nan")

    @classmethod
    def INPUT_TYPES(s):
        """Describe the node's inputs.

        ``append`` selects the write mode. ``insert`` carries a binding that
        enables the widget only while ``append`` is set to ``"append"``.
        """
        return {
            "required": {
                "root_dir": (list(get_valid_dirs()), {}),
                "file": ("STRING", {"default": "file.txt"}),
                "append": (["append", "overwrite", "new only"], {}),
                "insert": ("BOOLEAN", {
                    "default": True, "label_on": "new line", "label_off": "none",
                    "pysssss.binding": [{
                        "source": "append",
                        "callback": [{
                            "type": "if",
                            "condition": [{
                                "left": "$source.value",
                                "op": "eq",
                                "right": '"append"'
                            }],
                            "true": [{
                                "type": "set",
                                "target": "$this.disabled",
                                "value": False
                            }],
                            "false": [{
                                "type": "set",
                                "target": "$this.disabled",
                                "value": True
                            }],
                        }]
                    }]
                }),
                "text": ("STRING", {"forceInput": True, "multiline": True})
            },
        }

    FUNCTION = "write_text"

    def write_text(self, root_dir, file, append, insert, text):
        """Write ``text`` to the selected file and return the file's contents.

        Args:
            root_dir: Name of a configured directory.
            file: File path relative to that directory.
            append: ``"append"`` adds to the end of the file, ``"new only"``
                refuses to touch an existing file, anything else overwrites.
            insert: When appending to a non-empty file, write a newline
                before ``text``.
            text: The text to write.

        Returns:
            A one-element tuple with the file's contents after the write.

        Raises:
            FileExistsError: If ``append`` is ``"new only"`` and the file
                already exists.
        """
        # Resolve the target from this node's own inputs and keep it on the
        # instance: the class-level attribute set during validation is shared
        # by every node of this class and may point at another node's file.
        self.file = get_file(root_dir, file)

        if append == "new only" and os.path.exists(self.file):
            raise FileExistsError(
                self.file + " already exists and 'new only' is selected.")

        if append == "append":
            mode = "a+"
        elif append == "new only":
            # Exclusive creation closes the gap between the existence check
            # above and the open; it also raises FileExistsError.
            mode = "x"
        else:
            mode = "w"

        with open(self.file, mode) as f:
            # A non-zero position right after opening means existing content
            # is being appended to.
            is_append = f.tell() != 0
            if is_append and insert:
                f.write("\n")
            f.write(text)

        return super().load_text()
|
|
|
|
# Node identifier -> implementing class.
# NOTE(review): presumably read by the host application's node loader - confirm.
NODE_CLASS_MAPPINGS = {
    "LoadText|pysssss": LoadText,
    "SaveText|pysssss": SaveText,
}

# Node identifier -> display label; uses the same keys as NODE_CLASS_MAPPINGS.
NODE_DISPLAY_NAME_MAPPINGS = {
    "LoadText|pysssss": "Load Text 🐍",
    "SaveText|pysssss": "Save Text 🐍",
}
|
|