return super(Task, self).serialize_depends(cur_dependencies)
- def format_depends(self):
- # We need to generate added and removed dependencies list,
- # since Taskwarrior does not accept redefining dependencies.
-
- # This cannot be part of serialize_depends, since we need
- # to keep a list of all depedencies in the _data dictionary,
- # not just currently added/removed ones
-
- old_dependencies = self._original_data.get('depends', set())
-
- added = self['depends'] - old_dependencies
- removed = old_dependencies - self['depends']
-
- # Removed dependencies need to be prefixed with '-'
- return 'depends:' + ','.join(
- [t['uuid'] for t in added] +
- ['-' + t['uuid'] for t in removed]
- )
-
- def format_description(self):
- # Task version older than 2.4.0 ignores first word of the
- # task description if description: prefix is used
- if self.warrior.version < VERSION_2_4_0:
- return self._data['description']
- else:
- return six.u("description:'{0}'").format(self._data['description'] or '')
-
def delete(self):
if not self.saved:
raise Task.NotSaved("Task needs to be saved before it can be deleted")
if self.deleted:
raise Task.DeletedTask("Task was already deleted")
- self.warrior.execute_command([self['uuid'], 'delete'])
+ self.backend.delete_task(self)
# Refresh the status again, so that we have updated info stored
self.refresh(only_fields=['status', 'start', 'end'])
elif self.active:
raise Task.ActiveTask("Task is already active")
- self.warrior.execute_command([self['uuid'], 'start'])
+ self.backend.start_task(self)
# Refresh the status again, so that we have updated info stored
self.refresh(only_fields=['status', 'start'])
if not self.active:
raise Task.InactiveTask("Cannot stop an inactive task")
- self.warrior.execute_command([self['uuid'], 'stop'])
+ self.backend.stop_task(self)
# Refresh the status again, so that we have updated info stored
self.refresh(only_fields=['status', 'start'])
elif self.deleted:
raise Task.DeletedTask("Deleted task cannot be completed")
- # Older versions of TW do not stop active task at completion
- if self.warrior.version < VERSION_2_4_0 and self.active:
- self.stop()
-
- self.warrior.execute_command([self['uuid'], 'done'])
+ self.backend.complete_task(self)
# Refresh the status again, so that we have updated info stored
self.refresh(only_fields=['status', 'start', 'end'])
if self.saved and not self.modified:
return
- args = [self['uuid'], 'modify'] if self.saved else ['add']
- args.extend(self._get_modified_fields_as_args())
- output = self.warrior.execute_command(args)
-
- # Parse out the new ID, if the task is being added for the first time
- if not self.saved:
- id_lines = [l for l in output if l.startswith('Created task ')]
-
- # Complain loudly if it seems that more tasks were created
- # Should not happen
- if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
- raise TaskWarriorException("Unexpected output when creating "
- "task: %s" % '\n'.join(id_lines))
-
- # Circumvent the ID storage, since ID is considered read-only
- identifier = id_lines[0].split(' ')[2].rstrip('.')
-
- # Identifier can be either ID or UUID for completed tasks
- try:
- self._data['id'] = int(identifier)
- except ValueError:
- self._data['uuid'] = identifier
-
- # Refreshing is very important here, as not only modification time
- # is updated, but arbitrary attribute may have changed due hooks
- # altering the data before saving
- self.refresh(after_save=True)
+ # All the actual work is done by the backend
+ self.backend.save_task(self)
def add_annotation(self, annotation):
if not self.saved:
raise Task.NotSaved("Task needs to be saved to add annotation")
- args = [self['uuid'], 'annotate', annotation]
- self.warrior.execute_command(args)
+ self.backend.annotate_task(self, annotation)
self.refresh(only_fields=['annotations'])
def remove_annotation(self, annotation):
if isinstance(annotation, TaskAnnotation):
annotation = annotation['description']
- args = [self['uuid'], 'denotate', annotation]
- self.warrior.execute_command(args)
- self.refresh(only_fields=['annotations'])
-
- def _get_modified_fields_as_args(self):
- args = []
- def add_field(field):
- # Add the output of format_field method to args list (defaults to
- # field:value)
- serialized_value = self._serialize(field, self._data[field])
-
- # Empty values should not be enclosed in quotation marks, see
- # TW-1510
- if serialized_value is '':
- escaped_serialized_value = ''
- else:
- escaped_serialized_value = six.u("'{0}'").format(serialized_value)
-
- format_default = lambda: six.u("{0}:{1}").format(field,
- escaped_serialized_value)
-
- format_func = getattr(self, 'format_{0}'.format(field),
- format_default)
-
- args.append(format_func())
-
- # If we're modifying saved task, simply pass on all modified fields
- if self.saved:
- for field in self._modified_fields:
- add_field(field)
- # For new tasks, pass all fields that make sense
- else:
- for field in self._data.keys():
- if field in self.read_only_fields:
- continue
- add_field(field)
-
- return args
+ self.backend.denotate_task(self, annotation)
+ self.refresh(only_fields=['annotations'])
def refresh(self, only_fields=None, after_save=False):
# Raise error when trying to refresh a task that has not been saved
if not self.saved:
raise Task.NotSaved("Task needs to be saved to be refreshed")
- # We need to use ID as backup for uuid here for the refreshes
- # of newly saved tasks. Any other place in the code is fine
- # with using UUID only.
- args = [self['uuid'] or self['id'], 'export']
- output = self.warrior.execute_command(args)
-
- def valid(output):
- return len(output) == 1 and output[0].startswith('{')
-
- # For older TW versions attempt to uniquely locate the task
- # using the data we have if it has been just saved.
- # This can happen when adding a completed task on older TW versions.
- if (not valid(output) and self.warrior.version < VERSION_2_4_5
- and after_save):
-
- # Make a copy, removing ID and UUID. It's most likely invalid
- # (ID 0) if it failed to match a unique task.
- data = copy.deepcopy(self._data)
- data.pop('id', None)
- data.pop('uuid', None)
-
- taskfilter = TaskFilter(self.warrior)
- for key, value in data.items():
- taskfilter.add_filter_param(key, value)
-
- output = self.warrior.execute_command(['export', '--'] +
- taskfilter.get_filter_params())
-
- # If more than 1 task has been matched still, raise an exception
- if not valid(output):
- raise TaskWarriorException(
- "Unique identifiers {0} with description: {1} matches "
- "multiple tasks: {2}".format(
- self['uuid'] or self['id'], self['description'], output)
- )
-
- new_data = json.loads(output[0])
+ new_data = self.backend.refresh_task(self, after_save=after_save)
+
if only_fields:
to_update = dict(
[(k, new_data.get(k)) for k in only_fields])