git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you read over the Git project's submission guidelines and adhere to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

36f3c3b13054e4bdab5870ec8e663c89535c5489
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
# strftime/strptime pattern used by the date (de)serializers in this module.
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
# Number of items TaskQuerySet.__repr__ shows before truncating the listing.
REPR_OUTPUT_SIZE = 10
# ``status`` values used by TaskQuerySet.pending() and .completed().
PENDING = 'pending'
COMPLETED = 'completed'

# Module-level logger; TaskWarrior.execute_command() logs each command line
# at DEBUG level through it.
logger = logging.getLogger(__name__)
16
17
class TaskWarriorException(Exception):
    """
    Raised when a ``task`` command fails or its output cannot be parsed.
    """
20
21
class TaskResource(object):
    """
    Base class for objects backed by a plain dict of field values.

    The values live in ``self._data``. Item access is routed through
    optional per-field hooks looked up by name on the instance:
    ``deserialize_<field>`` is applied to values on read and
    ``serialize_<field>`` on write; fields without a hook pass through
    unchanged. ``__str__`` relies on the subclass providing ``__unicode__``.
    """

    # Names of the fields that __setitem__ refuses to modify.
    read_only_fields = []

    def _load_data(self, data):
        """
        Install ``data`` (a dict) as the backing store of this object.
        """
        self._data = data
        # __setitem__ records every written key in this set. Creating it
        # together with the data guarantees it exists for every subclass,
        # including those whose __init__ never sets it up.
        self._modified_fields = set()

    def __getitem__(self, key):
        """
        Return the value stored under ``key``, passed through the
        ``deserialize_<key>`` hook if one exists.

        Missing keys do not raise: the hook (or the caller) receives
        ``None`` instead.
        """
        hydrate_func = getattr(self, 'deserialize_{0}'.format(key), None)
        value = self._data.get(key)
        if hydrate_func is None:
            return value
        return hydrate_func(value)

    def __setitem__(self, key, value):
        """
        Store ``value`` under ``key``, passed through the
        ``serialize_<key>`` hook if one exists, and mark the key as modified.

        Raises RuntimeError if ``key`` is listed in ``read_only_fields``.
        """
        if key in self.read_only_fields:
            raise RuntimeError('Field \'%s\' is read-only' % key)
        dehydrate_func = getattr(self, 'serialize_{0}'.format(key), None)
        if dehydrate_func is not None:
            value = dehydrate_func(value)
        self._data[key] = value
        self._modified_fields.add(key)

    def __str__(self):
        """
        Return ``__unicode__()`` as the native ``str`` type: text on
        Python 3, UTF-8 encoded bytes on Python 2.
        """
        s = six.text_type(self.__unicode__())
        if not six.PY3:
            s = s.encode('utf-8')
        return s

    def __repr__(self):
        """Use the same text as ``__str__``."""
        return str(self)
49
50
class TaskAnnotation(TaskResource):
    """
    A single annotation of a task.

    Holds a reference to the owning task (used by ``remove``) and the
    annotation's own data dict, whose ``entry`` and ``description`` fields
    are read-only.
    """
    read_only_fields = ['entry', 'description']

    def __init__(self, task, data=None):
        """
        ``task`` is the owning task object; ``data`` is the annotation's
        dict of fields (a new empty dict is used when omitted).
        """
        self.task = task
        # Build a fresh dict per instance: a ``{}`` default argument would be
        # one shared object aliased by every annotation created without data.
        self._load_data({} if data is None else data)
        # TaskResource.__setitem__ records written keys in this set.
        self._modified_fields = set()

    def deserialize_entry(self, data):
        """Parse the stored ``entry`` string into a datetime, or None."""
        return datetime.datetime.strptime(data, DATE_FORMAT) if data else None

    def serialize_entry(self, date):
        """Format a datetime as a DATE_FORMAT string ('' for no date)."""
        return date.strftime(DATE_FORMAT) if date else ''

    def remove(self):
        """Ask the owning task to remove this annotation."""
        self.task.remove_annotation(self)

    def __unicode__(self):
        return self['description']

    __repr__ = __unicode__
