git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

170fc77e545bf076a5d1d6d6ed6977178fc2fe34
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
# strftime/strptime pattern used for the date fields handled in this module
# (annotation 'entry', task 'due').
DATE_FORMAT = '%Y%m%dT%H%M%SZ'

# Maximum number of tasks shown by TaskQuerySet.__repr__ before truncating.
REPR_OUTPUT_SIZE = 10

# Status values used by the TaskQuerySet.pending()/completed() shortcuts.
PENDING = 'pending'
COMPLETED = 'completed'

# Module-level logger; executed command lines are logged at DEBUG level.
logger = logging.getLogger(__name__)
16
17
class TaskWarriorException(Exception):
    """
    Raised by this module when a TaskWarrior command fails or produces
    output that cannot be interpreted.
    """
20
21
class TaskResource(object):
    """
    Base class for objects backed by a dict of raw TaskWarrior values.

    The raw values live in ``self._data``. Item access goes through
    optional per-field hooks: a ``deserialize_<field>`` method converts the
    raw value on read and a ``serialize_<field>`` method converts the value
    on write. Fields named in ``read_only_fields`` cannot be assigned.
    Subclasses are expected to provide ``__unicode__``, which ``__str__``
    and ``__repr__`` build upon.
    """

    read_only_fields = []

    def _load_data(self, data):
        """
        Use ``data`` (a dict of raw field values) as the backing store.
        """
        self._data = data
        # __setitem__ records changed keys in this set. Create it together
        # with the data so that it exists for every subclass, not only for
        # those whose __init__ sets it up on its own.
        self._modified_fields = set()

    def __getitem__(self, key):
        """
        Return the value of field ``key``, or the hook's result for
        ``None`` when the field is missing.

        The raw value is passed through ``deserialize_<key>`` if such a
        method exists; otherwise it is returned unchanged.
        """
        raw_value = self._data.get(key)
        hydrate_func = getattr(self, 'deserialize_{0}'.format(key), None)
        if hydrate_func is None:
            return raw_value
        return hydrate_func(raw_value)

    def __setitem__(self, key, value):
        """
        Store ``value`` under ``key`` and mark the field as modified.

        The value is passed through ``serialize_<key>`` first if such a
        method exists.

        Raises RuntimeError if ``key`` is listed in ``read_only_fields``.
        """
        if key in self.read_only_fields:
            raise RuntimeError('Field \'%s\' is read-only' % key)
        dehydrate_func = getattr(self, 'serialize_{0}'.format(key), None)
        if dehydrate_func is not None:
            value = dehydrate_func(value)
        self._data[key] = value
        self._modified_fields.add(key)

    def __str__(self):
        """
        Return ``__unicode__()`` as the native ``str`` type: text on
        Python 3, UTF-8 encoded bytes on Python 2.
        """
        s = six.text_type(self.__unicode__())
        if not six.PY3:
            s = s.encode('utf-8')
        return s

    def __repr__(self):
        return str(self)
49
50
class TaskAnnotation(TaskResource):
    """
    A single annotation of a task, backed by the annotation's raw dict.
    """

    read_only_fields = ['entry', 'description']

    def __init__(self, task, data=None):
        """
        Create an annotation belonging to ``task``.

        ``data`` is the raw annotation dict; it is used as the backing
        store as given (not copied). When omitted, a new empty dict is
        used.
        """
        self.task = task
        # The dict becomes the backing store, so a shared ``{}`` default
        # would be shared between all annotations created without data.
        self._load_data({} if data is None else data)

    def deserialize_entry(self, data):
        """
        Parse the raw 'entry' string into a datetime; None when empty.
        """
        return datetime.datetime.strptime(data, DATE_FORMAT) if data else None

    def serialize_entry(self, date):
        """
        Format a datetime as a raw 'entry' string; '' for a false value.
        """
        return date.strftime(DATE_FORMAT) if date else ''

    def remove(self):
        """
        Remove this annotation from the task it belongs to.
        """
        self.task.remove_annotation(self)

    def __unicode__(self):
        return self['description']

    # repr() of an annotation is its bare description.
    __repr__ = __unicode__
