git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be arranged individually.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Don't explicitly require six==1.5.2 to avoid VersionConflict error
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
# strftime/strptime pattern used to (de)serialize task date fields.
DATE_FORMAT = "%Y%m%dT%H%M%SZ"
# Number of items TaskQuerySet.__repr__ shows before truncating its output.
REPR_OUTPUT_SIZE = 10
# Values of the "status" field used by the TaskQuerySet filter shortcuts.
PENDING = "pending"
COMPLETED = "completed"

# TaskWarrior release identifiers, as text, for version-dependent behaviour.
VERSION_2_1_0, VERSION_2_2_0, VERSION_2_3_0, VERSION_2_4_0 = map(
    six.u, ("2.1.0", "2.2.0", "2.3.0", "2.4.0"))

# Module-level logger; execute_command logs every command line at DEBUG.
logger = logging.getLogger(__name__)
21
22
class TaskWarriorException(Exception):
    """
    Raised when the ``task`` command fails or produces unexpected output.
    """
25
26
class TaskResource(object):
    """
    Dictionary-like base class for objects backed by a raw data dict.

    Values are read and written with item access. When a subclass defines
    ``deserialize_<field>`` / ``serialize_<field>`` methods, they are used
    to convert the raw stored value on read / write respectively.
    Subclasses are expected to provide ``__unicode__``.
    """
    # Names of the fields that __setitem__ refuses to change.
    read_only_fields = []

    def _load_data(self, data):
        """
        Use ``data`` as the raw backing dictionary of this resource.
        """
        self._data = data

    def __getitem__(self, key):
        """
        Return the (deserialized) value of ``key``.

        A missing key is treated as ``None``, which is still passed
        through the field's deserializer when one exists.
        """
        hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
                               lambda x: x)
        return hydrate_func(self._data.get(key))

    def __setitem__(self, key, value):
        """
        Store the (serialized) ``value`` under ``key`` and mark the field
        as modified.

        Raises RuntimeError if ``key`` is listed in ``read_only_fields``.
        """
        if key in self.read_only_fields:
            raise RuntimeError('Field \'%s\' is read-only' % key)
        dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
                                 lambda x: x)
        self._data[key] = dehydrate_func(value)

        # Not every subclass creates the tracking set in its constructor,
        # so create it on first write instead of failing with
        # AttributeError.
        try:
            modified_fields = self._modified_fields
        except AttributeError:
            modified_fields = self._modified_fields = set()
        modified_fields.add(key)

    def __str__(self):
        """
        Return the text produced by ``__unicode__``; on Python 2 the
        result is encoded to UTF-8 bytes, as ``str`` requires there.
        """
        s = six.text_type(self.__unicode__())
        if not six.PY3:
            s = s.encode('utf-8')
        return s

    def __repr__(self):
        """
        Use the same representation as ``str()``.
        """
        return str(self)
54
55
class TaskAnnotation(TaskResource):
    """
    A single annotation attached to a task.

    ``entry`` and ``description`` are read-only; an annotation is removed
    through the owning task (see ``remove``).
    """
    read_only_fields = ['entry', 'description']

    def __init__(self, task, data=None):
        """
        :param task: the task object this annotation belongs to
        :param data: raw annotation dictionary; a new empty dictionary is
                     used when omitted
        """
        self.task = task
        # Never fall back to a shared default dict: the backing data is
        # mutated by item assignment, which would leak between instances.
        self._load_data({} if data is None else data)
        # Item assignment records the changed field names here.
        self._modified_fields = set()

    def deserialize_entry(self, data):
        """
        Parse the raw ``entry`` string into a datetime, or return None
        when it is empty.
        """
        return datetime.datetime.strptime(data, DATE_FORMAT) if data else None

    def serialize_entry(self, date):
        """
        Format ``date`` using DATE_FORMAT; falsy values become ''.
        """
        return date.strftime(DATE_FORMAT) if date else ''

    def remove(self):
        """
        Remove this annotation from the task it belongs to.
        """
        self.task.remove_annotation(self)

    def __unicode__(self):
        return self['description']

    # The representation is the bare description text.
    __repr__ = __unicode__