71
72
class Task(TaskResource):
    """
    A single task, backed by a dict of its fields.

    Fields are read and written with item access (``task['description']``);
    written fields are tracked and sent to the ``task`` binary by ``save``.
    All commands are run through ``self.warrior.execute_command``.
    """
    read_only_fields = ['id', 'entry', 'urgency', 'uuid']

    class DoesNotExist(Exception):
        """
        Raised by TaskQuerySet.get() when no task matches the lookup.
        """
        pass

    class CompletedTask(Exception):
        """
        Raised when the operation cannot be performed on the completed task.
        """
        pass

    class DeletedTask(Exception):
        """
        Raised when the operation cannot be performed on the deleted task.
        """
        pass

    def __init__(self, warrior, data=None):
        """
        ``warrior`` is the object used to execute commands; ``data`` is the
        task's dict of fields (a new empty dict is used when omitted).
        """
        self.warrior = warrior
        # Build a fresh dict per instance: __setitem__ writes into this
        # dict, so a ``{}`` default argument would leak fields between all
        # tasks created without data.
        self._load_data({} if data is None else data)
        self._modified_fields = set()

    def __unicode__(self):
        return self['description']

    @property
    def completed(self):
        """True if the stored status is 'completed'."""
        return self['status'] == six.text_type('completed')

    @property
    def deleted(self):
        """True if the stored status is 'deleted'."""
        return self['status'] == six.text_type('deleted')

    @property
    def waiting(self):
        """True if the stored status is 'waiting'."""
        return self['status'] == six.text_type('waiting')

    @property
    def pending(self):
        """True if the stored status is 'pending'."""
        return self['status'] == six.text_type('pending')

    @property
    def saved(self):
        """True once the task has a uuid or an id."""
        return self['uuid'] is not None or self['id'] is not None

    def serialize_due(self, date):
        """
        Format a datetime as a DATE_FORMAT string.

        A missing date becomes '', the same convention
        TaskAnnotation.serialize_entry and TaskFilter use for empty values,
        instead of failing on ``None.strftime``.
        """
        return date.strftime(DATE_FORMAT) if date else ''

    def deserialize_due(self, date_str):
        """Parse the stored ``due`` string into a datetime, or None."""
        if not date_str:
            return None
        return datetime.datetime.strptime(date_str, DATE_FORMAT)

    def deserialize_annotations(self, data):
        """Wrap each stored annotation dict in a TaskAnnotation."""
        return [TaskAnnotation(self, d) for d in data] if data else []

    def deserialize_tags(self, tags):
        """
        Return the tags as a list.

        Tags set through ``task['tags'] = [...]`` are stored as a
        comma-separated string (see serialize_tags) and are split here;
        any other stored value is returned unchanged.
        """
        # six.string_types instead of ``basestring``, which does not exist
        # on Python 3.
        if isinstance(tags, six.string_types):
            return tags.split(',') if tags else []
        return tags

    def serialize_tags(self, tags):
        """Join a list of tags into a comma-separated string."""
        return ','.join(tags) if tags else ''

    def delete(self):
        """
        Delete the task.

        Raises Task.DeletedTask if the task is already deleted.
        """
        # Refresh the status, and raise exception if the task is deleted
        self.refresh(only_fields=['status'])

        if self.deleted:
            raise self.DeletedTask("Task was already deleted")

        self.warrior.execute_command([self['uuid'], 'delete'], config_override={
            'confirmation': 'no',
        })

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def done(self):
        """
        Mark the task as done.

        Raises Task.CompletedTask or Task.DeletedTask if the task is
        already completed or deleted.
        """
        # Refresh, and raise exception if task is already completed/deleted
        self.refresh(only_fields=['status'])

        if self.completed:
            raise self.CompletedTask("Cannot complete a completed task")
        elif self.deleted:
            raise self.DeletedTask("Deleted task cannot be completed")

        self.warrior.execute_command([self['uuid'], 'done'])

        # Refresh the status again, so that we have updated info stored
        self.refresh(only_fields=['status'])

    def save(self):
        """
        Write the task out: ``modify`` for a saved task, ``add`` otherwise.

        For a new task the id is parsed from the 'Created task N.' output
        line. Afterwards the set of modified fields is cleared and the task
        is refreshed.

        Raises TaskWarriorException if the output of ``add`` does not
        contain exactly one well-formed 'Created task' line.
        """
        args = [self['uuid'], 'modify'] if self.saved else ['add']
        args.extend(self._get_modified_fields_as_args())
        output = self.warrior.execute_command(args)

        # Parse out the new ID, if the task is being added for the first time
        if not self.saved:
            id_lines = [l for l in output if l.startswith('Created task ')]

            # Complain loudly if it seems that more tasks were created
            # Should not happen
            if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
                raise TaskWarriorException("Unexpected output when creating "
                                           "task: %s" % '\n'.join(id_lines))

            # Circumvent the ID storage, since ID is considered read-only
            self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))

        self._modified_fields.clear()
        self.refresh()

    def add_annotation(self, annotation):
        """Annotate the task with the given text and reload annotations."""
        args = [self['uuid'], 'annotate', annotation]
        self.warrior.execute_command(args)
        # TODO: This will not work with the tasks that are not yet saved
        self.refresh(only_fields=['annotations'])

    def remove_annotation(self, annotation):
        """
        Remove an annotation, given either as a TaskAnnotation or as its
        description text, and reload annotations.
        """
        if isinstance(annotation, TaskAnnotation):
            annotation = annotation['description']
        args = [self['uuid'], 'denotate', annotation]
        self.warrior.execute_command(args)
        # TODO: This will not work with the tasks that are not yet saved
        self.refresh(only_fields=['annotations'])

    def _get_modified_fields_as_args(self):
        """
        Return the ``field:value`` command arguments to send on save.
        """
        if self.saved:
            # If we're modifying saved task, simply pass on all modified
            # fields
            fields = self._modified_fields
        else:
            # For new tasks, pass all fields that make sense
            fields = [field for field in self._data
                      if field not in self.read_only_fields]

        return ['{0}:{1}'.format(field, self._data[field])
                for field in fields]

    def refresh(self, only_fields=None):
        """
        Reload the task's data from the output of ``export``.

        With ``only_fields`` (an iterable of field names) only those fields
        are updated; otherwise the whole data dict is replaced. Does nothing
        for a task that is not saved yet.
        """
        # Do not refresh for tasks that are not yet saved in the TW
        if not self.saved:
            return

        # We need to use ID as backup for uuid here for the refreshes
        # of newly saved tasks. Any other place in the code is fine
        # with using UUID only.
        args = [self['uuid'] or self['id'], 'export']
        new_data = json.loads(self.warrior.execute_command(args)[0])
        if only_fields:
            to_update = dict(
                (k, new_data.get(k)) for k in only_fields)
            self._data.update(to_update)
        else:
            self._data = new_data
