From: martin f. krafft Date: Mon, 18 Nov 2019 03:57:53 +0000 (+1300) Subject: Add 'code/taskwarrior/tasklib/' from commit '9a7dac4599f198c52f268ae8bcc0267743ef934f' X-Git-Url: https://git.madduck.net/etc/taskwarrior.git/commitdiff_plain/8078ceed9b50dad2d319a74e2d3014b6ff649b47?hp=31d9529fe14bb21ff87c6e0030890fb32df15834 Add 'code/taskwarrior/tasklib/' from commit '9a7dac4599f198c52f268ae8bcc0267743ef934f' git-subtree-dir: code/taskwarrior/tasklib git-subtree-mainline: 31d9529fe14bb21ff87c6e0030890fb32df15834 git-subtree-split: 9a7dac4599f198c52f268ae8bcc0267743ef934f --- diff --git a/code/taskwarrior/tasklib/.coveragerc b/code/taskwarrior/tasklib/.coveragerc new file mode 100644 index 0000000..264003a --- /dev/null +++ b/code/taskwarrior/tasklib/.coveragerc @@ -0,0 +1,5 @@ +[report] +omit = */tests.py +exclude_lines = + raise NotImplementedError + @abc.abstractmethod diff --git a/code/taskwarrior/tasklib/.gitignore b/code/taskwarrior/tasklib/.gitignore new file mode 100644 index 0000000..0b10a68 --- /dev/null +++ b/code/taskwarrior/tasklib/.gitignore @@ -0,0 +1,9 @@ +.cache +*.pyc +*~ +tasklib.egg-info +/dist +/build +docs/_build +.project +.coverage diff --git a/code/taskwarrior/tasklib/.travis.yml b/code/taskwarrior/tasklib/.travis.yml new file mode 100644 index 0000000..cfa6d5f --- /dev/null +++ b/code/taskwarrior/tasklib/.travis.yml @@ -0,0 +1,42 @@ +language: python +env: + - TASK_VERSION=v2.1.1 + - TASK_VERSION=v2.1.2 + - TASK_VERSION=v2.2.0 + - TASK_VERSION=v2.3.0 + - TASK_VERSION=v2.4.0 + - TASK_VERSION=v2.4.1 + - TASK_VERSION=v2.4.2 + - TASK_VERSION=v2.4.3 + - TASK_VERSION=v2.4.4 + - TASK_VERSION=v2.5.0 + - TASK_VERSION=v2.5.1 +python: + - "2.7" + - "3.3" + - "3.4" + - "3.5" + - "3.6" +install: + - pip install -e . + - pip install coveralls + - sudo add-apt-repository ppa:ubuntu-toolchain-r/test -y + - sudo apt-get update -qq + - sudo apt-get install -qq build-essential cmake uuid-dev g++-4.8 + - sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-4.8 50 + - git clone --recursive https://github.com/GothenburgBitFactory/taskwarrior + - cd taskwarrior + - git checkout $TASK_VERSION + - git clean -dfx + - git submodule init + - git submodule update + - cmake -DCMAKE_BUILD_TYPE=release . + - make -j2 + - sudo make install + - task --version +before_script: + - cd $TRAVIS_BUILD_DIR +script: + - coverage run --source=tasklib setup.py test +after_success: + - coveralls diff --git a/code/taskwarrior/tasklib/AUTHORS b/code/taskwarrior/tasklib/AUTHORS new file mode 100644 index 0000000..6de540e --- /dev/null +++ b/code/taskwarrior/tasklib/AUTHORS @@ -0,0 +1,2 @@ +Rob Golding +Tomas Babej diff --git a/code/taskwarrior/tasklib/LICENSE b/code/taskwarrior/tasklib/LICENSE new file mode 100644 index 0000000..72fd2d9 --- /dev/null +++ b/code/taskwarrior/tasklib/LICENSE @@ -0,0 +1,24 @@ +Copyright (c) 2013-2017, Rob Golding. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Rob Golding, nor the names of its contributors may + be used to endorse or promote products derived from this software + without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/code/taskwarrior/tasklib/MANIFEST.in b/code/taskwarrior/tasklib/MANIFEST.in new file mode 100644 index 0000000..9d5d250 --- /dev/null +++ b/code/taskwarrior/tasklib/MANIFEST.in @@ -0,0 +1,2 @@ +include LICENSE +include README.rst diff --git a/code/taskwarrior/tasklib/README.rst b/code/taskwarrior/tasklib/README.rst new file mode 100644 index 0000000..731241c --- /dev/null +++ b/code/taskwarrior/tasklib/README.rst @@ -0,0 +1,53 @@ +tasklib +======= + +.. image:: https://travis-ci.org/robgolding63/tasklib.png?branch=develop + :target: http://travis-ci.org/robgolding63/tasklib + +.. image:: https://coveralls.io/repos/robgolding63/tasklib/badge.png?branch=develop + :target: https://coveralls.io/r/robgolding63/tasklib?branch=develop + +tasklib is a Python library for interacting with taskwarrior_ databases, using +a queryset API similar to that of Django's ORM. + +Supports Python 2.6, 2.7, 3.2, 3.3 and 3.4 with taskwarrior 2.1.x and above. +Older versions of taskwarrior are untested and may not work. + +Requirements +------------ + +* taskwarrior_ v2.1.x or above. + +Installation +------------ + +Install via pip:: + + pip install tasklib + +Usage +----- + +tasklib has a similar API to that of Django's ORM:: + + >>> from tasklib import TaskWarrior + + >>> tw = TaskWarrior('/home/rob/.task') + >>> tasks = tw.tasks.pending() + >>> tasks + ['Tidy the house', 'Learn German'] + >>> tasks.filter(tags__contain='chores') + ['Tidy the house'] + >>> type(tasks[0]) + + >>> tasks[0].done() + >>> tasks = tw.tasks.pending() + >>> tasks + ['Learn German'] + >>> tasks[0]['tags'] = ['languages'] + >>> tasks[0].save() + +For more advanced usage, see the documentation_. + +.. _taskwarrior: http://taskwarrior.org +.. _documentation: http://tasklib.readthedocs.org/en/latest/ diff --git a/code/taskwarrior/tasklib/docs/Makefile b/code/taskwarrior/tasklib/docs/Makefile new file mode 100644 index 0000000..274756e --- /dev/null +++ b/code/taskwarrior/tasklib/docs/Makefile @@ -0,0 +1,177 @@ +# Makefile for Sphinx documentation +# + +# You can set these variables from the command line. +SPHINXOPTS = +SPHINXBUILD = sphinx-build +PAPER = +BUILDDIR = _build + +# User-friendly check for sphinx-build +ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1) +$(error The '$(SPHINXBUILD)' command was not found. Make sure you have Sphinx installed, then set the SPHINXBUILD environment variable to point to the full path of the '$(SPHINXBUILD)' executable. Alternatively you can add the directory with the executable to your PATH. If you don't have Sphinx installed, grab it from http://sphinx-doc.org/) +endif + +# Internal variables. +PAPEROPT_a4 = -D latex_paper_size=a4 +PAPEROPT_letter = -D latex_paper_size=letter +ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . +# the i18n builder cannot share the environment and doctrees with the others +I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . + +.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest gettext + +help: + @echo "Please use \`make ' where is one of" + @echo " html to make standalone HTML files" + @echo " dirhtml to make HTML files named index.html in directories" + @echo " singlehtml to make a single large HTML file" + @echo " pickle to make pickle files" + @echo " json to make JSON files" + @echo " htmlhelp to make HTML files and a HTML help project" + @echo " qthelp to make HTML files and a qthelp project" + @echo " devhelp to make HTML files and a Devhelp project" + @echo " epub to make an epub" + @echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter" + @echo " latexpdf to make LaTeX files and run them through pdflatex" + @echo " latexpdfja to make LaTeX files and run them through platex/dvipdfmx" + @echo " text to make text files" + @echo " man to make manual pages" + @echo " texinfo to make Texinfo files" + @echo " info to make Texinfo files and run them through makeinfo" + @echo " gettext to make PO message catalogs" + @echo " changes to make an overview of all changed/added/deprecated items" + @echo " xml to make Docutils-native XML files" + @echo " pseudoxml to make pseudoxml-XML files for display purposes" + @echo " linkcheck to check all external links for integrity" + @echo " doctest to run all doctests embedded in the documentation (if enabled)" + +clean: + rm -rf $(BUILDDIR)/* + +html: + $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/html." + +dirhtml: + $(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml + @echo + @echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml." + +singlehtml: + $(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml + @echo + @echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml." + +pickle: + $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle + @echo + @echo "Build finished; now you can process the pickle files." + +json: + $(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json + @echo + @echo "Build finished; now you can process the JSON files." + +htmlhelp: + $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp + @echo + @echo "Build finished; now you can run HTML Help Workshop with the" \ + ".hhp project file in $(BUILDDIR)/htmlhelp." + +qthelp: + $(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp + @echo + @echo "Build finished; now you can run "qcollectiongenerator" with the" \ + ".qhcp project file in $(BUILDDIR)/qthelp, like this:" + @echo "# qcollectiongenerator $(BUILDDIR)/qthelp/tasklib.qhcp" + @echo "To view the help file:" + @echo "# assistant -collectionFile $(BUILDDIR)/qthelp/tasklib.qhc" + +devhelp: + $(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp + @echo + @echo "Build finished." + @echo "To view the help file:" + @echo "# mkdir -p $$HOME/.local/share/devhelp/tasklib" + @echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/tasklib" + @echo "# devhelp" + +epub: + $(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub + @echo + @echo "Build finished. The epub file is in $(BUILDDIR)/epub." + +latex: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo + @echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex." + @echo "Run \`make' in that directory to run these through (pdf)latex" \ + "(use \`make latexpdf' here to do that automatically)." + +latexpdf: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through pdflatex..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +latexpdfja: + $(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex + @echo "Running LaTeX files through platex and dvipdfmx..." + $(MAKE) -C $(BUILDDIR)/latex all-pdf-ja + @echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex." + +text: + $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text + @echo + @echo "Build finished. The text files are in $(BUILDDIR)/text." + +man: + $(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man + @echo + @echo "Build finished. The manual pages are in $(BUILDDIR)/man." + +texinfo: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo + @echo "Build finished. The Texinfo files are in $(BUILDDIR)/texinfo." + @echo "Run \`make' in that directory to run these through makeinfo" \ + "(use \`make info' here to do that automatically)." + +info: + $(SPHINXBUILD) -b texinfo $(ALLSPHINXOPTS) $(BUILDDIR)/texinfo + @echo "Running Texinfo files through makeinfo..." + make -C $(BUILDDIR)/texinfo info + @echo "makeinfo finished; the Info files are in $(BUILDDIR)/texinfo." + +gettext: + $(SPHINXBUILD) -b gettext $(I18NSPHINXOPTS) $(BUILDDIR)/locale + @echo + @echo "Build finished. The message catalogs are in $(BUILDDIR)/locale." + +changes: + $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes + @echo + @echo "The overview file is in $(BUILDDIR)/changes." + +linkcheck: + $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck + @echo + @echo "Link check complete; look for any errors in the above output " \ + "or in $(BUILDDIR)/linkcheck/output.txt." + +doctest: + $(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest + @echo "Testing of doctests in the sources finished, look at the " \ + "results in $(BUILDDIR)/doctest/output.txt." + +xml: + $(SPHINXBUILD) -b xml $(ALLSPHINXOPTS) $(BUILDDIR)/xml + @echo + @echo "Build finished. The XML files are in $(BUILDDIR)/xml." + +pseudoxml: + $(SPHINXBUILD) -b pseudoxml $(ALLSPHINXOPTS) $(BUILDDIR)/pseudoxml + @echo + @echo "Build finished. The pseudo-XML files are in $(BUILDDIR)/pseudoxml." diff --git a/code/taskwarrior/tasklib/docs/conf.py b/code/taskwarrior/tasklib/docs/conf.py new file mode 100644 index 0000000..3892461 --- /dev/null +++ b/code/taskwarrior/tasklib/docs/conf.py @@ -0,0 +1,258 @@ +# -*- coding: utf-8 -*- +# +# tasklib documentation build configuration file, created by +# sphinx-quickstart on Sun Nov 10 15:19:07 2013. +# +# This file is execfile()d with the current directory set to its +# containing dir. +# +# Note that not all possible configuration values are present in this +# autogenerated file. +# +# All configuration values have a default; values that are commented out +# serve to show the default. + +import sys +import os + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +#sys.path.insert(0, os.path.abspath('.')) + +# -- General configuration ------------------------------------------------ + +# If your documentation needs a minimal Sphinx version, state it here. +#needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix of source filenames. +source_suffix = '.rst' + +# The encoding of source files. +#source_encoding = 'utf-8-sig' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = u'tasklib' +copyright = u'2014, Rob Golding' + +# The version info for the project you're documenting, acts as replacement for +# |version| and |release|, also used in various other places throughout the +# built documents. +# +# The short X.Y version. +version = '1.2.1' +# The full version, including alpha/beta/rc tags. +release = '1.2.1' + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +#language = None + +# There are two options for replacing |today|: either, you set today to some +# non-false value, then it is used: +#today = '' +# Else, today_fmt is used as the format for a strftime call. +#today_fmt = '%B %d, %Y' + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +exclude_patterns = ['_build'] + +# The reST default role (used for this markup: `text`) to use for all +# documents. +#default_role = None + +# If true, '()' will be appended to :func: etc. cross-reference text. +#add_function_parentheses = True + +# If true, the current module name will be prepended to all description +# unit titles (such as .. function::). +#add_module_names = True + +# If true, sectionauthor and moduleauthor directives will be shown in the +# output. They are ignored by default. +#show_authors = False + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# A list of ignored prefixes for module index sorting. +#modindex_common_prefix = [] + +# If true, keep warnings as "system message" paragraphs in the built documents. +#keep_warnings = False + + +# -- Options for HTML output ---------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +html_theme = 'default' + +# Theme options are theme-specific and customize the look and feel of a theme +# further. For a list of options available for each theme, see the +# documentation. +#html_theme_options = {} + +# Add any paths that contain custom themes here, relative to this directory. +#html_theme_path = [] + +# The name for this set of Sphinx documents. If None, it defaults to +# " v documentation". +#html_title = None + +# A shorter title for the navigation bar. Default is the same as html_title. +#html_short_title = None + +# The name of an image file (relative to this directory) to place at the top +# of the sidebar. +#html_logo = None + +# The name of an image file (within the static path) to use as favicon of the +# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32 +# pixels large. +#html_favicon = None + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# Add any extra paths that contain custom files (such as robots.txt or +# .htaccess) here, relative to this directory. These files are copied +# directly to the root of the documentation. +#html_extra_path = [] + +# If not '', a 'Last updated on:' timestamp is inserted at every page bottom, +# using the given strftime format. +#html_last_updated_fmt = '%b %d, %Y' + +# If true, SmartyPants will be used to convert quotes and dashes to +# typographically correct entities. +#html_use_smartypants = True + +# Custom sidebar templates, maps document names to template names. +#html_sidebars = {} + +# Additional templates that should be rendered to pages, maps page names to +# template names. +#html_additional_pages = {} + +# If false, no module index is generated. +#html_domain_indices = True + +# If false, no index is generated. +#html_use_index = True + +# If true, the index is split into individual pages for each letter. +#html_split_index = False + +# If true, links to the reST sources are added to the pages. +#html_show_sourcelink = True + +# If true, "Created using Sphinx" is shown in the HTML footer. Default is True. +#html_show_sphinx = True + +# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True. +#html_show_copyright = True + +# If true, an OpenSearch description file will be output, and all pages will +# contain a tag referring to it. The value of this option must be the +# base URL from which the finished HTML is served. +#html_use_opensearch = '' + +# This is the file name suffix for HTML files (e.g. ".xhtml"). +#html_file_suffix = None + +# Output file base name for HTML help builder. +htmlhelp_basename = 'tasklibdoc' + + +# -- Options for LaTeX output --------------------------------------------- + +latex_elements = { +# The paper size ('letterpaper' or 'a4paper'). +#'papersize': 'letterpaper', + +# The font size ('10pt', '11pt' or '12pt'). +#'pointsize': '10pt', + +# Additional stuff for the LaTeX preamble. +#'preamble': '', +} + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, +# author, documentclass [howto, manual, or own class]). +latex_documents = [ + ('index', 'tasklib.tex', u'tasklib Documentation', + u'Rob Golding', 'manual'), +] + +# The name of an image file (relative to this directory) to place at the top of +# the title page. +#latex_logo = None + +# For "manual" documents, if this is true, then toplevel headings are parts, +# not chapters. +#latex_use_parts = False + +# If true, show page references after internal links. +#latex_show_pagerefs = False + +# If true, show URL addresses after external links. +#latex_show_urls = False + +# Documents to append as an appendix to all manuals. +#latex_appendices = [] + +# If false, no module index is generated. +#latex_domain_indices = True + + +# -- Options for manual page output --------------------------------------- + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + ('index', 'tasklib', u'tasklib Documentation', + [u'Rob Golding'], 1) +] + +# If true, show URL addresses after external links. +#man_show_urls = False + + +# -- Options for Texinfo output ------------------------------------------- + +# Grouping the document tree into Texinfo files. List of tuples +# (source start file, target name, title, author, +# dir menu entry, description, category) +texinfo_documents = [ + ('index', 'tasklib', u'tasklib Documentation', + u'Rob Golding', 'tasklib', 'One line description of project.', + 'Miscellaneous'), +] + +# Documents to append as an appendix to all manuals. +#texinfo_appendices = [] + +# If false, no module index is generated. +#texinfo_domain_indices = True + +# How to display URL addresses: 'footnote', 'no', or 'inline'. +#texinfo_show_urls = 'footnote' + +# If true, do not generate a @detailmenu in the "Top" node's menu. +#texinfo_no_detailmenu = False diff --git a/code/taskwarrior/tasklib/docs/index.rst b/code/taskwarrior/tasklib/docs/index.rst new file mode 100644 index 0000000..17ea42a --- /dev/null +++ b/code/taskwarrior/tasklib/docs/index.rst @@ -0,0 +1,597 @@ +Welcome to tasklib's documentation! +=================================== + +tasklib is a Python library for interacting with taskwarrior_ databases, using +a queryset API similar to that of Django's ORM. + +Supports Python 2.6, 2.7, 3.2, 3.3 and 3.4 with taskwarrior 2.1.x and above. +Older versions of taskwarrior are untested and may not work. + +Requirements +------------ + +* taskwarrior_ v2.1.x or above, although newest minor release is recommended. + +Installation +------------ + +Install via pip (recommended):: + + pip install tasklib + +Or clone from github:: + + git clone https://github.com/robgolding63/tasklib.git + cd tasklib + python setup.py install + +Initialization +-------------- + +Optionally initialize the ``TaskWarrior`` instance with ``data_location`` (the +database directory). If it doesn't already exist, this will be created +automatically unless ``create=False``. + +The default location is the same as taskwarrior's:: + + >>> tw = TaskWarrior(data_location='~/.task', create=True) + +The ``TaskWarrior`` instance will also use your .taskrc configuration (so that +it recognizes the same UDAs as your task binary, uses the same configuration, +etc.). To override the location of the .taskrc, use +``taskrc_location=~/some/different/path``. + +Creating Tasks +-------------- + +To create a task, simply create a new ``Task`` object:: + + >>> new_task = Task(tw, description="throw out the trash") + +This task is not yet saved to TaskWarrior (same as in Django), not until +you call ``.save()`` method:: + + >>> new_task.save() + +You can set any attribute as a keyword argument to the Task object:: + + >>> complex_task = Task(tw, description="finally fix the shower", due=datetime(2015,2,14,8,0,0), priority='H') + +or by setting the attributes one by one:: + + >>> complex_task = Task(tw) + >>> complex_task['description'] = "finally fix the shower" + >>> complex_task['due'] = datetime(2015,2,14,8,0,0) + >>> complex_task['priority'] = 'H' + +Modifying Task +-------------- + +To modify a created or retrieved ``Task`` object, use dictionary-like access:: + + >>> homework = tw.tasks.get(tags=['chores']) + >>> homework['project'] = 'Home' + +The change is not propagated to the TaskWarrior until you run the ``save()`` method:: + + >>> homework.save() + +Attributes, which map to native Python objects are converted. See Task Attributes section. + +Task Attributes +--------------- + +Attributes of task objects are accessible through indices, like so:: + + >>> task = tw.tasks.pending().get(tags__contain='work') # There is only one pending task with 'work' tag + >>> task['description'] + 'Upgrade Ubuntu Server' + >>> task['id'] + 15 + >>> task['due'] + datetime.datetime(2015, 2, 5, 0, 0, tzinfo=) + >>> task['tags'] + ['work', 'servers'] + +The following fields are deserialized into Python objects: + +* ``due``, ``wait``, ``scheduled``, ``until``, ``entry``: deserialized to a ``datetime`` object +* ``annotations``: deserialized to a list of ``TaskAnnotation`` objects +* ``tags``: deserialized to a list of strings +* ``depends``: deserialized to a set of ``Task`` objects + +Attributes should be set using the correct Python representation, which will be +serialized into the correct format when the task is saved. + +Task properties +--------------- + +Tasklib defines several properties upon ``Task`` object, for convenience:: + + >>> t.save() + >>> t.saved + True + >>> t.pending + True + >>> t.active + False + >>> t.start() + >>> t.active + True + >>> t.done() + >>> t.completed + True + >>> t.pending + False + >>> t.delete() + >>> t.deleted + True + +Operations on Tasks +------------------- + +After modifying one or more attributes, simple call ``save()`` to write those +changes to the database:: + + >>> task = tw.tasks.pending().get(tags__contain='work') + >>> task['due'] = datetime(year=2014, month=1, day=5) + >>> task.save() + +To mark a task as complete, use ``done()``:: + + >>> task = tw.tasks.pending().get(tags__contain='work') + >>> task.done() + >>> len(tw.tasks.pending().filter(tags__contain='work')) + 0 + +To delete a task, use ``delete()``:: + + >>> task = tw.tasks.get(description="task added by mistake") + >>> task.delete() + +To update a task object with values from TaskWarrior database, use ``refresh()``. Example:: + + >>> task = Task(tw, description="learn to cook") + >>> task.save() + >>> task['id'] + 5 + >>> task['tags'] + [] + +Now, suppose the we modify the task using the TaskWarrior interface in another terminal:: + + $ task 5 modify +someday + Task 5 modified. + +Switching back to the open python process:: + + >>> task['tags'] + [] + >>> task.refresh() + >>> task['tags'] + ['someday'] + +Tasks can also be started and stopped. Use ``start()`` and ``stop()`` +respectively:: + + >>> task.start() + >>> task['start'] + datetime.datetime(2015, 7, 16, 18, 48, 28, tzinfo=) + >>> task.stop() + >>> task['start'] + >>> task.done() + >>> task['end'] + datetime.datetime(2015, 7, 16, 18, 49, 2, tzinfo=) + + +Retrieving Tasks +---------------- + +``tw.tasks`` is a ``TaskQuerySet`` object which emulates the Django QuerySet +API. To get all tasks (including completed ones):: + + >>> tw.tasks.all() + ['First task', 'Completed task', 'Deleted task', ...] + +Filtering +--------- + +Filter tasks using the same familiar syntax:: + + >>> tw.tasks.filter(status='pending', tags__contains=['work']) + ['Upgrade Ubuntu Server'] + +Filter arguments are passed to the ``task`` command (``__`` is replaced by +a period) so the above example is equivalent to the following command:: + + $ task status:pending tags.contain=work + +Tasks can also be filtered using raw commands, like so:: + + >>> tw.tasks.filter('status:pending +work') + ['Upgrade Ubuntu Server'] + +Although this practice is discouraged, as by using raw commands you may lose +some of the portablility of your commands over different TaskWarrior versions. + +However, you can mix raw commands with keyword filters, as in the given example:: + + >>> tw.tasks.filter('+BLOCKING', project='Home') # Gets all blocking tasks in project Home + ['Fix the toilette'] + +This can be a neat way how to use syntax not yet supported by tasklib. The above +is excellent example, since virtual tags do not work the same way as the ordinary ones, that is:: + + >>> tw.tasks.filter(tags=['BLOCKING']) + >>> [] + +will not work. + +There are built-in functions for retrieving pending & completed tasks:: + + >>> tw.tasks.pending().filter(tags__contain='work') + ['Upgrade Ubuntu Server'] + >>> len(tw.tasks.completed()) + 227 + +Use ``get()`` to return the only task in a ``TaskQuerySet``, or raise an +exception:: + + >>> tw.tasks.get(tags__contain='work')['status'] + 'pending' + >>> tw.tasks.get(status='completed', tags__contains='work') # Status of only task with the work tag is pending, so this should fail + Traceback (most recent call last): + File "", line 1, in + File "tasklib/task.py", line 224, in get + 'Lookup parameters were {0}'.format(kwargs)) + tasklib.task.DoesNotExist: Task matching query does not exist. Lookup parameters were {'status': 'completed', 'tags__contains': ['work']} + >>> tw.tasks.get(status='pending') + Traceback (most recent call last): + File "", line 1, in + File "tasklib/task.py", line 227, in get + 'Lookup parameters were {1}'.format(num, kwargs)) + ValueError: get() returned more than one Task -- it returned 23! Lookup parameters were {'status': 'pending'} + +Additionally, since filters return ``TaskQuerySets`` you can stack filters on top of each other:: + + >>> home_tasks = tw.tasks.filter(project='Wife') + >>> home_tasks.filter(due__before=datetime(2015,2,14,14,14,14)) # What I have to do until Valentine's day + ['Prepare surprise birthday party'] + +Equality of Task objects +------------------------ + +Two Tasks are considered equal if they have the same UUIDs:: + + >>> task1 = Task(tw, description="Pet the dog") + >>> task1.save() + >>> task2 = tw.tasks.get(description="Pet the dog") + >>> task1 == task2 + True + +If you compare the two unsaved tasks, they are considered equal only if it's the +same Python object:: + + >>> task1 = Task(tw, description="Pet the cat") + >>> task2 = Task(tw, description="Pet the cat") + >>> task1 == task2 + False + >>> task3 = task1 + >>> task3 == task1 + True + +Accessing original values +------------------------- + +To access the saved state of the Task, use dict-like access using the +``original`` attribute: + + >>> t = Task(tw, description="tidy up") + >>> t.save() + >>> t['description'] = "tidy up the kitchen and bathroom" + >>> t['description'] + "tidy up the kitchen and bathroom" + >>> t.original['description'] + "tidy up" + +When you save the task, original values are refreshed to reflect the +saved state of the task: + + >>> t.save() + >>> t.original['description'] + "tidy up the kitchen and bathroom" + +Dealing with dates and time +--------------------------- + +Any timestamp-like attributes of the tasks are converted to timezone-aware +datetime objects. To achieve this, Tasklib leverages ``pytz`` Python module, +which brings the Olsen timezone databaze to Python. + +This shields you from annoying details of Daylight Saving Time shifts +or conversion between different timezones. For example, to list all the +tasks which are due midnight if you're currently in Berlin: + + >>> myzone = pytz.timezone('Europe/Berlin') + >>> midnight = myzone.localize(datetime(2015,2,2,0,0,0)) + >>> tw.tasks.filter(due__before=midnight) + +However, this is still a little bit tedious. That's why TaskWarrior object +is capable of automatic timezone detection, using the ``tzlocal`` Python +module. If your system timezone is set to 'Europe/Berlin', following example +will work the same way as the previous one: + + >>> tw.tasks.filter(due__before=datetime(2015,2,2,0,0,0)) + +You can also use simple dates when filtering: + + >>> tw.tasks.filter(due__before=date(2015,2,2)) + +In such case, a 00:00:00 is used as the time component. + +Of course, you can use datetime naive objects when initializing Task object +or assigning values to datetime atrributes: + + >>> t = Task(tw, description="Buy new shoes", due=date(2015,2,5)) + >>> t['due'] + datetime.datetime(2015, 2, 5, 0, 0, tzinfo=) + >>> t['due'] = date(2015,2,6,15,15,15) + >>> t['due'] + datetime.datetime(2015, 2, 6, 15, 15, 15, tzinfo=) + +However, since timezone-aware and timezone-naive datetimes are not comparable +in Python, this can cause some unexpected behaviour: + + >>> from datetime import datetime + >>> now = datetime.now() + >>> t = Task(tw, description="take out the trash now") + >>> t['due'] = now + >>> now + datetime.datetime(2015, 2, 1, 19, 44, 4, 770001) + >>> t['due'] + datetime.datetime(2015, 2, 1, 19, 44, 4, 770001, tzinfo=) + >>> t['due'] == now + Traceback (most recent call last): + File "", line 1, in + TypeError: can't compare offset-naive and offset-aware datetimes + +If you want to compare datetime aware value with datetime naive value, you need +to localize the naive value first: + + >>> from datetime import datetime + >>> from tasklib.task import local_zone + >>> now = local_zone.localize(datetime.now()) + >>> t['due'] = now + >>> now + datetime.datetime(2015, 2, 1, 19, 44, 4, 770001, tzinfo=) + >>> t['due'] == now + True + +Also, note that it does not matter whether the timezone aware datetime objects +are set in the same timezone: + + >>> import pytz + >>> t['due'] + datetime.datetime(2015, 2, 1, 19, 44, 4, 770001, tzinfo=) + >>> now.astimezone(pytz.utc) + datetime.datetime(2015, 2, 1, 18, 44, 4, 770001, tzinfo=) + >>> t['due'] == now.astimezone(pytz.utc) + True + +*Note*: Following behaviour is available only for TaskWarrior >= 2.4.0. + +There is a third approach to setting up date time values, which leverages +the 'task calc' command. You can simply set any datetime attribute to +any string that contains an acceptable TaskWarrior-formatted time expression:: + + $ task calc now + 1d + 2015-07-17T21:17:54 + +This syntax can be leveraged in the python interpreter as follows:: + + >>> t['due'] = "now + 1d" + >>> t['due'] + datetime.datetime(2015, 7, 17, 21, 19, 31, tzinfo=) + +It can be easily seen that the string with TaskWarrior-formatted time expression +is automatically converted to native datetime in the local time zone. + +For the list of acceptable formats and keywords, please consult: + +* http://taskwarrior.org/docs/dates.html +* http://taskwarrior.org/docs/named_dates.html + +However, as each such assigment involves call to 'task calc' for conversion, +it might cause some performance issues when assigning strings to datetime +attributes repeatedly, in a automated manner. + +Working with annotations +------------------------ + +Annotations of the tasks are represented in tasklib by ``TaskAnnotation`` objects. These +are much like ``Task`` objects, albeit very simplified. + + >>> annotated_task = tw.tasks.get(description='Annotated task') + >>> annotated_task['annotations'] + [Yeah, I am annotated!] + +Annotations have only defined ``entry`` and ``description`` values:: + + >>> annotation = annotated_task['annotations'][0] + >>> annotation['entry'] + datetime.datetime(2015, 1, 3, 21, 13, 55, tzinfo=) + >>> annotation['description'] + u'Yeah, I am annotated!' + +To add a annotation to a Task, use ``add_annotation()``:: + + >>> task = Task(tw, description="new task") + >>> task.add_annotation("we can annotate any task") + Traceback (most recent call last): + File "", line 1, in + File "build/bdist.linux-x86_64/egg/tasklib/task.py", line 355, in add_annotation + tasklib.task.NotSaved: Task needs to be saved to add annotation + +However, Task needs to be saved before you can add a annotation to it:: + + >>> task.save() + >>> task.add_annotation("we can annotate saved tasks") + >>> task['annotations'] + [we can annotate saved tasks] + +To remove the annotation, pass its description to ``remove_annotation()`` method:: + + >>> task.remove_annotation("we can annotate saved tasks") + +Alternatively, you can pass the ``TaskAnnotation`` object itself:: + + >>> task.remove_annotation(task['annotations'][0]) + + +Running custom commands +----------------------- + +To run a custom commands, use ``execute_command()`` method of ``TaskWarrior`` object:: + + >>> tw = TaskWarrior() + >>> tw.execute_command(['log', 'Finish high school.']) + [u'Logged task.'] + +You can use ``config_override`` keyword argument to specify a dictionary of configuration overrides:: + + >>> tw.execute_command(['3', 'done'], config_override={'gc': 'off'}) # Will mark 3 as completed and it will retain its ID + + +Additionally, you can use ``return_all=True`` flag, which returns +``(stdout, sterr, return_code)`` triplet, and ``allow_failure=False``, which will +prevent tasklib from raising an exception if the task binary returned non-zero +return code:: + + >>> tw.execute_command(['invalidcommand'], allow_failure=False, return_all=True) + ([u''], + [u'Using alternate .taskrc file /home/tbabej/.taskrc', + u"[task next rc:/home/tbabej/.taskrc rc.recurrence.confirmation=no rc.json.array=off rc.confirmation=no rc.bulk=0 rc.dependency.confirmation=no description ~ 'invalidcommand']", + u'Configuration override rc.recurrence.confirmation:no', + u'Configuration override rc.json.array:off', + u'Configuration override rc.confirmation:no', + u'Configuration override rc.bulk:0', + u'Configuration override rc.dependency.confirmation:no', + u'No matches.', + u'There are local changes. Sync required.'], + 1) + + +Setting custom configuration values +----------------------------------- + +By default, TaskWarrior uses configuration values stored in your .taskrc. +To see what configuration value overrides are passed to each executed +task command, have a peek into ``overrides`` attribute of ``TaskWarrior`` object:: + + >>> tw.overrides + {'confirmation': 'no', 'data.location': '/home/tbabej/.task'} + +To pass your own configuration overrides, you just need to update this dictionary:: + + >>> tw.overrides.update({'hooks': 'off'}) # tasklib will not trigger hooks + +Creating hook scripts +--------------------- + +From version 2.4.0, TaskWarrior has support for hook scripts. Tasklib provides +some very useful helpers to write those. With tasklib, writing these becomes +a breeze:: + + #!/usr/bin/python + + from tasklib.task import Task + task = Task.from_input() + # ... + print task.export_data() + +For example, plugin which would assign the priority "H" to any task containing +three exclamation marks in the description, would go like this:: + + #!/usr/bin/python + + from tasklib.task import Task + task = Task.from_input() + + if "!!!" in task['description']: + task['priority'] = "H" + + print task.export_data() + +Tasklib can automatically detect whether it's running in the ``on-modify`` event, +which provides more input than ``on-add`` event and reads the data accordingly. + +This means the example above works both for ``on-add`` and ``on-modify`` events! + +Consenquently, you can create just one hook file for both ``on-add`` and +``on-modify`` events, and you just need to create a symlink for the other one. +This removes the need for maintaining two copies of the same code base and/or +boilerplate code. + +In ``on-modify`` events, tasklib loads both the original version and the modified +version of the task to the returned ``Task`` object. To access the original data +(in read-only manner), use ``original`` dict-like attribute: + + >>> t = Task.from_input() + >>> t['description'] + "Modified description" + >>> t.original['description'] + "Original description" + +Working with UDAs +----------------- + +Since TaskWarrior does read your .taskrc, you need not to define any UDAs +in the TaskWarrior's config dictionary, as described above. Suppose we have +a estimate UDA in the .taskrc:: + + uda.estimate.type = numeric + +We can simply filter and create tasks using the estimate UDA out of the box:: + + >>> tw = TaskWarrior() + >>> task = Task(tw, description="Long task", estimate=1000) + >>> task.save() + >>> task['id'] + 1 + +This is saved as UDA in the TaskWarrior:: + + $ task 1 export + {"id":1,"description":"Long task","estimate":1000, ...} + +We can also speficy UDAs as arguments in the TaskFilter:: + + >>> tw.tasks.filter(estimate=1000) + Long task + +Syncing +------- + +If you have configurated the needed config variables in your .taskrc, syncing +is as easy as:: + + >>> tw = TaskWarrior() + >>> tw.execute_command(['sync']) + +If you want to use non-standard server/credentials, you'll need to provide configuration +overrides to the ``TaskWarrior`` instance. Update the ``config`` dictionary with the +values you desire to override, and then we can run the sync command using +the ``execute_command()`` method:: + + >>> tw = TaskWarrior() + >>> sync_config = { + ... 'taskd.certificate': '/home/tbabej/.task/tbabej.cert.pem', + ... 'taskd.credentials': 'Public/tbabej/34af54de-3cb2-4d3d-82be-33ddb8fd3e66', + ... 'taskd.server': 'task.server.com:53589', + ... 'taskd.ca': '/home/tbabej/.task/ca.cert.pem', + ... 'taskd.trust': 'ignore hostname'} + >>> tw.config.update(sync_config) + >>> tw.execute_command(['sync']) + + +.. _taskwarrior: http://taskwarrior.org diff --git a/code/taskwarrior/tasklib/setup.py b/code/taskwarrior/tasklib/setup.py new file mode 100644 index 0000000..42da23e --- /dev/null +++ b/code/taskwarrior/tasklib/setup.py @@ -0,0 +1,39 @@ +from setuptools import setup, find_packages + +install_requirements = ['six>=1.4', 'pytz', 'tzlocal'] + +version = '1.2.1' + +try: + import importlib +except ImportError: + install_requirements.append('importlib') + +setup( + name='tasklib', + version=version, + description='Python Task Warrior library', + long_description=open('README.rst').read(), + author='Rob Golding', + author_email='rob@robgolding.com', + license='BSD', + url='https://github.com/robgolding63/tasklib', + download_url='https://github.com/robgolding63/tasklib/downloads', + packages=find_packages(), + include_package_data=True, + test_suite='tasklib.tests', + install_requires=install_requirements, + classifiers=[ + 'Development Status :: 4 - Beta', + 'Programming Language :: Python', + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 2.6", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.2", + "Programming Language :: Python :: 3.3", + 'License :: OSI Approved :: BSD License', + 'Topic :: Software Development :: Libraries :: Python Modules', + 'Intended Audience :: Developers', + ], +) diff --git a/code/taskwarrior/tasklib/tasklib/__init__.py b/code/taskwarrior/tasklib/tasklib/__init__.py new file mode 100644 index 0000000..ed3e731 --- /dev/null +++ b/code/taskwarrior/tasklib/tasklib/__init__.py @@ -0,0 +1,3 @@ +from .backends import TaskWarrior +from .task import Task +from .serializing import local_zone diff --git a/code/taskwarrior/tasklib/tasklib/backends.py b/code/taskwarrior/tasklib/tasklib/backends.py new file mode 100644 index 0000000..38f6c59 --- /dev/null +++ b/code/taskwarrior/tasklib/tasklib/backends.py @@ -0,0 +1,440 @@ +import abc +import copy +import datetime +import json +import logging +import os +import re +import six +import subprocess + +from .task import Task, TaskQuerySet, ReadOnlyDictView +from .filters import TaskWarriorFilter +from .serializing import local_zone + +DATE_FORMAT_CALC = '%Y-%m-%dT%H:%M:%S' + +logger = logging.getLogger(__name__) + + +class Backend(object): + + @abc.abstractproperty + def filter_class(self): + """Returns the TaskFilter class used by this backend""" + pass + + @abc.abstractmethod + def filter_tasks(self, filter_obj): + """Returns a list of Task objects matching the given filter""" + pass + + @abc.abstractmethod + def save_task(self, task): + pass + + @abc.abstractmethod + def delete_task(self, task): + pass + + @abc.abstractmethod + def start_task(self, task): + pass + + @abc.abstractmethod + def stop_task(self, task): + pass + + @abc.abstractmethod + def complete_task(self, task): + pass + + @abc.abstractmethod + def refresh_task(self, task, after_save=False): + """ + Refreshes the given task. Returns new data dict with serialized + attributes. + """ + pass + + @abc.abstractmethod + def annotate_task(self, task, annotation): + pass + + @abc.abstractmethod + def denotate_task(self, task, annotation): + pass + + @abc.abstractmethod + def sync(self): + """Syncs the backend database with the taskd server""" + pass + + def convert_datetime_string(self, value): + """ + Converts TW syntax datetime string to a localized datetime + object. This method is not mandatory. + """ + raise NotImplementedError + + +class TaskWarriorException(Exception): + pass + + +class TaskWarrior(Backend): + + VERSION_2_1_0 = six.u('2.1.0') + VERSION_2_2_0 = six.u('2.2.0') + VERSION_2_3_0 = six.u('2.3.0') + VERSION_2_4_0 = six.u('2.4.0') + VERSION_2_4_1 = six.u('2.4.1') + VERSION_2_4_2 = six.u('2.4.2') + VERSION_2_4_3 = six.u('2.4.3') + VERSION_2_4_4 = six.u('2.4.4') + VERSION_2_4_5 = six.u('2.4.5') + + def __init__(self, data_location=None, create=True, + taskrc_location=None, task_command='task', + version_override=None): + self.taskrc_location = None + if taskrc_location: + self.taskrc_location = os.path.expanduser(taskrc_location) + + # If taskrc does not exist, pass / to use defaults and avoid creating + # dummy .taskrc file by TaskWarrior + if not os.path.exists(self.taskrc_location): + self.taskrc_location = '/' + + self.task_command = task_command + + self._config = None + self.version = version_override or self._get_version() + self.overrides = { + 'confirmation': 'no', + 'dependency.confirmation': 'no', # See TW-1483 or taskrc man page + 'recurrence.confirmation': 'no', # Necessary for modifying R tasks + + # Defaults to on since 2.4.5, we expect off during parsing + 'json.array': 'off', + + # 2.4.3 onwards supports 0 as infite bulk, otherwise set just + # arbitrary big number which is likely to be large enough + 'bulk': 0 if self.version >= self.VERSION_2_4_3 else 100000, + } + + # Set data.location override if passed via kwarg + if data_location is not None: + data_location = os.path.expanduser(data_location) + if create and not os.path.exists(data_location): + os.makedirs(data_location) + self.overrides['data.location'] = data_location + + self.tasks = TaskQuerySet(self) + + def _get_task_command(self): + return self.task_command.split() + + def _get_command_args(self, args, config_override=None): + command_args = self._get_task_command() + overrides = self.overrides.copy() + overrides.update(config_override or dict()) + for item in overrides.items(): + command_args.append('rc.{0}={1}'.format(*item)) + command_args.extend([ + x.decode('utf-8') if isinstance(x, six.binary_type) + else six.text_type(x) for x in args + ]) + return command_args + + def _get_version(self): + p = subprocess.Popen( + self._get_task_command() + ['--version'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = [x.decode('utf-8') for x in p.communicate()] + return stdout.strip('\n') + + def _get_modified_task_fields_as_args(self, task): + args = [] + + def add_field(field): + # Add the output of format_field method to args list (defaults to + # field:value) + serialized_value = task._serialize(field, task._data[field]) + + # Empty values should not be enclosed in quotation marks, see + # TW-1510 + if serialized_value is '': + escaped_serialized_value = '' + else: + escaped_serialized_value = six.u("'{0}'").format( + serialized_value) + + format_default = lambda task: six.u("{0}:{1}").format( + field, escaped_serialized_value) + + format_func = getattr(self, 'format_{0}'.format(field), + format_default) + + args.append(format_func(task)) + + # If we're modifying saved task, simply pass on all modified fields + if task.saved: + for field in task._modified_fields: + add_field(field) + + # For new tasks, pass all fields that make sense + else: + for field in task._data.keys(): + # We cannot set stuff that's read only (ID, UUID, ..) + if field in task.read_only_fields: + continue + # We do not want to do field deletion for new tasks + if task._data[field] is None: + continue + # Otherwise we're fine + add_field(field) + + return args + + def format_depends(self, task): + # We need to generate added and removed dependencies list, + # since Taskwarrior does not accept redefining dependencies. + + # This cannot be part of serialize_depends, since we need + # to keep a list of all depedencies in the _data dictionary, + # not just currently added/removed ones + + old_dependencies = task._original_data.get('depends', set()) + + added = task['depends'] - old_dependencies + removed = old_dependencies - task['depends'] + + # Removed dependencies need to be prefixed with '-' + return 'depends:' + ','.join( + [t['uuid'] for t in added] + + ['-' + t['uuid'] for t in removed] + ) + + def format_description(self, task): + # Task version older than 2.4.0 ignores first word of the + # task description if description: prefix is used + if self.version < self.VERSION_2_4_0: + return task._data['description'] + else: + return six.u("description:'{0}'").format( + task._data['description'] or '', + ) + + def convert_datetime_string(self, value): + + if self.version >= self.VERSION_2_4_0: + # For strings, use 'calc' to evaluate the string to datetime + # available since TW 2.4.0 + args = value.split() + result = self.execute_command(['calc'] + args) + naive = datetime.datetime.strptime(result[0], DATE_FORMAT_CALC) + localized = local_zone.localize(naive) + else: + raise ValueError( + 'Provided value could not be converted to ' + 'datetime, its type is not supported: {}' + .format(type(value)), + ) + + return localized + + @property + def filter_class(self): + return TaskWarriorFilter + + # Public interface + + @property + def config(self): + # First, check if memoized information is available + if self._config: + return self._config + + # If not, fetch the config using the 'show' command + raw_output = self.execute_command( + ['show'], + config_override={'verbose': 'nothing'} + ) + + config = dict() + config_regex = re.compile(r'^(?P[^\s]+)\s+(?P[^\s].*$)') + + for line in raw_output: + match = config_regex.match(line) + if match: + config[match.group('key')] = match.group('value').strip() + + # Memoize the config dict + self._config = ReadOnlyDictView(config) + + return self._config + + def execute_command(self, args, config_override=None, allow_failure=True, + return_all=False): + command_args = self._get_command_args( + args, config_override=config_override) + logger.debug(u' '.join(command_args)) + + env = os.environ.copy() + if self.taskrc_location: + env['TASKRC'] = self.taskrc_location + p = subprocess.Popen(command_args, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=env) + stdout, stderr = [x.decode('utf-8') for x in p.communicate()] + if p.returncode and allow_failure: + if stderr.strip(): + error_msg = stderr.strip() + else: + error_msg = stdout.strip() + error_msg += u'\nCommand used: ' + u' '.join(command_args) + raise TaskWarriorException(error_msg) + + # Return all whole triplet only if explicitly asked for + if not return_all: + return stdout.rstrip().split('\n') + else: + return (stdout.rstrip().split('\n'), + stderr.rstrip().split('\n'), + p.returncode) + + def enforce_recurrence(self): + # Run arbitrary report command which will trigger generation + # of recurrent tasks. + + # Only necessary for TW up to 2.4.1, fixed in 2.4.2. + if self.version < self.VERSION_2_4_2: + self.execute_command(['next'], allow_failure=False) + + def merge_with(self, path, push=False): + path = path.rstrip('/') + '/' + self.execute_command(['merge', path], config_override={ + 'merge.autopush': 'yes' if push else 'no', + }) + + def undo(self): + self.execute_command(['undo']) + + # Backend interface implementation + + def filter_tasks(self, filter_obj): + self.enforce_recurrence() + args = ['export'] + filter_obj.get_filter_params() + tasks = [] + for line in self.execute_command(args): + if line: + data = line.strip(',') + try: + filtered_task = Task(self) + filtered_task._load_data(json.loads(data)) + tasks.append(filtered_task) + except ValueError: + raise TaskWarriorException('Invalid JSON: %s' % data) + return tasks + + def save_task(self, task): + """Save a task into TaskWarrior database using add/modify call""" + + args = [task['uuid'], 'modify'] if task.saved else ['add'] + args.extend(self._get_modified_task_fields_as_args(task)) + output = self.execute_command(args) + + # Parse out the new ID, if the task is being added for the first time + if not task.saved: + id_lines = [l for l in output if l.startswith('Created task ')] + + # Complain loudly if it seems that more tasks were created + # Should not happen. + # Expected output: Created task 1. + # Created task 1 (recurrence template). + if len(id_lines) != 1 or len(id_lines[0].split(' ')) not in (3, 5): + raise TaskWarriorException( + 'Unexpected output when creating ' + 'task: %s' % '\n'.join(id_lines), + ) + + # Circumvent the ID storage, since ID is considered read-only + identifier = id_lines[0].split(' ')[2].rstrip('.') + + # Identifier can be either ID or UUID for completed tasks + try: + task._data['id'] = int(identifier) + except ValueError: + task._data['uuid'] = identifier + + # Refreshing is very important here, as not only modification time + # is updated, but arbitrary attribute may have changed due hooks + # altering the data before saving + task.refresh(after_save=True) + + def delete_task(self, task): + self.execute_command([task['uuid'], 'delete']) + + def start_task(self, task): + self.execute_command([task['uuid'], 'start']) + + def stop_task(self, task): + self.execute_command([task['uuid'], 'stop']) + + def complete_task(self, task): + # Older versions of TW do not stop active task at completion + if self.version < self.VERSION_2_4_0 and task.active: + task.stop() + + self.execute_command([task['uuid'], 'done']) + + def annotate_task(self, task, annotation): + args = [task['uuid'], 'annotate', annotation] + self.execute_command(args) + + def denotate_task(self, task, annotation): + args = [task['uuid'], 'denotate', annotation] + self.execute_command(args) + + def refresh_task(self, task, after_save=False): + # We need to use ID as backup for uuid here for the refreshes + # of newly saved tasks. Any other place in the code is fine + # with using UUID only. + args = [task['uuid'] or task['id'], 'export'] + output = self.execute_command(args) + + def valid(output): + return len(output) == 1 and output[0].startswith('{') + + # For older TW versions attempt to uniquely locate the task + # using the data we have if it has been just saved. + # This can happen when adding a completed task on older TW versions. + if (not valid(output) and self.version < self.VERSION_2_4_5 + and after_save): + + # Make a copy, removing ID and UUID. It's most likely invalid + # (ID 0) if it failed to match a unique task. + data = copy.deepcopy(task._data) + data.pop('id', None) + data.pop('uuid', None) + + taskfilter = self.filter_class(self) + for key, value in data.items(): + taskfilter.add_filter_param(key, value) + + output = self.execute_command(['export'] + + taskfilter.get_filter_params()) + + # If more than 1 task has been matched still, raise an exception + if not valid(output): + raise TaskWarriorException( + 'Unique identifiers {0} with description: {1} matches ' + 'multiple tasks: {2}'.format( + task['uuid'] or task['id'], task['description'], output) + ) + + return json.loads(output[0]) + + def sync(self): + self.execute_command(['sync']) diff --git a/code/taskwarrior/tasklib/tasklib/filters.py b/code/taskwarrior/tasklib/tasklib/filters.py new file mode 100644 index 0000000..b7cab64 --- /dev/null +++ b/code/taskwarrior/tasklib/tasklib/filters.py @@ -0,0 +1,79 @@ +import abc +import six +from .serializing import SerializingObject + + +class TaskFilter(object): + """ + Abstract base class that defines interface of a TaskFilter. + """ + + @abc.abstractmethod + def add_filter(self, arg): + """ + Processes an non-keyword filter. + """ + pass + + @abc.abstractmethod + def add_filter_param(self, key, value): + """ + Processes a keyword filter. + """ + pass + + @abc.abstractmethod + def clone(self): + """ + Returns a new deep copy of itself. + """ + pass + + +class TaskWarriorFilter(TaskFilter, SerializingObject): + """ + A set of parameters to filter the task list with. + """ + + def __init__(self, backend, filter_params=None): + self.filter_params = filter_params or [] + super(TaskFilter, self).__init__(backend) + + def add_filter(self, filter_str): + self.filter_params.append(filter_str) + + def add_filter_param(self, key, value): + key = key.replace('__', '.') + + # Replace the value with empty string, since that is the + # convention in TW for empty values + attribute_key = key.split('.')[0] + + # Since this is user input, we need to normalize before we serialize + value = self._normalize(attribute_key, value) + value = self._serialize(attribute_key, value) + + # If we are filtering by uuid:, do not use uuid keyword + # due to TW-1452 bug + if key == 'uuid': + self.filter_params.insert(0, value) + else: + # Surround value with aphostrophes unless it's a empty string + value = "'%s'" % value if value else '' + + # We enforce equality match by using 'is' (or 'none') modifier + # Without using this syntax, filter fails due to TW-1479 + # which is, however, fixed in 2.4.5 + if self.backend.version < self.backend.VERSION_2_4_5: + modifier = '.is' if value else '.none' + key = key + modifier if '.' not in key else key + + self.filter_params.append(six.u("{0}:{1}").format(key, value)) + + def get_filter_params(self): + return [f for f in self.filter_params if f] + + def clone(self): + c = self.__class__(self.backend) + c.filter_params = list(self.filter_params) + return c diff --git a/code/taskwarrior/tasklib/tasklib/lazy.py b/code/taskwarrior/tasklib/tasklib/lazy.py new file mode 100644 index 0000000..e5b3492 --- /dev/null +++ b/code/taskwarrior/tasklib/tasklib/lazy.py @@ -0,0 +1,235 @@ +""" +Provides lazy implementations for Task and TaskQuerySet. +""" + + +class LazyUUIDTask(object): + """ + A lazy wrapper around Task object, referenced by UUID. + + - Supports comparison with LazyUUIDTask or Task objects (equality by UUIDs) + - If any attribute other than 'uuid' requested, a lookup in the + backend will be performed and this object will be replaced by a proper + Task object. + """ + + def __init__(self, tw, uuid): + self._tw = tw + self._uuid = uuid + + def __getitem__(self, key): + # LazyUUIDTask does not provide anything else other than 'uuid' + if key is 'uuid': + return self._uuid + else: + self.replace() + return self[key] + + def __getattr__(self, name): + # Getattr is called only if the attribute could not be found using + # normal means + self.replace() + return getattr(self, name) + + def __eq__(self, other): + if other and other['uuid']: + # For saved Tasks, just define equality by equality of uuids + return self['uuid'] == other['uuid'] + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return self['uuid'].__hash__() + + def __repr__(self): + return 'LazyUUIDTask: {0}'.format(self._uuid) + + def __copy__(self): + return LazyUUIDTask(self._tw, self._uuid) + + def __deepcopy__(self, memo): + return LazyUUIDTask(self._tw, self._uuid) + + @property + def saved(self): + """ + Implementation of the 'saved' property. Always returns True. + """ + return True + + @property + def _modified_fields(self): + return set() + + @property + def modified(self): + return False + + def replace(self): + """ + Performs conversion to the regular Task object, referenced by the + stored UUID. + """ + + replacement = self._tw.tasks.get(uuid=self._uuid) + self.__class__ = replacement.__class__ + self.__dict__ = replacement.__dict__ + + +class LazyUUIDTaskSet(object): + """ + A lazy wrapper around TaskQuerySet object, for tasks referenced by UUID. + + - Supports 'in' operator with LazyUUIDTask or Task objects + - If iteration over the objects in the LazyUUIDTaskSet is requested, the + LazyUUIDTaskSet will be converted to QuerySet and evaluated + """ + + def __init__(self, tw, uuids): + self._tw = tw + self._uuids = set(uuids) + + def __getattr__(self, name): + # Getattr is called only if the attribute could not be found using + # normal means + + if name.startswith('__'): + # If some internal method was being search, do not convert + # to TaskQuerySet just because of that + raise AttributeError + else: + self.replace() + return getattr(self, name) + + def __repr__(self): + return 'LazyUUIDTaskSet([{0}])'.format(', '.join(self._uuids)) + + def __eq__(self, other): + return (set(t['uuid'] for t in other) if other else set()) == self._uuids + + def __ne__(self, other): + return not (self == other) + + def __contains__(self, task): + return task['uuid'] in self._uuids + + def __len__(self): + return len(self._uuids) + + def __iter__(self): + for uuid in self._uuids: + yield LazyUUIDTask(self._tw, uuid) + + def __sub__(self, other): + return self.difference(other) + + def __isub__(self, other): + return self.difference_update(other) + + def __rsub__(self, other): + return LazyUUIDTaskSet( + self._tw, + set(t['uuid'] for t in other) - self._uuids, + ) + + def __or__(self, other): + return self.union(other) + + def __ior__(self, other): + return self.update(other) + + def __ror__(self, other): + return self.union(other) + + def __xor__(self, other): + return self.symmetric_difference(other) + + def __ixor__(self, other): + return self.symmetric_difference_update(other) + + def __rxor__(self, other): + return self.symmetric_difference(other) + + def __and__(self, other): + return self.intersection(other) + + def __iand__(self, other): + return self.intersection_update(other) + + def __rand__(self, other): + return self.intersection(other) + + def __le__(self, other): + return self.issubset(other) + + def __ge__(self, other): + return self.issuperset(other) + + def issubset(self, other): + return all([task in other for task in self]) + + def issuperset(self, other): + return all([task in self for task in other]) + + def union(self, other): + return LazyUUIDTaskSet( + self._tw, + self._uuids | set(t['uuid'] for t in other), + ) + + def intersection(self, other): + return LazyUUIDTaskSet( + self._tw, + self._uuids & set(t['uuid'] for t in other), + ) + + def difference(self, other): + return LazyUUIDTaskSet( + self._tw, + self._uuids - set(t['uuid'] for t in other), + ) + + def symmetric_difference(self, other): + return LazyUUIDTaskSet( + self._tw, + self._uuids ^ set(t['uuid'] for t in other), + ) + + def update(self, other): + self._uuids |= set(t['uuid'] for t in other) + return self + + def intersection_update(self, other): + self._uuids &= set(t['uuid'] for t in other) + return self + + def difference_update(self, other): + self._uuids -= set(t['uuid'] for t in other) + return self + + def symmetric_difference_update(self, other): + self._uuids ^= set(t['uuid'] for t in other) + return self + + def add(self, task): + self._uuids.add(task['uuid']) + + def remove(self, task): + self._uuids.remove(task['uuid']) + + def pop(self): + return self._uuids.pop() + + def clear(self): + self._uuids.clear() + + def replace(self): + """ + Performs conversion to the regular TaskQuerySet object, referenced by + the stored UUIDs. + """ + + replacement = self._tw.tasks.filter(' '.join(self._uuids)) + self.__class__ = replacement.__class__ + self.__dict__ = replacement.__dict__ diff --git a/code/taskwarrior/tasklib/tasklib/serializing.py b/code/taskwarrior/tasklib/tasklib/serializing.py new file mode 100644 index 0000000..8cdeaf2 --- /dev/null +++ b/code/taskwarrior/tasklib/tasklib/serializing.py @@ -0,0 +1,254 @@ +import datetime +import importlib +import json +import pytz +import six +import tzlocal + + +from .lazy import LazyUUIDTaskSet, LazyUUIDTask + +DATE_FORMAT = '%Y%m%dT%H%M%SZ' +local_zone = tzlocal.get_localzone() + + +class SerializingObject(object): + """ + Common ancestor for TaskResource & TaskWarriorFilter, since they both + need to serialize arguments. + + Serializing method should hold the following contract: + - any empty value (meaning removal of the attribute) + is deserialized into a empty string + - None denotes a empty value for any attribute + + Deserializing method should hold the following contract: + - None denotes a empty value for any attribute (however, + this is here as a safeguard, TaskWarrior currently does + not export empty-valued attributes) if the attribute + is not iterable (e.g. list or set), in which case + a empty iterable should be used. + + Normalizing methods should hold the following contract: + - They are used to validate and normalize the user input. + Any attribute value that comes from the user (during Task + initialization, assignign values to Task attributes, or + filtering by user-provided values of attributes) is first + validated and normalized using the normalize_{key} method. + - If validation or normalization fails, normalizer is expected + to raise ValueError. + """ + + def __init__(self, backend): + self.backend = backend + + def _deserialize(self, key, value): + hydrate_func = getattr(self, 'deserialize_{0}'.format(key), + lambda x: x if x != '' else None) + return hydrate_func(value) + + def _serialize(self, key, value): + dehydrate_func = getattr(self, 'serialize_{0}'.format(key), + lambda x: x if x is not None else '') + return dehydrate_func(value) + + def _normalize(self, key, value): + """ + Use normalize_ methods to normalize user input. Any user + input will be normalized at the moment it is used as filter, + or entered as a value of Task attribute. + """ + + # None value should not be converted by normalizer + if value is None: + return None + + normalize_func = getattr(self, 'normalize_{0}'.format(key), + lambda x: x) + + return normalize_func(value) + + def timestamp_serializer(self, date): + if not date: + return '' + + # Any serialized timestamp should be localized, we need to + # convert to UTC before converting to string (DATE_FORMAT uses UTC) + date = date.astimezone(pytz.utc) + + return date.strftime(DATE_FORMAT) + + def timestamp_deserializer(self, date_str): + if not date_str: + return None + + # Return timestamp localized in the local zone + naive_timestamp = datetime.datetime.strptime(date_str, DATE_FORMAT) + localized_timestamp = pytz.utc.localize(naive_timestamp) + return localized_timestamp.astimezone(local_zone) + + def serialize_entry(self, value): + return self.timestamp_serializer(value) + + def deserialize_entry(self, value): + return self.timestamp_deserializer(value) + + def normalize_entry(self, value): + return self.datetime_normalizer(value) + + def serialize_modified(self, value): + return self.timestamp_serializer(value) + + def deserialize_modified(self, value): + return self.timestamp_deserializer(value) + + def normalize_modified(self, value): + return self.datetime_normalizer(value) + + def serialize_start(self, value): + return self.timestamp_serializer(value) + + def deserialize_start(self, value): + return self.timestamp_deserializer(value) + + def normalize_start(self, value): + return self.datetime_normalizer(value) + + def serialize_end(self, value): + return self.timestamp_serializer(value) + + def deserialize_end(self, value): + return self.timestamp_deserializer(value) + + def normalize_end(self, value): + return self.datetime_normalizer(value) + + def serialize_due(self, value): + return self.timestamp_serializer(value) + + def deserialize_due(self, value): + return self.timestamp_deserializer(value) + + def normalize_due(self, value): + return self.datetime_normalizer(value) + + def serialize_scheduled(self, value): + return self.timestamp_serializer(value) + + def deserialize_scheduled(self, value): + return self.timestamp_deserializer(value) + + def normalize_scheduled(self, value): + return self.datetime_normalizer(value) + + def serialize_until(self, value): + return self.timestamp_serializer(value) + + def deserialize_until(self, value): + return self.timestamp_deserializer(value) + + def normalize_until(self, value): + return self.datetime_normalizer(value) + + def serialize_wait(self, value): + return self.timestamp_serializer(value) + + def deserialize_wait(self, value): + return self.timestamp_deserializer(value) + + def normalize_wait(self, value): + return self.datetime_normalizer(value) + + def serialize_annotations(self, value): + value = value if value is not None else [] + + # This may seem weird, but it's correct, we want to export + # a list of dicts as serialized value + serialized_annotations = [json.loads(annotation.export_data()) + for annotation in value] + return serialized_annotations if serialized_annotations else '' + + def deserialize_annotations(self, data): + task_module = importlib.import_module('tasklib.task') + TaskAnnotation = getattr(task_module, 'TaskAnnotation') + return [TaskAnnotation(self, d) for d in data] if data else [] + + def serialize_tags(self, tags): + return ','.join(tags) if tags else '' + + def deserialize_tags(self, tags): + if isinstance(tags, six.string_types): + return set(tags.split(',')) if tags else set() + return set(tags or []) + + def serialize_parent(self, parent): + return parent['uuid'] if parent else '' + + def deserialize_parent(self, uuid): + return LazyUUIDTask(self.backend, uuid) if uuid else None + + def serialize_depends(self, value): + # Return the list of uuids + value = value if value is not None else set() + + if isinstance(value, LazyUUIDTaskSet): + return ','.join(value._uuids) + else: + return ','.join(task['uuid'] for task in value) + + def deserialize_depends(self, raw_uuids): + raw_uuids = raw_uuids or [] # Convert None to empty list + + if not raw_uuids: + return set() + + # TW 2.4.4 encodes list of dependencies as a single string + if type(raw_uuids) is not list: + uuids = raw_uuids.split(',') + # TW 2.4.5 and later exports them as a list, no conversion needed + else: + uuids = raw_uuids + + return LazyUUIDTaskSet(self.backend, uuids) + + def datetime_normalizer(self, value): + """ + Normalizes date/datetime value (considered to come from user input) + to localized datetime value. Following conversions happen: + + naive date -> localized datetime with the same date, and time=midnight + naive datetime -> localized datetime with the same value + localized datetime -> localized datetime (no conversion) + """ + + if ( + isinstance(value, datetime.date) + and not isinstance(value, datetime.datetime) + ): + # Convert to local midnight + value_full = datetime.datetime.combine(value, datetime.time.min) + localized = local_zone.localize(value_full) + elif isinstance(value, datetime.datetime): + if value.tzinfo is None: + # Convert to localized datetime object + localized = local_zone.localize(value) + else: + # If the value is already localized, there is no need to change + # time zone at this point. Also None is a valid value too. + localized = value + elif isinstance(value, six.string_types): + localized = self.backend.convert_datetime_string(value) + else: + raise ValueError("Provided value could not be converted to " + "datetime, its type is not supported: {}" + .format(type(value))) + + return localized + + def normalize_uuid(self, value): + # Enforce sane UUID + if not isinstance(value, six.string_types) or value == '': + raise ValueError("UUID must be a valid non-empty string, " + "not: {}".format(value)) + + return value diff --git a/code/taskwarrior/tasklib/tasklib/task.py b/code/taskwarrior/tasklib/tasklib/task.py new file mode 100644 index 0000000..20bff1f --- /dev/null +++ b/code/taskwarrior/tasklib/tasklib/task.py @@ -0,0 +1,568 @@ +from __future__ import print_function +import copy +import importlib +import json +import logging +import os +import six +import sys + +from .serializing import SerializingObject + +DATE_FORMAT = '%Y%m%dT%H%M%SZ' +REPR_OUTPUT_SIZE = 10 +PENDING = 'pending' +COMPLETED = 'completed' +DELETED = 'deleted' +WAITING = 'waiting' +RECURRING = 'recurring' + +logger = logging.getLogger(__name__) + + +class ReadOnlyDictView(object): + """ + Provides simplified read-only view upon dict object. + """ + + def __init__(self, viewed_dict): + self.viewed_dict = viewed_dict + + def __getitem__(self, key): + return copy.deepcopy(self.viewed_dict.__getitem__(key)) + + def __contains__(self, k): + return self.viewed_dict.__contains__(k) + + def __iter__(self): + for value in self.viewed_dict: + yield copy.deepcopy(value) + + def __len__(self): + return len(self.viewed_dict) + + def __unicode__(self): + return six.u('ReadOnlyDictView: {0}'.format(repr(self.viewed_dict))) + + __repr__ = __unicode__ + + def get(self, key, default=None): + return copy.deepcopy(self.viewed_dict.get(key, default)) + + def items(self): + return [copy.deepcopy(v) for v in self.viewed_dict.items()] + + def values(self): + return [copy.deepcopy(v) for v in self.viewed_dict.values()] + + +class TaskResource(SerializingObject): + read_only_fields = [] + + def _load_data(self, data): + self._data = dict((key, self._deserialize(key, value)) + for key, value in data.items()) + # We need to use a copy for original data, so that changes + # are not propagated. + self._original_data = copy.deepcopy(self._data) + + def _update_data(self, data, update_original=False, remove_missing=False): + """ + Low level update of the internal _data dict. Data which are coming as + updates should already be serialized. If update_original is True, the + original_data dict is updated as well. + """ + self._data.update(dict((key, self._deserialize(key, value)) + for key, value in data.items())) + + # In certain situations, we want to treat missing keys as removals + if remove_missing: + for key in set(self._data.keys()) - set(data.keys()): + self._data[key] = None + + if update_original: + self._original_data = copy.deepcopy(self._data) + + def __getitem__(self, key): + # This is a workaround to make TaskResource non-iterable + # over simple index-based iteration + try: + int(key) + raise StopIteration + except ValueError: + pass + + if key not in self._data: + self._data[key] = self._deserialize(key, None) + + return self._data.get(key) + + def __setitem__(self, key, value): + if key in self.read_only_fields: + raise RuntimeError('Field \'%s\' is read-only' % key) + + # Normalize the user input before saving it + value = self._normalize(key, value) + self._data[key] = value + + def __str__(self): + s = six.text_type(self.__unicode__()) + if not six.PY3: + s = s.encode('utf-8') + return s + + def __repr__(self): + return str(self) + + def export_data(self): + """ + Exports current data contained in the Task as JSON + """ + + # We need to remove spaces for TW-1504, use custom separators + data_tuples = ((key, self._serialize(key, value)) + for key, value in six.iteritems(self._data)) + + # Empty string denotes empty serialized value, we do not want + # to pass that to TaskWarrior. + data_tuples = filter(lambda t: t[1] is not '', data_tuples) + data = dict(data_tuples) + return json.dumps(data, separators=(',', ':')) + + @property + def _modified_fields(self): + writable_fields = set(self._data.keys()) - set(self.read_only_fields) + for key in writable_fields: + new_value = self._data.get(key) + old_value = self._original_data.get(key) + + # Make sure not to mark data removal as modified field if the + # field originally had some empty value + if key in self._data and not new_value and not old_value: + continue + + if new_value != old_value: + yield key + + @property + def modified(self): + return bool(list(self._modified_fields)) + + +class TaskAnnotation(TaskResource): + read_only_fields = ['entry', 'description'] + + def __init__(self, task, data=None): + self.task = task + self._load_data(data or dict()) + super(TaskAnnotation, self).__init__(task.backend) + + def remove(self): + self.task.remove_annotation(self) + + def __unicode__(self): + return self['description'] + + def __eq__(self, other): + # consider 2 annotations equal if they belong to the same task, and + # their data dics are the same + return self.task == other.task and self._data == other._data + + def __ne__(self, other): + return not self.__eq__(other) + + __repr__ = __unicode__ + + +class Task(TaskResource): + read_only_fields = ['id', 'entry', 'urgency', 'uuid', 'modified'] + + class DoesNotExist(Exception): + pass + + class CompletedTask(Exception): + """ + Raised when the operation cannot be performed on the completed task. + """ + pass + + class DeletedTask(Exception): + """ + Raised when the operation cannot be performed on the deleted task. + """ + pass + + class ActiveTask(Exception): + """ + Raised when the operation cannot be performed on the active task. + """ + pass + + class InactiveTask(Exception): + """ + Raised when the operation cannot be performed on an inactive task. + """ + pass + + class NotSaved(Exception): + """ + Raised when the operation cannot be performed on the task, because + it has not been saved to TaskWarrior yet. + """ + pass + + @classmethod + def from_input(cls, input_file=sys.stdin, modify=None, backend=None): + """ + Creates a Task object, directly from the stdin, by reading one line. + If modify=True, two lines are used, first line interpreted as the + original state of the Task object, and second line as its new, + modified value. This is consistent with the TaskWarrior's hook + system. + + Object created by this method should not be saved, deleted + or refreshed, as t could create a infinite loop. For this + reason, TaskWarrior instance is set to None. + + Input_file argument can be used to specify the input file, + but defaults to sys.stdin. + """ + + # Detect the hook type if not given directly + name = os.path.basename(sys.argv[0]) + modify = name.startswith('on-modify') if modify is None else modify + + # Create the TaskWarrior instance if none passed + if backend is None: + backends = importlib.import_module('tasklib.backends') + hook_parent_dir = os.path.dirname(os.path.dirname(sys.argv[0])) + backend = backends.TaskWarrior(data_location=hook_parent_dir) + + # TaskWarrior instance is set to None + task = cls(backend) + + # Load the data from the input + task._load_data(json.loads(input_file.readline().strip())) + + # If this is a on-modify event, we are provided with additional + # line of input, which provides updated data + if modify: + task._update_data(json.loads(input_file.readline().strip()), + remove_missing=True) + + return task + + def __init__(self, backend, **kwargs): + super(Task, self).__init__(backend) + + # Check that user is not able to set read-only value in __init__ + for key in kwargs.keys(): + if key in self.read_only_fields: + raise RuntimeError('Field \'%s\' is read-only' % key) + + # We serialize the data in kwargs so that users of the library + # do not have to pass different data formats via __setitem__ and + # __init__ methods, that would be confusing + + # Rather unfortunate syntax due to python2.6 comaptiblity + self._data = dict((key, self._normalize(key, value)) + for (key, value) in six.iteritems(kwargs)) + self._original_data = copy.deepcopy(self._data) + + # Provide read only access to the original data + self.original = ReadOnlyDictView(self._original_data) + + def __unicode__(self): + return self['description'] + + def __eq__(self, other): + if self['uuid'] and other['uuid']: + # For saved Tasks, just define equality by equality of uuids + return self['uuid'] == other['uuid'] + else: + # If the tasks are not saved, compare the actual instances + return id(self) == id(other) + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + if self['uuid']: + # For saved Tasks, just define equality by equality of uuids + return self['uuid'].__hash__() + else: + # If the tasks are not saved, return hash of instance id + return id(self).__hash__() + + @property + def completed(self): + return self['status'] == six.text_type('completed') + + @property + def deleted(self): + return self['status'] == six.text_type('deleted') + + @property + def waiting(self): + return self['status'] == six.text_type('waiting') + + @property + def pending(self): + return self['status'] == six.text_type('pending') + + @property + def recurring(self): + return self['status'] == six.text_type('recurring') + + @property + def active(self): + return self['start'] is not None + + @property + def saved(self): + return self['uuid'] is not None or self['id'] is not None + + def serialize_depends(self, cur_dependencies): + # Check that all the tasks are saved + for task in (cur_dependencies or set()): + if not task.saved: + raise Task.NotSaved( + 'Task \'%s\' needs to be saved before ' + 'it can be set as dependency.' % task, + ) + + return super(Task, self).serialize_depends(cur_dependencies) + + def delete(self): + if not self.saved: + raise Task.NotSaved( + 'Task needs to be saved before it can be deleted', + ) + + # Refresh the status, and raise exception if the task is deleted + self.refresh(only_fields=['status']) + + if self.deleted: + raise Task.DeletedTask('Task was already deleted') + + self.backend.delete_task(self) + + # Refresh the status again, so that we have updated info stored + self.refresh(only_fields=['status', 'start', 'end']) + + def start(self): + if not self.saved: + raise Task.NotSaved( + 'Task needs to be saved before it can be started', + ) + + # Refresh, and raise exception if task is already completed/deleted + self.refresh(only_fields=['status']) + + if self.completed: + raise Task.CompletedTask('Cannot start a completed task') + elif self.deleted: + raise Task.DeletedTask('Deleted task cannot be started') + elif self.active: + raise Task.ActiveTask('Task is already active') + + self.backend.start_task(self) + + # Refresh the status again, so that we have updated info stored + self.refresh(only_fields=['status', 'start']) + + def stop(self): + if not self.saved: + raise Task.NotSaved( + 'Task needs to be saved before it can be stopped', + ) + + # Refresh, and raise exception if task is already completed/deleted + self.refresh(only_fields=['status']) + + if not self.active: + raise Task.InactiveTask('Cannot stop an inactive task') + + self.backend.stop_task(self) + + # Refresh the status again, so that we have updated info stored + self.refresh(only_fields=['status', 'start']) + + def done(self): + if not self.saved: + raise Task.NotSaved( + 'Task needs to be saved before it can be completed', + ) + + # Refresh, and raise exception if task is already completed/deleted + self.refresh(only_fields=['status']) + + if self.completed: + raise Task.CompletedTask('Cannot complete a completed task') + elif self.deleted: + raise Task.DeletedTask('Deleted task cannot be completed') + + self.backend.complete_task(self) + + # Refresh the status again, so that we have updated info stored + self.refresh(only_fields=['status', 'start', 'end']) + + def save(self): + if self.saved and not self.modified: + return + + # All the actual work is done by the backend + self.backend.save_task(self) + + def add_annotation(self, annotation): + if not self.saved: + raise Task.NotSaved('Task needs to be saved to add annotation') + + self.backend.annotate_task(self, annotation) + self.refresh(only_fields=['annotations']) + + def remove_annotation(self, annotation): + if not self.saved: + raise Task.NotSaved('Task needs to be saved to remove annotation') + + if isinstance(annotation, TaskAnnotation): + annotation = annotation['description'] + + self.backend.denotate_task(self, annotation) + self.refresh(only_fields=['annotations']) + + def refresh(self, only_fields=None, after_save=False): + # Raise error when trying to refresh a task that has not been saved + if not self.saved: + raise Task.NotSaved('Task needs to be saved to be refreshed') + + new_data = self.backend.refresh_task(self, after_save=after_save) + + if only_fields: + to_update = dict( + [(k, new_data.get(k)) for k in only_fields], + ) + self._update_data(to_update, update_original=True) + else: + self._load_data(new_data) + + +class TaskQuerySet(object): + """ + Represents a lazy lookup for a task objects. + """ + + def __init__(self, backend, filter_obj=None): + self.backend = backend + self._result_cache = None + self.filter_obj = filter_obj or self.backend.filter_class(backend) + + def __deepcopy__(self, memo): + """ + Deep copy of a QuerySet doesn't populate the cache + """ + obj = self.__class__(backend=self.backend) + for k, v in self.__dict__.items(): + if k in ('_iter', '_result_cache'): + obj.__dict__[k] = None + else: + obj.__dict__[k] = copy.deepcopy(v, memo) + return obj + + def __repr__(self): + data = list(self[:REPR_OUTPUT_SIZE + 1]) + if len(data) > REPR_OUTPUT_SIZE: + data[-1] = '...(remaining elements truncated)...' + return repr(data) + + def __len__(self): + if self._result_cache is None: + self._result_cache = list(self) + return len(self._result_cache) + + def __iter__(self): + if self._result_cache is None: + self._result_cache = self._execute() + return iter(self._result_cache) + + def __getitem__(self, k): + if self._result_cache is None: + self._result_cache = list(self) + return self._result_cache.__getitem__(k) + + def __bool__(self): + if self._result_cache is not None: + return bool(self._result_cache) + try: + next(iter(self)) + except StopIteration: + return False + return True + + def __nonzero__(self): + return type(self).__bool__(self) + + def _clone(self, klass=None, **kwargs): + if klass is None: + klass = self.__class__ + filter_obj = self.filter_obj.clone() + c = klass(backend=self.backend, filter_obj=filter_obj) + c.__dict__.update(kwargs) + return c + + def _execute(self): + """ + Fetch the tasks which match the current filters. + """ + return self.backend.filter_tasks(self.filter_obj) + + def all(self): + """ + Returns a new TaskQuerySet that is a copy of the current one. + """ + return self._clone() + + def pending(self): + return self.filter(status=PENDING) + + def completed(self): + return self.filter(status=COMPLETED) + + def deleted(self): + return self.filter(status=DELETED) + + def waiting(self): + return self.filter(status=WAITING) + + def recurring(self): + return self.filter(status=RECURRING) + + def filter(self, *args, **kwargs): + """ + Returns a new TaskQuerySet with the given filters added. + """ + clone = self._clone() + for f in args: + clone.filter_obj.add_filter(f) + for key, value in kwargs.items(): + clone.filter_obj.add_filter_param(key, value) + return clone + + def get(self, **kwargs): + """ + Performs the query and returns a single object matching the given + keyword arguments. + """ + clone = self.filter(**kwargs) + num = len(clone) + if num == 1: + return clone._result_cache[0] + if not num: + raise Task.DoesNotExist( + 'Task matching query does not exist. ' + 'Lookup parameters were {0}'.format(kwargs), + ) + raise ValueError( + 'get() returned more than one Task -- it returned {0}! ' + 'Lookup parameters were {1}'.format(num, kwargs), + ) diff --git a/code/taskwarrior/tasklib/tasklib/tests.py b/code/taskwarrior/tasklib/tasklib/tests.py new file mode 100644 index 0000000..5b1ff62 --- /dev/null +++ b/code/taskwarrior/tasklib/tasklib/tests.py @@ -0,0 +1,1624 @@ +# coding=utf-8 + +import copy +import datetime +import itertools +import json +import os +import pytz +import six +import shutil +import sys +import tempfile +import unittest + +from .backends import TaskWarrior +from .task import Task, ReadOnlyDictView +from .lazy import LazyUUIDTask, LazyUUIDTaskSet +from .serializing import DATE_FORMAT, local_zone + +# http://taskwarrior.org/docs/design/task.html , Section: The Attributes +TASK_STANDARD_ATTRS = ( + 'status', + 'uuid', + 'entry', + 'description', + 'start', + 'end', + 'due', + 'until', + 'wait', + 'modified', + 'scheduled', + 'recur', + 'mask', + 'imask', + 'parent', + 'project', + 'priority', + 'depends', + 'tags', + 'annotations', +) + + +def total_seconds_2_6(x): + return x.microseconds / 1e6 + x.seconds + x.days * 24 * 3600 + + +class TasklibTest(unittest.TestCase): + + def get_taskwarrior(self, **kwargs): + tw_kwargs = dict( + data_location=self.tmp, + taskrc_location='/', + ) + tw_kwargs.update(kwargs) + return TaskWarrior(**tw_kwargs) + + def setUp(self): + self.tmp = tempfile.mkdtemp(dir='.') + self.tw = self.get_taskwarrior() + + def tearDown(self): + shutil.rmtree(self.tmp) + + +class TaskWarriorTest(TasklibTest): + + def test_custom_command(self): + # ensure that a custom command which contains multiple parts + # is properly split up + tw = self.get_taskwarrior( + task_command='wsl task', + # prevent `_get_version` from running as `wsl` may not exist + version_override=os.getenv('TASK_VERSION'), + ) + self.assertEqual(tw._get_task_command(), ['wsl', 'task']) + + +class TaskFilterTest(TasklibTest): + + def test_all_empty(self): + self.assertEqual(len(self.tw.tasks.all()), 0) + + def test_all_non_empty(self): + Task(self.tw, description='test task').save() + self.assertEqual(len(self.tw.tasks.all()), 1) + self.assertEqual(self.tw.tasks.all()[0]['description'], 'test task') + self.assertEqual(self.tw.tasks.all()[0]['status'], 'pending') + + def test_pending_non_empty(self): + Task(self.tw, description='test task').save() + self.assertEqual(len(self.tw.tasks.pending()), 1) + self.assertEqual( + self.tw.tasks.pending()[0]['description'], + 'test task', + ) + self.assertEqual(self.tw.tasks.pending()[0]['status'], 'pending') + + def test_completed_empty(self): + Task(self.tw, description='test task').save() + self.assertEqual(len(self.tw.tasks.completed()), 0) + + def test_completed_non_empty(self): + Task(self.tw, description='test task').save() + self.assertEqual(len(self.tw.tasks.completed()), 0) + self.tw.tasks.all()[0].done() + self.assertEqual(len(self.tw.tasks.completed()), 1) + + def test_deleted_empty(self): + Task(self.tw, description='test task').save() + self.assertEqual(len(self.tw.tasks.deleted()), 0) + + def test_deleted_non_empty(self): + Task(self.tw, description='test task').save() + self.assertEqual(len(self.tw.tasks.deleted()), 0) + self.tw.tasks.all()[0].delete() + self.assertEqual(len(self.tw.tasks.deleted()), 1) + + def test_waiting_empty(self): + Task(self.tw, description='test task').save() + self.assertEqual(len(self.tw.tasks.waiting()), 0) + + def test_waiting_non_empty(self): + Task(self.tw, description='test task').save() + self.assertEqual(len(self.tw.tasks.waiting()), 0) + + t = self.tw.tasks.all()[0] + t['wait'] = datetime.datetime.now() + datetime.timedelta(days=1) + t.save() + + self.assertEqual(len(self.tw.tasks.waiting()), 1) + + def test_recurring_empty(self): + Task(self.tw, description='test task').save() + self.assertEqual(len(self.tw.tasks.recurring()), 0) + + def test_recurring_non_empty(self): + Task( + self.tw, + description='test task', + recur='daily', + due=datetime.datetime.now(), + ).save() + self.assertEqual(len(self.tw.tasks.recurring()), 1) + + def test_filtering_by_attribute(self): + Task(self.tw, description='no priority task').save() + Task(self.tw, priority='H', description='high priority task').save() + self.assertEqual(len(self.tw.tasks.all()), 2) + + # Assert that the correct number of tasks is returned + self.assertEqual(len(self.tw.tasks.filter(priority='H')), 1) + + # Assert that the correct tasks are returned + high_priority_task = self.tw.tasks.get(priority='H') + self.assertEqual( + high_priority_task['description'], + 'high priority task', + ) + + def test_filtering_by_empty_attribute(self): + Task(self.tw, description='no priority task').save() + Task(self.tw, priority='H', description='high priority task').save() + self.assertEqual(len(self.tw.tasks.all()), 2) + + # Assert that the correct number of tasks is returned + self.assertEqual(len(self.tw.tasks.filter(priority=None)), 1) + + # Assert that the correct tasks are returned + no_priority_task = self.tw.tasks.get(priority=None) + self.assertEqual(no_priority_task['description'], 'no priority task') + + def test_filter_for_task_with_space_in_descripition(self): + task = Task(self.tw, description='test task') + task.save() + + filtered_task = self.tw.tasks.get(description='test task') + self.assertEqual(filtered_task['description'], 'test task') + + def test_filter_for_task_without_space_in_descripition(self): + task = Task(self.tw, description='test') + task.save() + + filtered_task = self.tw.tasks.get(description='test') + self.assertEqual(filtered_task['description'], 'test') + + def test_filter_for_task_with_space_in_project(self): + task = Task(self.tw, description='test', project='random project') + task.save() + + filtered_task = self.tw.tasks.get(project='random project') + self.assertEqual(filtered_task['project'], 'random project') + + def test_filter_for_task_without_space_in_project(self): + task = Task(self.tw, description='test', project='random') + task.save() + + filtered_task = self.tw.tasks.get(project='random') + self.assertEqual(filtered_task['project'], 'random') + + def test_filter_with_empty_uuid(self): + self.assertRaises(ValueError, lambda: self.tw.tasks.get(uuid='')) + + def test_filter_dummy_by_status(self): + t = Task(self.tw, description='test') + t.save() + + tasks = self.tw.tasks.filter(status=t['status']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_uuid(self): + t = Task(self.tw, description='test') + t.save() + + tasks = self.tw.tasks.filter(uuid=t['uuid']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_entry(self): + t = Task(self.tw, description='test') + t.save() + + tasks = self.tw.tasks.filter(entry=t['entry']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_description(self): + t = Task(self.tw, description='test') + t.save() + + tasks = self.tw.tasks.filter(description=t['description']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_start(self): + t = Task(self.tw, description='test') + t.save() + t.start() + + tasks = self.tw.tasks.filter(start=t['start']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_end(self): + t = Task(self.tw, description='test') + t.save() + t.done() + + tasks = self.tw.tasks.filter(end=t['end']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_due(self): + t = Task(self.tw, description='test', due=datetime.datetime.now()) + t.save() + + tasks = self.tw.tasks.filter(due=t['due']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_until(self): + t = Task(self.tw, description='test') + t.save() + + tasks = self.tw.tasks.filter(until=t['until']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_modified(self): + # Older TW version does not support bumping modified + # on save + if self.tw.version < six.text_type('2.2.0'): + # Python2.6 does not support SkipTest. As a workaround + # mark the test as passed by exiting. + if getattr(unittest, 'SkipTest', None) is not None: + raise unittest.SkipTest() + else: + return + + t = Task(self.tw, description='test') + t.save() + + tasks = self.tw.tasks.filter(modified=t['modified']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_scheduled(self): + t = Task(self.tw, description='test') + t.save() + + tasks = self.tw.tasks.filter(scheduled=t['scheduled']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_tags(self): + t = Task(self.tw, description='test', tags=['home']) + t.save() + + tasks = self.tw.tasks.filter(tags=t['tags']) + self.assertEqual(list(tasks), [t]) + + def test_filter_dummy_by_projects(self): + t = Task(self.tw, description='test', project='random') + t.save() + + tasks = self.tw.tasks.filter(project=t['project']) + self.assertEqual(list(tasks), [t]) + + def test_filter_by_priority(self): + t = Task(self.tw, description='test', priority='H') + t.save() + + tasks = self.tw.tasks.filter(priority=t['priority']) + self.assertEqual(list(tasks), [t]) + + +class TaskTest(TasklibTest): + + def test_create_unsaved_task(self): + # Make sure a new task is not saved unless explicitly called for + Task(self.tw, description='test task') + self.assertEqual(len(self.tw.tasks.all()), 0) + + # TODO: once python 2.6 compatibility is over, use context managers here + # and in all subsequent tests for assertRaises + + def test_delete_unsaved_task(self): + t = Task(self.tw, description='test task') + self.assertRaises(Task.NotSaved, t.delete) + + def test_complete_unsaved_task(self): + t = Task(self.tw, description='test task') + self.assertRaises(Task.NotSaved, t.done) + + def test_refresh_unsaved_task(self): + t = Task(self.tw, description='test task') + self.assertRaises(Task.NotSaved, t.refresh) + + def test_start_unsaved_task(self): + t = Task(self.tw, description='test task') + self.assertRaises(Task.NotSaved, t.start) + + def test_delete_deleted_task(self): + t = Task(self.tw, description='test task') + t.save() + t.delete() + + self.assertRaises(Task.DeletedTask, t.delete) + + def test_complete_completed_task(self): + t = Task(self.tw, description='test task') + t.save() + t.done() + + self.assertRaises(Task.CompletedTask, t.done) + + def test_start_completed_task(self): + t = Task(self.tw, description='test task') + t.save() + t.done() + + self.assertRaises(Task.CompletedTask, t.start) + + def test_add_completed_task(self): + t = Task( + self.tw, + description='test', + status='completed', + end=datetime.datetime.now(), + ) + t.save() + + def test_add_multiple_completed_tasks(self): + t1 = Task( + self.tw, + description='test1', + status='completed', + end=datetime.datetime.now(), + ) + t2 = Task( + self.tw, + description='test2', + status='completed', + end=datetime.datetime.now(), + ) + t1.save() + t2.save() + + def test_complete_deleted_task(self): + t = Task(self.tw, description='test task') + t.save() + t.delete() + + self.assertRaises(Task.DeletedTask, t.done) + + def test_starting_task(self): + t = Task(self.tw, description='test task') + now = t.datetime_normalizer(datetime.datetime.now()) + t.save() + t.start() + + self.assertTrue(now.replace(microsecond=0) <= t['start']) + self.assertEqual(t['status'], 'pending') + + def test_completing_task(self): + t = Task(self.tw, description='test task') + now = t.datetime_normalizer(datetime.datetime.now()) + t.save() + t.done() + + self.assertTrue(now.replace(microsecond=0) <= t['end']) + self.assertEqual(t['status'], 'completed') + + def test_deleting_task(self): + t = Task(self.tw, description='test task') + now = t.datetime_normalizer(datetime.datetime.now()) + t.save() + t.delete() + + self.assertTrue(now.replace(microsecond=0) <= t['end']) + self.assertEqual(t['status'], 'deleted') + + def test_started_task_active(self): + t = Task(self.tw, description='test task') + t.save() + t.start() + self.assertTrue(t.active) + + def test_unstarted_task_inactive(self): + t = Task(self.tw, description='test task') + self.assertFalse(t.active) + t.save() + self.assertFalse(t.active) + + def test_start_active_task(self): + t = Task(self.tw, description='test task') + t.save() + t.start() + self.assertRaises(Task.ActiveTask, t.start) + + def test_stop_completed_task(self): + t = Task(self.tw, description='test task') + t.save() + t.start() + t.done() + + self.assertRaises(Task.InactiveTask, t.stop) + + t = Task(self.tw, description='test task') + t.save() + t.done() + + self.assertRaises(Task.InactiveTask, t.stop) + + def test_stop_deleted_task(self): + t = Task(self.tw, description='test task') + t.save() + t.start() + t.delete() + t.stop() + + def test_stop_inactive_task(self): + t = Task(self.tw, description='test task') + t.save() + + self.assertRaises(Task.InactiveTask, t.stop) + + t = Task(self.tw, description='test task') + t.save() + t.start() + t.stop() + + self.assertRaises(Task.InactiveTask, t.stop) + + def test_stopping_task(self): + t = Task(self.tw, description='test task') + t.datetime_normalizer(datetime.datetime.now()) + t.save() + t.start() + t.stop() + + self.assertEqual(t['end'], None) + self.assertEqual(t['status'], 'pending') + self.assertFalse(t.active) + + def test_modify_simple_attribute_without_space(self): + t = Task(self.tw, description='test') + t.save() + + self.assertEqual(t['description'], 'test') + + t['description'] = 'test-modified' + t.save() + + self.assertEqual(t['description'], 'test-modified') + + def test_modify_simple_attribute_with_space(self): + # Space can pose problems with parsing + t = Task(self.tw, description='test task') + t.save() + + self.assertEqual(t['description'], 'test task') + + t['description'] = 'test task modified' + t.save() + + self.assertEqual(t['description'], 'test task modified') + + def test_empty_dependency_set_of_unsaved_task(self): + t = Task(self.tw, description='test task') + self.assertEqual(t['depends'], set()) + + def test_empty_dependency_set_of_saved_task(self): + t = Task(self.tw, description='test task') + t.save() + self.assertEqual(t['depends'], set()) + + def test_set_unsaved_task_as_dependency(self): + # Adds only one dependency to task with no dependencies + t = Task(self.tw, description='test task') + dependency = Task(self.tw, description='needs to be done first') + + # We only save the parent task, dependency task is unsaved + t.save() + t['depends'] = set([dependency]) + + self.assertRaises(Task.NotSaved, t.save) + + def test_set_simple_dependency_set(self): + # Adds only one dependency to task with no dependencies + t = Task(self.tw, description='test task') + dependency = Task(self.tw, description='needs to be done first') + + t.save() + dependency.save() + + t['depends'] = set([dependency]) + + self.assertEqual(t['depends'], set([dependency])) + + def test_set_simple_dependency_lazyuuidtaskset(self): + # Adds only one dependency as a LazyUUIDTaskSet to task with no dependencies + t = Task(self.tw, description='test task') + dependency = Task(self.tw, description='needs to be done first') + + t.save() + dependency.save() + + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']]) + + self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']])) + + def test_set_complex_dependency_set(self): + # Adds two dependencies to task with no dependencies + t = Task(self.tw, description='test task') + dependency1 = Task(self.tw, description='needs to be done first') + dependency2 = Task(self.tw, description='needs to be done second') + + t.save() + dependency1.save() + dependency2.save() + + t['depends'] = set([dependency1, dependency2]) + + self.assertEqual(t['depends'], set([dependency1, dependency2])) + + def test_set_complex_dependency_lazyuuidtaskset(self): + # Adds two dependencies as a LazyUUIDTaskSet to task with no dependencies + t = Task(self.tw, description='test task') + dependency1 = Task(self.tw, description='needs to be done first') + dependency2 = Task(self.tw, description='needs to be done second') + + t.save() + dependency1.save() + dependency2.save() + + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']]) + + self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']])) + + def test_remove_from_dependency_set(self): + # Removes dependency from task with two dependencies + t = Task(self.tw, description='test task') + dependency1 = Task(self.tw, description='needs to be done first') + dependency2 = Task(self.tw, description='needs to be done second') + + dependency1.save() + dependency2.save() + + t['depends'] = set([dependency1, dependency2]) + t.save() + + t['depends'].remove(dependency2) + t.save() + + self.assertEqual(t['depends'], set([dependency1])) + + def test_remove_from_dependency_lazyuuidtaskset(self): + # Removes dependency from task with two dependencies as LazyUUIDTaskSet + t = Task(self.tw, description='test task') + dependency1 = Task(self.tw, description='needs to be done first') + dependency2 = Task(self.tw, description='needs to be done second') + + dependency1.save() + dependency2.save() + + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']]) + t.save() + + t['depends'].remove(dependency2) + t.save() + + self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency1['uuid']])) + + def test_add_to_dependency_set(self): + # Adds dependency to task with one dependencies + t = Task(self.tw, description='test task') + dependency1 = Task(self.tw, description='needs to be done first') + dependency2 = Task(self.tw, description='needs to be done second') + + dependency1.save() + dependency2.save() + + t['depends'] = set([dependency1]) + t.save() + + t['depends'].add(dependency2) + t.save() + + self.assertEqual(t['depends'], set([dependency1, dependency2])) + + def test_add_to_dependency_lazyuuidtaskset(self): + # Adds dependency to task with one dependencies as LazyUUIDTaskSet + t = Task(self.tw, description='test task') + dependency1 = Task(self.tw, description='needs to be done first') + dependency2 = Task(self.tw, description='needs to be done second') + + dependency1.save() + dependency2.save() + + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency1['uuid']]) + t.save() + + t['depends'].add(dependency2) + t.save() + + self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']])) + + def test_add_lazyuuidtaskset_to_dependency_lazyuuidtaskset(self): + # Adds dependency as LazyUUIDTaskSet to task with one dependencies as LazyUUIDTaskSet + t = Task(self.tw, description='test task') + dependency1 = Task(self.tw, description='needs to be done first') + dependency2 = Task(self.tw, description='needs to be done second') + + dependency1.save() + dependency2.save() + + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency1['uuid']]) + t.save() + + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency2['uuid']]).union(t['depends']) + t.save() + + self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency1['uuid'], dependency2['uuid']])) + + def test_add_to_empty_dependency_set(self): + # Adds dependency to task with no dependencies + t = Task(self.tw, description='test task') + dependency = Task(self.tw, description='needs to be done first') + + dependency.save() + + t['depends'].add(dependency) + t.save() + + self.assertEqual(t['depends'], set([dependency])) + + def test_add_to_empty_dependency_lazyuuidtaskset(self): + # Adds dependency as LazyUUIDTaskSet to task with no dependencies + t = Task(self.tw, description='test task') + dependency = Task(self.tw, description='needs to be done first') + + dependency.save() + + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']]) + t.save() + + self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']])) + + def test_simple_dependency_set_save_repeatedly(self): + # Adds only one dependency to task with no dependencies + t = Task(self.tw, description='test task') + dependency = Task(self.tw, description='needs to be done first') + dependency.save() + + t['depends'] = set([dependency]) + t.save() + + # We taint the task, but keep depends intact + t['description'] = 'test task modified' + t.save() + + self.assertEqual(t['depends'], set([dependency])) + + # We taint the task, but assign the same set to the depends + t['depends'] = set([dependency]) + t['description'] = 'test task modified again' + t.save() + + self.assertEqual(t['depends'], set([dependency])) + + def test_simple_dependency_lazyuuidtaskset_save_repeatedly(self): + # Adds only one dependency as LazyUUIDTaskSet to task with no dependencies + t = Task(self.tw, description='test task') + dependency = Task(self.tw, description='needs to be done first') + dependency.save() + + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']]) + t.save() + + # We taint the task, but keep depends intact + t['description'] = 'test task modified' + t.save() + + self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']])) + + # We taint the task, but assign the same set to the depends + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']]) + t['description'] = 'test task modified again' + t.save() + + self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']])) + + def test_simple_dependency_lazyuuidtaskset_save_before_repeatedly(self): + # Adds only one dependency as LazyUUIDTaskSet to a saved task with no dependencies + t = Task(self.tw, description='test task') + dependency = Task(self.tw, description='needs to be done first') + dependency.save() + t.save() + + t['depends'] = LazyUUIDTaskSet(self.tw, [dependency['uuid']]) + t.save() + + self.assertEqual(t['depends'], LazyUUIDTaskSet(self.tw, [dependency['uuid']])) + + def test_compare_different_tasks(self): + # Negative: compare two different tasks + t1 = Task(self.tw, description='test task') + t2 = Task(self.tw, description='test task') + + t1.save() + t2.save() + + self.assertEqual(t1 == t2, False) + + def test_compare_same_task_object(self): + # Compare Task object wit itself + t = Task(self.tw, description='test task') + t.save() + + self.assertEqual(t == t, True) + + def test_compare_same_task(self): + # Compare the same task using two different objects + t1 = Task(self.tw, description='test task') + t1.save() + + t2 = self.tw.tasks.get(uuid=t1['uuid']) + self.assertEqual(t1 == t2, True) + + def test_compare_unsaved_tasks(self): + # t1 and t2 are unsaved tasks, considered to be unequal + # despite the content of data + t1 = Task(self.tw, description='test task') + t2 = Task(self.tw, description='test task') + + self.assertEqual(t1 == t2, False) + + def test_hash_unsaved_tasks(self): + # Considered equal, it's the same object + t1 = Task(self.tw, description='test task') + t2 = t1 + self.assertEqual(hash(t1) == hash(t2), True) + + def test_hash_same_task(self): + # Compare the hash of the task using two different objects + t1 = Task(self.tw, description='test task') + t1.save() + + t2 = self.tw.tasks.get(uuid=t1['uuid']) + self.assertEqual(t1.__hash__(), t2.__hash__()) + + def test_hash_unequal_unsaved_tasks(self): + # Compare the hash of the task using two different objects + t1 = Task(self.tw, description='test task 1') + t2 = Task(self.tw, description='test task 2') + + self.assertNotEqual(t1.__hash__(), t2.__hash__()) + + def test_hash_unequal_saved_tasks(self): + # Compare the hash of the task using two different objects + t1 = Task(self.tw, description='test task 1') + t2 = Task(self.tw, description='test task 2') + + t1.save() + t2.save() + + self.assertNotEqual(t1.__hash__(), t2.__hash__()) + + def test_adding_task_with_priority(self): + t = Task(self.tw, description='test task', priority='M') + t.save() + + def test_removing_priority_with_none(self): + t = Task(self.tw, description='test task', priority='L') + t.save() + + # Remove the priority mark + t['priority'] = None + t.save() + + # Assert that priority is not there after saving + self.assertEqual(t['priority'], None) + + def test_adding_task_with_due_time(self): + t = Task(self.tw, description='test task', due=datetime.datetime.now()) + t.save() + + def test_removing_due_time_with_none(self): + t = Task(self.tw, description='test task', due=datetime.datetime.now()) + t.save() + + # Remove the due timestamp + t['due'] = None + t.save() + + # Assert that due timestamp is no longer there + self.assertEqual(t['due'], None) + + def test_modified_fields_new_task(self): + t = Task(self.tw) + + # This should be empty with new task + self.assertEqual(set(t._modified_fields), set()) + + # Modify the task + t['description'] = 'test task' + self.assertEqual(set(t._modified_fields), set(['description'])) + + t['due'] = datetime.datetime(2014, 2, 14, 14, 14, 14) # <3 + self.assertEqual(set(t._modified_fields), set(['description', 'due'])) + + t['project'] = 'test project' + self.assertEqual( + set(t._modified_fields), + set(['description', 'due', 'project']), + ) + + # List of modified fields should clear out when saved + t.save() + self.assertEqual(set(t._modified_fields), set()) + + # Reassigning the fields with the same values now should not produce + # modified fields + t['description'] = 'test task' + t['due'] = datetime.datetime(2014, 2, 14, 14, 14, 14) # <3 + t['project'] = 'test project' + self.assertEqual(set(t._modified_fields), set()) + + def test_modified_fields_loaded_task(self): + t = Task(self.tw) + + # Modify the task + t['description'] = 'test task' + t['due'] = datetime.datetime(2014, 2, 14, 14, 14, 14) # <3 + t['project'] = 'test project' + + dependency = Task(self.tw, description='dependency') + dependency.save() + t['depends'] = set([dependency]) + + # List of modified fields should clear out when saved + t.save() + self.assertEqual(set(t._modified_fields), set()) + + # Get the task by using a filter by UUID + self.tw.tasks.get(uuid=t['uuid']) + + # Reassigning the fields with the same values now should not produce + # modified fields + t['description'] = 'test task' + t['due'] = datetime.datetime(2014, 2, 14, 14, 14, 14) # <3 + t['project'] = 'test project' + t['depends'] = set([dependency]) + self.assertEqual(set(t._modified_fields), set()) + + def test_modified_fields_not_affected_by_reading(self): + t = Task(self.tw) + + for field in TASK_STANDARD_ATTRS: + t[field] + + self.assertEqual(set(t._modified_fields), set()) + + def test_setting_read_only_attrs_through_init(self): + # Test that we are unable to set readonly attrs through __init__ + for readonly_key in Task.read_only_fields: + kwargs = {'description': 'test task', readonly_key: 'value'} + self.assertRaises( + RuntimeError, + lambda: Task(self.tw, **kwargs), + ) + + def test_setting_read_only_attrs_through_setitem(self): + # Test that we are unable to set readonly attrs through __init__ + for readonly_key in Task.read_only_fields: + t = Task(self.tw, description='test task') + self.assertRaises( + RuntimeError, + lambda: t.__setitem__(readonly_key, 'value'), + ) + + def test_saving_unmodified_task(self): + t = Task(self.tw, description='test task') + t.save() + t.save() + + def test_adding_tag_by_appending(self): + t = Task(self.tw, description='test task', tags=['test1']) + t.save() + t['tags'].add('test2') + t.save() + self.assertEqual(t['tags'], set(['test1', 'test2'])) + + def test_adding_tag_twice(self): + t = Task(self.tw, description='test task', tags=['test1']) + t.save() + t['tags'].add('test2') + t['tags'].add('test2') + t.save() + self.assertEqual(t['tags'], set(['test1', 'test2'])) + + def test_adding_tag_by_appending_empty(self): + t = Task(self.tw, description='test task') + t.save() + t['tags'].add('test') + t.save() + self.assertEqual(t['tags'], set(['test'])) + + def test_serializers_returning_empty_string_for_none(self): + # Test that any serializer returns '' when passed None + t = Task(self.tw) + serializers = [ + getattr(t, serializer_name) + for serializer_name in filter( + lambda x: x.startswith('serialize_'), + dir(t), + ) + ] + for serializer in serializers: + self.assertEqual(serializer(None), '') + + def test_deserializer_returning_empty_value_for_empty_string(self): + # Test that any deserializer returns empty value when passed '' + t = Task(self.tw) + deserializers = [ + getattr(t, deserializer_name) + for deserializer_name in filter( + lambda x: x.startswith('deserialize_'), + dir(t), + ) + ] + for deserializer in deserializers: + self.assertTrue(deserializer('') in (None, [], set())) + + def test_normalizers_handling_none(self): + # Test that any normalizer can handle None as a valid value + t = Task(self.tw) + + for key in TASK_STANDARD_ATTRS: + t._normalize(key, None) + + def test_recurrent_task_generation(self): + today = datetime.date.today() + t = Task( + self.tw, + description='brush teeth', + due=today, + recur='daily', + ) + t.save() + self.assertEqual(len(self.tw.tasks.pending()), 2) + + def test_spawned_task_parent(self): + today = datetime.date.today() + t = Task( + self.tw, + description='brush teeth', + due=today, + recur='daily', + ) + t.save() + + spawned = self.tw.tasks.pending().get(due=today) + assert spawned['parent'] == t + + def test_modify_number_of_tasks_at_once(self): + for i in range(1, 100): + Task(self.tw, description='test task %d' % i, tags=['test']).save() + + self.tw.execute_command(['+test', 'mod', 'unified', 'description']) + + def test_return_all_from_executed_command(self): + Task(self.tw, description='test task', tags=['test']).save() + out, err, rc = self.tw.execute_command(['count'], return_all=True) + self.assertEqual(rc, 0) + + def test_return_all_from_failed_executed_command(self): + Task(self.tw, description='test task', tags=['test']).save() + out, err, rc = self.tw.execute_command( + ['countinvalid'], + return_all=True, + allow_failure=False, + ) + self.assertNotEqual(rc, 0) + + +class TaskFromHookTest(TasklibTest): + + input_add_data = six.StringIO( + '{"description":"Buy some milk",' + '"entry":"20141118T050231Z",' + '"status":"pending",' + '"start":"20141119T152233Z",' + '"uuid":"a360fc44-315c-4366-b70c-ea7e7520b749"}', + ) + + input_add_data_recurring = six.StringIO( + '{"description":"Mow the lawn",' + '"entry":"20160210T224304Z",' + '"parent":"62da6227-519c-42c2-915d-dccada926ad7",' + '"recur":"weekly",' + '"status":"pending",' + '"uuid":"81305335-0237-49ff-8e87-b3cdc2369cec"}', + ) + + input_modify_data = six.StringIO( + '\n'.join([ + input_add_data.getvalue(), + ( + '{"description":"Buy some milk finally",' + '"entry":"20141118T050231Z",' + '"status":"completed",' + '"uuid":"a360fc44-315c-4366-b70c-ea7e7520b749"}' + ), + ]), + ) + + exported_raw_data = ( + '{"project":"Home",' + '"due":"20150101T232323Z",' + '"description":"test task"}' + ) + + def test_setting_up_from_add_hook_input(self): + t = Task.from_input(input_file=self.input_add_data, backend=self.tw) + self.assertEqual(t['description'], 'Buy some milk') + self.assertEqual(t.pending, True) + + def test_setting_up_from_add_hook_input_recurring(self): + t = Task.from_input( + input_file=self.input_add_data_recurring, + backend=self.tw, + ) + self.assertEqual(t['description'], 'Mow the lawn') + self.assertEqual(t.pending, True) + + def test_setting_up_from_modified_hook_input(self): + t = Task.from_input( + input_file=self.input_modify_data, + modify=True, + backend=self.tw, + ) + self.assertEqual(t['description'], 'Buy some milk finally') + self.assertEqual(t.pending, False) + self.assertEqual(t.completed, True) + + self.assertEqual(t._original_data['status'], 'pending') + self.assertEqual(t._original_data['description'], 'Buy some milk') + self.assertEqual( + set(t._modified_fields), + set(['status', 'description', 'start']), + ) + + def test_export_data(self): + t = Task( + self.tw, + description='test task', + project='Home', + due=pytz.utc.localize( + datetime.datetime(2015, 1, 1, 23, 23, 23)), + ) + + # Check that the output is a permutation of: + # {"project":"Home","description":"test task","due":"20150101232323Z"} + allowed_segments = self.exported_raw_data[1:-1].split(',') + allowed_output = [ + '{' + ','.join(segments) + '}' + for segments in itertools.permutations(allowed_segments) + ] + + self.assertTrue( + any(t.export_data() == expected + for expected in allowed_output), + ) + + +class TimezoneAwareDatetimeTest(TasklibTest): + + def setUp(self): + super(TimezoneAwareDatetimeTest, self).setUp() + self.zone = local_zone + self.localdate_naive = datetime.datetime(2015, 2, 2) + self.localtime_naive = datetime.datetime(2015, 2, 2, 0, 0, 0) + self.localtime_aware = self.zone.localize(self.localtime_naive) + self.utctime_aware = self.localtime_aware.astimezone(pytz.utc) + + def test_timezone_naive_datetime_setitem(self): + t = Task(self.tw, description='test task') + t['due'] = self.localtime_naive + self.assertEqual(t['due'], self.localtime_aware) + + def test_timezone_naive_datetime_using_init(self): + t = Task(self.tw, description='test task', due=self.localtime_naive) + self.assertEqual(t['due'], self.localtime_aware) + + def test_filter_by_naive_datetime(self): + t = Task(self.tw, description='task1', due=self.localtime_naive) + t.save() + matching_tasks = self.tw.tasks.filter(due=self.localtime_naive) + self.assertEqual(len(matching_tasks), 1) + + def test_serialize_naive_datetime(self): + t = Task(self.tw, description='task1', due=self.localtime_naive) + self.assertEqual( + json.loads(t.export_data())['due'], + self.utctime_aware.strftime(DATE_FORMAT), + ) + + def test_timezone_naive_date_setitem(self): + t = Task(self.tw, description='test task') + t['due'] = self.localdate_naive + self.assertEqual(t['due'], self.localtime_aware) + + def test_timezone_naive_date_using_init(self): + t = Task(self.tw, description='test task', due=self.localdate_naive) + self.assertEqual(t['due'], self.localtime_aware) + + def test_filter_by_naive_date(self): + t = Task(self.tw, description='task1', due=self.localdate_naive) + t.save() + matching_tasks = self.tw.tasks.filter(due=self.localdate_naive) + self.assertEqual(len(matching_tasks), 1) + + def test_serialize_naive_date(self): + t = Task(self.tw, description='task1', due=self.localdate_naive) + self.assertEqual( + json.loads(t.export_data())['due'], + self.utctime_aware.strftime(DATE_FORMAT), + ) + + def test_timezone_aware_datetime_setitem(self): + t = Task(self.tw, description='test task') + t['due'] = self.localtime_aware + self.assertEqual(t['due'], self.localtime_aware) + + def test_timezone_aware_datetime_using_init(self): + t = Task(self.tw, description='test task', due=self.localtime_aware) + self.assertEqual(t['due'], self.localtime_aware) + + def test_filter_by_aware_datetime(self): + t = Task(self.tw, description='task1', due=self.localtime_aware) + t.save() + matching_tasks = self.tw.tasks.filter(due=self.localtime_aware) + self.assertEqual(len(matching_tasks), 1) + + def test_serialize_aware_datetime(self): + t = Task(self.tw, description='task1', due=self.localtime_aware) + self.assertEqual( + json.loads(t.export_data())['due'], + self.utctime_aware.strftime(DATE_FORMAT), + ) + + +class DatetimeStringTest(TasklibTest): + + def test_simple_now_conversion(self): + if self.tw.version < six.text_type('2.4.0'): + # Python2.6 does not support SkipTest. As a workaround + # mark the test as passed by exiting. + if getattr(unittest, 'SkipTest', None) is not None: + raise unittest.SkipTest() + else: + return + + t = Task(self.tw, description='test task', due='now') + now = local_zone.localize(datetime.datetime.now()) + + # Assert that both times are not more than 5 seconds apart + if sys.version_info < (2, 7): + self.assertTrue(total_seconds_2_6(now - t['due']) < 5) + self.assertTrue(total_seconds_2_6(t['due'] - now) < 5) + else: + self.assertTrue((now - t['due']).total_seconds() < 5) + self.assertTrue((t['due'] - now).total_seconds() < 5) + + def test_simple_eoy_conversion(self): + if self.tw.version < six.text_type('2.4.0'): + # Python2.6 does not support SkipTest. As a workaround + # mark the test as passed by exiting. + if getattr(unittest, 'SkipTest', None) is not None: + raise unittest.SkipTest() + else: + return + + t = Task(self.tw, description='test task', due='eoy') + now = local_zone.localize(datetime.datetime.now()) + eoy = local_zone.localize(datetime.datetime( + year=now.year, + month=12, + day=31, + hour=23, + minute=59, + second=59, + )) + self.assertEqual(eoy, t['due']) + + def test_complex_eoy_conversion(self): + if self.tw.version < six.text_type('2.4.0'): + # Python2.6 does not support SkipTest. As a workaround + # mark the test as passed by exiting. + if getattr(unittest, 'SkipTest', None) is not None: + raise unittest.SkipTest() + else: + return + + t = Task(self.tw, description='test task', due='eoy - 4 months') + now = local_zone.localize(datetime.datetime.now()) + due_date = local_zone.localize( + datetime.datetime( + year=now.year, + month=12, + day=31, + hour=23, + minute=59, + second=59, + ) + ) - datetime.timedelta(0, 4 * 30 * 86400) + self.assertEqual(due_date, t['due']) + + def test_filtering_with_string_datetime(self): + if self.tw.version < six.text_type('2.4.0'): + # Python2.6 does not support SkipTest. As a workaround + # mark the test as passed by exiting. + if getattr(unittest, 'SkipTest', None) is not None: + raise unittest.SkipTest() + else: + return + + t = Task( + self.tw, + description='test task', + due=datetime.datetime.now() - datetime.timedelta(0, 2), + ) + t.save() + self.assertEqual(len(self.tw.tasks.filter(due__before='now')), 1) + + +class AnnotationTest(TasklibTest): + + def setUp(self): + super(AnnotationTest, self).setUp() + Task(self.tw, description='test task').save() + + def test_adding_annotation(self): + task = self.tw.tasks.get() + task.add_annotation('test annotation') + self.assertEqual(len(task['annotations']), 1) + ann = task['annotations'][0] + self.assertEqual(ann['description'], 'test annotation') + + def test_removing_annotation(self): + task = self.tw.tasks.get() + task.add_annotation('test annotation') + ann = task['annotations'][0] + ann.remove() + self.assertEqual(len(task['annotations']), 0) + + def test_removing_annotation_by_description(self): + task = self.tw.tasks.get() + task.add_annotation('test annotation') + task.remove_annotation('test annotation') + self.assertEqual(len(task['annotations']), 0) + + def test_removing_annotation_by_obj(self): + task = self.tw.tasks.get() + task.add_annotation('test annotation') + ann = task['annotations'][0] + task.remove_annotation(ann) + self.assertEqual(len(task['annotations']), 0) + + def test_annotation_after_modification(self): + task = self.tw.tasks.get() + task['project'] = 'test' + task.add_annotation('I should really do this task') + self.assertEqual(task['project'], 'test') + task.save() + self.assertEqual(task['project'], 'test') + + def test_serialize_annotations(self): + # Test that serializing annotations is possible + t = Task(self.tw, description='test') + t.save() + + t.add_annotation('annotation1') + t.add_annotation('annotation2') + + data = t._serialize('annotations', t._data['annotations']) + + self.assertEqual(len(data), 2) + self.assertEqual(type(data[0]), dict) + self.assertEqual(type(data[1]), dict) + + self.assertEqual(data[0]['description'], 'annotation1') + self.assertEqual(data[1]['description'], 'annotation2') + + +class UnicodeTest(TasklibTest): + + def test_unicode_task(self): + Task(self.tw, description=six.u('†åßk')).save() + self.tw.tasks.get() + + def test_filter_by_unicode_task(self): + Task(self.tw, description=six.u('†åßk')).save() + tasks = self.tw.tasks.filter(description=six.u('†åßk')) + self.assertEqual(len(tasks), 1) + + def test_non_unicode_task(self): + Task(self.tw, description='test task').save() + self.tw.tasks.get() + + +class ReadOnlyDictViewTest(unittest.TestCase): + + def setUp(self): + self.sample = dict(sample_list=[1, 2, 3], sample_dict={'key': 'value'}) + self.original_sample = copy.deepcopy(self.sample) + self.view = ReadOnlyDictView(self.sample) + + def test_readonlydictview_getitem(self): + sample_list = self.view['sample_list'] + self.assertEqual(sample_list, self.sample['sample_list']) + + # Assert that modification changed only copied value + sample_list.append(4) + self.assertNotEqual(sample_list, self.sample['sample_list']) + + # Assert that viewed dict is not changed + self.assertEqual(self.sample, self.original_sample) + + def test_readonlydictview_contains(self): + self.assertEqual('sample_list' in self.view, + 'sample_list' in self.sample) + self.assertEqual('sample_dict' in self.view, + 'sample_dict' in self.sample) + self.assertEqual('key' in self.view, 'key' in self.sample) + + # Assert that viewed dict is not changed + self.assertEqual(self.sample, self.original_sample) + + def test_readonlydictview_iter(self): + self.assertEqual( + list(key for key in self.view), + list(key for key in self.sample), + ) + + # Assert the view is correct after modification + self.sample['new'] = 'value' + self.assertEqual( + list(key for key in self.view), + list(key for key in self.sample), + ) + + def test_readonlydictview_len(self): + self.assertEqual(len(self.view), len(self.sample)) + + # Assert the view is correct after modification + self.sample['new'] = 'value' + self.assertEqual(len(self.view), len(self.sample)) + + def test_readonlydictview_get(self): + sample_list = self.view.get('sample_list') + self.assertEqual(sample_list, self.sample.get('sample_list')) + + # Assert that modification changed only copied value + sample_list.append(4) + self.assertNotEqual(sample_list, self.sample.get('sample_list')) + + # Assert that viewed dict is not changed + self.assertEqual(self.sample, self.original_sample) + + def test_readonlydict_items(self): + view_items = self.view.items() + sample_items = list(self.sample.items()) + self.assertEqual(view_items, sample_items) + + view_items.append('newkey') + self.assertNotEqual(view_items, sample_items) + self.assertEqual(self.sample, self.original_sample) + + def test_readonlydict_values(self): + view_values = self.view.values() + sample_values = list(self.sample.values()) + self.assertEqual(view_values, sample_values) + + view_list_item = list(filter(lambda x: type(x) is list, + view_values))[0] + view_list_item.append(4) + self.assertNotEqual(view_values, sample_values) + self.assertEqual(self.sample, self.original_sample) + + +class LazyUUIDTaskTest(TasklibTest): + + def setUp(self): + super(LazyUUIDTaskTest, self).setUp() + + self.stored = Task(self.tw, description='this is test task') + self.stored.save() + + self.lazy = LazyUUIDTask(self.tw, self.stored['uuid']) + + def test_uuid_non_conversion(self): + assert self.stored['uuid'] == self.lazy['uuid'] + assert type(self.lazy) is LazyUUIDTask + + def test_lazy_explicit_conversion(self): + assert type(self.lazy) is LazyUUIDTask + self.lazy.replace() + assert type(self.lazy) is Task + + def test_conversion_key(self): + assert self.stored['description'] == self.lazy['description'] + assert type(self.lazy) is Task + + def test_conversion_attribute(self): + assert type(self.lazy) is LazyUUIDTask + assert self.lazy.completed is False + assert type(self.lazy) is Task + + def test_normal_to_lazy_equality(self): + assert self.stored == self.lazy + assert not self.stored != self.lazy + assert type(self.lazy) is LazyUUIDTask + + def test_lazy_to_lazy_equality(self): + lazy1 = LazyUUIDTask(self.tw, self.stored['uuid']) + lazy2 = LazyUUIDTask(self.tw, self.stored['uuid']) + + assert lazy1 == lazy2 + assert not lazy1 != lazy2 + assert type(lazy1) is LazyUUIDTask + assert type(lazy2) is LazyUUIDTask + + def test_normal_to_lazy_inequality(self): + # Create a different UUID by changing the last letter + wrong_uuid = self.stored['uuid'] + wrong_uuid = wrong_uuid[:-1] + ('a' if wrong_uuid[-1] != 'a' else 'b') + + wrong_lazy = LazyUUIDTask(self.tw, wrong_uuid) + + assert not self.stored == wrong_lazy + assert self.stored != wrong_lazy + assert type(wrong_lazy) is LazyUUIDTask + + def test_lazy_to_lazy_inequality(self): + # Create a different UUID by changing the last letter + wrong_uuid = self.stored['uuid'] + wrong_uuid = wrong_uuid[:-1] + ('a' if wrong_uuid[-1] != 'a' else 'b') + + lazy1 = LazyUUIDTask(self.tw, self.stored['uuid']) + lazy2 = LazyUUIDTask(self.tw, wrong_uuid) + + assert not lazy1 == lazy2 + assert lazy1 != lazy2 + assert type(lazy1) is LazyUUIDTask + assert type(lazy2) is LazyUUIDTask + + def test_lazy_in_queryset(self): + tasks = self.tw.tasks.filter(uuid=self.stored['uuid']) + + assert self.lazy in tasks + assert type(self.lazy) is LazyUUIDTask + + def test_lazy_saved(self): + assert self.lazy.saved is True + + def test_lazy_modified(self): + assert self.lazy.modified is False + + def test_lazy_modified_fields(self): + assert self.lazy._modified_fields == set() + + +class LazyUUIDTaskSetTest(TasklibTest): + + def setUp(self): + super(LazyUUIDTaskSetTest, self).setUp() + + self.task1 = Task(self.tw, description='task 1') + self.task2 = Task(self.tw, description='task 2') + self.task3 = Task(self.tw, description='task 3') + + self.task1.save() + self.task2.save() + self.task3.save() + + self.uuids = ( + self.task1['uuid'], + self.task2['uuid'], + self.task3['uuid'], + ) + + self.lazy = LazyUUIDTaskSet(self.tw, self.uuids) + + def test_length(self): + assert len(self.lazy) == 3 + assert type(self.lazy) is LazyUUIDTaskSet + + def test_contains(self): + assert self.task1 in self.lazy + assert self.task2 in self.lazy + assert self.task3 in self.lazy + assert type(self.lazy) is LazyUUIDTaskSet + + def test_eq_lazy(self): + new_lazy = LazyUUIDTaskSet(self.tw, self.uuids) + assert self.lazy == new_lazy + assert not self.lazy != new_lazy + assert type(self.lazy) is LazyUUIDTaskSet + + def test_eq_real(self): + assert self.lazy == self.tw.tasks.all() + assert self.tw.tasks.all() == self.lazy + assert not self.lazy != self.tw.tasks.all() + + assert type(self.lazy) is LazyUUIDTaskSet + + def test_union(self): + taskset = set([self.task1]) + lazyset = LazyUUIDTaskSet( + self.tw, + (self.task2['uuid'], self.task3['uuid']), + ) + + assert taskset | lazyset == self.lazy + assert lazyset | taskset == self.lazy + assert taskset.union(lazyset) == self.lazy + assert lazyset.union(taskset) == self.lazy + + lazyset |= taskset + assert lazyset == self.lazy + + def test_difference(self): + taskset = set([self.task1, self.task2]) + lazyset = LazyUUIDTaskSet( + self.tw, + (self.task2['uuid'], self.task3['uuid']), + ) + + assert taskset - lazyset == set([self.task1]) + assert lazyset - taskset == set([self.task3]) + assert taskset.difference(lazyset) == set([self.task1]) + assert lazyset.difference(taskset) == set([self.task3]) + + lazyset -= taskset + assert lazyset == set([self.task3]) + + def test_symmetric_difference(self): + taskset = set([self.task1, self.task2]) + lazyset = LazyUUIDTaskSet( + self.tw, + (self.task2['uuid'], self.task3['uuid']), + ) + + assert taskset ^ lazyset == set([self.task1, self.task3]) + assert lazyset ^ taskset == set([self.task1, self.task3]) + self.assertEqual( + taskset.symmetric_difference(lazyset), + set([self.task1, self.task3]), + ) + self.assertEqual( + lazyset.symmetric_difference(taskset), + set([self.task1, self.task3]), + ) + + lazyset ^= taskset + assert lazyset == set([self.task1, self.task3]) + + def test_intersection(self): + taskset = set([self.task1, self.task2]) + lazyset = LazyUUIDTaskSet( + self.tw, + (self.task2['uuid'], self.task3['uuid']), + ) + + assert taskset & lazyset == set([self.task2]) + assert lazyset & taskset == set([self.task2]) + assert taskset.intersection(lazyset) == set([self.task2]) + assert lazyset.intersection(taskset) == set([self.task2]) + + lazyset &= taskset + assert lazyset == set([self.task2]) + + +class TaskWarriorBackendTest(TasklibTest): + + def test_config(self): + assert self.tw.config['nag'] == 'You have more urgent tasks.' + assert self.tw.config['default.command'] == 'next' + assert self.tw.config['dependency.indicator'] == 'D'