- TASK_VERSION=v2.3.0
- TASK_VERSION=v2.4.0
- TASK_VERSION=v2.4.1
- - TASK_VERSION=2.4.2
+ - TASK_VERSION=v2.4.2
+ - TASK_VERSION=v2.4.3
+ - TASK_VERSION=v2.4.4
+ - TASK_VERSION=2.4.5
python:
- "2.6"
- "2.7"
# built documents.
#
# The short X.Y version.
-version = '0.9.0'
+version = '0.10.0'
# The full version, including alpha/beta/rc tags.
-release = '0.9.0'
+release = '0.10.0'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Requirements
------------
-* taskwarrior_ v2.1.x or above.
+* taskwarrior_ v2.1.x or above, although the newest minor release is recommended.
Installation
------------
>>> tw = TaskWarrior(data_location='~/.task', create=True)
+The ``TaskWarrior`` instance will also use your .taskrc configuration (so that
+it recognizes the same UDAs as your task binary, uses the same configuration,
+etc.). To override the location of the .taskrc, use
+``taskrc_location='~/some/different/path'``.
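
The lookup this patch performs on ``taskrc_location`` can be sketched roughly as follows. The helper name ``resolve_taskrc`` is hypothetical and not part of tasklib; it only mirrors the logic in ``TaskWarrior.__init__``, which expands the path and falls back to ``/`` when the file does not exist.

```python
import os
import tempfile


def resolve_taskrc(taskrc_location='~/.taskrc'):
    """Expand the path; fall back to '/' when no such file exists."""
    path = os.path.expanduser(taskrc_location)
    # Passing '/' makes the task binary use its built-in defaults instead
    # of creating a dummy .taskrc file.
    return path if os.path.exists(path) else '/'


with tempfile.NamedTemporaryFile(suffix='.taskrc') as rc:
    existing = resolve_taskrc(rc.name)               # file exists, path is kept
    missing = resolve_taskrc(rc.name + '.missing')   # no such file, falls back
```

Under these assumptions, an existing file is used as given, while a missing one silently results in default settings rather than an error.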
+
Creating Tasks
--------------
Attributes should be set using the correct Python representation, which will be
serialized into the correct format when the task is saved.
+Task properties
+---------------
+
+Tasklib defines several properties on the ``Task`` object for convenience::
+
+ >>> t.save()
+ >>> t.saved
+ True
+ >>> t.pending
+ True
+ >>> t.active
+ False
+ >>> t.start()
+ >>> t.active
+ True
+ >>> t.done()
+ >>> t.completed
+ True
+ >>> t.pending
+ False
+ >>> t.delete()
+ >>> t.deleted
+ True
+
Operations on Tasks
-------------------
>>> task['tags']
['someday']
+Tasks can also be started and stopped. Use ``start()`` and ``stop()``
+respectively::
+
+ >>> task.start()
+ >>> task['start']
+ datetime.datetime(2015, 7, 16, 18, 48, 28, tzinfo=<DstTzInfo 'Europe/Prague' CEST+2:00:00 DST>)
+ >>> task.stop()
+ >>> task['start']
+ >>> task.done()
+ >>> task['end']
+ datetime.datetime(2015, 7, 16, 18, 49, 2, tzinfo=<DstTzInfo 'Europe/Prague' CEST+2:00:00 DST>)
+
Retrieving Tasks
----------------
>>> t['due'] == now.astimezone(pytz.utc)
True
+*Note*: The following behaviour is available only for TaskWarrior >= 2.4.0.
+
+There is a third approach to setting datetime values, which leverages
+the 'task calc' command. You can simply set any datetime attribute to
+any string that contains an acceptable TaskWarrior-formatted time expression::
+
+ $ task calc now + 1d
+ 2015-07-17T21:17:54
+
+This syntax can be leveraged in the Python interpreter as follows::
+
+ >>> t['due'] = "now + 1d"
+ >>> t['due']
+ datetime.datetime(2015, 7, 17, 21, 19, 31, tzinfo=<DstTzInfo 'Europe/Berlin' CEST+2:00:00 DST>)
+
+As shown, a string containing a TaskWarrior-formatted time expression
+is automatically converted to a native datetime in the local time zone.
+
+For the list of acceptable formats and keywords, please consult:
+
+* http://taskwarrior.org/docs/dates.html
+* http://taskwarrior.org/docs/named_dates.html
+
+However, as each such assignment involves a call to 'task calc' for conversion,
+it might cause performance issues when strings are assigned to datetime
+attributes repeatedly, in an automated manner.
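
A minimal sketch of the conversion step, assuming the output of 'task calc' has the ``%Y-%m-%dT%H:%M:%S`` shape shown above. The helper ``parse_calc_output`` is hypothetical, and a fixed-offset zone stands in for the local time zone (tasklib itself localizes with pytz/tzlocal), so the result is only illustrative.

```python
import datetime

# Format of the timestamp printed by 'task calc' (as in the example above).
DATE_FORMAT_CALC = '%Y-%m-%dT%H:%M:%S'


def parse_calc_output(line, tz):
    """Parse one line of 'task calc' output into an aware datetime."""
    naive = datetime.datetime.strptime(line.strip(), DATE_FORMAT_CALC)
    # Assumption: a fixed-offset tzinfo can simply be attached with replace().
    return naive.replace(tzinfo=tz)


# Stand-in for the local zone: a fixed +02:00 offset labelled CEST.
cest = datetime.timezone(datetime.timedelta(hours=2), 'CEST')
due = parse_calc_output('2015-07-17T21:17:54\n', cest)
```

Since every string assignment spawns a 'task calc' process, one way to limit the cost in loops is to resolve the expression once and reuse the resulting datetime for the remaining tasks.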
Working with annotations
------------------------
>>> tw.execute_command(['3', 'done'], config_override={'gc': 'off'}) # Will mark 3 as completed and it will retain its ID
+
+Additionally, you can use the ``return_all=True`` flag, which returns a
+``(stdout, stderr, return_code)`` triplet, and ``allow_failure=False``, which will
+prevent tasklib from raising an exception if the task binary returns a non-zero
+return code::
+
+ >>> tw.execute_command(['invalidcommand'], allow_failure=False, return_all=True)
+ ([u''],
+ [u'Using alternate .taskrc file /home/tbabej/.taskrc',
+ u"[task next rc:/home/tbabej/.taskrc rc.recurrence.confirmation=no rc.json.array=off rc.confirmation=no rc.bulk=0 rc.dependency.confirmation=no description ~ 'invalidcommand']",
+ u'Configuration override rc.recurrence.confirmation:no',
+ u'Configuration override rc.json.array:off',
+ u'Configuration override rc.confirmation:no',
+ u'Configuration override rc.bulk:0',
+ u'Configuration override rc.dependency.confirmation:no',
+ u'No matches.',
+ u'There are local changes. Sync required.'],
+ 1)
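
The shape of the returned triplet can be reproduced without a task binary. The sketch below is not tasklib code: ``run_command`` is a hypothetical helper, and a small Python child process stands in for ``task``. It splits both streams into lines the same way the patched ``execute_command()`` does.

```python
import subprocess
import sys


def run_command(args, return_all=False):
    """Run a command; return stdout lines, or the whole triplet."""
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
    out_lines = stdout.rstrip().split('\n')
    if not return_all:
        return out_lines
    return out_lines, stderr.rstrip().split('\n'), p.returncode


# A stand-in child process that writes to both streams and exits with 1.
child = [sys.executable, '-c',
         "import sys; print('out'); sys.stderr.write('err\\n'); sys.exit(1)"]
out, err, rc = run_command(child, return_all=True)
```

Note that an empty stream still yields a one-element list containing an empty string, which matches the ``[u'']`` seen in the output above.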
+
+
Setting custom configuration values
-----------------------------------
-By default, TaskWarrior does not use any of configuration values stored in
-your .taskrc. To see what configuration values are passed to each executed
+By default, TaskWarrior uses configuration values stored in your .taskrc.
+To see what configuration value overrides are passed to each executed
task command, have a peek into ``config`` attribute of ``TaskWarrior`` object::
>>> tw.config
{'confirmation': 'no', 'data.location': '/home/tbabej/.task'}
-To pass your own configuration, you just need to update this dictionary::
+To pass your own configuration overrides, you just need to update this dictionary::
>>> tw.config.update({'hooks': 'off'}) # tasklib will not trigger hooks
Working with UDAs
-----------------
-Since TaskWarrior does not read your .taskrc, you need to define any UDAs
-in the TaskWarrior's config dictionary, as described above.
+Since TaskWarrior reads your .taskrc, you do not need to define any UDAs
+in the TaskWarrior's config dictionary, as described above. Suppose we have
+an estimate UDA in the .taskrc::
-Let us demonstrate this on the same example as in the TaskWarrior's docs::
+ uda.estimate.type = numeric
- >>> tw = TaskWarrior()
- >>> tw.config.update({'uda.estimate.type': 'numeric'})
-
-Now we can filter and create tasks using the estimate UDA::
+We can simply filter and create tasks using the estimate UDA out of the box::
+ >>> tw = TaskWarrior()
>>> task = Task(tw, description="Long task", estimate=1000)
>>> task.save()
>>> task['id']
$ task 1 export
{"id":1,"description":"Long task","estimate":1000, ...}
-As long as ``TaskWarrior``'s config is updated, we can approach UDAs as built in attributes::
+We can also specify UDAs as arguments in the TaskFilter::
>>> tw.tasks.filter(estimate=1000)
Long task
Syncing
-------
-Syncing is not directly supported by tasklib, but it can be made to work in a similiar way
-as the UDAs. First we need to update the ``config`` dictionary by the values required for
-sync to work, and then we can run the sync command using the ``execute_command()`` method::
+If you have configured the needed config variables in your .taskrc, syncing
+is as easy as::
+
+ >>> tw = TaskWarrior()
+ >>> tw.execute_command(['sync'])
+
+If you want to use non-standard server/credentials, you'll need to provide configuration
+overrides to the ``TaskWarrior`` instance. Update the ``config`` dictionary with the
+values you want to override, and then run the sync command using
+the ``execute_command()`` method::
>>> tw = TaskWarrior()
>>> sync_config = {
from setuptools import setup, find_packages
-version = '0.9.0'
+version = '0.10.0'
setup(
name='tasklib',
import logging
import os
import pytz
+import re
import six
import sys
import subprocess
import tzlocal
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
+DATE_FORMAT_CALC = '%Y-%m-%dT%H:%M:%S'
REPR_OUTPUT_SIZE = 10
PENDING = 'pending'
COMPLETED = 'completed'
VERSION_2_2_0 = six.u('2.2.0')
VERSION_2_3_0 = six.u('2.3.0')
VERSION_2_4_0 = six.u('2.4.0')
+VERSION_2_4_1 = six.u('2.4.1')
+VERSION_2_4_2 = six.u('2.4.2')
+VERSION_2_4_3 = six.u('2.4.3')
logger = logging.getLogger(__name__)
local_zone = tzlocal.get_localzone()
to raise ValueError.
"""
+ def __init__(self, warrior):
+ self.warrior = warrior
+
def _deserialize(self, key, value):
hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
lambda x: x if x != '' else None)
# Convert to local midnight
value_full = datetime.datetime.combine(value, datetime.time.min)
localized = local_zone.localize(value_full)
- elif isinstance(value, datetime.datetime) and value.tzinfo is None:
- # Convert to localized datetime object
- localized = local_zone.localize(value)
+ elif isinstance(value, datetime.datetime):
+ if value.tzinfo is None:
+ # Convert to localized datetime object
+ localized = local_zone.localize(value)
+ else:
+ # If the value is already localized, there is no need to change
+ # time zone at this point. Also None is a valid value too.
+ localized = value
+ elif (isinstance(value, six.string_types)
+ and self.warrior.version >= VERSION_2_4_0):
+ # For strings, use 'task calc' to evaluate the string to datetime
+ # available since TW 2.4.0
+ args = value.split()
+ result = self.warrior.execute_command(['calc'] + args)
+ naive = datetime.datetime.strptime(result[0], DATE_FORMAT_CALC)
+ localized = local_zone.localize(naive)
else:
- # If the value is already localized, there is no need to change
- # time zone at this point. Also None is a valid value too.
- localized = value
+ raise ValueError("Provided value could not be converted to "
+ "datetime, its type is not supported: {0}"
+ .format(type(value)))
return localized
# are not propagated.
self._original_data = copy.deepcopy(self._data)
- def _update_data(self, data, update_original=False):
+ def _update_data(self, data, update_original=False, remove_missing=False):
"""
Low level update of the internal _data dict. Data which are coming as
updates should already be serialized. If update_original is True, the
self._data.update(dict((key, self._deserialize(key, value))
for key, value in data.items()))
+ # In certain situations, we want to treat missing keys as removals
+ if remove_missing:
+ for key in set(self._data.keys()) - set(data.keys()):
+ self._data[key] = None
+
if update_original:
self._original_data = copy.deepcopy(self._data)
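
The effect of ``remove_missing`` can be illustrated on plain dictionaries. This is a simplified sketch under the assumption that deserialization is a no-op; ``update_data`` is a hypothetical stand-in for the method above, not the method itself.

```python
def update_data(current, incoming, remove_missing=False):
    """Return a copy of current updated with incoming."""
    updated = dict(current)
    updated.update(incoming)
    if remove_missing:
        # Keys absent from the incoming data are treated as removals.
        for key in set(updated) - set(incoming):
            updated[key] = None
    return updated


before = {'description': 'Buy some milk', 'start': '20141119T152233Z'}
after = {'description': 'Buy some milk'}  # 'start' was dropped by the user

merged = update_data(before, after)
replaced = update_data(before, after, remove_missing=True)
```

Without the flag, a stale ``start`` value survives the update. With it, the key is reset to ``None``, which is what an on-modify hook needs when the modified task no longer carries the attribute.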
def __init__(self, task, data={}):
self.task = task
self._load_data(data)
+ super(TaskAnnotation, self).__init__(task.warrior)
def remove(self):
self.task.remove_annotation(self)
"""
pass
+ class ActiveTask(Exception):
+ """
+ Raised when the operation cannot be performed on the active task.
+ """
+ pass
+
+ class InactiveTask(Exception):
+ """
+ Raised when the operation cannot be performed on an inactive task.
+ """
+ pass
+
class NotSaved(Exception):
"""
Raised when the operation cannot be performed on the task, because
# If this is a on-modify event, we are provided with additional
# line of input, which provides updated data
if modify:
- task._update_data(json.loads(input_file.readline().strip()))
+ task._update_data(json.loads(input_file.readline().strip()),
+ remove_missing=True)
return task
def __init__(self, warrior, **kwargs):
- self.warrior = warrior
+ super(Task, self).__init__(warrior)
# Check that user is not able to set read-only value in __init__
for key in kwargs.keys():
def pending(self):
return self['status'] == six.text_type('pending')
+ @property
+ def active(self):
+ return self['start'] is not None
+
@property
def saved(self):
return self['uuid'] is not None or self['id'] is not None
if self.warrior.version < VERSION_2_4_0:
return self._data['description']
else:
- return "description:'{0}'".format(self._data['description'] or '')
+ return six.u("description:'{0}'").format(self._data['description'] or '')
def delete(self):
if not self.saved:
raise Task.CompletedTask("Cannot start a completed task")
elif self.deleted:
raise Task.DeletedTask("Deleted task cannot be started")
+ elif self.active:
+ raise Task.ActiveTask("Task is already active")
self.warrior.execute_command([self['uuid'], 'start'])
# Refresh the status again, so that we have updated info stored
self.refresh(only_fields=['status', 'start'])
+ def stop(self):
+ if not self.saved:
+ raise Task.NotSaved("Task needs to be saved before it can be stopped")
+
+ # Refresh, and raise exception if the task is not active
+ self.refresh(only_fields=['status'])
+
+ if not self.active:
+ raise Task.InactiveTask("Cannot stop an inactive task")
+
+ self.warrior.execute_command([self['uuid'], 'stop'])
+
+ # Refresh the status again, so that we have updated info stored
+ self.refresh(only_fields=['status', 'start'])
+
def done(self):
if not self.saved:
raise Task.NotSaved("Task needs to be saved before it can be completed")
elif self.deleted:
raise Task.DeletedTask("Deleted task cannot be completed")
+ # Older versions of TW do not stop active task at completion
+ if self.warrior.version < VERSION_2_4_0 and self.active:
+ self.stop()
+
self.warrior.execute_command([self['uuid'], 'done'])
# Refresh the status again, so that we have updated info stored
if serialized_value is '':
escaped_serialized_value = ''
else:
- escaped_serialized_value = "'{0}'".format(serialized_value)
+ escaped_serialized_value = six.u("'{0}'").format(serialized_value)
- format_default = lambda: "{0}:{1}".format(field,
+ format_default = lambda: six.u("{0}:{1}").format(field,
escaped_serialized_value)
format_func = getattr(self, 'format_{0}'.format(field),
A set of parameters to filter the task list with.
"""
- def __init__(self, filter_params=[]):
+ def __init__(self, warrior, filter_params=[]):
self.filter_params = filter_params
+ super(TaskFilter, self).__init__(warrior)
def add_filter(self, filter_str):
self.filter_params.append(filter_str)
attribute_key = key.split('.')[0]
# Since this is user input, we need to normalize before we serialize
- value = self._normalize(key, value)
+ value = self._normalize(attribute_key, value)
value = self._serialize(attribute_key, value)
# If we are filtering by uuid:, do not use uuid keyword
modifier = '.is' if value else '.none'
key = key + modifier if '.' not in key else key
- self.filter_params.append("{0}:{1}".format(key, value))
+ self.filter_params.append(six.u("{0}:{1}").format(key, value))
def get_filter_params(self):
return [f for f in self.filter_params if f]
def clone(self):
- c = self.__class__()
+ c = self.__class__(self.warrior)
c.filter_params = list(self.filter_params)
return c
def __init__(self, warrior=None, filter_obj=None):
self.warrior = warrior
self._result_cache = None
- self.filter_obj = filter_obj or TaskFilter()
+ self.filter_obj = filter_obj or TaskFilter(warrior)
def __deepcopy__(self, memo):
"""
class TaskWarrior(object):
- def __init__(self, data_location='~/.task', create=True):
- data_location = os.path.expanduser(data_location)
- if create and not os.path.exists(data_location):
- os.makedirs(data_location)
+ def __init__(self, data_location=None, create=True, taskrc_location='~/.taskrc'):
+ self.taskrc_location = os.path.expanduser(taskrc_location)
+
+ # If taskrc does not exist, pass / to use defaults and avoid creating
+ # dummy .taskrc file by TaskWarrior
+ if not os.path.exists(self.taskrc_location):
+ self.taskrc_location = '/'
+
+ self.version = self._get_version()
self.config = {
- 'data.location': os.path.expanduser(data_location),
'confirmation': 'no',
'dependency.confirmation': 'no', # See TW-1483 or taskrc man page
'recurrence.confirmation': 'no', # Necessary for modifying R tasks
+
+ # Defaults to on since 2.4.5, we expect off during parsing
+ 'json.array': 'off',
+
+ # 2.4.3 onwards supports 0 as infinite bulk, otherwise set just
+ # arbitrary big number which is likely to be large enough
+ 'bulk': 0 if self.version >= VERSION_2_4_3 else 100000,
}
+
+ # Set data.location override if passed via kwarg
+ if data_location is not None:
+ data_location = os.path.expanduser(data_location)
+ if create and not os.path.exists(data_location):
+ os.makedirs(data_location)
+ self.config['data.location'] = data_location
+
self.tasks = TaskQuerySet(self)
- self.version = self._get_version()
def _get_command_args(self, args, config_override={}):
- command_args = ['task', 'rc:/']
+ command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
config = self.config.copy()
config.update(config_override)
for item in config.items():
command_args.append('rc.{0}={1}'.format(*item))
- command_args.extend(map(str, args))
+ command_args.extend(map(six.text_type, args))
return command_args
def _get_version(self):
stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
return stdout.strip('\n')
- def execute_command(self, args, config_override={}, allow_failure=True):
+ def get_config(self):
+ raw_output = self.execute_command(
+ ['show'],
+ config_override={'verbose': 'nothing'}
+ )
+
+ config = dict()
+ config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].*$)')
+
+ for line in raw_output:
+ match = config_regex.match(line)
+ if match:
+ config[match.group('key')] = match.group('value').strip()
+
+ return config
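
A standalone sketch of the key/value parsing done in ``get_config()``. The sample lines are made up and only approximate what ``task show`` prints, and ``parse_show_output`` is a hypothetical name. The pattern here uses ``.*`` for the value tail so that single-character values such as ``0`` are also captured.

```python
import re

# key, whitespace, then a value that starts with a non-space character.
config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].*$)')


def parse_show_output(lines):
    """Collect 'key value' lines into a dict, skipping everything else."""
    config = {}
    for line in lines:
        match = config_regex.match(line)
        if match:
            config[match.group('key')] = match.group('value').strip()
    return config


# Made-up sample output, not captured from a real task binary.
sample = [
    'confirmation            no',
    'data.location           /home/user/.task',
    '',
    'bulk                    0',
    'uda.estimate.type       numeric',
]
parsed = parse_show_output(sample)
```

Blank lines and lines without a value do not match the pattern and are simply ignored.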
+
+ def execute_command(self, args, config_override={}, allow_failure=True,
+ return_all=False):
command_args = self._get_command_args(
args, config_override=config_override)
logger.debug(' '.join(command_args))
else:
error_msg = stdout.strip()
raise TaskWarriorException(error_msg)
- return stdout.strip().split('\n')
+
+ # Return the whole triplet only if explicitly asked for
+ if not return_all:
+ return stdout.rstrip().split('\n')
+ else:
+ return (stdout.rstrip().split('\n'),
+ stderr.rstrip().split('\n'),
+ p.returncode)
def enforce_recurrence(self):
# Run arbitrary report command which will trigger generation
# of recurrent tasks.
- # TODO: Make a version dependant enforcement once
- # TW-1531 is handled
- self.execute_command(['next'], allow_failure=False)
+
+ # Only necessary for TW up to 2.4.1, fixed in 2.4.2.
+ if self.version < VERSION_2_4_2:
+ self.execute_command(['next'], allow_failure=False)
def filter_tasks(self, filter_obj):
self.enforce_recurrence()
import pytz
import six
import shutil
+import sys
import tempfile
import unittest
'annotations',
)
+total_seconds_2_6 = lambda x: x.microseconds / 1e6 + x.seconds + x.days * 24 * 3600
+
+
class TasklibTest(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.mkdtemp(dir='.')
- self.tw = TaskWarrior(data_location=self.tmp)
+ self.tw = TaskWarrior(data_location=self.tmp, taskrc_location='/')
def tearDown(self):
shutil.rmtree(self.tmp)
self.assertRaises(Task.DeletedTask, t.done)
- def test_start_completed_task(self):
- t = Task(self.tw, description="test task")
- t.save()
- t.done()
-
- self.assertRaises(Task.CompletedTask, t.start)
-
def test_starting_task(self):
t = Task(self.tw, description="test task")
now = t.datetime_normalizer(datetime.datetime.now())
self.assertTrue(now.replace(microsecond=0) <= t['end'])
self.assertEqual(t['status'], 'deleted')
+ def test_started_task_active(self):
+ t = Task(self.tw, description="test task")
+ t.save()
+ t.start()
+ self.assertTrue(t.active)
+
+ def test_unstarted_task_inactive(self):
+ t = Task(self.tw, description="test task")
+ self.assertFalse(t.active)
+ t.save()
+ self.assertFalse(t.active)
+
+ def test_start_active_task(self):
+ t = Task(self.tw, description="test task")
+ t.save()
+ t.start()
+ self.assertRaises(Task.ActiveTask, t.start)
+
+ def test_stop_completed_task(self):
+ t = Task(self.tw, description="test task")
+ t.save()
+ t.start()
+ t.done()
+
+ self.assertRaises(Task.InactiveTask, t.stop)
+
+ t = Task(self.tw, description="test task")
+ t.save()
+ t.done()
+
+ self.assertRaises(Task.InactiveTask, t.stop)
+
+ def test_stop_deleted_task(self):
+ t = Task(self.tw, description="test task")
+ t.save()
+ t.start()
+ t.delete()
+ t.stop()
+
+ def test_stop_inactive_task(self):
+ t = Task(self.tw, description="test task")
+ t.save()
+
+ self.assertRaises(Task.InactiveTask, t.stop)
+
+ t = Task(self.tw, description="test task")
+ t.save()
+ t.start()
+ t.stop()
+
+ self.assertRaises(Task.InactiveTask, t.stop)
+
+ def test_stopping_task(self):
+ t = Task(self.tw, description="test task")
+ now = t.datetime_normalizer(datetime.datetime.now())
+ t.save()
+ t.start()
+ t.stop()
+
+ self.assertEqual(t['end'], None)
+ self.assertEqual(t['status'], 'pending')
+ self.assertFalse(t.active)
+
def test_modify_simple_attribute_without_space(self):
t = Task(self.tw, description="test")
t.save()
t.save()
self.assertEqual(len(self.tw.tasks.pending()), 2)
+ def test_modify_number_of_tasks_at_once(self):
+ for i in range(1, 100):
+ Task(self.tw, description="test task %d" % i, tags=['test']).save()
+
+ self.tw.execute_command(['+test', 'mod', 'unified', 'description'])
+
+ def test_return_all_from_executed_command(self):
+ Task(self.tw, description="test task", tags=['test']).save()
+ out, err, rc = self.tw.execute_command(['count'], return_all=True)
+ self.assertEqual(rc, 0)
+
+ def test_return_all_from_failed_executed_command(self):
+ Task(self.tw, description="test task", tags=['test']).save()
+ out, err, rc = self.tw.execute_command(['countinvalid'],
+ return_all=True, allow_failure=False)
+ self.assertNotEqual(rc, 0)
+
+
class TaskFromHookTest(TasklibTest):
input_add_data = six.StringIO(
'{"description":"Buy some milk",'
'"entry":"20141118T050231Z",'
'"status":"pending",'
+ '"start":"20141119T152233Z",'
'"uuid":"a360fc44-315c-4366-b70c-ea7e7520b749"}')
input_modify_data = six.StringIO(input_add_data.getvalue() + '\n' +
self.assertEqual(t._original_data['status'], "pending")
self.assertEqual(t._original_data['description'], "Buy some milk")
self.assertEqual(set(t._modified_fields),
- set(['status', 'description']))
+ set(['status', 'description', 'start']))
def test_export_data(self):
t = Task(self.tw, description="test task",
def test_serialize_naive_datetime(self):
t = Task(self.tw, description="task1", due=self.localtime_naive)
- self.assertEqual(json.loads(t.export_data())['due'],
+ self.assertEqual(json.loads(t.export_data())['due'],
self.utctime_aware.strftime(DATE_FORMAT))
def test_timezone_naive_date_setitem(self):
def test_serialize_naive_date(self):
t = Task(self.tw, description="task1", due=self.localdate_naive)
- self.assertEqual(json.loads(t.export_data())['due'],
+ self.assertEqual(json.loads(t.export_data())['due'],
self.utctime_aware.strftime(DATE_FORMAT))
def test_timezone_aware_datetime_setitem(self):
def test_serialize_aware_datetime(self):
t = Task(self.tw, description="task1", due=self.localtime_aware)
- self.assertEqual(json.loads(t.export_data())['due'],
+ self.assertEqual(json.loads(t.export_data())['due'],
self.utctime_aware.strftime(DATE_FORMAT))
+class DatetimeStringTest(TasklibTest):
+
+ def test_simple_now_conversion(self):
+ if self.tw.version < six.text_type('2.4.0'):
+ # Python2.6 does not support SkipTest. As a workaround
+ # mark the test as passed by exiting.
+ if getattr(unittest, 'SkipTest', None) is not None:
+ raise unittest.SkipTest()
+ else:
+ return
+
+ t = Task(self.tw, description="test task", due="now")
+ now = local_zone.localize(datetime.datetime.now())
+
+ # Assert that both times are not more than 5 seconds apart
+ if sys.version_info < (2,7):
+ self.assertTrue(total_seconds_2_6(now - t['due']) < 5)
+ self.assertTrue(total_seconds_2_6(t['due'] - now) < 5)
+ else:
+ self.assertTrue((now - t['due']).total_seconds() < 5)
+ self.assertTrue((t['due'] - now).total_seconds() < 5)
+
+ def test_simple_eoy_conversion(self):
+ if self.tw.version < six.text_type('2.4.0'):
+ # Python2.6 does not support SkipTest. As a workaround
+ # mark the test as passed by exiting.
+ if getattr(unittest, 'SkipTest', None) is not None:
+ raise unittest.SkipTest()
+ else:
+ return
+
+ t = Task(self.tw, description="test task", due="eoy")
+ now = local_zone.localize(datetime.datetime.now())
+ eoy = local_zone.localize(datetime.datetime(
+ year=now.year,
+ month=12,
+ day=31,
+ hour=23,
+ minute=59,
+ second=59
+ ))
+ self.assertEqual(eoy, t['due'])
+
+ def test_complex_eoy_conversion(self):
+ if self.tw.version < six.text_type('2.4.0'):
+ # Python2.6 does not support SkipTest. As a workaround
+ # mark the test as passed by exiting.
+ if getattr(unittest, 'SkipTest', None) is not None:
+ raise unittest.SkipTest()
+ else:
+ return
+
+ t = Task(self.tw, description="test task", due="eoy - 4 months")
+ now = local_zone.localize(datetime.datetime.now())
+ due_date = local_zone.localize(datetime.datetime(
+ year=now.year,
+ month=12,
+ day=31,
+ hour=23,
+ minute=59,
+ second=59
+ )) - datetime.timedelta(0,4 * 30 * 86400)
+ self.assertEqual(due_date, t['due'])
+
+ def test_filtering_with_string_datetime(self):
+ if self.tw.version < six.text_type('2.4.0'):
+ # Python2.6 does not support SkipTest. As a workaround
+ # mark the test as passed by exiting.
+ if getattr(unittest, 'SkipTest', None) is not None:
+ raise unittest.SkipTest()
+ else:
+ return
+
+ t = Task(self.tw, description="test task",
+ due=datetime.datetime.now() - datetime.timedelta(0,2))
+ t.save()
+ self.assertEqual(len(self.tw.tasks.filter(due__before="now")), 1)
+
class AnnotationTest(TasklibTest):
def setUp(self):
class UnicodeTest(TasklibTest):
def test_unicode_task(self):
- Task(self.tw, description="†åßk").save()
+ Task(self.tw, description=six.u("†åßk")).save()
self.tw.tasks.get()
+ def test_filter_by_unicode_task(self):
+ Task(self.tw, description=six.u("†åßk")).save()
+ tasks = self.tw.tasks.filter(description=six.u("†åßk"))
+ self.assertEqual(len(tasks), 1)
+
def test_non_unicode_task(self):
Task(self.tw, description="test task").save()
self.tw.tasks.get()