| import os |
| import shutil |
| import subprocess |
| import sys |
| import time |
|
|
| import pytest |
|
|
| import fsspec |
| from fsspec.implementations.cached import CachingFileSystem |
|
|
|
|
| @pytest.fixture() |
| def m(): |
| """ |
| Fixture providing a memory filesystem. |
| """ |
| m = fsspec.filesystem("memory") |
| m.store.clear() |
| m.pseudo_dirs.clear() |
| m.pseudo_dirs.append("") |
| try: |
| yield m |
| finally: |
| m.store.clear() |
| m.pseudo_dirs.clear() |
| m.pseudo_dirs.append("") |
|
|
|
|
| @pytest.fixture |
| def ftp_writable(tmpdir): |
| """ |
| Fixture providing a writable FTP filesystem. |
| """ |
| pytest.importorskip("pyftpdlib") |
| from fsspec.implementations.ftp import FTPFileSystem |
|
|
| FTPFileSystem.clear_instance_cache() |
| CachingFileSystem.clear_instance_cache() |
| d = str(tmpdir) |
| with open(os.path.join(d, "out"), "wb") as f: |
| f.write(b"hello" * 10000) |
| P = subprocess.Popen( |
| [sys.executable, "-m", "pyftpdlib", "-d", d, "-u", "user", "-P", "pass", "-w"] |
| ) |
| try: |
| time.sleep(1) |
| yield "localhost", 2121, "user", "pass" |
| finally: |
| P.terminate() |
| P.wait() |
| try: |
| shutil.rmtree(tmpdir) |
| except Exception: |
| pass |
|
|