]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Task: Detect modified fields by actual modifications
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11 REPR_OUTPUT_SIZE = 10
12 PENDING = 'pending'
13 COMPLETED = 'completed'
14
15 VERSION_2_1_0 = six.u('2.1.0')
16 VERSION_2_2_0 = six.u('2.2.0')
17 VERSION_2_3_0 = six.u('2.3.0')
18 VERSION_2_4_0 = six.u('2.4.0')
19
20 logger = logging.getLogger(__name__)
21
22
23 class TaskWarriorException(Exception):
24     pass
25
26
27 class TaskResource(object):
28     read_only_fields = []
29
30     def _load_data(self, data):
31         self._data = data
32         self._original_data = data
33
34     def __getitem__(self, key):
35         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
36                                lambda x: x)
37         return hydrate_func(self._data.get(key))
38
39     def __setitem__(self, key, value):
40         if key in self.read_only_fields:
41             raise RuntimeError('Field \'%s\' is read-only' % key)
42         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
43                                  lambda x: x)
44         self._data[key] = dehydrate_func(value)
45
46     def __str__(self):
47         s = six.text_type(self.__unicode__())
48         if not six.PY3:
49             s = s.encode('utf-8')
50         return s
51
52     def __repr__(self):
53         return str(self)
54
55
56 class TaskAnnotation(TaskResource):
57     read_only_fields = ['entry', 'description']
58
59     def __init__(self, task, data={}):
60         self.task = task
61         self._load_data(data)
62
63     def deserialize_entry(self, data):
64         return datetime.datetime.strptime(data, DATE_FORMAT) if data else None
65
66     def serialize_entry(self, date):
67         return date.strftime(DATE_FORMAT) if date else ''
68
69     def remove(self):
70         self.task.remove_annotation(self)
71
72     def __unicode__(self):
73         return self['description']
74
75     __repr__ = __unicode__
76
77
78 class Task(TaskResource):
79     read_only_fields = ['id', 'entry', 'urgency', 'uuid']
80
81     class DoesNotExist(Exception):
82         pass
83
84     class CompletedTask(Exception):
85         """
86         Raised when the operation cannot be performed on the completed task.
87         """
88         pass
89
90     class DeletedTask(Exception):
91         """
92         Raised when the operation cannot be performed on the deleted task.
93         """
94         pass
95
96     class NotSaved(Exception):
97         """
98         Raised when the operation cannot be performed on the task, because
99         it has not been saved to TaskWarrior yet.
100         """
101         pass
102
103     def __init__(self, warrior, data={}, **kwargs):
104         self.warrior = warrior
105
106         # We keep data for backwards compatibility
107         kwargs.update(data)
108
109         self._load_data(kwargs)
110
111     def __unicode__(self):
112         return self['description']
113
114     def __eq__(self, other):
115         return self['uuid'] == other['uuid']
116
117     def __hash__(self):
118         return self['uuid'].__hash__()
119
120     @property
121     def _modified_fields(self):
122         for key in self._data.keys():
123             if self._data.get(key) != self._original_data.get(key):
124                 yield key
125
126     @property
127     def completed(self):
128         return self['status'] == six.text_type('completed')
129
130     @property
131     def deleted(self):
132         return self['status'] == six.text_type('deleted')
133
134     @property
135     def waiting(self):
136         return self['status'] == six.text_type('waiting')
137
138     @property
139     def pending(self):
140         return self['status'] == six.text_type('pending')
141
142     @property
143     def saved(self):
144         return self['uuid'] is not None or self['id'] is not None
145
146     def serialize_due(self, date):
147         return date.strftime(DATE_FORMAT)
148
149     def deserialize_due(self, date_str):
150         if not date_str:
151             return None
152         return datetime.datetime.strptime(date_str, DATE_FORMAT)
153
154     def deserialize_annotations(self, data):
155         return [TaskAnnotation(self, d) for d in data] if data else []
156
157     def deserialize_tags(self, tags):
158         if isinstance(tags, basestring):
159             return tags.split(',') if tags else []
160         return tags
161
162     def serialize_tags(self, tags):
163         return ','.join(tags) if tags else ''
164
165     def delete(self):
166         if not self.saved:
167             raise Task.NotSaved("Task needs to be saved before it can be deleted")
168
169         # Refresh the status, and raise exception if the task is deleted
170         self.refresh(only_fields=['status'])
171
172         if self.deleted:
173             raise Task.DeletedTask("Task was already deleted")
174
175         self.warrior.execute_command([self['uuid'], 'delete'], config_override={
176             'confirmation': 'no',
177         })
178
179         # Refresh the status again, so that we have updated info stored
180         self.refresh(only_fields=['status'])
181
182
183     def done(self):
184         if not self.saved:
185             raise Task.NotSaved("Task needs to be saved before it can be completed")
186
187         # Refresh, and raise exception if task is already completed/deleted
188         self.refresh(only_fields=['status'])
189
190         if self.completed:
191             raise Task.CompletedTask("Cannot complete a completed task")
192         elif self.deleted:
193             raise Task.DeletedTask("Deleted task cannot be completed")
194
195         self.warrior.execute_command([self['uuid'], 'done'])
196
197         # Refresh the status again, so that we have updated info stored
198         self.refresh(only_fields=['status'])
199
200     def save(self):
201         args = [self['uuid'], 'modify'] if self.saved else ['add']
202         args.extend(self._get_modified_fields_as_args())
203         output = self.warrior.execute_command(args)
204
205         # Parse out the new ID, if the task is being added for the first time
206         if not self.saved:
207             id_lines = [l for l in output if l.startswith('Created task ')]
208
209             # Complain loudly if it seems that more tasks were created
210             # Should not happen
211             if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
212                 raise TaskWarriorException("Unexpected output when creating "
213                                            "task: %s" % '\n'.join(id_lines))
214
215             # Circumvent the ID storage, since ID is considered read-only
216             self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))
217
218         self.refresh()
219
220     def add_annotation(self, annotation):
221         if not self.saved:
222             raise Task.NotSaved("Task needs to be saved to add annotation")
223
224         args = [self['uuid'], 'annotate', annotation]
225         self.warrior.execute_command(args)
226         self.refresh(only_fields=['annotations'])
227
228     def remove_annotation(self, annotation):
229         if not self.saved:
230             raise Task.NotSaved("Task needs to be saved to add annotation")
231
232         if isinstance(annotation, TaskAnnotation):
233             annotation = annotation['description']
234         args = [self['uuid'], 'denotate', annotation]
235         self.warrior.execute_command(args)
236         self.refresh(only_fields=['annotations'])
237
238     def _get_modified_fields_as_args(self):
239         args = []
240
241         def add_field(field):
242             # Task version older than 2.4.0 ignores first word of the
243             # task description if description: prefix is used
244             if self.warrior.version < VERSION_2_4_0 and field == 'description':
245                 args.append(self._data[field])
246             else:
247                 args.append('{0}:{1}'.format(field, self._data[field]))
248
249         # If we're modifying saved task, simply pass on all modified fields
250         if self.saved:
251             for field in self._modified_fields:
252                 add_field(field)
253         # For new tasks, pass all fields that make sense
254         else:
255             for field in self._data.keys():
256                 if field in self.read_only_fields:
257                     continue
258                 add_field(field)
259
260         return args
261
262     def refresh(self, only_fields=[]):
263         # Raise error when trying to refresh a task that has not been saved
264         if not self.saved:
265             raise Task.NotSaved("Task needs to be saved to be refreshed")
266
267         # We need to use ID as backup for uuid here for the refreshes
268         # of newly saved tasks. Any other place in the code is fine
269         # with using UUID only.
270         args = [self['uuid'] or self['id'], 'export']
271         new_data = json.loads(self.warrior.execute_command(args)[0])
272         if only_fields:
273             to_update = dict(
274                 [(k, new_data.get(k)) for k in only_fields])
275             self._data.update(to_update)
276             self._original_data.update(to_update)
277         else:
278             self._data = new_data
279             self._original_data = new_data
280
281
282 class TaskFilter(object):
283     """
284     A set of parameters to filter the task list with.
285     """
286
287     def __init__(self, filter_params=[]):
288         self.filter_params = filter_params
289
290     def add_filter(self, filter_str):
291         self.filter_params.append(filter_str)
292
293     def add_filter_param(self, key, value):
294         key = key.replace('__', '.')
295
296         # Replace the value with empty string, since that is the
297         # convention in TW for empty values
298         value = value if value is not None else ''
299
300         # If we are filtering by uuid:, do not use uuid keyword
301         # due to TW-1452 bug
302         if key == 'uuid':
303             self.filter_params.insert(0, value)
304         else:
305             self.filter_params.append('{0}:{1}'.format(key, value))
306
307     def get_filter_params(self):
308         return [f for f in self.filter_params if f]
309
310     def clone(self):
311         c = self.__class__()
312         c.filter_params = list(self.filter_params)
313         return c
314
315
316 class TaskQuerySet(object):
317     """
318     Represents a lazy lookup for a task objects.
319     """
320
321     def __init__(self, warrior=None, filter_obj=None):
322         self.warrior = warrior
323         self._result_cache = None
324         self.filter_obj = filter_obj or TaskFilter()
325
326     def __deepcopy__(self, memo):
327         """
328         Deep copy of a QuerySet doesn't populate the cache
329         """
330         obj = self.__class__()
331         for k, v in self.__dict__.items():
332             if k in ('_iter', '_result_cache'):
333                 obj.__dict__[k] = None
334             else:
335                 obj.__dict__[k] = copy.deepcopy(v, memo)
336         return obj
337
338     def __repr__(self):
339         data = list(self[:REPR_OUTPUT_SIZE + 1])
340         if len(data) > REPR_OUTPUT_SIZE:
341             data[-1] = "...(remaining elements truncated)..."
342         return repr(data)
343
344     def __len__(self):
345         if self._result_cache is None:
346             self._result_cache = list(self)
347         return len(self._result_cache)
348
349     def __iter__(self):
350         if self._result_cache is None:
351             self._result_cache = self._execute()
352         return iter(self._result_cache)
353
354     def __getitem__(self, k):
355         if self._result_cache is None:
356             self._result_cache = list(self)
357         return self._result_cache.__getitem__(k)
358
359     def __bool__(self):
360         if self._result_cache is not None:
361             return bool(self._result_cache)
362         try:
363             next(iter(self))
364         except StopIteration:
365             return False
366         return True
367
368     def __nonzero__(self):
369         return type(self).__bool__(self)
370
371     def _clone(self, klass=None, **kwargs):
372         if klass is None:
373             klass = self.__class__
374         filter_obj = self.filter_obj.clone()
375         c = klass(warrior=self.warrior, filter_obj=filter_obj)
376         c.__dict__.update(kwargs)
377         return c
378
379     def _execute(self):
380         """
381         Fetch the tasks which match the current filters.
382         """
383         return self.warrior.filter_tasks(self.filter_obj)
384
385     def all(self):
386         """
387         Returns a new TaskQuerySet that is a copy of the current one.
388         """
389         return self._clone()
390
391     def pending(self):
392         return self.filter(status=PENDING)
393
394     def completed(self):
395         return self.filter(status=COMPLETED)
396
397     def filter(self, *args, **kwargs):
398         """
399         Returns a new TaskQuerySet with the given filters added.
400         """
401         clone = self._clone()
402         for f in args:
403             clone.filter_obj.add_filter(f)
404         for key, value in kwargs.items():
405             clone.filter_obj.add_filter_param(key, value)
406         return clone
407
408     def get(self, **kwargs):
409         """
410         Performs the query and returns a single object matching the given
411         keyword arguments.
412         """
413         clone = self.filter(**kwargs)
414         num = len(clone)
415         if num == 1:
416             return clone._result_cache[0]
417         if not num:
418             raise Task.DoesNotExist(
419                 'Task matching query does not exist. '
420                 'Lookup parameters were {0}'.format(kwargs))
421         raise ValueError(
422             'get() returned more than one Task -- it returned {0}! '
423             'Lookup parameters were {1}'.format(num, kwargs))
424
425
426 class TaskWarrior(object):
427     def __init__(self, data_location='~/.task', create=True):
428         data_location = os.path.expanduser(data_location)
429         if create and not os.path.exists(data_location):
430             os.makedirs(data_location)
431         self.config = {
432             'data.location': os.path.expanduser(data_location),
433         }
434         self.tasks = TaskQuerySet(self)
435         self.version = self._get_version()
436
437     def _get_command_args(self, args, config_override={}):
438         command_args = ['task', 'rc:/']
439         config = self.config.copy()
440         config.update(config_override)
441         for item in config.items():
442             command_args.append('rc.{0}={1}'.format(*item))
443         command_args.extend(map(str, args))
444         return command_args
445
446     def _get_version(self):
447         p = subprocess.Popen(
448                 ['task', '--version'],
449                 stdout=subprocess.PIPE,
450                 stderr=subprocess.PIPE)
451         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
452         return stdout.strip('\n')
453
454     def execute_command(self, args, config_override={}):
455         command_args = self._get_command_args(
456             args, config_override=config_override)
457         logger.debug(' '.join(command_args))
458         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
459                              stderr=subprocess.PIPE)
460         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
461         if p.returncode:
462             if stderr.strip():
463                 error_msg = stderr.strip().splitlines()[-1]
464             else:
465                 error_msg = stdout.strip()
466             raise TaskWarriorException(error_msg)
467         return stdout.strip().split('\n')
468
469     def filter_tasks(self, filter_obj):
470         args = ['export', '--'] + filter_obj.get_filter_params()
471         tasks = []
472         for line in self.execute_command(args):
473             if line:
474                 data = line.strip(',')
475                 try:
476                     tasks.append(Task(self, json.loads(data)))
477                 except ValueError:
478                     raise TaskWarriorException('Invalid JSON: %s' % data)
479         return tasks
480
481     def merge_with(self, path, push=False):
482         path = path.rstrip('/') + '/'
483         self.execute_command(['merge', path], config_override={
484             'merge.autopush': 'yes' if push else 'no',
485         })
486
487     def undo(self):
488         self.execute_command(['undo'], config_override={
489             'confirmation': 'no',
490         })