# Configuration:
# neomuttrc (needs to be a single line):
# macro compose B "\
-# <enter-command> source '$my_confdir/buildmimetree.py setup|'<enter>\
+# <enter-command> source '$my_confdir/buildmimetree.py setup --tempdir $tempdir|'<enter>\
# <enter-command> sourc e \$my_mdwn_postprocess_cmd_file<enter>\
# " "Convert message into a modern MIME tree with inline images"
import markdown
import tempfile
import argparse
-from collections import namedtuple
+import re
+import mimetypes
+from collections import namedtuple, OrderedDict
+from markdown.extensions import Extension
+from markdown.inlinepatterns import ImageInlineProcessor, IMAGE_LINK_RE
+from email.utils import make_msgid
+from urllib import request
def parse_cli_args(*args, **kwargs):
parser_setup = subp.add_parser("setup", help="Setup phase")
parser_massage = subp.add_parser("massage", help="Massaging phase")
- parser_setup.add_argument(
- "--debug-commands",
- action="store_true",
- help="Turn on debug logging of commands generated to stderr",
- )
help="Markdown extension to add to the list of extensions use",
- parser_massage.add_argument(
+ parser_setup.add_argument(
+ "--only-build",
+ action="store_true",
+ help="Only build, don't send the message",
+ )
+ parser_setup.add_argument(
+ "--tempdir",
+ default=None,
+ help="Specify temporary directory to use for attachments",
+ )
+ parser_setup.add_argument(
help="Turn on debug logging of commands generated to stderr",
- "--debug-walk",
- action="store_true",
- help="Turn on debugging to stderr of the MIME tree walk",
+ "--write-commands-to",
+ metavar="PATH",
+ dest="cmdpath",
+ help="Temporary file path to write commands to",
- "--write-commands-to",
- metavar="PATH",
- dest="cmdpath",
- help="Temporary file path to write commands to",
+ "--only-build",
+ action="store_true",
+ help="Only build, don't send the message",
+ )
+ parser_massage.add_argument(
+ "--tempdir",
+ default=None,
+ help="Specify temporary directory to use for attachments",
+ )
+ parser_massage.add_argument(
+ "--debug-commands",
+ action="store_true",
+ help="Turn on debug logging of commands generated to stderr",
+ )
+ parser_massage.add_argument(
+ "--debug-walk",
+ action="store_true",
+ help="Turn on debugging to stderr of the MIME tree walk",
return parser.parse_args(*args, **kwargs)
+# [ MARKDOWN WRAPPING ] #######################################################
+InlineImageInfo = namedtuple(
+ "InlineImageInfo", ["cid", "desc"], defaults=[None]
+class InlineImageExtension(Extension):
+ class RelatedImageInlineProcessor(ImageInlineProcessor):
+ def __init__(self, re, md, ext):
+ super().__init__(re, md)
+ self._ext = ext
+ def handleMatch(self, m, data):
+ el, start, end = super().handleMatch(m, data)
+ if "src" in el.attrib:
+ src = el.attrib["src"]
+ if "://" not in src or src.startswith("file://"):
+ # We only inline local content
+ cid = self._ext.get_cid_for_image(el.attrib)
+ el.attrib["src"] = f"cid:{cid}"
+ return el, start, end
+ def __init__(self):
+ super().__init__()
+ self._images = OrderedDict()
+ def extendMarkdown(self, md):
+ md.registerExtension(self)
+ inline_image_proc = self.RelatedImageInlineProcessor(
+ IMAGE_LINK_RE, md, self
+ )
+ md.inlinePatterns.register(inline_image_proc, "image_link", 150)
+ def get_cid_for_image(self, attrib):
+ msgid = make_msgid()[1:-1]
+ path = attrib["src"]
+ if path.startswith("/"):
+ path = f"file://{path}"
+ self._images[path] = InlineImageInfo(
+ msgid, attrib.get("title", attrib.get("alt"))
+ )
+ return msgid
+ def get_images(self):
+ return self._images
+def markdown_with_inline_image_support(text, *, extensions=None):
+ inline_image_handler = InlineImageExtension()
+ extensions = extensions or []
+ extensions.append(inline_image_handler)
+ mdwn = markdown.Markdown(extensions=extensions)
+ htmltext = mdwn.convert(text)
+ images = inline_image_handler.get_images()
+ def replace_image_with_cid(matchobj):
+ for m in (matchobj.group(1), f"file://{matchobj.group(1)}"):
+ if m in images:
+ return f"(cid:{images[m].cid}"
+ return matchobj.group(0)
+ text = re.sub(r"\(([^)\s]+)", replace_image_with_cid, text)
+ return text, htmltext, images
# [ PARTS GENERATION ] ########################################################
return f"<multipart/{self.subtype}> children={len(self.children)}"
-def convert_markdown_to_html(maildraft, *, extensions=None):
- draftpath = pathlib.Path(maildraft)
+def filewriter_fn(path, content, mode="w", **kwargs):
+ with open(path, mode, **kwargs) as out_f:
+ out_f.write(content)
+def collect_inline_images(
+ images, *, tempdir=None, filewriter_fn=filewriter_fn
+ relparts = []
+ for path, info in images.items():
+ data = request.urlopen(path)
+ mimetype = data.headers["Content-Type"]
+ ext = mimetypes.guess_extension(mimetype)
+ tempfilename = tempfile.mkstemp(prefix="img", suffix=ext, dir=tempdir)
+ path = pathlib.Path(tempfilename[1])
+ filewriter_fn(path, data.read(), "w+b")
+ relparts.append(
+ Part(*mimetype.split("/"), path, cid=info.cid, desc=f"Image: {info.desc}")
+ )
+ return relparts
+def convert_markdown_to_html(
+ origtext,
+ draftpath,
+ *,
+ filewriter_fn=filewriter_fn,
+ tempdir=None,
+ extensions=None,
+ origtext, htmltext, images = markdown_with_inline_image_support(
+ origtext, extensions=extensions
+ )
+ filewriter_fn(draftpath, origtext, encoding="utf-8")
textpart = Part(
"text", "plain", draftpath, "Plain-text version", orig=True
- with open(draftpath, "r", encoding="utf-8") as textmarkdown:
- text = textmarkdown.read()
- mdwn = markdown.Markdown(extensions=extensions)
- html = mdwn.convert(text)
htmlpath = draftpath.with_suffix(".html")
+ filewriter_fn(
+ htmlpath, htmltext, encoding="utf-8", errors="xmlcharrefreplace"
+ )
htmlpart = Part("text", "html", htmlpath, "HTML version")
- with open(
- htmlpath, "w", encoding="utf-8", errors="xmlcharrefreplace"
- ) as texthtml:
- texthtml.write(html)
- logopart = Part(
- "image",
- "png",
- "/usr/share/doc/neomutt/logo/neomutt-256.png",
- "Logo",
- "neomutt-256.png",
+ altpart = Multipart(
+ "alternative", [textpart, htmlpart], "Group of alternative content"
- return Multipart(
- "relative",
- [
- Multipart(
- "alternative",
- [textpart, htmlpart],
- "Group of alternative content",
- ),
- logopart,
- ],
- "Group of related content",
+ imgparts = collect_inline_images(
+ images, tempdir=tempdir, filewriter_fn=filewriter_fn
+ if imgparts:
+ return Multipart(
+ "relative", [altpart] + imgparts, "Group of related content"
+ )
+ else:
+ return altpart
class MIMETreeDFWalker:
def do_setup(
- extensions=None, *, out_f=sys.stdout, temppath=None, debug_commands=False
+ extensions=None,
+ *,
+ out_f=sys.stdout,
+ only_build=False,
+ temppath=None,
+ tempdir=None,
+ debug_commands=False,
extensions = extensions or []
temppath = temppath or pathlib.Path(
- tempfile.mkstemp(prefix="muttmdwn-")[1]
+ tempfile.mkstemp(prefix="muttmdwn-", dir=tempdir)[1]
cmds = MuttCommands(out_f, debug=debug_commands)
editor = f"{sys.argv[0]} massage --write-commands-to {temppath}"
if extensions:
editor = f'{editor} --extensions {",".join(extensions)}'
+ if only_build:
+ editor = f'{editor} --only-build'
+ if tempdir:
+ editor = f"{editor} --tempdir {tempdir}"
+ if debug_commands:
+ editor = f"{editor} --debug-commands"
cmds.cmd('set my_editor="$editor"')
cmds.cmd('set my_edit_headers="$edit_headers"')
def do_massage(
- maildraft,
- cmdpath,
+ draft_f,
+ draftpath,
+ cmd_f,
+ only_build=False,
+ tempdir=None,
# draft, and whatever commands we write to the file given as cmdpath will
# be run by the second source command in the macro definition.
- with open(cmdpath, "w") as cmd_f:
- # Let's start by cleaning up what the setup did (see above), i.e. we
- # restore the $editor and $edit_headers variables, and also unset the
- # variable used to identify the command file we're currently writing
- # to.
- cmds = MuttCommands(cmd_f, debug=debug_commands)
- cmds.cmd('set editor="$my_editor"')
- cmds.cmd('set edit_headers="$my_edit_headers"')
- cmds.cmd("unset my_editor")
- cmds.cmd("unset my_edit_headers")
- # let's flush those commands, as there'll be a lot of pushes from now
- # on, which need to be run in reverse order
- cmds.flush()
- extensions = extensions.split(",") if extensions else []
- tree = converter(maildraft, extensions=extensions)
- mimetree = MIMETreeDFWalker(debug=args.debug_walk)
- def visitor_fn(item, stack, *, debugprint=None):
- """
- Visitor function called for every node (part) of the MIME tree,
- depth-first, and responsible for telling NeoMutt how to assemble
- the tree.
- """
- if isinstance(item, Part):
- # We've hit a leaf-node, i.e. an alternative or a related part
- # with actual content.
- # If the part is not an original part, i.e. doesn't already
- # exist, we must first add it.
- if not item.orig:
- cmds.push(f"<attach-file>{item.path}<enter>")
- cmds.push("<toggle-unlink><toggle-disposition>")
- if item.cid:
- cmds.push(
- f"<edit-content-id>\\Ca\\Ck{item.cid}<enter>"
- )
- # If the item (including the original) comes with a
- # description, then we might just as well update the NeoMutt
- # tree now:
- if item.desc:
- cmds.push(f"<edit-description>\\Ca\\Ck{item.desc}<enter>")
- # Finally, tag the entry that we just processed, so that when
- # we're done at this level, as we walk up the stack, the items
- # to be grouped will already be tagged and ready.
- cmds.push("<tag-entry>")
- elif isinstance(item, Multipart):
- # This node has children, but we already visited them (see
- # above), and so they have been tagged in NeoMutt's compose
- # window. Now it's just a matter of telling NeoMutt to do the
- # appropriate grouping:
- if item.subtype == "alternative":
- cmds.push("<group-alternatives>")
- elif item.subtype == "relative":
- cmds.push("<group-related>")
- elif item.subtype == "multilingual":
- cmds.push("<group-multilingual>")
- # Again, if there is a description, we might just as well:
- if item.desc:
- cmds.push(f"<edit-description>\\Ca\\Ck{item.desc}<enter>")
- # Finally, if we're at non-root level, tag the new container,
- # as it might itself be part of a container, to be processed
- # one level up:
- if stack:
- cmds.push("<tag-entry>")
+ # Let's start by cleaning up what the setup did (see above), i.e. we
+ # restore the $editor and $edit_headers variables, and also unset the
+ # variable used to identify the command file we're currently writing
+ # to.
+ cmds = MuttCommands(cmd_f, debug=debug_commands)
+ cmds.cmd('set editor="$my_editor"')
+ cmds.cmd('set edit_headers="$my_edit_headers"')
+ cmds.cmd("unset my_editor")
+ cmds.cmd("unset my_edit_headers")
+ # let's flush those commands, as there'll be a lot of pushes from now
+ # on, which need to be run in reverse order
+ cmds.flush()
- else:
- # We should never get here
- assert not "is valid part"
+ extensions = extensions.split(",") if extensions else []
+ tree = converter(draft_f.read(), draftpath, tempdir=tempdir, extensions=extensions)
- # -----------------
- # End of visitor_fn
+ mimetree = MIMETreeDFWalker(debug=debug_walk)
- # Let's walk the tree and visit every node with our fancy visitor
- # function
- mimetree.walk(tree, visitor_fn=visitor_fn)
+ def visitor_fn(item, stack, *, debugprint=None):
+ """
+ Visitor function called for every node (part) of the MIME tree,
+ depth-first, and responsible for telling NeoMutt how to assemble
+ the tree.
+ """
+ if isinstance(item, Part):
+ # We've hit a leaf-node, i.e. an alternative or a related part
+ # with actual content.
+ # Let's add the part
+ if item.orig:
+ # The original source already exists in the NeoMutt tree, but
+ # the underlying file may have been modified, so we need to
+ # update the encoding, but that's it:
+ cmds.push("<update-encoding>")
+ else:
+ # … whereas all other parts need to be added, and they're all
+ # considered to be temporary and inline:
+ cmds.push(f"<attach-file>{item.path}<enter>")
+ cmds.push("<toggle-unlink><toggle-disposition>")
+ # If the item (including the original) comes with additional
+ # information, then we might just as well update the NeoMutt
+ # tree now:
+ if item.cid:
+ cmds.push(f"<edit-content-id>\\Ca\\Ck{item.cid}<enter>")
+ elif isinstance(item, Multipart):
+ # This node has children, but we already visited them (see
+ # above), and so they have been tagged in NeoMutt's compose
+ # window. Now it's just a matter of telling NeoMutt to do the
+ # appropriate grouping:
+ if item.subtype == "alternative":
+ cmds.push("<group-alternatives>")
+ elif item.subtype in ("relative", "related"):
+ cmds.push("<group-related>")
+ elif item.subtype == "multilingual":
+ cmds.push("<group-multilingual>")
- # Finally, cleanup. Since we're responsible for removing the temporary
- # file, how's this for a little hack?
- cmds.cmd(f"source 'rm -f {args.cmdpath}|'")
- cmds.cmd("unset my_mdwn_postprocess_cmd_file")
- cmds.flush()
+ else:
+ # We should never get here
+ assert not "is valid part"
+ # If the item has a description, we might just as well add it
+ if item.desc:
+ cmds.push(f"<edit-description>\\Ca\\Ck{item.desc}<enter>")
+ # Finally, if we're at non-root level, tag the new container,
+ # as it might itself be part of a container, to be processed
+ # one level up:
+ if stack:
+ cmds.push("<tag-entry>")
+ # -----------------
+ # End of visitor_fn
+ # Let's walk the tree and visit every node with our fancy visitor
+ # function
+ mimetree.walk(tree, visitor_fn=visitor_fn)
+ if not only_build:
+ cmds.push("<send-message>")
+ # Finally, cleanup. Since we're responsible for removing the temporary
+ # file, how's this for a little hack?
+ try:
+ filename = cmd_f.name
+ except AttributeError:
+ filename = "pytest_internal_file"
+ cmds.cmd(f"source 'rm -f {filename}|'")
+ cmds.cmd("unset my_mdwn_postprocess_cmd_file")
+ cmds.flush()
# [ CLI ENTRY ] ###############################################################
args = parse_cli_args()
if args.mode == "setup":
- do_setup(args.extensions, debug_commands=args.debug_commands)
- elif args.mode == "massage":
- do_massage(
- args.cmdpath,
- extensions=args.extensions,
+ do_setup(
+ args.extensions,
+ only_build=args.only_build,
+ tempdir=args.tempdir,
- debug_walk=args.debug_walk,
+ elif args.mode == "massage":
+ with open(args.MAILDRAFT, "r") as draft_f, open(
+ args.cmdpath, "w"
+ ) as cmd_f:
+ do_massage(
+ draft_f,
+ pathlib.Path(args.MAILDRAFT),
+ cmd_f,
+ extensions=args.extensions,
+ only_build=args.only_build,
+ tempdir=args.tempdir,
+ debug_commands=args.debug_commands,
+ debug_walk=args.debug_walk,
+ )
# [ TESTS ] ###################################################################
import pytest
+ from io import StringIO
class Tests:
def basic_mime_tree(self):
return Multipart(
- "related",
+ "relative",
- Part("text", "plain", "part.txt", desc="Plain"),
+ Part(
+ "text",
+ "plain",
+ "part.txt",
+ desc="Plain",
+ orig=True,
+ ),
Part("text", "html", "part.html", desc="HTML"),
assert items[2][1] == 1
assert items[3][0].subtype == "png"
assert items[3][1] == 1
- assert items[4][0].subtype == "related"
+ assert items[4][0].subtype == "relative"
assert items[4][1] == 0
def test_MIMETreeDFWalker_list_to_mixed(self, basic_mime_tree):
mimetree = MIMETreeDFWalker()
items = []
def visitor_fn(item, stack, debugprint):
assert lines[2].endswith(f'{const2},{const1}"')
assert lines[4].endswith(const1)
+ @pytest.fixture
+ def string_io(self, const1, text=None):
+ return StringIO(text or const1)
+ def test_do_massage_basic(self, const1, string_io, capsys):
+ def converter(drafttext, draftpath, extensions, tempdir):
+ return Part("text", "plain", draftpath, orig=True)
+ do_massage(
+ draft_f=string_io,
+ draftpath=const1,
+ cmd_f=sys.stdout,
+ converter=converter,
+ )
+ captured = capsys.readouterr()
+ lines = captured.out.splitlines()
+ assert '="$my_editor"' in lines.pop(0)
+ assert '="$my_edit_headers"' in lines.pop(0)
+ assert "unset my_editor" == lines.pop(0)
+ assert "unset my_edit_headers" == lines.pop(0)
+ assert "send-message" in lines.pop(0)
+ assert "update-encoding" in lines.pop(0)
+ assert "source 'rm -f " in lines.pop(0)
+ assert "unset my_mdwn_postprocess_cmd_file" == lines.pop(0)
+ def test_do_massage_fulltree(
+ self, string_io, const1, basic_mime_tree, capsys
+ ):
+ def converter(drafttext, draftpath, extensions, tempdir):
+ return basic_mime_tree
+ do_massage(
+ draft_f=string_io,
+ draftpath=const1,
+ cmd_f=sys.stdout,
+ converter=converter,
+ )
+ captured = capsys.readouterr()
+ lines = captured.out.splitlines()[4:]
+ assert "send-message" in lines.pop(0)
+ assert "Related" in lines.pop(0)
+ assert "group-related" in lines.pop(0)
+ assert "tag-entry" in lines.pop(0)
+ assert "Logo" in lines.pop(0)
+ assert "content-id" in lines.pop(0)
+ assert "toggle-unlink" in lines.pop(0)
+ assert "logo.png" in lines.pop(0)
+ assert "tag-entry" in lines.pop(0)
+ assert "Alternative" in lines.pop(0)
+ assert "group-alternatives" in lines.pop(0)
+ assert "tag-entry" in lines.pop(0)
+ assert "HTML" in lines.pop(0)
+ assert "toggle-unlink" in lines.pop(0)
+ assert "part.html" in lines.pop(0)
+ assert "tag-entry" in lines.pop(0)
+ assert "Plain" in lines.pop(0)
+ assert "update-encoding" in lines.pop(0)
+ assert len(lines) == 2
+ @pytest.fixture
+ def fake_filewriter(self):
+ class FileWriter:
+ def __init__(self):
+ self._writes = []
+ def __call__(self, path, content, mode="w", **kwargs):
+ self._writes.append((path, content))
+ def pop(self, index=-1):
+ return self._writes.pop(index)
+ return FileWriter()
+ @pytest.fixture
+ def markdown_non_converter(self, const1, const2):
+ return lambda s, text: f"{const1}{text}{const2}"
+ def test_converter_tree_basic(
+ self, const1, const2, fake_filewriter, markdown_non_converter
+ ):
+ path = pathlib.Path(const2)
+ tree = convert_markdown_to_html(
+ const1, path, filewriter_fn=fake_filewriter
+ )
+ assert tree.subtype == "alternative"
+ assert len(tree.children) == 2
+ assert tree.children[0].subtype == "plain"
+ assert tree.children[0].path == path
+ assert tree.children[0].orig
+ assert tree.children[1].subtype == "html"
+ assert tree.children[1].path == path.with_suffix(".html")
+ def test_converter_writes(
+ self,
+ const1,
+ const2,
+ fake_filewriter,
+ monkeypatch,
+ markdown_non_converter,
+ ):
+ path = pathlib.Path(const2)
+ with monkeypatch.context() as m:
+ m.setattr(markdown.Markdown, "convert", markdown_non_converter)
+ convert_markdown_to_html(
+ const1, path, filewriter_fn=fake_filewriter
+ )
+ assert (path, const1) == fake_filewriter.pop(0)
+ assert (
+ path.with_suffix(".html"),
+ markdown_non_converter(None, const1),
+ ) == fake_filewriter.pop(0)
+ def test_markdown_inline_image_processor(self):
+ imgpath1 = "file:/path/to/image.png"
+ imgpath2 = "file:///path/to/image.png?url=params"
+ imgpath3 = "/path/to/image.png"
+ text = f"""![inline local image]({imgpath1})
+ ![image inlined
+ with newline]({imgpath2})
+ ![image local path]({imgpath3})"""
+ text, html, images = markdown_with_inline_image_support(text)
+ # local paths have been normalised to URLs:
+ imgpath3 = f"file://{imgpath3}"
+ assert 'src="cid:' in html
+ assert "](cid:" in text
+ assert len(images) == 3
+ assert imgpath1 in images
+ assert imgpath2 in images
+ assert imgpath3 in images
+ assert images[imgpath1].cid != images[imgpath2].cid
+ assert images[imgpath1].cid != images[imgpath3].cid
+ assert images[imgpath2].cid != images[imgpath3].cid
+ def test_markdown_inline_image_processor_title_to_desc(self, const1):
+ imgpath = "file:///path/to/image.png"
+ text = f'![inline local image]({imgpath} "{const1}")'
+ text, html, images = markdown_with_inline_image_support(text)
+ assert images[imgpath].desc == const1
+ def test_markdown_inline_image_processor_alt_to_desc(self, const1):
+ imgpath = "file:///path/to/image.png"
+ text = f"![{const1}]({imgpath})"
+ text, html, images = markdown_with_inline_image_support(text)
+ assert images[imgpath].desc == const1
+ def test_markdown_inline_image_processor_title_over_alt_desc(
+ self, const1, const2
+ ):
+ imgpath = "file:///path/to/image.png"
+ text = f'![{const1}]({imgpath} "{const2}")'
+ text, html, images = markdown_with_inline_image_support(text)
+ assert images[imgpath].desc == const2
+ def test_markdown_inline_image_not_external(self):
+ imgpath = "https://path/to/image.png"
+ text = f"![inline image]({imgpath})"
+ text, html, images = markdown_with_inline_image_support(text)
+ assert 'src="cid:' not in html
+ assert "](cid:" not in text
+ assert len(images) == 0
+ def test_markdown_inline_image_local_file(self):
+ imgpath = "/path/to/image.png"
+ text = f"![inline image]({imgpath})"
+ text, html, images = markdown_with_inline_image_support(text)
+ for k, v in images.items():
+ assert k == f"file://{imgpath}"
+ break
+ @pytest.fixture
+ def test_png(self):
+ return (
+ ""
+ )
+ def test_markdown_inline_image_processor_base64(self, test_png):
+ text = f"![1px white inlined]({test_png})"
+ text, html, images = markdown_with_inline_image_support(text)
+ assert 'src="cid:' in html
+ assert "](cid:" in text
+ assert len(images) == 1
+ assert test_png in images
+ def test_converter_tree_inline_image_base64(
+ self, test_png, const1, fake_filewriter
+ ):
+ text = f"![inline base64 image]({test_png})"
+ path = pathlib.Path(const1)
+ tree = convert_markdown_to_html(
+ text, path, filewriter_fn=fake_filewriter
+ )
+ assert tree.subtype == "relative"
+ assert tree.children[1].subtype == "png"
+ written = fake_filewriter.pop()
+ assert tree.children[1].path == written[0]
+ assert written[1] == request.urlopen(test_png).read()
+ def test_inline_image_collection(
+ self, test_png, const1, const2, fake_filewriter
+ ):
+ test_images = {test_png: InlineImageInfo(cid=const1, desc=const2)}
+ relparts = collect_inline_images(
+ test_images, filewriter_fn=fake_filewriter
+ )
+ written = fake_filewriter.pop()
+ assert b"PNG" in written[1]
+ assert relparts[0].subtype == "png"
+ assert relparts[0].path == written[0]
+ assert relparts[0].cid == const1
+ assert relparts[0].desc.endswith(const2)
except ImportError: