X-Git-Url: https://git.madduck.net/etc/neomutt.git/blobdiff_plain/dc77cadbe8d14395ae542bb2763a54436a0f31e3..62d5dded67993412d4516fdc33f58a2c4e470cc8:/.config/neomutt/buildmimetree.py diff --git a/.config/neomutt/buildmimetree.py b/.config/neomutt/buildmimetree.py index d0493b3..a0112c8 100755 --- a/.config/neomutt/buildmimetree.py +++ b/.config/neomutt/buildmimetree.py @@ -7,7 +7,7 @@ # Configuration: # neomuttrc (needs to be a single line): # macro compose B "\ -# source '$my_confdir/buildmimetree.py setup|'\ +# source '$my_confdir/buildmimetree.py setup --tempdir $tempdir|'\ # sourc e \$my_mdwn_postprocess_cmd_file\ # " "Convert message into a modern MIME tree with inline images" # @@ -36,7 +36,13 @@ import pathlib import markdown import tempfile import argparse -from collections import namedtuple +import re +import mimetypes +from collections import namedtuple, OrderedDict +from markdown.extensions import Extension +from markdown.inlinepatterns import ImageInlineProcessor, IMAGE_LINK_RE +from email.utils import make_msgid +from urllib import request def parse_cli_args(*args, **kwargs): @@ -61,6 +67,12 @@ def parse_cli_args(*args, **kwargs): help="Turn on debug logging of commands generated to stderr", ) + parser_setup.add_argument( + "--tempdir", + default=None, + help="Specify temporary directory to use for attachments", + ) + parser_setup.add_argument( "--extension", "-x", @@ -72,6 +84,12 @@ def parse_cli_args(*args, **kwargs): help="Markdown extension to add to the list of extensions use", ) + parser_setup.add_argument( + "--send-message", + action="store_true", + help="Generate command(s) to send the message after processing", + ) + parser_massage.add_argument( "--debug-commands", action="store_true", @@ -84,6 +102,12 @@ def parse_cli_args(*args, **kwargs): help="Turn on debugging to stderr of the MIME tree walk", ) + parser_massage.add_argument( + "--tempdir", + default=None, + help="Specify temporary directory to use for attachments", + ) + parser_massage.add_argument( "--extensions", metavar="EXTENSIONS", @@ -108,6 +132,74 @@ def parse_cli_args(*args, **kwargs): return parser.parse_args(*args, **kwargs) +# [ MARKDOWN WRAPPING ] ####################################################### + + +InlineImageInfo = namedtuple( + "InlineImageInfo", ["cid", "desc"], defaults=[None] +) + + +class InlineImageExtension(Extension): + class RelatedImageInlineProcessor(ImageInlineProcessor): + def __init__(self, re, md, ext): + super().__init__(re, md) + self._ext = ext + + def handleMatch(self, m, data): + el, start, end = super().handleMatch(m, data) + if "src" in el.attrib: + src = el.attrib["src"] + if "://" not in src or src.startswith("file://"): + # We only inline local content + cid = self._ext.get_cid_for_image(el.attrib) + el.attrib["src"] = f"cid:{cid}" + return el, start, end + + def __init__(self): + super().__init__() + self._images = OrderedDict() + + def extendMarkdown(self, md): + md.registerExtension(self) + inline_image_proc = self.RelatedImageInlineProcessor( + IMAGE_LINK_RE, md, self + ) + md.inlinePatterns.register(inline_image_proc, "image_link", 150) + + def get_cid_for_image(self, attrib): + msgid = make_msgid()[1:-1] + path = attrib["src"] + if path.startswith("/"): + path = f"file://{path}" + self._images[path] = InlineImageInfo( + msgid, attrib.get("title", attrib.get("alt")) + ) + return msgid + + def get_images(self): + return self._images + + +def markdown_with_inline_image_support(text, *, extensions=None): + inline_image_handler = InlineImageExtension() + extensions = extensions or [] + extensions.append(inline_image_handler) + mdwn = markdown.Markdown(extensions=extensions) + htmltext = mdwn.convert(text) + + images = inline_image_handler.get_images() + + def replace_image_with_cid(matchobj): + for m in (matchobj.group(1), f"file://{matchobj.group(1)}"): + if m in images: + return f"(cid:{images[m].cid}" + return matchobj.group(0) + + text = re.sub(r"\(([^)\s]+)", replace_image_with_cid, text) + return text, htmltext, images + + # [ PARTS GENERATION ] ######################################################## @@ -134,46 +226,68 @@ class Multipart( return f" children={len(self.children)}" -def convert_markdown_to_html(maildraft, *, extensions=None): - draftpath = pathlib.Path(maildraft) +def filewriter_fn(path, content, mode="w", **kwargs): + with open(path, mode, **kwargs) as out_f: + out_f.write(content) + + +def collect_inline_images( + images, *, tempdir=None, filewriter_fn=filewriter_fn +): + relparts = [] + for path, info in images.items(): + data = request.urlopen(path) + + mimetype = data.headers["Content-Type"] + ext = mimetypes.guess_extension(mimetype) + tempfilename = tempfile.mkstemp(prefix="img", suffix=ext, dir=tempdir) + path = pathlib.Path(tempfilename[1]) + + filewriter_fn(path, data.read(), "w+b") + + relparts.append( + Part(*mimetype.split("/"), path, cid=info.cid, desc=info.desc) + ) + + return relparts + + +def convert_markdown_to_html( + origtext, + draftpath, + *, + filewriter_fn=filewriter_fn, + tempdir=None, + extensions=None, +): + origtext, htmltext, images = markdown_with_inline_image_support( + origtext, extensions=extensions + ) + + filewriter_fn(draftpath, origtext, encoding="utf-8") textpart = Part( "text", "plain", draftpath, "Plain-text version", orig=True ) - with open(draftpath, "r", encoding="utf-8") as textmarkdown: - text = textmarkdown.read() - - mdwn = markdown.Markdown(extensions=extensions) - html = mdwn.convert(text) - htmlpath = draftpath.with_suffix(".html") + filewriter_fn( + htmlpath, htmltext, encoding="utf-8", errors="xmlcharrefreplace" + ) htmlpart = Part("text", "html", htmlpath, "HTML version") - with open( - htmlpath, "w", encoding="utf-8", errors="xmlcharrefreplace" - ) as texthtml: - texthtml.write(html) - - logopart = Part( - "image", - "png", - "/usr/share/doc/neomutt/logo/neomutt-256.png", - "Logo", - "neomutt-256.png", + altpart = Multipart( + "alternative", [textpart, htmlpart], "Group of alternative content" ) - return Multipart( - "relative", - [ - Multipart( - "alternative", - [textpart, htmlpart], - "Group of alternative content", - ), - logopart, - ], - "Group of related content", + imgparts = collect_inline_images( + images, tempdir=tempdir, filewriter_fn=filewriter_fn ) + if imgparts: + return Multipart( + "relative", [altpart] + imgparts, "Group of related content" + ) + else: + return altpart class MIMETreeDFWalker: @@ -282,17 +396,26 @@ class MuttCommands: def do_setup( - extensions=None, *, out_f=sys.stdout, temppath=None, debug_commands=False + extensions=None, + *, + out_f=sys.stdout, + temppath=None, + tempdir=None, + debug_commands=False, ): extensions = extensions or [] temppath = temppath or pathlib.Path( - tempfile.mkstemp(prefix="muttmdwn-")[1] + tempfile.mkstemp(prefix="muttmdwn-", dir=tempdir)[1] ) cmds = MuttCommands(out_f, debug=debug_commands) editor = f"{sys.argv[0]} massage --write-commands-to {temppath}" if extensions: editor = f'{editor} --extensions {",".join(extensions)}' + if tempdir: + editor = f"{editor} --tempdir {tempdir}" + if debug_commands: + editor = f"{editor} --debug-commands" cmds.cmd('set my_editor="$editor"') cmds.cmd('set my_edit_headers="$edit_headers"') @@ -304,11 +427,13 @@ def do_setup( def do_massage( - maildraft, - cmdpath, + draft_f, + draftpath, + cmd_f, *, extensions=None, converter=convert_markdown_to_html, + tempdir=None, debug_commands=False, debug_walk=False, ): @@ -316,95 +441,95 @@ def do_massage( # draft, and whatever commands we write to the file given as cmdpath will # be run by the second source command in the macro definition. - with open(cmdpath, "w") as cmd_f: - # Let's start by cleaning up what the setup did (see above), i.e. we - # restore the $editor and $edit_headers variables, and also unset the - # variable used to identify the command file we're currently writing - # to. - cmds = MuttCommands(cmd_f, debug=debug_commands) - cmds.cmd('set editor="$my_editor"') - cmds.cmd('set edit_headers="$my_edit_headers"') - cmds.cmd("unset my_editor") - cmds.cmd("unset my_edit_headers") - - # let's flush those commands, as there'll be a lot of pushes from now - # on, which need to be run in reverse order - cmds.flush() - - extensions = extensions.split(",") if extensions else [] - tree = converter(maildraft, extensions=extensions) - - mimetree = MIMETreeDFWalker(debug=args.debug_walk) - - def visitor_fn(item, stack, *, debugprint=None): - """ - Visitor function called for every node (part) of the MIME tree, - depth-first, and responsible for telling NeoMutt how to assemble - the tree. - """ - if isinstance(item, Part): - # We've hit a leaf-node, i.e. an alternative or a related part - # with actual content. - - # If the part is not an original part, i.e. doesn't already - # exist, we must first add it. - if not item.orig: - cmds.push(f"{item.path}") - cmds.push("") - if item.cid: - cmds.push( - f"\\Ca\\Ck{item.cid}" - ) - - # If the item (including the original) comes with a - # description, then we might just as well update the NeoMutt - # tree now: - if item.desc: - cmds.push(f"\\Ca\\Ck{item.desc}") - - # Finally, tag the entry that we just processed, so that when - # we're done at this level, as we walk up the stack, the items - # to be grouped will already be tagged and ready. - cmds.push("") - - elif isinstance(item, Multipart): - # This node has children, but we already visited them (see - # above), and so they have been tagged in NeoMutt's compose - # window. Now it's just a matter of telling NeoMutt to do the - # appropriate grouping: - if item.subtype == "alternative": - cmds.push("") - elif item.subtype == "relative": - cmds.push("") - elif item.subtype == "multilingual": - cmds.push("") - - # Again, if there is a description, we might just as well: - if item.desc: - cmds.push(f"\\Ca\\Ck{item.desc}") - - # Finally, if we're at non-root level, tag the new container, - # as it might itself be part of a container, to be processed - # one level up: - if stack: - cmds.push("") + # Let's start by cleaning up what the setup did (see above), i.e. we + # restore the $editor and $edit_headers variables, and also unset the + # variable used to identify the command file we're currently writing + # to. + cmds = MuttCommands(cmd_f, debug=debug_commands) + cmds.cmd('set editor="$my_editor"') + cmds.cmd('set edit_headers="$my_edit_headers"') + cmds.cmd("unset my_editor") + cmds.cmd("unset my_edit_headers") + + # let's flush those commands, as there'll be a lot of pushes from now + # on, which need to be run in reverse order + cmds.flush() - else: - # We should never get here - assert not "is valid part" + extensions = extensions.split(",") if extensions else [] + tree = converter(draft_f.read(), draftpath, tempdir=tempdir, extensions=extensions) - # ----------------- - # End of visitor_fn + mimetree = MIMETreeDFWalker(debug=debug_walk) - # Let's walk the tree and visit every node with our fancy visitor - # function - mimetree.walk(tree, visitor_fn=visitor_fn) + def visitor_fn(item, stack, *, debugprint=None): + """ + Visitor function called for every node (part) of the MIME tree, + depth-first, and responsible for telling NeoMutt how to assemble + the tree. + """ + if isinstance(item, Part): + # We've hit a leaf-node, i.e. an alternative or a related part + # with actual content. + + # Let's add the part + if item.orig: + # The original source already exists in the NeoMutt tree, but + # the underlying file may have been modified, so we need to + # update the encoding, but that's it: + cmds.push("") + else: + # … whereas all other parts need to be added, and they're all + # considered to be temporary and inline: + cmds.push(f"{item.path}") + cmds.push("") + + # If the item (including the original) comes with additional + # information, then we might just as well update the NeoMutt + # tree now: + if item.cid: + cmds.push(f"\\Ca\\Ck{item.cid}") + + elif isinstance(item, Multipart): + # This node has children, but we already visited them (see + # above), and so they have been tagged in NeoMutt's compose + # window. Now it's just a matter of telling NeoMutt to do the + # appropriate grouping: + if item.subtype == "alternative": + cmds.push("") + elif item.subtype == "relative": + cmds.push("") + elif item.subtype == "multilingual": + cmds.push("") - # Finally, cleanup. Since we're responsible for removing the temporary - # file, how's this for a little hack? - cmds.cmd(f"source 'rm -f {args.cmdpath}|'") - cmds.cmd("unset my_mdwn_postprocess_cmd_file") - cmds.flush() + else: + # We should never get here + assert not "is valid part" + + # If the item has a description, we might just as well add it + if item.desc: + cmds.push(f"\\Ca\\Ck{item.desc}") + + # Finally, if we're at non-root level, tag the new container, + # as it might itself be part of a container, to be processed + # one level up: + if stack: + cmds.push("") + + # ----------------- + # End of visitor_fn + + # Let's walk the tree and visit every node with our fancy visitor + # function + mimetree.walk(tree, visitor_fn=visitor_fn) + + # Finally, cleanup. Since we're responsible for removing the temporary + # file, how's this for a little hack? + try: + filename = cmd_f.name + except AttributeError: + filename = "pytest_internal_file" + cmds.cmd(f"source 'rm -f {filename}|'") + cmds.cmd("unset my_mdwn_postprocess_cmd_file") + cmds.flush() # [ CLI ENTRY ] ############################################################### @@ -413,22 +538,35 @@ if __name__ == "__main__": args = parse_cli_args() if args.mode == "setup": - do_setup(args.extensions, debug_commands=args.debug_commands) + if args.send_message: + raise NotImplementedError() - elif args.mode == "massage": - do_massage( - args.MAILDRAFT, - args.cmdpath, - extensions=args.extensions, + do_setup( + args.extensions, + tempdir=args.tempdir, debug_commands=args.debug_commands, - debug_walk=args.debug_walk, ) + elif args.mode == "massage": + with open(args.MAILDRAFT, "r") as draft_f, open( + args.cmdpath, "w" + ) as cmd_f: + do_massage( + draft_f, + pathlib.Path(args.MAILDRAFT), + cmd_f, + extensions=args.extensions, + tempdir=args.tempdir, + debug_commands=args.debug_commands, + debug_walk=args.debug_walk, + ) + # [ TESTS ] ################################################################### try: import pytest + from io import StringIO class Tests: @pytest.fixture @@ -488,12 +626,18 @@ try: @pytest.fixture def basic_mime_tree(self): return Multipart( - "related", + "relative", children=[ Multipart( "alternative", children=[ - Part("text", "plain", "part.txt", desc="Plain"), + Part( + "text", + "plain", + "part.txt", + desc="Plain", + orig=True, + ), Part("text", "html", "part.html", desc="HTML"), ], desc="Alternative", @@ -523,12 +667,11 @@ try: assert items[2][1] == 1 assert items[3][0].subtype == "png" assert items[3][1] == 1 - assert items[4][0].subtype == "related" + assert items[4][0].subtype == "relative" assert items[4][1] == 0 def test_MIMETreeDFWalker_list_to_mixed(self, basic_mime_tree): mimetree = MIMETreeDFWalker() - items = [] def visitor_fn(item, stack, debugprint): @@ -570,5 +713,228 @@ try: assert lines[2].endswith(f'{const2},{const1}"') assert lines[4].endswith(const1) + @pytest.fixture + def string_io(self, const1, text=None): + return StringIO(text or const1) + + def test_do_massage_basic(self, const1, string_io, capsys): + def converter(drafttext, draftpath, extensions, tempdir): + return Part("text", "plain", draftpath, orig=True) + + do_massage( + draft_f=string_io, + draftpath=const1, + cmd_f=sys.stdout, + converter=converter, + ) + + captured = capsys.readouterr() + lines = captured.out.splitlines() + assert '="$my_editor"' in lines.pop(0) + assert '="$my_edit_headers"' in lines.pop(0) + assert "unset my_editor" == lines.pop(0) + assert "unset my_edit_headers" == lines.pop(0) + assert "update-encoding" in lines.pop(0) + assert "source 'rm -f " in lines.pop(0) + assert "unset my_mdwn_postprocess_cmd_file" == lines.pop(0) + + def test_do_massage_fulltree( + self, string_io, const1, basic_mime_tree, capsys + ): + def converter(drafttext, draftpath, extensions, tempdir): + return basic_mime_tree + + do_massage( + draft_f=string_io, + draftpath=const1, + cmd_f=sys.stdout, + converter=converter, + ) + + captured = capsys.readouterr() + lines = captured.out.splitlines()[4:] + assert "Related" in lines.pop(0) + assert "group-related" in lines.pop(0) + assert "tag-entry" in lines.pop(0) + assert "Logo" in lines.pop(0) + assert "content-id" in lines.pop(0) + assert "toggle-unlink" in lines.pop(0) + assert "logo.png" in lines.pop(0) + assert "tag-entry" in lines.pop(0) + assert "Alternative" in lines.pop(0) + assert "group-alternatives" in lines.pop(0) + assert "tag-entry" in lines.pop(0) + assert "HTML" in lines.pop(0) + assert "toggle-unlink" in lines.pop(0) + assert "part.html" in lines.pop(0) + assert "tag-entry" in lines.pop(0) + assert "Plain" in lines.pop(0) + assert "update-encoding" in lines.pop(0) + assert len(lines) == 2 + + @pytest.fixture + def fake_filewriter(self): + class FileWriter: + def __init__(self): + self._writes = [] + + def __call__(self, path, content, mode="w", **kwargs): + self._writes.append((path, content)) + + def pop(self, index=-1): + return self._writes.pop(index) + + return FileWriter() + + @pytest.fixture + def markdown_non_converter(self, const1, const2): + return lambda s, text: f"{const1}{text}{const2}" + + def test_converter_tree_basic( + self, const1, const2, fake_filewriter, markdown_non_converter + ): + path = pathlib.Path(const2) + tree = convert_markdown_to_html( + const1, path, filewriter_fn=fake_filewriter + ) + + assert tree.subtype == "alternative" + assert len(tree.children) == 2 + assert tree.children[0].subtype == "plain" + assert tree.children[0].path == path + assert tree.children[0].orig + assert tree.children[1].subtype == "html" + assert tree.children[1].path == path.with_suffix(".html") + + def test_converter_writes( + self, + const1, + const2, + fake_filewriter, + monkeypatch, + markdown_non_converter, + ): + path = pathlib.Path(const2) + + with monkeypatch.context() as m: + m.setattr(markdown.Markdown, "convert", markdown_non_converter) + convert_markdown_to_html( + const1, path, filewriter_fn=fake_filewriter + ) + + assert (path, const1) == fake_filewriter.pop(0) + assert ( + path.with_suffix(".html"), + markdown_non_converter(None, const1), + ) == fake_filewriter.pop(0) + + def test_markdown_inline_image_processor(self): + imgpath1 = "file:/path/to/image.png" + imgpath2 = "file:///path/to/image.png?url=params" + imgpath3 = "/path/to/image.png" + text = f"""![inline local image]({imgpath1}) + ![image inlined + with newline]({imgpath2}) + ![image local path]({imgpath3})""" + text, html, images = markdown_with_inline_image_support(text) + + # local paths have been normalised to URLs: + imgpath3 = f"file://{imgpath3}" + + assert 'src="cid:' in html + assert "](cid:" in text + assert len(images) == 3 + assert imgpath1 in images + assert imgpath2 in images + assert imgpath3 in images + assert images[imgpath1].cid != images[imgpath2].cid + assert images[imgpath1].cid != images[imgpath3].cid + assert images[imgpath2].cid != images[imgpath3].cid + + def test_markdown_inline_image_processor_title_to_desc(self, const1): + imgpath = "file:///path/to/image.png" + text = f'![inline local image]({imgpath} "{const1}")' + text, html, images = markdown_with_inline_image_support(text) + assert images[imgpath].desc == const1 + + def test_markdown_inline_image_processor_alt_to_desc(self, const1): + imgpath = "file:///path/to/image.png" + text = f"![{const1}]({imgpath})" + text, html, images = markdown_with_inline_image_support(text) + assert images[imgpath].desc == const1 + + def test_markdown_inline_image_processor_title_over_alt_desc( + self, const1, const2 + ): + imgpath = "file:///path/to/image.png" + text = f'![{const1}]({imgpath} "{const2}")' + text, html, images = markdown_with_inline_image_support(text) + assert images[imgpath].desc == const2 + + def test_markdown_inline_image_not_external(self): + imgpath = "https://path/to/image.png" + text = f"![inline image]({imgpath})" + text, html, images = markdown_with_inline_image_support(text) + + assert 'src="cid:' not in html + assert "](cid:" not in text + assert len(images) == 0 + + def test_markdown_inline_image_local_file(self): + imgpath = "/path/to/image.png" + text = f"![inline image]({imgpath})" + text, html, images = markdown_with_inline_image_support(text) + + for k, v in images.items(): + assert k == f"file://{imgpath}" + break + + @pytest.fixture + def test_png(self): + return ( + "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAE" + "AAAABCAAAAAA6fptVAAAACklEQVQI12P4DwABAQEAG7buVgAA" + ) + + def test_markdown_inline_image_processor_base64(self, test_png): + text = f"![1px white inlined]({test_png})" + text, html, images = markdown_with_inline_image_support(text) + + assert 'src="cid:' in html + assert "](cid:" in text + assert len(images) == 1 + assert test_png in images + + def test_converter_tree_inline_image_base64( + self, test_png, const1, fake_filewriter + ): + text = f"![inline base64 image]({test_png})" + path = pathlib.Path(const1) + tree = convert_markdown_to_html( + text, path, filewriter_fn=fake_filewriter + ) + + assert tree.subtype == "relative" + assert tree.children[1].subtype == "png" + written = fake_filewriter.pop() + assert tree.children[1].path == written[0] + assert written[1] == request.urlopen(test_png).read() + + def test_inline_image_collection( + self, test_png, const1, const2, fake_filewriter + ): + test_images = {test_png: InlineImageInfo(cid=const1, desc=const2)} + relparts = collect_inline_images( + test_images, filewriter_fn=fake_filewriter + ) + + written = fake_filewriter.pop() + assert b"PNG" in written[1] + + assert relparts[0].subtype == "png" + assert relparts[0].path == written[0] + assert relparts[0].cid == const1 + assert relparts[0].desc == const2 + except ImportError: pass