]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

44c6965c1e3fdde7be836e48a4f8befb7f9a38c8
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11 REPR_OUTPUT_SIZE = 10
12 PENDING = 'pending'
13 COMPLETED = 'completed'
14
15 VERSION_2_1_0 = six.u('2.1.0')
16 VERSION_2_2_0 = six.u('2.2.0')
17 VERSION_2_3_0 = six.u('2.3.0')
18 VERSION_2_4_0 = six.u('2.4.0')
19
20 logger = logging.getLogger(__name__)
21
22
23 class TaskWarriorException(Exception):
24     pass
25
26
27 class TaskResource(object):
28     read_only_fields = []
29
30     def _load_data(self, data):
31         self._data = data
32         # We need to use a copy for original data, so that changes
33         # are not propagated
34         self._original_data = data.copy()
35
36     def __getitem__(self, key):
37         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
38                                lambda x: x)
39         return hydrate_func(self._data.get(key))
40
41     def __setitem__(self, key, value):
42         if key in self.read_only_fields:
43             raise RuntimeError('Field \'%s\' is read-only' % key)
44         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
45                                  lambda x: x)
46         self._data[key] = dehydrate_func(value)
47
48     def __str__(self):
49         s = six.text_type(self.__unicode__())
50         if not six.PY3:
51             s = s.encode('utf-8')
52         return s
53
54     def __repr__(self):
55         return str(self)
56
57
58 class TaskAnnotation(TaskResource):
59     read_only_fields = ['entry', 'description']
60
61     def __init__(self, task, data={}):
62         self.task = task
63         self._load_data(data)
64
65     def deserialize_entry(self, data):
66         return datetime.datetime.strptime(data, DATE_FORMAT) if data else None
67
68     def serialize_entry(self, date):
69         return date.strftime(DATE_FORMAT) if date else ''
70
71     def remove(self):
72         self.task.remove_annotation(self)
73
74     def __unicode__(self):
75         return self['description']
76
77     __repr__ = __unicode__
78
79
80 class Task(TaskResource):
81     read_only_fields = ['id', 'entry', 'urgency', 'uuid']
82
83     class DoesNotExist(Exception):
84         pass
85
86     class CompletedTask(Exception):
87         """
88         Raised when the operation cannot be performed on the completed task.
89         """
90         pass
91
92     class DeletedTask(Exception):
93         """
94         Raised when the operation cannot be performed on the deleted task.
95         """
96         pass
97
98     class NotSaved(Exception):
99         """
100         Raised when the operation cannot be performed on the task, because
101         it has not been saved to TaskWarrior yet.
102         """
103         pass
104
105     def __init__(self, warrior, data={}, **kwargs):
106         self.warrior = warrior
107
108         # We keep data for backwards compatibility
109         kwargs.update(data)
110
111         self._load_data(kwargs)
112
113     def __unicode__(self):
114         return self['description']
115
116     def __eq__(self, other):
117         return self['uuid'] == other['uuid']
118
119     def __hash__(self):
120         return self['uuid'].__hash__()
121
122     @property
123     def _modified_fields(self):
124         for key in self._data.keys():
125             if self._data.get(key) != self._original_data.get(key):
126                 yield key
127
128     @property
129     def completed(self):
130         return self['status'] == six.text_type('completed')
131
132     @property
133     def deleted(self):
134         return self['status'] == six.text_type('deleted')
135
136     @property
137     def waiting(self):
138         return self['status'] == six.text_type('waiting')
139
140     @property
141     def pending(self):
142         return self['status'] == six.text_type('pending')
143
144     @property
145     def saved(self):
146         return self['uuid'] is not None or self['id'] is not None
147
148     def serialize_due(self, date):
149         return date.strftime(DATE_FORMAT)
150
151     def deserialize_due(self, date_str):
152         if not date_str:
153             return None
154         return datetime.datetime.strptime(date_str, DATE_FORMAT)
155
156     def deserialize_annotations(self, data):
157         return [TaskAnnotation(self, d) for d in data] if data else []
158
159     def deserialize_tags(self, tags):
160         if isinstance(tags, basestring):
161             return tags.split(',') if tags else []
162         return tags
163
164     def serialize_tags(self, tags):
165         return ','.join(tags) if tags else ''
166
167     def delete(self):
168         if not self.saved:
169             raise Task.NotSaved("Task needs to be saved before it can be deleted")
170
171         # Refresh the status, and raise exception if the task is deleted
172         self.refresh(only_fields=['status'])
173
174         if self.deleted:
175             raise Task.DeletedTask("Task was already deleted")
176
177         self.warrior.execute_command([self['uuid'], 'delete'], config_override={
178             'confirmation': 'no',
179         })
180
181         # Refresh the status again, so that we have updated info stored
182         self.refresh(only_fields=['status'])
183
184
185     def done(self):
186         if not self.saved:
187             raise Task.NotSaved("Task needs to be saved before it can be completed")
188
189         # Refresh, and raise exception if task is already completed/deleted
190         self.refresh(only_fields=['status'])
191
192         if self.completed:
193             raise Task.CompletedTask("Cannot complete a completed task")
194         elif self.deleted:
195             raise Task.DeletedTask("Deleted task cannot be completed")
196
197         self.warrior.execute_command([self['uuid'], 'done'])
198
199         # Refresh the status again, so that we have updated info stored
200         self.refresh(only_fields=['status'])
201
202     def save(self):
203         args = [self['uuid'], 'modify'] if self.saved else ['add']
204         args.extend(self._get_modified_fields_as_args())
205         output = self.warrior.execute_command(args)
206
207         # Parse out the new ID, if the task is being added for the first time
208         if not self.saved:
209             id_lines = [l for l in output if l.startswith('Created task ')]
210
211             # Complain loudly if it seems that more tasks were created
212             # Should not happen
213             if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
214                 raise TaskWarriorException("Unexpected output when creating "
215                                            "task: %s" % '\n'.join(id_lines))
216
217             # Circumvent the ID storage, since ID is considered read-only
218             self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))
219
220         self.refresh()
221
222     def add_annotation(self, annotation):
223         if not self.saved:
224             raise Task.NotSaved("Task needs to be saved to add annotation")
225
226         args = [self['uuid'], 'annotate', annotation]
227         self.warrior.execute_command(args)
228         self.refresh(only_fields=['annotations'])
229
230     def remove_annotation(self, annotation):
231         if not self.saved:
232             raise Task.NotSaved("Task needs to be saved to add annotation")
233
234         if isinstance(annotation, TaskAnnotation):
235             annotation = annotation['description']
236         args = [self['uuid'], 'denotate', annotation]
237         self.warrior.execute_command(args)
238         self.refresh(only_fields=['annotations'])
239
240     def _get_modified_fields_as_args(self):
241         args = []
242
243         def add_field(field):
244             # Task version older than 2.4.0 ignores first word of the
245             # task description if description: prefix is used
246             if self.warrior.version < VERSION_2_4_0 and field == 'description':
247                 args.append(self._data[field])
248             else:
249                 args.append('{0}:{1}'.format(field, self._data[field]))
250
251         # If we're modifying saved task, simply pass on all modified fields
252         if self.saved:
253             for field in self._modified_fields:
254                 add_field(field)
255         # For new tasks, pass all fields that make sense
256         else:
257             for field in self._data.keys():
258                 if field in self.read_only_fields:
259                     continue
260                 add_field(field)
261
262         return args
263
264     def refresh(self, only_fields=[]):
265         # Raise error when trying to refresh a task that has not been saved
266         if not self.saved:
267             raise Task.NotSaved("Task needs to be saved to be refreshed")
268
269         # We need to use ID as backup for uuid here for the refreshes
270         # of newly saved tasks. Any other place in the code is fine
271         # with using UUID only.
272         args = [self['uuid'] or self['id'], 'export']
273         new_data = json.loads(self.warrior.execute_command(args)[0])
274         if only_fields:
275             to_update = dict(
276                 [(k, new_data.get(k)) for k in only_fields])
277             self._data.update(to_update)
278             self._original_data.update(to_update)
279         else:
280             self._data = new_data
281             # We need to create a clone for original_data though
282             # Shallow copy is alright, since data dict uses only
283             # primitive data types
284             self._original_data = new_data.copy()
285
286
287 class TaskFilter(object):
288     """
289     A set of parameters to filter the task list with.
290     """
291
292     def __init__(self, filter_params=[]):
293         self.filter_params = filter_params
294
295     def add_filter(self, filter_str):
296         self.filter_params.append(filter_str)
297
298     def add_filter_param(self, key, value):
299         key = key.replace('__', '.')
300
301         # Replace the value with empty string, since that is the
302         # convention in TW for empty values
303         value = value if value is not None else ''
304
305         # If we are filtering by uuid:, do not use uuid keyword
306         # due to TW-1452 bug
307         if key == 'uuid':
308             self.filter_params.insert(0, value)
309         else:
310             self.filter_params.append('{0}:{1}'.format(key, value))
311
312     def get_filter_params(self):
313         return [f for f in self.filter_params if f]
314
315     def clone(self):
316         c = self.__class__()
317         c.filter_params = list(self.filter_params)
318         return c
319
320
321 class TaskQuerySet(object):
322     """
323     Represents a lazy lookup for a task objects.
324     """
325
326     def __init__(self, warrior=None, filter_obj=None):
327         self.warrior = warrior
328         self._result_cache = None
329         self.filter_obj = filter_obj or TaskFilter()
330
331     def __deepcopy__(self, memo):
332         """
333         Deep copy of a QuerySet doesn't populate the cache
334         """
335         obj = self.__class__()
336         for k, v in self.__dict__.items():
337             if k in ('_iter', '_result_cache'):
338                 obj.__dict__[k] = None
339             else:
340                 obj.__dict__[k] = copy.deepcopy(v, memo)
341         return obj
342
343     def __repr__(self):
344         data = list(self[:REPR_OUTPUT_SIZE + 1])
345         if len(data) > REPR_OUTPUT_SIZE:
346             data[-1] = "...(remaining elements truncated)..."
347         return repr(data)
348
349     def __len__(self):
350         if self._result_cache is None:
351             self._result_cache = list(self)
352         return len(self._result_cache)
353
354     def __iter__(self):
355         if self._result_cache is None:
356             self._result_cache = self._execute()
357         return iter(self._result_cache)
358
359     def __getitem__(self, k):
360         if self._result_cache is None:
361             self._result_cache = list(self)
362         return self._result_cache.__getitem__(k)
363
364     def __bool__(self):
365         if self._result_cache is not None:
366             return bool(self._result_cache)
367         try:
368             next(iter(self))
369         except StopIteration:
370             return False
371         return True
372
373     def __nonzero__(self):
374         return type(self).__bool__(self)
375
376     def _clone(self, klass=None, **kwargs):
377         if klass is None:
378             klass = self.__class__
379         filter_obj = self.filter_obj.clone()
380         c = klass(warrior=self.warrior, filter_obj=filter_obj)
381         c.__dict__.update(kwargs)
382         return c
383
384     def _execute(self):
385         """
386         Fetch the tasks which match the current filters.
387         """
388         return self.warrior.filter_tasks(self.filter_obj)
389
390     def all(self):
391         """
392         Returns a new TaskQuerySet that is a copy of the current one.
393         """
394         return self._clone()
395
396     def pending(self):
397         return self.filter(status=PENDING)
398
399     def completed(self):
400         return self.filter(status=COMPLETED)
401
402     def filter(self, *args, **kwargs):
403         """
404         Returns a new TaskQuerySet with the given filters added.
405         """
406         clone = self._clone()
407         for f in args:
408             clone.filter_obj.add_filter(f)
409         for key, value in kwargs.items():
410             clone.filter_obj.add_filter_param(key, value)
411         return clone
412
413     def get(self, **kwargs):
414         """
415         Performs the query and returns a single object matching the given
416         keyword arguments.
417         """
418         clone = self.filter(**kwargs)
419         num = len(clone)
420         if num == 1:
421             return clone._result_cache[0]
422         if not num:
423             raise Task.DoesNotExist(
424                 'Task matching query does not exist. '
425                 'Lookup parameters were {0}'.format(kwargs))
426         raise ValueError(
427             'get() returned more than one Task -- it returned {0}! '
428             'Lookup parameters were {1}'.format(num, kwargs))
429
430
431 class TaskWarrior(object):
432     def __init__(self, data_location='~/.task', create=True):
433         data_location = os.path.expanduser(data_location)
434         if create and not os.path.exists(data_location):
435             os.makedirs(data_location)
436         self.config = {
437             'data.location': os.path.expanduser(data_location),
438         }
439         self.tasks = TaskQuerySet(self)
440         self.version = self._get_version()
441
442     def _get_command_args(self, args, config_override={}):
443         command_args = ['task', 'rc:/']
444         config = self.config.copy()
445         config.update(config_override)
446         for item in config.items():
447             command_args.append('rc.{0}={1}'.format(*item))
448         command_args.extend(map(str, args))
449         return command_args
450
451     def _get_version(self):
452         p = subprocess.Popen(
453                 ['task', '--version'],
454                 stdout=subprocess.PIPE,
455                 stderr=subprocess.PIPE)
456         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
457         return stdout.strip('\n')
458
459     def execute_command(self, args, config_override={}):
460         command_args = self._get_command_args(
461             args, config_override=config_override)
462         logger.debug(' '.join(command_args))
463         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
464                              stderr=subprocess.PIPE)
465         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
466         if p.returncode:
467             if stderr.strip():
468                 error_msg = stderr.strip().splitlines()[-1]
469             else:
470                 error_msg = stdout.strip()
471             raise TaskWarriorException(error_msg)
472         return stdout.strip().split('\n')
473
474     def filter_tasks(self, filter_obj):
475         args = ['export', '--'] + filter_obj.get_filter_params()
476         tasks = []
477         for line in self.execute_command(args):
478             if line:
479                 data = line.strip(',')
480                 try:
481                     tasks.append(Task(self, json.loads(data)))
482                 except ValueError:
483                     raise TaskWarriorException('Invalid JSON: %s' % data)
484         return tasks
485
486     def merge_with(self, path, push=False):
487         path = path.rstrip('/') + '/'
488         self.execute_command(['merge', path], config_override={
489             'merge.autopush': 'yes' if push else 'no',
490         })
491
492     def undo(self):
493         self.execute_command(['undo'], config_override={
494             'confirmation': 'no',
495         })