X-Git-Url: https://git.madduck.net/etc/taskwarrior.git/blobdiff_plain/4ed5af30a93bb2840e437230e87422a8bebc9b74..3c47c1b8537fcd2557ee86374b8e234b521eee8f:/tasklib/task.py diff --git a/tasklib/task.py b/tasklib/task.py index de0443c..f2b020d 100644 --- a/tasklib/task.py +++ b/tasklib/task.py @@ -1,8 +1,10 @@ +from __future__ import print_function import copy import datetime import json import logging import os +import six import subprocess DATE_FORMAT = '%Y%m%dT%H%M%SZ' @@ -36,8 +38,14 @@ class TaskResource(object): self._data[key] = dehydrate_func(value) self._modified_fields.add(key) + def __str__(self): + s = six.text_type(self.__unicode__()) + if not six.PY3: + s = s.encode('utf-8') + return s + def __repr__(self): - return self.__unicode__() + return str(self) class TaskAnnotation(TaskResource): @@ -63,11 +71,23 @@ class TaskAnnotation(TaskResource): class Task(TaskResource): - read_only_fields = ['id', 'entry', 'urgency'] + read_only_fields = ['id', 'entry', 'urgency', 'uuid'] class DoesNotExist(Exception): pass + class CompletedTask(Exception): + """ + Raised when the operation cannot be performed on the completed task. + """ + pass + + class DeletedTask(Exception): + """ + Raised when the operation cannot be performed on the deleted task. + """ + pass + def __init__(self, warrior, data={}): self.warrior = warrior self._load_data(data) @@ -76,6 +96,22 @@ class Task(TaskResource): def __unicode__(self): return self['description'] + @property + def completed(self): + return self['status'] == six.text_type('completed') + + @property + def deleted(self): + return self['status'] == six.text_type('deleted') + + @property + def waiting(self): + return self['status'] == six.text_type('waiting') + + @property + def pending(self): + return self['status'] == six.text_type('pending') + def serialize_due(self, date): return date.strftime(DATE_FORMAT) @@ -96,28 +132,49 @@ class Task(TaskResource): return ','.join(tags) if tags else '' def delete(self): - self.warrior.execute_command([self['id'], 'delete'], config_override={ + # Refresh the status, and raise exception if the task is deleted + self.refresh(only_fields=['status']) + + if self.deleted: + raise self.DeletedTask("Task was already deleted") + + self.warrior.execute_command([self['uuid'], 'delete'], config_override={ 'confirmation': 'no', }) + # Refresh the status again, so that we have updated info stored + self.refresh(only_fields=['status']) + + def done(self): - self.warrior.execute_command([self['id'], 'done']) + # Refresh, and raise exception if task is already completed/deleted + self.refresh(only_fields=['status']) + + if self.completed: + raise self.CompletedTask("Cannot complete a completed task") + elif self.deleted: + raise self.DeletedTask("Deleted task cannot be completed") + + self.warrior.execute_command([self['uuid'], 'done']) + + # Refresh the status again, so that we have updated info stored + self.refresh(only_fields=['status']) def save(self): - args = [self['id'], 'modify'] if self['id'] else ['add'] + args = [self['uuid'], 'modify'] if self['uuid'] else ['add'] args.extend(self._get_modified_fields_as_args()) self.warrior.execute_command(args) self._modified_fields.clear() def add_annotation(self, annotation): - args = [self['id'], 'annotate', annotation] + args = [self['uuid'], 'annotate', annotation] self.warrior.execute_command(args) self.refresh(only_fields=['annotations']) def remove_annotation(self, annotation): if isinstance(annotation, TaskAnnotation): annotation = annotation['description'] - args = [self['id'], 'denotate', annotation] + args = [self['uuid'], 'denotate', annotation] self.warrior.execute_command(args) self.refresh(only_fields=['annotations']) @@ -151,6 +208,10 @@ class TaskFilter(object): def add_filter_param(self, key, value): key = key.replace('__', '.') + + # Replace the value with empty string, since that is the + # convention in TW for empty values + value = value if value is not None else '' self.filter_params.append('{0}:{1}'.format(key, value)) def get_filter_params(self): @@ -297,7 +358,7 @@ class TaskWarrior(object): logger.debug(' '.join(command_args)) p = subprocess.Popen(command_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = [x.decode() for x in p.communicate()] + stdout, stderr = [x.decode('utf-8') for x in p.communicate()] if p.returncode: if stderr.strip(): error_msg = stderr.strip().splitlines()[-1]