git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

096f656d7d9e903ecebd3b6a2c36f58b568afb79
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
# strftime/strptime pattern used to (de)serialize the 'entry' and 'due'
# date fields.
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
# Maximum number of items TaskQuerySet.__repr__ shows before truncating.
REPR_OUTPUT_SIZE = 10
# Values of the 'status' field used by TaskQuerySet.pending()/completed().
PENDING = 'pending'
COMPLETED = 'completed'

# Version strings of the `task` binary; VERSION_2_4_0 is compared against
# TaskWarrior.version to work around a 'description:' prefix quirk.
VERSION_2_1_0 = six.u('2.1.0')
VERSION_2_2_0 = six.u('2.2.0')
VERSION_2_3_0 = six.u('2.3.0')
VERSION_2_4_0 = six.u('2.4.0')

# Module-level logger; execute_command() logs each command line at DEBUG.
logger = logging.getLogger(__name__)
21
22
class TaskWarriorException(Exception):
    """
    Raised when the `task` command fails or produces unexpected output.
    """
25
26
class TaskResource(object):
    """
    Dict-like wrapper around the raw data of a TaskWarrior object.

    Item access is routed through optional per-field hooks: reading
    ``obj[key]`` passes the stored value through ``deserialize_<key>`` and
    writing ``obj[key] = value`` stores the result of ``serialize_<key>``,
    when the subclass defines such methods. Subclasses are expected to
    provide ``__unicode__``.
    """
    # Names of the fields that __setitem__ refuses to modify.
    read_only_fields = []

    def _load_data(self, data):
        """
        Install ``data`` as the current state and remember a snapshot of it.
        """
        self._data = data
        # The snapshot must be a separate dict, otherwise later changes to
        # the live data would show up in the "original" state as well.
        self._original_data = data.copy()

    def __getitem__(self, key):
        """
        Return the deserialized value stored under ``key`` (None if absent).
        """
        # Python falls back to calling __getitem__ with 0, 1, 2, ... when an
        # object defines no __iter__. Ending that protocol immediately for
        # any integer-like key keeps the resource non-iterable.
        try:
            int(key)
        except ValueError:
            pass
        else:
            raise StopIteration

        def passthrough(value):
            return value

        hydrate = getattr(self, 'deserialize_{0}'.format(key), passthrough)
        return hydrate(self._data.get(key))

    def __setitem__(self, key, value):
        """
        Store the serialized ``value`` under ``key``.

        Raises RuntimeError if ``key`` is listed in ``read_only_fields``.
        """
        if key in self.read_only_fields:
            raise RuntimeError('Field \'%s\' is read-only' % key)

        def passthrough(raw):
            return raw

        dehydrate = getattr(self, 'serialize_{0}'.format(key), passthrough)
        self._data[key] = dehydrate(value)

    def __str__(self):
        """
        Return ``__unicode__()`` as a native string (UTF-8 bytes on Python 2).
        """
        text = six.text_type(self.__unicode__())
        if six.PY3:
            return text
        return text.encode('utf-8')

    def __repr__(self):
        return str(self)
64
65
class TaskAnnotation(TaskResource):
    """
    A single annotation attached to a task.

    The annotation keeps a reference to its owning task so that it can
    remove itself through ``Task.remove_annotation``.
    """
    read_only_fields = ['entry', 'description']

    def __init__(self, task, data=None):
        """
        :param task: the task this annotation belongs to
        :param data: raw annotation dict; a new empty dict is used when
                     omitted
        """
        self.task = task
        # The dict is stored as the live data and mutated by __setitem__,
        # so every instance needs its own one: a literal {} default would
        # be shared by all annotations created without data.
        self._load_data({} if data is None else data)

    def deserialize_entry(self, data):
        """Parse the stored 'entry' string into a datetime (None if empty)."""
        return datetime.datetime.strptime(data, DATE_FORMAT) if data else None

    def serialize_entry(self, date):
        """Format a datetime for storage; falsy values become ''."""
        return date.strftime(DATE_FORMAT) if date else ''

    def remove(self):
        """Remove this annotation from its owning task."""
        self.task.remove_annotation(self)

    def __unicode__(self):
        return self['description']

    # __repr__ is inherited from TaskResource: it goes through __str__ and
    # therefore always returns a native string, even when the description
    # is missing or (on Python 2) contains non-ASCII characters.
