Buckets:
MisterAI/LocalAI_Demo_backends / cpu-diffusers.upgrade-tmp /venv /lib /python3.10 /site-packages /pexpect /utils.py
| import os | |
| import sys | |
| import stat | |
| import select | |
| import time | |
| import errno | |
| try: | |
| InterruptedError | |
| except NameError: | |
| # Alias Python2 exception to Python3 | |
| InterruptedError = select.error | |
| if sys.version_info[0] >= 3: | |
| string_types = (str,) | |
| else: | |
| string_types = (unicode, str) | |
| def is_executable_file(path): | |
| """Checks that path is an executable regular file, or a symlink towards one. | |
| This is roughly ``os.path isfile(path) and os.access(path, os.X_OK)``. | |
| """ | |
| # follow symlinks, | |
| fpath = os.path.realpath(path) | |
| if not os.path.isfile(fpath): | |
| # non-files (directories, fifo, etc.) | |
| return False | |
| mode = os.stat(fpath).st_mode | |
| if (sys.platform.startswith('sunos') | |
| and os.getuid() == 0): | |
| # When root on Solaris, os.X_OK is True for *all* files, irregardless | |
| # of their executability -- instead, any permission bit of any user, | |
| # group, or other is fine enough. | |
| # | |
| # (This may be true for other "Unix98" OS's such as HP-UX and AIX) | |
| return bool(mode & (stat.S_IXUSR | | |
| stat.S_IXGRP | | |
| stat.S_IXOTH)) | |
| return os.access(fpath, os.X_OK) | |
| def which(filename, env=None): | |
| '''This takes a given filename; tries to find it in the environment path; | |
| then checks if it is executable. This returns the full path to the filename | |
| if found and executable. Otherwise this returns None.''' | |
| # Special case where filename contains an explicit path. | |
| if os.path.dirname(filename) != '' and is_executable_file(filename): | |
| return filename | |
| if env is None: | |
| env = os.environ | |
| p = env.get('PATH') | |
| if not p: | |
| p = os.defpath | |
| pathlist = p.split(os.pathsep) | |
| for path in pathlist: | |
| ff = os.path.join(path, filename) | |
| if is_executable_file(ff): | |
| return ff | |
| return None | |
| def split_command_line(command_line): | |
| '''This splits a command line into a list of arguments. It splits arguments | |
| on spaces, but handles embedded quotes, doublequotes, and escaped | |
| characters. It's impossible to do this with a regular expression, so I | |
| wrote a little state machine to parse the command line. ''' | |
| arg_list = [] | |
| arg = '' | |
| # Constants to name the states we can be in. | |
| state_basic = 0 | |
| state_esc = 1 | |
| state_singlequote = 2 | |
| state_doublequote = 3 | |
| # The state when consuming whitespace between commands. | |
| state_whitespace = 4 | |
| state = state_basic | |
| for c in command_line: | |
| if state == state_basic or state == state_whitespace: | |
| if c == '\\': | |
| # Escape the next character | |
| state = state_esc | |
| elif c == r"'": | |
| # Handle single quote | |
| state = state_singlequote | |
| elif c == r'"': | |
| # Handle double quote | |
| state = state_doublequote | |
| elif c.isspace(): | |
| # Add arg to arg_list if we aren't in the middle of whitespace. | |
| if state == state_whitespace: | |
| # Do nothing. | |
| None | |
| else: | |
| arg_list.append(arg) | |
| arg = '' | |
| state = state_whitespace | |
| else: | |
| arg = arg + c | |
| state = state_basic | |
| elif state == state_esc: | |
| arg = arg + c | |
| state = state_basic | |
| elif state == state_singlequote: | |
| if c == r"'": | |
| state = state_basic | |
| else: | |
| arg = arg + c | |
| elif state == state_doublequote: | |
| if c == r'"': | |
| state = state_basic | |
| else: | |
| arg = arg + c | |
| if arg != '': | |
| arg_list.append(arg) | |
| return arg_list | |
| def select_ignore_interrupts(iwtd, owtd, ewtd, timeout=None): | |
| '''This is a wrapper around select.select() that ignores signals. If | |
| select.select raises a select.error exception and errno is an EINTR | |
| error then it is ignored. Mainly this is used to ignore sigwinch | |
| (terminal resize). ''' | |
| # if select() is interrupted by a signal (errno==EINTR) then | |
| # we loop back and enter the select() again. | |
| if timeout is not None: | |
| end_time = time.time() + timeout | |
| while True: | |
| try: | |
| return select.select(iwtd, owtd, ewtd, timeout) | |
| except InterruptedError: | |
| err = sys.exc_info()[1] | |
| if err.args[0] == errno.EINTR: | |
| # if we loop back we have to subtract the | |
| # amount of time we already waited. | |
| if timeout is not None: | |
| timeout = end_time - time.time() | |
| if timeout < 0: | |
| return([], [], []) | |
| else: | |
| # something else caused the select.error, so | |
| # this actually is an exception. | |
| raise | |
| def poll_ignore_interrupts(fds, timeout=None): | |
| '''Simple wrapper around poll to register file descriptors and | |
| ignore signals.''' | |
| if timeout is not None: | |
| end_time = time.time() + timeout | |
| poller = select.poll() | |
| for fd in fds: | |
| poller.register(fd, select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR) | |
| while True: | |
| try: | |
| timeout_ms = None if timeout is None else timeout * 1000 | |
| results = poller.poll(timeout_ms) | |
| return [afd for afd, _ in results] | |
| except InterruptedError: | |
| err = sys.exc_info()[1] | |
| if err.args[0] == errno.EINTR: | |
| # if we loop back we have to subtract the | |
| # amount of time we already waited. | |
| if timeout is not None: | |
| timeout = end_time - time.time() | |
| if timeout < 0: | |
| return [] | |
| else: | |
| # something else caused the select.error, so | |
| # this actually is an exception. | |
| raise | |
Xet Storage Details
- Size:
- 6.02 kB
- Xet hash:
- b6bcb2f704bef8ed315218a481e2f472c98171d9c8cfc822060cd453d1f197a0
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.