X-Git-Url: https://git.madduck.net/etc/neomutt.git/blobdiff_plain/601d89eda27a3846c20c76805cb22dedce62a77b..34926144a6ff1605e0449d90eaccf80c4dfddfe7:/.config/neomutt/buildmimetree.py diff --git a/.config/neomutt/buildmimetree.py b/.config/neomutt/buildmimetree.py index a85ceb6..a27f64c 100755 --- a/.config/neomutt/buildmimetree.py +++ b/.config/neomutt/buildmimetree.py @@ -6,9 +6,12 @@ # # Configuration: # neomuttrc (needs to be a single line): +# set my_mdwn_extensions="extra,admonition,codehilite,sane_lists,smarty" # macro compose B "\ -# source '$my_confdir/buildmimetree.py setup|'\ -# sourc e \$my_mdwn_postprocess_cmd_file\ +# source '$my_confdir/buildmimetree.py \ +# --tempdir $tempdir --extensions $my_mdwn_extensions \ +# --css-file $my_confdir/htmlmail.css |'\ +# source \$my_mdwn_postprocess_cmd_file\ # " "Convert message into a modern MIME tree with inline images" # # (Yes, we need to call source twice, as mutt only starts to process output @@ -19,28 +22,44 @@ # Requirements: # - python3 # - python3-markdown +# - python3-beautifulsoup4 # Optional: # - pytest -# - Pynliner -# - Pygments, if installed, then syntax highlighting is enabled +# - Pynliner, provides --css-file and thus inline styling of HTML output +# - Pygments, then syntax highlighting for fenced code is enabled +# +# Running tests: +# pytest -x buildmimetree.py # # Latest version: # https://git.madduck.net/etc/neomutt.git/blob_plain/HEAD:/.config/neomutt/buildmimetree.py # -# Copyright © 2023 martin f. krafft -# Released under the GPL-2+ licence, just like Mutt itself. +# Copyright © 2023–24 martin f. krafft +# Released under the GPL-2+ licence, just like NeoMutt itself. # import sys +import os.path import pathlib import markdown import tempfile import argparse import re import mimetypes +import bs4 +import xml.etree.ElementTree as etree +import io +import enum +import warnings +from contextlib import contextmanager from collections import namedtuple, OrderedDict from markdown.extensions import Extension -from markdown.inlinepatterns import ImageInlineProcessor, IMAGE_LINK_RE +from markdown.blockprocessors import BlockProcessor +from markdown.inlinepatterns import ( + SimpleTextInlineProcessor, + ImageInlineProcessor, + IMAGE_LINK_RE, +) from email.utils import make_msgid from urllib import request @@ -53,74 +72,278 @@ def parse_cli_args(*args, **kwargs): ) ) parser.epilog = ( - "Copyright © 2022 martin f. krafft .\n" + "Copyright © 2023-24 martin f. krafft .\n" "Released under the MIT licence" ) - subp = parser.add_subparsers(help="Sub-command parsers", dest="mode") - parser_setup = subp.add_parser("setup", help="Setup phase") - parser_massage = subp.add_parser("massage", help="Massaging phase") + parser.add_argument( + "--extensions", + metavar="EXT[,EXT[,EXT]]", + type=str, + default="", + help="Markdown extension to use (comma-separated list)", + ) - parser_setup.add_argument( - "--debug-commands", + if _PYNLINER: + parser.add_argument( + "--css-file", + metavar="FILE", + type=pathlib.Path, + default=os.devnull, + help="CSS file to merge with the final HTML", + ) + else: + parser.set_defaults(css_file=None) + + parser.add_argument( + "--related-to-html-only", action="store_true", - help="Turn on debug logging of commands generated to stderr", + help="Make related content be sibling to HTML parts only", ) - parser_setup.add_argument( - "--extension", - "-x", - metavar="EXTENSION", - dest="extensions", - nargs="?", - default=[], - action="append", - help="Markdown extension to add to the list of extensions use", + def positive_integer(value): + try: + if int(value) > 0: + return int(value) + + except ValueError: + pass + + raise ValueError("Must be a positive integer") + + parser.add_argument( + "--max-number-other-attachments", + metavar="INTEGER", + type=positive_integer, + default=20, + help="Maximum number of other attachments to expect", ) - parser_setup.add_argument( - "--send-message", + parser.add_argument( + "--only-build", + "--just-build", action="store_true", - help="Generate command(s) to send the message after processing", + help="Only build, don't send the message", + ) + + parser.add_argument( + "--domain", + help="Domain to use in content IDs", + ) + + parser.add_argument( + "--tempdir", + metavar="DIR", + type=pathlib.Path, + help="Specify temporary directory to use for attachments", ) - parser_massage.add_argument( + parser.add_argument( "--debug-commands", action="store_true", help="Turn on debug logging of commands generated to stderr", ) - parser_massage.add_argument( + parser.add_argument( "--debug-walk", action="store_true", help="Turn on debugging to stderr of the MIME tree walk", ) - parser_massage.add_argument( - "--extensions", - metavar="EXTENSIONS", - type=str, - default="", - help="Markdown extension to use (comma-separated list)", + parser.add_argument( + "--dump-html", + metavar="FILE", + type=pathlib.Path, + help="Write the generated HTML to the file", + ) + + subp = parser.add_subparsers(help="Sub-command parsers", dest="mode") + massage_p = subp.add_parser( + "massage", help="Massaging phase (internal use)" ) - parser_massage.add_argument( + massage_p.add_argument( "--write-commands-to", - metavar="PATH", + "-o", + metavar="FILE", dest="cmdpath", + type=pathlib.Path, + required=True, help="Temporary file path to write commands to", ) - parser_massage.add_argument( + massage_p.add_argument( "MAILDRAFT", nargs="?", + type=pathlib.Path, help="If provided, the script is invoked as editor on the mail draft", ) return parser.parse_args(*args, **kwargs) -# [ MARKDOWN WRAPPING ] ####################################################### +# [ FILE I/O HANDLING ] ####################################################### + + +class File: + class Op(enum.Enum): + R = enum.auto() + W = enum.auto() + + def __init__(self, path=None, mode="r", content=None, **kwargs): + if path: + if content: + raise RuntimeError("Cannot specify path and content for File") + + self._path = ( + path if isinstance(path, pathlib.Path) else pathlib.Path(path) + ) + else: + self._path = None + + if content and not re.search(r"[r+]", mode): + raise RuntimeError("Cannot specify content without read mode") + + self._cache = {File.Op.R: [content] if content else [], File.Op.W: []} + self._lastop = None + self._mode = mode + self._kwargs = kwargs + self._file = None + + def open(self): + if self._path: + self._file = open(self._path, self._mode, **self._kwargs) + elif "b" in self._mode: + self._file = io.BytesIO() + else: + self._file = io.StringIO() + + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): + self._file.close() + self._file = None + self._cache[File.Op.R] = self._cache[File.Op.W] + self._lastop = None + + def _get_cache(self, op): + return (b"" if "b" in self._mode else "").join(self._cache[op]) + + def _add_to_cache(self, op, s): + self._cache[op].append(s) + + def read(self, *, cache=True): + if cache and self._cache[File.Op.R]: + return self._get_cache(File.Op.R) + + if self._lastop == File.Op.W: + try: + self._file.seek(0) + except io.UnsupportedOperation: + pass + + self._lastop = File.Op.R + + if cache: + self._add_to_cache(File.Op.R, self._file.read()) + return self._get_cache(File.Op.R) + else: + return self._file.read() + + def write(self, s, *, cache=True): + if self._lastop == File.Op.R: + try: + self._file.seek(0) + except io.UnsupportedOperation: + pass + + if cache: + self._add_to_cache(File.Op.W, s) + + self._cache[File.Op.R] = self._cache[File.Op.W] + + written = self._file.write(s) + self._file.flush() + self._lastop = File.Op.W + return written + + path = property(lambda s: s._path) + + def __repr__(self): + return ( + f'" + ) + + +class FileFactory: + def __init__(self): + self._files = [] + + def __call__(self, path=None, mode="r", content=None, **kwargs): + f = File(path, mode, content, **kwargs) + self._files.append(f) + return f + + def __len__(self): + return self._files.__len__() + + def pop(self, idx=-1): + return self._files.pop(idx) + + def __getitem__(self, idx): + return self._files.__getitem__(idx) + + def __contains__(self, f): + return self._files.__contains__(f) + + +class FakeFileFactory(FileFactory): + def __init__(self): + super().__init__() + self._paths2files = OrderedDict() + + def __call__(self, path=None, mode="r", content=None, **kwargs): + if path in self._paths2files: + return self._paths2files[path] + + f = super().__call__(None, mode, content, **kwargs) + self._paths2files[path] = f + + mypath = path + + class FakeFile(File): + path = mypath + + # this is quality Python! We do this so that the fake file, which has + # no path, fake-pretends to have a path for testing purposes. + + f.__class__ = FakeFile + return f + + def __getitem__(self, path): + return self._paths2files.__getitem__(path) + + def get(self, path, default): + return self._paths2files.get(path, default) + + def pop(self, last=True): + return self._paths2files.popitem(last) + + def __repr__(self): + return ( + f"" + ) + + +# [ IMAGE HANDLING ] ########################################################## InlineImageInfo = namedtuple( @@ -128,11 +351,43 @@ InlineImageInfo = namedtuple( ) +class ImageRegistry: + def __init__(self): + self._images = OrderedDict() + + def register(self, path, description=None, *, domain=None): + # path = str(pathlib.Path(path).expanduser()) + path = os.path.expanduser(path) + if path.startswith("/"): + path = f"file://{path}" + cid = make_msgid(domain=domain)[1:-1] + self._images[path] = InlineImageInfo(cid, description) + return cid + + def __iter__(self): + return self._images.__iter__() + + def __getitem__(self, idx): + return self._images.__getitem__(idx) + + def __len__(self): + return self._images.__len__() + + def items(self): + return self._images.items() + + def __repr__(self): + return f"" + + def __str__(self): + return self._images.__str__() + + class InlineImageExtension(Extension): class RelatedImageInlineProcessor(ImageInlineProcessor): - def __init__(self, re, md, ext): + def __init__(self, re, md, registry): super().__init__(re, md) - self._ext = ext + self._registry = registry def handleMatch(self, m, data): el, start, end = super().handleMatch(m, data) @@ -140,52 +395,175 @@ class InlineImageExtension(Extension): src = el.attrib["src"] if "://" not in src or src.startswith("file://"): # We only inline local content - cid = self._ext.get_cid_for_image(el.attrib) + cid = self._registry.register( + el.attrib["src"], + el.attrib.get("title", el.attrib.get("alt")), + ) el.attrib["src"] = f"cid:{cid}" return el, start, end - def __init__(self): + def __init__(self, registry): super().__init__() - self._images = OrderedDict() + self._image_registry = registry + + INLINE_PATTERN_NAME = "image_link" def extendMarkdown(self, md): md.registerExtension(self) inline_image_proc = self.RelatedImageInlineProcessor( - IMAGE_LINK_RE, md, self + IMAGE_LINK_RE, md, self._image_registry ) - md.inlinePatterns.register(inline_image_proc, "image_link", 150) - - def get_cid_for_image(self, attrib): - msgid = make_msgid()[1:-1] - path = attrib["src"] - if path.startswith("/"): - path = f"file://{path}" - self._images[path] = InlineImageInfo( - msgid, attrib.get("title", attrib.get("alt")) + md.inlinePatterns.register( + inline_image_proc, InlineImageExtension.INLINE_PATTERN_NAME, 150 ) - return msgid - - def get_images(self): - return self._images -def markdown_with_inline_image_support(text, *, extensions=None): - inline_image_handler = InlineImageExtension() +def markdown_with_inline_image_support( + text, + *, + mdwn=None, + image_registry=None, + extensions=None, + extension_configs=None, +): + registry = ( + image_registry if image_registry is not None else ImageRegistry() + ) + inline_image_handler = InlineImageExtension(registry=registry) extensions = extensions or [] extensions.append(inline_image_handler) - mdwn = markdown.Markdown(extensions=extensions) - htmltext = mdwn.convert(text) + mdwn = markdown.Markdown( + extensions=extensions, extension_configs=extension_configs + ) - images = inline_image_handler.get_images() + htmltext = mdwn.convert(text) def replace_image_with_cid(matchobj): for m in (matchobj.group(1), f"file://{matchobj.group(1)}"): - if m in images: - return f"(cid:{images[m].cid}" + if m in registry: + return f"(cid:{registry[m].cid}" return matchobj.group(0) text = re.sub(r"\(([^)\s]+)", replace_image_with_cid, text) - return text, htmltext, images + return text, htmltext, registry, mdwn + + +# [ CSS STYLING ] ############################################################# + + +try: + with warnings.catch_warnings(): + # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=1081037 + warnings.filterwarnings("ignore", category=SyntaxWarning) + import pynliner + + _PYNLINER = True + +except ImportError: + _PYNLINER = False + +try: + from pygments.formatters import get_formatter_by_name + + _CODEHILITE_CLASS = "codehilite" + + _PYGMENTS_CSS = get_formatter_by_name( + "html", style="default" + ).get_style_defs(f".{_CODEHILITE_CLASS}") + +except ImportError: + _PYGMENTS_CSS = None + + +def apply_styling(html, css): + return ( + pynliner.Pynliner() + .from_string(html) + .with_cssString("\n".join(s for s in [_PYGMENTS_CSS, css] if s)) + .run() + ) + + +# [ FORMAT=FLOWED HANDLING ] ################################################## + + +class FormatFlowedNewlineExtension(Extension): + FFNL_RE = r"(?!\S)(\s)\n" + + def extendMarkdown(self, md): + ffnl = SimpleTextInlineProcessor(self.FFNL_RE) + md.inlinePatterns.register(ffnl, "ffnl", 125) + + +# [ QUOTE HANDLING ] ########################################################## + + +class QuoteToAdmonitionExtension(Extension): + class BlockProcessor(BlockProcessor): + RE = re.compile(r"(?:^|\n)>\s*(.*)") + + def __init__(self, parser): + super().__init__(parser) + self._title = None + self._disable = False + + def test(self, parent, blocks): + if self._disable: + return False + + if markdown.util.nearing_recursion_limit(): + return False + + lines = blocks.splitlines() + if len(lines) < 2: + if not self._title: + return False + + elif not self.RE.search(lines[0]): + return False + + return len(lines) > 0 + + elif not self.RE.search(lines[0]) and self.RE.search(lines[1]): + return True + + elif self._title and self.RE.search(lines[1]): + return True + + return False + + def run(self, parent, blocks): + quotelines = blocks.pop(0).splitlines() + + cont = bool(self._title) + if not self.RE.search(quotelines[0]): + self._title = quotelines.pop(0) + + admonition = etree.SubElement(parent, "div") + admonition.set( + "class", f"admonition quote{' continued' if cont else ''}" + ) + self.parser.parseChunk(admonition, self._title) + + admonition[0].set("class", "admonition-title") + with self.disable(): + self.parser.parseChunk(admonition, "\n".join(quotelines)) + + @contextmanager + def disable(self): + self._disable = True + yield True + self._disable = False + + @classmethod + def clean(klass, line): + m = klass.RE.match(line) + return m.group(1) if m else line + + def extendMarkdown(self, md): + md.registerExtension(self) + email_quote_proc = self.BlockProcessor(md.parser) + md.parser.blockprocessors.register(email_quote_proc, "emailquote", 25) # [ PARTS GENERATION ] ######################################################## @@ -213,17 +591,18 @@ class Multipart( def __str__(self): return f" children={len(self.children)}" - -def filewriter_fn(path, content, mode="w", **kwargs): - with open(path, mode, **kwargs) as out_f: - out_f.write(content) + def __hash__(self): + return hash(str(self.subtype) + "".join(str(self.children))) def collect_inline_images( - images, *, tempdir=None, filewriter_fn=filewriter_fn + image_registry, *, tempdir=None, filefactory=FileFactory() ): relparts = [] - for path, info in images.items(): + for path, info in image_registry.items(): + if path.startswith("cid:"): + continue + data = request.urlopen(path) mimetype = data.headers["Content-Type"] @@ -231,98 +610,282 @@ def collect_inline_images( tempfilename = tempfile.mkstemp(prefix="img", suffix=ext, dir=tempdir) path = pathlib.Path(tempfilename[1]) - filewriter_fn(path, data.read(), "w+b") + with filefactory(path, "w+b") as out_f: + out_f.write(data.read()) + + # filewriter_fn(path, data.read(), "w+b") + desc = ( + f'Inline image: "{info.desc}"' + if info.desc + else f"Inline image {str(len(relparts)+1)}" + ) relparts.append( - Part(*mimetype.split("/"), path, cid=info.cid, desc=info.desc) + Part(*mimetype.split("/"), path, cid=info.cid, desc=desc) ) return relparts +EMAIL_SIG_SEP = "\n-- \n" +HTML_SIG_MARKER = "=htmlsig " + + +def make_html_doc(body, sig=None): + ret = ( + "\n" + "\n" + "\n" + '\n' # noqa: E501 + '\n' # noqa: E501 + "\n" + "\n" + f"{body}\n" + ) + + if sig: + nl = "\n" + ret = ( + f'{ret}
{EMAIL_SIG_SEP.strip(nl)}\n' # noqa: E501 + f"{sig}\n" + "
" + ) + + return f"{ret}\n \n" + + +def make_text_mail(text, sig=None): + return EMAIL_SIG_SEP.join((text, sig)) if sig else text + + +def extract_signature(text, *, filefactory=FileFactory()): + parts = text.split(EMAIL_SIG_SEP, 1) + if len(parts) == 1: + return text, None, None + + lines = parts[1].splitlines() + if lines[0].startswith(HTML_SIG_MARKER): + path = pathlib.Path(re.split(r" +", lines.pop(0), maxsplit=1)[1]) + textsig = "\n".join(lines) + + with filefactory(path.expanduser()) as sig_f: + sig_input = sig_f.read() + + soup = bs4.BeautifulSoup(sig_input, "html.parser") + + style = str(soup.style.extract()) if soup.style else "" + for sig_selector in ( + "#signature", + "#signatur", + "#emailsig", + ".signature", + ".signatur", + ".emailsig", + "body", + "div", + ): + sig = soup.select_one(sig_selector) + if sig: + break + + if not sig: + return parts[0], textsig, style + sig_input + + if sig.attrs.get("id") == "signature": + sig = "".join(str(c) for c in sig.children) + + return parts[0], textsig, style + str(sig) + + return parts[0], parts[1], None + + def convert_markdown_to_html( - origtext, draftpath, *, filewriter_fn=filewriter_fn, extensions=None + draft_f, + *, + related_to_html_only=False, + css_f=None, + htmldump_f=None, + filefactory=FileFactory(), + tempdir=None, + extensions=None, + extension_configs=None, + domain=None, ): - origtext, htmltext, images = markdown_with_inline_image_support( - origtext, extensions=extensions + # TODO extension_configs need to be handled differently + extension_configs = extension_configs or {} + extension_configs.setdefault("pymdownx.highlight", {})[ + "css_class" + ] = _CODEHILITE_CLASS + + extensions = extensions or [] + extensions.append(FormatFlowedNewlineExtension()) + extensions.append(QuoteToAdmonitionExtension()) + + draft = draft_f.read() + origtext, textsig, htmlsig = extract_signature( + draft, filefactory=filefactory ) - filewriter_fn(draftpath, origtext, encoding="utf-8") - textpart = Part( - "text", "plain", draftpath, "Plain-text version", orig=True + ( + origtext, + htmltext, + image_registry, + mdwn, + ) = markdown_with_inline_image_support( + origtext, extensions=extensions, extension_configs=extension_configs ) - htmlpath = draftpath.with_suffix(".html") - filewriter_fn( - htmlpath, htmltext, encoding="utf-8", errors="xmlcharrefreplace" + if htmlsig: + if not textsig: + # TODO: decide what to do if there is no plain-text version + raise NotImplementedError("HTML signature but no text alternative") + + soup = bs4.BeautifulSoup(htmlsig, "html.parser") + for img in soup.find_all("img"): + uri = img.attrs["src"] + desc = img.attrs.get("title", img.attrs.get("alt")) + cid = image_registry.register(uri, desc, domain=domain) + img.attrs["src"] = f"cid:{cid}" + + htmlsig = str(soup) + + elif textsig: + ( + textsig, + htmlsig, + image_registry, + mdwn, + ) = markdown_with_inline_image_support( + textsig, + extensions=extensions, + extension_configs=extension_configs, + image_registry=image_registry, + mdwn=mdwn, + ) + + origtext = make_text_mail(origtext, textsig) + draft_f.write(origtext) + textpart = Part( + "text", "plain", draft_f.path, "Plain-text version", orig=True ) + + htmltext = make_html_doc(htmltext, htmlsig) + htmltext = apply_styling(htmltext, css_f.read() if css_f else None) + + if draft_f.path: + htmlpath = draft_f.path.with_suffix(".html") + else: + htmlpath = pathlib.Path( + tempfile.mkstemp(suffix=".html", dir=tempdir)[1] + ) + with filefactory( + htmlpath, "w", encoding="utf-8", errors="xmlcharrefreplace" + ) as out_f: + out_f.write(htmltext) htmlpart = Part("text", "html", htmlpath, "HTML version") - altpart = Multipart( - "alternative", [textpart, htmlpart], "Group of alternative content" + if htmldump_f: + htmldump_f.write(htmltext) + + imgparts = collect_inline_images( + image_registry, tempdir=tempdir, filefactory=filefactory ) - imgparts = collect_inline_images(images, filewriter_fn=filewriter_fn) - if imgparts: + if related_to_html_only: + # If there are inline image part, they will be contained within a + # multipart/related part along with the HTML part only + if imgparts: + # replace htmlpart with a multipart/related container of the HTML + # parts and the images + htmlpart = Multipart( + "relative", [htmlpart] + imgparts, "Group of related content" + ) + return Multipart( - "relative", [altpart] + imgparts, "Group of related content" + "alternative", [textpart, htmlpart], "Group of alternative content" ) + else: - return altpart + # If there are inline image part, they will be siblings to the + # multipart/alternative tree within a multipart/related part + altpart = Multipart( + "alternative", [textpart, htmlpart], "Group of alternative content" + ) + if imgparts: + return Multipart( + "relative", [altpart] + imgparts, "Group of related content" + ) + else: + return altpart class MIMETreeDFWalker: def __init__(self, *, visitor_fn=None, debug=False): - self._visitor_fn = visitor_fn + self._visitor_fn = visitor_fn or self._echovisit self._debug = debug + def _echovisit(self, node, ancestry, debugprint): + debugprint(f"node={node} ancestry={ancestry}") + def walk(self, root, *, visitor_fn=None): """ Recursive function to implement a depth-dirst walk of the MIME-tree rooted at `root`. """ - if isinstance(root, list): - root = Multipart("mixed", children=root) + if len(root) > 1: + root = Multipart("mixed", children=root) + else: + root = root[0] self._walk( root, - stack=[], + ancestry=[], + descendents=[], visitor_fn=visitor_fn or self._visitor_fn, ) - def _walk(self, node, *, stack, visitor_fn): + def _walk(self, node, *, ancestry, descendents, visitor_fn): # Let's start by enumerating the parts at the current level. At the - # root level, stack will be the empty list, and we expect a multipart/* - # container at this level. Later, e.g. within a mutlipart/alternative - # container, the subtree will just be the alternative parts, while the - # top of the stack will be the multipart/alternative container, which - # we will process after the following loop. - - lead = f"{'| '*len(stack)}|-" + # root level, ancestry will be the empty list, and we expect a + # multipart/* container at this level. Later, e.g. within a + # mutlipart/alternative container, the subtree will just be the + # alternative parts, while the top of the ancestry will be the + # multipart/alternative container, which we will process after the + # following loop. + + lead = f"{'│ '*len(ancestry)}" if isinstance(node, Multipart): self.debugprint( - f"{lead}{node} parents={[s.subtype for s in stack]}" + f"{lead}├{node} ancestry={[s.subtype for s in ancestry]}" ) - # Depth-first, so push the current container onto the stack, - # then descend … - stack.append(node) - self.debugprint("| " * (len(stack) + 1)) + # Depth-first, so push the current container onto the ancestry + # stack, then descend … + ancestry.append(node) + self.debugprint(lead + "│ " * 2) for child in node.children: self._walk( child, - stack=stack, + ancestry=ancestry, + descendents=descendents, visitor_fn=visitor_fn, ) - self.debugprint("| " * len(stack)) - assert stack.pop() == node + assert ancestry.pop() == node + sibling_descendents = descendents + descendents.extend(node.children) else: - self.debugprint(f"{lead}{node}") + self.debugprint(f"{lead}├{node}") + sibling_descendents = descendents + + if False and ancestry: + self.debugprint(lead[:-1] + " │") if visitor_fn: - visitor_fn(node, stack, debugprint=self.debugprint) + visitor_fn( + node, ancestry, sibling_descendents, debugprint=self.debugprint + ) def debugprint(self, s, **kwargs): if self._debug: @@ -360,7 +923,7 @@ class MuttCommands: self._cmd1.append(s) def push(self, s): - s = s.replace('"', '"') + s = s.replace('"', r"\"") s = f'push "{s}"' self.debugprint(s) self._push.insert(0, s) @@ -377,19 +940,18 @@ class MuttCommands: def do_setup( - extensions=None, *, out_f=sys.stdout, temppath=None, debug_commands=False + *, + out_f=sys.stdout, + temppath=None, + tempdir=None, + debug_commands=False, ): - extensions = extensions or [] temppath = temppath or pathlib.Path( - tempfile.mkstemp(prefix="muttmdwn-")[1] + tempfile.mkstemp(prefix="muttmdwn-", dir=tempdir)[1] ) cmds = MuttCommands(out_f, debug=debug_commands) - editor = f"{sys.argv[0]} massage --write-commands-to {temppath}" - if extensions: - editor = f'{editor} --extensions {",".join(extensions)}' - if debug_commands: - editor = f"{editor} --debug-commands" + editor = f"{' '.join(sys.argv)} massage --write-commands-to {temppath}" cmds.cmd('set my_editor="$editor"') cmds.cmd('set my_edit_headers="$edit_headers"') @@ -402,11 +964,17 @@ def do_setup( def do_massage( draft_f, - draftpath, cmd_f, *, extensions=None, + css_f=None, + htmldump_f=None, converter=convert_markdown_to_html, + related_to_html_only=True, + only_build=False, + max_other_attachments=20, + tempdir=None, + domain=None, debug_commands=False, debug_walk=False, ): @@ -419,26 +987,30 @@ def do_massage( # variable used to identify the command file we're currently writing # to. cmds = MuttCommands(cmd_f, debug=debug_commands) - cmds.cmd('set editor="$my_editor"') - cmds.cmd('set edit_headers="$my_edit_headers"') - cmds.cmd("unset my_editor") - cmds.cmd("unset my_edit_headers") - - # let's flush those commands, as there'll be a lot of pushes from now - # on, which need to be run in reverse order - cmds.flush() extensions = extensions.split(",") if extensions else [] - tree = converter(draft_f.read(), draftpath, extensions=extensions) + tree = converter( + draft_f, + css_f=css_f, + htmldump_f=htmldump_f, + related_to_html_only=related_to_html_only, + tempdir=tempdir, + extensions=extensions, + domain=domain, + ) mimetree = MIMETreeDFWalker(debug=debug_walk) - def visitor_fn(item, stack, *, debugprint=None): + state = dict(pos=1, tags={}, parts=1) + + def visitor_fn(item, ancestry, descendents, *, debugprint=None): """ Visitor function called for every node (part) of the MIME tree, depth-first, and responsible for telling NeoMutt how to assemble the tree. """ + KILL_LINE = r"\Ca\Ck" + if isinstance(item, Part): # We've hit a leaf-node, i.e. an alternative or a related part # with actual content. @@ -448,44 +1020,101 @@ def do_massage( # The original source already exists in the NeoMutt tree, but # the underlying file may have been modified, so we need to # update the encoding, but that's it: + cmds.push("") cmds.push("") + + # We really just need to be able to assume that at this point, + # NeoMutt is at position 1, and that we've processed only this + # part so far. Nevermind about actual attachments, we can + # safely ignore those as they stay at the end. + assert state["pos"] == 1 + assert state["parts"] == 1 else: # … whereas all other parts need to be added, and they're all # considered to be temporary and inline: cmds.push(f"{item.path}") cmds.push("") + # This added a part at the end of the list of parts, and that's + # just how many parts we've seen so far, so it's position in + # the NeoMutt compose list is the count of parts + state["parts"] += 1 + state["pos"] = state["parts"] + # If the item (including the original) comes with additional # information, then we might just as well update the NeoMutt # tree now: if item.cid: - cmds.push(f"\\Ca\\Ck{item.cid}") + cmds.push(f"{KILL_LINE}{item.cid}") + + # Now for the biggest hack in this script, which is to handle + # attachments, such as PDFs, that aren't related or alternatives. + # The problem is that when we add an inline image, it always gets + # appended to the list, i.e. inserted *after* other attachments. + # Since we don't know the number of attachments, we also cannot + # infer the postition of the new attachment. Therefore, we bubble + # it all the way to the top, only to then move it down again: + if state["pos"] > 1: # skip for the first part + for i in range(max_other_attachments): + # could use any number here, but has to be larger than the + # number of possible attachments. The performance + # difference of using a high number is negligible. + # Bubble up the new part + cmds.push("") + + # As we push the part to the right position in the list (i.e. + # the last of the subset of attachments this script added), we + # must handle the situation that subtrees are skipped by + # NeoMutt. Hence, the actual number of positions to move down + # is decremented by the number of descendents so far + # encountered. + for i in range(1, state["pos"] - len(descendents)): + cmds.push("") elif isinstance(item, Multipart): # This node has children, but we already visited them (see - # above), and so they have been tagged in NeoMutt's compose - # window. Now it's just a matter of telling NeoMutt to do the - # appropriate grouping: + # above). The tags dictionary of State should contain a list of + # their positions in the NeoMutt compose window, so iterate those + # and tag the parts there: + n_tags = len(state["tags"][item]) + for tag in state["tags"][item]: + cmds.push(f"{tag}") + if item.subtype == "alternative": cmds.push("") - elif item.subtype == "relative": + elif item.subtype in ("relative", "related"): cmds.push("") elif item.subtype == "multilingual": cmds.push("") + else: + raise NotImplementedError( + f"Handling of multipart/{item.subtype} is not implemented" + ) + + state["pos"] -= n_tags - 1 + state["parts"] += 1 else: # We should never get here - assert not "is valid part" + raise RuntimeError(f"Type {type(item)} is unexpected: {item}") # If the item has a description, we might just as well add it if item.desc: - cmds.push(f"\\Ca\\Ck{item.desc}") - - # Finally, if we're at non-root level, tag the new container, - # as it might itself be part of a container, to be processed - # one level up: - if stack: - cmds.push("") + cmds.push(f"{KILL_LINE}{item.desc}") + + if ancestry: + # If there's an ancestry, record the current (assumed) position in + # the NeoMutt compose window as needed-to-tag by our direct parent + # (i.e. the last item of the ancestry) + state["tags"].setdefault(ancestry[-1], []).append(state["pos"]) + + lead = "│ " * (len(ancestry) + 1) + "* " + debugprint( + f"{lead}ancestry={[a.subtype for a in ancestry]}\n" + f"{lead}descendents={[d.subtype for d in descendents]}\n" + f"{lead}children_positions={state['tags'][ancestry[-1]]}\n" + f"{lead}pos={state['pos']}, parts={state['parts']}" + ) # ----------------- # End of visitor_fn @@ -494,6 +1123,9 @@ def do_massage( # function mimetree.walk(tree, visitor_fn=visitor_fn) + if not only_build: + cmds.push("") + # Finally, cleanup. Since we're responsible for removing the temporary # file, how's this for a little hack? try: @@ -501,6 +1133,10 @@ def do_massage( except AttributeError: filename = "pytest_internal_file" cmds.cmd(f"source 'rm -f {filename}|'") + cmds.cmd('set editor="$my_editor"') + cmds.cmd('set edit_headers="$my_edit_headers"') + cmds.cmd("unset my_editor") + cmds.cmd("unset my_edit_headers") cmds.cmd("unset my_mdwn_postprocess_cmd_file") cmds.flush() @@ -510,21 +1146,30 @@ def do_massage( if __name__ == "__main__": args = parse_cli_args() - if args.mode == "setup": - if args.send_message: - raise NotImplementedError() - - do_setup(args.extensions, debug_commands=args.debug_commands) + if args.mode is None: + do_setup( + tempdir=args.tempdir, + debug_commands=args.debug_commands, + ) elif args.mode == "massage": - with open(args.MAILDRAFT, "r") as draft_f, open( - args.cmdpath, "w" - ) as cmd_f: + with ( + File(args.MAILDRAFT, "r+") as draft_f, + File(args.cmdpath, "w") as cmd_f, + File(args.css_file, "r") as css_f, + File(args.dump_html, "w") as htmldump_f, + ): do_massage( draft_f, - pathlib.Path(args.MAILDRAFT), cmd_f, extensions=args.extensions, + css_f=css_f, + htmldump_f=htmldump_f, + related_to_html_only=args.related_to_html_only, + max_other_attachments=args.max_number_other_attachments, + only_build=args.only_build, + tempdir=args.tempdir, + domain=args.domain, debug_commands=args.debug_commands, debug_walk=args.debug_walk, ) @@ -534,20 +1179,28 @@ if __name__ == "__main__": try: import pytest - from io import StringIO class Tests: @pytest.fixture def const1(self): - return "CONSTANT STRING 1" + return "Curvature Vest Usher Dividing+T#iceps Senior" @pytest.fixture def const2(self): - return "CONSTANT STRING 2" + return "Habitant Celestial 2litzy Resurf/ce Headpiece Harmonics" + + @pytest.fixture + def fakepath(self): + return pathlib.Path("/does/not/exist") + + @pytest.fixture + def fakepath2(self): + return pathlib.Path("/does/not/exist/either") # NOTE: tests using the capsys fixture must specify sys.stdout to the # functions they call, else old stdout is used and not captured + @pytest.mark.muttctrl def test_MuttCommands_cmd(self, const1, const2, capsys): "Assert order of commands" cmds = MuttCommands(out_f=sys.stdout) @@ -557,6 +1210,7 @@ try: captured = capsys.readouterr() assert captured.out == "\n".join((const1, const2, "")) + @pytest.mark.muttctrl def test_MuttCommands_push(self, const1, const2, capsys): "Assert reverse order of pushes" cmds = MuttCommands(out_f=sys.stdout) @@ -569,6 +1223,15 @@ try: == ('"\npush "'.join(("", const2, const1, "")))[2:-6] ) + @pytest.mark.muttctrl + def test_MuttCommands_push_escape(self, const1, const2, capsys): + cmds = MuttCommands(out_f=sys.stdout) + cmds.push(f'"{const1}"') + cmds.flush() + captured = capsys.readouterr() + assert f'"\\"{const1}\\""' in captured.out + + @pytest.mark.muttctrl def test_MuttCommands_cmd_push_mixed(self, const1, const2, capsys): "Assert reverse order of pushes" cmds = MuttCommands(out_f=sys.stdout) @@ -592,7 +1255,7 @@ try: assert lines[5] in lines_out[7] @pytest.fixture - def basic_mime_tree(self): + def mime_tree_related_to_alternative(self): return Multipart( "relative", children=[ @@ -617,185 +1280,322 @@ try: desc="Related", ) - def test_MIMETreeDFWalker_depth_first_walk(self, basic_mime_tree): + @pytest.fixture + def mime_tree_related_to_html(self): + return Multipart( + "alternative", + children=[ + Part( + "text", + "plain", + "part.txt", + desc="Plain", + orig=True, + ), + Multipart( + "relative", + children=[ + Part("text", "html", "part.html", desc="HTML"), + Part( + "text", + "png", + "logo.png", + cid="logo.png", + desc="Logo", + ), + ], + desc="Related", + ), + ], + desc="Alternative", + ) + + @pytest.fixture + def mime_tree_nested(self): + return Multipart( + "relative", + children=[ + Multipart( + "alternative", + children=[ + Part( + "text", + "plain", + "part.txt", + desc="Plain", + orig=True, + ), + Multipart( + "alternative", + children=[ + Part( + "text", + "plain", + "part.txt", + desc="Nested plain", + ), + Part( + "text", + "html", + "part.html", + desc="Nested HTML", + ), + ], + desc="Nested alternative", + ), + ], + desc="Alternative", + ), + Part( + "text", + "png", + "logo.png", + cid="logo.png", + desc="Logo", + ), + ], + desc="Related", + ) + + @pytest.mark.treewalk + def test_MIMETreeDFWalker_depth_first_walk( + self, mime_tree_related_to_alternative + ): mimetree = MIMETreeDFWalker() items = [] - def visitor_fn(item, stack, debugprint): - items.append((item, len(stack))) + def visitor_fn(item, ancestry, descendents, debugprint): + items.append((item, len(ancestry), len(descendents))) - mimetree.walk(basic_mime_tree, visitor_fn=visitor_fn) + mimetree.walk( + mime_tree_related_to_alternative, visitor_fn=visitor_fn + ) assert len(items) == 5 assert items[0][0].subtype == "plain" assert items[0][1] == 2 + assert items[0][2] == 0 assert items[1][0].subtype == "html" assert items[1][1] == 2 + assert items[1][2] == 0 assert items[2][0].subtype == "alternative" assert items[2][1] == 1 + assert items[2][2] == 2 assert items[3][0].subtype == "png" assert items[3][1] == 1 + assert items[3][2] == 2 assert items[4][0].subtype == "relative" assert items[4][1] == 0 + assert items[4][2] == 4 - def test_MIMETreeDFWalker_list_to_mixed(self, basic_mime_tree): + @pytest.mark.treewalk + def test_MIMETreeDFWalker_list_to_mixed(self, const1): mimetree = MIMETreeDFWalker() items = [] - def visitor_fn(item, stack, debugprint): + def visitor_fn(item, ancestry, descendents, debugprint): items.append(item) - mimetree.walk([basic_mime_tree], visitor_fn=visitor_fn) + p = Part("text", "plain", const1) + mimetree.walk([p], visitor_fn=visitor_fn) + assert items[-1].subtype == "plain" + mimetree.walk([p, p], visitor_fn=visitor_fn) assert items[-1].subtype == "mixed" + @pytest.mark.treewalk def test_MIMETreeDFWalker_visitor_in_constructor( - self, basic_mime_tree + self, mime_tree_related_to_alternative ): items = [] - def visitor_fn(item, stack, debugprint): + def visitor_fn(item, ancestry, descendents, debugprint): items.append(item) mimetree = MIMETreeDFWalker(visitor_fn=visitor_fn) - mimetree.walk(basic_mime_tree) + mimetree.walk(mime_tree_related_to_alternative) assert len(items) == 5 - def test_do_setup_no_extensions(self, const1, capsys): - "Assert basics about the setup command output" - do_setup(temppath=const1, out_f=sys.stdout) - captout = capsys.readouterr() - lines = captout.out.splitlines() - assert lines[2].endswith(f'{const1}"') - assert lines[4].endswith(const1) - assert "first-entry" in lines[-1] - assert "edit-file" in lines[-1] - - def test_do_setup_extensions(self, const1, const2, capsys): - "Assert that extensions are passed to editor" - do_setup( - temppath=const1, extensions=[const2, const1], out_f=sys.stdout - ) - captout = capsys.readouterr() - lines = captout.out.splitlines() - # assert comma-separated list of extensions passed - assert lines[2].endswith(f'{const2},{const1}"') - assert lines[4].endswith(const1) - @pytest.fixture def string_io(self, const1, text=None): return StringIO(text or const1) - def test_do_massage_basic(self, const1, string_io, capsys): - def converter(drafttext, draftpath, extensions): - return Part("text", "plain", draftpath, orig=True) + @pytest.mark.massage + def test_do_massage_basic(self): + def converter(draft_f, **kwargs): + return Part("text", "plain", draft_f.path, orig=True) - do_massage( - draft_f=string_io, - draftpath=const1, - cmd_f=sys.stdout, - converter=converter, - ) + with File() as draft_f, File() as cmd_f: + do_massage( + draft_f=draft_f, + cmd_f=cmd_f, + converter=converter, + ) + lines = cmd_f.read().splitlines() - captured = capsys.readouterr() - lines = captured.out.splitlines() + assert "send-message" in lines.pop(0) + assert "update-encoding" in lines.pop(0) + assert "first-entry" in lines.pop(0) + assert "source 'rm -f " in lines.pop(0) assert '="$my_editor"' in lines.pop(0) assert '="$my_edit_headers"' in lines.pop(0) assert "unset my_editor" == lines.pop(0) assert "unset my_edit_headers" == lines.pop(0) - assert "update-encoding" in lines.pop(0) - assert "source 'rm -f " in lines.pop(0) assert "unset my_mdwn_postprocess_cmd_file" == lines.pop(0) - def test_do_massage_fulltree( - self, string_io, const1, basic_mime_tree, capsys - ): - def converter(drafttext, draftpath, extensions): - return basic_mime_tree + @pytest.mark.massage + def test_do_massage_fulltree(self, mime_tree_related_to_alternative): + def converter(draft_f, **kwargs): + return mime_tree_related_to_alternative - do_massage( - draft_f=string_io, - draftpath=const1, - cmd_f=sys.stdout, - converter=converter, - ) - - captured = capsys.readouterr() - lines = captured.out.splitlines()[4:] - assert "Related" in lines.pop(0) - assert "group-related" in lines.pop(0) - assert "tag-entry" in lines.pop(0) - assert "Logo" in lines.pop(0) - assert "content-id" in lines.pop(0) - assert "toggle-unlink" in lines.pop(0) - assert "logo.png" in lines.pop(0) - assert "tag-entry" in lines.pop(0) - assert "Alternative" in lines.pop(0) - assert "group-alternatives" in lines.pop(0) - assert "tag-entry" in lines.pop(0) - assert "HTML" in lines.pop(0) - assert "toggle-unlink" in lines.pop(0) - assert "part.html" in lines.pop(0) - assert "tag-entry" in lines.pop(0) - assert "Plain" in lines.pop(0) - assert "update-encoding" in lines.pop(0) - assert len(lines) == 2 - - @pytest.fixture - def fake_filewriter(self): - class FileWriter: - def __init__(self): - self._writes = [] + max_attachments = 5 - def __call__(self, path, content, mode="w", **kwargs): - self._writes.append((path, content)) - - def pop(self, index=-1): - return self._writes.pop(index) - - return FileWriter() - - @pytest.fixture - def markdown_non_converter(self, const1, const2): - return lambda s, text: f"{const1}{text}{const2}" - - def test_converter_tree_basic( - self, const1, const2, fake_filewriter, markdown_non_converter + with File() as draft_f, File() as cmd_f: + do_massage( + draft_f=draft_f, + cmd_f=cmd_f, + max_other_attachments=max_attachments, + converter=converter, + ) + lines = cmd_f.read().splitlines()[:-6] + + assert "first-entry" in lines.pop() + assert "update-encoding" in lines.pop() + assert "Plain" in lines.pop() + assert "part.html" in lines.pop() + assert "toggle-unlink" in lines.pop() + for i in range(max_attachments): + assert "move-up" in lines.pop() + assert "move-down" in lines.pop() + assert "HTML" in lines.pop() + assert "jump>1" in lines.pop() + assert "jump>2" in lines.pop() + assert "group-alternatives" in lines.pop() + assert "Alternative" in lines.pop() + assert "logo.png" in lines.pop() + assert "toggle-unlink" in lines.pop() + assert "content-id" in lines.pop() + for i in range(max_attachments): + assert "move-up" in lines.pop() + assert "move-down" in lines.pop() + assert "Logo" in lines.pop() + assert "jump>1" in lines.pop() + assert "jump>4" in lines.pop() + assert "group-related" in lines.pop() + assert "Related" in lines.pop() + assert "send-message" in lines.pop() + assert len(lines) == 0 + + @pytest.mark.massage + def test_mime_tree_relative_within_alternative( + self, mime_tree_related_to_html ): - path = pathlib.Path(const2) - tree = convert_markdown_to_html( - const1, path, filewriter_fn=fake_filewriter - ) + def converter(draft_f, **kwargs): + return mime_tree_related_to_html + + with File() as draft_f, File() as cmd_f: + do_massage( + draft_f=draft_f, + cmd_f=cmd_f, + converter=converter, + ) + lines = cmd_f.read().splitlines()[:-6] + + assert "first-entry" in lines.pop() + assert "update-encoding" in lines.pop() + assert "Plain" in lines.pop() + assert "part.html" in lines.pop() + assert "toggle-unlink" in lines.pop() + assert "move-up" in lines.pop() + while True: + top = lines.pop() + if "move-up" not in top: + break + assert "move-down" in top + assert "HTML" in lines.pop() + assert "logo.png" in lines.pop() + assert "toggle-unlink" in lines.pop() + assert "content-id" in lines.pop() + assert "move-up" in lines.pop() + while True: + top = lines.pop() + if "move-up" not in top: + break + assert "move-down" in top + assert "move-down" in lines.pop() + assert "Logo" in lines.pop() + assert "jump>2" in lines.pop() + assert "jump>3" in lines.pop() + assert "group-related" in lines.pop() + assert "Related" in lines.pop() + assert "jump>1" in lines.pop() + assert "jump>2" in lines.pop() + assert "group-alternative" in lines.pop() + assert "Alternative" in lines.pop() + assert "send-message" in lines.pop() + assert len(lines) == 0 + + @pytest.mark.massage + def test_mime_tree_nested_trees_does_not_break_positioning( + self, mime_tree_nested + ): + def converter(draft_f, **kwargs): + return mime_tree_nested + + with File() as draft_f, File() as cmd_f: + do_massage( + draft_f=draft_f, + cmd_f=cmd_f, + converter=converter, + ) + lines = cmd_f.read().splitlines() + + while "logo.png" not in lines.pop(): + pass + lines.pop() + assert "content-id" in lines.pop() + assert "move-up" in lines.pop() + while True: + top = lines.pop() + if "move-up" not in top: + break + assert "move-down" in top + # Due to the nested trees, the number of descendents of the sibling + # actually needs to be considered, not just the nieces. So to move + # from position 1 to position 6, it only needs one + # because that jumps over the entire sibling tree. Thus what + # follows next must not be another + assert "Logo" in lines.pop() + + @pytest.mark.converter + def test_converter_tree_basic(self, fakepath, const1, fakefilefactory): + with fakefilefactory(fakepath, content=const1) as draft_f: + tree = convert_markdown_to_html( + draft_f, filefactory=fakefilefactory + ) assert tree.subtype == "alternative" assert len(tree.children) == 2 assert tree.children[0].subtype == "plain" - assert tree.children[0].path == path + assert tree.children[0].path == draft_f.path assert tree.children[0].orig assert tree.children[1].subtype == "html" - assert tree.children[1].path == path.with_suffix(".html") + assert tree.children[1].path == fakepath.with_suffix(".html") + @pytest.mark.converter def test_converter_writes( - self, - const1, - const2, - fake_filewriter, - monkeypatch, - markdown_non_converter, + self, fakepath, fakefilefactory, const1, monkeypatch ): - path = pathlib.Path(const2) + with fakefilefactory(fakepath, content=const1) as draft_f: + convert_markdown_to_html(draft_f, filefactory=fakefilefactory) - with monkeypatch.context() as m: - m.setattr(markdown.Markdown, "convert", markdown_non_converter) - convert_markdown_to_html( - const1, path, filewriter_fn=fake_filewriter - ) - - assert (path, const1) == fake_filewriter.pop(0) - assert ( - path.with_suffix(".html"), - markdown_non_converter(None, const1), - ) == fake_filewriter.pop(0) + html = fakefilefactory.pop() + assert fakepath.with_suffix(".html") == html[0] + assert const1 in html[1].read() + text = fakefilefactory.pop() + assert fakepath == text[0] + assert const1 == text[1].read() + @pytest.mark.imgproc def test_markdown_inline_image_processor(self): imgpath1 = "file:/path/to/image.png" imgpath2 = "file:///path/to/image.png?url=params" @@ -804,7 +1604,7 @@ try: ![image inlined with newline]({imgpath2}) ![image local path]({imgpath3})""" - text, html, images = markdown_with_inline_image_support(text) + text, html, images, mdwn = markdown_with_inline_image_support(text) # local paths have been normalised to URLs: imgpath3 = f"file://{imgpath3}" @@ -819,75 +1619,669 @@ try: assert images[imgpath1].cid != images[imgpath3].cid assert images[imgpath2].cid != images[imgpath3].cid + @pytest.mark.imgproc def test_markdown_inline_image_processor_title_to_desc(self, const1): imgpath = "file:///path/to/image.png" text = f'![inline local image]({imgpath} "{const1}")' - text, html, images = markdown_with_inline_image_support(text) + text, html, images, mdwn = markdown_with_inline_image_support(text) assert images[imgpath].desc == const1 + @pytest.mark.imgproc def test_markdown_inline_image_processor_alt_to_desc(self, const1): imgpath = "file:///path/to/image.png" text = f"![{const1}]({imgpath})" - text, html, images = markdown_with_inline_image_support(text) + text, html, images, mdwn = markdown_with_inline_image_support(text) assert images[imgpath].desc == const1 + @pytest.mark.imgproc def test_markdown_inline_image_processor_title_over_alt_desc( self, const1, const2 ): imgpath = "file:///path/to/image.png" text = f'![{const1}]({imgpath} "{const2}")' - text, html, images = markdown_with_inline_image_support(text) + text, html, images, mdwn = markdown_with_inline_image_support(text) assert images[imgpath].desc == const2 + @pytest.mark.imgproc def test_markdown_inline_image_not_external(self): imgpath = "https://path/to/image.png" text = f"![inline image]({imgpath})" - text, html, images = markdown_with_inline_image_support(text) + text, html, images, mdwn = markdown_with_inline_image_support(text) assert 'src="cid:' not in html assert "](cid:" not in text assert len(images) == 0 + @pytest.mark.imgproc def test_markdown_inline_image_local_file(self): imgpath = "/path/to/image.png" text = f"![inline image]({imgpath})" - text, html, images = markdown_with_inline_image_support(text) + text, html, images, mdwn = markdown_with_inline_image_support(text) for k, v in images.items(): assert k == f"file://{imgpath}" break - def test_markdown_inline_image_processor_base64(self): - img = ( + @pytest.mark.imgproc + def test_markdown_inline_image_expanduser(self): + imgpath = pathlib.Path("~/image.png") + text = f"![inline image]({imgpath})" + text, html, images, mdwn = markdown_with_inline_image_support(text) + + for k, v in images.items(): + assert k == f"file://{imgpath.expanduser()}" + break + + @pytest.fixture + def test_png(self): + return ( "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAE" "AAAABCAAAAAA6fptVAAAACklEQVQI12P4DwABAQEAG7buVgAA" ) - text = f"![1px white inlined]({img})" - text, html, images = markdown_with_inline_image_support(text) + + @pytest.mark.imgproc + def test_markdown_inline_image_processor_base64(self, test_png): + text = f"![1px white inlined]({test_png})" + text, html, images, mdwn = markdown_with_inline_image_support(text) assert 'src="cid:' in html assert "](cid:" in text assert len(images) == 1 - assert img in images + assert test_png in images + @pytest.mark.converter def test_converter_tree_inline_image_base64( - self, const1, fake_filewriter + self, test_png, fakefilefactory ): - img = ( - "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAE" - "AAAABCAAAAAA6fptVAAAACklEQVQI12P4DwABAQEAG7buVgAA" + text = f"![inline base64 image]({test_png})" + with fakefilefactory(content=text) as draft_f: + tree = convert_markdown_to_html( + draft_f, + filefactory=fakefilefactory, + related_to_html_only=False, + ) + assert tree.subtype == "relative" + assert tree.children[0].subtype == "alternative" + assert tree.children[1].subtype == "png" + written = fakefilefactory.pop() + assert tree.children[1].path == written[0] + assert b"PNG" in written[1].read() + + @pytest.mark.converter + def test_converter_tree_inline_image_base64_related_to_html( + self, test_png, fakefilefactory + ): + text = f"![inline base64 image]({test_png})" + with fakefilefactory(content=text) as draft_f: + tree = convert_markdown_to_html( + draft_f, + filefactory=fakefilefactory, + related_to_html_only=True, + ) + assert tree.subtype == "alternative" + assert tree.children[1].subtype == "relative" + assert tree.children[1].children[1].subtype == "png" + written = fakefilefactory.pop() + assert tree.children[1].children[1].path == written[0] + assert b"PNG" in written[1].read() + + @pytest.mark.converter + def test_converter_tree_inline_image_cid( + self, const1, fakefilefactory + ): + text = f"![inline base64 image](cid:{const1})" + with fakefilefactory(content=text) as draft_f: + tree = convert_markdown_to_html( + draft_f, + filefactory=fakefilefactory, + related_to_html_only=False, + ) + assert len(tree.children) == 2 + assert tree.children[0].cid != const1 + assert tree.children[0].type != "image" + assert tree.children[1].cid != const1 + assert tree.children[1].type != "image" + + @pytest.fixture + def fakefilefactory(self): + return FakeFileFactory() + + @pytest.mark.imgcoll + def test_inline_image_collection( + self, test_png, const1, const2, fakefilefactory + ): + test_images = {test_png: InlineImageInfo(cid=const1, desc=const2)} + relparts = collect_inline_images( + test_images, filefactory=fakefilefactory + ) + + written = fakefilefactory.pop() + assert b"PNG" in written[1].read() + + assert relparts[0].subtype == "png" + assert relparts[0].path == written[0] + assert relparts[0].cid == const1 + assert const2 in relparts[0].desc + + if _PYNLINER: + + @pytest.mark.styling + def test_apply_stylesheet(self): + html = "

