]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Merge pull request #11 from tbabej/improve_task_object
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11 REPR_OUTPUT_SIZE = 10
12 PENDING = 'pending'
13 COMPLETED = 'completed'
14
15 VERSION_2_1_0 = six.u('2.1.0')
16 VERSION_2_2_0 = six.u('2.2.0')
17 VERSION_2_3_0 = six.u('2.3.0')
18 VERSION_2_4_0 = six.u('2.4.0')
19
20 logger = logging.getLogger(__name__)
21
22
23 class TaskWarriorException(Exception):
24     pass
25
26
27 class TaskResource(object):
28     read_only_fields = []
29
30     def _load_data(self, data):
31         self._data = data
32
33     def __getitem__(self, key):
34         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
35                                lambda x: x)
36         return hydrate_func(self._data.get(key))
37
38     def __setitem__(self, key, value):
39         if key in self.read_only_fields:
40             raise RuntimeError('Field \'%s\' is read-only' % key)
41         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
42                                  lambda x: x)
43         self._data[key] = dehydrate_func(value)
44         self._modified_fields.add(key)
45
46     def __str__(self):
47         s = six.text_type(self.__unicode__())
48         if not six.PY3:
49             s = s.encode('utf-8')
50         return s
51
52     def __repr__(self):
53         return str(self)
54
55
56 class TaskAnnotation(TaskResource):
57     read_only_fields = ['entry', 'description']
58
59     def __init__(self, task, data={}):
60         self.task = task
61         self._load_data(data)
62
63     def deserialize_entry(self, data):
64         return datetime.datetime.strptime(data, DATE_FORMAT) if data else None
65
66     def serialize_entry(self, date):
67         return date.strftime(DATE_FORMAT) if date else ''
68
69     def remove(self):
70         self.task.remove_annotation(self)
71
72     def __unicode__(self):
73         return self['description']
74
75     __repr__ = __unicode__
76
77
78 class Task(TaskResource):
79     read_only_fields = ['id', 'entry', 'urgency', 'uuid']
80
81     class DoesNotExist(Exception):
82         pass
83
84     class CompletedTask(Exception):
85         """
86         Raised when the operation cannot be performed on the completed task.
87         """
88         pass
89
90     class DeletedTask(Exception):
91         """
92         Raised when the operation cannot be performed on the deleted task.
93         """
94         pass
95
96     class NotSaved(Exception):
97         """
98         Raised when the operation cannot be performed on the task, because
99         it has not been saved to TaskWarrior yet.
100         """
101         pass
102
103     def __init__(self, warrior, data={}, **kwargs):
104         self.warrior = warrior
105
106         # We keep data for backwards compatibility
107         kwargs.update(data)
108
109         self._load_data(kwargs)
110         self._modified_fields = set()
111
112     def __unicode__(self):
113         return self['description']
114
115     @property
116     def completed(self):
117         return self['status'] == six.text_type('completed')
118
119     @property
120     def deleted(self):
121         return self['status'] == six.text_type('deleted')
122
123     @property
124     def waiting(self):
125         return self['status'] == six.text_type('waiting')
126
127     @property
128     def pending(self):
129         return self['status'] == six.text_type('pending')
130
131     @property
132     def saved(self):
133         return self['uuid'] is not None or self['id'] is not None
134
135     def serialize_due(self, date):
136         return date.strftime(DATE_FORMAT)
137
138     def deserialize_due(self, date_str):
139         if not date_str:
140             return None
141         return datetime.datetime.strptime(date_str, DATE_FORMAT)
142
143     def deserialize_annotations(self, data):
144         return [TaskAnnotation(self, d) for d in data] if data else []
145
146     def deserialize_tags(self, tags):
147         if isinstance(tags, basestring):
148             return tags.split(',') if tags else []
149         return tags
150
151     def serialize_tags(self, tags):
152         return ','.join(tags) if tags else ''
153
154     def delete(self):
155         if not self.saved:
156             raise Task.NotSaved("Task needs to be saved before it can be deleted")
157
158         # Refresh the status, and raise exception if the task is deleted
159         self.refresh(only_fields=['status'])
160
161         if self.deleted:
162             raise Task.DeletedTask("Task was already deleted")
163
164         self.warrior.execute_command([self['uuid'], 'delete'], config_override={
165             'confirmation': 'no',
166         })
167
168         # Refresh the status again, so that we have updated info stored
169         self.refresh(only_fields=['status'])
170
171
172     def done(self):
173         if not self.saved:
174             raise Task.NotSaved("Task needs to be saved before it can be completed")
175
176         # Refresh, and raise exception if task is already completed/deleted
177         self.refresh(only_fields=['status'])
178
179         if self.completed:
180             raise Task.CompletedTask("Cannot complete a completed task")
181         elif self.deleted:
182             raise Task.DeletedTask("Deleted task cannot be completed")
183
184         self.warrior.execute_command([self['uuid'], 'done'])
185
186         # Refresh the status again, so that we have updated info stored
187         self.refresh(only_fields=['status'])
188
189     def save(self):
190         args = [self['uuid'], 'modify'] if self.saved else ['add']
191         args.extend(self._get_modified_fields_as_args())
192         output = self.warrior.execute_command(args)
193
194         # Parse out the new ID, if the task is being added for the first time
195         if not self.saved:
196             id_lines = [l for l in output if l.startswith('Created task ')]
197
198             # Complain loudly if it seems that more tasks were created
199             # Should not happen
200             if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
201                 raise TaskWarriorException("Unexpected output when creating "
202                                            "task: %s" % '\n'.join(id_lines))
203
204             # Circumvent the ID storage, since ID is considered read-only
205             self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))
206
207         self._modified_fields.clear()
208         self.refresh()
209
210     def add_annotation(self, annotation):
211         if not self.saved:
212             raise Task.NotSaved("Task needs to be saved to add annotation")
213
214         args = [self['uuid'], 'annotate', annotation]
215         self.warrior.execute_command(args)
216         self.refresh(only_fields=['annotations'])
217
218     def remove_annotation(self, annotation):
219         if not self.saved:
220             raise Task.NotSaved("Task needs to be saved to add annotation")
221
222         if isinstance(annotation, TaskAnnotation):
223             annotation = annotation['description']
224         args = [self['uuid'], 'denotate', annotation]
225         self.warrior.execute_command(args)
226         self.refresh(only_fields=['annotations'])
227
228     def _get_modified_fields_as_args(self):
229         args = []
230
231         def add_field(field):
232             # Task version older than 2.4.0 ignores first word of the
233             # task description if description: prefix is used
234             if self.warrior.version < VERSION_2_4_0 and field == 'description':
235                 args.append(self._data[field])
236             else:
237                 args.append('{0}:{1}'.format(field, self._data[field]))
238
239         # If we're modifying saved task, simply pass on all modified fields
240         if self.saved:
241             for field in self._modified_fields:
242                 add_field(field)
243         # For new tasks, pass all fields that make sense
244         else:
245             for field in self._data.keys():
246                 if field in self.read_only_fields:
247                     continue
248                 add_field(field)
249
250         return args
251
252     def refresh(self, only_fields=[]):
253         # Raise error when trying to refresh a task that has not been saved
254         if not self.saved:
255             raise Task.NotSaved("Task needs to be saved to be refreshed")
256
257         # We need to use ID as backup for uuid here for the refreshes
258         # of newly saved tasks. Any other place in the code is fine
259         # with using UUID only.
260         args = [self['uuid'] or self['id'], 'export']
261         new_data = json.loads(self.warrior.execute_command(args)[0])
262         if only_fields:
263             to_update = dict(
264                 [(k, new_data.get(k)) for k in only_fields])
265             self._data.update(to_update)
266         else:
267             self._data = new_data
268
269
270 class TaskFilter(object):
271     """
272     A set of parameters to filter the task list with.
273     """
274
275     def __init__(self, filter_params=[]):
276         self.filter_params = filter_params
277
278     def add_filter(self, filter_str):
279         self.filter_params.append(filter_str)
280
281     def add_filter_param(self, key, value):
282         key = key.replace('__', '.')
283
284         # Replace the value with empty string, since that is the
285         # convention in TW for empty values
286         value = value if value is not None else ''
287         self.filter_params.append('{0}:{1}'.format(key, value))
288
289     def get_filter_params(self):
290         return [f for f in self.filter_params if f]
291
292     def clone(self):
293         c = self.__class__()
294         c.filter_params = list(self.filter_params)
295         return c
296
297
298 class TaskQuerySet(object):
299     """
300     Represents a lazy lookup for a task objects.
301     """
302
303     def __init__(self, warrior=None, filter_obj=None):
304         self.warrior = warrior
305         self._result_cache = None
306         self.filter_obj = filter_obj or TaskFilter()
307
308     def __deepcopy__(self, memo):
309         """
310         Deep copy of a QuerySet doesn't populate the cache
311         """
312         obj = self.__class__()
313         for k, v in self.__dict__.items():
314             if k in ('_iter', '_result_cache'):
315                 obj.__dict__[k] = None
316             else:
317                 obj.__dict__[k] = copy.deepcopy(v, memo)
318         return obj
319
320     def __repr__(self):
321         data = list(self[:REPR_OUTPUT_SIZE + 1])
322         if len(data) > REPR_OUTPUT_SIZE:
323             data[-1] = "...(remaining elements truncated)..."
324         return repr(data)
325
326     def __len__(self):
327         if self._result_cache is None:
328             self._result_cache = list(self)
329         return len(self._result_cache)
330
331     def __iter__(self):
332         if self._result_cache is None:
333             self._result_cache = self._execute()
334         return iter(self._result_cache)
335
336     def __getitem__(self, k):
337         if self._result_cache is None:
338             self._result_cache = list(self)
339         return self._result_cache.__getitem__(k)
340
341     def __bool__(self):
342         if self._result_cache is not None:
343             return bool(self._result_cache)
344         try:
345             next(iter(self))
346         except StopIteration:
347             return False
348         return True
349
350     def __nonzero__(self):
351         return type(self).__bool__(self)
352
353     def _clone(self, klass=None, **kwargs):
354         if klass is None:
355             klass = self.__class__
356         filter_obj = self.filter_obj.clone()
357         c = klass(warrior=self.warrior, filter_obj=filter_obj)
358         c.__dict__.update(kwargs)
359         return c
360
361     def _execute(self):
362         """
363         Fetch the tasks which match the current filters.
364         """
365         return self.warrior.filter_tasks(self.filter_obj)
366
367     def all(self):
368         """
369         Returns a new TaskQuerySet that is a copy of the current one.
370         """
371         return self._clone()
372
373     def pending(self):
374         return self.filter(status=PENDING)
375
376     def completed(self):
377         return self.filter(status=COMPLETED)
378
379     def filter(self, *args, **kwargs):
380         """
381         Returns a new TaskQuerySet with the given filters added.
382         """
383         clone = self._clone()
384         for f in args:
385             clone.filter_obj.add_filter(f)
386         for key, value in kwargs.items():
387             clone.filter_obj.add_filter_param(key, value)
388         return clone
389
390     def get(self, **kwargs):
391         """
392         Performs the query and returns a single object matching the given
393         keyword arguments.
394         """
395         clone = self.filter(**kwargs)
396         num = len(clone)
397         if num == 1:
398             return clone._result_cache[0]
399         if not num:
400             raise Task.DoesNotExist(
401                 'Task matching query does not exist. '
402                 'Lookup parameters were {0}'.format(kwargs))
403         raise ValueError(
404             'get() returned more than one Task -- it returned {0}! '
405             'Lookup parameters were {1}'.format(num, kwargs))
406
407
408 class TaskWarrior(object):
409     def __init__(self, data_location='~/.task', create=True):
410         data_location = os.path.expanduser(data_location)
411         if create and not os.path.exists(data_location):
412             os.makedirs(data_location)
413         self.config = {
414             'data.location': os.path.expanduser(data_location),
415         }
416         self.tasks = TaskQuerySet(self)
417         self.version = self._get_version()
418
419     def _get_command_args(self, args, config_override={}):
420         command_args = ['task', 'rc:/']
421         config = self.config.copy()
422         config.update(config_override)
423         for item in config.items():
424             command_args.append('rc.{0}={1}'.format(*item))
425         command_args.extend(map(str, args))
426         return command_args
427
428     def _get_version(self):
429         p = subprocess.Popen(
430                 ['task', '--version'],
431                 stdout=subprocess.PIPE,
432                 stderr=subprocess.PIPE)
433         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
434         return stdout.strip('\n')
435
436     def execute_command(self, args, config_override={}):
437         command_args = self._get_command_args(
438             args, config_override=config_override)
439         logger.debug(' '.join(command_args))
440         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
441                              stderr=subprocess.PIPE)
442         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
443         if p.returncode:
444             if stderr.strip():
445                 error_msg = stderr.strip().splitlines()[-1]
446             else:
447                 error_msg = stdout.strip()
448             raise TaskWarriorException(error_msg)
449         return stdout.strip().split('\n')
450
451     def filter_tasks(self, filter_obj):
452         args = ['export', '--'] + filter_obj.get_filter_params()
453         tasks = []
454         for line in self.execute_command(args):
455             if line:
456                 data = line.strip(',')
457                 try:
458                     tasks.append(Task(self, json.loads(data)))
459                 except ValueError:
460                     raise TaskWarriorException('Invalid JSON: %s' % data)
461         return tasks
462
463     def merge_with(self, path, push=False):
464         path = path.rstrip('/') + '/'
465         self.execute_command(['merge', path], config_override={
466             'merge.autopush': 'yes' if push else 'no',
467         })
468
469     def undo(self):
470         self.execute_command(['undo'], config_override={
471             'confirmation': 'no',
472         })