]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

1202437db2f172672a9102144ec89a50fb9791d5
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11 REPR_OUTPUT_SIZE = 10
12 PENDING = 'pending'
13 COMPLETED = 'completed'
14
15 logger = logging.getLogger(__name__)
16
17
18 class TaskWarriorException(Exception):
19     pass
20
21
22 class TaskResource(object):
23     read_only_fields = []
24
25     def _load_data(self, data):
26         self._data = data
27
28     def __getitem__(self, key):
29         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
30                                lambda x: x)
31         return hydrate_func(self._data.get(key))
32
33     def __setitem__(self, key, value):
34         if key in self.read_only_fields:
35             raise RuntimeError('Field \'%s\' is read-only' % key)
36         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
37                                  lambda x: x)
38         self._data[key] = dehydrate_func(value)
39         self._modified_fields.add(key)
40
41     def __str__(self):
42         s = six.text_type(self.__unicode__())
43         if not six.PY3:
44             s = s.encode('utf-8')
45         return s
46
47     def __repr__(self):
48         return str(self)
49
50
51 class TaskAnnotation(TaskResource):
52     read_only_fields = ['entry', 'description']
53
54     def __init__(self, task, data={}):
55         self.task = task
56         self._load_data(data)
57
58     def deserialize_entry(self, data):
59         return datetime.datetime.strptime(data, DATE_FORMAT) if data else None
60
61     def serialize_entry(self, date):
62         return date.strftime(DATE_FORMAT) if date else ''
63
64     def remove(self):
65         self.task.remove_annotation(self)
66
67     def __unicode__(self):
68         return self['description']
69
70     __repr__ = __unicode__
71
72
73 class Task(TaskResource):
74     read_only_fields = ['id', 'entry', 'urgency', 'uuid']
75
76     class DoesNotExist(Exception):
77         pass
78
79     class CompletedTask(Exception):
80         """
81         Raised when the operation cannot be performed on the completed task.
82         """
83         pass
84
85     class DeletedTask(Exception):
86         """
87         Raised when the operation cannot be performed on the deleted task.
88         """
89         pass
90
91     def __init__(self, warrior, data={}):
92         self.warrior = warrior
93         self._load_data(data)
94         self._modified_fields = set()
95
96     def __unicode__(self):
97         return self['description']
98
99     @property
100     def completed(self):
101         return self['status'] == six.text_type('completed')
102
103     @property
104     def deleted(self):
105         return self['status'] == six.text_type('deleted')
106
107     @property
108     def waiting(self):
109         return self['status'] == six.text_type('waiting')
110
111     @property
112     def pending(self):
113         return self['status'] == six.text_type('pending')
114
115     @property
116     def saved(self):
117         return self['uuid'] is not None or self['id'] is not None
118
119     def serialize_due(self, date):
120         return date.strftime(DATE_FORMAT)
121
122     def deserialize_due(self, date_str):
123         if not date_str:
124             return None
125         return datetime.datetime.strptime(date_str, DATE_FORMAT)
126
127     def deserialize_annotations(self, data):
128         return [TaskAnnotation(self, d) for d in data] if data else []
129
130     def deserialize_tags(self, tags):
131         if isinstance(tags, basestring):
132             return tags.split(',') if tags else []
133         return tags
134
135     def serialize_tags(self, tags):
136         return ','.join(tags) if tags else ''
137
138     def delete(self):
139         # Refresh the status, and raise exception if the task is deleted
140         self.refresh(only_fields=['status'])
141
142         if self.deleted:
143             raise self.DeletedTask("Task was already deleted")
144
145         self.warrior.execute_command([self['uuid'], 'delete'], config_override={
146             'confirmation': 'no',
147         })
148
149         # Refresh the status again, so that we have updated info stored
150         self.refresh(only_fields=['status'])
151
152
153     def done(self):
154         # Refresh, and raise exception if task is already completed/deleted
155         self.refresh(only_fields=['status'])
156
157         if self.completed:
158             raise self.CompletedTask("Cannot complete a completed task")
159         elif self.deleted:
160             raise self.DeletedTask("Deleted task cannot be completed")
161
162         self.warrior.execute_command([self['uuid'], 'done'])
163
164         # Refresh the status again, so that we have updated info stored
165         self.refresh(only_fields=['status'])
166
167     def save(self):
168         args = [self['uuid'], 'modify'] if self.saved else ['add']
169         args.extend(self._get_modified_fields_as_args())
170         output = self.warrior.execute_command(args)
171
172         # Parse out the new ID, if the task is being added for the first time
173         if not self.saved:
174             id_lines = [l for l in output if l.startswith('Created task ')]
175
176             # Complain loudly if it seems that more tasks were created
177             # Should not happen
178             if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
179                 raise TaskWarriorException("Unexpected output when creating "
180                                            "task: %s" % '\n'.join(id_lines))
181
182             # Circumvent the ID storage, since ID is considered read-only
183             self._data['id'] = int(id_lines[0].split(' ')[2].rstrip('.'))
184
185         self._modified_fields.clear()
186         self.refresh()
187
188     def add_annotation(self, annotation):
189         args = [self['uuid'], 'annotate', annotation]
190         self.warrior.execute_command(args)
191         # TODO: This will not work with the tasks that are not yet saved
192         self.refresh(only_fields=['annotations'])
193
194     def remove_annotation(self, annotation):
195         if isinstance(annotation, TaskAnnotation):
196             annotation = annotation['description']
197         args = [self['uuid'], 'denotate', annotation]
198         self.warrior.execute_command(args)
199         # TODO: This will not work with the tasks that are not yet saved
200         self.refresh(only_fields=['annotations'])
201
202     def _get_modified_fields_as_args(self):
203         args = []
204         for field in self._modified_fields:
205             args.append('{}:{}'.format(field, self._data[field]))
206         return args
207
208     def refresh(self, only_fields=[]):
209         # Do not refresh for tasks that are not yet saved in the TW
210         if not self.saved:
211             return
212
213         # We need to use ID as backup for uuid here for the refreshes
214         # of newly saved tasks. Any other place in the code is fine
215         # with using UUID only.
216         args = [self['uuid'] or self['id'], 'export']
217         new_data = json.loads(self.warrior.execute_command(args)[0])
218         if only_fields:
219             to_update = dict(
220                 [(k, new_data.get(k)) for k in only_fields])
221             self._data.update(to_update)
222         else:
223             self._data = new_data
224
225
226 class TaskFilter(object):
227     """
228     A set of parameters to filter the task list with.
229     """
230
231     def __init__(self, filter_params=[]):
232         self.filter_params = filter_params
233
234     def add_filter(self, filter_str):
235         self.filter_params.append(filter_str)
236
237     def add_filter_param(self, key, value):
238         key = key.replace('__', '.')
239
240         # Replace the value with empty string, since that is the
241         # convention in TW for empty values
242         value = value if value is not None else ''
243         self.filter_params.append('{0}:{1}'.format(key, value))
244
245     def get_filter_params(self):
246         return [f for f in self.filter_params if f]
247
248     def clone(self):
249         c = self.__class__()
250         c.filter_params = list(self.filter_params)
251         return c
252
253
254 class TaskQuerySet(object):
255     """
256     Represents a lazy lookup for a task objects.
257     """
258
259     def __init__(self, warrior=None, filter_obj=None):
260         self.warrior = warrior
261         self._result_cache = None
262         self.filter_obj = filter_obj or TaskFilter()
263
264     def __deepcopy__(self, memo):
265         """
266         Deep copy of a QuerySet doesn't populate the cache
267         """
268         obj = self.__class__()
269         for k, v in self.__dict__.items():
270             if k in ('_iter', '_result_cache'):
271                 obj.__dict__[k] = None
272             else:
273                 obj.__dict__[k] = copy.deepcopy(v, memo)
274         return obj
275
276     def __repr__(self):
277         data = list(self[:REPR_OUTPUT_SIZE + 1])
278         if len(data) > REPR_OUTPUT_SIZE:
279             data[-1] = "...(remaining elements truncated)..."
280         return repr(data)
281
282     def __len__(self):
283         if self._result_cache is None:
284             self._result_cache = list(self)
285         return len(self._result_cache)
286
287     def __iter__(self):
288         if self._result_cache is None:
289             self._result_cache = self._execute()
290         return iter(self._result_cache)
291
292     def __getitem__(self, k):
293         if self._result_cache is None:
294             self._result_cache = list(self)
295         return self._result_cache.__getitem__(k)
296
297     def __bool__(self):
298         if self._result_cache is not None:
299             return bool(self._result_cache)
300         try:
301             next(iter(self))
302         except StopIteration:
303             return False
304         return True
305
306     def __nonzero__(self):
307         return type(self).__bool__(self)
308
309     def _clone(self, klass=None, **kwargs):
310         if klass is None:
311             klass = self.__class__
312         filter_obj = self.filter_obj.clone()
313         c = klass(warrior=self.warrior, filter_obj=filter_obj)
314         c.__dict__.update(kwargs)
315         return c
316
317     def _execute(self):
318         """
319         Fetch the tasks which match the current filters.
320         """
321         return self.warrior.filter_tasks(self.filter_obj)
322
323     def all(self):
324         """
325         Returns a new TaskQuerySet that is a copy of the current one.
326         """
327         return self._clone()
328
329     def pending(self):
330         return self.filter(status=PENDING)
331
332     def completed(self):
333         return self.filter(status=COMPLETED)
334
335     def filter(self, *args, **kwargs):
336         """
337         Returns a new TaskQuerySet with the given filters added.
338         """
339         clone = self._clone()
340         for f in args:
341             clone.filter_obj.add_filter(f)
342         for key, value in kwargs.items():
343             clone.filter_obj.add_filter_param(key, value)
344         return clone
345
346     def get(self, **kwargs):
347         """
348         Performs the query and returns a single object matching the given
349         keyword arguments.
350         """
351         clone = self.filter(**kwargs)
352         num = len(clone)
353         if num == 1:
354             return clone._result_cache[0]
355         if not num:
356             raise Task.DoesNotExist(
357                 'Task matching query does not exist. '
358                 'Lookup parameters were {0}'.format(kwargs))
359         raise ValueError(
360             'get() returned more than one Task -- it returned {0}! '
361             'Lookup parameters were {1}'.format(num, kwargs))
362
363
364 class TaskWarrior(object):
365     def __init__(self, data_location='~/.task', create=True):
366         data_location = os.path.expanduser(data_location)
367         if create and not os.path.exists(data_location):
368             os.makedirs(data_location)
369         self.config = {
370             'data.location': os.path.expanduser(data_location),
371         }
372         self.tasks = TaskQuerySet(self)
373
374     def _get_command_args(self, args, config_override={}):
375         command_args = ['task', 'rc:/']
376         config = self.config.copy()
377         config.update(config_override)
378         for item in config.items():
379             command_args.append('rc.{0}={1}'.format(*item))
380         command_args.extend(map(str, args))
381         return command_args
382
383     def execute_command(self, args, config_override={}):
384         command_args = self._get_command_args(
385             args, config_override=config_override)
386         logger.debug(' '.join(command_args))
387         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
388                              stderr=subprocess.PIPE)
389         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
390         if p.returncode:
391             if stderr.strip():
392                 error_msg = stderr.strip().splitlines()[-1]
393             else:
394                 error_msg = stdout.strip()
395             raise TaskWarriorException(error_msg)
396         return stdout.strip().split('\n')
397
398     def filter_tasks(self, filter_obj):
399         args = ['export', '--'] + filter_obj.get_filter_params()
400         tasks = []
401         for line in self.execute_command(args):
402             if line:
403                 data = line.strip(',')
404                 try:
405                     tasks.append(Task(self, json.loads(data)))
406                 except ValueError:
407                     raise TaskWarriorException('Invalid JSON: %s' % data)
408         return tasks
409
410     def merge_with(self, path, push=False):
411         path = path.rstrip('/') + '/'
412         self.execute_command(['merge', path], config_override={
413             'merge.autopush': 'yes' if push else 'no',
414         })
415
416     def undo(self):
417         self.execute_command(['undo'], config_override={
418             'confirmation': 'no',
419         })