git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes into logical commits before using git-format-patch and git-send-email to send patches to patches@git.madduck.net. If you read over the Git project's submission guidelines and adhere to them, I'd be especially grateful.

SSH access, as well as push access, can be arranged individually.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

TaskWarrior: Set confirmation:no in config by default
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
# strftime/strptime pattern used to (de)serialize date fields such as
# 'entry' and 'due'
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
# Number of items TaskQuerySet.__repr__ shows before truncating its output
REPR_OUTPUT_SIZE = 10
# Status values used by the TaskQuerySet.pending()/completed() shortcuts
PENDING = 'pending'
COMPLETED = 'completed'

# Version strings that TaskWarrior.version can be compared against
VERSION_2_1_0 = six.u('2.1.0')
VERSION_2_2_0 = six.u('2.2.0')
VERSION_2_3_0 = six.u('2.3.0')
VERSION_2_4_0 = six.u('2.4.0')

# Module-level logger; TaskWarrior.execute_command logs each command at DEBUG
logger = logging.getLogger(__name__)
21
22
class TaskWarriorException(Exception):
    """
    Error reported while talking to the TaskWarrior command line tool,
    e.g. a failing command or output that cannot be parsed.
    """
25
26
class TaskResource(object):
    """
    Base class for objects backed by a dict of serialized data.

    Item access converts values through optional per-field hooks named
    ``serialize_<field>`` / ``deserialize_<field>``; fields listed in
    ``read_only_fields`` cannot be assigned through ``__setitem__``.
    """

    read_only_fields = []

    def _load_data(self, data):
        """
        Replace the internal state with the given serialized data dict.
        """
        self._data = data
        # Keep a separate snapshot of the initial state so later changes to
        # _data are not reflected in it. The copy is shallow, which the
        # original author considered sufficient because the dict is expected
        # to hold primitive values only.
        self._original_data = data.copy()

    def _update_data(self, data, update_original=False):
        """
        Low level merge of already serialized values into the internal
        _data dict. When update_original is True the values are merged
        into the _original_data snapshot as well.
        """
        targets = [self._data]
        if update_original:
            targets.append(self._original_data)

        for target in targets:
            target.update(data)

    def __getitem__(self, key):
        # Workaround that keeps TaskResource from being iterable through
        # plain index-based iteration: anything that converts to an int
        # ends the iteration right away.
        try:
            int(key)
        except ValueError:
            pass
        else:
            raise StopIteration

        raw_value = self._data.get(key)
        return self._deserialize(key, raw_value)

    def __setitem__(self, key, value):
        if key in self.read_only_fields:
            raise RuntimeError("Field '%s' is read-only" % key)
        serialized = self._serialize(key, value)
        self._data[key] = serialized

    def _deserialize(self, key, value):
        """
        Convert a stored value using deserialize_<key> when the instance
        defines it; otherwise hand the value back untouched.
        """
        hook_name = 'deserialize_{0}'.format(key)
        hydrate = getattr(self, hook_name, lambda raw: raw)
        return hydrate(value)

    def _serialize(self, key, value):
        """
        Convert a value for storage using serialize_<key> when the
        instance defines it; otherwise hand the value back untouched.
        """
        hook_name = 'serialize_{0}'.format(key)
        dehydrate = getattr(self, hook_name, lambda raw: raw)
        return dehydrate(value)

    def __str__(self):
        text = six.text_type(self.__unicode__())
        # Python 2 expects a byte string from __str__
        return text if six.PY3 else text.encode('utf-8')

    def __repr__(self):
        return str(self)
83
84
class TaskAnnotation(TaskResource):
    """
    A single annotation attached to a task.

    Both fields of an annotation ('entry' and 'description') are read-only;
    annotations are added and removed through the owning task.
    """

    read_only_fields = ['entry', 'description']

    def __init__(self, task, data=None):
        """
        :param task: the task this annotation belongs to
        :param data: serialized annotation dict; a new empty dict is used
                     when omitted
        """
        self.task = task
        # _load_data keeps a reference to the dict it is given, so a
        # mutable default argument would be shared (and mutated) by every
        # annotation created without explicit data.
        self._load_data({} if data is None else data)

    def deserialize_entry(self, data):
        """Parse the stored 'entry' string into a datetime (None if empty)."""
        return datetime.datetime.strptime(data, DATE_FORMAT) if data else None

    def serialize_entry(self, date):
        """Format a datetime as the stored 'entry' string ('' if empty)."""
        return date.strftime(DATE_FORMAT) if date else ''

    def remove(self):
        """Remove this annotation from its task."""
        self.task.remove_annotation(self)

    def __unicode__(self):
        return self['description']

    __repr__ = __unicode__