Hello, world!

" + css = "p { color:red }" + out = apply_styling(html, css) + assert 'p style="color' in out + + @pytest.mark.styling + def test_apply_no_stylesheet(self, const1): + out = apply_styling(const1, None) + + @pytest.mark.massage + @pytest.mark.styling + def test_massage_styling_to_converter(self): + css = "p { color:red }" + css_applied = [] + + def converter(draft_f, css_f, **kwargs): + css = css_f.read() + css_applied.append(css) + return Part("text", "plain", draft_f.path, orig=True) + + with ( + File() as draft_f, + File(mode="w") as cmd_f, + File(content=css) as css_f, + ): + do_massage( + draft_f=draft_f, + cmd_f=cmd_f, + css_f=css_f, + converter=converter, + ) + assert css_applied[0] == css + + @pytest.mark.converter + @pytest.mark.styling + def test_converter_apply_styles( + self, const1, monkeypatch, fakepath, fakefilefactory + ): + css = "p { color:red }" + with ( + monkeypatch.context() as m, + fakefilefactory(fakepath, content=const1) as draft_f, + fakefilefactory(content=css) as css_f, + ): + m.setattr( + markdown.Markdown, + "convert", + lambda s, t: f"

{t}

", + ) + convert_markdown_to_html( + draft_f, css_f=css_f, filefactory=fakefilefactory + ) + assert re.search( + r"color:.*red", + fakefilefactory[fakepath.with_suffix(".html")].read(), + ) + + if _PYGMENTS_CSS: + + @pytest.mark.styling + def test_apply_stylesheet_pygments(self): + html = ( + f'
' + "
def foo():\n    return
" + ) + out = apply_styling(html, _PYGMENTS_CSS) + assert f'{_CODEHILITE_CLASS}" style="' in out + + @pytest.mark.sig + def test_signature_extraction_no_signature(self, const1): + assert (const1, None, None) == extract_signature(const1) + + @pytest.mark.sig + def test_signature_extraction_just_text(self, const1, const2): + origtext, textsig, htmlsig = extract_signature( + f"{const1}{EMAIL_SIG_SEP}{const2}" + ) + assert origtext == const1 + assert textsig == const2 + assert htmlsig is None + + @pytest.mark.sig + def test_signature_extraction_html( + self, fakepath, fakefilefactory, const1, const2 + ): + sigconst = "HTML signature from {path} but as a string" + sig = f'
{sigconst.format(path=fakepath)}
' + + sig_f = fakefilefactory(fakepath, content=sig) + + origtext, textsig, htmlsig = extract_signature( + f"{const1}{EMAIL_SIG_SEP}{HTML_SIG_MARKER} {fakepath}\n{const2}", + filefactory=fakefilefactory, + ) + assert origtext == const1 + assert textsig == const2 + assert htmlsig == sigconst.format(path=fakepath) + + @pytest.mark.sig + def test_signature_extraction_file_not_found(self, fakepath, const1): + with pytest.raises(FileNotFoundError): + origtext, textsig, htmlsig = extract_signature( + f"{const1}{EMAIL_SIG_SEP}{HTML_SIG_MARKER}{fakepath}\n{const1}" + ) + + @pytest.mark.imgproc + def test_image_registry(self, const1): + reg = ImageRegistry() + cid = reg.register(const1) + assert "@" in cid + assert not cid.startswith("<") + assert not cid.endswith(">") + assert const1 in reg + + @pytest.mark.imgproc + def test_image_registry_domain(self, const1, const2): + reg = ImageRegistry() + cid = reg.register(const1, domain=const2) + assert f"@{const2}" in cid + assert not cid.startswith("<") + assert not cid.endswith(">") + assert const1 in reg + + @pytest.mark.imgproc + def test_image_registry_file_uri(self, const1): + reg = ImageRegistry() + reg.register("/some/path") + for path in reg: + assert path.startswith("file://") + break + + @pytest.mark.converter + @pytest.mark.sig + def test_converter_signature_handling( + self, fakepath, fakefilefactory, monkeypatch + ): + mailparts = ( + "This is the mail body\n", + f"{EMAIL_SIG_SEP}", + "This is a plain-text signature only", + ) + + with ( + fakefilefactory( + fakepath, content="".join(mailparts) + ) as draft_f, + monkeypatch.context() as m, + ): + m.setattr(markdown.Markdown, "convert", lambda s, t: t) + convert_markdown_to_html(draft_f, filefactory=fakefilefactory) + + soup = bs4.BeautifulSoup( + fakefilefactory[fakepath.with_suffix(".html")].read(), + "html.parser", + ) + body = soup.body.contents + + assert mailparts[0] in body.pop(0) + + sig = soup.select_one("#signature") + assert sig == body.pop(0) + + sep = sig.select_one("span.sig_separator") + assert sep == sig.contents[0] + assert f"\n{sep.text}\n" == EMAIL_SIG_SEP + + assert mailparts[2] in sig.contents[1] + + @pytest.mark.converter + @pytest.mark.sig + def test_converter_signature_handling_htmlsig( + self, fakepath, fakepath2, fakefilefactory, monkeypatch + ): + mailparts = ( + "This is the mail body", + f"{EMAIL_SIG_SEP}", + f"{HTML_SIG_MARKER}{fakepath2}\n", + "This is the plain-text version", + ) + htmlsig = "HTML Signature from {path} but as a string" + html = f'