71
72
class Task(TaskResource):
    """
    A single TaskWarrior task, backed by the dict of its raw field values.

    Fields are read and assigned with item access (``task['due']``).
    Assigned fields are tracked and passed to TaskWarrior by ``save()``.
    """

    read_only_fields = ['id', 'entry', 'urgency', 'uuid']

    class DoesNotExist(Exception):
        """
        Raised when a lookup for a single task matches no task.
        """
        pass

    class CompletedTask(Exception):
        """
        Raised when the operation cannot be performed on the completed task.
        """
        pass

    class DeletedTask(Exception):
        """
        Raised when the operation cannot be performed on the deleted task.
        """
        pass

    class NotSaved(Exception):
        """
        Raised when the operation cannot be performed on the task, because
        it has not been saved to TaskWarrior yet.
        """
        pass

    def __init__(self, warrior, data=None, **kwargs):
        """
        Create a task bound to ``warrior``.

        Initial field values are given as keyword arguments or, for
        backwards compatibility, in ``data``; entries in ``data`` override
        keyword arguments of the same name. The values are stored as given,
        without going through the ``serialize_*`` hooks.
        """
        self.warrior = warrior

        # We keep data for backwards compatibility
        if data is not None:
            kwargs.update(data)

        self._load_data(kwargs)
        self._modified_fields = set()

    def __unicode__(self):
        return self['description']

    @property
    def completed(self):
        return self['status'] == six.text_type('completed')

    @property
    def deleted(self):
        return self['status'] == six.text_type('deleted')

    @property
    def waiting(self):
        return self['status'] == six.text_type('waiting')

    @property
    def pending(self):
        return self['status'] == six.text_type('pending')

    @property
    def saved(self):
        """
        True when the task has a uuid or an id, i.e. TaskWarrior knows it.
        """
        return self['uuid'] is not None or self['id'] is not None

    def serialize_due(self, date):
        """
        Format a datetime as a raw 'due' string.

        A false value (e.g. None) becomes '', the TaskWarrior convention
        for an empty value, mirroring TaskAnnotation.serialize_entry.
        """
        return date.strftime(DATE_FORMAT) if date else ''

    def deserialize_due(self, date_str):
        """
        Parse the raw 'due' string into a datetime; None when empty.
        """
        if not date_str:
            return None
        return datetime.datetime.strptime(date_str, DATE_FORMAT)

    def deserialize_annotations(self, data):
        """
        Wrap each raw annotation dict in a TaskAnnotation bound to this task.
        """
        return [TaskAnnotation(self, d) for d in data] if data else []

    def deserialize_tags(self, tags):
        """
        Return tags as stored, splitting a comma-separated string into a
        list (an empty string gives an empty list).
        """
        # six.string_types instead of basestring, which does not exist on
        # Python 3.
        if isinstance(tags, six.string_types):
            return tags.split(',') if tags else []
        return tags

    def serialize_tags(self, tags):
        """
        Join tags into the comma-separated raw form; '' when there are none.
        """
        return ','.join(tags) if tags else ''

    def delete(self):
        """
        Delete the task in TaskWarrior and reload its status.

        Raises NotSaved if the task was never saved and DeletedTask if it
        is already deleted.
        """
        if not self.saved:
            raise self.NotSaved("Task needs to be saved before it can be deleted")

        # Refresh the status, and raise exception if the task is deleted
        self.refresh(only_fields=['status'])

        if self.deleted:
            raise self.DeletedTask("Task was already deleted")

        self.warrior.execute_command([self['uuid'], 'delete'], config_override={
            'confirmation': 'no',
        })

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def done(self):
        """
        Mark the task as done in TaskWarrior and reload its status.

        Raises NotSaved if the task was never saved, CompletedTask if it is
        already completed and DeletedTask if it is deleted.
        """
        if not self.saved:
            raise self.NotSaved("Task needs to be saved before it can be completed")

        # Refresh, and raise exception if task is already completed/deleted
        self.refresh(only_fields=['status'])

        if self.completed:
            raise self.CompletedTask("Cannot complete a completed task")
        elif self.deleted:
            raise self.DeletedTask("Deleted task cannot be completed")

        self.warrior.execute_command([self['uuid'], 'done'])

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def save(self):
        """
        Send the task to TaskWarrior: 'modify' for a saved task, 'add' for
        a new one. Afterwards the modified-field set is cleared and the
        task data is reloaded.

        Raises TaskWarriorException if the output of 'add' does not contain
        exactly one 'Created task <id>.' line.
        """
        args = [self['uuid'], 'modify'] if self.saved else ['add']
        args.extend(self._get_modified_fields_as_args())
        output = self.warrior.execute_command(args)

        # Parse out the new ID, if the task is being added for the first time
        if not self.saved:
            id_lines = [l for l in output if l.startswith('Created task ')]

            # Complain loudly if it seems that more tasks were created
            # Should not happen
            if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
                raise TaskWarriorException("Unexpected output when creating "
                                           "task: %s" % '\n'.join(id_lines))

            # Circumvent the ID storage, since ID is considered read-only
            self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))

        self._modified_fields.clear()
        self.refresh()

    def add_annotation(self, annotation):
        """
        Annotate the task with the text ``annotation`` and reload its
        annotations.

        Raises NotSaved if the task was never saved.
        """
        if not self.saved:
            raise self.NotSaved("Task needs to be saved to add annotation")

        args = [self['uuid'], 'annotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def remove_annotation(self, annotation):
        """
        Remove an annotation, given as a TaskAnnotation or as its text,
        and reload the task's annotations.

        Raises NotSaved if the task was never saved.
        """
        if not self.saved:
            raise self.NotSaved("Task needs to be saved to remove annotation")

        if isinstance(annotation, TaskAnnotation):
            annotation = annotation['description']
        args = [self['uuid'], 'denotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def _get_modified_fields_as_args(self):
        """
        Return the 'field:value' command arguments that save() passes on.
        """
        # If we're modifying saved task, simply pass on all modified fields
        if self.saved:
            fields = self._modified_fields
        # For new tasks, pass all fields that make sense
        else:
            fields = [field for field in self._data
                      if field not in self.read_only_fields]

        return ['{0}:{1}'.format(field, self._data[field])
                for field in fields]

    def refresh(self, only_fields=None):
        """
        Reload the task data from TaskWarrior.

        With ``only_fields`` (an iterable of field names) only those fields
        are updated; a field absent from the export is set to None.
        Without it, the whole data dict is replaced.

        Raises NotSaved if the task was never saved.
        """
        # Raise error when trying to refresh a task that has not been saved
        if not self.saved:
            raise self.NotSaved("Task needs to be saved to be refreshed")

        # We need to use ID as backup for uuid here for the refreshes
        # of newly saved tasks. Any other place in the code is fine
        # with using UUID only.
        args = [self['uuid'] or self['id'], 'export']
        new_data = json.loads(self.warrior.execute_command(args)[0])
        if only_fields:
            to_update = dict((k, new_data.get(k)) for k in only_fields)
            self._data.update(to_update)
        else:
            self._data = new_data
255
256
class TaskFilter(object):
    """
    A set of parameters to filter the task list with.
    """

    def __init__(self, filter_params=None):
        """
        Create a filter, optionally pre-filled with the filter strings in
        ``filter_params``.
        """
        # Always start from a list of our own. Storing a shared default
        # list (or the caller's list) would let add_filter() on one filter
        # leak into other filters or into the caller's data.
        self.filter_params = list(filter_params or [])

    def add_filter(self, filter_str):
        """
        Append a raw filter string.
        """
        self.filter_params.append(filter_str)

    def add_filter_param(self, key, value):
        """
        Append a 'key:value' filter. A double underscore in ``key`` is
        turned into a dot (``due__before`` becomes ``due.before``).
        """
        key = key.replace('__', '.')

        # Replace the value with empty string, since that is the
        # convention in TW for empty values
        value = value if value is not None else ''
        self.filter_params.append('{0}:{1}'.format(key, value))

    def get_filter_params(self):
        """
        Return the non-empty filter strings as a new list.
        """
        return [f for f in self.filter_params if f]

    def clone(self):
        """
        Return a new filter of the same class with a copy of the parameters.
        """
        c = self.__class__()
        c.filter_params = list(self.filter_params)
        return c
283
284
class TaskQuerySet(object):
    """
    Represents a lazy lookup for task objects.

    The tasks are fetched through ``warrior.filter_tasks`` the first time
    the query set is iterated, indexed, measured or tested for truth, and
    the result is cached on the instance.
    """

    def __init__(self, warrior=None, filter_obj=None):
        self.warrior = warrior
        self._result_cache = None
        self.filter_obj = filter_obj or TaskFilter()

    def __deepcopy__(self, memo):
        """
        Deep copy of a QuerySet doesn't populate the cache
        """
        klass = self.__class__
        # Skip __init__: every attribute is taken over from self.__dict__
        # below, so there is nothing for it to set up.
        obj = klass.__new__(klass)
        # Register the copy before recursing. The warrior can refer back
        # to this query set (warrior.tasks); without the memo entry that
        # reference would be copied a second time into a distinct object.
        memo[id(self)] = obj
        for k, v in self.__dict__.items():
            if k in ('_iter', '_result_cache'):
                obj.__dict__[k] = None
            else:
                obj.__dict__[k] = copy.deepcopy(v, memo)
        return obj

    def __repr__(self):
        data = list(self[:REPR_OUTPUT_SIZE + 1])
        if len(data) > REPR_OUTPUT_SIZE:
            data[-1] = "...(remaining elements truncated)..."
        return repr(data)

    def _fetch_all(self):
        """
        Return the cached result list, running the query on first use.
        """
        if self._result_cache is None:
            self._result_cache = list(self._execute())
        return self._result_cache

    def __len__(self):
        return len(self._fetch_all())

    def __iter__(self):
        return iter(self._fetch_all())

    def __getitem__(self, k):
        return self._fetch_all()[k]

    def __bool__(self):
        return bool(self._fetch_all())

    def __nonzero__(self):
        # Python 2 spelling of __bool__.
        return type(self).__bool__(self)

    def _clone(self, klass=None, **kwargs):
        """
        Return a new query set (of ``klass``, by default the same class)
        for the same warrior with a cloned filter and an empty cache.
        ``kwargs`` are set as attributes on the clone.
        """
        if klass is None:
            klass = self.__class__
        filter_obj = self.filter_obj.clone()
        c = klass(warrior=self.warrior, filter_obj=filter_obj)
        c.__dict__.update(kwargs)
        return c

    def _execute(self):
        """
        Fetch the tasks which match the current filters.
        """
        return self.warrior.filter_tasks(self.filter_obj)

    def all(self):
        """
        Returns a new TaskQuerySet that is a copy of the current one.
        """
        return self._clone()

    def pending(self):
        """
        Returns a new TaskQuerySet restricted to pending tasks.
        """
        return self.filter(status=PENDING)

    def completed(self):
        """
        Returns a new TaskQuerySet restricted to completed tasks.
        """
        return self.filter(status=COMPLETED)

    def filter(self, *args, **kwargs):
        """
        Returns a new TaskQuerySet with the given filters added.

        Positional arguments are raw filter strings; keyword arguments
        are added as key/value filter parameters.
        """
        clone = self._clone()
        for f in args:
            clone.filter_obj.add_filter(f)
        for key, value in kwargs.items():
            clone.filter_obj.add_filter_param(key, value)
        return clone

    def get(self, **kwargs):
        """
        Performs the query and returns a single object matching the given
        keyword arguments.

        Raises Task.DoesNotExist when nothing matches and ValueError when
        more than one task matches.
        """
        clone = self.filter(**kwargs)
        num = len(clone)
        if num == 1:
            return clone._result_cache[0]
        if not num:
            raise Task.DoesNotExist(
                'Task matching query does not exist. '
                'Lookup parameters were {0}'.format(kwargs))
        raise ValueError(
            'get() returned more than one Task -- it returned {0}! '
            'Lookup parameters were {1}'.format(num, kwargs))
393
394
class TaskWarrior(object):
    """
    Entry point to a TaskWarrior data directory, driven by running the
    ``task`` command line tool.
    """

    def __init__(self, data_location='~/.task', create=True):
        """
        Use the data directory ``data_location`` ('~' is expanded).

        With ``create`` true, the directory is created when it does not
        exist yet.
        """
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)

    def _get_command_args(self, args, config_override=None):
        """
        Build the argument list for one ``task`` invocation.

        The list starts with 'task' and 'rc:/', followed by one
        'rc.<key>=<value>' entry per item of ``self.config`` updated with
        ``config_override``, followed by ``args`` converted with str().
        ``self.config`` itself is left untouched.
        """
        command_args = ['task', 'rc:/']
        config = self.config.copy()
        config.update(config_override or {})
        for key, value in config.items():
            command_args.append('rc.{0}={1}'.format(key, value))
        command_args.extend(map(str, args))
        return command_args

    def execute_command(self, args, config_override=None):
        """
        Run ``task`` with ``args`` and return its stripped standard output
        as a list of lines.

        Raises TaskWarriorException when the command exits with a non-zero
        status; the message is the last line of stderr or, when stderr is
        empty, the whole of stdout.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        logger.debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode:
            if stderr.strip():
                error_msg = stderr.strip().splitlines()[-1]
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)
        return stdout.strip().split('\n')

    def filter_tasks(self, filter_obj):
        """
        Export the tasks matching ``filter_obj`` and return them as a list
        of Task objects.

        Raises TaskWarriorException when an output line is not valid JSON.
        """
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if line:
                # Strip surrounding commas before parsing the line as JSON.
                data = line.strip(',')
                try:
                    tasks.append(Task(self, json.loads(data)))
                except ValueError:
                    raise TaskWarriorException('Invalid JSON: %s' % data)
        return tasks

    def merge_with(self, path, push=False):
        """
        Run 'task merge' against ``path`` (a trailing slash is ensured),
        with merge.autopush set according to ``push``.
        """
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        """
        Run 'task undo' with confirmation disabled.
        """
        self.execute_command(['undo'], config_override={
            'confirmation': 'no',
        })