git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

336c050ec12ac56951671332b60ec08fe97f439c
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
# Module-level logger; execute_command() logs each command line at DEBUG.
logger = logging.getLogger(__name__)

# strftime/strptime pattern used to (de)serialize task date fields.
DATE_FORMAT = '%Y%m%dT%H%M%SZ'

# Number of tasks shown by repr(TaskQuerySet) before the output is truncated.
REPR_OUTPUT_SIZE = 10

# Status values used by the TaskQuerySet.pending()/completed() shortcuts.
PENDING = 'pending'
COMPLETED = 'completed'

# TaskWarrior version strings, available for version-dependent behaviour.
VERSION_2_1_0 = six.u('2.1.0')
VERSION_2_2_0 = six.u('2.2.0')
VERSION_2_3_0 = six.u('2.3.0')
VERSION_2_4_0 = six.u('2.4.0')
21
22
class TaskWarriorException(Exception):
    """
    Error raised when a TaskWarrior command fails or its output cannot be
    interpreted.
    """
25
26
class TaskResource(object):
    """
    Base class for objects backed by a dict of raw field values.

    Fields are accessed with item syntax (``obj['field']``). A subclass may
    define ``deserialize_<field>`` / ``serialize_<field>`` methods to convert
    between the raw stored value and the Python value; fields without such
    a method are passed through unchanged.

    Subclasses must provide ``__unicode__`` for ``str()``/``repr()`` to work.
    """
    # Names of fields that __setitem__ refuses to modify.
    read_only_fields = []

    def _load_data(self, data):
        """
        Use ``data`` (a dict of raw field values) as the backing store.
        The dict is stored by reference, not copied.
        """
        self._data = data

    def __getitem__(self, key):
        """
        Return the value of field ``key``, converted by
        ``deserialize_<key>`` if such a method exists. A missing field
        yields whatever the converter makes of ``None`` (``None`` itself
        when there is no converter).
        """
        hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
                               lambda x: x)
        return hydrate_func(self._data.get(key))

    def __setitem__(self, key, value):
        """
        Store ``value`` under ``key``, converted by ``serialize_<key>`` if
        such a method exists, and remember the field as modified.

        Raises RuntimeError if ``key`` is listed in ``read_only_fields``.
        """
        if key in self.read_only_fields:
            raise RuntimeError('Field \'%s\' is read-only' % key)
        dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
                                 lambda x: x)
        self._data[key] = dehydrate_func(value)

        # Not every subclass initialises the change tracker in its
        # constructor, so create it on first write instead of failing
        # with AttributeError.
        modified = getattr(self, '_modified_fields', None)
        if modified is None:
            modified = self._modified_fields = set()
        modified.add(key)

    def __str__(self):
        """
        Return the text from ``__unicode__``; on Python 2 it is encoded
        to UTF-8 bytes, as ``str`` requires there.
        """
        s = six.text_type(self.__unicode__())
        if not six.PY3:
            s = s.encode('utf-8')
        return s

    def __repr__(self):
        return str(self)
54
55
class TaskAnnotation(TaskResource):
    """
    One annotation attached to a task.

    ``task`` is the owning task object; ``data`` is the raw annotation dict
    (the code reads its ``entry`` and ``description`` keys).
    """
    read_only_fields = ['entry', 'description']

    def __init__(self, task, data=None):
        """
        Bind the annotation to ``task``. ``data`` is stored by reference;
        when omitted, a fresh empty dict is used so that instances never
        share state through a default argument.
        """
        self.task = task
        self._load_data({} if data is None else data)

    def deserialize_entry(self, data):
        """Parse the raw ``entry`` string into a datetime (None if empty)."""
        return datetime.datetime.strptime(data, DATE_FORMAT) if data else None

    def serialize_entry(self, date):
        """Format a datetime as a raw ``entry`` string ('' if falsy)."""
        return date.strftime(DATE_FORMAT) if date else ''

    def remove(self):
        """Remove this annotation from its task."""
        self.task.remove_annotation(self)

    def __unicode__(self):
        return self['description']

    # __repr__ is inherited from TaskResource: it goes through __str__,
    # which always returns a native string. Aliasing __repr__ to
    # __unicode__ broke repr() for annotations without a description
    # (None is not a valid repr result) and for non-ASCII text on Python 2.
