]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Task: Provide a generic framework for format_* methods
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11 REPR_OUTPUT_SIZE = 10
12 PENDING = 'pending'
13 COMPLETED = 'completed'
14
15 VERSION_2_1_0 = six.u('2.1.0')
16 VERSION_2_2_0 = six.u('2.2.0')
17 VERSION_2_3_0 = six.u('2.3.0')
18 VERSION_2_4_0 = six.u('2.4.0')
19
20 logger = logging.getLogger(__name__)
21
22
23 class TaskWarriorException(Exception):
24     pass
25
26
27 class TaskResource(object):
28     read_only_fields = []
29
30     def _load_data(self, data):
31         self._data = data
32         # We need to use a copy for original data, so that changes
33         # are not propagated. Shallow copy is alright, since data dict uses only
34         # primitive data types
35         self._original_data = data.copy()
36
37     def _update_data(self, data, update_original=False):
38         """
39         Low level update of the internal _data dict. Data which are coming as
40         updates should already be serialized. If update_original is True, the
41         original_data dict is updated as well.
42         """
43
44         self._data.update(data)
45
46         if update_original:
47             self._original_data.update(data)
48
49     def __getitem__(self, key):
50         # This is a workaround to make TaskResource non-iterable
51         # over simple index-based iteration
52         try:
53             int(key)
54             raise StopIteration
55         except ValueError:
56             pass
57
58         return self._deserialize(key, self._data.get(key))
59
60     def __setitem__(self, key, value):
61         if key in self.read_only_fields:
62             raise RuntimeError('Field \'%s\' is read-only' % key)
63         self._data[key] = self._serialize(key, value)
64
65     def _deserialize(self, key, value):
66         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
67                                lambda x: x)
68         return hydrate_func(value)
69
70     def _serialize(self, key, value):
71         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
72                                  lambda x: x)
73         return dehydrate_func(value)
74
75     def __str__(self):
76         s = six.text_type(self.__unicode__())
77         if not six.PY3:
78             s = s.encode('utf-8')
79         return s
80
81     def __repr__(self):
82         return str(self)
83
84
85 class TaskAnnotation(TaskResource):
86     read_only_fields = ['entry', 'description']
87
88     def __init__(self, task, data={}):
89         self.task = task
90         self._load_data(data)
91
92     def deserialize_entry(self, data):
93         return datetime.datetime.strptime(data, DATE_FORMAT) if data else None
94
95     def serialize_entry(self, date):
96         return date.strftime(DATE_FORMAT) if date else ''
97
98     def remove(self):
99         self.task.remove_annotation(self)
100
101     def __unicode__(self):
102         return self['description']
103
104     __repr__ = __unicode__
105
106
107 class Task(TaskResource):
108     read_only_fields = ['id', 'entry', 'urgency', 'uuid', 'modified']
109
110     class DoesNotExist(Exception):
111         pass
112
113     class CompletedTask(Exception):
114         """
115         Raised when the operation cannot be performed on the completed task.
116         """
117         pass
118
119     class DeletedTask(Exception):
120         """
121         Raised when the operation cannot be performed on the deleted task.
122         """
123         pass
124
125     class NotSaved(Exception):
126         """
127         Raised when the operation cannot be performed on the task, because
128         it has not been saved to TaskWarrior yet.
129         """
130         pass
131
132     def __init__(self, warrior, **kwargs):
133         self.warrior = warrior
134
135         # We serialize the data in kwargs so that users of the library
136         # do not have to pass different data formats via __setitem__ and
137         # __init__ methods, that would be confusing
138
139         # Rather unfortunate syntax due to python2.6 comaptiblity
140         self._load_data(dict((key, self._serialize(key, value))
141                         for (key, value) in six.iteritems(kwargs)))
142
143     def __unicode__(self):
144         return self['description']
145
146     def __eq__(self, other):
147         if self['uuid'] and other['uuid']:
148             # For saved Tasks, just define equality by equality of uuids
149             return self['uuid'] == other['uuid']
150         else:
151             # If the tasks are not saved, compare the actual instances
152             return id(self) == id(other)
153
154
155     def __hash__(self):
156         if self['uuid']:
157             # For saved Tasks, just define equality by equality of uuids
158             return self['uuid'].__hash__()
159         else:
160             # If the tasks are not saved, return hash of instance id
161             return id(self).__hash__()
162
163     @property
164     def _modified_fields(self):
165         writable_fields = set(self._data.keys()) - set(self.read_only_fields)
166         for key in writable_fields:
167             if self._data.get(key) != self._original_data.get(key):
168                 yield key
169
170     @property
171     def completed(self):
172         return self['status'] == six.text_type('completed')
173
174     @property
175     def deleted(self):
176         return self['status'] == six.text_type('deleted')
177
178     @property
179     def waiting(self):
180         return self['status'] == six.text_type('waiting')
181
182     @property
183     def pending(self):
184         return self['status'] == six.text_type('pending')
185
186     @property
187     def saved(self):
188         return self['uuid'] is not None or self['id'] is not None
189
190     def serialize_due(self, date):
191         if not date:
192             return None
193         return date.strftime(DATE_FORMAT)
194
195     def deserialize_due(self, date_str):
196         if not date_str:
197             return None
198         return datetime.datetime.strptime(date_str, DATE_FORMAT)
199
200     def serialize_depends(self, cur_dependencies):
201         # Check that all the tasks are saved
202         for task in cur_dependencies:
203             if not task.saved:
204                 raise Task.NotSaved('Task \'%s\' needs to be saved before '
205                                     'it can be set as dependency.' % task)
206
207         # Return the list of uuids
208         return ','.join(task['uuid'] for task in cur_dependencies)
209
210     def deserialize_depends(self, raw_uuids):
211         raw_uuids = raw_uuids or ''  # Convert None to empty string
212         uuids = raw_uuids.split(',')
213         return set(self.warrior.tasks.get(uuid=uuid) for uuid in uuids if uuid)
214
215     def format_depends(self):
216         # We need to generate added and removed dependencies list,
217         # since Taskwarrior does not accept redefining dependencies.
218
219         # This cannot be part of serialize_depends, since we need
220         # to keep a list of all depedencies in the _data dictionary,
221         # not just currently added/removed ones
222
223         old_dependencies_raw = self._original_data.get('depends','')
224         old_dependencies = self.deserialize_depends(old_dependencies_raw)
225
226         added = self['depends'] - old_dependencies
227         removed = old_dependencies - self['depends']
228
229         # Removed dependencies need to be prefixed with '-'
230         return 'depends:' + ','.join(
231                 [t['uuid'] for t in added] +
232                 ['-' + t['uuid'] for t in removed]
233             )
234
235     def deserialize_annotations(self, data):
236         return [TaskAnnotation(self, d) for d in data] if data else []
237
238     def deserialize_tags(self, tags):
239         if isinstance(tags, basestring):
240             return tags.split(',') if tags else []
241         return tags
242
243     def serialize_tags(self, tags):
244         return ','.join(tags) if tags else ''
245
246     def format_description(self):
247         # Task version older than 2.4.0 ignores first word of the
248         # task description if description: prefix is used
249         if self.warrior.version < VERSION_2_4_0:
250             return self._data['description']
251         else:
252             return "description:{0}".format(self._data['description'] or '')
253
254     def delete(self):
255         if not self.saved:
256             raise Task.NotSaved("Task needs to be saved before it can be deleted")
257
258         # Refresh the status, and raise exception if the task is deleted
259         self.refresh(only_fields=['status'])
260
261         if self.deleted:
262             raise Task.DeletedTask("Task was already deleted")
263
264         self.warrior.execute_command([self['uuid'], 'delete'], config_override={
265             'confirmation': 'no',
266         })
267
268         # Refresh the status again, so that we have updated info stored
269         self.refresh(only_fields=['status'])
270
271
272     def done(self):
273         if not self.saved:
274             raise Task.NotSaved("Task needs to be saved before it can be completed")
275
276         # Refresh, and raise exception if task is already completed/deleted
277         self.refresh(only_fields=['status'])
278
279         if self.completed:
280             raise Task.CompletedTask("Cannot complete a completed task")
281         elif self.deleted:
282             raise Task.DeletedTask("Deleted task cannot be completed")
283
284         self.warrior.execute_command([self['uuid'], 'done'])
285
286         # Refresh the status again, so that we have updated info stored
287         self.refresh(only_fields=['status'])
288
289     def save(self):
290         args = [self['uuid'], 'modify'] if self.saved else ['add']
291         args.extend(self._get_modified_fields_as_args())
292         output = self.warrior.execute_command(args)
293
294         # Parse out the new ID, if the task is being added for the first time
295         if not self.saved:
296             id_lines = [l for l in output if l.startswith('Created task ')]
297
298             # Complain loudly if it seems that more tasks were created
299             # Should not happen
300             if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
301                 raise TaskWarriorException("Unexpected output when creating "
302                                            "task: %s" % '\n'.join(id_lines))
303
304             # Circumvent the ID storage, since ID is considered read-only
305             self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))
306
307         self.refresh()
308
309     def add_annotation(self, annotation):
310         if not self.saved:
311             raise Task.NotSaved("Task needs to be saved to add annotation")
312
313         args = [self['uuid'], 'annotate', annotation]
314         self.warrior.execute_command(args)
315         self.refresh(only_fields=['annotations'])
316
317     def remove_annotation(self, annotation):
318         if not self.saved:
319             raise Task.NotSaved("Task needs to be saved to add annotation")
320
321         if isinstance(annotation, TaskAnnotation):
322             annotation = annotation['description']
323         args = [self['uuid'], 'denotate', annotation]
324         self.warrior.execute_command(args)
325         self.refresh(only_fields=['annotations'])
326
327     def _get_modified_fields_as_args(self):
328         args = []
329
330         def add_field(field):
331             # Add the output of format_field method to args list (defaults to
332             # field:value)
333             format_default = lambda k: '{0}:{1}'.format(k, self._data[k] or '')
334             format_func = getattr(self, 'format_{0}'.format(field),
335                                   lambda: format_default(field))
336             args.append(format_func())
337
338         # If we're modifying saved task, simply pass on all modified fields
339         if self.saved:
340             for field in self._modified_fields:
341                 add_field(field)
342         # For new tasks, pass all fields that make sense
343         else:
344             for field in self._data.keys():
345                 if field in self.read_only_fields:
346                     continue
347                 add_field(field)
348
349         return args
350
351     def refresh(self, only_fields=[]):
352         # Raise error when trying to refresh a task that has not been saved
353         if not self.saved:
354             raise Task.NotSaved("Task needs to be saved to be refreshed")
355
356         # We need to use ID as backup for uuid here for the refreshes
357         # of newly saved tasks. Any other place in the code is fine
358         # with using UUID only.
359         args = [self['uuid'] or self['id'], 'export']
360         new_data = json.loads(self.warrior.execute_command(args)[0])
361         if only_fields:
362             to_update = dict(
363                 [(k, new_data.get(k)) for k in only_fields])
364             self._update_data(to_update, update_original=True)
365         else:
366             self._load_data(new_data)
367
368
369 class TaskFilter(object):
370     """
371     A set of parameters to filter the task list with.
372     """
373
374     def __init__(self, filter_params=[]):
375         self.filter_params = filter_params
376
377     def add_filter(self, filter_str):
378         self.filter_params.append(filter_str)
379
380     def add_filter_param(self, key, value):
381         key = key.replace('__', '.')
382
383         # Replace the value with empty string, since that is the
384         # convention in TW for empty values
385         value = value if value is not None else ''
386
387         # If we are filtering by uuid:, do not use uuid keyword
388         # due to TW-1452 bug
389         if key == 'uuid':
390             self.filter_params.insert(0, value)
391         else:
392             self.filter_params.append('{0}:{1}'.format(key, value))
393
394     def get_filter_params(self):
395         return [f for f in self.filter_params if f]
396
397     def clone(self):
398         c = self.__class__()
399         c.filter_params = list(self.filter_params)
400         return c
401
402
403 class TaskQuerySet(object):
404     """
405     Represents a lazy lookup for a task objects.
406     """
407
408     def __init__(self, warrior=None, filter_obj=None):
409         self.warrior = warrior
410         self._result_cache = None
411         self.filter_obj = filter_obj or TaskFilter()
412
413     def __deepcopy__(self, memo):
414         """
415         Deep copy of a QuerySet doesn't populate the cache
416         """
417         obj = self.__class__()
418         for k, v in self.__dict__.items():
419             if k in ('_iter', '_result_cache'):
420                 obj.__dict__[k] = None
421             else:
422                 obj.__dict__[k] = copy.deepcopy(v, memo)
423         return obj
424
425     def __repr__(self):
426         data = list(self[:REPR_OUTPUT_SIZE + 1])
427         if len(data) > REPR_OUTPUT_SIZE:
428             data[-1] = "...(remaining elements truncated)..."
429         return repr(data)
430
431     def __len__(self):
432         if self._result_cache is None:
433             self._result_cache = list(self)
434         return len(self._result_cache)
435
436     def __iter__(self):
437         if self._result_cache is None:
438             self._result_cache = self._execute()
439         return iter(self._result_cache)
440
441     def __getitem__(self, k):
442         if self._result_cache is None:
443             self._result_cache = list(self)
444         return self._result_cache.__getitem__(k)
445
446     def __bool__(self):
447         if self._result_cache is not None:
448             return bool(self._result_cache)
449         try:
450             next(iter(self))
451         except StopIteration:
452             return False
453         return True
454
455     def __nonzero__(self):
456         return type(self).__bool__(self)
457
458     def _clone(self, klass=None, **kwargs):
459         if klass is None:
460             klass = self.__class__
461         filter_obj = self.filter_obj.clone()
462         c = klass(warrior=self.warrior, filter_obj=filter_obj)
463         c.__dict__.update(kwargs)
464         return c
465
466     def _execute(self):
467         """
468         Fetch the tasks which match the current filters.
469         """
470         return self.warrior.filter_tasks(self.filter_obj)
471
472     def all(self):
473         """
474         Returns a new TaskQuerySet that is a copy of the current one.
475         """
476         return self._clone()
477
478     def pending(self):
479         return self.filter(status=PENDING)
480
481     def completed(self):
482         return self.filter(status=COMPLETED)
483
484     def filter(self, *args, **kwargs):
485         """
486         Returns a new TaskQuerySet with the given filters added.
487         """
488         clone = self._clone()
489         for f in args:
490             clone.filter_obj.add_filter(f)
491         for key, value in kwargs.items():
492             clone.filter_obj.add_filter_param(key, value)
493         return clone
494
495     def get(self, **kwargs):
496         """
497         Performs the query and returns a single object matching the given
498         keyword arguments.
499         """
500         clone = self.filter(**kwargs)
501         num = len(clone)
502         if num == 1:
503             return clone._result_cache[0]
504         if not num:
505             raise Task.DoesNotExist(
506                 'Task matching query does not exist. '
507                 'Lookup parameters were {0}'.format(kwargs))
508         raise ValueError(
509             'get() returned more than one Task -- it returned {0}! '
510             'Lookup parameters were {1}'.format(num, kwargs))
511
512
513 class TaskWarrior(object):
514     def __init__(self, data_location='~/.task', create=True):
515         data_location = os.path.expanduser(data_location)
516         if create and not os.path.exists(data_location):
517             os.makedirs(data_location)
518         self.config = {
519             'data.location': os.path.expanduser(data_location),
520         }
521         self.tasks = TaskQuerySet(self)
522         self.version = self._get_version()
523
524     def _get_command_args(self, args, config_override={}):
525         command_args = ['task', 'rc:/']
526         config = self.config.copy()
527         config.update(config_override)
528         for item in config.items():
529             command_args.append('rc.{0}={1}'.format(*item))
530         command_args.extend(map(str, args))
531         return command_args
532
533     def _get_version(self):
534         p = subprocess.Popen(
535                 ['task', '--version'],
536                 stdout=subprocess.PIPE,
537                 stderr=subprocess.PIPE)
538         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
539         return stdout.strip('\n')
540
541     def execute_command(self, args, config_override={}):
542         command_args = self._get_command_args(
543             args, config_override=config_override)
544         logger.debug(' '.join(command_args))
545         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
546                              stderr=subprocess.PIPE)
547         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
548         if p.returncode:
549             if stderr.strip():
550                 error_msg = stderr.strip().splitlines()[-1]
551             else:
552                 error_msg = stdout.strip()
553             raise TaskWarriorException(error_msg)
554         return stdout.strip().split('\n')
555
556     def filter_tasks(self, filter_obj):
557         args = ['export', '--'] + filter_obj.get_filter_params()
558         tasks = []
559         for line in self.execute_command(args):
560             if line:
561                 data = line.strip(',')
562                 try:
563                     filtered_task = Task(self)
564                     filtered_task._load_data(json.loads(data))
565                     tasks.append(filtered_task)
566                 except ValueError:
567                     raise TaskWarriorException('Invalid JSON: %s' % data)
568         return tasks
569
570     def merge_with(self, path, push=False):
571         path = path.rstrip('/') + '/'
572         self.execute_command(['merge', path], config_override={
573             'merge.autopush': 'yes' if push else 'no',
574         })
575
576     def undo(self):
577         self.execute_command(['undo'], config_override={
578             'confirmation': 'no',
579         })