VERSION_2_2_0 = six.u('2.2.0')
VERSION_2_3_0 = six.u('2.3.0')
VERSION_2_4_0 = six.u('2.4.0')
+VERSION_2_4_1 = six.u('2.4.1')
+VERSION_2_4_2 = six.u('2.4.2')
logger = logging.getLogger(__name__)
local_zone = tzlocal.get_localzone()
def normalize_modified(self, value):
return self.datetime_normalizer(value)
+ def serialize_start(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_start(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_start(self, value):
+ return self.datetime_normalizer(value)
+
+ def serialize_end(self, value):
+ return self.timestamp_serializer(value)
+
+ def deserialize_end(self, value):
+ return self.timestamp_deserializer(value)
+
+ def normalize_end(self, value):
+ return self.datetime_normalizer(value)
+
def serialize_due(self, value):
return self.timestamp_serializer(value)
self.warrior.execute_command([self['uuid'], 'delete'])
# Refresh the status again, so that we have updated info stored
+ self.refresh(only_fields=['status', 'start', 'end'])
+
+ def start(self):
+ if not self.saved:
+ raise Task.NotSaved("Task needs to be saved before it can be started")
+
+ # Refresh, and raise exception if task is already completed/deleted
self.refresh(only_fields=['status'])
+ if self.completed:
+ raise Task.CompletedTask("Cannot start a completed task")
+ elif self.deleted:
+ raise Task.DeletedTask("Deleted task cannot be started")
+
+ self.warrior.execute_command([self['uuid'], 'start'])
+
+ # Refresh the status again, so that we have updated info stored
+ self.refresh(only_fields=['status', 'start'])
def done(self):
if not self.saved:
self.warrior.execute_command([self['uuid'], 'done'])
# Refresh the status again, so that we have updated info stored
- self.refresh(only_fields=['status'])
+ self.refresh(only_fields=['status', 'start', 'end'])
def save(self):
if self.saved and not self.modified:
attribute_key = key.split('.')[0]
# Since this is user input, we need to normalize before we serialize
- value = self._normalize(key, value)
+ value = self._normalize(attribute_key, value)
value = self._serialize(attribute_key, value)
# If we are filtering by uuid:, do not use uuid keyword
class TaskWarrior(object):
- def __init__(self, data_location='~/.task', create=True):
+ def __init__(self, data_location='~/.task', create=True, taskrc_location='~/.taskrc'):
data_location = os.path.expanduser(data_location)
+ self.taskrc_location = os.path.expanduser(taskrc_location)
+
+ # If taskrc does not exist, pass / to use defaults and avoid creating
+ # dummy .taskrc file by TaskWarrior
+ if not os.path.exists(self.taskrc_location):
+ self.taskrc_location = '/'
+
if create and not os.path.exists(data_location):
os.makedirs(data_location)
+
self.config = {
- 'data.location': os.path.expanduser(data_location),
+ 'data.location': data_location,
'confirmation': 'no',
'dependency.confirmation': 'no', # See TW-1483 or taskrc man page
'recurrence.confirmation': 'no', # Necessary for modifying R tasks
self.version = self._get_version()
def _get_command_args(self, args, config_override={}):
- command_args = ['task', 'rc:/']
+ command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
config = self.config.copy()
config.update(config_override)
for item in config.items():
def enforce_recurrence(self):
# Run arbitrary report command which will trigger generation
# of recurrent tasks.
- # TODO: Make a version dependant enforcement once
- # TW-1531 is handled
- self.execute_command(['next'], allow_failure=False)
+
+ # Only necessary for TW up to 2.4.1, fixed in 2.4.2.
+ if self.version < VERSION_2_4_2:
+ self.execute_command(['next'], allow_failure=False)
def filter_tasks(self, filter_obj):
self.enforce_recurrence()