]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

TaskWarrior: Detect task version
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11 REPR_OUTPUT_SIZE = 10
12 PENDING = 'pending'
13 COMPLETED = 'completed'
14
15 VERSION_2_1_0 = six.u('2.1.0')
16 VERSION_2_2_0 = six.u('2.2.0')
17 VERSION_2_3_0 = six.u('2.3.0')
18 VERSION_2_4_0 = six.u('2.4.0')
19
20 logger = logging.getLogger(__name__)
21
22
23 class TaskWarriorException(Exception):
24     pass
25
26
27 class TaskResource(object):
28     read_only_fields = []
29
30     def _load_data(self, data):
31         self._data = data
32
33     def __getitem__(self, key):
34         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
35                                lambda x: x)
36         return hydrate_func(self._data.get(key))
37
38     def __setitem__(self, key, value):
39         if key in self.read_only_fields:
40             raise RuntimeError('Field \'%s\' is read-only' % key)
41         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
42                                  lambda x: x)
43         self._data[key] = dehydrate_func(value)
44         self._modified_fields.add(key)
45
46     def __str__(self):
47         s = six.text_type(self.__unicode__())
48         if not six.PY3:
49             s = s.encode('utf-8')
50         return s
51
52     def __repr__(self):
53         return str(self)
54
55
56 class TaskAnnotation(TaskResource):
57     read_only_fields = ['entry', 'description']
58
59     def __init__(self, task, data={}):
60         self.task = task
61         self._load_data(data)
62
63     def deserialize_entry(self, data):
64         return datetime.datetime.strptime(data, DATE_FORMAT) if data else None
65
66     def serialize_entry(self, date):
67         return date.strftime(DATE_FORMAT) if date else ''
68
69     def remove(self):
70         self.task.remove_annotation(self)
71
72     def __unicode__(self):
73         return self['description']
74
75     __repr__ = __unicode__
76
77
78 class Task(TaskResource):
79     read_only_fields = ['id', 'entry', 'urgency', 'uuid']
80
81     class DoesNotExist(Exception):
82         pass
83
84     class CompletedTask(Exception):
85         """
86         Raised when the operation cannot be performed on the completed task.
87         """
88         pass
89
90     class DeletedTask(Exception):
91         """
92         Raised when the operation cannot be performed on the deleted task.
93         """
94         pass
95
96     class NotSaved(Exception):
97         """
98         Raised when the operation cannot be performed on the task, because
99         it has not been saved to TaskWarrior yet.
100         """
101         pass
102
103     def __init__(self, warrior, data={}, **kwargs):
104         self.warrior = warrior
105
106         # We keep data for backwards compatibility
107         kwargs.update(data)
108
109         self._load_data(kwargs)
110         self._modified_fields = set()
111
112     def __unicode__(self):
113         return self['description']
114
115     @property
116     def completed(self):
117         return self['status'] == six.text_type('completed')
118
119     @property
120     def deleted(self):
121         return self['status'] == six.text_type('deleted')
122
123     @property
124     def waiting(self):
125         return self['status'] == six.text_type('waiting')
126
127     @property
128     def pending(self):
129         return self['status'] == six.text_type('pending')
130
131     @property
132     def saved(self):
133         return self['uuid'] is not None or self['id'] is not None
134
135     def serialize_due(self, date):
136         return date.strftime(DATE_FORMAT)
137
138     def deserialize_due(self, date_str):
139         if not date_str:
140             return None
141         return datetime.datetime.strptime(date_str, DATE_FORMAT)
142
143     def deserialize_annotations(self, data):
144         return [TaskAnnotation(self, d) for d in data] if data else []
145
146     def deserialize_tags(self, tags):
147         if isinstance(tags, basestring):
148             return tags.split(',') if tags else []
149         return tags
150
151     def serialize_tags(self, tags):
152         return ','.join(tags) if tags else ''
153
154     def delete(self):
155         if not self.saved:
156             raise Task.NotSaved("Task needs to be saved before it can be deleted")
157
158         # Refresh the status, and raise exception if the task is deleted
159         self.refresh(only_fields=['status'])
160
161         if self.deleted:
162             raise Task.DeletedTask("Task was already deleted")
163
164         self.warrior.execute_command([self['uuid'], 'delete'], config_override={
165             'confirmation': 'no',
166         })
167
168         # Refresh the status again, so that we have updated info stored
169         self.refresh(only_fields=['status'])
170
171
172     def done(self):
173         if not self.saved:
174             raise Task.NotSaved("Task needs to be saved before it can be completed")
175
176         # Refresh, and raise exception if task is already completed/deleted
177         self.refresh(only_fields=['status'])
178
179         if self.completed:
180             raise Task.CompletedTask("Cannot complete a completed task")
181         elif self.deleted:
182             raise Task.DeletedTask("Deleted task cannot be completed")
183
184         self.warrior.execute_command([self['uuid'], 'done'])
185
186         # Refresh the status again, so that we have updated info stored
187         self.refresh(only_fields=['status'])
188
189     def save(self):
190         args = [self['uuid'], 'modify'] if self.saved else ['add']
191         args.extend(self._get_modified_fields_as_args())
192         output = self.warrior.execute_command(args)
193
194         # Parse out the new ID, if the task is being added for the first time
195         if not self.saved:
196             id_lines = [l for l in output if l.startswith('Created task ')]
197
198             # Complain loudly if it seems that more tasks were created
199             # Should not happen
200             if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
201                 raise TaskWarriorException("Unexpected output when creating "
202                                            "task: %s" % '\n'.join(id_lines))
203
204             # Circumvent the ID storage, since ID is considered read-only
205             self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))
206
207         self._modified_fields.clear()
208         self.refresh()
209
210     def add_annotation(self, annotation):
211         if not self.saved:
212             raise Task.NotSaved("Task needs to be saved to add annotation")
213
214         args = [self['uuid'], 'annotate', annotation]
215         self.warrior.execute_command(args)
216         self.refresh(only_fields=['annotations'])
217
218     def remove_annotation(self, annotation):
219         if not self.saved:
220             raise Task.NotSaved("Task needs to be saved to add annotation")
221
222         if isinstance(annotation, TaskAnnotation):
223             annotation = annotation['description']
224         args = [self['uuid'], 'denotate', annotation]
225         self.warrior.execute_command(args)
226         self.refresh(only_fields=['annotations'])
227
228     def _get_modified_fields_as_args(self):
229         args = []
230
231         # If we're modifying saved task, simply pass on all modified fields
232         if self.saved:
233             for field in self._modified_fields:
234                 args.append('{0}:{1}'.format(field, self._data[field]))
235         # For new tasks, pass all fields that make sense
236         else:
237             for field in self._data.keys():
238                 if field in self.read_only_fields:
239                     continue
240                 args.append('{0}:{1}'.format(field, self._data[field]))
241
242         return args
243
244     def refresh(self, only_fields=[]):
245         # Raise error when trying to refresh a task that has not been saved
246         if not self.saved:
247             raise Task.NotSaved("Task needs to be saved to be refreshed")
248
249         # We need to use ID as backup for uuid here for the refreshes
250         # of newly saved tasks. Any other place in the code is fine
251         # with using UUID only.
252         args = [self['uuid'] or self['id'], 'export']
253         new_data = json.loads(self.warrior.execute_command(args)[0])
254         if only_fields:
255             to_update = dict(
256                 [(k, new_data.get(k)) for k in only_fields])
257             self._data.update(to_update)
258         else:
259             self._data = new_data
260
261
262 class TaskFilter(object):
263     """
264     A set of parameters to filter the task list with.
265     """
266
267     def __init__(self, filter_params=[]):
268         self.filter_params = filter_params
269
270     def add_filter(self, filter_str):
271         self.filter_params.append(filter_str)
272
273     def add_filter_param(self, key, value):
274         key = key.replace('__', '.')
275
276         # Replace the value with empty string, since that is the
277         # convention in TW for empty values
278         value = value if value is not None else ''
279         self.filter_params.append('{0}:{1}'.format(key, value))
280
281     def get_filter_params(self):
282         return [f for f in self.filter_params if f]
283
284     def clone(self):
285         c = self.__class__()
286         c.filter_params = list(self.filter_params)
287         return c
288
289
290 class TaskQuerySet(object):
291     """
292     Represents a lazy lookup for a task objects.
293     """
294
295     def __init__(self, warrior=None, filter_obj=None):
296         self.warrior = warrior
297         self._result_cache = None
298         self.filter_obj = filter_obj or TaskFilter()
299
300     def __deepcopy__(self, memo):
301         """
302         Deep copy of a QuerySet doesn't populate the cache
303         """
304         obj = self.__class__()
305         for k, v in self.__dict__.items():
306             if k in ('_iter', '_result_cache'):
307                 obj.__dict__[k] = None
308             else:
309                 obj.__dict__[k] = copy.deepcopy(v, memo)
310         return obj
311
312     def __repr__(self):
313         data = list(self[:REPR_OUTPUT_SIZE + 1])
314         if len(data) > REPR_OUTPUT_SIZE:
315             data[-1] = "...(remaining elements truncated)..."
316         return repr(data)
317
318     def __len__(self):
319         if self._result_cache is None:
320             self._result_cache = list(self)
321         return len(self._result_cache)
322
323     def __iter__(self):
324         if self._result_cache is None:
325             self._result_cache = self._execute()
326         return iter(self._result_cache)
327
328     def __getitem__(self, k):
329         if self._result_cache is None:
330             self._result_cache = list(self)
331         return self._result_cache.__getitem__(k)
332
333     def __bool__(self):
334         if self._result_cache is not None:
335             return bool(self._result_cache)
336         try:
337             next(iter(self))
338         except StopIteration:
339             return False
340         return True
341
342     def __nonzero__(self):
343         return type(self).__bool__(self)
344
345     def _clone(self, klass=None, **kwargs):
346         if klass is None:
347             klass = self.__class__
348         filter_obj = self.filter_obj.clone()
349         c = klass(warrior=self.warrior, filter_obj=filter_obj)
350         c.__dict__.update(kwargs)
351         return c
352
353     def _execute(self):
354         """
355         Fetch the tasks which match the current filters.
356         """
357         return self.warrior.filter_tasks(self.filter_obj)
358
359     def all(self):
360         """
361         Returns a new TaskQuerySet that is a copy of the current one.
362         """
363         return self._clone()
364
365     def pending(self):
366         return self.filter(status=PENDING)
367
368     def completed(self):
369         return self.filter(status=COMPLETED)
370
371     def filter(self, *args, **kwargs):
372         """
373         Returns a new TaskQuerySet with the given filters added.
374         """
375         clone = self._clone()
376         for f in args:
377             clone.filter_obj.add_filter(f)
378         for key, value in kwargs.items():
379             clone.filter_obj.add_filter_param(key, value)
380         return clone
381
382     def get(self, **kwargs):
383         """
384         Performs the query and returns a single object matching the given
385         keyword arguments.
386         """
387         clone = self.filter(**kwargs)
388         num = len(clone)
389         if num == 1:
390             return clone._result_cache[0]
391         if not num:
392             raise Task.DoesNotExist(
393                 'Task matching query does not exist. '
394                 'Lookup parameters were {0}'.format(kwargs))
395         raise ValueError(
396             'get() returned more than one Task -- it returned {0}! '
397             'Lookup parameters were {1}'.format(num, kwargs))
398
399
400 class TaskWarrior(object):
401     def __init__(self, data_location='~/.task', create=True):
402         data_location = os.path.expanduser(data_location)
403         if create and not os.path.exists(data_location):
404             os.makedirs(data_location)
405         self.config = {
406             'data.location': os.path.expanduser(data_location),
407         }
408         self.tasks = TaskQuerySet(self)
409         self.version = self._get_version()
410
411     def _get_command_args(self, args, config_override={}):
412         command_args = ['task', 'rc:/']
413         config = self.config.copy()
414         config.update(config_override)
415         for item in config.items():
416             command_args.append('rc.{0}={1}'.format(*item))
417         command_args.extend(map(str, args))
418         return command_args
419
420     def _get_version(self):
421         p = subprocess.Popen(
422                 ['task', '--version'],
423                 stdout=subprocess.PIPE,
424                 stderr=subprocess.PIPE)
425         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
426         return stdout.strip('\n')
427
428     def execute_command(self, args, config_override={}):
429         command_args = self._get_command_args(
430             args, config_override=config_override)
431         logger.debug(' '.join(command_args))
432         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
433                              stderr=subprocess.PIPE)
434         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
435         if p.returncode:
436             if stderr.strip():
437                 error_msg = stderr.strip().splitlines()[-1]
438             else:
439                 error_msg = stdout.strip()
440             raise TaskWarriorException(error_msg)
441         return stdout.strip().split('\n')
442
443     def filter_tasks(self, filter_obj):
444         args = ['export', '--'] + filter_obj.get_filter_params()
445         tasks = []
446         for line in self.execute_command(args):
447             if line:
448                 data = line.strip(',')
449                 try:
450                     tasks.append(Task(self, json.loads(data)))
451                 except ValueError:
452                     raise TaskWarriorException('Invalid JSON: %s' % data)
453         return tasks
454
455     def merge_with(self, path, push=False):
456         path = path.rstrip('/') + '/'
457         self.execute_command(['merge', path], config_override={
458             'merge.autopush': 'yes' if push else 'no',
459         })
460
461     def undo(self):
462         self.execute_command(['undo'], config_override={
463             'confirmation': 'no',
464         })