import bs4
import xml.etree.ElementTree as etree
import io
+import enum
from collections import namedtuple, OrderedDict
from markdown.extensions import Extension
from markdown.blockprocessors import BlockProcessor
class File:
+
class Op(enum.Enum):
    """Kind of I/O operation.

    Members are used as keys of the per-operation caches and as the value
    of the "last operation" marker kept by the enclosing class.
    """

    R = enum.auto()  # read
    W = enum.auto()  # write
+
# Initialise the wrapper's bookkeeping: per-operation caches, the
# last-operation marker, the mode string, extra keyword arguments and the
# (initially absent) file object.
# NOTE(review): this view is a patch rendering with indentation stripped and
# some context lines missing, so the nesting of the statements below cannot
# be confirmed from here; every code token is left exactly as found.
def __init__(self, path=None, mode="r", content=None, **kwargs):
# NOTE(review): the bodies of the next two `if` headers are not visible in
# this view.
if path:
if content:
# Supplying `content` is rejected unless the mode string contains "r" or "+".
# NOTE(review): `re` is not among the imports visible at the top of this
# view - confirm it is imported.
if content and not re.search(r"[r+]", mode):
raise RuntimeError("Cannot specify content without read mode")
- self._rcache = [content] if content else []
- self._wcache = []
# One list of chunks per operation kind; the read list starts with `content`
# when it is truthy, the write list starts empty.
+ self._cache = {
+ File.Op.R: [content] if content else [],
+ File.Op.W: []
+ }
# No read or write has been performed yet.
+ self._lastop = None
self._mode = mode
self._kwargs = kwargs
# No file object is held yet; read() and write() check this before doing I/O.
self._file = None
def close(self):
    """Close the held file object and reset per-session state.

    After closing, the read-cache slot is rebound to the write cache's
    list and the last-operation marker is cleared. Calling this while no
    file object is held does nothing.
    """
    if self._file is None:
        # Nothing to close; behave like io objects, whose close() may be
        # called repeatedly without error.
        return

    self._file.close()
    self._file = None
    # Both cache keys refer to the same list object from here on (this is
    # a rebind, not a copy).
    self._cache[File.Op.R] = self._cache[File.Op.W]
    self._lastop = None
def _get_cache(self, op):
    """Return the chunks cached for ``op`` joined into a single value.

    Args:
        op: Key selecting which cache list to join.

    Returns:
        ``bytes`` when the mode string contains ``"b"``, otherwise ``str``;
        the empty value of that type when the list is empty.
    """
    joiner = b"" if "b" in self._mode else ""
    return joiner.join(self._cache[op])
def _add_to_cache(self, op, s):
    """Append chunk ``s`` to the cache list kept for ``op``.

    Args:
        op: Key selecting which cache list receives the chunk.
        s: Chunk to store; it is appended as-is.
    """
    self._cache[op].append(s)
def read(self, *, cache=True):
    """Return the file's content, optionally via the read cache.

    Args:
        cache: When true, a non-empty read cache is returned without
            touching the file, and freshly read data is appended to the
            read cache before the joined cache is returned. When false,
            the cache is neither consulted nor updated.

    Returns:
        The joined read cache when ``cache`` is true, otherwise whatever
        the held file object's ``read()`` returns.
    """
    if cache and self._cache[File.Op.R]:
        return self._get_cache(File.Op.R)

    if not self._file:
        # No file object held: enter this object as a context manager and
        # retry on whatever it yields. The context-manager methods are not
        # visible in this view.
        with self as f:
            return f.read(cache=cache)

    if self._lastop == File.Op.W:
        # Rewind only when switching from writing to reading.
        try:
            self._file.seek(0)
        except io.UnsupportedOperation:
            # Best effort: a stream that cannot seek is read from its
            # current position.
            pass

    self._lastop = File.Op.R

    data = self._file.read()
    if not cache:
        return data
    self._add_to_cache(File.Op.R, data)
    return self._get_cache(File.Op.R)
def write(self, s, *, cache=True):
    """Write ``s`` to the file and flush it.

    Args:
        s: Data handed to the held file object's ``write()``.
        cache: When true, ``s`` is also appended to the write cache.

    Returns:
        The value returned by the held file object's ``write()``.
    """
    if not self._file:
        # No file object held: enter this object as a context manager and
        # retry on whatever it yields. The context-manager methods are not
        # visible in this view.
        with self as f:
            return f.write(s, cache=cache)

    if self._lastop == File.Op.R:
        # Rewind only when switching from reading to writing.
        try:
            self._file.seek(0)
        except io.UnsupportedOperation:
            # Best effort: a stream that cannot seek is written at its
            # current position.
            pass

    if cache:
        self._add_to_cache(File.Op.W, s)

    # Rebind the read-cache slot to the write cache's list, so later cached
    # reads see what has been written.
    # NOTE(review): kept unconditional, like the rebind this line replaces;
    # confirm it is not meant to sit under `if cache:`.
    self._cache[File.Op.R] = self._cache[File.Op.W]

    written = self._file.write(s)
    self._file.flush()
    self._lastop = File.Op.W
    return written
@property
def path(self):
    """Read-only accessor returning the instance's ``_path`` attribute."""
    return self._path