234
235
class TaskFilter(object):
    """
    A set of parameters to filter the task list with.
    """

    def __init__(self, filter_params=None):
        """
        ``filter_params`` is an optional iterable of initial filter strings.
        """
        # Always keep a private list: add_filter/add_filter_param append to
        # it, so storing a shared ``[]`` default (or the caller's own list)
        # would leak filters between unrelated instances.
        self.filter_params = list(filter_params) if filter_params else []

    def add_filter(self, filter_str):
        """Append a raw filter string."""
        self.filter_params.append(filter_str)

    def add_filter_param(self, key, value):
        """
        Append a ``key:value`` filter; ``__`` in ``key`` is turned into
        ``.`` (so ``project__not`` becomes ``project.not``).
        """
        key = key.replace('__', '.')

        # Replace the value with empty string, since that is the
        # convention in TW for empty values
        value = value if value is not None else ''
        self.filter_params.append('{0}:{1}'.format(key, value))

    def get_filter_params(self):
        """Return the non-empty filter strings, in insertion order."""
        return [f for f in self.filter_params if f]

    def clone(self):
        """Return a new filter of the same class with a copy of the params."""
        c = self.__class__()
        c.filter_params = list(self.filter_params)
        return c
262
263
class TaskQuerySet(object):
    """
    Represents a lazy lookup for a task objects.

    The query is executed (through ``warrior.filter_tasks``) the first time
    the results are needed and the result list is cached on the instance.
    Filtering methods never modify the queryset; they return a clone with
    an empty cache.
    """

    def __init__(self, warrior=None, filter_obj=None):
        self.warrior = warrior
        self._result_cache = None
        self.filter_obj = filter_obj or TaskFilter()

    def __deepcopy__(self, memo):
        """
        Deep copy of a QuerySet doesn't populate the cache
        """
        obj = self.__class__()
        for k, v in self.__dict__.items():
            if k in ('_iter', '_result_cache'):
                obj.__dict__[k] = None
            else:
                obj.__dict__[k] = copy.deepcopy(v, memo)
        return obj

    def _fetch_all(self):
        """
        Run the query if it has not been run yet and return the cached list.

        Every container method goes through here. Filling the cache with
        ``list(self)`` instead would re-enter ``__len__`` via the length
        hint that ``list()`` requests, and would only terminate if the
        interpreter happens to call ``__iter__`` first.
        """
        if self._result_cache is None:
            self._result_cache = list(self._execute())
        return self._result_cache

    def __repr__(self):
        data = list(self[:REPR_OUTPUT_SIZE + 1])
        if len(data) > REPR_OUTPUT_SIZE:
            data[-1] = "...(remaining elements truncated)..."
        return repr(data)

    def __len__(self):
        return len(self._fetch_all())

    def __iter__(self):
        return iter(self._fetch_all())

    def __getitem__(self, k):
        return self._fetch_all()[k]

    def __bool__(self):
        return bool(self._fetch_all())

    def __nonzero__(self):
        # Python 2 spelling of __bool__.
        return type(self).__bool__(self)

    def _clone(self, klass=None, **kwargs):
        """
        Return a new queryset (of ``klass``, by default the same class)
        with the same warrior, a cloned filter and an empty cache; ``kwargs``
        are set as attributes on the clone.
        """
        if klass is None:
            klass = self.__class__
        filter_obj = self.filter_obj.clone()
        c = klass(warrior=self.warrior, filter_obj=filter_obj)
        c.__dict__.update(kwargs)
        return c

    def _execute(self):
        """
        Fetch the tasks which match the current filters.
        """
        return self.warrior.filter_tasks(self.filter_obj)

    def all(self):
        """
        Returns a new TaskQuerySet that is a copy of the current one.
        """
        return self._clone()

    def pending(self):
        """
        Returns a new TaskQuerySet restricted to pending tasks.
        """
        return self.filter(status=PENDING)

    def completed(self):
        """
        Returns a new TaskQuerySet restricted to completed tasks.
        """
        return self.filter(status=COMPLETED)

    def filter(self, *args, **kwargs):
        """
        Returns a new TaskQuerySet with the given filters added.

        Positional arguments are added as raw filter strings, keyword
        arguments as ``key:value`` filter parameters.
        """
        clone = self._clone()
        for f in args:
            clone.filter_obj.add_filter(f)
        for key, value in kwargs.items():
            clone.filter_obj.add_filter_param(key, value)
        return clone

    def get(self, **kwargs):
        """
        Performs the query and returns a single object matching the given
        keyword arguments.

        Raises Task.DoesNotExist if nothing matches and ValueError if more
        than one task matches.
        """
        clone = self.filter(**kwargs)
        num = len(clone)
        if num == 1:
            return clone._result_cache[0]
        if not num:
            raise Task.DoesNotExist(
                'Task matching query does not exist. '
                'Lookup parameters were {0}'.format(kwargs))
        raise ValueError(
            'get() returned more than one Task -- it returned {0}! '
            'Lookup parameters were {1}'.format(num, kwargs))
