All patches and comments are welcome. Please squash your changes to logical
commits before using git-format-patch and git-send-email to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
3 # NeoMutt helper script to create multipart/* emails with Markdown → HTML
4 # alternative conversion, and handling of inline images, using NeoMutt's
5 # ability to manually craft MIME trees, but automating this process.
8 # neomuttrc (needs to be a single line):
10 # <enter-command> source '$my_confdir/buildmimetree.py setup|'<enter>\
11 # <enter-command> sourc e \$my_mdwn_postprocess_cmd_file<enter>\
12 # " "Convert message into a modern MIME tree with inline images"
14 # (Yes, we need to call source twice, as mutt only starts to process output
15 # from a source command when the command exits, and since we need to react
16 # to the output, we need to be invoked again, using a $my_ variable to pass
25 # - Pygments, if installed, then syntax highlighting is enabled
28 # https://git.madduck.net/etc/neomutt.git/blob_plain/HEAD:/.config/neomutt/buildmimetree.py
30 # Copyright © 2023 martin f. krafft <madduck@madduck.net>
31 # Released under the GPL-2+ licence, just like Mutt itself.
41 from collections import namedtuple, OrderedDict
42 from markdown.extensions import Extension
43 from markdown.inlinepatterns import ImageInlineProcessor, IMAGE_LINK_RE
44 from email.utils import make_msgid
45 from urllib import request
48 def parse_cli_args(*args, **kwargs):
49 parser = argparse.ArgumentParser(
51 "NeoMutt helper to turn text/markdown email parts "
52 "into full-fledged MIME trees"
56 "Copyright © 2022 martin f. krafft <madduck@madduck.net>.\n"
57 "Released under the MIT licence"
60 subp = parser.add_subparsers(help="Sub-command parsers", dest="mode")
61 parser_setup = subp.add_parser("setup", help="Setup phase")
62 parser_massage = subp.add_parser("massage", help="Massaging phase")
64 parser_setup.add_argument(
67 help="Turn on debug logging of commands generated to stderr",
70 parser_setup.add_argument(
78 help="Markdown extension to add to the list of extensions use",
81 parser_setup.add_argument(
84 help="Generate command(s) to send the message after processing",
87 parser_massage.add_argument(
90 help="Turn on debug logging of commands generated to stderr",
93 parser_massage.add_argument(
96 help="Turn on debugging to stderr of the MIME tree walk",
99 parser_massage.add_argument(
101 metavar="EXTENSIONS",
104 help="Markdown extension to use (comma-separated list)",
107 parser_massage.add_argument(
108 "--write-commands-to",
111 help="Temporary file path to write commands to",
114 parser_massage.add_argument(
117 help="If provided, the script is invoked as editor on the mail draft",
120 return parser.parse_args(*args, **kwargs)
123 # [ MARKDOWN WRAPPING ] #######################################################
126 InlineImageInfo = namedtuple(
127 "InlineImageInfo", ["cid", "desc"], defaults=[None]
131 class InlineImageExtension(Extension):
132 class RelatedImageInlineProcessor(ImageInlineProcessor):
133 def __init__(self, re, md, ext):
134 super().__init__(re, md)
137 def handleMatch(self, m, data):
138 el, start, end = super().handleMatch(m, data)
139 if "src" in el.attrib:
140 src = el.attrib["src"]
141 if "://" not in src or src.startswith("file://"):
142 # We only inline local content
143 cid = self._ext.get_cid_for_image(el.attrib)
144 el.attrib["src"] = f"cid:{cid}"
145 return el, start, end
149 self._images = OrderedDict()
151 def extendMarkdown(self, md):
152 md.registerExtension(self)
153 inline_image_proc = self.RelatedImageInlineProcessor(
154 IMAGE_LINK_RE, md, self
156 md.inlinePatterns.register(inline_image_proc, "image_link", 150)
158 def get_cid_for_image(self, attrib):
159 msgid = make_msgid()[1:-1]
161 if path.startswith("/"):
162 path = f"file://{path}"
163 self._images[path] = InlineImageInfo(
164 msgid, attrib.get("title", attrib.get("alt"))
168 def get_images(self):
172 def markdown_with_inline_image_support(text, *, extensions=None):
173 inline_image_handler = InlineImageExtension()
174 extensions = extensions or []
175 extensions.append(inline_image_handler)
176 mdwn = markdown.Markdown(extensions=extensions)
177 htmltext = mdwn.convert(text)
179 images = inline_image_handler.get_images()
181 def replace_image_with_cid(matchobj):
182 for m in (matchobj.group(1), f"file://{matchobj.group(1)}"):
184 return f"(cid:{images[m].cid}"
185 return matchobj.group(0)
187 text = re.sub(r"\(([^)\s]+)", replace_image_with_cid, text)
188 return text, htmltext, images
191 # [ PARTS GENERATION ] ########################################################
197 ["type", "subtype", "path", "desc", "cid", "orig"],
198 defaults=[None, None, False],
202 ret = f"<{self.type}/{self.subtype}>"
204 ret = f"{ret} cid:{self.cid}"
206 ret = f"{ret} ORIGINAL"
211 namedtuple("Multipart", ["subtype", "children", "desc"], defaults=[None])
214 return f"<multipart/{self.subtype}> children={len(self.children)}"
217 def filewriter_fn(path, content, mode="w", **kwargs):
218 with open(path, mode, **kwargs) as out_f:
222 def collect_inline_images(
223 images, *, tempdir=None, filewriter_fn=filewriter_fn
226 for path, info in images.items():
227 data = request.urlopen(path)
229 mimetype = data.headers["Content-Type"]
230 ext = mimetypes.guess_extension(mimetype)
231 tempfilename = tempfile.mkstemp(prefix="img", suffix=ext, dir=tempdir)
232 path = pathlib.Path(tempfilename[1])
234 filewriter_fn(path, data.read(), "w+b")
237 Part(*mimetype.split("/"), path, cid=info.cid, desc=info.desc)
243 def convert_markdown_to_html(
244 origtext, draftpath, *, filewriter_fn=filewriter_fn, extensions=None
246 origtext, htmltext, images = markdown_with_inline_image_support(
247 origtext, extensions=extensions
250 filewriter_fn(draftpath, origtext, encoding="utf-8")
252 "text", "plain", draftpath, "Plain-text version", orig=True
255 htmlpath = draftpath.with_suffix(".html")
257 htmlpath, htmltext, encoding="utf-8", errors="xmlcharrefreplace"
259 htmlpart = Part("text", "html", htmlpath, "HTML version")
262 "alternative", [textpart, htmlpart], "Group of alternative content"
265 imgparts = collect_inline_images(images, filewriter_fn=filewriter_fn)
268 "relative", [altpart] + imgparts, "Group of related content"
274 class MIMETreeDFWalker:
275 def __init__(self, *, visitor_fn=None, debug=False):
276 self._visitor_fn = visitor_fn
279 def walk(self, root, *, visitor_fn=None):
281 Recursive function to implement a depth-dirst walk of the MIME-tree
285 if isinstance(root, list):
286 root = Multipart("mixed", children=root)
291 visitor_fn=visitor_fn or self._visitor_fn,
294 def _walk(self, node, *, stack, visitor_fn):
295 # Let's start by enumerating the parts at the current level. At the
296 # root level, stack will be the empty list, and we expect a multipart/*
297 # container at this level. Later, e.g. within a mutlipart/alternative
298 # container, the subtree will just be the alternative parts, while the
299 # top of the stack will be the multipart/alternative container, which
300 # we will process after the following loop.
302 lead = f"{'| '*len(stack)}|-"
303 if isinstance(node, Multipart):
305 f"{lead}{node} parents={[s.subtype for s in stack]}"
308 # Depth-first, so push the current container onto the stack,
311 self.debugprint("| " * (len(stack) + 1))
312 for child in node.children:
316 visitor_fn=visitor_fn,
318 self.debugprint("| " * len(stack))
319 assert stack.pop() == node
322 self.debugprint(f"{lead}{node}")
325 visitor_fn(node, stack, debugprint=self.debugprint)
327 def debugprint(self, s, **kwargs):
329 print(s, file=sys.stderr, **kwargs)
332 # [ RUN MODES ] ###############################################################
337 Stupid class to interface writing out Mutt commands. This is quite a hack
338 to deal with the fact that Mutt runs "push" commands in reverse order, so
339 all of a sudden, things become very complicated when mixing with "real"
342 Hence we keep two sets of commands, and one set of pushes. Commands are
343 added to the first until a push is added, after which commands are added to
344 the second set of commands.
346 On flush(), the first set is printed, followed by the pushes in reverse,
347 and then the second set is printed. All 3 sets are then cleared.
350 def __init__(self, out_f=sys.stdout, *, debug=False):
351 self._cmd1, self._push, self._cmd2 = [], [], []
363 s = s.replace('"', '"')
366 self._push.insert(0, s)
370 "\n".join(self._cmd1 + self._push + self._cmd2), file=self._out_f
372 self._cmd1, self._push, self._cmd2 = [], [], []
374 def debugprint(self, s, **kwargs):
376 print(s, file=sys.stderr, **kwargs)
380 extensions=None, *, out_f=sys.stdout, temppath=None, debug_commands=False
382 extensions = extensions or []
383 temppath = temppath or pathlib.Path(
384 tempfile.mkstemp(prefix="muttmdwn-")[1]
386 cmds = MuttCommands(out_f, debug=debug_commands)
388 editor = f"{sys.argv[0]} massage --write-commands-to {temppath}"
390 editor = f'{editor} --extensions {",".join(extensions)}'
392 editor = f"{editor} --debug-commands"
394 cmds.cmd('set my_editor="$editor"')
395 cmds.cmd('set my_edit_headers="$edit_headers"')
396 cmds.cmd(f'set editor="{editor}"')
397 cmds.cmd("unset edit_headers")
398 cmds.cmd(f"set my_mdwn_postprocess_cmd_file={temppath}")
399 cmds.push("<first-entry><edit-file>")
409 converter=convert_markdown_to_html,
410 debug_commands=False,
413 # Here's the big picture: we're being invoked as the editor on the email
414 # draft, and whatever commands we write to the file given as cmdpath will
415 # be run by the second source command in the macro definition.
417 # Let's start by cleaning up what the setup did (see above), i.e. we
418 # restore the $editor and $edit_headers variables, and also unset the
419 # variable used to identify the command file we're currently writing
421 cmds = MuttCommands(cmd_f, debug=debug_commands)
422 cmds.cmd('set editor="$my_editor"')
423 cmds.cmd('set edit_headers="$my_edit_headers"')
424 cmds.cmd("unset my_editor")
425 cmds.cmd("unset my_edit_headers")
427 # let's flush those commands, as there'll be a lot of pushes from now
428 # on, which need to be run in reverse order
431 extensions = extensions.split(",") if extensions else []
432 tree = converter(draft_f.read(), draftpath, extensions=extensions)
434 mimetree = MIMETreeDFWalker(debug=debug_walk)
436 def visitor_fn(item, stack, *, debugprint=None):
438 Visitor function called for every node (part) of the MIME tree,
439 depth-first, and responsible for telling NeoMutt how to assemble
442 if isinstance(item, Part):
443 # We've hit a leaf-node, i.e. an alternative or a related part
444 # with actual content.
448 # The original source already exists in the NeoMutt tree, but
449 # the underlying file may have been modified, so we need to
450 # update the encoding, but that's it:
451 cmds.push("<update-encoding>")
453 # … whereas all other parts need to be added, and they're all
454 # considered to be temporary and inline:
455 cmds.push(f"<attach-file>{item.path}<enter>")
456 cmds.push("<toggle-unlink><toggle-disposition>")
458 # If the item (including the original) comes with additional
459 # information, then we might just as well update the NeoMutt
462 cmds.push(f"<edit-content-id>\\Ca\\Ck{item.cid}<enter>")
464 elif isinstance(item, Multipart):
465 # This node has children, but we already visited them (see
466 # above), and so they have been tagged in NeoMutt's compose
467 # window. Now it's just a matter of telling NeoMutt to do the
468 # appropriate grouping:
469 if item.subtype == "alternative":
470 cmds.push("<group-alternatives>")
471 elif item.subtype == "relative":
472 cmds.push("<group-related>")
473 elif item.subtype == "multilingual":
474 cmds.push("<group-multilingual>")
477 # We should never get here
478 assert not "is valid part"
480 # If the item has a description, we might just as well add it
482 cmds.push(f"<edit-description>\\Ca\\Ck{item.desc}<enter>")
484 # Finally, if we're at non-root level, tag the new container,
485 # as it might itself be part of a container, to be processed
488 cmds.push("<tag-entry>")
493 # Let's walk the tree and visit every node with our fancy visitor
495 mimetree.walk(tree, visitor_fn=visitor_fn)
497 # Finally, cleanup. Since we're responsible for removing the temporary
498 # file, how's this for a little hack?
500 filename = cmd_f.name
501 except AttributeError:
502 filename = "pytest_internal_file"
503 cmds.cmd(f"source 'rm -f {filename}|'")
504 cmds.cmd("unset my_mdwn_postprocess_cmd_file")
508 # [ CLI ENTRY ] ###############################################################
510 if __name__ == "__main__":
511 args = parse_cli_args()
513 if args.mode == "setup":
514 if args.send_message:
515 raise NotImplementedError()
517 do_setup(args.extensions, debug_commands=args.debug_commands)
519 elif args.mode == "massage":
520 with open(args.MAILDRAFT, "r") as draft_f, open(
525 pathlib.Path(args.MAILDRAFT),
527 extensions=args.extensions,
528 debug_commands=args.debug_commands,
529 debug_walk=args.debug_walk,
533 # [ TESTS ] ###################################################################
537 from io import StringIO
542 return "CONSTANT STRING 1"
546 return "CONSTANT STRING 2"
548 # NOTE: tests using the capsys fixture must specify sys.stdout to the
549 # functions they call, else old stdout is used and not captured
551 def test_MuttCommands_cmd(self, const1, const2, capsys):
552 "Assert order of commands"
553 cmds = MuttCommands(out_f=sys.stdout)
557 captured = capsys.readouterr()
558 assert captured.out == "\n".join((const1, const2, ""))
560 def test_MuttCommands_push(self, const1, const2, capsys):
561 "Assert reverse order of pushes"
562 cmds = MuttCommands(out_f=sys.stdout)
566 captured = capsys.readouterr()
569 == ('"\npush "'.join(("", const2, const1, "")))[2:-6]
572 def test_MuttCommands_cmd_push_mixed(self, const1, const2, capsys):
573 "Assert reverse order of pushes"
574 cmds = MuttCommands(out_f=sys.stdout)
575 lines = ["000", "001", "010", "011", "100", "101", "110", "111"]
577 cmds.cmd(lines[4 * i + 0])
578 cmds.cmd(lines[4 * i + 1])
579 cmds.push(lines[4 * i + 2])
580 cmds.push(lines[4 * i + 3])
583 captured = capsys.readouterr()
584 lines_out = captured.out.splitlines()
585 assert lines[0] in lines_out[0]
586 assert lines[1] in lines_out[1]
587 assert lines[7] in lines_out[2]
588 assert lines[6] in lines_out[3]
589 assert lines[3] in lines_out[4]
590 assert lines[2] in lines_out[5]
591 assert lines[4] in lines_out[6]
592 assert lines[5] in lines_out[7]
595 def basic_mime_tree(self):
609 Part("text", "html", "part.html", desc="HTML"),
614 "text", "png", "logo.png", cid="logo.png", desc="Logo"
620 def test_MIMETreeDFWalker_depth_first_walk(self, basic_mime_tree):
621 mimetree = MIMETreeDFWalker()
625 def visitor_fn(item, stack, debugprint):
626 items.append((item, len(stack)))
628 mimetree.walk(basic_mime_tree, visitor_fn=visitor_fn)
629 assert len(items) == 5
630 assert items[0][0].subtype == "plain"
631 assert items[0][1] == 2
632 assert items[1][0].subtype == "html"
633 assert items[1][1] == 2
634 assert items[2][0].subtype == "alternative"
635 assert items[2][1] == 1
636 assert items[3][0].subtype == "png"
637 assert items[3][1] == 1
638 assert items[4][0].subtype == "relative"
639 assert items[4][1] == 0
641 def test_MIMETreeDFWalker_list_to_mixed(self, basic_mime_tree):
642 mimetree = MIMETreeDFWalker()
645 def visitor_fn(item, stack, debugprint):
648 mimetree.walk([basic_mime_tree], visitor_fn=visitor_fn)
649 assert items[-1].subtype == "mixed"
651 def test_MIMETreeDFWalker_visitor_in_constructor(
652 self, basic_mime_tree
656 def visitor_fn(item, stack, debugprint):
659 mimetree = MIMETreeDFWalker(visitor_fn=visitor_fn)
660 mimetree.walk(basic_mime_tree)
661 assert len(items) == 5
663 def test_do_setup_no_extensions(self, const1, capsys):
664 "Assert basics about the setup command output"
665 do_setup(temppath=const1, out_f=sys.stdout)
666 captout = capsys.readouterr()
667 lines = captout.out.splitlines()
668 assert lines[2].endswith(f'{const1}"')
669 assert lines[4].endswith(const1)
670 assert "first-entry" in lines[-1]
671 assert "edit-file" in lines[-1]
673 def test_do_setup_extensions(self, const1, const2, capsys):
674 "Assert that extensions are passed to editor"
676 temppath=const1, extensions=[const2, const1], out_f=sys.stdout
678 captout = capsys.readouterr()
679 lines = captout.out.splitlines()
680 # assert comma-separated list of extensions passed
681 assert lines[2].endswith(f'{const2},{const1}"')
682 assert lines[4].endswith(const1)
685 def string_io(self, const1, text=None):
686 return StringIO(text or const1)
688 def test_do_massage_basic(self, const1, string_io, capsys):
689 def converter(drafttext, draftpath, extensions):
690 return Part("text", "plain", draftpath, orig=True)
699 captured = capsys.readouterr()
700 lines = captured.out.splitlines()
701 assert '="$my_editor"' in lines.pop(0)
702 assert '="$my_edit_headers"' in lines.pop(0)
703 assert "unset my_editor" == lines.pop(0)
704 assert "unset my_edit_headers" == lines.pop(0)
705 assert "update-encoding" in lines.pop(0)
706 assert "source 'rm -f " in lines.pop(0)
707 assert "unset my_mdwn_postprocess_cmd_file" == lines.pop(0)
709 def test_do_massage_fulltree(
710 self, string_io, const1, basic_mime_tree, capsys
712 def converter(drafttext, draftpath, extensions):
713 return basic_mime_tree
722 captured = capsys.readouterr()
723 lines = captured.out.splitlines()[4:]
724 assert "Related" in lines.pop(0)
725 assert "group-related" in lines.pop(0)
726 assert "tag-entry" in lines.pop(0)
727 assert "Logo" in lines.pop(0)
728 assert "content-id" in lines.pop(0)
729 assert "toggle-unlink" in lines.pop(0)
730 assert "logo.png" in lines.pop(0)
731 assert "tag-entry" in lines.pop(0)
732 assert "Alternative" in lines.pop(0)
733 assert "group-alternatives" in lines.pop(0)
734 assert "tag-entry" in lines.pop(0)
735 assert "HTML" in lines.pop(0)
736 assert "toggle-unlink" in lines.pop(0)
737 assert "part.html" in lines.pop(0)
738 assert "tag-entry" in lines.pop(0)
739 assert "Plain" in lines.pop(0)
740 assert "update-encoding" in lines.pop(0)
741 assert len(lines) == 2
744 def fake_filewriter(self):
749 def __call__(self, path, content, mode="w", **kwargs):
750 self._writes.append((path, content))
752 def pop(self, index=-1):
753 return self._writes.pop(index)
758 def markdown_non_converter(self, const1, const2):
759 return lambda s, text: f"{const1}{text}{const2}"
761 def test_converter_tree_basic(
762 self, const1, const2, fake_filewriter, markdown_non_converter
764 path = pathlib.Path(const2)
765 tree = convert_markdown_to_html(
766 const1, path, filewriter_fn=fake_filewriter
769 assert tree.subtype == "alternative"
770 assert len(tree.children) == 2
771 assert tree.children[0].subtype == "plain"
772 assert tree.children[0].path == path
773 assert tree.children[0].orig
774 assert tree.children[1].subtype == "html"
775 assert tree.children[1].path == path.with_suffix(".html")
777 def test_converter_writes(
783 markdown_non_converter,
785 path = pathlib.Path(const2)
787 with monkeypatch.context() as m:
788 m.setattr(markdown.Markdown, "convert", markdown_non_converter)
789 convert_markdown_to_html(
790 const1, path, filewriter_fn=fake_filewriter
793 assert (path, const1) == fake_filewriter.pop(0)
795 path.with_suffix(".html"),
796 markdown_non_converter(None, const1),
797 ) == fake_filewriter.pop(0)
799 def test_markdown_inline_image_processor(self):
800 imgpath1 = "file:/path/to/image.png"
801 imgpath2 = "file:///path/to/image.png?url=params"
802 imgpath3 = "/path/to/image.png"
803 text = f"""![inline local image]({imgpath1})
805 with newline]({imgpath2})
806 ![image local path]({imgpath3})"""
807 text, html, images = markdown_with_inline_image_support(text)
809 # local paths have been normalised to URLs:
810 imgpath3 = f"file://{imgpath3}"
812 assert 'src="cid:' in html
813 assert "](cid:" in text
814 assert len(images) == 3
815 assert imgpath1 in images
816 assert imgpath2 in images
817 assert imgpath3 in images
818 assert images[imgpath1].cid != images[imgpath2].cid
819 assert images[imgpath1].cid != images[imgpath3].cid
820 assert images[imgpath2].cid != images[imgpath3].cid
822 def test_markdown_inline_image_processor_title_to_desc(self, const1):
823 imgpath = "file:///path/to/image.png"
824 text = f'![inline local image]({imgpath} "{const1}")'
825 text, html, images = markdown_with_inline_image_support(text)
826 assert images[imgpath].desc == const1
828 def test_markdown_inline_image_processor_alt_to_desc(self, const1):
829 imgpath = "file:///path/to/image.png"
830 text = f"![{const1}]({imgpath})"
831 text, html, images = markdown_with_inline_image_support(text)
832 assert images[imgpath].desc == const1
834 def test_markdown_inline_image_processor_title_over_alt_desc(
837 imgpath = "file:///path/to/image.png"
838 text = f'![{const1}]({imgpath} "{const2}")'
839 text, html, images = markdown_with_inline_image_support(text)
840 assert images[imgpath].desc == const2
842 def test_markdown_inline_image_not_external(self):
843 imgpath = "https://path/to/image.png"
844 text = f"![inline image]({imgpath})"
845 text, html, images = markdown_with_inline_image_support(text)
847 assert 'src="cid:' not in html
848 assert "](cid:" not in text
849 assert len(images) == 0
851 def test_markdown_inline_image_local_file(self):
852 imgpath = "/path/to/image.png"
853 text = f"![inline image]({imgpath})"
854 text, html, images = markdown_with_inline_image_support(text)
856 for k, v in images.items():
857 assert k == f"file://{imgpath}"
863 "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAE"
864 "AAAABCAAAAAA6fptVAAAACklEQVQI12P4DwABAQEAG7buVgAA"
868 def test_markdown_inline_image_processor_base64(self, test_png):
869 text = f"![1px white inlined]({test_png})"
870 text, html, images = markdown_with_inline_image_support(text)
872 assert 'src="cid:' in html
873 assert "](cid:" in text
874 assert len(images) == 1
875 assert test_png in images
877 def test_converter_tree_inline_image_base64(
878 self, test_png, const1, fake_filewriter
880 text = f"![inline base64 image]({test_png})"
881 path = pathlib.Path(const1)
882 tree = convert_markdown_to_html(
883 text, path, filewriter_fn=fake_filewriter
886 assert tree.subtype == "relative"
887 assert tree.children[1].subtype == "png"
888 written = fake_filewriter.pop()
889 assert tree.children[1].path == written[0]
890 assert written[1] == request.urlopen(test_png).read()
892 def test_inline_image_collection(self, test_png, const1, const2, fake_filewriter):
894 test_png: InlineImageInfo(
895 cid=const1, desc=const2
898 relparts = collect_inline_images(
899 test_images, filewriter_fn=fake_filewriter
902 written = fake_filewriter.pop()
903 assert b'PNG' in written[1]
905 assert relparts[0].subtype == "png"
906 assert relparts[0].path == written[0]
907 assert relparts[0].cid == const1
908 assert relparts[0].desc == const2