76
77
class Task(TaskResource):
    """
    A single task, backed by a dict of raw TaskWarrior field values.

    Fields are read and written with item access (``task['description']``).
    Writes are only recorded locally; they are sent to TaskWarrior when
    save() is called.
    """
    read_only_fields = ['id', 'entry', 'urgency', 'uuid']

    class DoesNotExist(Exception):
        """
        Raised when a lookup for a single task finds no match.
        """
        pass

    class CompletedTask(Exception):
        """
        Raised when the operation cannot be performed on the completed task.
        """
        pass

    class DeletedTask(Exception):
        """
        Raised when the operation cannot be performed on the deleted task.
        """
        pass

    class NotSaved(Exception):
        """
        Raised when the operation cannot be performed on the task, because
        it has not been saved to TaskWarrior yet.
        """
        pass

    def __init__(self, warrior, data=None, **kwargs):
        """
        Create a task bound to ``warrior``.

        Initial raw field values may be given as keyword arguments and/or
        as the ``data`` dict; entries in ``data`` win over keyword
        arguments of the same name. Values are stored as given, without
        passing through the ``serialize_*`` hooks.
        """
        self.warrior = warrior

        # We keep data for backwards compatibility
        if data is not None:
            kwargs.update(data)

        self._load_data(kwargs)
        self._modified_fields = set()

    def __unicode__(self):
        return self['description']

    def __eq__(self, other):
        # Tasks are identified by their uuid only.
        return self['uuid'] == other['uuid']

    def __hash__(self):
        return self['uuid'].__hash__()

    @property
    def completed(self):
        return self['status'] == six.text_type('completed')

    @property
    def deleted(self):
        return self['status'] == six.text_type('deleted')

    @property
    def waiting(self):
        return self['status'] == six.text_type('waiting')

    @property
    def pending(self):
        return self['status'] == six.text_type('pending')

    @property
    def saved(self):
        """True once the task carries a uuid or an id."""
        return self['uuid'] is not None or self['id'] is not None

    def serialize_due(self, date):
        """
        Format a datetime as a raw ``due`` string. A falsy value gives '',
        mirroring TaskAnnotation.serialize_entry, instead of failing on
        ``None.strftime``.
        """
        return date.strftime(DATE_FORMAT) if date else ''

    def deserialize_due(self, date_str):
        """Parse the raw ``due`` string into a datetime (None if empty)."""
        if not date_str:
            return None
        return datetime.datetime.strptime(date_str, DATE_FORMAT)

    def deserialize_annotations(self, data):
        """Wrap each raw annotation dict in a TaskAnnotation."""
        return [TaskAnnotation(self, d) for d in data] if data else []

    def deserialize_tags(self, tags):
        """
        Return the tags as a list when stored as a comma-separated string;
        any non-string value is returned unchanged.
        """
        # six.string_types is (basestring,) on Python 2 and (str,) on
        # Python 3; the bare name ``basestring`` does not exist on Python 3.
        if isinstance(tags, six.string_types):
            return tags.split(',') if tags else []
        return tags

    def serialize_tags(self, tags):
        """Join a list of tags into the comma-separated raw form."""
        return ','.join(tags) if tags else ''

    def delete(self):
        """
        Delete the task in TaskWarrior and refresh its status.

        Raises Task.NotSaved if the task was never saved and
        Task.DeletedTask if it is already deleted.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved before it can be deleted")

        # Refresh the status, and raise exception if the task is deleted
        self.refresh(only_fields=['status'])

        if self.deleted:
            raise Task.DeletedTask("Task was already deleted")

        self.warrior.execute_command([self['uuid'], 'delete'], config_override={
            'confirmation': 'no',
        })

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def done(self):
        """
        Mark the task as done in TaskWarrior and refresh its status.

        Raises Task.NotSaved if the task was never saved,
        Task.CompletedTask if it is already completed and
        Task.DeletedTask if it is deleted.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved before it can be completed")

        # Refresh, and raise exception if task is already completed/deleted
        self.refresh(only_fields=['status'])

        if self.completed:
            raise Task.CompletedTask("Cannot complete a completed task")
        elif self.deleted:
            raise Task.DeletedTask("Deleted task cannot be completed")

        self.warrior.execute_command([self['uuid'], 'done'])

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def save(self):
        """
        Send the task to TaskWarrior: ``modify`` with the changed fields if
        it is already saved, otherwise ``add`` with all writable fields.
        Afterwards the change tracker is cleared and the task is reloaded.

        Raises TaskWarriorException if the output of ``add`` does not
        contain exactly one well-formed "Created task N." line.
        """
        was_saved = self.saved
        args = [self['uuid'], 'modify'] if was_saved else ['add']
        args.extend(self._get_modified_fields_as_args())
        output = self.warrior.execute_command(args)

        # Parse out the new ID, if the task is being added for the first time
        if not was_saved:
            id_lines = [line for line in output
                        if line.startswith('Created task ')]

            # Complain loudly if it seems that more tasks were created
            # Should not happen
            if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
                raise TaskWarriorException("Unexpected output when creating "
                                           "task: %s" % '\n'.join(id_lines))

            # Circumvent the ID storage, since ID is considered read-only
            self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))

        self._modified_fields.clear()
        self.refresh()

    def add_annotation(self, annotation):
        """
        Attach the text ``annotation`` to the task and reload annotations.

        Raises Task.NotSaved if the task was never saved.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to add annotation")

        args = [self['uuid'], 'annotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def remove_annotation(self, annotation):
        """
        Remove an annotation, given either as a TaskAnnotation or as its
        description text, and reload annotations.

        Raises Task.NotSaved if the task was never saved.
        """
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to remove annotation")

        if isinstance(annotation, TaskAnnotation):
            annotation = annotation['description']
        args = [self['uuid'], 'denotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def _get_modified_fields_as_args(self):
        """
        Build the ``field:value`` command-line arguments for save().

        For a saved task only the modified fields are included; for a new
        task every stored field except the read-only ones.
        """
        # Task version older than 2.4.0 ignores first word of the
        # task description if description: prefix is used
        # NOTE(review): this compares version strings lexicographically,
        # which would misorder a two-digit component such as '2.10.0'.
        bare_description = self.warrior.version < VERSION_2_4_0

        if self.saved:
            # If we're modifying saved task, simply pass on all modified fields
            fields = list(self._modified_fields)
        else:
            # For new tasks, pass all fields that make sense
            fields = [name for name in self._data.keys()
                      if name not in self.read_only_fields]

        args = []
        for field in fields:
            if bare_description and field == 'description':
                args.append(self._data[field])
            else:
                args.append('{0}:{1}'.format(field, self._data[field]))
        return args

    def refresh(self, only_fields=None):
        """
        Reload the task from TaskWarrior.

        With ``only_fields`` (a list of field names) just those fields are
        updated; a name missing from the export is set to None. Without
        it, the whole backing dict is replaced.

        Raises Task.NotSaved if the task was never saved.
        """
        # Raise error when trying to refresh a task that has not been saved
        if not self.saved:
            raise Task.NotSaved("Task needs to be saved to be refreshed")

        # We need to use ID as backup for uuid here for the refreshes
        # of newly saved tasks. Any other place in the code is fine
        # with using UUID only.
        args = [self['uuid'] or self['id'], 'export']
        new_data = json.loads(self.warrior.execute_command(args)[0])
        if only_fields:
            to_update = dict((k, new_data.get(k)) for k in only_fields)
            self._data.update(to_update)
        else:
            self._data = new_data
274
275
class TaskFilter(object):
    """
    A set of parameters to filter the task list with.
    """

    def __init__(self, filter_params=None):
        """
        Start from ``filter_params`` (an iterable of filter strings) or,
        when omitted, from an empty filter.

        The parameters are copied, so filters never share a list with each
        other or with the caller; a mutable default argument would be
        appended to by every default-constructed filter.
        """
        self.filter_params = list(filter_params) if filter_params else []

    def add_filter(self, filter_str):
        """Append a raw filter expression."""
        self.filter_params.append(filter_str)

    def add_filter_param(self, key, value):
        """
        Add a ``key:value`` filter. A double underscore in ``key`` is
        turned into a dot (``due__before`` becomes ``due.before``).
        """
        key = key.replace('__', '.')

        # Replace the value with empty string, since that is the
        # convention in TW for empty values
        value = value if value is not None else ''

        # If we are filtering by uuid:, do not use uuid keyword
        # due to TW-1452 bug
        if key == 'uuid':
            self.filter_params.insert(0, value)
        else:
            self.filter_params.append('{0}:{1}'.format(key, value))

    def get_filter_params(self):
        """Return the non-empty filter parameters as a new list."""
        return [f for f in self.filter_params if f]

    def clone(self):
        """Return an independent copy of this filter."""
        c = self.__class__()
        c.filter_params = list(self.filter_params)
        return c
308
309
310 class TaskQuerySet(object):
311     """
312     Represents a lazy lookup for a task objects.
313     """
314
315     def __init__(self, warrior=None, filter_obj=None):
316         self.warrior = warrior
317         self._result_cache = None
318         self.filter_obj = filter_obj or TaskFilter()
319
320     def __deepcopy__(self, memo):
321         """
322         Deep copy of a QuerySet doesn't populate the cache
323         """
324         obj = self.__class__()
325         for k, v in self.__dict__.items():
326             if k in ('_iter', '_result_cache'):
327                 obj.__dict__[k] = None
328             else:
329                 obj.__dict__[k] = copy.deepcopy(v, memo)
330         return obj
331
332     def __repr__(self):
333         data = list(self[:REPR_OUTPUT_SIZE + 1])
334         if len(data) > REPR_OUTPUT_SIZE:
335             data[-1] = "...(remaining elements truncated)..."
336         return repr(data)
337
338     def __len__(self):
339         if self._result_cache is None:
340             self._result_cache = list(self)
341         return len(self._result_cache)
342
343     def __iter__(self):
344         if self._result_cache is None:
345             self._result_cache = self._execute()
346         return iter(self._result_cache)
347
348     def __getitem__(self, k):
349         if self._result_cache is None:
350             self._result_cache = list(self)
351         return self._result_cache.__getitem__(k)
352
353     def __bool__(self):
354         if self._result_cache is not None:
355             return bool(self._result_cache)
356         try:
357             next(iter(self))
358         except StopIteration:
359             return False
360         return True
361
362     def __nonzero__(self):
363         return type(self).__bool__(self)
364
365     def _clone(self, klass=None, **kwargs):
366         if klass is None:
367             klass = self.__class__
368         filter_obj = self.filter_obj.clone()
369         c = klass(warrior=self.warrior, filter_obj=filter_obj)
370         c.__dict__.update(kwargs)
371         return c
372
373     def _execute(self):
374         """
375         Fetch the tasks which match the current filters.
376         """
377         return self.warrior.filter_tasks(self.filter_obj)
378
379     def all(self):
380         """
381         Returns a new TaskQuerySet that is a copy of the current one.
382         """
383         return self._clone()
384
385     def pending(self):
386         return self.filter(status=PENDING)
387
388     def completed(self):
389         return self.filter(status=COMPLETED)
390
391     def filter(self, *args, **kwargs):
392         """
393         Returns a new TaskQuerySet with the given filters added.
394         """
395         clone = self._clone()
396         for f in args:
397             clone.filter_obj.add_filter(f)
398         for key, value in kwargs.items():
399             clone.filter_obj.add_filter_param(key, value)
400         return clone
401
402     def get(self, **kwargs):
403         """
404         Performs the query and returns a single object matching the given
405         keyword arguments.
406         """
407         clone = self.filter(**kwargs)
408         num = len(clone)
409         if num == 1:
410             return clone._result_cache[0]
411         if not num:
412             raise Task.DoesNotExist(
413                 'Task matching query does not exist. '
414                 'Lookup parameters were {0}'.format(kwargs))
415         raise ValueError(
416             'get() returned more than one Task -- it returned {0}! '
417             'Lookup parameters were {1}'.format(num, kwargs))
418
419
class TaskWarrior(object):
    """
    Thin wrapper around the ``task`` command-line program.

    Every command is run with ``rc:/`` followed by ``rc.<key>=<value>``
    overrides built from ``self.config`` and any per-call overrides.
    """

    def __init__(self, data_location='~/.task', create=True):
        """
        ``data_location`` is the task data directory (``~`` is expanded);
        with ``create`` true it is created when it does not exist.
        The installed TaskWarrior version is queried and stored in
        ``self.version``.
        """
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)
        self.version = self._get_version()

    def _get_command_args(self, args, config_override=None):
        """
        Return the full argv list for running ``task`` with ``args``.

        ``config_override`` (optional dict) is layered over ``self.config``
        for this call only; ``self.config`` itself is left untouched.
        """
        command_args = ['task', 'rc:/']
        config = self.config.copy()
        if config_override:
            config.update(config_override)
        for item in config.items():
            command_args.append('rc.{0}={1}'.format(*item))
        command_args.extend(map(str, args))
        return command_args

    def _get_version(self):
        """Return the output of ``task --version`` without newlines."""
        p = subprocess.Popen(
            ['task', '--version'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        stdout = p.communicate()[0].decode('utf-8')
        return stdout.strip('\n')

    def execute_command(self, args, config_override=None):
        """
        Run ``task`` with ``args`` and return its stripped standard output
        as a list of lines.

        Raises TaskWarriorException when the command exits with a non-zero
        status; the message is the last line of stderr or, if stderr is
        empty, the whole of stdout.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        logger.debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode:
            if stderr.strip():
                error_msg = stderr.strip().splitlines()[-1]
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)
        return stdout.strip().split('\n')

    def filter_tasks(self, filter_obj):
        """
        Export the tasks matching ``filter_obj`` and return them as a list
        of Task objects.

        Raises TaskWarriorException when an output line is not valid JSON.
        """
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if not line:
                continue
            # Each non-empty line is parsed on its own after surrounding
            # commas are stripped.
            data = line.strip(',')
            try:
                tasks.append(Task(self, json.loads(data)))
            except ValueError:
                raise TaskWarriorException('Invalid JSON: %s' % data)
        return tasks

    def merge_with(self, path, push=False):
        """
        Run ``task merge`` against ``path`` (given a single trailing
        slash); ``push`` sets the ``merge.autopush`` override to yes/no.
        """
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        """Run ``task undo`` with confirmation prompts disabled."""
        self.execute_command(['undo'], config_override={
            'confirmation': 'no',
        })