86
87
class Task(TaskResource):
    """
    A single TaskWarrior task.

    The task data lives in a dict (see TaskResource); changes are only sent
    to TaskWarrior when save() is called. All communication goes through
    ``self.warrior.execute_command``.
    """
    read_only_fields = ['id', 'entry', 'urgency', 'uuid']

    class DoesNotExist(Exception):
        """
        Raised when a lookup expected exactly one task and found none.
        """
        pass

    class CompletedTask(Exception):
        """
        Raised when the operation cannot be performed on the completed task.
        """
        pass

    class DeletedTask(Exception):
        """
        Raised when the operation cannot be performed on the deleted task.
        """
        pass

    class NotSaved(Exception):
        """
        Raised when the operation cannot be performed on the task, because
        it has not been saved to TaskWarrior yet.
        """
        pass

    def __init__(self, warrior, data=None, **kwargs):
        """
        :param warrior: object used to run `task` commands for this task
        :param data: optional dict of initial field values; kept for
                     backwards compatibility, its entries override
                     ``kwargs``
        :param kwargs: initial field values
        """
        self.warrior = warrior

        # We keep data for backwards compatibility
        if data:
            kwargs.update(data)

        self._load_data(kwargs)

    def __unicode__(self):
        return self['description']

    def __eq__(self, other):
        # Tasks are identified by their uuid. Anything that is not a Task
        # is left to the other operand instead of failing on subscripting.
        if not isinstance(other, Task):
            return NotImplemented
        return self['uuid'] == other['uuid']

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__, so spell it out.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        return self['uuid'].__hash__()

    @staticmethod
    def _version_key(version):
        """
        Convert a dotted version string into a tuple of ints.

        Only the leading digits of each component are used and parsing
        stops at the first component without any, so '2.4.0' gives
        (2, 4, 0). Comparing these tuples orders versions numerically,
        whereas comparing the raw strings would rank '2.10.0' below '2.4.0'.
        """
        key = []
        for component in six.text_type(version).split('.'):
            digits = ''
            for char in component:
                if char not in '0123456789':
                    break
                digits += char
            if not digits:
                break
            key.append(int(digits))
        return tuple(key)

    @property
    def _modified_fields(self):
        """Yield the keys whose value differs from the loaded snapshot."""
        for key in self._data.keys():
            if self._data.get(key) != self._original_data.get(key):
                yield key

    @property
    def completed(self):
        return self['status'] == six.text_type('completed')

    @property
    def deleted(self):
        return self['status'] == six.text_type('deleted')

    @property
    def waiting(self):
        return self['status'] == six.text_type('waiting')

    @property
    def pending(self):
        return self['status'] == six.text_type('pending')

    @property
    def saved(self):
        """True once the task carries a uuid or an id."""
        return self['uuid'] is not None or self['id'] is not None

    def serialize_due(self, date):
        """Format a datetime for storage; falsy values become ''."""
        # An empty value (instead of an AttributeError on None) mirrors
        # TaskAnnotation.serialize_entry and allows clearing the due date.
        return date.strftime(DATE_FORMAT) if date else ''

    def deserialize_due(self, date_str):
        """Parse the stored 'due' string into a datetime (None if empty)."""
        if not date_str:
            return None
        return datetime.datetime.strptime(date_str, DATE_FORMAT)

    def deserialize_annotations(self, data):
        """Wrap each raw annotation dict in a TaskAnnotation."""
        return [TaskAnnotation(self, d) for d in data] if data else []

    def deserialize_tags(self, tags):
        """
        Return the tags as a list.

        Tags are stored either as a comma separated string (after being
        set locally) or as whatever the export returned.
        """
        # six.string_types covers str on Python 3 and basestring on
        # Python 2; the bare `basestring` name does not exist on Python 3.
        if isinstance(tags, six.string_types):
            return tags.split(',') if tags else []
        # A task without tags yields an empty list, like annotations do.
        return tags or []

    def serialize_tags(self, tags):
        """Join the tags into a comma separated string ('' if none)."""
        return ','.join(tags) if tags else ''

    def delete(self):
        """
        Delete the task in TaskWarrior.

        Raises Task.NotSaved for an unsaved task and Task.DeletedTask if
        the task is already deleted.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved before it can be deleted")

        # Refresh the status, and raise exception if the task is deleted
        self.refresh(only_fields=['status'])

        if self.deleted:
            raise Task.DeletedTask("Task was already deleted")

        self.warrior.execute_command([self['uuid'], 'delete'], config_override={
            'confirmation': 'no',
        })

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def done(self):
        """
        Mark the task as completed in TaskWarrior.

        Raises Task.NotSaved for an unsaved task, Task.CompletedTask if it
        is already completed and Task.DeletedTask if it is deleted.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved before it can be completed")

        # Refresh, and raise exception if task is already completed/deleted
        self.refresh(only_fields=['status'])

        if self.completed:
            raise Task.CompletedTask("Cannot complete a completed task")
        elif self.deleted:
            raise Task.DeletedTask("Deleted task cannot be completed")

        self.warrior.execute_command([self['uuid'], 'done'])

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def save(self):
        """
        Send the task to TaskWarrior: 'modify' if saved already, else 'add'.

        Raises TaskWarriorException if the output of 'add' does not contain
        exactly one well-formed 'Created task <id>.' line.
        """
        args = [self['uuid'], 'modify'] if self.saved else ['add']
        args.extend(self._get_modified_fields_as_args())
        output = self.warrior.execute_command(args)

        # Parse out the new ID, if the task is being added for the first time
        if not self.saved:
            id_lines = [l for l in output if l.startswith('Created task ')]

            # Complain loudly if it seems that more tasks were created
            # Should not happen
            if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
                raise TaskWarriorException("Unexpected output when creating "
                                           "task: %s" % '\n'.join(id_lines))

            # Circumvent the ID storage, since ID is considered read-only
            self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))

        self.refresh()

    def add_annotation(self, annotation):
        """
        Attach the text ``annotation`` to the task.

        Raises Task.NotSaved for an unsaved task.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to add annotation")

        args = [self['uuid'], 'annotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def remove_annotation(self, annotation):
        """
        Remove an annotation, given as TaskAnnotation or as its text.

        Raises Task.NotSaved for an unsaved task.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to remove annotation")

        if isinstance(annotation, TaskAnnotation):
            annotation = annotation['description']
        args = [self['uuid'], 'denotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def _get_modified_fields_as_args(self):
        """
        Build the 'field:value' command arguments for save().

        A saved task contributes only its modified fields; a new task
        contributes every field that is not read-only.
        """
        args = []

        # Compare versions numerically, not as strings.
        legacy_description = (self._version_key(self.warrior.version) <
                              self._version_key(VERSION_2_4_0))

        def add_field(field):
            # Task version older than 2.4.0 ignores first word of the
            # task description if description: prefix is used
            if legacy_description and field == 'description':
                args.append(self._data[field])
            else:
                args.append('{0}:{1}'.format(field, self._data[field]))

        # If we're modifying saved task, simply pass on all modified fields
        if self.saved:
            for field in self._modified_fields:
                add_field(field)
        # For new tasks, pass all fields that make sense
        else:
            for field in self._data.keys():
                if field in self.read_only_fields:
                    continue
                add_field(field)

        return args

    def refresh(self, only_fields=None):
        """
        Reload the task data from TaskWarrior.

        :param only_fields: names of the fields to reload; when empty or
                            omitted the whole task is replaced, discarding
                            local modifications

        Raises Task.NotSaved for an unsaved task.
        """
        # Raise error when trying to refresh a task that has not been saved
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to be refreshed")

        # We need to use ID as backup for uuid here for the refreshes
        # of newly saved tasks. Any other place in the code is fine
        # with using UUID only.
        args = [self['uuid'] or self['id'], 'export']
        new_data = json.loads(self.warrior.execute_command(args)[0])
        if only_fields:
            to_update = dict((k, new_data.get(k)) for k in only_fields)
            # Both dicts are updated, so the refreshed fields do not count
            # as modified afterwards.
            self._data.update(to_update)
            self._original_data.update(to_update)
        else:
            self._data = new_data
            # The snapshot has to be a separate dict; a shallow copy is
            # what _load_data uses as well.
            self._original_data = new_data.copy()
293
294
class TaskFilter(object):
    """
    A set of parameters to filter the task list with.

    The parameters are kept in ``filter_params`` as a list of strings in
    the order in which they should be handed to TaskWarrior.
    """

    def __init__(self, filter_params=None):
        """
        :param filter_params: optional iterable of initial filter strings
        """
        # Always start from a list owned by this instance. The list is
        # appended to later, so storing a shared default (or the caller's
        # own list) would leak filters between unrelated objects.
        if filter_params is None:
            self.filter_params = []
        else:
            self.filter_params = list(filter_params)

    def add_filter(self, filter_str):
        """Append a raw filter string."""
        self.filter_params.append(filter_str)

    def add_filter_param(self, key, value):
        """
        Add a ``key:value`` filter.

        A double underscore in ``key`` stands for a dot, so that the key
        can be passed as a Python keyword argument.
        """
        key = key.replace('__', '.')

        # Replace the value with empty string, since that is the
        # convention in TW for empty values
        value = value if value is not None else ''

        # If we are filtering by uuid:, do not use uuid keyword
        # due to TW-1452 bug
        if key == 'uuid':
            self.filter_params.insert(0, value)
        else:
            self.filter_params.append('{0}:{1}'.format(key, value))

    def get_filter_params(self):
        """Return the non-empty filter strings as a new list."""
        return [f for f in self.filter_params if f]

    def clone(self):
        """Return an independent copy of this filter."""
        c = self.__class__()
        c.filter_params = list(self.filter_params)
        return c
327
328
329 class TaskQuerySet(object):
330     """
331     Represents a lazy lookup for a task objects.
332     """
333
334     def __init__(self, warrior=None, filter_obj=None):
335         self.warrior = warrior
336         self._result_cache = None
337         self.filter_obj = filter_obj or TaskFilter()
338
339     def __deepcopy__(self, memo):
340         """
341         Deep copy of a QuerySet doesn't populate the cache
342         """
343         obj = self.__class__()
344         for k, v in self.__dict__.items():
345             if k in ('_iter', '_result_cache'):
346                 obj.__dict__[k] = None
347             else:
348                 obj.__dict__[k] = copy.deepcopy(v, memo)
349         return obj
350
351     def __repr__(self):
352         data = list(self[:REPR_OUTPUT_SIZE + 1])
353         if len(data) > REPR_OUTPUT_SIZE:
354             data[-1] = "...(remaining elements truncated)..."
355         return repr(data)
356
357     def __len__(self):
358         if self._result_cache is None:
359             self._result_cache = list(self)
360         return len(self._result_cache)
361
362     def __iter__(self):
363         if self._result_cache is None:
364             self._result_cache = self._execute()
365         return iter(self._result_cache)
366
367     def __getitem__(self, k):
368         if self._result_cache is None:
369             self._result_cache = list(self)
370         return self._result_cache.__getitem__(k)
371
372     def __bool__(self):
373         if self._result_cache is not None:
374             return bool(self._result_cache)
375         try:
376             next(iter(self))
377         except StopIteration:
378             return False
379         return True
380
381     def __nonzero__(self):
382         return type(self).__bool__(self)
383
384     def _clone(self, klass=None, **kwargs):
385         if klass is None:
386             klass = self.__class__
387         filter_obj = self.filter_obj.clone()
388         c = klass(warrior=self.warrior, filter_obj=filter_obj)
389         c.__dict__.update(kwargs)
390         return c
391
392     def _execute(self):
393         """
394         Fetch the tasks which match the current filters.
395         """
396         return self.warrior.filter_tasks(self.filter_obj)
397
398     def all(self):
399         """
400         Returns a new TaskQuerySet that is a copy of the current one.
401         """
402         return self._clone()
403
404     def pending(self):
405         return self.filter(status=PENDING)
406
407     def completed(self):
408         return self.filter(status=COMPLETED)
409
410     def filter(self, *args, **kwargs):
411         """
412         Returns a new TaskQuerySet with the given filters added.
413         """
414         clone = self._clone()
415         for f in args:
416             clone.filter_obj.add_filter(f)
417         for key, value in kwargs.items():
418             clone.filter_obj.add_filter_param(key, value)
419         return clone
420
421     def get(self, **kwargs):
422         """
423         Performs the query and returns a single object matching the given
424         keyword arguments.
425         """
426         clone = self.filter(**kwargs)
427         num = len(clone)
428         if num == 1:
429             return clone._result_cache[0]
430         if not num:
431             raise Task.DoesNotExist(
432                 'Task matching query does not exist. '
433                 'Lookup parameters were {0}'.format(kwargs))
434         raise ValueError(
435             'get() returned more than one Task -- it returned {0}! '
436             'Lookup parameters were {1}'.format(num, kwargs))
437
438
class TaskWarrior(object):
    """
    Thin wrapper around the `task` command line program.

    Every operation spawns a `task` process; ``self.config`` holds the
    settings that are passed to each invocation as ``rc.<key>=<value>``.
    """

    def __init__(self, data_location='~/.task', create=True):
        """
        :param data_location: TaskWarrior data directory; a leading ``~``
                              is expanded
        :param create: create the directory if it does not exist yet
        """
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)
        self.version = self._get_version()

    def _get_command_args(self, args, config_override=None):
        """
        Build the full argument list for one `task` invocation.

        :param args: command arguments; each one is converted with str()
        :param config_override: optional settings that take precedence
                                over ``self.config`` for this call only
        """
        command_args = ['task', 'rc:/']
        # Work on a copy so that overrides never leak into self.config.
        config = self.config.copy()
        if config_override:
            config.update(config_override)
        for item in config.items():
            command_args.append('rc.{0}={1}'.format(*item))
        command_args.extend(map(str, args))
        return command_args

    def _get_version(self):
        """Return the output of `task --version` without newlines."""
        p = subprocess.Popen(
                ['task', '--version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        stdout = p.communicate()[0].decode('utf-8')
        return stdout.strip('\n')

    def execute_command(self, args, config_override=None):
        """
        Run `task` with the given arguments and return its output lines.

        :param args: command arguments, see _get_command_args
        :param config_override: optional per-call settings

        Raises TaskWarriorException on a non-zero exit status. The message
        is the last line of stderr or, if stderr is empty, the stdout.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        logger.debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode:
            if stderr.strip():
                error_msg = stderr.strip().splitlines()[-1]
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)
        return stdout.strip().split('\n')

    def filter_tasks(self, filter_obj):
        """
        Export the tasks matching ``filter_obj`` and return them as Tasks.

        Raises TaskWarriorException if a line of the export is not valid
        JSON.
        """
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if line:
                # Each non-empty line is parsed as one JSON object, after
                # removing the commas surrounding it.
                data = line.strip(',')
                try:
                    tasks.append(Task(self, json.loads(data)))
                except ValueError:
                    raise TaskWarriorException('Invalid JSON: %s' % data)
        return tasks

    def merge_with(self, path, push=False):
        """
        Run `task merge` against ``path`` (given a single trailing slash).

        :param push: sets ``merge.autopush`` to 'yes' instead of 'no'
        """
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        """Run `task undo` with confirmation turned off."""
        self.execute_command(['undo'], config_override={
            'confirmation': 'no',
        })