+# [ FILE I/O HANDLING ] #######################################################
+
+
+class File:
+
+ class Op(enum.Enum):
+ R = enum.auto()
+ W = enum.auto()
+
+ def __init__(self, path=None, mode="r", content=None, **kwargs):
+ if path:
+ if content:
+ raise RuntimeError("Cannot specify path and content for File")
+
+ self._path = (
+ path if isinstance(path, pathlib.Path) else pathlib.Path(path)
+ )
+ else:
+ self._path = None
+
+ if content and not re.search(r"[r+]", mode):
+ raise RuntimeError("Cannot specify content without read mode")
+
+ self._cache = {
+ File.Op.R: [content] if content else [],
+ File.Op.W: []
+ }
+ self._lastop = None
+ self._mode = mode
+ self._kwargs = kwargs
+ self._file = None
+
+ def open(self):
+ if self._path:
+ self._file = open(self._path, self._mode, **self._kwargs)
+ elif "b" in self._mode:
+ self._file = io.BytesIO()
+ else:
+ self._file = io.StringIO()
+
+ def __enter__(self):
+ self.open()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def close(self):
+ self._file.close()
+ self._file = None
+ self._cache[File.Op.R] = self._cache[File.Op.W]
+ self._lastop = None
+
+ def _get_cache(self, op):
+ return (b"" if "b" in self._mode else "").join(self._cache[op])
+
+ def _add_to_cache(self, op, s):
+ self._cache[op].append(s)
+
+ def read(self, *, cache=True):
+ if cache and self._cache[File.Op.R]:
+ return self._get_cache(File.Op.R)
+
+ if not self._file:
+ with self as f:
+ return f.read(cache=cache)
+
+ if self._lastop == File.Op.W:
+ try:
+ self._file.seek(0)
+ except io.UnsupportedOperation:
+ pass
+
+ self._lastop = File.Op.R
+
+ if cache:
+ self._add_to_cache(File.Op.R, self._file.read())
+ return self._get_cache(File.Op.R)
+ else:
+ return self._file.read()
+
+ def write(self, s, *, cache=True):
+
+ if not self._file:
+ with self as f:
+ return f.write(s, cache=cache)
+
+ if self._lastop == File.Op.R:
+ try:
+ self._file.seek(0)
+ except io.UnsupportedOperation:
+ pass
+
+ if cache:
+ self._add_to_cache(File.Op.W, s)
+
+ self._cache[File.Op.R] = self._cache[File.Op.W]
+
+ written = self._file.write(s)
+ self._file.flush()
+ self._lastop = File.Op.W
+ return written
+
+ path = property(lambda s: s._path)
+
+ def __repr__(self):
+ return (
+ f'<File path={self._path or "(buffered)"} open={bool(self._file)} '
+ f"rcache={sum(len(c) for c in self._rcache) if self._rcache is not None else False} "
+ f"wcache={sum(len(c) for c in self._wcache) if self._wcache is not None else False}>"
+ )
+
+
+class FileFactory:
+ def __init__(self):
+ self._files = []
+
+ def __call__(self, path=None, mode="r", content=None, **kwargs):
+ f = File(path, mode, content, **kwargs)
+ self._files.append(f)
+ return f
+
+ def __len__(self):
+ return self._files.__len__()
+
+ def pop(self, idx=-1):
+ return self._files.pop(idx)
+
+ def __getitem__(self, idx):
+ return self._files.__getitem__(idx)
+
+ def __contains__(self, f):
+ return self._files.__contains__(f)
+
+
+class FakeFileFactory(FileFactory):
+ def __init__(self):
+ super().__init__()
+ self._paths2files = OrderedDict()
+
+ def __call__(self, path=None, mode="r", content=None, **kwargs):
+ if path in self._paths2files:
+ return self._paths2files[path]
+
+ f = super().__call__(None, mode, content, **kwargs)
+ self._paths2files[path] = f
+
+ mypath = path
+
+ class FakeFile(File):
+ path = mypath
+
+ # this is quality Python! We do this so that the fake file, which has
+ # no path, fake-pretends to have a path for testing purposes.
+
+ f.__class__ = FakeFile
+ return f
+
+ def __getitem__(self, path):
+ return self._paths2files.__getitem__(path)
+
+ def get(self, path, default):
+ return self._paths2files.get(path, default)
+
+ def pop(self, last=True):
+ return self._paths2files.popitem(last)
+
+ def __repr__(self):
+ return (
+ f"<FakeFileFactory nfiles={len(self._files)} "
+ f"paths={len(self._paths2files)}>"
+ )
+
+