--- /dev/null
+ [report]
+ omit = */tests.py
+ exclude_lines =
+ raise NotImplementedError
+ @abc.abstractmethod
--- /dev/null
+ .cache
+ *.pyc
+ *~
+ tasklib.egg-info
+ /dist
+ /build
+ docs/_build
+ .project
+ .coverage
--- /dev/null
+ language: python
+ env:
+ - TASK_VERSION=v2.1.1
+ - TASK_VERSION=v2.1.2
+ - TASK_VERSION=v2.2.0
+ - TASK_VERSION=v2.3.0
+ - TASK_VERSION=v2.4.0
+ - TASK_VERSION=v2.4.1
+ - TASK_VERSION=v2.4.2
+ - TASK_VERSION=v2.4.3
+ - TASK_VERSION=v2.4.4
+ - TASK_VERSION=v2.5.0
+ - TASK_VERSION=v2.5.1
+ python:
+ - "2.7"
+ - "3.3"
+ - "3.4"
+ - "3.5"
+ - "3.6"
+ install:
+ - pip install -e .
+ - pip install coveralls
+ - sudo add-apt-repository ppa:ubuntu-toolchain-r/test -y
+ - sudo apt-get update -qq
+ - sudo apt-get install -qq build-essential cmake uuid-dev g++-4.8
+ - sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-4.8 50
+ - git clone --recursive https://github.com/GothenburgBitFactory/taskwarrior
+ - cd taskwarrior
+ - git checkout $TASK_VERSION
+ - git clean -dfx
+ - git submodule init
+ - git submodule update
+ - cmake -DCMAKE_BUILD_TYPE=release .
+ - make -j2
+ - sudo make install
+ - task --version
+ before_script:
+ - cd $TRAVIS_BUILD_DIR
+ script:
+ - coverage run --source=tasklib setup.py test
+ after_success:
+ - coveralls
--- /dev/null
+ Rob Golding
+ Tomas Babej
--- /dev/null
+ Copyright (c) 2013-2017, Rob Golding. All rights reserved.
+
+ Redistribution and use in source and binary forms, with or without
+ modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ * Neither the name of Rob Golding, nor the names of its contributors may
+ be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--- /dev/null
+ include LICENSE
+ include README.rst
--- /dev/null
+ tasklib
+ =======
+
+ .. image:: https://travis-ci.org/robgolding63/tasklib.png?branch=develop
+ :target: http://travis-ci.org/robgolding63/tasklib
+
+ .. image:: https://coveralls.io/repos/robgolding63/tasklib/badge.png?branch=develop
+ :target: https://coveralls.io/r/robgolding63/tasklib?branch=develop
+
+ tasklib is a Python library for interacting with taskwarrior_ databases, using
+ a queryset API similar to that of Django's ORM.
+
+ Supports Python 2.6, 2.7, 3.2, 3.3, 3.4, 3.5 and 3.6 with taskwarrior 2.1.x and above.
+ Older versions of taskwarrior are untested and may not work.
+
+ Requirements
+ ------------
+
+ * taskwarrior_ v2.1.x or above.
+
+ Installation
+ ------------
+
+ Install via pip::
+
+ pip install tasklib
+
+ Usage
+ -----
+
+ tasklib has a similar API to that of Django's ORM::
+
+ >>> from tasklib import TaskWarrior
+
+ >>> tw = TaskWarrior('/home/rob/.task')
+ >>> tasks = tw.tasks.pending()
+ >>> tasks
+ ['Tidy the house', 'Learn German']
+ >>> tasks.filter(tags__contain='chores')
+ ['Tidy the house']
+ >>> type(tasks[0])
+ <class 'tasklib.task.Task'>
+ >>> tasks[0].done()
+ >>> tasks = tw.tasks.pending()
+ >>> tasks
+ ['Learn German']
+ >>> tasks[0]['tags'] = ['languages']
+ >>> tasks[0].save()
+
+ For more advanced usage, see the documentation_.
+
+ .. _taskwarrior: http://taskwarrior.org
+ .. _documentation: http://tasklib.readthedocs.org/en/latest/
--- /dev/null
+ # Makefile for Sphinx documentation
+ #
+
+ # You can set these variables from the command line.
+ SPHINXOPTS =
+ SPHINXBUILD = sphinx-build
+ PAPER =
+ BUILDDIR = _build
+
+ # User-friendly check for sphinx-build
+ ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
+ $(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/)
+ endif
+
+ # Internal variables.
+ PAPEROPT_a4 = -D latex_paper_size=a4
+ PAPEROPT_letter = -D latex_paper_size=letter
+ ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
+ # the i18n builder cannot share the environment and doctrees with the others
+ I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
+
+ # Every target below is a command, not a file; declare all of them phony
+ # (including latexpdfja, texinfo, info, xml and pseudoxml).
+ .PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf latexpdfja text man texinfo info gettext changes xml pseudoxml linkcheck doctest
+
+ help:
+ @echo "Please use \`make <target>' where <target> is one of"
+ @echo " html to make standalone HTML files"
+ @echo " dirhtml to make HTML files named index.html in directories"
+ @echo " singlehtml to make a single large HTML file"
+ @echo " pickle to make pickle files"
+ @echo " json to make JSON files"
+ @echo " htmlhelp to make HTML files and a HTML help project"
+ @echo " qthelp to make HTML files and a qthelp project"
+ @echo " devhelp to make HTML files and a Devhelp project"
+ @echo " epub to make an epub"
+ @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
+ @echo " latexpdf to make LaTeX files and run them through pdflatex"
+ @echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx"
+ @echo " text to make text files"
+ @echo " man to make manual pages"
+ @echo " texinfo to make Texinfo files"
+ @echo " info to make Texinfo files and run them through makeinfo"
+ @echo " gettext to make PO message catalogs"
+ @echo " changes to make an overview of all changed/added/deprecated items"
+ @echo " xml to make Docutils-native XML files"
+ @echo " pseudoxml to make pseudoxml-XML files for display purposes"
+ @echo " linkcheck to check all external links for integrity"
+ @echo " doctest to run all doctests embedded in the documentation (if enabled)"
+
+ clean:
+ rm -rf $(BUILDDIR)/*
+
+ html:
+ $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
+ @echo
+ @echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
+
+ dirhtml:
+ $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
+ @echo
+ @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
+
+ singlehtml:
+ $(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
+ @echo
+ @echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
+
+ pickle:
+ $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
+ @echo
+ @echo "Build finished; now you can process the pickle files."
+
+ json:
+ $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
+ @echo
+ @echo "Build finished; now you can process the JSON files."
+
+ htmlhelp:
+ $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
+ @echo
+ @echo "Build finished; now you can run HTML Help Workshop with the" \
+ ".hhp project file in $(BUILDDIR)/htmlhelp."
+
+ qthelp:
+ $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
+ @echo
+ @echo "Build finished; now you can run "qcollectiongenerator" with the" \
+ ".qhcp project file in $(BUILDDIR)/qthelp, like this:"
+ @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/tasklib.qhcp"
+ @echo "To view the help file:"
+ @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/tasklib.qhc"
+
+ devhelp:
+ $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
+ @echo
+ @echo "Build finished."
+ @echo "To view the help file:"
+ @echo "# mkdir -p $$HOME/.local/share/devhelp/tasklib"
+ @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/tasklib"
+ @echo "# devhelp"
+
+ epub:
+ $(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
+ @echo
+ @echo "Build finished. The epub file is in $(BUILDDIR)/epub."
+
+ latex:
+ $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
+ @echo
+ @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
+ @echo "Run \`make' in that directory to run these through (pdf)latex" \
+ "(use \`make latexpdf' here to do that automatically)."
+
+ latexpdf:
+ $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
+ @echo "Running LaTeX files through pdflatex..."
+ $(MAKE) -C $(BUILDDIR)/latex all-pdf
+ @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
+
+ latexpdfja:
+ $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
+ @echo "Running LaTeX files through platex and dvipdfmx..."
+ $(MAKE) -C $(BUILDDIR)/latex all-pdf-ja
+ @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
+
+ text:
+ $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
+ @echo
+ @echo "Build finished. The text files are in $(BUILDDIR)/text."
+
+ man:
+ $(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
+ @echo
+ @echo "Build finished. The manual pages are in $(BUILDDIR)/man."
+
+ texinfo:
+ $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
+ @echo
+ @echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo."
+ @echo "Run \`make' in that directory to run these through makeinfo" \
+ "(use \`make info' here to do that automatically)."
+
+ # Build Texinfo sources, then run makeinfo on them via a recursive make.
+ # $(MAKE) is used (as in latexpdf) so flags such as -j/-n propagate to the sub-make.
+ info:
+ $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo
+ @echo "Running Texinfo files through makeinfo..."
+ $(MAKE) -C $(BUILDDIR)/texinfo info
+ @echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo."
+
+ gettext:
+ $(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale
+ @echo
+ @echo "Build finished. The message catalogs are in $(BUILDDIR)/locale."
+
+ changes:
+ $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
+ @echo
+ @echo "The overview file is in $(BUILDDIR)/changes."
+
+ linkcheck:
+ $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
+ @echo
+ @echo "Link check complete; look for any errors in the above output " \
+ "or in $(BUILDDIR)/linkcheck/output.txt."
+
+ doctest:
+ $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
+ @echo "Testing of doctests in the sources finished, look at the " \
+ "results in $(BUILDDIR)/doctest/output.txt."
+
+ xml:
+ $(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml
+ @echo
+ @echo "Build finished. The XML files are in $(BUILDDIR)/xml."
+
+ pseudoxml:
+ $(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml
+ @echo
+ @echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml."
--- /dev/null
+ # -*- coding: utf-8 -*-
+ #
+ # tasklib documentation build configuration file, created by
+ # sphinx-quickstart on Sun Nov 10 15:19:07 2013.
+ #
+ # This file is execfile()d with the current directory set to its
+ # containing dir.
+ #
+ # Note that not all possible configuration values are present in this
+ # autogenerated file.
+ #
+ # All configuration values have a default; values that are commented out
+ # serve to show the default.
+
+ import sys
+ import os
+
+ # If extensions (or modules to document with autodoc) are in another directory,
+ # add these directories to sys.path here. If the directory is relative to the
+ # documentation root, use os.path.abspath to make it absolute, like shown here.
+ #sys.path.insert(0, os.path.abspath('.'))
+
+ # -- General configuration ------------------------------------------------
+
+ # If your documentation needs a minimal Sphinx version, state it here.
+ #needs_sphinx = '1.0'
+
+ # Add any Sphinx extension module names here, as strings. They can be
+ # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+ # ones.
+ extensions = []
+
+ # Add any paths that contain templates here, relative to this directory.
+ templates_path = ['_templates']
+
+ # The suffix of source filenames.
+ source_suffix = '.rst'
+
+ # The encoding of source files.
+ #source_encoding = 'utf-8-sig'
+
+ # The master toctree document.
+ master_doc = 'index'
+
+ # General information about the project.
+ project = u'tasklib'
+ copyright = u'2014, Rob Golding'
+
+ # The version info for the project you're documenting, acts as replacement for
+ # |version| and |release|, also used in various other places throughout the
+ # built documents.
+ #
+ # The short X.Y version.
+ version = '1.2.1'
+ # The full version, including alpha/beta/rc tags.
+ release = '1.2.1'
+
+ # The language for content autogenerated by Sphinx. Refer to documentation
+ # for a list of supported languages.
+ #language = None
+
+ # There are two options for replacing |today|: either, you set today to some
+ # non-false value, then it is used:
+ #today = ''
+ # Else, today_fmt is used as the format for a strftime call.
+ #today_fmt = '%B %d, %Y'
+
+ # List of patterns, relative to source directory, that match files and
+ # directories to ignore when looking for source files.
+ exclude_patterns = ['_build']
+
+ # The reST default role (used for this markup: `text`) to use for all
+ # documents.
+ #default_role = None
+
+ # If true, '()' will be appended to :func: etc. cross-reference text.
+ #add_function_parentheses = True
+
+ # If true, the current module name will be prepended to all description
+ # unit titles (such as .. function::).
+ #add_module_names = True
+
+ # If true, sectionauthor and moduleauthor directives will be shown in the
+ # output. They are ignored by default.
+ #show_authors = False
+
+ # The name of the Pygments (syntax highlighting) style to use.
+ pygments_style = 'sphinx'
+
+ # A list of ignored prefixes for module index sorting.
+ #modindex_common_prefix = []
+
+ # If true, keep warnings as "system message" paragraphs in the built documents.
+ #keep_warnings = False
+
+
+ # -- Options for HTML output ----------------------------------------------
+
+ # The theme to use for HTML and HTML Help pages. See the documentation for
+ # a list of builtin themes.
+ html_theme = 'default'
+
+ # Theme options are theme-specific and customize the look and feel of a theme
+ # further. For a list of options available for each theme, see the
+ # documentation.
+ #html_theme_options = {}
+
+ # Add any paths that contain custom themes here, relative to this directory.
+ #html_theme_path = []
+
+ # The name for this set of Sphinx documents. If None, it defaults to
+ # "<project> v<release> documentation".
+ #html_title = None
+
+ # A shorter title for the navigation bar. Default is the same as html_title.
+ #html_short_title = None
+
+ # The name of an image file (relative to this directory) to place at the top
+ # of the sidebar.
+ #html_logo = None
+
+ # The name of an image file (within the static path) to use as favicon of the
+ # docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
+ # pixels large.
+ #html_favicon = None
+
+ # Add any paths that contain custom static files (such as style sheets) here,
+ # relative to this directory. They are copied after the builtin static files,
+ # so a file named "default.css" will overwrite the builtin "default.css".
+ html_static_path = ['_static']
+
+ # Add any extra paths that contain custom files (such as robots.txt or
+ # .htaccess) here, relative to this directory. These files are copied
+ # directly to the root of the documentation.
+ #html_extra_path = []
+
+ # If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
+ # using the given strftime format.
+ #html_last_updated_fmt = '%b %d, %Y'
+
+ # If true, SmartyPants will be used to convert quotes and dashes to
+ # typographically correct entities.
+ #html_use_smartypants = True
+
+ # Custom sidebar templates, maps document names to template names.
+ #html_sidebars = {}
+
+ # Additional templates that should be rendered to pages, maps page names to
+ # template names.
+ #html_additional_pages = {}
+
+ # If false, no module index is generated.
+ #html_domain_indices = True
+
+ # If false, no index is generated.
+ #html_use_index = True
+
+ # If true, the index is split into individual pages for each letter.
+ #html_split_index = False
+
+ # If true, links to the reST sources are added to the pages.
+ #html_show_sourcelink = True
+
+ # If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
+ #html_show_sphinx = True
+
+ # If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
+ #html_show_copyright = True
+
+ # If true, an OpenSearch description file will be output, and all pages will
+ # contain a <link> tag referring to it. The value of this option must be the
+ # base URL from which the finished HTML is served.
+ #html_use_opensearch = ''
+
+ # This is the file name suffix for HTML files (e.g. ".xhtml").
+ #html_file_suffix = None
+
+ # Output file base name for HTML help builder.
+ htmlhelp_basename = 'tasklibdoc'
+
+
+ # -- Options for LaTeX output ---------------------------------------------
+
+ latex_elements = {
+ # The paper size ('letterpaper' or 'a4paper').
+ #'papersize': 'letterpaper',
+
+ # The font size ('10pt', '11pt' or '12pt').
+ #'pointsize': '10pt',
+
+ # Additional stuff for the LaTeX preamble.
+ #'preamble': '',
+ }
+
+ # Grouping the document tree into LaTeX files. List of tuples
+ # (source start file, target name, title,
+ # author, documentclass [howto, manual, or own class]).
+ latex_documents = [
+ ('index', 'tasklib.tex', u'tasklib Documentation',
+ u'Rob Golding', 'manual'),
+ ]
+
+ # The name of an image file (relative to this directory) to place at the top of
+ # the title page.
+ #latex_logo = None
+
+ # For "manual" documents, if this is true, then toplevel headings are parts,
+ # not chapters.
+ #latex_use_parts = False
+
+ # If true, show page references after internal links.
+ #latex_show_pagerefs = False
+
+ # If true, show URL addresses after external links.
+ #latex_show_urls = False
+
+ # Documents to append as an appendix to all manuals.
+ #latex_appendices = []
+
+ # If false, no module index is generated.
+ #latex_domain_indices = True
+
+
+ # -- Options for manual page output ---------------------------------------
+
+ # One entry per manual page. List of tuples
+ # (source start file, name, description, authors, manual section).
+ man_pages = [
+ ('index', 'tasklib', u'tasklib Documentation',
+ [u'Rob Golding'], 1)
+ ]
+
+ # If true, show URL addresses after external links.
+ #man_show_urls = False
+
+
+ # -- Options for Texinfo output -------------------------------------------
+
+ # Grouping the document tree into Texinfo files. List of tuples
+ # (source start file, target name, title, author,
+ # dir menu entry, description, category)
+ texinfo_documents = [
+ ('index', 'tasklib', u'tasklib Documentation',
+ u'Rob Golding', 'tasklib', 'One line description of project.',
+ 'Miscellaneous'),
+ ]
+
+ # Documents to append as an appendix to all manuals.
+ #texinfo_appendices = []
+
+ # If false, no module index is generated.
+ #texinfo_domain_indices = True
+
+ # How to display URL addresses: 'footnote', 'no', or 'inline'.
+ #texinfo_show_urls = 'footnote'
+
+ # If true, do not generate a @detailmenu in the "Top" node's menu.
+ #texinfo_no_detailmenu = False
--- /dev/null
+ Welcome to tasklib's documentation!
+ ===================================
+
+ tasklib is a Python library for interacting with taskwarrior_ databases, using
+ a queryset API similar to that of Django's ORM.
+
+ Supports Python 2.6, 2.7, 3.2, 3.3, 3.4, 3.5 and 3.6 with taskwarrior 2.1.x and above.
+ Older versions of taskwarrior are untested and may not work.
+
+ Requirements
+ ------------
+
+ * taskwarrior_ v2.1.x or above, although newest minor release is recommended.
+
+ Installation
+ ------------
+
+ Install via pip (recommended)::
+
+ pip install tasklib
+
+ Or clone from github::
+
+ git clone https://github.com/robgolding63/tasklib.git
+ cd tasklib
+ python setup.py install
+
+ Initialization
+ --------------
+
+ Optionally initialize the ``TaskWarrior`` instance with ``data_location`` (the
+ database directory). If it doesn't already exist, this will be created
+ automatically unless ``create=False``.
+
+ The default location is the same as taskwarrior's::
+
+ >>> tw = TaskWarrior(data_location='~/.task', create=True)
+
+ The ``TaskWarrior`` instance will also use your .taskrc configuration (so that
+ it recognizes the same UDAs as your task binary, uses the same configuration,
+ etc.). To override the location of the .taskrc, use
+ ``taskrc_location=~/some/different/path``.
+
+ Creating Tasks
+ --------------
+
+ To create a task, simply create a new ``Task`` object::
+
+ >>> new_task = Task(tw, description="throw out the trash")
+
+ This task is not saved to TaskWarrior (same as in Django) until
+ you call the ``.save()`` method::
+
+ >>> new_task.save()
+
+ You can set any attribute as a keyword argument to the Task object::
+
+ >>> complex_task = Task(tw, description="finally fix the shower", due=datetime(2015,2,14,8,0,0), priority='H')
+
+ or by setting the attributes one by one::
+
+ >>> complex_task = Task(tw)
+ >>> complex_task['description'] = "finally fix the shower"
+ >>> complex_task['due'] = datetime(2015,2,14,8,0,0)
+ >>> complex_task['priority'] = 'H'
+
+ Modifying Task
+ --------------
+
+ To modify a created or retrieved ``Task`` object, use dictionary-like access::
+
+ >>> homework = tw.tasks.get(tags=['chores'])
+ >>> homework['project'] = 'Home'
+
+ The change is not propagated to TaskWarrior until you call the ``save()`` method::
+
+ >>> homework.save()
+
+ Attributes that map to native Python objects are converted automatically. See the Task Attributes section below.
+
+ Task Attributes
+ ---------------
+
+ Attributes of task objects are accessible through indices, like so::
+
+ >>> task = tw.tasks.pending().get(tags__contain='work') # There is only one pending task with 'work' tag
+ >>> task['description']
+ 'Upgrade Ubuntu Server'
+ >>> task['id']
+ 15
+ >>> task['due']
+ datetime.datetime(2015, 2, 5, 0, 0, tzinfo=<DstTzInfo 'Europe/Berlin' CET+1:00:00 STD>)
+ >>> task['tags']
+ ['work', 'servers']
+
+ The following fields are deserialized into Python objects:
+
+ * ``due``, ``wait``, ``scheduled``, ``until``, ``entry``: deserialized to a ``datetime`` object
+ * ``annotations``: deserialized to a list of ``TaskAnnotation`` objects
+ * ``tags``: deserialized to a list of strings
+ * ``depends``: deserialized to a set of ``Task`` objects
+
+ Attributes should be set using the correct Python representation, which will be
+ serialized into the correct format when the task is saved.
+
+ Task properties
+ ---------------
+
+ Tasklib defines several convenience properties on the ``Task`` object::
+
+ >>> t.save()
+ >>> t.saved
+ True
+ >>> t.pending
+ True
+ >>> t.active
+ False
+ >>> t.start()
+ >>> t.active
+ True
+ >>> t.done()
+ >>> t.completed
+ True
+ >>> t.pending
+ False
+ >>> t.delete()
+ >>> t.deleted
+ True
+
+ Operations on Tasks
+ -------------------
+
+ After modifying one or more attributes, simply call ``save()`` to write those
+ changes to the database::
+
+ >>> task = tw.tasks.pending().get(tags__contain='work')
+ >>> task['due'] = datetime(year=2014, month=1, day=5)
+ >>> task.save()
+
+ To mark a task as complete, use ``done()``::
+
+ >>> task = tw.tasks.pending().get(tags__contain='work')
+ >>> task.done()
+ >>> len(tw.tasks.pending().filter(tags__contain='work'))
+ 0
+
+ To delete a task, use ``delete()``::
+
+ >>> task = tw.tasks.get(description="task added by mistake")
+ >>> task.delete()
+
+ To update a task object with values from TaskWarrior database, use ``refresh()``. Example::
+
+ >>> task = Task(tw, description="learn to cook")
+ >>> task.save()
+ >>> task['id']
+ 5
+ >>> task['tags']
+ []
+
+ Now, suppose we modify the task using the TaskWarrior interface in another terminal::
+
+ $ task 5 modify +someday
+ Task 5 modified.
+
+ Switching back to the open python process::
+
+ >>> task['tags']
+ []
+ >>> task.refresh()
+ >>> task['tags']
+ ['someday']
+
+ Tasks can also be started and stopped. Use ``start()`` and ``stop()``
+ respectively::
+
+ >>> task.start()
+ >>> task['start']
+ datetime.datetime(2015, 7, 16, 18, 48, 28, tzinfo=<DstTzInfo 'Europe/Prague' CEST+2:00:00 DST>)
+ >>> task.stop()
+ >>> task['start']
+ >>> task.done()
+ >>> task['end']
+ datetime.datetime(2015, 7, 16, 18, 49, 2, tzinfo=<DstTzInfo 'Europe/Prague' CEST+2:00:00 DST>)
+
+
+ Retrieving Tasks
+ ----------------
+
+ ``tw.tasks`` is a ``TaskQuerySet`` object which emulates the Django QuerySet
+ API. To get all tasks (including completed ones)::
+
+ >>> tw.tasks.all()
+ ['First task', 'Completed task', 'Deleted task', ...]
+
+ Filtering
+ ---------
+
+ Filter tasks using the same familiar syntax::
+
+ >>> tw.tasks.filter(status='pending', tags__contains=['work'])
+ ['Upgrade Ubuntu Server']
+
+ Filter arguments are passed to the ``task`` command (``__`` is replaced by
+ a period) so the above example is equivalent to the following command::
+
+ $ task status:pending tags.contain=work
+
+ Tasks can also be filtered using raw commands, like so::
+
+ >>> tw.tasks.filter('status:pending +work')
+ ['Upgrade Ubuntu Server']
+
+ This practice is discouraged, though, since by using raw commands you may lose
+ some of the portability of your commands across different TaskWarrior versions.
+
+ However, you can mix raw commands with keyword filters, as in the given example::
+
+ >>> tw.tasks.filter('+BLOCKING', project='Home') # Gets all blocking tasks in project Home
+ ['Fix the toilette']
+
+ This can be a neat way to use syntax not yet supported by tasklib. The above
+ is an excellent example, since virtual tags do not work the same way as ordinary ones; that is::
+
+ >>> tw.tasks.filter(tags=['BLOCKING'])
+ []
+
+ will not work.
+
+ There are built-in functions for retrieving pending & completed tasks::
+
+ >>> tw.tasks.pending().filter(tags__contain='work')
+ ['Upgrade Ubuntu Server']
+ >>> len(tw.tasks.completed())
+ 227
+
+ Use ``get()`` to return the only task in a ``TaskQuerySet``, or raise an
+ exception::
+
+ >>> tw.tasks.get(tags__contain='work')['status']
+ 'pending'
+ >>> tw.tasks.get(status='completed', tags__contains=['work']) # The only task with the work tag is pending, so this fails
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ File "tasklib/task.py", line 224, in get
+ 'Lookup parameters were {0}'.format(kwargs))
+ tasklib.task.DoesNotExist: Task matching query does not exist. Lookup parameters were {'status': 'completed', 'tags__contains': ['work']}
+ >>> tw.tasks.get(status='pending')
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ File "tasklib/task.py", line 227, in get
+ 'Lookup parameters were {1}'.format(num, kwargs))
+ ValueError: get() returned more than one Task -- it returned 23! Lookup parameters were {'status': 'pending'}
+
+ Additionally, since filters return ``TaskQuerySets`` you can stack filters on top of each other::
+
+ >>> home_tasks = tw.tasks.filter(project='Wife')
+ >>> home_tasks.filter(due__before=datetime(2015,2,14,14,14,14)) # What I have to do until Valentine's day
+ ['Prepare surprise birthday party']
+
+ Equality of Task objects
+ ------------------------
+
+ Two Tasks are considered equal if they have the same UUIDs::
+
+ >>> task1 = Task(tw, description="Pet the dog")
+ >>> task1.save()
+ >>> task2 = tw.tasks.get(description="Pet the dog")
+ >>> task1 == task2
+ True
+
+ If you compare two unsaved tasks, they are considered equal only if they are the
+ same Python object::
+
+ >>> task1 = Task(tw, description="Pet the cat")
+ >>> task2 = Task(tw, description="Pet the cat")
+ >>> task1 == task2
+ False
+ >>> task3 = task1
+ >>> task3 == task1
+ True
+
+ Accessing original values
+ -------------------------
+
+ To access the saved state of the Task, use dict-like access using the
+ ``original`` attribute:
+
+ >>> t = Task(tw, description="tidy up")
+ >>> t.save()
+ >>> t['description'] = "tidy up the kitchen and bathroom"
+ >>> t['description']
+ "tidy up the kitchen and bathroom"
+ >>> t.original['description']
+ "tidy up"
+
+ When you save the task, original values are refreshed to reflect the
+ saved state of the task:
+
+ >>> t.save()
+ >>> t.original['description']
+ "tidy up the kitchen and bathroom"
+
+ Dealing with dates and time
+ ---------------------------
+
+ Any timestamp-like attributes of the tasks are converted to timezone-aware
+ datetime objects. To achieve this, Tasklib leverages the ``pytz`` Python module,
+ which brings the Olson timezone database to Python.
+
+ This shields you from annoying details of Daylight Saving Time shifts
+ or conversion between different timezones. For example, to list all the
+ tasks which are due before midnight if you're currently in Berlin:
+
+ >>> myzone = pytz.timezone('Europe/Berlin')
+ >>> midnight = myzone.localize(datetime(2015,2,2,0,0,0))
+ >>> tw.tasks.filter(due__before=midnight)
+
+ However, this is still a little bit tedious. That's why the ``TaskWarrior`` object
+ is capable of automatic timezone detection, using the ``tzlocal`` Python
+ module. If your system timezone is set to 'Europe/Berlin', the following example
+ will work the same way as the previous one:
+
+ >>> tw.tasks.filter(due__before=datetime(2015,2,2,0,0,0))
+
+ You can also use simple dates when filtering:
+
+ >>> tw.tasks.filter(due__before=date(2015,2,2))
+
+ In such case, a 00:00:00 is used as the time component.
+
+ Of course, you can use naive datetime (or date) objects when initializing a Task object
+ or assigning values to datetime attributes:
+
+ >>> t = Task(tw, description="Buy new shoes", due=date(2015,2,5))
+ >>> t['due']
+ datetime.datetime(2015, 2, 5, 0, 0, tzinfo=<DstTzInfo 'Europe/Berlin' CET+1:00:00 STD>)
+ >>> t['due'] = datetime(2015,2,6,15,15,15)
+ >>> t['due']
+ datetime.datetime(2015, 2, 6, 15, 15, 15, tzinfo=<DstTzInfo 'Europe/Berlin' CET+1:00:00 STD>)
+
+ However, since timezone-aware and timezone-naive datetimes are not comparable
+ in Python, this can cause some unexpected behaviour:
+
+ >>> from datetime import datetime
+ >>> now = datetime.now()
+ >>> t = Task(tw, description="take out the trash now")
+ >>> t['due'] = now
+ >>> now
+ datetime.datetime(2015, 2, 1, 19, 44, 4, 770001)
+ >>> t['due']
+ datetime.datetime(2015, 2, 1, 19, 44, 4, 770001, tzinfo=<DstTzInfo 'Europe/Berlin' CET+1:00:00 STD>)
+ >>> t['due'] == now
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ TypeError: can't compare offset-naive and offset-aware datetimes
+
+ If you want to compare datetime aware value with datetime naive value, you need
+ to localize the naive value first:
+
+ >>> from datetime import datetime
+ >>> from tasklib.task import local_zone
+ >>> now = local_zone.localize(datetime.now())
+ >>> t['due'] = now
+ >>> now
+ datetime.datetime(2015, 2, 1, 19, 44, 4, 770001, tzinfo=<DstTzInfo 'Europe/Berlin' CET+1:00:00 STD>)
+ >>> t['due'] == now
+ True
+
+ Also, note that it does not matter whether the timezone aware datetime objects
+ are set in the same timezone:
+
+ >>> import pytz
+ >>> t['due']
+ datetime.datetime(2015, 2, 1, 19, 44, 4, 770001, tzinfo=<DstTzInfo 'Europe/Berlin' CET+1:00:00 STD>)
+ >>> now.astimezone(pytz.utc)
+ datetime.datetime(2015, 2, 1, 18, 44, 4, 770001, tzinfo=<UTC>)
+ >>> t['due'] == now.astimezone(pytz.utc)
+ True
+
+ *Note*: The following behaviour is available only for TaskWarrior >= 2.4.0.
+
+ There is a third approach to setting up date time values, which leverages
+ the 'task calc' command. You can simply set any datetime attribute to
+ any string that contains an acceptable TaskWarrior-formatted time expression::
+
+ $ task calc now + 1d
+ 2015-07-17T21:17:54
+
+ This syntax can be leveraged in the python interpreter as follows::
+
+ >>> t['due'] = "now + 1d"
+ >>> t['due']
+ datetime.datetime(2015, 7, 17, 21, 19, 31, tzinfo=<DstTzInfo 'Europe/Berlin' CEST+2:00:00 DST>)
+
+ It can be easily seen that the string with TaskWarrior-formatted time expression
+ is automatically converted to native datetime in the local time zone.
+
+ For the list of acceptable formats and keywords, please consult:
+
+ * http://taskwarrior.org/docs/dates.html
+ * http://taskwarrior.org/docs/named_dates.html
+
+ However, as each such assignment involves a call to 'task calc' for conversion,
+ it might cause some performance issues when assigning strings to datetime
+ attributes repeatedly, in an automated manner.
+
+ Working with annotations
+ ------------------------
+
+ Annotations of the tasks are represented in tasklib by ``TaskAnnotation`` objects. These
+ are much like ``Task`` objects, albeit very simplified.
+
+ >>> annotated_task = tw.tasks.get(description='Annotated task')
+ >>> annotated_task['annotations']
+ [Yeah, I am annotated!]
+
+ Annotations have only defined ``entry`` and ``description`` values::
+
+ >>> annotation = annotated_task['annotations'][0]
+ >>> annotation['entry']
+ datetime.datetime(2015, 1, 3, 21, 13, 55, tzinfo=<DstTzInfo 'Europe/Berlin' CET+1:00:00 STD>)
+ >>> annotation['description']
+ u'Yeah, I am annotated!'
+
+ To add an annotation to a Task, use ``add_annotation()``::
+
+ >>> task = Task(tw, description="new task")
+ >>> task.add_annotation("we can annotate any task")
+ Traceback (most recent call last):
+ File "<stdin>", line 1, in <module>
+ File "build/bdist.linux-x86_64/egg/tasklib/task.py", line 355, in add_annotation
+ tasklib.task.NotSaved: Task needs to be saved to add annotation
+
+ However, Task needs to be saved before you can add an annotation to it::
+
+ >>> task.save()
+ >>> task.add_annotation("we can annotate saved tasks")
+ >>> task['annotations']
+ [we can annotate saved tasks]
+
+ To remove the annotation, pass its description to ``remove_annotation()`` method::
+
+ >>> task.remove_annotation("we can annotate saved tasks")
+
+ Alternatively, you can pass the ``TaskAnnotation`` object itself::
+
+ >>> task.remove_annotation(task['annotations'][0])
+
+
+ Running custom commands
+ -----------------------
+
+ To run a custom command, use the ``execute_command()`` method of the ``TaskWarrior`` object::
+
+ >>> tw = TaskWarrior()
+ >>> tw.execute_command(['log', 'Finish high school.'])
+ [u'Logged task.']
+
+ You can use ``config_override`` keyword argument to specify a dictionary of configuration overrides::
+
+ >>> tw.execute_command(['3', 'done'], config_override={'gc': 'off'}) # Will mark 3 as completed and it will retain its ID
+
+
+ Additionally, you can use the ``return_all=True`` flag, which returns
+ a ``(stdout, stderr, return_code)`` triplet, and ``allow_failure=False``, which will
+ prevent tasklib from raising an exception if the task binary returned non-zero
+ return code::
+
+ >>> tw.execute_command(['invalidcommand'], allow_failure=False, return_all=True)
+ ([u''],
+ [u'Using alternate .taskrc file /home/tbabej/.taskrc',
+ u"[task next rc:/home/tbabej/.taskrc rc.recurrence.confirmation=no rc.json.array=off rc.confirmation=no rc.bulk=0 rc.dependency.confirmation=no description ~ 'invalidcommand']",
+ u'Configuration override rc.recurrence.confirmation:no',
+ u'Configuration override rc.json.array:off',
+ u'Configuration override rc.confirmation:no',
+ u'Configuration override rc.bulk:0',
+ u'Configuration override rc.dependency.confirmation:no',
+ u'No matches.',
+ u'There are local changes. Sync required.'],
+ 1)
+
+
+ Setting custom configuration values
+ -----------------------------------
+
+ By default, TaskWarrior uses configuration values stored in your .taskrc.
+ To see what configuration value overrides are passed to each executed
+ task command, have a peek into ``overrides`` attribute of ``TaskWarrior`` object::
+
+ >>> tw.overrides
+ {'confirmation': 'no', 'data.location': '/home/tbabej/.task'}
+
+ To pass your own configuration overrides, you just need to update this dictionary::
+
+ >>> tw.overrides.update({'hooks': 'off'}) # tasklib will not trigger hooks
+
+ Creating hook scripts
+ ---------------------
+
+ From version 2.4.0, TaskWarrior has support for hook scripts. Tasklib provides
+ some very useful helpers to write those. With tasklib, writing these becomes
+ a breeze::
+
+ #!/usr/bin/python
+
+ from tasklib.task import Task
+ task = Task.from_input()
+ # ... <custom logic>
+ print(task.export_data())
+
+ For example, a plugin which would assign the priority "H" to any task containing
+ three exclamation marks in the description would go like this::
+
+ #!/usr/bin/python
+
+ from tasklib.task import Task
+ task = Task.from_input()
+
+ if "!!!" in task['description']:
+ task['priority'] = "H"
+
+ print(task.export_data())
+
+ Tasklib can automatically detect whether it's running in the ``on-modify`` event,
+ which provides more input than ``on-add`` event and reads the data accordingly.
+
+ This means the example above works both for ``on-add`` and ``on-modify`` events!
+
+ Consequently, you can create just one hook file for both ``on-add`` and
+ ``on-modify`` events, and you just need to create a symlink for the other one.
+ This removes the need for maintaining two copies of the same code base and/or
+ boilerplate code.
+
+ In ``on-modify`` events, tasklib loads both the original version and the modified
+ version of the task to the returned ``Task`` object. To access the original data
+ (in read-only manner), use ``original`` dict-like attribute:
+
+ >>> t = Task.from_input()
+ >>> t['description']
+ "Modified description"
+ >>> t.original['description']
+ "Original description"
+
+ Working with UDAs
+ -----------------
+
+ Since TaskWarrior does read your .taskrc, you do not need to define any UDAs
+ in the TaskWarrior's config dictionary, as described above. Suppose we have
+ an estimate UDA in the .taskrc::
+
+ uda.estimate.type = numeric
+
+ We can simply filter and create tasks using the estimate UDA out of the box::
+
+ >>> tw = TaskWarrior()
+ >>> task = Task(tw, description="Long task", estimate=1000)
+ >>> task.save()
+ >>> task['id']
+ 1
+
+ This is saved as UDA in the TaskWarrior::
+
+ $ task 1 export
+ {"id":1,"description":"Long task","estimate":1000, ...}
+
+ We can also specify UDAs as arguments in the TaskFilter::
+
+ >>> tw.tasks.filter(estimate=1000)
+ Long task
+
+ Syncing
+ -------
+
+ If you have configured the needed config variables in your .taskrc, syncing
+ is as easy as::
+
+ >>> tw = TaskWarrior()
+ >>> tw.execute_command(['sync'])
+
+ If you want to use non-standard server/credentials, you'll need to provide configuration
+ overrides to the ``TaskWarrior`` instance. Update the ``overrides`` dictionary with the
+ values you desire to override, and then we can run the sync command using
+ the ``execute_command()`` method::
+
+ >>> tw = TaskWarrior()
+ >>> sync_config = {
+ ... 'taskd.certificate': '/home/tbabej/.task/tbabej.cert.pem',
+ ... 'taskd.credentials': 'Public/tbabej/34af54de-3cb2-4d3d-82be-33ddb8fd3e66',
+ ... 'taskd.server': 'task.server.com:53589',
+ ... 'taskd.ca': '/home/tbabej/.task/ca.cert.pem',
+ ... 'taskd.trust': 'ignore hostname'}
+ >>> tw.overrides.update(sync_config)
+ >>> tw.execute_command(['sync'])
+
+
+ .. _taskwarrior: http://taskwarrior.org
--- /dev/null
+ from setuptools import setup, find_packages
+
+ # Runtime dependencies passed to setup() as install_requires
+ install_requirements = ['six>=1.4', 'pytz', 'tzlocal']
+
+ version = '1.2.1'
+
+ # Require the 'importlib' distribution only when the module cannot be
+ # imported by the interpreter running this script
+ try:
+ import importlib
+ except ImportError:
+ install_requirements.append('importlib')
+
+ setup(
+ name='tasklib',
+ version=version,
+ description='Python Task Warrior library',
+ # NOTE(review): the README file object is never closed explicitly
+ long_description=open('README.rst').read(),
+ author='Rob Golding',
+ author_email='rob@robgolding.com',
+ license='BSD',
+ url='https://github.com/robgolding63/tasklib',
+ download_url='https://github.com/robgolding63/tasklib/downloads',
+ packages=find_packages(),
+ include_package_data=True,
+ test_suite='tasklib.tests',
+ install_requires=install_requirements,
+ # NOTE(review): the listed Python versions differ from the interpreter
+ # matrix in the CI configuration - confirm which set is intended
+ classifiers=[
+ 'Development Status :: 4 - Beta',
+ 'Programming Language :: Python',
+ "Programming Language :: Python :: 2",
+ "Programming Language :: Python :: 2.6",
+ "Programming Language :: Python :: 2.7",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.2",
+ "Programming Language :: Python :: 3.3",
+ 'License :: OSI Approved :: BSD License',
+ 'Topic :: Software Development :: Libraries :: Python Modules',
+ 'Intended Audience :: Developers',
+ ],
+ )
--- /dev/null
+ from .backends import TaskWarrior
+ from .task import Task
+ from .serializing import local_zone
--- /dev/null
+ import abc
+ import copy
+ import datetime
+ import json
+ import logging
+ import os
+ import re
+ import six
+ import subprocess
+
+ from .task import Task, TaskQuerySet, ReadOnlyDictView
+ from .filters import TaskWarriorFilter
+ from .serializing import local_zone
+
+ # strptime format used to parse the first output line of 'task calc'
+ DATE_FORMAT_CALC = '%Y-%m-%dT%H:%M:%S'
+
+ # Module-level logger; execute_command logs every assembled command at DEBUG
+ logger = logging.getLogger(__name__)
+
+
+ class Backend(object):
+ """
+ Interface a task storage backend is expected to provide.
+
+ NOTE(review): the members below use abc decorators, but no ABCMeta
+ metaclass is set on this class in the visible code, so the abstract
+ markers are presumably not enforced at instantiation time - confirm.
+ """
+
+ @abc.abstractproperty
+ def filter_class(self):
+ """Returns the TaskFilter class used by this backend"""
+ pass
+
+ @abc.abstractmethod
+ def filter_tasks(self, filter_obj):
+ """Returns a list of Task objects matching the given filter"""
+ pass
+
+ @abc.abstractmethod
+ def save_task(self, task):
+ """Saves the given task in the backend"""
+ pass
+
+ @abc.abstractmethod
+ def delete_task(self, task):
+ """Deletes the given task"""
+ pass
+
+ @abc.abstractmethod
+ def start_task(self, task):
+ """Starts the given task"""
+ pass
+
+ @abc.abstractmethod
+ def stop_task(self, task):
+ """Stops the given task"""
+ pass
+
+ @abc.abstractmethod
+ def complete_task(self, task):
+ """Marks the given task as completed"""
+ pass
+
+ @abc.abstractmethod
+ def refresh_task(self, task, after_save=False):
+ """
+ Refreshes the given task. Returns new data dict with serialized
+ attributes.
+ """
+ pass
+
+ @abc.abstractmethod
+ def annotate_task(self, task, annotation):
+ """Adds the given annotation to the task"""
+ pass
+
+ @abc.abstractmethod
+ def denotate_task(self, task, annotation):
+ """Removes the given annotation from the task"""
+ pass
+
+ @abc.abstractmethod
+ def sync(self):
+ """Syncs the backend database with the taskd server"""
+ pass
+
+ def convert_datetime_string(self, value):
+ """
+ Converts TW syntax datetime string to a localized datetime
+ object. This method is not mandatory.
+ """
+ raise NotImplementedError
+
+
+ class TaskWarriorException(Exception):
+ """
+ Raised by the TaskWarrior backend when a command fails or its output
+ cannot be interpreted.
+ """
+ pass
+
+
+ class TaskWarrior(Backend):
+
+ VERSION_2_1_0 = six.u('2.1.0')
+ VERSION_2_2_0 = six.u('2.2.0')
+ VERSION_2_3_0 = six.u('2.3.0')
+ VERSION_2_4_0 = six.u('2.4.0')
+ VERSION_2_4_1 = six.u('2.4.1')
+ VERSION_2_4_2 = six.u('2.4.2')
+ VERSION_2_4_3 = six.u('2.4.3')
+ VERSION_2_4_4 = six.u('2.4.4')
+ VERSION_2_4_5 = six.u('2.4.5')
+
+ def __init__(self, data_location=None, create=True,
+ taskrc_location=None, task_command='task',
+ version_override=None):
+ """
+ Sets up the task command, the taskrc location and the default rc
+ overrides passed to every executed command.
+
+ :param data_location: if not None, user-expanded and stored as the
+ 'data.location' override
+ :param create: when true, a missing data_location directory is created
+ :param taskrc_location: if given, user-expanded path that is exported as
+ TASKRC by execute_command; replaced by '/' when the path does not exist
+ :param task_command: command line of the task binary; it is split on
+ whitespace before being executed
+ :param version_override: used as the version instead of querying the
+ binary with '--version'
+ """
+ self.taskrc_location = None
+ if taskrc_location:
+ self.taskrc_location = os.path.expanduser(taskrc_location)
+
+ # If taskrc does not exist, pass / to use defaults and avoid creating
+ # dummy .taskrc file by TaskWarrior
+ if not os.path.exists(self.taskrc_location):
+ self.taskrc_location = '/'
+
+ self.task_command = task_command
+
+ # Memoized result of the 'config' property
+ self._config = None
+ self.version = version_override or self._get_version()
+ self.overrides = {
+ 'confirmation': 'no',
+ 'dependency.confirmation': 'no', # See TW-1483 or taskrc man page
+ 'recurrence.confirmation': 'no', # Necessary for modifying R tasks
+
+ # Defaults to on since 2.4.5, we expect off during parsing
+ 'json.array': 'off',
+
+ # 2.4.3 onwards supports 0 as infinite bulk, otherwise set just
+ # arbitrary big number which is likely to be large enough
+ # NOTE(review): versions are compared as strings here, i.e.
+ # lexicographically rather than numerically per component
+ 'bulk': 0 if self.version >= self.VERSION_2_4_3 else 100000,
+ }
+
+ # Set data.location override if passed via kwarg
+ if data_location is not None:
+ data_location = os.path.expanduser(data_location)
+ if create and not os.path.exists(data_location):
+ os.makedirs(data_location)
+ self.overrides['data.location'] = data_location
+
+ self.tasks = TaskQuerySet(self)
+
+ def _get_task_command(self):
+ """Returns task_command split on whitespace into an argument list"""
+ return self.task_command.split()
+
+ def _get_command_args(self, args, config_override=None):
+ """
+ Builds the full argument list for one task invocation: the task command,
+ one 'rc.<key>=<value>' argument per override (a copy of self.overrides
+ updated with config_override), followed by args converted to text.
+ """
+ command_args = self._get_task_command()
+ overrides = self.overrides.copy()
+ overrides.update(config_override or dict())
+ for item in overrides.items():
+ command_args.append('rc.{0}={1}'.format(*item))
+ # Binary arguments are decoded as UTF-8, anything else is converted
+ # using six.text_type
+ command_args.extend([
+ x.decode('utf-8') if isinstance(x, six.binary_type)
+ else six.text_type(x) for x in args
+ ])
+ return command_args
+
+ def _get_version(self):
+ """
+ Runs the task command with '--version' and returns its standard output,
+ decoded as UTF-8, with leading and trailing newlines stripped.
+ """
+ p = subprocess.Popen(
+ self._get_task_command() + ['--version'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ # stderr is captured but not used
+ stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
+ return stdout.strip('\n')
+
+ def _get_modified_task_fields_as_args(self, task):
+ """
+ Returns the list of command line arguments describing the fields of the
+ task that need to be passed to the add/modify command.
+
+ For a saved task only the modified fields are included. For a new task
+ all fields are included, except the read only ones and those with None
+ value.
+ """
+ args = []
+
+ def add_field(field):
+ # Add the output of format_field method to args list (defaults to
+ # field:value)
+ serialized_value = task._serialize(field, task._data[field])
+
+ # Empty values should not be enclosed in quotation marks, see
+ # TW-1510
+ # Compare by value: an identity check against a string literal is
+ # not guaranteed to match an equal empty string
+ if serialized_value == '':
+ escaped_serialized_value = ''
+ else:
+ escaped_serialized_value = six.u("'{0}'").format(
+ serialized_value)
+
+ format_default = lambda task: six.u("{0}:{1}").format(
+ field, escaped_serialized_value)
+
+ # A format_<field> method of the backend takes precedence over the
+ # default formatting
+ format_func = getattr(self, 'format_{0}'.format(field),
+ format_default)
+
+ args.append(format_func(task))
+
+ # If we're modifying saved task, simply pass on all modified fields
+ if task.saved:
+ for field in task._modified_fields:
+ add_field(field)
+
+ # For new tasks, pass all fields that make sense
+ else:
+ for field in task._data.keys():
+ # We cannot set stuff that's read only (ID, UUID, ..)
+ if field in task.read_only_fields:
+ continue
+ # We do not want to do field deletion for new tasks
+ if task._data[field] is None:
+ continue
+ # Otherwise we're fine
+ add_field(field)
+
+ return args
+
+ def format_depends(self, task):
+ """
+ Returns the 'depends:' argument listing the UUIDs of added dependencies
+ and the '-' prefixed UUIDs of removed dependencies, relative to the
+ task's original data.
+ """
+ # We need to generate added and removed dependencies list,
+ # since Taskwarrior does not accept redefining dependencies.
+
+ # This cannot be part of serialize_depends, since we need
+ # to keep a list of all dependencies in the _data dictionary,
+ # not just currently added/removed ones
+
+ old_dependencies = task._original_data.get('depends', set())
+
+ added = task['depends'] - old_dependencies
+ removed = old_dependencies - task['depends']
+
+ # Removed dependencies need to be prefixed with '-'
+ return 'depends:' + ','.join(
+ [t['uuid'] for t in added] +
+ ['-' + t['uuid'] for t in removed]
+ )
+
+ def format_description(self, task):
+ """
+ Returns the command line argument carrying the task description: the
+ bare description for versions older than 2.4.0, a quoted
+ "description:'...'" argument otherwise.
+ """
+ # Task version older than 2.4.0 ignores first word of the
+ # task description if description: prefix is used
+ if self.version < self.VERSION_2_4_0:
+ return task._data['description']
+ else:
+ return six.u("description:'{0}'").format(
+ task._data['description'] or '',
+ )
+
+ def convert_datetime_string(self, value):
+ """
+ Evaluates the given string using the 'calc' command and returns the
+ first line of the result parsed as a datetime localized in local_zone.
+
+ Raises ValueError if the version is older than 2.4.0.
+ """
+
+ if self.version >= self.VERSION_2_4_0:
+ # For strings, use 'calc' to evaluate the string to datetime
+ # available since TW 2.4.0
+ args = value.split()
+ result = self.execute_command(['calc'] + args)
+ naive = datetime.datetime.strptime(result[0], DATE_FORMAT_CALC)
+ localized = local_zone.localize(naive)
+ else:
+ # NOTE(review): this branch is selected by the version check only,
+ # while the message refers to the type of the value
+ raise ValueError(
+ 'Provided value could not be converted to '
+ 'datetime, its type is not supported: {}'
+ .format(type(value)),
+ )
+
+ return localized
+
+ @property
+ def filter_class(self):
+ """Returns TaskWarriorFilter, the filter class used by this backend"""
+ return TaskWarriorFilter
+
+ # Public interface
+
+ @property
+ def config(self):
+ """
+ Returns a ReadOnlyDictView of the configuration parsed from the output
+ of the 'show' command. The result is memoized in self._config.
+ """
+ # First, check if memoized information is available
+ if self._config:
+ return self._config
+
+ # If not, fetch the config using the 'show' command
+ raw_output = self.execute_command(
+ ['show'],
+ config_override={'verbose': 'nothing'}
+ )
+
+ config = dict()
+ # A line is split into the key (leading run of non-whitespace) and the
+ # value (the rest of the line after the separating whitespace)
+ config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].*$)')
+
+ for line in raw_output:
+ match = config_regex.match(line)
+ if match:
+ config[match.group('key')] = match.group('value').strip()
+
+ # Memoize the config dict
+ self._config = ReadOnlyDictView(config)
+
+ return self._config
+
+ def execute_command(self, args, config_override=None, allow_failure=True,
+ return_all=False):
+ """
+ Executes the task command with the given arguments.
+
+ :param args: list of command arguments
+ :param config_override: dict of rc overrides applied on top of
+ self.overrides for this call only
+ :param allow_failure: when true (the default), a non-zero return code
+ raises TaskWarriorException; when false, no exception is raised
+ :param return_all: when true, a (stdout lines, stderr lines,
+ return code) triplet is returned instead of the stdout lines only
+
+ The output is decoded as UTF-8, right-stripped and split on newlines.
+ """
+ command_args = self._get_command_args(
+ args, config_override=config_override)
+ logger.debug(u' '.join(command_args))
+
+ env = os.environ.copy()
+ if self.taskrc_location:
+ env['TASKRC'] = self.taskrc_location
+ p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE, env=env)
+ stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
+ if p.returncode and allow_failure:
+ # Prefer stderr for the error message, fall back to stdout
+ if stderr.strip():
+ error_msg = stderr.strip()
+ else:
+ error_msg = stdout.strip()
+ error_msg += u'\nCommand used: ' + u' '.join(command_args)
+ raise TaskWarriorException(error_msg)
+
+ # Return the whole triplet only if explicitly asked for
+ if not return_all:
+ return stdout.rstrip().split('\n')
+ else:
+ return (stdout.rstrip().split('\n'),
+ stderr.rstrip().split('\n'),
+ p.returncode)
+
+ def enforce_recurrence(self):
+ """
+ Runs the 'next' command for versions older than 2.4.2. No exception is
+ raised if the command fails.
+ """
+ # Run arbitrary report command which will trigger generation
+ # of recurrent tasks.
+
+ # Only necessary for TW up to 2.4.1, fixed in 2.4.2.
+ if self.version < self.VERSION_2_4_2:
+ self.execute_command(['next'], allow_failure=False)
+
+ def merge_with(self, path, push=False):
+ """
+ Runs the 'merge' command with the given path, normalized to end with
+ exactly one '/'. The 'merge.autopush' override is set to 'yes' when
+ push is true, 'no' otherwise.
+ """
+ path = path.rstrip('/') + '/'
+ self.execute_command(['merge', path], config_override={
+ 'merge.autopush': 'yes' if push else 'no',
+ })
+
+ def undo(self):
+ """Runs the 'undo' command"""
+ self.execute_command(['undo'])
+
+ # Backend interface implementation
+
+ def filter_tasks(self, filter_obj):
+ """
+ Runs 'export' with the parameters of the given filter and returns a list
+ of Task objects, one per non-empty output line.
+
+ Raises TaskWarriorException if a line cannot be parsed as JSON.
+ """
+ self.enforce_recurrence()
+ args = ['export'] + filter_obj.get_filter_params()
+ tasks = []
+ for line in self.execute_command(args):
+ if line:
+ # Remove commas surrounding the JSON object on the line
+ data = line.strip(',')
+ try:
+ filtered_task = Task(self)
+ filtered_task._load_data(json.loads(data))
+ tasks.append(filtered_task)
+ except ValueError:
+ raise TaskWarriorException('Invalid JSON: %s' % data)
+ return tasks
+
+ def save_task(self, task):
+ """
+ Save a task into TaskWarrior database using add/modify call
+
+ For a task that has not been saved yet, the identifier is parsed from
+ the 'Created task ' line of the command output. The task is refreshed
+ afterwards.
+
+ Raises TaskWarriorException if the output of the add command does not
+ contain exactly one line in the expected format.
+ """
+
+ args = [task['uuid'], 'modify'] if task.saved else ['add']
+ args.extend(self._get_modified_task_fields_as_args(task))
+ output = self.execute_command(args)
+
+ # Parse out the new ID, if the task is being added for the first time
+ if not task.saved:
+ id_lines = [l for l in output if l.startswith('Created task ')]
+
+ # Complain loudly if it seems that more tasks were created
+ # Should not happen.
+ # Expected output: Created task 1.
+ # Created task 1 (recurrence template).
+ if len(id_lines) != 1 or len(id_lines[0].split(' ')) not in (3, 5):
+ raise TaskWarriorException(
+ 'Unexpected output when creating '
+ 'task: %s' % '\n'.join(id_lines),
+ )
+
+ # Circumvent the ID storage, since ID is considered read-only
+ identifier = id_lines[0].split(' ')[2].rstrip('.')
+
+ # Identifier can be either ID or UUID for completed tasks
+ try:
+ task._data['id'] = int(identifier)
+ except ValueError:
+ task._data['uuid'] = identifier
+
+ # Refreshing is very important here, as not only modification time
+ # is updated, but arbitrary attribute may have changed due to hooks
+ # altering the data before saving
+ task.refresh(after_save=True)
+
+ def delete_task(self, task):
+ """Runs the 'delete' command on the task identified by its UUID"""
+ self.execute_command([task['uuid'], 'delete'])
+
+ def start_task(self, task):
+ """Runs the 'start' command on the task identified by its UUID"""
+ self.execute_command([task['uuid'], 'start'])
+
+ def stop_task(self, task):
+ """Runs the 'stop' command on the task identified by its UUID"""
+ self.execute_command([task['uuid'], 'stop'])
+
+ def complete_task(self, task):
+ """
+ Runs the 'done' command on the task identified by its UUID. For versions
+ older than 2.4.0, an active task is stopped first.
+ """
+ # Older versions of TW do not stop active task at completion
+ if self.version < self.VERSION_2_4_0 and task.active:
+ task.stop()
+
+ self.execute_command([task['uuid'], 'done'])
+
+ def annotate_task(self, task, annotation):
+ """Runs 'annotate' with the given annotation on the task's UUID"""
+ args = [task['uuid'], 'annotate', annotation]
+ self.execute_command(args)
+
+ def denotate_task(self, task, annotation):
+ """Runs 'denotate' with the given annotation on the task's UUID"""
+ args = [task['uuid'], 'denotate', annotation]
+ self.execute_command(args)
+
+ def refresh_task(self, task, after_save=False):
+ """
+ Exports the given task and returns the parsed JSON of the single
+ matching output line.
+
+ For versions older than 2.4.5, if the export by UUID/ID does not yield
+ exactly one task and after_save is true, the task is looked up once
+ more using a filter built from its remaining data.
+
+ Raises TaskWarriorException if no unique task could be located.
+ """
+ # We need to use ID as backup for uuid here for the refreshes
+ # of newly saved tasks. Any other place in the code is fine
+ # with using UUID only.
+ args = [task['uuid'] or task['id'], 'export']
+ output = self.execute_command(args)
+
+ def valid(output):
+ # Exactly one output line that starts like a JSON object
+ return len(output) == 1 and output[0].startswith('{')
+
+ # For older TW versions attempt to uniquely locate the task
+ # using the data we have if it has been just saved.
+ # This can happen when adding a completed task on older TW versions.
+ if (not valid(output) and self.version < self.VERSION_2_4_5
+ and after_save):
+
+ # Make a copy, removing ID and UUID. It's most likely invalid
+ # (ID 0) if it failed to match a unique task.
+ data = copy.deepcopy(task._data)
+ data.pop('id', None)
+ data.pop('uuid', None)
+
+ taskfilter = self.filter_class(self)
+ for key, value in data.items():
+ taskfilter.add_filter_param(key, value)
+
+ output = self.execute_command(['export'] +
+ taskfilter.get_filter_params())
+
+ # If more than 1 task has been matched still, raise an exception
+ if not valid(output):
+ raise TaskWarriorException(
+ 'Unique identifiers {0} with description: {1} matches '
+ 'multiple tasks: {2}'.format(
+ task['uuid'] or task['id'], task['description'], output)
+ )
+
+ return json.loads(output[0])
+
+ def sync(self):
+ """Runs the 'sync' command"""
+ self.execute_command(['sync'])
--- /dev/null
+ import abc
+ import six
+ from .serializing import SerializingObject
+
+
+ class TaskFilter(object):
+ """
+ Abstract base class that defines interface of a TaskFilter.
+
+ NOTE(review): no ABCMeta metaclass is set on this class in the visible
+ code, so the abstractmethod markers are presumably not enforced - confirm.
+ """
+
+ @abc.abstractmethod
+ def add_filter(self, arg):
+ """
+ Processes a non-keyword filter.
+ """
+ pass
+
+ @abc.abstractmethod
+ def add_filter_param(self, key, value):
+ """
+ Processes a keyword filter.
+ """
+ pass
+
+ @abc.abstractmethod
+ def clone(self):
+ """
+ Returns a new deep copy of itself.
+ """
+ pass
+
+
+ class TaskWarriorFilter(TaskFilter, SerializingObject):
+ """
+ A set of parameters to filter the task list with.
+ """
+
+ def __init__(self, backend, filter_params=None):
+ """
+ :param backend: backend object; its version is consulted when keyword
+ filters are added
+ :param filter_params: optional initial list of filter strings
+ """
+ self.filter_params = filter_params or []
+ # Start the __init__ lookup right after this class in the MRO, so that
+ # no base class is skipped
+ super(TaskWarriorFilter, self).__init__(backend)
+
+ def add_filter(self, filter_str):
+ """
+ Appends the given filter string to the filter parameters as is.
+ """
+ self.filter_params.append(filter_str)
+
+ def add_filter_param(self, key, value):
+ """
+ Adds a keyword filter. Double underscores in the key are translated
+ to dots; the value is normalized and serialized using the part of the
+ key before the first dot as the attribute name.
+ """
+ key = key.replace('__', '.')
+
+ # The attribute name is the part of the key before any modifier
+ attribute_key = key.split('.')[0]
+
+ # Since this is user input, we need to normalize before we serialize
+ value = self._normalize(attribute_key, value)
+ value = self._serialize(attribute_key, value)
+
+ # If we are filtering by uuid:, do not use uuid keyword
+ # due to TW-1452 bug
+ if key == 'uuid':
+ self.filter_params.insert(0, value)
+ else:
+ # Surround value with apostrophes unless it's an empty string
+ value = "'%s'" % value if value else ''
+
+ # We enforce equality match by using 'is' (or 'none') modifier
+ # Without using this syntax, filter fails due to TW-1479
+ # which is, however, fixed in 2.4.5
+ if self.backend.version < self.backend.VERSION_2_4_5:
+ modifier = '.is' if value else '.none'
+ # A key that already carries a modifier is left untouched
+ key = key + modifier if '.' not in key else key
+
+ self.filter_params.append(six.u("{0}:{1}").format(key, value))
+
+ def get_filter_params(self):
+ """
+ Returns the list of non-empty filter parameters.
+ """
+ return [f for f in self.filter_params if f]
+
+ def clone(self):
+ """
+ Returns a new filter of the same class and backend, with its own copy
+ of the filter parameters list.
+ """
+ c = self.__class__(self.backend)
+ c.filter_params = list(self.filter_params)
+ return c
--- /dev/null
+ """
+ Provides lazy implementations for Task and TaskQuerySet.
+ """
+
+
+ class LazyUUIDTask(object):
+ """
+ A lazy wrapper around Task object, referenced by UUID.
+
+ - Supports comparison with LazyUUIDTask or Task objects (equality by UUIDs)
+ - If any attribute other than 'uuid' requested, a lookup in the
+ backend will be performed and this object will be replaced by a proper
+ Task object.
+ """
+
+ def __init__(self, tw, uuid):
+ """
+ :param tw: backend object used to look the task up when needed
+ :param uuid: UUID of the wrapped task
+ """
+ self._tw = tw
+ self._uuid = uuid
+
+ def __getitem__(self, key):
+ """
+ Returns the stored UUID for the 'uuid' key. Any other key converts
+ this object to the regular Task first.
+ """
+ # LazyUUIDTask does not provide anything else other than 'uuid'.
+ # The key is compared by value: an identity check would miss an equal
+ # key string that is a different object and trigger a needless lookup
+ if key == 'uuid':
+ return self._uuid
+ else:
+ self.replace()
+ return self[key]
+
+ def __getattr__(self, name):
+ # Getattr is called only if the attribute could not be found using
+ # normal means
+ self.replace()
+ return getattr(self, name)
+
+ def __eq__(self, other):
+ # Returns None (falsy) if the other object is falsy or has no uuid
+ if other and other['uuid']:
+ # For saved Tasks, just define equality by equality of uuids
+ return self['uuid'] == other['uuid']
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def __hash__(self):
+ return self['uuid'].__hash__()
+
+ def __repr__(self):
+ return 'LazyUUIDTask: {0}'.format(self._uuid)
+
+ def __copy__(self):
+ return LazyUUIDTask(self._tw, self._uuid)
+
+ def __deepcopy__(self, memo):
+ # The backend reference is shared with the copy, not deep-copied
+ return LazyUUIDTask(self._tw, self._uuid)
+
+ @property
+ def saved(self):
+ """
+ Implementation of the 'saved' property. Always returns True.
+ """
+ return True
+
+ @property
+ def _modified_fields(self):
+ """
+ Always returns an empty set.
+ """
+ return set()
+
+ @property
+ def modified(self):
+ """
+ Always returns False.
+ """
+ return False
+
+ def replace(self):
+ """
+ Performs conversion to the regular Task object, referenced by the
+ stored UUID.
+ """
+
+ replacement = self._tw.tasks.get(uuid=self._uuid)
+ # Take over the class and the state of the fetched object in place
+ self.__class__ = replacement.__class__
+ self.__dict__ = replacement.__dict__
+
+
+ class LazyUUIDTaskSet(object):
+ """
+ A lazy wrapper around TaskQuerySet object, for tasks referenced by UUID.
+
+ - Supports 'in' operator with LazyUUIDTask or Task objects
+ - If iteration over the objects in the LazyUUIDTaskSet is requested, the
+ LazyUUIDTaskSet will be converted to QuerySet and evaluated
+
+ The set operations below work on the UUIDs only; their operands are
+ expected to be iterables of objects providing the 'uuid' key.
+ """
+
+ def __init__(self, tw, uuids):
+ """
+ :param tw: backend object used when the set is converted
+ :param uuids: iterable of UUIDs, stored as a set
+ """
+ self._tw = tw
+ self._uuids = set(uuids)
+
+ def __getattr__(self, name):
+ # Getattr is called only if the attribute could not be found using
+ # normal means
+
+ if name.startswith('__'):
+ # If some internal method was being search, do not convert
+ # to TaskQuerySet just because of that
+ raise AttributeError
+ else:
+ self.replace()
+ return getattr(self, name)
+
+ def __repr__(self):
+ return 'LazyUUIDTaskSet([{0}])'.format(', '.join(self._uuids))
+
+ def __eq__(self, other):
+ # A falsy operand is treated as an empty set of UUIDs
+ return (set(t['uuid'] for t in other) if other else set()) == self._uuids
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ def __contains__(self, task):
+ return task['uuid'] in self._uuids
+
+ def __len__(self):
+ return len(self._uuids)
+
+ def __iter__(self):
+ # Yields a new LazyUUIDTask for each stored UUID
+ for uuid in self._uuids:
+ yield LazyUUIDTask(self._tw, uuid)
+
+ def __sub__(self, other):
+ return self.difference(other)
+
+ def __isub__(self, other):
+ return self.difference_update(other)
+
+ def __rsub__(self, other):
+ # Reflected difference: UUIDs of the other operand not present here
+ return LazyUUIDTaskSet(
+ self._tw,
+ set(t['uuid'] for t in other) - self._uuids,
+ )
+
+ def __or__(self, other):
+ return self.union(other)
+
+ def __ior__(self, other):
+ return self.update(other)
+
+ def __ror__(self, other):
+ return self.union(other)
+
+ def __xor__(self, other):
+ return self.symmetric_difference(other)
+
+ def __ixor__(self, other):
+ return self.symmetric_difference_update(other)
+
+ def __rxor__(self, other):
+ return self.symmetric_difference(other)
+
+ def __and__(self, other):
+ return self.intersection(other)
+
+ def __iand__(self, other):
+ return self.intersection_update(other)
+
+ def __rand__(self, other):
+ return self.intersection(other)
+
+ def __le__(self, other):
+ return self.issubset(other)
+
+ def __ge__(self, other):
+ return self.issuperset(other)
+
+ def issubset(self, other):
+ return all([task in other for task in self])
+
+ def issuperset(self, other):
+ return all([task in self for task in other])
+
+ # The following four methods return a new LazyUUIDTaskSet and leave
+ # this one unchanged
+
+ def union(self, other):
+ return LazyUUIDTaskSet(
+ self._tw,
+ self._uuids | set(t['uuid'] for t in other),
+ )
+
+ def intersection(self, other):
+ return LazyUUIDTaskSet(
+ self._tw,
+ self._uuids & set(t['uuid'] for t in other),
+ )
+
+ def difference(self, other):
+ return LazyUUIDTaskSet(
+ self._tw,
+ self._uuids - set(t['uuid'] for t in other),
+ )
+
+ def symmetric_difference(self, other):
+ return LazyUUIDTaskSet(
+ self._tw,
+ self._uuids ^ set(t['uuid'] for t in other),
+ )
+
+ # The *update methods modify the stored UUIDs in place and return self,
+ # which is what the in-place operators above rely on
+
+ def update(self, other):
+ self._uuids |= set(t['uuid'] for t in other)
+ return self
+
+ def intersection_update(self, other):
+ self._uuids &= set(t['uuid'] for t in other)
+ return self
+
+ def difference_update(self, other):
+ self._uuids -= set(t['uuid'] for t in other)
+ return self
+
+ def symmetric_difference_update(self, other):
+ self._uuids ^= set(t['uuid'] for t in other)
+ return self
+
+ def add(self, task):
+ self._uuids.add(task['uuid'])
+
+ def remove(self, task):
+ self._uuids.remove(task['uuid'])
+
+ def pop(self):
+ # NOTE(review): returns the raw stored UUID, whereas iteration yields
+ # LazyUUIDTask objects - confirm this is intended
+ return self._uuids.pop()
+
+ def clear(self):
+ self._uuids.clear()
+
+ def replace(self):
+ """
+ Performs conversion to the regular TaskQuerySet object, referenced by
+ the stored UUIDs.
+ """
+
+ replacement = self._tw.tasks.filter(' '.join(self._uuids))
+ # Take over the class and the state of the query set in place
+ self.__class__ = replacement.__class__
+ self.__dict__ = replacement.__dict__
--- /dev/null
+ import datetime
+ import importlib
+ import json
+ import pytz
+ import six
+ import tzlocal
+
+
+ from .lazy import LazyUUIDTaskSet, LazyUUIDTask
+
+ # Format of serialized timestamps; the timestamp helpers convert to and
+ # from UTC around it
+ DATE_FORMAT = '%Y%m%dT%H%M%SZ'
+ # Local time zone as reported by tzlocal, determined once at import time
+ local_zone = tzlocal.get_localzone()
+
+
+ class SerializingObject(object):
+ """
+ Common ancestor for TaskResource & TaskWarriorFilter, since they both
+ need to serialize arguments.
+
+ Serializing method should hold the following contract:
+ - any empty value (meaning removal of the attribute)
+ is deserialized into a empty string
+ - None denotes a empty value for any attribute
+
+ Deserializing method should hold the following contract:
+ - None denotes a empty value for any attribute (however,
+ this is here as a safeguard, TaskWarrior currently does
+ not export empty-valued attributes) if the attribute
+ is not iterable (e.g. list or set), in which case
+ a empty iterable should be used.
+
+ Normalizing methods should hold the following contract:
+ - They are used to validate and normalize the user input.
+ Any attribute value that comes from the user (during Task
+ initialization, assignign values to Task attributes, or
+ filtering by user-provided values of attributes) is first
+ validated and normalized using the normalize_{key} method.
+ - If validation or normalization fails, normalizer is expected
+ to raise ValueError.
+ """
+
+ def __init__(self, backend):
+ """Stores the given backend object in the backend attribute"""
+ self.backend = backend
+
+ def _deserialize(self, key, value):
+ """
+ Deserializes the value using the deserialize_<key> method if it exists.
+ By default, an empty string becomes None and any other value is
+ returned unchanged.
+ """
+ hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
+ lambda x: x if x != '' else None)
+ return hydrate_func(value)
+
+ def _serialize(self, key, value):
+ """
+ Serializes the value using the serialize_<key> method if it exists.
+ By default, None becomes an empty string and any other value is
+ returned unchanged.
+ """
+ dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
+ lambda x: x if x is not None else '')
+ return dehydrate_func(value)
+
+ def _normalize(self, key, value):
+ """
+ Use normalize_<key> methods to normalize user input. Any user
+ input will be normalized at the moment it is used as filter,
+ or entered as a value of Task attribute.
+
+ None is returned as is; a value whose key has no normalize_<key>
+ method is returned unchanged.
+ """
+
+ # None value should not be converted by normalizer
+ if value is None:
+ return None
+
+ normalize_func = getattr(self, 'normalize_{0}'.format(key),
+ lambda x: x)
+
+ return normalize_func(value)
+
+ def timestamp_serializer(self, date):
+ """
+ Returns the given datetime converted to UTC and formatted using
+ DATE_FORMAT, or an empty string for a falsy value.
+ """
+ if not date:
+ return ''
+
+ # Any serialized timestamp should be localized, we need to
+ # convert to UTC before converting to string (DATE_FORMAT uses UTC)
+ # NOTE(review): presumably expects a timezone-aware value - confirm how
+ # naive values are handled before they reach this point
+ date = date.astimezone(pytz.utc)
+
+ return date.strftime(DATE_FORMAT)
+
+ def timestamp_deserializer(self, date_str):
+ """
+ Parses the string using DATE_FORMAT as a UTC timestamp and returns it
+ converted to local_zone, or None for a falsy value.
+ """
+ if not date_str:
+ return None
+
+ # Return timestamp localized in the local zone
+ naive_timestamp = datetime.datetime.strptime(date_str, DATE_FORMAT)
+ localized_timestamp = pytz.utc.localize(naive_timestamp)
+ return localized_timestamp.astimezone(local_zone)
+
+ # Hooks for the 'entry' timestamp: (de)serialization goes through the
+ # shared timestamp helpers, user input through datetime_normalizer.
+ def serialize_entry(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_entry(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_entry(self, value):
+ return self.datetime_normalizer(value)
+
+ # Hooks for the 'modified' timestamp: (de)serialization goes through the
+ # shared timestamp helpers, user input through datetime_normalizer.
+ def serialize_modified(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_modified(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_modified(self, value):
+ return self.datetime_normalizer(value)
+
+ # Hooks for the 'start' timestamp: (de)serialization goes through the
+ # shared timestamp helpers, user input through datetime_normalizer.
+ def serialize_start(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_start(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_start(self, value):
+ return self.datetime_normalizer(value)
+
+ # Hooks for the 'end' timestamp: (de)serialization goes through the
+ # shared timestamp helpers, user input through datetime_normalizer.
+ def serialize_end(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_end(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_end(self, value):
+ return self.datetime_normalizer(value)
+
+ # Hooks for the 'due' timestamp: (de)serialization goes through the
+ # shared timestamp helpers, user input through datetime_normalizer.
+ def serialize_due(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_due(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_due(self, value):
+ return self.datetime_normalizer(value)
+
+ # Hooks for the 'scheduled' timestamp: (de)serialization goes through the
+ # shared timestamp helpers, user input through datetime_normalizer.
+ def serialize_scheduled(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_scheduled(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_scheduled(self, value):
+ return self.datetime_normalizer(value)
+
+ # Hooks for the 'until' timestamp: (de)serialization goes through the
+ # shared timestamp helpers, user input through datetime_normalizer.
+ def serialize_until(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_until(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_until(self, value):
+ return self.datetime_normalizer(value)
+
+ # Hooks for the 'wait' timestamp: (de)serialization goes through the
+ # shared timestamp helpers, user input through datetime_normalizer.
+ def serialize_wait(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_wait(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_wait(self, value):
+ return self.datetime_normalizer(value)
+
+ def serialize_annotations(self, value):
+     """
+     Serialize annotations into a list of dicts.
+
+     Each annotation is exported as JSON and parsed back, so the result
+     is a list of plain dicts. None or an empty collection is serialized
+     as the empty string.
+     """
+     annotations = [] if value is None else value
+
+     # The round trip through JSON is intentional: the serialized form
+     # is a list of dicts, not a list of JSON strings
+     exported = []
+     for annotation in annotations:
+         exported.append(json.loads(annotation.export_data()))
+
+     return exported or ''
+
+ def deserialize_annotations(self, data):
+     """
+     Build a list of TaskAnnotation objects from raw annotation data.
+
+     Falsy data yields an empty list.
+     """
+     # tasklib.task is looked up at call time via importlib rather than
+     # imported at module level
+     TaskAnnotation = importlib.import_module('tasklib.task').TaskAnnotation
+
+     if not data:
+         return []
+
+     return [TaskAnnotation(self, item) for item in data]
+
+ def serialize_tags(self, tags):
+     """
+     Join the tags into a comma-separated string ('' when there are none).
+     """
+     if not tags:
+         return ''
+
+     return ','.join(tags)
+
+ def deserialize_tags(self, tags):
+     """
+     Return the tags as a set.
+
+     Accepts either a comma-separated string or an iterable of tags;
+     an empty or missing value yields an empty set.
+     """
+     if not isinstance(tags, six.string_types):
+         return set(tags or [])
+
+     if not tags:
+         return set()
+
+     return set(tags.split(','))
+
+ def serialize_parent(self, parent):
+     """
+     Serialize the parent task as its uuid ('' when there is no parent).
+     """
+     if not parent:
+         return ''
+
+     return parent['uuid']
+
+ def deserialize_parent(self, uuid):
+     """
+     Wrap the parent uuid in a LazyUUIDTask (None when there is no uuid).
+     """
+     if not uuid:
+         return None
+
+     return LazyUUIDTask(self.backend, uuid)
+
+ def serialize_depends(self, value):
+     """
+     Serialize dependencies as a comma-separated string of uuids.
+
+     None is treated as an empty set of dependencies.
+     """
+     dependencies = set() if value is None else value
+
+     if isinstance(dependencies, LazyUUIDTaskSet):
+         # The lazy set already holds the uuids
+         uuids = dependencies._uuids
+     else:
+         uuids = (task['uuid'] for task in dependencies)
+
+     return ','.join(uuids)
+
+ def deserialize_depends(self, raw_uuids):
+     """
+     Build the set of dependencies from raw uuid data.
+
+     raw_uuids is either a comma-separated string (TW 2.4.4 encodes the
+     list of dependencies as a single string) or an iterable of uuids
+     (TW 2.4.5 and later export them as a list). An empty or missing
+     value yields an empty set; anything else is wrapped in a
+     LazyUUIDTaskSet bound to this object's backend.
+     """
+     # None, '' and [] all mean "no dependencies"
+     if not raw_uuids:
+         return set()
+
+     # Only strings need splitting. Testing for the string type (instead
+     # of "not exactly a list") lets list subclasses and other iterables
+     # of uuids pass through instead of failing on .split().
+     if isinstance(raw_uuids, six.string_types):
+         uuids = raw_uuids.split(',')
+     else:
+         uuids = raw_uuids
+
+     return LazyUUIDTaskSet(self.backend, uuids)
+
+ def datetime_normalizer(self, value):
+     """
+     Normalize a user-provided date/datetime value to a localized
+     datetime. The conversions are:
+
+     localized datetime -> returned unchanged
+     naive datetime     -> localized datetime with the same value
+     naive date         -> localized datetime, same date, time=midnight
+     string             -> whatever backend.convert_datetime_string returns
+
+     Raises ValueError for any other type.
+     """
+
+     # datetime is a subclass of date, so it has to be tested first
+     if isinstance(value, datetime.datetime):
+         if value.tzinfo is not None:
+             # Already localized, keep its time zone
+             return value
+         return local_zone.localize(value)
+
+     if isinstance(value, datetime.date):
+         # Plain date: interpret it as midnight in the local zone
+         midnight = datetime.datetime.combine(value, datetime.time.min)
+         return local_zone.localize(midnight)
+
+     if isinstance(value, six.string_types):
+         return self.backend.convert_datetime_string(value)
+
+     raise ValueError("Provided value could not be converted to "
+                      "datetime, its type is not supported: {}"
+                      .format(type(value)))
+
+ def normalize_uuid(self, value):
+     """
+     Return the uuid unchanged if it is a non-empty string.
+
+     Raises ValueError otherwise.
+     """
+     if isinstance(value, six.string_types) and value != '':
+         return value
+
+     raise ValueError("UUID must be a valid non-empty string, "
+                      "not: {}".format(value))
--- /dev/null
+ from __future__ import print_function
+ import copy
+ import importlib
+ import json
+ import logging
+ import os
+ import six
+ import sys
+
+ from .serializing import SerializingObject
+
+ # Timestamp pattern in strftime notation; the trailing 'Z' is a literal
+ DATE_FORMAT = '%Y%m%dT%H%M%SZ'
+ # Number of items TaskQuerySet.__repr__ shows before truncating
+ REPR_OUTPUT_SIZE = 10
+ # Status values used by the status shortcuts of TaskQuerySet
+ PENDING = 'pending'
+ COMPLETED = 'completed'
+ DELETED = 'deleted'
+ WAITING = 'waiting'
+ RECURRING = 'recurring'
+
+ logger = logging.getLogger(__name__)
+
+
+ class ReadOnlyDictView(object):
+     """
+     Simplified read-only window onto a dict.
+
+     Everything handed out is a deep copy, so callers cannot modify the
+     viewed dict through the view.
+     """
+
+     def __init__(self, viewed_dict):
+         self.viewed_dict = viewed_dict
+
+     def __getitem__(self, key):
+         stored = self.viewed_dict[key]
+         return copy.deepcopy(stored)
+
+     def __contains__(self, k):
+         return k in self.viewed_dict
+
+     def __iter__(self):
+         # Iterating a dict yields its keys; copies of them are handed out
+         for key in self.viewed_dict:
+             yield copy.deepcopy(key)
+
+     def __len__(self):
+         return len(self.viewed_dict)
+
+     def __unicode__(self):
+         text = 'ReadOnlyDictView: {0}'.format(repr(self.viewed_dict))
+         return six.u(text)
+
+     __repr__ = __unicode__
+
+     def get(self, key, default=None):
+         found = self.viewed_dict.get(key, default)
+         return copy.deepcopy(found)
+
+     def items(self):
+         return list(map(copy.deepcopy, self.viewed_dict.items()))
+
+     def values(self):
+         return list(map(copy.deepcopy, self.viewed_dict.values()))
+
+
+ class TaskResource(SerializingObject):
+     """
+     Base class for objects backed by a dict of attributes.
+
+     Two dicts are maintained: _data holds the current (deserialized)
+     values and _original_data holds a snapshot of them, which is used to
+     detect modified fields. Fields listed in read_only_fields cannot be
+     set through item assignment.
+     """
+
+     read_only_fields = []
+
+     def _load_data(self, data):
+         """
+         Replace the current state with the deserialized content of
+         `data` and take a fresh snapshot of it as the original data.
+         """
+         self._data = dict((key, self._deserialize(key, value))
+                           for key, value in data.items())
+         # We need to use a copy for original data, so that changes
+         # are not propagated.
+         self._original_data = copy.deepcopy(self._data)
+
+     def _update_data(self, data, update_original=False, remove_missing=False):
+         """
+         Low level update of the internal _data dict. Data which are coming as
+         updates should already be serialized. If update_original is True, the
+         original_data dict is updated as well. If remove_missing is True,
+         every key present in _data but absent from `data` is set to None.
+         """
+         self._data.update(dict((key, self._deserialize(key, value))
+                                for key, value in data.items()))
+
+         # In certain situations, we want to treat missing keys as removals
+         if remove_missing:
+             for key in set(self._data.keys()) - set(data.keys()):
+                 self._data[key] = None
+
+         if update_original:
+             self._original_data = copy.deepcopy(self._data)
+
+     def __getitem__(self, key):
+         """
+         Return the value stored under `key`.
+
+         A key that was never set is filled in with the deserialized form
+         of None first, so e.g. collection-valued attributes come back
+         as empty collections.
+         """
+         # This is a workaround to make TaskResource non-iterable
+         # over simple index-based iteration: any key that int() accepts
+         # ends the iteration instead of being looked up.
+         try:
+             int(key)
+         except ValueError:
+             pass
+         else:
+             raise StopIteration
+
+         if key not in self._data:
+             self._data[key] = self._deserialize(key, None)
+
+         return self._data.get(key)
+
+     def __setitem__(self, key, value):
+         """
+         Normalize `value` and store it under `key`.
+
+         Raises RuntimeError if the field is read-only.
+         """
+         if key in self.read_only_fields:
+             raise RuntimeError('Field \'%s\' is read-only' % key)
+
+         # Normalize the user input before saving it
+         value = self._normalize(key, value)
+         self._data[key] = value
+
+     def __str__(self):
+         s = six.text_type(self.__unicode__())
+         if not six.PY3:
+             # Python 2 expects a byte string from __str__
+             s = s.encode('utf-8')
+         return s
+
+     def __repr__(self):
+         return str(self)
+
+     def export_data(self):
+         """
+         Exports current data contained in the Task as JSON
+         """
+         serialized = ((key, self._serialize(key, value))
+                       for key, value in six.iteritems(self._data))
+
+         # Empty string denotes empty serialized value, we do not want
+         # to pass that to TaskWarrior. The comparison is by value:
+         # an identity test against a '' literal only works as long as the
+         # interpreter happens to share one empty-string object.
+         data = dict((key, value) for key, value in serialized
+                     if value != '')
+
+         # We need to remove spaces for TW-1504, use custom separators
+         return json.dumps(data, separators=(',', ':'))
+
+     @property
+     def _modified_fields(self):
+         """
+         Yield the writable keys whose current value differs from the
+         original snapshot.
+         """
+         writable_fields = set(self._data.keys()) - set(self.read_only_fields)
+         for key in writable_fields:
+             new_value = self._data.get(key)
+             old_value = self._original_data.get(key)
+
+             # Make sure not to mark data removal as modified field if the
+             # field originally had some empty value
+             if key in self._data and not new_value and not old_value:
+                 continue
+
+             if new_value != old_value:
+                 yield key
+
+     @property
+     def modified(self):
+         """
+         True if at least one writable field has been modified.
+         """
+         return bool(list(self._modified_fields))
+
+
+ class TaskAnnotation(TaskResource):
+     """
+     A single annotation belonging to a task.
+
+     Its text representation is the 'description' field.
+     """
+
+     read_only_fields = ['entry', 'description']
+
+     def __init__(self, task, data=None):
+         self.task = task
+         self._load_data(data or dict())
+         super(TaskAnnotation, self).__init__(task.backend)
+
+     def remove(self):
+         """
+         Remove this annotation from the task it belongs to.
+         """
+         self.task.remove_annotation(self)
+
+     def __unicode__(self):
+         return self['description']
+
+     def __eq__(self, other):
+         # Anything that is not an annotation is left to the other operand
+         # (and ultimately compares unequal) instead of failing with an
+         # AttributeError on the missing task/_data attributes.
+         if not isinstance(other, TaskAnnotation):
+             return NotImplemented
+
+         # consider 2 annotations equal if they belong to the same task, and
+         # their data dicts are the same
+         return self.task == other.task and self._data == other._data
+
+     def __ne__(self, other):
+         equal = self.__eq__(other)
+         # NotImplemented must be passed on as is, negating it would turn
+         # it into a plain False
+         return equal if equal is NotImplemented else not equal
+
+     __repr__ = __unicode__
+
+
+ class Task(TaskResource):
+     """
+     A single task.
+
+     Attributes are read and written with item access (task['description']).
+     All operations that touch stored data (save, delete, start, stop, done,
+     annotations, refresh) are delegated to the backend given at
+     construction time.
+     """
+
+     read_only_fields = ['id', 'entry', 'urgency', 'uuid', 'modified']
+
+     class DoesNotExist(Exception):
+         """
+         Raised by TaskQuerySet.get when no task matches the query.
+         """
+
+     class CompletedTask(Exception):
+         """
+         Raised when the operation cannot be performed on the completed task.
+         """
+
+     class DeletedTask(Exception):
+         """
+         Raised when the operation cannot be performed on the deleted task.
+         """
+
+     class ActiveTask(Exception):
+         """
+         Raised when the operation cannot be performed on the active task.
+         """
+
+     class InactiveTask(Exception):
+         """
+         Raised when the operation cannot be performed on an inactive task.
+         """
+
+     class NotSaved(Exception):
+         """
+         Raised when the operation cannot be performed on the task, because
+         it has not been saved to TaskWarrior yet.
+         """
+
+     @classmethod
+     def from_input(cls, input_file=sys.stdin, modify=None, backend=None):
+         """
+         Creates a Task object from hook input, by reading one line of JSON.
+         If modify is true, two lines are used, first line interpreted as the
+         original state of the Task object, and second line as its new,
+         modified value. This is consistent with the TaskWarrior's hook
+         system. If modify is None, it is detected from the name of the
+         running script (sys.argv[0] starting with 'on-modify').
+
+         If no backend is given, a TaskWarrior backend is created, with its
+         data location set to the parent of the directory that contains the
+         running script.
+
+         Object created by this method should not be saved, deleted
+         or refreshed from within a hook, as it could create an infinite
+         loop.
+
+         Input_file argument can be used to specify the input file,
+         but defaults to sys.stdin.
+         """
+
+         # Detect the hook type if not given directly
+         name = os.path.basename(sys.argv[0])
+         modify = name.startswith('on-modify') if modify is None else modify
+
+         # Create the TaskWarrior instance if none passed
+         if backend is None:
+             backends = importlib.import_module('tasklib.backends')
+             hook_parent_dir = os.path.dirname(os.path.dirname(sys.argv[0]))
+             backend = backends.TaskWarrior(data_location=hook_parent_dir)
+
+         task = cls(backend)
+
+         # Load the data from the input
+         task._load_data(json.loads(input_file.readline().strip()))
+
+         # If this is a on-modify event, we are provided with additional
+         # line of input, which provides updated data
+         if modify:
+             task._update_data(json.loads(input_file.readline().strip()),
+                               remove_missing=True)
+
+         return task
+
+     def __init__(self, backend, **kwargs):
+         """
+         Create an unsaved task on `backend`; kwargs are the initial
+         attribute values and are normalized like values set by item
+         assignment.
+
+         Raises RuntimeError if a read-only field is passed.
+         """
+         super(Task, self).__init__(backend)
+
+         # Check that user is not able to set read-only value in __init__
+         for key in kwargs.keys():
+             if key in self.read_only_fields:
+                 raise RuntimeError('Field \'%s\' is read-only' % key)
+
+         # We normalize the data in kwargs so that users of the library
+         # do not have to pass different data formats via __setitem__ and
+         # __init__ methods, that would be confusing
+
+         # Rather unfortunate syntax due to python2.6 compatibility
+         self._data = dict((key, self._normalize(key, value))
+                           for (key, value) in six.iteritems(kwargs))
+         self._original_data = copy.deepcopy(self._data)
+
+     @property
+     def original(self):
+         """
+         Read-only view of the original data.
+
+         The view is created on access: _load_data and _update_data rebind
+         _original_data to a new dict, so a view created once in __init__
+         would keep showing the initial values after a load or refresh.
+         """
+         return ReadOnlyDictView(self._original_data)
+
+     def __unicode__(self):
+         return self['description']
+
+     def __eq__(self, other):
+         own_uuid = self['uuid']
+
+         if own_uuid:
+             # The other operand may be anything; one that cannot be
+             # asked for a 'uuid' (None, a number, a string, ...) is simply
+             # not equal to a saved task.
+             try:
+                 other_uuid = other['uuid']
+             except (TypeError, KeyError):
+                 return False
+
+             if other_uuid:
+                 # For saved Tasks, just define equality by equality of uuids
+                 return own_uuid == other_uuid
+
+         # If the tasks are not saved, compare the actual instances
+         return id(self) == id(other)
+
+     def __ne__(self, other):
+         return not self.__eq__(other)
+
+     def __hash__(self):
+         if self['uuid']:
+             # For saved Tasks, hash the uuid, in line with __eq__
+             return hash(self['uuid'])
+         else:
+             # If the tasks are not saved, return hash of instance id
+             return hash(id(self))
+
+     @property
+     def completed(self):
+         return self['status'] == six.text_type('completed')
+
+     @property
+     def deleted(self):
+         return self['status'] == six.text_type('deleted')
+
+     @property
+     def waiting(self):
+         return self['status'] == six.text_type('waiting')
+
+     @property
+     def pending(self):
+         return self['status'] == six.text_type('pending')
+
+     @property
+     def recurring(self):
+         return self['status'] == six.text_type('recurring')
+
+     @property
+     def active(self):
+         # A task counts as active when it has a start value
+         return self['start'] is not None
+
+     @property
+     def saved(self):
+         # A task counts as saved once it has a uuid or an id
+         return self['uuid'] is not None or self['id'] is not None
+
+     def serialize_depends(self, cur_dependencies):
+         """
+         Serialize the dependencies, making sure all of them are saved.
+
+         Raises Task.NotSaved if any dependency has not been saved.
+         """
+         # Check that all the tasks are saved
+         for task in (cur_dependencies or set()):
+             if not task.saved:
+                 raise Task.NotSaved(
+                     'Task \'%s\' needs to be saved before '
+                     'it can be set as dependency.' % task,
+                 )
+
+         return super(Task, self).serialize_depends(cur_dependencies)
+
+     def delete(self):
+         """
+         Delete the task through the backend.
+
+         Raises Task.NotSaved for an unsaved task and Task.DeletedTask if
+         the task is already deleted.
+         """
+         if not self.saved:
+             raise Task.NotSaved(
+                 'Task needs to be saved before it can be deleted',
+             )
+
+         # Refresh the status, and raise exception if the task is deleted
+         self.refresh(only_fields=['status'])
+
+         if self.deleted:
+             raise Task.DeletedTask('Task was already deleted')
+
+         self.backend.delete_task(self)
+
+         # Refresh the status again, so that we have updated info stored
+         self.refresh(only_fields=['status', 'start', 'end'])
+
+     def start(self):
+         """
+         Start the task through the backend.
+
+         Raises Task.NotSaved, Task.CompletedTask, Task.DeletedTask or
+         Task.ActiveTask when the task is in the respective state.
+         """
+         if not self.saved:
+             raise Task.NotSaved(
+                 'Task needs to be saved before it can be started',
+             )
+
+         # Refresh, and raise exception if task is already completed/deleted
+         self.refresh(only_fields=['status'])
+
+         if self.completed:
+             raise Task.CompletedTask('Cannot start a completed task')
+         elif self.deleted:
+             raise Task.DeletedTask('Deleted task cannot be started')
+         elif self.active:
+             raise Task.ActiveTask('Task is already active')
+
+         self.backend.start_task(self)
+
+         # Refresh the status again, so that we have updated info stored
+         self.refresh(only_fields=['status', 'start'])
+
+     def stop(self):
+         """
+         Stop the task through the backend.
+
+         Raises Task.NotSaved for an unsaved task and Task.InactiveTask if
+         the task is not active.
+         """
+         if not self.saved:
+             raise Task.NotSaved(
+                 'Task needs to be saved before it can be stopped',
+             )
+
+         # Refresh the status; only the locally known 'start' value decides
+         # below whether the task is active
+         self.refresh(only_fields=['status'])
+
+         if not self.active:
+             raise Task.InactiveTask('Cannot stop an inactive task')
+
+         self.backend.stop_task(self)
+
+         # Refresh the status again, so that we have updated info stored
+         self.refresh(only_fields=['status', 'start'])
+
+     def done(self):
+         """
+         Complete the task through the backend.
+
+         Raises Task.NotSaved, Task.CompletedTask or Task.DeletedTask when
+         the task is in the respective state.
+         """
+         if not self.saved:
+             raise Task.NotSaved(
+                 'Task needs to be saved before it can be completed',
+             )
+
+         # Refresh, and raise exception if task is already completed/deleted
+         self.refresh(only_fields=['status'])
+
+         if self.completed:
+             raise Task.CompletedTask('Cannot complete a completed task')
+         elif self.deleted:
+             raise Task.DeletedTask('Deleted task cannot be completed')
+
+         self.backend.complete_task(self)
+
+         # Refresh the status again, so that we have updated info stored
+         self.refresh(only_fields=['status', 'start', 'end'])
+
+     def save(self):
+         """
+         Save the task through the backend; a saved task without
+         modifications is left alone.
+         """
+         if self.saved and not self.modified:
+             return
+
+         # All the actual work is done by the backend
+         self.backend.save_task(self)
+
+     def add_annotation(self, annotation):
+         """
+         Add an annotation to the task and reload its annotations.
+
+         Raises Task.NotSaved for an unsaved task.
+         """
+         if not self.saved:
+             raise Task.NotSaved('Task needs to be saved to add annotation')
+
+         self.backend.annotate_task(self, annotation)
+         self.refresh(only_fields=['annotations'])
+
+     def remove_annotation(self, annotation):
+         """
+         Remove an annotation, given either as a TaskAnnotation or as its
+         description, and reload the task's annotations.
+
+         Raises Task.NotSaved for an unsaved task.
+         """
+         if not self.saved:
+             raise Task.NotSaved('Task needs to be saved to remove annotation')
+
+         if isinstance(annotation, TaskAnnotation):
+             annotation = annotation['description']
+
+         self.backend.denotate_task(self, annotation)
+         self.refresh(only_fields=['annotations'])
+
+     def refresh(self, only_fields=None, after_save=False):
+         """
+         Reload the task's data from the backend.
+
+         With only_fields, just those fields are updated (in both the
+         current and the original data); otherwise all data is replaced.
+
+         Raises Task.NotSaved for an unsaved task.
+         """
+         # Raise error when trying to refresh a task that has not been saved
+         if not self.saved:
+             raise Task.NotSaved('Task needs to be saved to be refreshed')
+
+         new_data = self.backend.refresh_task(self, after_save=after_save)
+
+         if only_fields:
+             to_update = dict(
+                 [(k, new_data.get(k)) for k in only_fields],
+             )
+             self._update_data(to_update, update_original=True)
+         else:
+             self._load_data(new_data)
+
+
+ class TaskQuerySet(object):
+     """
+     Lazily evaluated collection of tasks matching a filter.
+
+     The backend is queried when the results are first needed; they are
+     then kept in _result_cache.
+     """
+
+     def __init__(self, backend, filter_obj=None):
+         self.backend = backend
+         self._result_cache = None
+         self.filter_obj = filter_obj or self.backend.filter_class(backend)
+
+     def __deepcopy__(self, memo):
+         """
+         Deep copy everything except the cached results, which are reset.
+         """
+         duplicate = self.__class__(backend=self.backend)
+         for name, value in self.__dict__.items():
+             uncached = name in ('_iter', '_result_cache')
+             duplicate.__dict__[name] = (
+                 None if uncached else copy.deepcopy(value, memo)
+             )
+         return duplicate
+
+     def __repr__(self):
+         # Fetch one item more than is shown, to know whether to truncate
+         preview = list(self[:REPR_OUTPUT_SIZE + 1])
+         if len(preview) > REPR_OUTPUT_SIZE:
+             preview[-1] = '...(remaining elements truncated)...'
+         return repr(preview)
+
+     def __len__(self):
+         cached = self._result_cache
+         if cached is None:
+             cached = self._result_cache = list(self)
+         return len(cached)
+
+     def __iter__(self):
+         if self._result_cache is None:
+             self._result_cache = self._execute()
+         return iter(self._result_cache)
+
+     def __getitem__(self, k):
+         cached = self._result_cache
+         if cached is None:
+             cached = self._result_cache = list(self)
+         return cached.__getitem__(k)
+
+     def __bool__(self):
+         if self._result_cache is not None:
+             return bool(self._result_cache)
+         try:
+             next(iter(self))
+         except StopIteration:
+             return False
+         else:
+             return True
+
+     def __nonzero__(self):
+         # Python 2 name of __bool__
+         return type(self).__bool__(self)
+
+     def _clone(self, klass=None, **kwargs):
+         """
+         Return a new query set with a cloned filter; kwargs are set
+         as attributes on the clone.
+         """
+         klass = self.__class__ if klass is None else klass
+         clone = klass(backend=self.backend,
+                       filter_obj=self.filter_obj.clone())
+         clone.__dict__.update(kwargs)
+         return clone
+
+     def _execute(self):
+         """
+         Ask the backend for the tasks matching the current filters.
+         """
+         return self.backend.filter_tasks(self.filter_obj)
+
+     def all(self):
+         """
+         Return a copy of this query set as a new TaskQuerySet.
+         """
+         return self._clone()
+
+     def pending(self):
+         return self.filter(status=PENDING)
+
+     def completed(self):
+         return self.filter(status=COMPLETED)
+
+     def deleted(self):
+         return self.filter(status=DELETED)
+
+     def waiting(self):
+         return self.filter(status=WAITING)
+
+     def recurring(self):
+         return self.filter(status=RECURRING)
+
+     def filter(self, *args, **kwargs):
+         """
+         Return a new TaskQuerySet with the given filters added to a
+         clone of the current ones.
+         """
+         narrowed = self._clone()
+         filters = narrowed.filter_obj
+         for raw_filter in args:
+             filters.add_filter(raw_filter)
+         for key, value in kwargs.items():
+             filters.add_filter_param(key, value)
+         return narrowed
+
+     def get(self, **kwargs):
+         """
+         Run the query and return the single task matching the given
+         keyword arguments.
+
+         Raises Task.DoesNotExist if nothing matches and ValueError if
+         more than one task matches.
+         """
+         matches = self.filter(**kwargs)
+         count = len(matches)
+
+         if count == 1:
+             return matches._result_cache[0]
+
+         if count:
+             raise ValueError(
+                 'get() returned more than one Task -- it returned {0}! '
+                 'Lookup parameters were {1}'.format(count, kwargs),
+             )
+
+         raise Task.DoesNotExist(
+             'Task matching query does not exist. '
+             'Lookup parameters were {0}'.format(kwargs),
+         )
--- /dev/null
+ # coding=utf-8
+
+ import copy
+ import datetime
+ import itertools
+ import json
+ import os
+ import pytz
+ import six
+ import shutil
+ import sys
+ import tempfile
+ import unittest
+
+ from .backends import TaskWarrior
+ from .task import Task, ReadOnlyDictView
+ from .lazy import LazyUUIDTask, LazyUUIDTaskSet
+ from .serializing import DATE_FORMAT, local_zone
+
+ # Names of the standard task attributes, see
+ # http://taskwarrior.org/docs/design/task.html , Section: The Attributes
+ TASK_STANDARD_ATTRS = (
+ 'status',
+ 'uuid',
+ 'entry',
+ 'description',
+ 'start',
+ 'end',
+ 'due',
+ 'until',
+ 'wait',
+ 'modified',
+ 'scheduled',
+ 'recur',
+ 'mask',
+ 'imask',
+ 'parent',
+ 'project',
+ 'priority',
+ 'depends',
+ 'tags',
+ 'annotations',
+ )
+
+
+ def total_seconds_2_6(x):
+     """
+     Return the length of the timedelta x in seconds, as a float.
+     """
+     fraction = x.microseconds / 1e6
+     day_seconds = x.days * 24 * 3600
+     return fraction + x.seconds + day_seconds
+
+
+ class TasklibTest(unittest.TestCase):
+     """
+     Base test case: every test gets a TaskWarrior instance (self.tw)
+     whose data lives in a fresh temporary directory, removed afterwards.
+     """
+
+     def get_taskwarrior(self, **kwargs):
+         """
+         Create a TaskWarrior on the temporary data directory; kwargs
+         override the default constructor arguments.
+         """
+         options = {
+             'data_location': self.tmp,
+             'taskrc_location': '/',
+         }
+         options.update(kwargs)
+         return TaskWarrior(**options)
+
+     def setUp(self):
+         self.tmp = tempfile.mkdtemp(dir='.')
+         self.tw = self.get_taskwarrior()
+
+     def tearDown(self):
+         shutil.rmtree(self.tmp)
+
+
+ class TaskWarriorTest(TasklibTest):
+ """
+ Tests of the TaskWarrior object itself.
+ """
+
+ def test_custom_command(self):
+ # ensure that a custom command which contains multiple parts
+ # is properly split up
+ tw = self.get_taskwarrior(
+ task_command='wsl task',
+ # prevent `_get_version` from running as `wsl` may not exist
+ version_override=os.getenv('TASK_VERSION'),
+ )
+ self.assertEqual(tw._get_task_command(), ['wsl', 'task'])
+
+
+ class TaskFilterTest(TasklibTest):
+ """
+ Tests of the query set: the status shortcuts (pending, completed,
+ deleted, waiting, recurring) and filtering by attribute values, run
+ against the temporary TaskWarrior instance created in setUp.
+ """
+
+ def test_all_empty(self):
+ self.assertEqual(len(self.tw.tasks.all()), 0)
+
+ def test_all_non_empty(self):
+ Task(self.tw, description='test task').save()
+ self.assertEqual(len(self.tw.tasks.all()), 1)
+ self.assertEqual(self.tw.tasks.all()[0]['description'], 'test task')
+ self.assertEqual(self.tw.tasks.all()[0]['status'], 'pending')
+
+ def test_pending_non_empty(self):
+ Task(self.tw, description='test task').save()
+ self.assertEqual(len(self.tw.tasks.pending()), 1)
+ self.assertEqual(
+ self.tw.tasks.pending()[0]['description'],
+ 'test task',
+ )
+ self.assertEqual(self.tw.tasks.pending()[0]['status'], 'pending')
+
+ def test_completed_empty(self):
+ Task(self.tw, description='test task').save()
+ self.assertEqual(len(self.tw.tasks.completed()), 0)
+
+ def test_completed_non_empty(self):
+ Task(self.tw, description='test task').save()
+ self.assertEqual(len(self.tw.tasks.completed()), 0)
+ self.tw.tasks.all()[0].done()
+ self.assertEqual(len(self.tw.tasks.completed()), 1)
+
+ def test_deleted_empty(self):
+ Task(self.tw, description='test task').save()
+ self.assertEqual(len(self.tw.tasks.deleted()), 0)
+
+ def test_deleted_non_empty(self):
+ Task(self.tw, description='test task').save()
+ self.assertEqual(len(self.tw.tasks.deleted()), 0)
+ self.tw.tasks.all()[0].delete()
+ self.assertEqual(len(self.tw.tasks.deleted()), 1)
+
+ def test_waiting_empty(self):
+ Task(self.tw, description='test task').save()
+ self.assertEqual(len(self.tw.tasks.waiting()), 0)
+
+ def test_waiting_non_empty(self):
+ Task(self.tw, description='test task').save()
+ self.assertEqual(len(self.tw.tasks.waiting()), 0)
+
+ t = self.tw.tasks.all()[0]
+ t['wait'] = datetime.datetime.now() + datetime.timedelta(days=1)
+ t.save()
+
+ self.assertEqual(len(self.tw.tasks.waiting()), 1)
+
+ def test_recurring_empty(self):
+ Task(self.tw, description='test task').save()
+ self.assertEqual(len(self.tw.tasks.recurring()), 0)
+
+ def test_recurring_non_empty(self):
+ Task(
+ self.tw,
+ description='test task',
+ recur='daily',
+ due=datetime.datetime.now(),
+ ).save()
+ self.assertEqual(len(self.tw.tasks.recurring()), 1)
+
+ def test_filtering_by_attribute(self):
+ Task(self.tw, description='no priority task').save()
+ Task(self.tw, priority='H', description='high priority task').save()
+ self.assertEqual(len(self.tw.tasks.all()), 2)
+
+ # Assert that the correct number of tasks is returned
+ self.assertEqual(len(self.tw.tasks.filter(priority='H')), 1)
+
+ # Assert that the correct tasks are returned
+ high_priority_task = self.tw.tasks.get(priority='H')
+ self.assertEqual(
+ high_priority_task['description'],
+ 'high priority task',
+ )
+
+ def test_filtering_by_empty_attribute(self):
+ Task(self.tw, description='no priority task').save()
+ Task(self.tw, priority='H', description='high priority task').save()
+ self.assertEqual(len(self.tw.tasks.all()), 2)
+
+ # Assert that the correct number of tasks is returned
+ self.assertEqual(len(self.tw.tasks.filter(priority=None)), 1)
+
+ # Assert that the correct tasks are returned
+ no_priority_task = self.tw.tasks.get(priority=None)
+ self.assertEqual(no_priority_task['description'], 'no priority task')
+
+ def test_filter_for_task_with_space_in_descripition(self):
+ task = Task(self.tw, description='test task')
+ task.save()
+
+ filtered_task = self.tw.tasks.get(description='test task')
+ self.assertEqual(filtered_task['description'], 'test task')
+
+ def test_filter_for_task_without_space_in_descripition(self):
+ task = Task(self.tw, description='test')
+ task.save()
+
+ filtered_task = self.tw.tasks.get(description='test')
+ self.assertEqual(filtered_task['description'], 'test')
+
+ def test_filter_for_task_with_space_in_project(self):
+ task = Task(self.tw, description='test', project='random project')
+ task.save()
+
+ filtered_task = self.tw.tasks.get(project='random project')
+ self.assertEqual(filtered_task['project'], 'random project')
+
+ def test_filter_for_task_without_space_in_project(self):
+ task = Task(self.tw, description='test', project='random')
+ task.save()
+
+ filtered_task = self.tw.tasks.get(project='random')
+ self.assertEqual(filtered_task['project'], 'random')
+
+ def test_filter_with_empty_uuid(self):
+ # An empty uuid is rejected with ValueError
+ self.assertRaises(ValueError, lambda: self.tw.tasks.get(uuid=''))
+
+ # The test_filter_dummy_by_* tests below filter by the value a saved
+ # task itself reports for the attribute and expect to get that task back.
+
+ def test_filter_dummy_by_status(self):
+ t = Task(self.tw, description='test')
+ t.save()
+
+ tasks = self.tw.tasks.filter(status=t['status'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_uuid(self):
+ t = Task(self.tw, description='test')
+ t.save()
+
+ tasks = self.tw.tasks.filter(uuid=t['uuid'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_entry(self):
+ t = Task(self.tw, description='test')
+ t.save()
+
+ tasks = self.tw.tasks.filter(entry=t['entry'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_description(self):
+ t = Task(self.tw, description='test')
+ t.save()
+
+ tasks = self.tw.tasks.filter(description=t['description'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_start(self):
+ t = Task(self.tw, description='test')
+ t.save()
+ t.start()
+
+ tasks = self.tw.tasks.filter(start=t['start'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_end(self):
+ t = Task(self.tw, description='test')
+ t.save()
+ t.done()
+
+ tasks = self.tw.tasks.filter(end=t['end'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_due(self):
+ t = Task(self.tw, description='test', due=datetime.datetime.now())
+ t.save()
+
+ tasks = self.tw.tasks.filter(due=t['due'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_until(self):
+ t = Task(self.tw, description='test')
+ t.save()
+
+ tasks = self.tw.tasks.filter(until=t['until'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_modified(self):
+ # Older TW version does not support bumping modified
+ # on save
+ # NOTE(review): if tw.version is text this is a lexicographic
+ # comparison - confirm it orders multi-digit version parts as intended
+ if self.tw.version < six.text_type('2.2.0'):
+ # Python2.6 does not support SkipTest. As a workaround
+ # mark the test as passed by exiting.
+ if getattr(unittest, 'SkipTest', None) is not None:
+ raise unittest.SkipTest()
+ else:
+ return
+
+ t = Task(self.tw, description='test')
+ t.save()
+
+ tasks = self.tw.tasks.filter(modified=t['modified'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_scheduled(self):
+ t = Task(self.tw, description='test')
+ t.save()
+
+ tasks = self.tw.tasks.filter(scheduled=t['scheduled'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_tags(self):
+ t = Task(self.tw, description='test', tags=['home'])
+ t.save()
+
+ tasks = self.tw.tasks.filter(tags=t['tags'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_dummy_by_projects(self):
+ t = Task(self.tw, description='test', project='random')
+ t.save()
+
+ tasks = self.tw.tasks.filter(project=t['project'])
+ self.assertEqual(list(tasks), [t])
+
+ def test_filter_by_priority(self):
+ t = Task(self.tw, description='test', priority='H')
+ t.save()
+
+ tasks = self.tw.tasks.filter(priority=t['priority'])
+ self.assertEqual(list(tasks), [t])
+
+
+ class TaskTest(TasklibTest):
+
+ def test_create_unsaved_task(self):
+ # Make sure a new task is not saved unless explicitly called for
+ Task(self.tw, description='test task')
+ self.assertEqual(len(self.tw.tasks.all()), 0)
+
+ # TODO: once python 2.6 compatibility is over, use context managers here
+ # and in all subsequent tests for assertRaises
+
+ def test_delete_unsaved_task(self):
+ t = Task(self.tw, description='test task')
+ self.assertRaises(Task.NotSaved, t.delete)
+
+ def test_complete_unsaved_task(self):
+ t = Task(self.tw, description='test task')
+ self.assertRaises(Task.NotSaved, t.done)
+
+ def test_refresh_unsaved_task(self):
+ t = Task(self.tw, description='test task')
+ self.assertRaises(Task.NotSaved, t.refresh)
+
+ def test_start_unsaved_task(self):
+ t = Task(self.tw, description='test task')
+ self.assertRaises(Task.NotSaved, t.start)
+
+ def test_delete_deleted_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.delete()
+
+ self.assertRaises(Task.DeletedTask, t.delete)
+
+ def test_complete_completed_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.done()
+
+ self.assertRaises(Task.CompletedTask, t.done)
+
+ def test_start_completed_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.done()
+
+ self.assertRaises(Task.CompletedTask, t.start)
+
+ def test_add_completed_task(self):
+ t = Task(
+ self.tw,
+ description='test',
+ status='completed',
+ end=datetime.datetime.now(),
+ )
+ t.save()
+
+ def test_add_multiple_completed_tasks(self):
+ t1 = Task(
+ self.tw,
+ description='test1',
+ status='completed',
+ end=datetime.datetime.now(),
+ )
+ t2 = Task(
+ self.tw,
+ description='test2',
+ status='completed',
+ end=datetime.datetime.now(),
+ )
+ t1.save()
+ t2.save()
+
+ def test_complete_deleted_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.delete()
+
+ self.assertRaises(Task.DeletedTask, t.done)
+
+ def test_starting_task(self):
+ t = Task(self.tw, description='test task')
+ now = t.datetime_normalizer(datetime.datetime.now())
+ t.save()
+ t.start()
+
+ self.assertTrue(now.replace(microsecond=0) <= t['start'])
+ self.assertEqual(t['status'], 'pending')
+
+ def test_completing_task(self):
+ t = Task(self.tw, description='test task')
+ now = t.datetime_normalizer(datetime.datetime.now())
+ t.save()
+ t.done()
+
+ self.assertTrue(now.replace(microsecond=0) <= t['end'])
+ self.assertEqual(t['status'], 'completed')
+
+ def test_deleting_task(self):
+ t = Task(self.tw, description='test task')
+ now = t.datetime_normalizer(datetime.datetime.now())
+ t.save()
+ t.delete()
+
+ self.assertTrue(now.replace(microsecond=0) <= t['end'])
+ self.assertEqual(t['status'], 'deleted')
+
+ def test_started_task_active(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.start()
+ self.assertTrue(t.active)
+
+ def test_unstarted_task_inactive(self):
+ t = Task(self.tw, description='test task')
+ self.assertFalse(t.active)
+ t.save()
+ self.assertFalse(t.active)
+
+ def test_start_active_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.start()
+ self.assertRaises(Task.ActiveTask, t.start)
+
+ def test_stop_completed_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.start()
+ t.done()
+
+ self.assertRaises(Task.InactiveTask, t.stop)
+
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.done()
+
+ self.assertRaises(Task.InactiveTask, t.stop)
+
+ def test_stop_deleted_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.start()
+ t.delete()
+ t.stop()
+
+ def test_stop_inactive_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+
+ self.assertRaises(Task.InactiveTask, t.stop)
+
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.start()
+ t.stop()
+
+ self.assertRaises(Task.InactiveTask, t.stop)
+
+ def test_stopping_task(self):
+ t = Task(self.tw, description='test task')
+ t.datetime_normalizer(datetime.datetime.now())
+ t.save()
+ t.start()
+ t.stop()
+
+ self.assertEqual(t['end'], None)
+ self.assertEqual(t['status'], 'pending')
+ self.assertFalse(t.active)
+
+ def test_modify_simple_attribute_without_space(self):
+ t = Task(self.tw, description='test')
+ t.save()
+
+ self.assertEqual(t['description'], 'test')
+
+ t['description'] = 'test-modified'
+ t.save()
+
+ self.assertEqual(t['description'], 'test-modified')
+
+ def test_modify_simple_attribute_with_space(self):
+ # Space can pose problems with parsing
+ t = Task(self.tw, description='test task')
+ t.save()
+
+ self.assertEqual(t['description'], 'test task')
+
+ t['description'] = 'test task modified'
+ t.save()
+
+ self.assertEqual(t['description'], 'test task modified')
+
+ def test_empty_dependency_set_of_unsaved_task(self):
+ t = Task(self.tw, description='test task')
+ self.assertEqual(t['depends'], set())
+
+ def test_empty_dependency_set_of_saved_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ self.assertEqual(t['depends'], set())
+
+ def test_set_unsaved_task_as_dependency(self):
+ # Adds only one dependency to task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency = Task(self.tw, description='needs to be done first')
+
+ # We only save the parent task, dependency task is unsaved
+ t.save()
+ t['depends'] = set([dependency])
+
+ self.assertRaises(Task.NotSaved, t.save)
+
+ def test_set_simple_dependency_set(self):
+ # Adds only one dependency to task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency = Task(self.tw, description='needs to be done first')
+
+ t.save()
+ dependency.save()
+
+ t['depends'] = set([dependency])
+
+ self.assertEqual(t['depends'], set([dependency]))
+
+ def test_set_simple_dependency_lazyuuidtaskset(self):
+ # Adds only one dependency as a LazyUUIDTaskSet to task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency = Task(self.tw, description='needs to be done first')
+
+ t.save()
+ dependency.save()
+
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']])
+
+ self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']]))
+
+ def test_set_complex_dependency_set(self):
+ # Adds two dependencies to task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency1 = Task(self.tw, description='needs to be done first')
+ dependency2 = Task(self.tw, description='needs to be done second')
+
+ t.save()
+ dependency1.save()
+ dependency2.save()
+
+ t['depends'] = set([dependency1, dependency2])
+
+ self.assertEqual(t['depends'], set([dependency1, dependency2]))
+
+ def test_set_complex_dependency_lazyuuidtaskset(self):
+ # Adds two dependencies as a LazyUUIDTaskSet to task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency1 = Task(self.tw, description='needs to be done first')
+ dependency2 = Task(self.tw, description='needs to be done second')
+
+ t.save()
+ dependency1.save()
+ dependency2.save()
+
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']])
+
+ self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']]))
+
+ def test_remove_from_dependency_set(self):
+ # Removes dependency from task with two dependencies
+ t = Task(self.tw, description='test task')
+ dependency1 = Task(self.tw, description='needs to be done first')
+ dependency2 = Task(self.tw, description='needs to be done second')
+
+ dependency1.save()
+ dependency2.save()
+
+ t['depends'] = set([dependency1, dependency2])
+ t.save()
+
+ t['depends'].remove(dependency2)
+ t.save()
+
+ self.assertEqual(t['depends'], set([dependency1]))
+
+ def test_remove_from_dependency_lazyuuidtaskset(self):
+ # Removes dependency from task with two dependencies as LazyUUIDTaskSet
+ t = Task(self.tw, description='test task')
+ dependency1 = Task(self.tw, description='needs to be done first')
+ dependency2 = Task(self.tw, description='needs to be done second')
+
+ dependency1.save()
+ dependency2.save()
+
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']])
+ t.save()
+
+ t['depends'].remove(dependency2)
+ t.save()
+
+ self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency1['uuid']]))
+
+ def test_add_to_dependency_set(self):
+ # Adds dependency to task with one dependencies
+ t = Task(self.tw, description='test task')
+ dependency1 = Task(self.tw, description='needs to be done first')
+ dependency2 = Task(self.tw, description='needs to be done second')
+
+ dependency1.save()
+ dependency2.save()
+
+ t['depends'] = set([dependency1])
+ t.save()
+
+ t['depends'].add(dependency2)
+ t.save()
+
+ self.assertEqual(t['depends'], set([dependency1, dependency2]))
+
+ def test_add_to_dependency_lazyuuidtaskset(self):
+ # Adds dependency to task with one dependencies as LazyUUIDTaskSet
+ t = Task(self.tw, description='test task')
+ dependency1 = Task(self.tw, description='needs to be done first')
+ dependency2 = Task(self.tw, description='needs to be done second')
+
+ dependency1.save()
+ dependency2.save()
+
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency1['uuid']])
+ t.save()
+
+ t['depends'].add(dependency2)
+ t.save()
+
+ self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']]))
+
+ def test_add_lazyuuidtaskset_to_dependency_lazyuuidtaskset(self):
+ # Adds dependency as LazyUUIDTaskSet to task with one dependencies as LazyUUIDTaskSet
+ t = Task(self.tw, description='test task')
+ dependency1 = Task(self.tw, description='needs to be done first')
+ dependency2 = Task(self.tw, description='needs to be done second')
+
+ dependency1.save()
+ dependency2.save()
+
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency1['uuid']])
+ t.save()
+
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency2['uuid']]).union(t['depends'])
+ t.save()
+
+ self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']]))
+
+ def test_add_to_empty_dependency_set(self):
+ # Adds dependency to task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency = Task(self.tw, description='needs to be done first')
+
+ dependency.save()
+
+ t['depends'].add(dependency)
+ t.save()
+
+ self.assertEqual(t['depends'], set([dependency]))
+
+ def test_add_to_empty_dependency_lazyuuidtaskset(self):
+ # Adds dependency as LazyUUIDTaskSet to task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency = Task(self.tw, description='needs to be done first')
+
+ dependency.save()
+
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']])
+ t.save()
+
+ self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']]))
+
+ def test_simple_dependency_set_save_repeatedly(self):
+ # Adds only one dependency to task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency = Task(self.tw, description='needs to be done first')
+ dependency.save()
+
+ t['depends'] = set([dependency])
+ t.save()
+
+ # We taint the task, but keep depends intact
+ t['description'] = 'test task modified'
+ t.save()
+
+ self.assertEqual(t['depends'], set([dependency]))
+
+ # We taint the task, but assign the same set to the depends
+ t['depends'] = set([dependency])
+ t['description'] = 'test task modified again'
+ t.save()
+
+ self.assertEqual(t['depends'], set([dependency]))
+
+ def test_simple_dependency_lazyuuidtaskset_save_repeatedly(self):
+ # Adds only one dependency as LazyUUIDTaskSet to task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency = Task(self.tw, description='needs to be done first')
+ dependency.save()
+
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']])
+ t.save()
+
+ # We taint the task, but keep depends intact
+ t['description'] = 'test task modified'
+ t.save()
+
+ self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']]))
+
+ # We taint the task, but assign the same set to the depends
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']])
+ t['description'] = 'test task modified again'
+ t.save()
+
+ self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']]))
+
+ def test_simple_dependency_lazyuuidtaskset_save_before_repeatedly(self):
+ # Adds only one dependency as LazyUUIDTaskSet to a saved task with no dependencies
+ t = Task(self.tw, description='test task')
+ dependency = Task(self.tw, description='needs to be done first')
+ dependency.save()
+ t.save()
+
+ t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']])
+ t.save()
+
+ self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']]))
+
+ def test_compare_different_tasks(self):
+ # Negative: compare two different tasks
+ t1 = Task(self.tw, description='test task')
+ t2 = Task(self.tw, description='test task')
+
+ t1.save()
+ t2.save()
+
+ self.assertEqual(t1 == t2, False)
+
+ def test_compare_same_task_object(self):
+ # Compare Task object wit itself
+ t = Task(self.tw, description='test task')
+ t.save()
+
+ self.assertEqual(t == t, True)
+
+ def test_compare_same_task(self):
+ # Compare the same task using two different objects
+ t1 = Task(self.tw, description='test task')
+ t1.save()
+
+ t2 = self.tw.tasks.get(uuid=t1['uuid'])
+ self.assertEqual(t1 == t2, True)
+
+ def test_compare_unsaved_tasks(self):
+ # t1 and t2 are unsaved tasks, considered to be unequal
+ # despite the content of data
+ t1 = Task(self.tw, description='test task')
+ t2 = Task(self.tw, description='test task')
+
+ self.assertEqual(t1 == t2, False)
+
+ def test_hash_unsaved_tasks(self):
+ # Considered equal, it's the same object
+ t1 = Task(self.tw, description='test task')
+ t2 = t1
+ self.assertEqual(hash(t1) == hash(t2), True)
+
+ def test_hash_same_task(self):
+ # Compare the hash of the task using two different objects
+ t1 = Task(self.tw, description='test task')
+ t1.save()
+
+ t2 = self.tw.tasks.get(uuid=t1['uuid'])
+ self.assertEqual(t1.__hash__(), t2.__hash__())
+
+ def test_hash_unequal_unsaved_tasks(self):
+ # Compare the hash of the task using two different objects
+ t1 = Task(self.tw, description='test task 1')
+ t2 = Task(self.tw, description='test task 2')
+
+ self.assertNotEqual(t1.__hash__(), t2.__hash__())
+
+ def test_hash_unequal_saved_tasks(self):
+ # Compare the hash of the task using two different objects
+ t1 = Task(self.tw, description='test task 1')
+ t2 = Task(self.tw, description='test task 2')
+
+ t1.save()
+ t2.save()
+
+ self.assertNotEqual(t1.__hash__(), t2.__hash__())
+
+ def test_adding_task_with_priority(self):
+ t = Task(self.tw, description='test task', priority='M')
+ t.save()
+
+ def test_removing_priority_with_none(self):
+ t = Task(self.tw, description='test task', priority='L')
+ t.save()
+
+ # Remove the priority mark
+ t['priority'] = None
+ t.save()
+
+ # Assert that priority is not there after saving
+ self.assertEqual(t['priority'], None)
+
+ def test_adding_task_with_due_time(self):
+ t = Task(self.tw, description='test task', due=datetime.datetime.now())
+ t.save()
+
+ def test_removing_due_time_with_none(self):
+ t = Task(self.tw, description='test task', due=datetime.datetime.now())
+ t.save()
+
+ # Remove the due timestamp
+ t['due'] = None
+ t.save()
+
+ # Assert that due timestamp is no longer there
+ self.assertEqual(t['due'], None)
+
+ def test_modified_fields_new_task(self):
+ t = Task(self.tw)
+
+ # This should be empty with new task
+ self.assertEqual(set(t._modified_fields), set())
+
+ # Modify the task
+ t['description'] = 'test task'
+ self.assertEqual(set(t._modified_fields), set(['description']))
+
+ t['due'] = datetime.datetime(2014, 2, 14, 14, 14, 14) # <3
+ self.assertEqual(set(t._modified_fields), set(['description', 'due']))
+
+ t['project'] = 'test project'
+ self.assertEqual(
+ set(t._modified_fields),
+ set(['description', 'due', 'project']),
+ )
+
+ # List of modified fields should clear out when saved
+ t.save()
+ self.assertEqual(set(t._modified_fields), set())
+
+ # Reassigning the fields with the same values now should not produce
+ # modified fields
+ t['description'] = 'test task'
+ t['due'] = datetime.datetime(2014, 2, 14, 14, 14, 14) # <3
+ t['project'] = 'test project'
+ self.assertEqual(set(t._modified_fields), set())
+
+ def test_modified_fields_loaded_task(self):
+ t = Task(self.tw)
+
+ # Modify the task
+ t['description'] = 'test task'
+ t['due'] = datetime.datetime(2014, 2, 14, 14, 14, 14) # <3
+ t['project'] = 'test project'
+
+ dependency = Task(self.tw, description='dependency')
+ dependency.save()
+ t['depends'] = set([dependency])
+
+ # List of modified fields should clear out when saved
+ t.save()
+ self.assertEqual(set(t._modified_fields), set())
+
+ # Get the task by using a filter by UUID
+ self.tw.tasks.get(uuid=t['uuid'])
+
+ # Reassigning the fields with the same values now should not produce
+ # modified fields
+ t['description'] = 'test task'
+ t['due'] = datetime.datetime(2014, 2, 14, 14, 14, 14) # <3
+ t['project'] = 'test project'
+ t['depends'] = set([dependency])
+ self.assertEqual(set(t._modified_fields), set())
+
+ def test_modified_fields_not_affected_by_reading(self):
+ t = Task(self.tw)
+
+ for field in TASK_STANDARD_ATTRS:
+ t[field]
+
+ self.assertEqual(set(t._modified_fields), set())
+
+ def test_setting_read_only_attrs_through_init(self):
+ # Test that we are unable to set readonly attrs through __init__
+ for readonly_key in Task.read_only_fields:
+ kwargs = {'description': 'test task', readonly_key: 'value'}
+ self.assertRaises(
+ RuntimeError,
+ lambda: Task(self.tw, **kwargs),
+ )
+
+ def test_setting_read_only_attrs_through_setitem(self):
+ # Test that we are unable to set readonly attrs through __init__
+ for readonly_key in Task.read_only_fields:
+ t = Task(self.tw, description='test task')
+ self.assertRaises(
+ RuntimeError,
+ lambda: t.__setitem__(readonly_key, 'value'),
+ )
+
+ def test_saving_unmodified_task(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t.save()
+
+ def test_adding_tag_by_appending(self):
+ t = Task(self.tw, description='test task', tags=['test1'])
+ t.save()
+ t['tags'].add('test2')
+ t.save()
+ self.assertEqual(t['tags'], set(['test1', 'test2']))
+
+ def test_adding_tag_twice(self):
+ t = Task(self.tw, description='test task', tags=['test1'])
+ t.save()
+ t['tags'].add('test2')
+ t['tags'].add('test2')
+ t.save()
+ self.assertEqual(t['tags'], set(['test1', 'test2']))
+
+ def test_adding_tag_by_appending_empty(self):
+ t = Task(self.tw, description='test task')
+ t.save()
+ t['tags'].add('test')
+ t.save()
+ self.assertEqual(t['tags'], set(['test']))
+
+ def test_serializers_returning_empty_string_for_none(self):
+ # Test that any serializer returns '' when passed None
+ t = Task(self.tw)
+ serializers = [
+ getattr(t, serializer_name)
+ for serializer_name in filter(
+ lambda x: x.startswith('serialize_'),
+ dir(t),
+ )
+ ]
+ for serializer in serializers:
+ self.assertEqual(serializer(None), '')
+
+ def test_deserializer_returning_empty_value_for_empty_string(self):
+ # Test that any deserializer returns empty value when passed ''
+ t = Task(self.tw)
+ deserializers = [
+ getattr(t, deserializer_name)
+ for deserializer_name in filter(
+ lambda x: x.startswith('deserialize_'),
+ dir(t),
+ )
+ ]
+ for deserializer in deserializers:
+ self.assertTrue(deserializer('') in (None, [], set()))
+
+ def test_normalizers_handling_none(self):
+ # Test that any normalizer can handle None as a valid value
+ t = Task(self.tw)
+
+ for key in TASK_STANDARD_ATTRS:
+ t._normalize(key, None)
+
+ def test_recurrent_task_generation(self):
+ today = datetime.date.today()
+ t = Task(
+ self.tw,
+ description='brush teeth',
+ due=today,
+ recur='daily',
+ )
+ t.save()
+ self.assertEqual(len(self.tw.tasks.pending()), 2)
+
+ def test_spawned_task_parent(self):
+ today = datetime.date.today()
+ t = Task(
+ self.tw,
+ description='brush teeth',
+ due=today,
+ recur='daily',
+ )
+ t.save()
+
+ spawned = self.tw.tasks.pending().get(due=today)
+ assert spawned['parent'] == t
+
+ def test_modify_number_of_tasks_at_once(self):
+ for i in range(1, 100):
+ Task(self.tw, description='test task %d' % i, tags=['test']).save()
+
+ self.tw.execute_command(['+test', 'mod', 'unified', 'description'])
+
+ def test_return_all_from_executed_command(self):
+ Task(self.tw, description='test task', tags=['test']).save()
+ out, err, rc = self.tw.execute_command(['count'], return_all=True)
+ self.assertEqual(rc, 0)
+
+ def test_return_all_from_failed_executed_command(self):
+ Task(self.tw, description='test task', tags=['test']).save()
+ out, err, rc = self.tw.execute_command(
+ ['countinvalid'],
+ return_all=True,
+ allow_failure=False,
+ )
+ self.assertNotEqual(rc, 0)
+
+
+ class TaskFromHookTest(TasklibTest):
+
+ input_add_data = six.StringIO(
+ '{"description":"Buy some milk",'
+ '"entry":"20141118T050231Z",'
+ '"status":"pending",'
+ '"start":"20141119T152233Z",'
+ '"uuid":"a360fc44-315c-4366-b70c-ea7e7520b749"}',
+ )
+
+ input_add_data_recurring = six.StringIO(
+ '{"description":"Mow the lawn",'
+ '"entry":"20160210T224304Z",'
+ '"parent":"62da6227-519c-42c2-915d-dccada926ad7",'
+ '"recur":"weekly",'
+ '"status":"pending",'
+ '"uuid":"81305335-0237-49ff-8e87-b3cdc2369cec"}',
+ )
+
+ input_modify_data = six.StringIO(
+ '\n'.join([
+ input_add_data.getvalue(),
+ (
+ '{"description":"Buy some milk finally",'
+ '"entry":"20141118T050231Z",'
+ '"status":"completed",'
+ '"uuid":"a360fc44-315c-4366-b70c-ea7e7520b749"}'
+ ),
+ ]),
+ )
+
+ exported_raw_data = (
+ '{"project":"Home",'
+ '"due":"20150101T232323Z",'
+ '"description":"test task"}'
+ )
+
+ def test_setting_up_from_add_hook_input(self):
+ t = Task.from_input(input_file=self.input_add_data, backend=self.tw)
+ self.assertEqual(t['description'], 'Buy some milk')
+ self.assertEqual(t.pending, True)
+
+ def test_setting_up_from_add_hook_input_recurring(self):
+ t = Task.from_input(
+ input_file=self.input_add_data_recurring,
+ backend=self.tw,
+ )
+ self.assertEqual(t['description'], 'Mow the lawn')
+ self.assertEqual(t.pending, True)
+
+ def test_setting_up_from_modified_hook_input(self):
+ t = Task.from_input(
+ input_file=self.input_modify_data,
+ modify=True,
+ backend=self.tw,
+ )
+ self.assertEqual(t['description'], 'Buy some milk finally')
+ self.assertEqual(t.pending, False)
+ self.assertEqual(t.completed, True)
+
+ self.assertEqual(t._original_data['status'], 'pending')
+ self.assertEqual(t._original_data['description'], 'Buy some milk')
+ self.assertEqual(
+ set(t._modified_fields),
+ set(['status', 'description', 'start']),
+ )
+
+ def test_export_data(self):
+ t = Task(
+ self.tw,
+ description='test task',
+ project='Home',
+ due=pytz.utc.localize(
+ datetime.datetime(2015, 1, 1, 23, 23, 23)),
+ )
+
+ # Check that the output is a permutation of:
+ # {"project":"Home","description":"test task","due":"20150101232323Z"}
+ allowed_segments = self.exported_raw_data[1:-1].split(',')
+ allowed_output = [
+ '{' + ','.join(segments) + '}'
+ for segments in itertools.permutations(allowed_segments)
+ ]
+
+ self.assertTrue(
+ any(t.export_data() == expected
+ for expected in allowed_output),
+ )
+
+
+ class TimezoneAwareDatetimeTest(TasklibTest):
+
+ def setUp(self):
+ super(TimezoneAwareDatetimeTest, self).setUp()
+ self.zone = local_zone
+ self.localdate_naive = datetime.datetime(2015, 2, 2)
+ self.localtime_naive = datetime.datetime(2015, 2, 2, 0, 0, 0)
+ self.localtime_aware = self.zone.localize(self.localtime_naive)
+ self.utctime_aware = self.localtime_aware.astimezone(pytz.utc)
+
+ def test_timezone_naive_datetime_setitem(self):
+ t = Task(self.tw, description='test task')
+ t['due'] = self.localtime_naive
+ self.assertEqual(t['due'], self.localtime_aware)
+
+ def test_timezone_naive_datetime_using_init(self):
+ t = Task(self.tw, description='test task', due=self.localtime_naive)
+ self.assertEqual(t['due'], self.localtime_aware)
+
+ def test_filter_by_naive_datetime(self):
+ t = Task(self.tw, description='task1', due=self.localtime_naive)
+ t.save()
+ matching_tasks = self.tw.tasks.filter(due=self.localtime_naive)
+ self.assertEqual(len(matching_tasks), 1)
+
+ def test_serialize_naive_datetime(self):
+ t = Task(self.tw, description='task1', due=self.localtime_naive)
+ self.assertEqual(
+ json.loads(t.export_data())['due'],
+ self.utctime_aware.strftime(DATE_FORMAT),
+ )
+
+ def test_timezone_naive_date_setitem(self):
+ t = Task(self.tw, description='test task')
+ t['due'] = self.localdate_naive
+ self.assertEqual(t['due'], self.localtime_aware)
+
+ def test_timezone_naive_date_using_init(self):
+ t = Task(self.tw, description='test task', due=self.localdate_naive)
+ self.assertEqual(t['due'], self.localtime_aware)
+
+ def test_filter_by_naive_date(self):
+ t = Task(self.tw, description='task1', due=self.localdate_naive)
+ t.save()
+ matching_tasks = self.tw.tasks.filter(due=self.localdate_naive)
+ self.assertEqual(len(matching_tasks), 1)
+
+ def test_serialize_naive_date(self):
+ t = Task(self.tw, description='task1', due=self.localdate_naive)
+ self.assertEqual(
+ json.loads(t.export_data())['due'],
+ self.utctime_aware.strftime(DATE_FORMAT),
+ )
+
+ def test_timezone_aware_datetime_setitem(self):
+ t = Task(self.tw, description='test task')
+ t['due'] = self.localtime_aware
+ self.assertEqual(t['due'], self.localtime_aware)
+
+ def test_timezone_aware_datetime_using_init(self):
+ t = Task(self.tw, description='test task', due=self.localtime_aware)
+ self.assertEqual(t['due'], self.localtime_aware)
+
+ def test_filter_by_aware_datetime(self):
+ t = Task(self.tw, description='task1', due=self.localtime_aware)
+ t.save()
+ matching_tasks = self.tw.tasks.filter(due=self.localtime_aware)
+ self.assertEqual(len(matching_tasks), 1)
+
+ def test_serialize_aware_datetime(self):
+ t = Task(self.tw, description='task1', due=self.localtime_aware)
+ self.assertEqual(
+ json.loads(t.export_data())['due'],
+ self.utctime_aware.strftime(DATE_FORMAT),
+ )
+
+
+ class DatetimeStringTest(TasklibTest):
+
+ def test_simple_now_conversion(self):
+ if self.tw.version < six.text_type('2.4.0'):
+ # Python2.6 does not support SkipTest. As a workaround
+ # mark the test as passed by exiting.
+ if getattr(unittest, 'SkipTest', None) is not None:
+ raise unittest.SkipTest()
+ else:
+ return
+
+ t = Task(self.tw, description='test task', due='now')
+ now = local_zone.localize(datetime.datetime.now())
+
+ # Assert that both times are not more than 5 seconds apart
+ if sys.version_info < (2, 7):
+ self.assertTrue(total_seconds_2_6(now - t['due']) < 5)
+ self.assertTrue(total_seconds_2_6(t['due'] - now) < 5)
+ else:
+ self.assertTrue((now - t['due']).total_seconds() < 5)
+ self.assertTrue((t['due'] - now).total_seconds() < 5)
+
+ def test_simple_eoy_conversion(self):
+ if self.tw.version < six.text_type('2.4.0'):
+ # Python2.6 does not support SkipTest. As a workaround
+ # mark the test as passed by exiting.
+ if getattr(unittest, 'SkipTest', None) is not None:
+ raise unittest.SkipTest()
+ else:
+ return
+
+ t = Task(self.tw, description='test task', due='eoy')
+ now = local_zone.localize(datetime.datetime.now())
+ eoy = local_zone.localize(datetime.datetime(
+ year=now.year,
+ month=12,
+ day=31,
+ hour=23,
+ minute=59,
+ second=59,
+ ))
+ self.assertEqual(eoy, t['due'])
+
+ def test_complex_eoy_conversion(self):
+ if self.tw.version < six.text_type('2.4.0'):
+ # Python2.6 does not support SkipTest. As a workaround
+ # mark the test as passed by exiting.
+ if getattr(unittest, 'SkipTest', None) is not None:
+ raise unittest.SkipTest()
+ else:
+ return
+
+ t = Task(self.tw, description='test task', due='eoy - 4 months')
+ now = local_zone.localize(datetime.datetime.now())
+ due_date = local_zone.localize(
+ datetime.datetime(
+ year=now.year,
+ month=12,
+ day=31,
+ hour=23,
+ minute=59,
+ second=59,
+ )
+ ) - datetime.timedelta(0, 4 * 30 * 86400)
+ self.assertEqual(due_date, t['due'])
+
+ def test_filtering_with_string_datetime(self):
+ if self.tw.version < six.text_type('2.4.0'):
+ # Python2.6 does not support SkipTest. As a workaround
+ # mark the test as passed by exiting.
+ if getattr(unittest, 'SkipTest', None) is not None:
+ raise unittest.SkipTest()
+ else:
+ return
+
+ t = Task(
+ self.tw,
+ description='test task',
+ due=datetime.datetime.now() - datetime.timedelta(0, 2),
+ )
+ t.save()
+ self.assertEqual(len(self.tw.tasks.filter(due__before='now')), 1)
+
+
+ class AnnotationTest(TasklibTest):
+
+ def setUp(self):
+ super(AnnotationTest, self).setUp()
+ Task(self.tw, description='test task').save()
+
+ def test_adding_annotation(self):
+ task = self.tw.tasks.get()
+ task.add_annotation('test annotation')
+ self.assertEqual(len(task['annotations']), 1)
+ ann = task['annotations'][0]
+ self.assertEqual(ann['description'], 'test annotation')
+
+ def test_removing_annotation(self):
+ task = self.tw.tasks.get()
+ task.add_annotation('test annotation')
+ ann = task['annotations'][0]
+ ann.remove()
+ self.assertEqual(len(task['annotations']), 0)
+
+ def test_removing_annotation_by_description(self):
+ task = self.tw.tasks.get()
+ task.add_annotation('test annotation')
+ task.remove_annotation('test annotation')
+ self.assertEqual(len(task['annotations']), 0)
+
+ def test_removing_annotation_by_obj(self):
+ task = self.tw.tasks.get()
+ task.add_annotation('test annotation')
+ ann = task['annotations'][0]
+ task.remove_annotation(ann)
+ self.assertEqual(len(task['annotations']), 0)
+
+ def test_annotation_after_modification(self):
+ task = self.tw.tasks.get()
+ task['project'] = 'test'
+ task.add_annotation('I should really do this task')
+ self.assertEqual(task['project'], 'test')
+ task.save()
+ self.assertEqual(task['project'], 'test')
+
+ def test_serialize_annotations(self):
+ # Test that serializing annotations is possible
+ t = Task(self.tw, description='test')
+ t.save()
+
+ t.add_annotation('annotation1')
+ t.add_annotation('annotation2')
+
+ data = t._serialize('annotations', t._data['annotations'])
+
+ self.assertEqual(len(data), 2)
+ self.assertEqual(type(data[0]), dict)
+ self.assertEqual(type(data[1]), dict)
+
+ self.assertEqual(data[0]['description'], 'annotation1')
+ self.assertEqual(data[1]['description'], 'annotation2')
+
+
+ class UnicodeTest(TasklibTest):
+
+ def test_unicode_task(self):
+ Task(self.tw, description=six.u('†åßk')).save()
+ self.tw.tasks.get()
+
+ def test_filter_by_unicode_task(self):
+ Task(self.tw, description=six.u('†åßk')).save()
+ tasks = self.tw.tasks.filter(description=six.u('†åßk'))
+ self.assertEqual(len(tasks), 1)
+
+ def test_non_unicode_task(self):
+ Task(self.tw, description='test task').save()
+ self.tw.tasks.get()
+
+
+ class ReadOnlyDictViewTest(unittest.TestCase):
+
+ def setUp(self):
+ self.sample = dict(sample_list=[1, 2, 3], sample_dict={'key': 'value'})
+ self.original_sample = copy.deepcopy(self.sample)
+ self.view = ReadOnlyDictView(self.sample)
+
+ def test_readonlydictview_getitem(self):
+ sample_list = self.view['sample_list']
+ self.assertEqual(sample_list, self.sample['sample_list'])
+
+ # Assert that modification changed only copied value
+ sample_list.append(4)
+ self.assertNotEqual(sample_list, self.sample['sample_list'])
+
+ # Assert that viewed dict is not changed
+ self.assertEqual(self.sample, self.original_sample)
+
+ def test_readonlydictview_contains(self):
+ self.assertEqual('sample_list' in self.view,
+ 'sample_list' in self.sample)
+ self.assertEqual('sample_dict' in self.view,
+ 'sample_dict' in self.sample)
+ self.assertEqual('key' in self.view, 'key' in self.sample)
+
+ # Assert that viewed dict is not changed
+ self.assertEqual(self.sample, self.original_sample)
+
+ def test_readonlydictview_iter(self):
+ self.assertEqual(
+ list(key for key in self.view),
+ list(key for key in self.sample),
+ )
+
+ # Assert the view is correct after modification
+ self.sample['new'] = 'value'
+ self.assertEqual(
+ list(key for key in self.view),
+ list(key for key in self.sample),
+ )
+
+ def test_readonlydictview_len(self):
+ self.assertEqual(len(self.view), len(self.sample))
+
+ # Assert the view is correct after modification
+ self.sample['new'] = 'value'
+ self.assertEqual(len(self.view), len(self.sample))
+
+ def test_readonlydictview_get(self):
+ sample_list = self.view.get('sample_list')
+ self.assertEqual(sample_list, self.sample.get('sample_list'))
+
+ # Assert that modification changed only copied value
+ sample_list.append(4)
+ self.assertNotEqual(sample_list, self.sample.get('sample_list'))
+
+ # Assert that viewed dict is not changed
+ self.assertEqual(self.sample, self.original_sample)
+
+ def test_readonlydict_items(self):
+ view_items = self.view.items()
+ sample_items = list(self.sample.items())
+ self.assertEqual(view_items, sample_items)
+
+ view_items.append('newkey')
+ self.assertNotEqual(view_items, sample_items)
+ self.assertEqual(self.sample, self.original_sample)
+
+ def test_readonlydict_values(self):
+ view_values = self.view.values()
+ sample_values = list(self.sample.values())
+ self.assertEqual(view_values, sample_values)
+
+ view_list_item = list(filter(lambda x: type(x) is list,
+ view_values))[0]
+ view_list_item.append(4)
+ self.assertNotEqual(view_values, sample_values)
+ self.assertEqual(self.sample, self.original_sample)
+
+
+ class LazyUUIDTaskTest(TasklibTest):
+     """Tests for LazyUUIDTask built from the UUID of a saved Task.
+
+     Many tests end by asserting the exact class of the lazy object: they
+     expect uuid reads and comparisons to leave it a LazyUUIDTask, and
+     expect replace() or any other key/attribute read to turn it into a
+     Task.  Exact class identity is asserted on purpose.
+     """
+
+     def setUp(self):
+         # One saved task, plus a lazy handle created from its UUID only.
+         super(LazyUUIDTaskTest, self).setUp()
+
+         saved_task = Task(self.tw, description='this is test task')
+         saved_task.save()
+         self.stored = saved_task
+
+         self.lazy = LazyUUIDTask(self.tw, self.stored['uuid'])
+
+     def _uuid_with_altered_last_char(self):
+         # Return the stored UUID with only its final character changed, so
+         # the result is guaranteed to differ from the stored one.
+         uuid = self.stored['uuid']
+         replacement = 'b' if uuid[-1] == 'a' else 'a'
+         return uuid[:-1] + replacement
+
+     def test_uuid_non_conversion(self):
+         # Reading 'uuid' is expected to leave the object lazy.
+         stored_uuid = self.stored['uuid']
+         lazy_uuid = self.lazy['uuid']
+         assert stored_uuid == lazy_uuid
+         assert type(self.lazy) is LazyUUIDTask
+
+     def test_lazy_explicit_conversion(self):
+         # replace() is expected to swap the object's class to Task.
+         assert type(self.lazy) is LazyUUIDTask
+         self.lazy.replace()
+         assert type(self.lazy) is Task
+
+     def test_conversion_key(self):
+         # Reading a key other than 'uuid' is expected to convert to Task.
+         stored_description = self.stored['description']
+         lazy_description = self.lazy['description']
+         assert stored_description == lazy_description
+         assert type(self.lazy) is Task
+
+     def test_conversion_attribute(self):
+         # Reading the `completed` attribute is expected to convert to Task.
+         assert type(self.lazy) is LazyUUIDTask
+         completed = self.lazy.completed
+         assert completed is False
+         assert type(self.lazy) is Task
+
+     def test_normal_to_lazy_equality(self):
+         # Both == and != are exercised; the lazy side must stay lazy.
+         assert self.stored == self.lazy
+         assert not self.stored != self.lazy
+         assert type(self.lazy) is LazyUUIDTask
+
+     def test_lazy_to_lazy_equality(self):
+         # Two independent lazy handles for the same UUID compare equal.
+         left = LazyUUIDTask(self.tw, self.stored['uuid'])
+         right = LazyUUIDTask(self.tw, self.stored['uuid'])
+
+         assert left == right
+         assert not left != right
+         for handle in (left, right):
+             assert type(handle) is LazyUUIDTask
+
+     def test_normal_to_lazy_inequality(self):
+         # A lazy handle for a different UUID must not equal the stored task.
+         other_lazy = LazyUUIDTask(self.tw, self._uuid_with_altered_last_char())
+
+         assert not self.stored == other_lazy
+         assert self.stored != other_lazy
+         assert type(other_lazy) is LazyUUIDTask
+
+     def test_lazy_to_lazy_inequality(self):
+         # Lazy handles for different UUIDs must compare unequal.
+         other_uuid = self._uuid_with_altered_last_char()
+
+         left = LazyUUIDTask(self.tw, self.stored['uuid'])
+         right = LazyUUIDTask(self.tw, other_uuid)
+
+         assert not left == right
+         assert left != right
+         for handle in (left, right):
+             assert type(handle) is LazyUUIDTask
+
+     def test_lazy_in_queryset(self):
+         # Membership in a filtered queryset is expected to keep it lazy.
+         matching = self.tw.tasks.filter(uuid=self.stored['uuid'])
+
+         assert self.lazy in matching
+         assert type(self.lazy) is LazyUUIDTask
+
+     def test_lazy_saved(self):
+         # A lazy handle reports itself as saved.
+         assert self.lazy.saved is True
+
+     def test_lazy_modified(self):
+         # A lazy handle reports itself as unmodified.
+         assert self.lazy.modified is False
+
+     def test_lazy_modified_fields(self):
+         # A lazy handle exposes an empty set of modified fields.
+         assert self.lazy._modified_fields == set()
+
+
+ class LazyUUIDTaskSetTest(TasklibTest):
+     """Tests for LazyUUIDTaskSet built from the UUIDs of three saved tasks.
+
+     Covers len(), membership, equality against another lazy set and
+     against a real queryset, and the four set operations.  Each set
+     operation is checked in operator form (both operand orders), method
+     form (both receivers) and in-place form.
+     """
+
+     def setUp(self):
+         super(LazyUUIDTaskSetTest, self).setUp()
+
+         # Construct all three tasks first, then save them in the same order.
+         created = [
+             Task(self.tw, description='task 1'),
+             Task(self.tw, description='task 2'),
+             Task(self.tw, description='task 3'),
+         ]
+         self.task1, self.task2, self.task3 = created
+
+         for task in created:
+             task.save()
+
+         self.uuids = tuple(task['uuid'] for task in created)
+
+         self.lazy = LazyUUIDTaskSet(self.tw, self.uuids)
+
+     def _lazy_set_of_task2_and_task3(self):
+         # Fresh lazy set holding only task2 and task3; every set-operation
+         # test below starts from one of these.
+         return LazyUUIDTaskSet(
+             self.tw,
+             (self.task2['uuid'], self.task3['uuid']),
+         )
+
+     def test_length(self):
+         # len() is expected to leave the set lazy.
+         assert len(self.lazy) == 3
+         assert type(self.lazy) is LazyUUIDTaskSet
+
+     def test_contains(self):
+         # Membership checks with real Task objects keep the set lazy.
+         for task in (self.task1, self.task2, self.task3):
+             assert task in self.lazy
+         assert type(self.lazy) is LazyUUIDTaskSet
+
+     def test_eq_lazy(self):
+         # Two lazy sets over the same UUIDs compare equal.
+         twin = LazyUUIDTaskSet(self.tw, self.uuids)
+         assert self.lazy == twin
+         assert not self.lazy != twin
+         assert type(self.lazy) is LazyUUIDTaskSet
+
+     def test_eq_real(self):
+         # Equality against a real queryset, with the lazy set on either side.
+         assert self.lazy == self.tw.tasks.all()
+         assert self.tw.tasks.all() == self.lazy
+         assert not self.lazy != self.tw.tasks.all()
+
+         assert type(self.lazy) is LazyUUIDTaskSet
+
+     def test_union(self):
+         # {task1} united with lazy {task2, task3} must equal the full set.
+         plain = {self.task1}
+         lazy_pair = self._lazy_set_of_task2_and_task3()
+
+         assert plain | lazy_pair == self.lazy
+         assert lazy_pair | plain == self.lazy
+         assert plain.union(lazy_pair) == self.lazy
+         assert lazy_pair.union(plain) == self.lazy
+
+         lazy_pair |= plain
+         assert lazy_pair == self.lazy
+
+     def test_difference(self):
+         # {task1, task2} versus lazy {task2, task3}.
+         plain = {self.task1, self.task2}
+         lazy_pair = self._lazy_set_of_task2_and_task3()
+         only_task1 = {self.task1}
+         only_task3 = {self.task3}
+
+         assert plain - lazy_pair == only_task1
+         assert lazy_pair - plain == only_task3
+         assert plain.difference(lazy_pair) == only_task1
+         assert lazy_pair.difference(plain) == only_task3
+
+         lazy_pair -= plain
+         assert lazy_pair == only_task3
+
+     def test_symmetric_difference(self):
+         # task2 is the shared element, so task1 and task3 remain.
+         plain = {self.task1, self.task2}
+         lazy_pair = self._lazy_set_of_task2_and_task3()
+         outer_tasks = {self.task1, self.task3}
+
+         assert plain ^ lazy_pair == outer_tasks
+         assert lazy_pair ^ plain == outer_tasks
+         self.assertEqual(plain.symmetric_difference(lazy_pair), outer_tasks)
+         self.assertEqual(lazy_pair.symmetric_difference(plain), outer_tasks)
+
+         lazy_pair ^= plain
+         assert lazy_pair == outer_tasks
+
+     def test_intersection(self):
+         # task2 is the only element present on both sides.
+         plain = {self.task1, self.task2}
+         lazy_pair = self._lazy_set_of_task2_and_task3()
+         only_task2 = {self.task2}
+
+         assert plain & lazy_pair == only_task2
+         assert lazy_pair & plain == only_task2
+         assert plain.intersection(lazy_pair) == only_task2
+         assert lazy_pair.intersection(plain) == only_task2
+
+         lazy_pair &= plain
+         assert lazy_pair == only_task2
+
+
+ class TaskWarriorBackendTest(TasklibTest):
+
+ def test_config(self):
+ assert self.tw.config['nag'] == 'You have more urgent tasks.'
+ assert self.tw.config['default.command'] == 'next'
+ assert self.tw.config['dependency.indicator'] == 'D'