105
106
class Task(TaskResource):
    """
    A single TaskWarrior task, backed by a dict of serialized field values.

    Values are converted on item access through the serialize_*/deserialize_*
    hooks defined below. Modifications are kept locally until save() is
    called; delete(), done() and the annotation helpers run their command
    immediately.
    """

    read_only_fields = ['id', 'entry', 'urgency', 'uuid', 'modified']

    class DoesNotExist(Exception):
        """
        Raised when a lookup does not match any task.
        """
        pass

    class CompletedTask(Exception):
        """
        Raised when the operation cannot be performed on the completed task.
        """
        pass

    class DeletedTask(Exception):
        """
        Raised when the operation cannot be performed on the deleted task.
        """
        pass

    class NotSaved(Exception):
        """
        Raised when the operation cannot be performed on the task, because
        it has not been saved to TaskWarrior yet.
        """
        pass

    def __init__(self, warrior, **kwargs):
        """
        :param warrior: the TaskWarrior instance used to run commands
        :param kwargs: initial field values, given in their deserialized
                       (Python) form
        """
        self.warrior = warrior

        # We serialize the data in kwargs so that users of the library
        # do not have to pass different data formats via __setitem__ and
        # __init__ methods, that would be confusing

        # Rather unfortunate syntax due to python2.6 compatibility
        self._load_data(dict((key, self._serialize(key, value))
                        for (key, value) in six.iteritems(kwargs)))

    def __unicode__(self):
        return self['description']

    def __eq__(self, other):
        # Objects that are not tasks (None, strings, ...) cannot be indexed
        # by 'uuid'; let Python fall back to its default comparison instead
        # of raising.
        if not isinstance(other, Task):
            return NotImplemented

        if self['uuid'] and other['uuid']:
            # For saved Tasks, just define equality by equality of uuids
            return self['uuid'] == other['uuid']
        else:
            # If the tasks are not saved, compare the actual instances
            return id(self) == id(other)

    def __hash__(self):
        if self['uuid']:
            # For saved Tasks, hash the uuid so that it agrees with __eq__
            return self['uuid'].__hash__()
        else:
            # If the tasks are not saved, return hash of instance id
            return id(self).__hash__()

    @property
    def _modified_fields(self):
        """
        Yield the names of writable fields whose serialized value differs
        from the originally loaded one.
        """
        writable_fields = set(self._data.keys()) - set(self.read_only_fields)
        for key in writable_fields:
            if self._data.get(key) != self._original_data.get(key):
                yield key

    @property
    def completed(self):
        return self['status'] == six.text_type('completed')

    @property
    def deleted(self):
        return self['status'] == six.text_type('deleted')

    @property
    def waiting(self):
        return self['status'] == six.text_type('waiting')

    @property
    def pending(self):
        return self['status'] == six.text_type('pending')

    @property
    def saved(self):
        """True once the task carries a uuid or an id."""
        return self['uuid'] is not None or self['id'] is not None

    def serialize_due(self, date):
        if not date:
            return None
        return date.strftime(DATE_FORMAT)

    def deserialize_due(self, date_str):
        if not date_str:
            return None
        return datetime.datetime.strptime(date_str, DATE_FORMAT)

    def serialize_depends(self, cur_dependencies):
        """
        Turn an iterable of saved tasks into a comma separated uuid string.

        Raises Task.NotSaved if any of the tasks has not been saved yet.
        """
        # Check that all the tasks are saved
        for task in cur_dependencies:
            if not task.saved:
                raise Task.NotSaved('Task \'%s\' needs to be saved before '
                                    'it can be set as dependency.' % task)

        # Return the list of uuids
        return ','.join(task['uuid'] for task in cur_dependencies)

    def deserialize_depends(self, raw_uuids):
        """
        Turn a comma separated uuid string into a set of tasks, looking
        each uuid up through the warrior.
        """
        raw_uuids = raw_uuids or ''  # Convert None to empty string
        uuids = raw_uuids.split(',')
        return set(self.warrior.tasks.get(uuid=uuid) for uuid in uuids if uuid)

    def format_depends(self):
        """
        Build the 'depends:' argument describing the difference between
        the current and the originally loaded dependencies.
        """
        # We need to generate added and removed dependencies list,
        # since Taskwarrior does not accept redefining dependencies.

        # This cannot be part of serialize_depends, since we need
        # to keep a list of all dependencies in the _data dictionary,
        # not just currently added/removed ones

        old_dependencies_raw = self._original_data.get('depends', '')
        old_dependencies = self.deserialize_depends(old_dependencies_raw)

        # Deserialize the current dependencies only once: every access to
        # self['depends'] looks each uuid up through the warrior again.
        current_dependencies = self['depends']

        added = current_dependencies - old_dependencies
        removed = old_dependencies - current_dependencies

        # Removed dependencies need to be prefixed with '-'
        return 'depends:' + ','.join(
                [t['uuid'] for t in added] +
                ['-' + t['uuid'] for t in removed]
            )

    def deserialize_annotations(self, data):
        return [TaskAnnotation(self, d) for d in data] if data else []

    def deserialize_tags(self, tags):
        # six.string_types covers str/unicode on Python 2 and str on
        # Python 3; the bare name 'basestring' does not exist on Python 3.
        if isinstance(tags, six.string_types):
            return tags.split(',') if tags else []
        return tags

    def serialize_tags(self, tags):
        return ','.join(tags) if tags else ''

    def format_description(self):
        # Task version older than 2.4.0 ignores first word of the
        # task description if description: prefix is used
        # NOTE(review): both operands are strings, so this comparison is
        # lexicographic (e.g. '2.10.0' sorts before '2.4.0') - confirm this
        # is acceptable for the versions that need to be supported.
        if self.warrior.version < VERSION_2_4_0:
            return self._data['description']
        else:
            return "description:'{0}'".format(self._data['description'] or '')

    def delete(self):
        """
        Delete the task in TaskWarrior.

        Raises Task.NotSaved for unsaved tasks and Task.DeletedTask if the
        task is already deleted.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved before it can be deleted")

        # Refresh the status, and raise exception if the task is deleted
        self.refresh(only_fields=['status'])

        if self.deleted:
            raise Task.DeletedTask("Task was already deleted")

        self.warrior.execute_command([self['uuid'], 'delete'])

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def done(self):
        """
        Mark the task as completed in TaskWarrior.

        Raises Task.NotSaved for unsaved tasks, Task.CompletedTask if the
        task is already completed and Task.DeletedTask if it is deleted.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved before it can be completed")

        # Refresh, and raise exception if task is already completed/deleted
        self.refresh(only_fields=['status'])

        if self.completed:
            raise Task.CompletedTask("Cannot complete a completed task")
        elif self.deleted:
            raise Task.DeletedTask("Deleted task cannot be completed")

        self.warrior.execute_command([self['uuid'], 'done'])

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def save(self):
        """
        Write the task to TaskWarrior: 'modify' for a saved task, 'add'
        for a new one, then reload the task data.

        Raises TaskWarriorException when the output of 'add' does not
        contain exactly one recognisable 'Created task <id>.' line.
        """
        args = [self['uuid'], 'modify'] if self.saved else ['add']
        args.extend(self._get_modified_fields_as_args())
        output = self.warrior.execute_command(args)

        # Parse out the new ID, if the task is being added for the first time
        if not self.saved:
            id_lines = [line for line in output
                        if line.startswith('Created task ')]

            # Complain loudly if it seems that more tasks were created
            # Should not happen
            if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
                raise TaskWarriorException("Unexpected output when creating "
                                           "task: %s" % '\n'.join(id_lines))

            # Circumvent the ID storage, since ID is considered read-only
            self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))

        self.refresh()

    def add_annotation(self, annotation):
        """
        Annotate the task with the given text and reload its annotations.

        Raises Task.NotSaved for unsaved tasks.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to add annotation")

        args = [self['uuid'], 'annotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def remove_annotation(self, annotation):
        """
        Remove an annotation, given either as a TaskAnnotation or as its
        description text, and reload the task's annotations.

        Raises Task.NotSaved for unsaved tasks.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to remove annotation")

        if isinstance(annotation, TaskAnnotation):
            annotation = annotation['description']
        args = [self['uuid'], 'denotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def _get_modified_fields_as_args(self):
        """
        Return the command line arguments describing the task's fields:
        only the modified ones for a saved task, every writable one for a
        new task.
        """
        args = []

        def add_field(field):
            # Add the output of format_field method to args list (defaults to
            # field:value)
            format_default = lambda k: "{0}:'{1}'".format(k, self._data[k] or '')
            format_func = getattr(self, 'format_{0}'.format(field),
                                  lambda: format_default(field))
            args.append(format_func())

        # If we're modifying saved task, simply pass on all modified fields
        if self.saved:
            for field in self._modified_fields:
                add_field(field)
        # For new tasks, pass all fields that make sense
        else:
            for field in self._data:
                if field in self.read_only_fields:
                    continue
                add_field(field)

        return args

    def refresh(self, only_fields=None):
        """
        Reload the task data from TaskWarrior.

        :param only_fields: optional list of field names; when given (and
                            non-empty) only these fields are updated, in
                            both the current and the original data.
                            Otherwise all data is replaced.

        Raises Task.NotSaved for unsaved tasks.
        """
        # Raise error when trying to refresh a task that has not been saved
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to be refreshed")

        # We need to use ID as backup for uuid here for the refreshes
        # of newly saved tasks. Any other place in the code is fine
        # with using UUID only.
        args = [self['uuid'] or self['id'], 'export']
        new_data = json.loads(self.warrior.execute_command(args)[0])
        if only_fields:
            to_update = dict(
                [(k, new_data.get(k)) for k in only_fields])
            self._update_data(to_update, update_original=True)
        else:
            self._load_data(new_data)
365
366
class TaskFilter(object):
    """
    A set of parameters to filter the task list with.
    """

    def __init__(self, filter_params=None):
        """
        :param filter_params: optional initial list of filter strings; a
                              new empty list is used when omitted
        """
        # The list is stored and appended to, so a mutable default argument
        # would leak filters between independently created TaskFilters.
        self.filter_params = [] if filter_params is None else filter_params

    def add_filter(self, filter_str):
        """Append a raw filter string."""
        self.filter_params.append(filter_str)

    def add_filter_param(self, key, value):
        """
        Add a key/value filter. Double underscores in the key are turned
        into dots (``project__not`` becomes ``project.not``).
        """
        key = key.replace('__', '.')

        # Replace the value with empty string, since that is the
        # convention in TW for empty values
        value = value if value is not None else ''

        # If we are filtering by uuid:, do not use uuid keyword
        # due to TW-1452 bug
        if key == 'uuid':
            self.filter_params.insert(0, value)
        else:
            self.filter_params.append('{0}:{1}'.format(key, value))

    def get_filter_params(self):
        """Return the filter strings, leaving out empty ones."""
        return [f for f in self.filter_params if f]

    def clone(self):
        """Return a new filter holding its own copy of the parameter list."""
        c = self.__class__()
        c.filter_params = list(self.filter_params)
        return c
399
400
401 class TaskQuerySet(object):
402     """
403     Represents a lazy lookup for a task objects.
404     """
405
406     def __init__(self, warrior=None, filter_obj=None):
407         self.warrior = warrior
408         self._result_cache = None
409         self.filter_obj = filter_obj or TaskFilter()
410
411     def __deepcopy__(self, memo):
412         """
413         Deep copy of a QuerySet doesn't populate the cache
414         """
415         obj = self.__class__()
416         for k, v in self.__dict__.items():
417             if k in ('_iter', '_result_cache'):
418                 obj.__dict__[k] = None
419             else:
420                 obj.__dict__[k] = copy.deepcopy(v, memo)
421         return obj
422
423     def __repr__(self):
424         data = list(self[:REPR_OUTPUT_SIZE + 1])
425         if len(data) > REPR_OUTPUT_SIZE:
426             data[-1] = "...(remaining elements truncated)..."
427         return repr(data)
428
429     def __len__(self):
430         if self._result_cache is None:
431             self._result_cache = list(self)
432         return len(self._result_cache)
433
434     def __iter__(self):
435         if self._result_cache is None:
436             self._result_cache = self._execute()
437         return iter(self._result_cache)
438
439     def __getitem__(self, k):
440         if self._result_cache is None:
441             self._result_cache = list(self)
442         return self._result_cache.__getitem__(k)
443
444     def __bool__(self):
445         if self._result_cache is not None:
446             return bool(self._result_cache)
447         try:
448             next(iter(self))
449         except StopIteration:
450             return False
451         return True
452
453     def __nonzero__(self):
454         return type(self).__bool__(self)
455
456     def _clone(self, klass=None, **kwargs):
457         if klass is None:
458             klass = self.__class__
459         filter_obj = self.filter_obj.clone()
460         c = klass(warrior=self.warrior, filter_obj=filter_obj)
461         c.__dict__.update(kwargs)
462         return c
463
464     def _execute(self):
465         """
466         Fetch the tasks which match the current filters.
467         """
468         return self.warrior.filter_tasks(self.filter_obj)
469
470     def all(self):
471         """
472         Returns a new TaskQuerySet that is a copy of the current one.
473         """
474         return self._clone()
475
476     def pending(self):
477         return self.filter(status=PENDING)
478
479     def completed(self):
480         return self.filter(status=COMPLETED)
481
482     def filter(self, *args, **kwargs):
483         """
484         Returns a new TaskQuerySet with the given filters added.
485         """
486         clone = self._clone()
487         for f in args:
488             clone.filter_obj.add_filter(f)
489         for key, value in kwargs.items():
490             clone.filter_obj.add_filter_param(key, value)
491         return clone
492
493     def get(self, **kwargs):
494         """
495         Performs the query and returns a single object matching the given
496         keyword arguments.
497         """
498         clone = self.filter(**kwargs)
499         num = len(clone)
500         if num == 1:
501             return clone._result_cache[0]
502         if not num:
503             raise Task.DoesNotExist(
504                 'Task matching query does not exist. '
505                 'Lookup parameters were {0}'.format(kwargs))
506         raise ValueError(
507             'get() returned more than one Task -- it returned {0}! '
508             'Lookup parameters were {1}'.format(num, kwargs))
509
510
class TaskWarrior(object):
    """
    Thin wrapper around the ``task`` command line tool.

    Holds the rc overrides passed to every command (``self.config``), a
    query set of all tasks (``self.tasks``) and the version string reported
    by the tool (``self.version``).
    """

    def __init__(self, data_location='~/.task', create=True):
        """
        :param data_location: task data directory; '~' is expanded
        :param create: create the directory when it does not exist
        """
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            # Already expanded above, no need to expand it a second time
            'data.location': data_location,
            'confirmation': 'no',
        }
        self.tasks = TaskQuerySet(self)
        self.version = self._get_version()

    def _get_command_args(self, args, config_override=None):
        """
        Build the full argument list for a ``task`` invocation: the
        binary, one ``rc.<key>=<value>`` item per config entry (with
        config_override applied on top of self.config) and finally the
        given args converted with str().
        """
        command_args = ['task', 'rc:/']
        # Work on a copy so that overrides never end up in self.config
        config = self.config.copy()
        if config_override:
            config.update(config_override)
        command_args.extend(
            'rc.{0}={1}'.format(key, value) for key, value in config.items())
        command_args.extend(map(str, args))
        return command_args

    def _get_version(self):
        """Return the output of ``task --version`` without newlines."""
        p = subprocess.Popen(
                ['task', '--version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        return stdout.strip('\n')

    def execute_command(self, args, config_override=None):
        """
        Run ``task`` with the given arguments and return its stdout as a
        list of lines.

        Raises TaskWarriorException when the command exits with a non-zero
        code; the message is the last line of stderr or, if stderr is
        empty, the whole stdout.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        logger.debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode:
            if stderr.strip():
                error_msg = stderr.strip().splitlines()[-1]
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)
        return stdout.strip().split('\n')

    def filter_tasks(self, filter_obj):
        """
        Export the tasks matching filter_obj and return them as a list of
        Task objects.

        Raises TaskWarriorException when a line of the export output is
        not valid JSON.
        """
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if not line:
                continue
            # Each task is parsed from its own line, minus surrounding commas
            data = line.strip(',')
            # Only the JSON parsing is expected to fail here
            try:
                task_data = json.loads(data)
            except ValueError:
                raise TaskWarriorException('Invalid JSON: %s' % data)
            filtered_task = Task(self)
            filtered_task._load_data(task_data)
            tasks.append(filtered_task)
        return tasks

    def merge_with(self, path, push=False):
        """
        Run ``task merge`` against path (normalised to end with exactly
        one slash), with merge.autopush set according to push.
        """
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        """Run ``task undo``."""
        self.execute_command(['undo'])