VERSION_2_4_1 = six.u('2.4.1')
VERSION_2_4_2 = six.u('2.4.2')
VERSION_2_4_3 = six.u('2.4.3')
+VERSION_2_4_4 = six.u('2.4.4')
+VERSION_2_4_5 = six.u('2.4.5')
logger = logging.getLogger(__name__)
local_zone = tzlocal.get_localzone()
return ','.join(task['uuid'] for task in value)
def deserialize_depends(self, raw_uuids):
- raw_uuids = raw_uuids or '' # Convert None to empty string
- uuids = raw_uuids.split(',')
+ raw_uuids = raw_uuids or [] # Convert None to empty list
+
+ # TW 2.4.4 encodes list of dependencies as a single string
+ if type(raw_uuids) is not list:
+ uuids = raw_uuids.split(',')
+ # TW 2.4.5 and later exports them as a list, no conversion needed
+ else:
+ uuids = raw_uuids
+
return set(self.warrior.tasks.get(uuid=uuid) for uuid in uuids if uuid)
def datetime_normalizer(self, value):
class TaskAnnotation(TaskResource):
read_only_fields = ['entry', 'description']
- def __init__(self, task, data={}):
+ def __init__(self, task, data=None):
self.task = task
- self._load_data(data)
+ self._load_data(data or dict())
super(TaskAnnotation, self).__init__(task.warrior)
def remove(self):
"task: %s" % '\n'.join(id_lines))
# Circumvent the ID storage, since ID is considered read-only
- self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))
+ identifier = id_lines[0].split(' ')[2].rstrip('.')
+
+ # Identifier can be either ID or UUID for completed tasks
+ try:
+ self._data['id'] = int(identifier)
+ except ValueError:
+ self._data['uuid'] = identifier
# Refreshing is very important here, as not only modification time
# is updated, but arbitrary attribute may have changed due hooks
# altering the data before saving
- self.refresh()
+ self.refresh(after_save=True)
def add_annotation(self, annotation):
if not self.saved:
return args
- def refresh(self, only_fields=[]):
+ def refresh(self, only_fields=None, after_save=False):
# Raise error when trying to refresh a task that has not been saved
if not self.saved:
raise Task.NotSaved("Task needs to be saved to be refreshed")
# of newly saved tasks. Any other place in the code is fine
# with using UUID only.
args = [self['uuid'] or self['id'], 'export']
- new_data = json.loads(self.warrior.execute_command(args)[0])
+ output = self.warrior.execute_command(args)
+
+ def valid(output):
+ return len(output) == 1 and output[0].startswith('{')
+
+ # For older TW versions attempt to uniquely locate the task
+ # using the data we have if it has been just saved.
+ # This can happen when adding a completed task on older TW versions.
+ if (not valid(output) and self.warrior.version < VERSION_2_4_5
+ and after_save):
+
+ # Make a copy, removing ID and UUID. It's most likely invalid
+ # (ID 0) if it failed to match a unique task.
+ data = copy.deepcopy(self._data)
+ data.pop('id', None)
+ data.pop('uuid', None)
+
+ taskfilter = TaskFilter(self.warrior)
+ for key, value in data.items():
+ taskfilter.add_filter_param(key, value)
+
+ output = self.warrior.execute_command(['export', '--'] +
+ taskfilter.get_filter_params())
+
+ # If more than 1 task has been matched still, raise an exception
+ if not valid(output):
+ raise TaskWarriorException(
+ "Unique identifiers {0} with description: {1} matches "
+ "multiple tasks: {2}".format(
+ self['uuid'] or self['id'], self['description'], output)
+ )
+
+ new_data = json.loads(output[0])
if only_fields:
to_update = dict(
[(k, new_data.get(k)) for k in only_fields])
A set of parameters to filter the task list with.
"""
- def __init__(self, warrior, filter_params=[]):
- self.filter_params = filter_params
+ def __init__(self, warrior, filter_params=None):
+ self.filter_params = filter_params or []
super(TaskFilter, self).__init__(warrior)
def add_filter(self, filter_str):
# We enforce equality match by using 'is' (or 'none') modifier
# Without using this syntax, filter fails due to TW-1479
- modifier = '.is' if value else '.none'
- key = key + modifier if '.' not in key else key
+ # which is, however, fixed in 2.4.5
+ if self.warrior.version < VERSION_2_4_5:
+ modifier = '.is' if value else '.none'
+ key = key + modifier if '.' not in key else key
self.filter_params.append(six.u("{0}:{1}").format(key, value))
'confirmation': 'no',
'dependency.confirmation': 'no', # See TW-1483 or taskrc man page
'recurrence.confirmation': 'no', # Necessary for modifying R tasks
+
+ # Defaults to on since 2.4.5, we expect off during parsing
+ 'json.array': 'off',
+
# 2.4.3 onwards supports 0 as infite bulk, otherwise set just
# arbitrary big number which is likely to be large enough
'bulk': 0 if self.version >= VERSION_2_4_3 else 100000,
self.tasks = TaskQuerySet(self)
- def _get_command_args(self, args, config_override={}):
+ def _get_command_args(self, args, config_override=None):
command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
config = self.config.copy()
- config.update(config_override)
+ config.update(config_override or dict())
for item in config.items():
command_args.append('rc.{0}={1}'.format(*item))
command_args.extend(map(six.text_type, args))
)
config = dict()
- config_regex = re.compile(r'^(?P<key>[_a-z\.]+)\s+(?P<value>[^\s].+$)')
+ config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].+$)')
for line in raw_output:
match = config_regex.match(line)
return config
- def execute_command(self, args, config_override={}, allow_failure=True,
+ def execute_command(self, args, config_override=None, allow_failure=True,
return_all=False):
command_args = self._get_command_args(
args, config_override=config_override)