return super(Task, self).serialize_depends(cur_dependencies)
- def format_depends(self):
- # We need to generate added and removed dependencies list,
- # since Taskwarrior does not accept redefining dependencies.
-
- # This cannot be part of serialize_depends, since we need
- # to keep a list of all depedencies in the _data dictionary,
- # not just currently added/removed ones
-
- old_dependencies = self._original_data.get('depends', set())
-
- added = self['depends'] - old_dependencies
- removed = old_dependencies - self['depends']
-
- # Removed dependencies need to be prefixed with '-'
- return 'depends:' + ','.join(
- [t['uuid'] for t in added] +
- ['-' + t['uuid'] for t in removed]
- )
-
- def format_description(self):
- # Task version older than 2.4.0 ignores first word of the
- # task description if description: prefix is used
- if self.warrior.version < VERSION_2_4_0:
- return self._data['description']
- else:
- return six.u("description:'{0}'").format(self._data['description'] or '')
-
def delete(self):
if not self.saved:
raise Task.NotSaved("Task needs to be saved before it can be deleted")
if not self.saved:
raise Task.NotSaved("Task needs to be saved to add annotation")
- args = [self['uuid'], 'annotate', annotation]
- self.warrior.execute_command(args)
+ self.backend.annotate_task(self, annotation)
self.refresh(only_fields=['annotations'])
def remove_annotation(self, annotation):
if isinstance(annotation, TaskAnnotation):
annotation = annotation['description']
- args = [self['uuid'], 'denotate', annotation]
- self.warrior.execute_command(args)
- self.refresh(only_fields=['annotations'])
-
- def _get_modified_fields_as_args(self):
- args = []
-
- def add_field(field):
- # Add the output of format_field method to args list (defaults to
- # field:value)
- serialized_value = self._serialize(field, self._data[field])
-
- # Empty values should not be enclosed in quotation marks, see
- # TW-1510
- if serialized_value is '':
- escaped_serialized_value = ''
- else:
- escaped_serialized_value = six.u("'{0}'").format(serialized_value)
- format_default = lambda: six.u("{0}:{1}").format(field,
- escaped_serialized_value)
-
- format_func = getattr(self, 'format_{0}'.format(field),
- format_default)
-
- args.append(format_func())
-
- # If we're modifying saved task, simply pass on all modified fields
- if self.saved:
- for field in self._modified_fields:
- add_field(field)
- # For new tasks, pass all fields that make sense
- else:
- for field in self._data.keys():
- if field in self.read_only_fields:
- continue
- add_field(field)
-
- return args
+ self.backend.denotate_task(self, annotation)
+ self.refresh(only_fields=['annotations'])
def refresh(self, only_fields=None, after_save=False):
# Raise error when trying to refresh a task that has not been saved
if not self.saved:
raise Task.NotSaved("Task needs to be saved to be refreshed")
- # We need to use ID as backup for uuid here for the refreshes
- # of newly saved tasks. Any other place in the code is fine
- # with using UUID only.
- args = [self['uuid'] or self['id'], 'export']
- output = self.warrior.execute_command(args)
-
- def valid(output):
- return len(output) == 1 and output[0].startswith('{')
-
- # For older TW versions attempt to uniquely locate the task
- # using the data we have if it has been just saved.
- # This can happen when adding a completed task on older TW versions.
- if (not valid(output) and self.warrior.version < VERSION_2_4_5
- and after_save):
-
- # Make a copy, removing ID and UUID. It's most likely invalid
- # (ID 0) if it failed to match a unique task.
- data = copy.deepcopy(self._data)
- data.pop('id', None)
- data.pop('uuid', None)
-
- taskfilter = TaskFilter(self.warrior)
- for key, value in data.items():
- taskfilter.add_filter_param(key, value)
-
- output = self.warrior.execute_command(['export', '--'] +
- taskfilter.get_filter_params())
-
- # If more than 1 task has been matched still, raise an exception
- if not valid(output):
- raise TaskWarriorException(
- "Unique identifiers {0} with description: {1} matches "
- "multiple tasks: {2}".format(
- self['uuid'] or self['id'], self['description'], output)
- )
-
- new_data = json.loads(output[0])
+ new_data = self.backend.refresh_task(self, after_save=after_save)
+
if only_fields:
to_update = dict(
[(k, new_data.get(k)) for k in only_fields])