372
373
class TaskWarrior(object):
    """
    Thin wrapper around the ``task`` command-line binary.

    Holds the configuration passed to every command (currently just
    ``data.location``) and exposes the tasks as ``self.tasks``, a
    TaskQuerySet bound to this instance.
    """

    def __init__(self, data_location='~/.task', create=True):
        """
        ``data_location`` is the task data directory (``~`` is expanded).
        If ``create`` is true and the directory does not exist, it is
        created.
        """
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)

    def _get_command_args(self, args, config_override=None):
        """
        Build the full argument list for one ``task`` invocation.

        The result is ``task rc:/``, followed by one ``rc.<key>=<value>``
        per configuration entry (``self.config`` updated with
        ``config_override``), followed by ``args`` converted to strings.
        ``self.config`` itself is not modified.
        """
        # NOTE(review): 'rc:/' presumably keeps the user's own taskrc from
        # being loaded -- confirm against the task documentation.
        command_args = ['task', 'rc:/']
        config = self.config.copy()
        config.update(config_override or {})
        for item in config.items():
            command_args.append('rc.{0}={1}'.format(*item))
        command_args.extend(map(str, args))
        return command_args

    def execute_command(self, args, config_override=None):
        """
        Run ``task`` with the given arguments and return its standard
        output as a list of lines (decoded as UTF-8, surrounding whitespace
        stripped).

        Raises TaskWarriorException if the process exits with a non-zero
        code; the message is the last line of stderr or, when stderr is
        empty, the whole of stdout.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        logger.debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode:
            if stderr.strip():
                error_msg = stderr.strip().splitlines()[-1]
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)
        return stdout.strip().split('\n')

    def filter_tasks(self, filter_obj):
        """
        Return the list of Task objects matching ``filter_obj``.

        Each non-empty line of the ``export`` output is parsed as one JSON
        object after surrounding commas are stripped.

        Raises TaskWarriorException if a line is not valid JSON.
        """
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if not line:
                continue
            data = line.strip(',')
            try:
                tasks.append(Task(self, json.loads(data)))
            except ValueError:
                raise TaskWarriorException('Invalid JSON: %s' % data)
        return tasks

    def merge_with(self, path, push=False):
        """
        Run ``merge`` against ``path`` (normalised to end with exactly one
        '/'), with ``merge.autopush`` set according to ``push``.
        """
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        """Run ``undo`` with confirmation disabled."""
        self.execute_command(['undo'], config_override={
            'confirmation': 'no',
        })