76
77
class Task(TaskResource):
    """
    A single TaskWarrior task.

    Fields are read and written with item access (``task['due']``).
    Changes are kept locally until ``save`` passes them to TaskWarrior.
    """
    read_only_fields = ['id', 'entry', 'urgency', 'uuid']

    class DoesNotExist(Exception):
        """
        Raised when a query expected to return a task matches none.
        """
        pass

    class CompletedTask(Exception):
        """
        Raised when the operation cannot be performed on the completed task.
        """
        pass

    class DeletedTask(Exception):
        """
        Raised when the operation cannot be performed on the deleted task.
        """
        pass

    class NotSaved(Exception):
        """
        Raised when the operation cannot be performed on the task, because
        it has not been saved to TaskWarrior yet.
        """
        pass

    def __init__(self, warrior, data=None, **kwargs):
        """
        :param warrior: object used to run TaskWarrior commands
                        (``execute_command``) and read its ``version``
        :param data: optional dictionary of initial field values; it takes
                     precedence over equally named keyword arguments
        :param kwargs: initial field values
        """
        self.warrior = warrior

        # We keep data for backwards compatibility
        if data:
            kwargs.update(data)

        self._load_data(kwargs)
        self._modified_fields = set()

    def __unicode__(self):
        return self['description']

    @property
    def completed(self):
        """True if the locally known status is 'completed'."""
        return self['status'] == six.text_type('completed')

    @property
    def deleted(self):
        """True if the locally known status is 'deleted'."""
        return self['status'] == six.text_type('deleted')

    @property
    def waiting(self):
        """True if the locally known status is 'waiting'."""
        return self['status'] == six.text_type('waiting')

    @property
    def pending(self):
        """True if the locally known status is 'pending'."""
        return self['status'] == six.text_type('pending')

    @property
    def saved(self):
        """True if the task already has a uuid or an id."""
        return self['uuid'] is not None or self['id'] is not None

    def serialize_due(self, date):
        """
        Format ``date`` using DATE_FORMAT.

        Falsy values (e.g. None, to clear the due date) become '', the
        same convention TaskAnnotation.serialize_entry follows.
        """
        return date.strftime(DATE_FORMAT) if date else ''

    def deserialize_due(self, date_str):
        """
        Parse the raw ``due`` string into a datetime, or return None when
        it is empty.
        """
        if not date_str:
            return None
        return datetime.datetime.strptime(date_str, DATE_FORMAT)

    def deserialize_annotations(self, data):
        """
        Wrap every raw annotation dictionary in a TaskAnnotation; no data
        yields an empty list.
        """
        return [TaskAnnotation(self, d) for d in data] if data else []

    def deserialize_tags(self, tags):
        """
        Return the tags as a list when they are stored as a
        comma-separated string; any other value is returned unchanged.
        """
        # six.string_types instead of basestring, which does not exist on
        # Python 3.
        if isinstance(tags, six.string_types):
            return tags.split(',') if tags else []
        return tags

    def serialize_tags(self, tags):
        """
        Join the tags into a comma-separated string; no tags become ''.
        """
        return ','.join(tags) if tags else ''

    def delete(self):
        """
        Delete the task in TaskWarrior.

        Raises Task.NotSaved if the task has not been saved yet and
        Task.DeletedTask if it is already deleted.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved before it can be deleted")

        # Refresh the status, and raise exception if the task is deleted
        self.refresh(only_fields=['status'])

        if self.deleted:
            raise Task.DeletedTask("Task was already deleted")

        self.warrior.execute_command([self['uuid'], 'delete'], config_override={
            'confirmation': 'no',
        })

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def done(self):
        """
        Mark the task as completed in TaskWarrior.

        Raises Task.NotSaved if the task has not been saved yet,
        Task.CompletedTask if it is already completed and Task.DeletedTask
        if it has been deleted.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved before it can be completed")

        # Refresh, and raise exception if task is already completed/deleted
        self.refresh(only_fields=['status'])

        if self.completed:
            raise Task.CompletedTask("Cannot complete a completed task")
        elif self.deleted:
            raise Task.DeletedTask("Deleted task cannot be completed")

        self.warrior.execute_command([self['uuid'], 'done'])

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def save(self):
        """
        Send local changes to TaskWarrior.

        A saved task is modified with its changed fields only; an unsaved
        one is added with all of its writable fields. Afterwards the task
        is reloaded from TaskWarrior.

        Raises TaskWarriorException if the id of a newly created task
        cannot be found in the command output.
        """
        is_new = not self.saved
        args = ['add'] if is_new else [self['uuid'], 'modify']
        args.extend(self._get_modified_fields_as_args())
        output = self.warrior.execute_command(args)

        # Parse out the new ID, if the task is being added for the first time
        if is_new:
            id_lines = [l for l in output if l.startswith('Created task ')]

            # Complain loudly if it seems that more tasks were created
            # Should not happen
            if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
                raise TaskWarriorException("Unexpected output when creating "
                                           "task: %s" % '\n'.join(id_lines))

            # Circumvent the ID storage, since ID is considered read-only
            self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))

        self._modified_fields.clear()
        self.refresh()

    def add_annotation(self, annotation):
        """
        Annotate the task with the given text and reload its annotations.

        Raises Task.NotSaved if the task has not been saved yet.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to add annotation")

        args = [self['uuid'], 'annotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def remove_annotation(self, annotation):
        """
        Remove an annotation, given as a TaskAnnotation or as its text,
        and reload the task's annotations.

        Raises Task.NotSaved if the task has not been saved yet.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to remove annotation")

        if isinstance(annotation, TaskAnnotation):
            annotation = annotation['description']
        args = [self['uuid'], 'denotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def _get_modified_fields_as_args(self):
        """
        Build the command line arguments describing the fields to store.
        """
        args = []

        def add_field(field):
            # Task version older than 2.4.0 ignores first word of the
            # task description if description: prefix is used
            # NOTE(review): this compares version strings
            # lexicographically, so a release such as '2.10.0' would sort
            # before '2.4.0' - confirm before relying on it.
            if self.warrior.version < VERSION_2_4_0 and field == 'description':
                args.append(self._data[field])
            else:
                args.append('{0}:{1}'.format(field, self._data[field]))

        # If we're modifying saved task, simply pass on all modified fields
        if self.saved:
            for field in self._modified_fields:
                add_field(field)
        # For new tasks, pass all fields that make sense
        else:
            for field in self._data:
                if field in self.read_only_fields:
                    continue
                add_field(field)

        return args

    def refresh(self, only_fields=None):
        """
        Reload the task data from TaskWarrior.

        :param only_fields: names of the fields to update; when empty or
                            omitted, all local data is replaced

        Raises Task.NotSaved if the task has not been saved yet.
        """
        # Raise error when trying to refresh a task that has not been saved
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to be refreshed")

        # We need to use ID as backup for uuid here for the refreshes
        # of newly saved tasks. Any other place in the code is fine
        # with using UUID only.
        args = [self['uuid'] or self['id'], 'export']
        new_data = json.loads(self.warrior.execute_command(args)[0])
        if only_fields:
            # Fields missing from the export are reset to None.
            to_update = dict(
                (k, new_data.get(k)) for k in only_fields)
            self._data.update(to_update)
        else:
            self._data = new_data
268
269
class TaskFilter(object):
    """
    A set of parameters to filter the task list with.
    """

    def __init__(self, filter_params=None):
        """
        :param filter_params: optional iterable of initial filter strings
        """
        # Always own a private list: the parameters are mutated in place
        # by add_filter / add_filter_param, so neither a shared default
        # nor the caller's list may be used directly.
        if filter_params is None:
            self.filter_params = []
        else:
            self.filter_params = list(filter_params)

    def add_filter(self, filter_str):
        """
        Append a raw filter string.
        """
        self.filter_params.append(filter_str)

    def add_filter_param(self, key, value):
        """
        Add a ``key:value`` filter.

        A double underscore in ``key`` is translated to a dot
        (``due__before`` becomes ``due.before``).
        """
        key = key.replace('__', '.')

        # Replace the value with empty string, since that is the
        # convention in TW for empty values
        value = value if value is not None else ''

        # If we are filtering by uuid:, do not use uuid keyword
        # due to TW-1452 bug
        if key == 'uuid':
            self.filter_params.insert(0, value)
        else:
            self.filter_params.append('{0}:{1}'.format(key, value))

    def get_filter_params(self):
        """
        Return the filter strings, leaving out the empty ones.
        """
        return [f for f in self.filter_params if f]

    def clone(self):
        """
        Return a new filter of the same class with a copy of the
        parameters.
        """
        c = self.__class__()
        c.filter_params = list(self.filter_params)
        return c
302
303
304 class TaskQuerySet(object):
305     """
306     Represents a lazy lookup for a task objects.
307     """
308
309     def __init__(self, warrior=None, filter_obj=None):
310         self.warrior = warrior
311         self._result_cache = None
312         self.filter_obj = filter_obj or TaskFilter()
313
314     def __deepcopy__(self, memo):
315         """
316         Deep copy of a QuerySet doesn't populate the cache
317         """
318         obj = self.__class__()
319         for k, v in self.__dict__.items():
320             if k in ('_iter', '_result_cache'):
321                 obj.__dict__[k] = None
322             else:
323                 obj.__dict__[k] = copy.deepcopy(v, memo)
324         return obj
325
326     def __repr__(self):
327         data = list(self[:REPR_OUTPUT_SIZE + 1])
328         if len(data) > REPR_OUTPUT_SIZE:
329             data[-1] = "...(remaining elements truncated)..."
330         return repr(data)
331
332     def __len__(self):
333         if self._result_cache is None:
334             self._result_cache = list(self)
335         return len(self._result_cache)
336
337     def __iter__(self):
338         if self._result_cache is None:
339             self._result_cache = self._execute()
340         return iter(self._result_cache)
341
342     def __getitem__(self, k):
343         if self._result_cache is None:
344             self._result_cache = list(self)
345         return self._result_cache.__getitem__(k)
346
347     def __bool__(self):
348         if self._result_cache is not None:
349             return bool(self._result_cache)
350         try:
351             next(iter(self))
352         except StopIteration:
353             return False
354         return True
355
356     def __nonzero__(self):
357         return type(self).__bool__(self)
358
359     def _clone(self, klass=None, **kwargs):
360         if klass is None:
361             klass = self.__class__
362         filter_obj = self.filter_obj.clone()
363         c = klass(warrior=self.warrior, filter_obj=filter_obj)
364         c.__dict__.update(kwargs)
365         return c
366
367     def _execute(self):
368         """
369         Fetch the tasks which match the current filters.
370         """
371         return self.warrior.filter_tasks(self.filter_obj)
372
373     def all(self):
374         """
375         Returns a new TaskQuerySet that is a copy of the current one.
376         """
377         return self._clone()
378
379     def pending(self):
380         return self.filter(status=PENDING)
381
382     def completed(self):
383         return self.filter(status=COMPLETED)
384
385     def filter(self, *args, **kwargs):
386         """
387         Returns a new TaskQuerySet with the given filters added.
388         """
389         clone = self._clone()
390         for f in args:
391             clone.filter_obj.add_filter(f)
392         for key, value in kwargs.items():
393             clone.filter_obj.add_filter_param(key, value)
394         return clone
395
396     def get(self, **kwargs):
397         """
398         Performs the query and returns a single object matching the given
399         keyword arguments.
400         """
401         clone = self.filter(**kwargs)
402         num = len(clone)
403         if num == 1:
404             return clone._result_cache[0]
405         if not num:
406             raise Task.DoesNotExist(
407                 'Task matching query does not exist. '
408                 'Lookup parameters were {0}'.format(kwargs))
409         raise ValueError(
410             'get() returned more than one Task -- it returned {0}! '
411             'Lookup parameters were {1}'.format(num, kwargs))
412
413
class TaskWarrior(object):
    """
    Thin wrapper around the ``task`` command line program.
    """

    def __init__(self, data_location='~/.task', create=True):
        """
        :param data_location: TaskWarrior data directory; ``~`` is expanded
        :param create: create the data directory if it does not exist
        """
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        # Passed to every command as rc.<key>=<value> overrides
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)
        self.version = self._get_version()

    def _get_command_args(self, args, config_override=None):
        """
        Build the full argument list for a ``task`` invocation.

        :param args: command arguments; each one is converted with str()
        :param config_override: optional mapping of configuration values
                                taking precedence over ``self.config``
                                for this call only
        """
        # NOTE(review): 'rc:/' presumably keeps the user's own taskrc from
        # being read - confirm against the TaskWarrior documentation.
        command_args = ['task', 'rc:/']
        config = self.config.copy()
        if config_override:
            config.update(config_override)
        for item in config.items():
            command_args.append('rc.{0}={1}'.format(*item))
        command_args.extend(map(str, args))
        return command_args

    def _get_version(self):
        """
        Return the output of ``task --version`` with surrounding newlines
        removed.
        """
        p = subprocess.Popen(
                ['task', '--version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        return stdout.strip('\n')

    def execute_command(self, args, config_override=None):
        """
        Run ``task`` with the given arguments and return its standard
        output as a list of lines.

        :param args: command arguments
        :param config_override: optional configuration values for this
                                call only

        Raises TaskWarriorException if the command exits with a non-zero
        status; the message is the last line of its standard error or,
        when that is empty, its standard output.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        logger.debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode:
            if stderr.strip():
                error_msg = stderr.strip().splitlines()[-1]
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)
        return stdout.strip().split('\n')

    def filter_tasks(self, filter_obj):
        """
        Return the list of Task objects matching ``filter_obj``.

        Raises TaskWarriorException if a line of the export output cannot
        be parsed.
        """
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if line:
                # One JSON object is expected per line; strip the commas
                # separating the objects
                data = line.strip(',')
                try:
                    tasks.append(Task(self, json.loads(data)))
                except ValueError:
                    raise TaskWarriorException('Invalid JSON: %s' % data)
        return tasks

    def merge_with(self, path, push=False):
        """
        Run ``task merge`` with ``path``, which is given exactly one
        trailing slash.

        :param push: value for the ``merge.autopush`` setting
        """
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        """
        Run ``task undo`` with confirmation turned off.
        """
        self.execute_command(['undo'], config_override={
            'confirmation': 'no',
        })