{htmlsig.format(path=fakepath2)}

' + + sig_f = fakefilefactory(fakepath2, content=html) + + def mdwn_fn(t): + return t.upper() + + with ( + fakefilefactory( + fakepath, content="".join(mailparts) + ) as draft_f, + monkeypatch.context() as m, + ): + m.setattr( + markdown.Markdown, "convert", lambda s, t: mdwn_fn(t) + ) + convert_markdown_to_html(draft_f, filefactory=fakefilefactory) + + soup = bs4.BeautifulSoup( + fakefilefactory[fakepath.with_suffix(".html")].read(), + "html.parser", + ) + sig = soup.select_one("#signature") + sig.span.extract() + + assert HTML_SIG_MARKER not in sig.text + assert htmlsig.format(path=fakepath2) == sig.text.strip() + + plaintext = fakefilefactory[fakepath].read() + assert plaintext.endswith(EMAIL_SIG_SEP + mailparts[-1]) + + @pytest.mark.converter + @pytest.mark.sig + def test_converter_signature_handling_htmlsig_with_image( + self, fakepath, fakepath2, fakefilefactory, monkeypatch, test_png + ): + mailparts = ( + "This is the mail body", + f"{EMAIL_SIG_SEP}", + f"{HTML_SIG_MARKER}{fakepath2}\n", + "This is the plain-text version", ) - text = f"![inline base64 image]({img})" - path = pathlib.Path(const1) - tree = convert_markdown_to_html( - text, path, filewriter_fn=fake_filewriter + htmlsig = ( + "HTML Signature from {path} with image\n" + f'\n' + ) + html = ( + f'
{htmlsig.format(path=fakepath2)}
' + ) + + sig_f = fakefilefactory(fakepath2, content=html) + + def mdwn_fn(t): + return t.upper() + + with ( + fakefilefactory( + fakepath, content="".join(mailparts) + ) as draft_f, + monkeypatch.context() as m, + ): + m.setattr( + markdown.Markdown, "convert", lambda s, t: mdwn_fn(t) + ) + convert_markdown_to_html(draft_f, filefactory=fakefilefactory) + + assert fakefilefactory.pop()[0].suffix == ".png" + + soup = bs4.BeautifulSoup( + fakefilefactory[fakepath.with_suffix(".html")].read(), + "html.parser", ) + assert soup.img.attrs["src"].startswith("cid:") + + @pytest.mark.converter + @pytest.mark.sig + def test_converter_signature_handling_textsig_with_image( + self, fakepath, fakefilefactory, test_png + ): + mailparts = ( + "This is the mail body", + f"{EMAIL_SIG_SEP}", + "This is the plain-text version with image\n", + f"![Inline]({test_png})", + ) + with ( + fakefilefactory( + fakepath, content="".join(mailparts) + ) as draft_f, + ): + tree = convert_markdown_to_html( + draft_f, filefactory=fakefilefactory + ) assert tree.subtype == "relative" + assert tree.children[0].subtype == "alternative" assert tree.children[1].subtype == "png" - written = fake_filewriter.pop() + written = fakefilefactory.pop() assert tree.children[1].path == written[0] - assert written[1] == request.urlopen(img).read() + assert written[1].read() == request.urlopen(test_png).read() + + @pytest.mark.converter + def test_converter_attribution_to_admonition( + self, fakepath, fakefilefactory + ): + mailparts = ( + "Regarding whatever", + "> blockquote line1", + "> blockquote line2", + "> ", + "> new para with **bold** text", + ) + with fakefilefactory( + fakepath, content="\n".join(mailparts) + ) as draft_f: + convert_markdown_to_html(draft_f, filefactory=fakefilefactory) + + soup = bs4.BeautifulSoup( + fakefilefactory[fakepath.with_suffix(".html")].read(), + "html.parser", + ) + quote = soup.select_one("div.admonition.quote") + assert quote + assert ( + soup.select_one("p.admonition-title").extract().text.strip() + == mailparts[0] + ) + + p = quote.p.extract() + assert p.text.strip() == "\n".join(p[2:] for p in mailparts[1:3]) + + p = quote.p.extract() + assert p.contents[1].name == "strong" + + @pytest.mark.converter + def test_converter_attribution_to_admonition_with_blockquote( + self, fakepath, fakefilefactory + ): + mailparts = ( + "Regarding whatever", + "> blockquote line1", + "> blockquote line2", + "> ", + "> new para with **bold** text", + ) + with fakefilefactory( + fakepath, content="\n".join(mailparts) + ) as draft_f: + convert_markdown_to_html(draft_f, filefactory=fakefilefactory) + + soup = bs4.BeautifulSoup( + fakefilefactory[fakepath.with_suffix(".html")].read(), + "html.parser", + ) + quote = soup.select_one("div.admonition.quote") + assert quote.blockquote + + @pytest.mark.converter + def test_converter_attribution_to_admonition_multiple( + self, fakepath, fakefilefactory + ): + mailparts = ( + "Regarding whatever", + "> blockquote line1", + "> blockquote line2", + "", + "Normal text", + "", + "> continued emailquote", + "", + "Another email-quote", + "> something", + ) + with fakefilefactory( + fakepath, content="\n".join(mailparts) + ) as draft_f: + convert_markdown_to_html(draft_f, filefactory=fakefilefactory) + + soup = bs4.BeautifulSoup( + fakefilefactory[fakepath.with_suffix(".html")].read(), + "html.parser", + ) + quote = soup.select_one("div.admonition.quote.continued").extract() + assert quote + assert ( + quote.select_one("p.admonition-title").extract().text.strip() + == mailparts[0] + ) + + p = quote.p.extract() + assert p + + quote = soup.select_one("div.admonition.quote.continued").extract() + assert quote + assert ( + quote.select_one("p.admonition-title").extract().text.strip() + == mailparts[-2] + ) + + @pytest.mark.converter + def test_converter_format_flowed_with_nl2br( + self, fakepath, fakefilefactory + ): + mailparts = ( + "This is format=flowed text ", + "with spaces at the end ", + "and there ought be no newlines.", + "", + "[link](https://example.org) ", + "and text.", + "", + "[link text ", + "broken up](https://example.org).", + "", + "This is on a new line with a hard break ", + "due to the double space", + ) + with fakefilefactory( + fakepath, content="\n".join(mailparts) + ) as draft_f: + convert_markdown_to_html( + draft_f, extensions=["nl2br"], filefactory=fakefilefactory + ) + + soup = bs4.BeautifulSoup( + fakefilefactory[fakepath.with_suffix(".html")].read(), + "html.parser", + ) + import ipdb + + p = soup.p.extract().text + assert "".join(mailparts[0:3]) == p + p = ''.join(map(str, soup.p.extract().contents)) + assert p == 'link and text.' + p = ''.join(map(str, soup.p.extract().contents)) + assert ( + p == 'link text broken up.' + ) + + @pytest.mark.fileio + def test_file_class_contextmanager(self, const1, monkeypatch): + state = dict(o=False, c=False) + + def fn(t): + state[t] = True + + with monkeypatch.context() as m: + m.setattr(File, "open", lambda s: fn("o")) + m.setattr(File, "close", lambda s: fn("c")) + with File() as f: + assert state["o"] + assert not state["c"] + assert state["c"] + + @pytest.mark.fileio + def test_file_class_no_path(self, const1): + with File(mode="w+") as f: + f.write(const1, cache=False) + assert f.read(cache=False) == const1 + + @pytest.mark.fileio + def test_file_class_path(self, const1, tmp_path): + with File(tmp_path / "file", mode="w+") as f: + f.write(const1, cache=False) + assert f.read(cache=False) == const1 + + @pytest.mark.fileio + def test_file_class_path_no_exists(self, fakepath): + with pytest.raises(FileNotFoundError): + File(fakepath, mode="r").open() + + @pytest.mark.fileio + def test_file_class_cache(self, tmp_path, const1, const2): + path = tmp_path / "file" + file = File(path, mode="w+") + with file as f: + f.write(const1, cache=True) + with open(path, mode="w") as f: + f.write(const2) + with file as f: + assert f.read(cache=True) == const1 + + @pytest.mark.fileio + def test_file_class_cache_init(self, const1): + file = File(path=None, mode="r", content=const1) + with file as f: + assert f.read() == const1 + + @pytest.mark.fileio + def test_file_class_content_or_path(self, fakepath, const1): + with pytest.raises(RuntimeError): + file = File(path=fakepath, content=const1) + + @pytest.mark.fileio + def test_file_class_content_needs_read(self, const1): + with pytest.raises(RuntimeError): + file = File(mode="w", content=const1) + + @pytest.mark.fileio + def test_file_class_write_persists_close(self, const1): + f = File(mode="w+") + with f: + f.write(const1) + with f: + assert f.read() == const1 + + @pytest.mark.fileio + def test_file_class_write_resets_read_cache(self, const1, const2): + with File(mode="w+", content=const1) as f: + assert f.read() == const1 + f.write(const2) + assert f.read() == const2 + + @pytest.mark.fileio + def test_file_factory(self): + fact = FileFactory() + f = fact() + assert isinstance(f, File) + assert len(fact) == 1 + assert f in fact + assert f == fact[0] + + @pytest.mark.fileio + def test_fake_file_factory(self, fakepath, fakefilefactory): + fact = FakeFileFactory() + f = fakefilefactory(fakepath) + assert f.path == fakepath + assert f == fakefilefactory[fakepath] + + @pytest.mark.fileio + def test_fake_file_factory_path_persistence( + self, fakepath, fakefilefactory + ): + f1 = fakefilefactory(fakepath) + assert f1 == fakefilefactory(fakepath) except ImportError: pass