]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

c72a9b307ec9769d73081d92b4ae221c07a5a881
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11 REPR_OUTPUT_SIZE = 10
12 PENDING = 'pending'
13 COMPLETED = 'completed'
14
15 logger = logging.getLogger(__name__)
16
17
18 class TaskWarriorException(Exception):
19     pass
20
21
22 class TaskResource(object):
23     read_only_fields = []
24
25     def _load_data(self, data):
26         self._data = data
27
28     def __getitem__(self, key):
29         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
30                                lambda x: x)
31         return hydrate_func(self._data.get(key))
32
33     def __setitem__(self, key, value):
34         if key in self.read_only_fields:
35             raise RuntimeError('Field \'%s\' is read-only' % key)
36         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
37                                  lambda x: x)
38         self._data[key] = dehydrate_func(value)
39         self._modified_fields.add(key)
40
41     def __str__(self):
42         s = six.text_type(self.__unicode__())
43         if not six.PY3:
44             s = s.encode('utf-8')
45         return s
46
47     def __repr__(self):
48         return str(self)
49
50
51 class TaskAnnotation(TaskResource):
52     read_only_fields = ['entry', 'description']
53
54     def __init__(self, task, data={}):
55         self.task = task
56         self._load_data(data)
57
58     def deserialize_entry(self, data):
59         return datetime.datetime.strptime(data, DATE_FORMAT) if data else None
60
61     def serialize_entry(self, date):
62         return date.strftime(DATE_FORMAT) if date else ''
63
64     def remove(self):
65         self.task.remove_annotation(self)
66
67     def __unicode__(self):
68         return self['description']
69
70     __repr__ = __unicode__
71
72
73 class Task(TaskResource):
74     read_only_fields = ['id', 'entry', 'urgency']
75
76     class DoesNotExist(Exception):
77         pass
78
79     class CompletedTask(Exception):
80         """
81         Raised when the operation cannot be performed on the completed task.
82         """
83         pass
84
85     class DeletedTask(Exception):
86         """
87         Raised when the operation cannot be performed on the deleted task.
88         """
89         pass
90
91     def __init__(self, warrior, data={}):
92         self.warrior = warrior
93         self._load_data(data)
94         self._modified_fields = set()
95
96     def __unicode__(self):
97         return self['description']
98
99     @property
100     def completed(self):
101         return self['status'] == six.text_type('completed')
102
103     @property
104     def deleted(self):
105         return self['status'] == six.text_type('deleted')
106
107     @property
108     def waiting(self):
109         return self['status'] == six.text_type('waiting')
110
111     @property
112     def pending(self):
113         return self['status'] == six.text_type('pending')
114
115     def serialize_due(self, date):
116         return date.strftime(DATE_FORMAT)
117
118     def deserialize_due(self, date_str):
119         if not date_str:
120             return None
121         return datetime.datetime.strptime(date_str, DATE_FORMAT)
122
123     def deserialize_annotations(self, data):
124         return [TaskAnnotation(self, d) for d in data] if data else []
125
126     def deserialize_tags(self, tags):
127         if isinstance(tags, basestring):
128             return tags.split(',') if tags else []
129         return tags
130
131     def serialize_tags(self, tags):
132         return ','.join(tags) if tags else ''
133
134     def delete(self):
135         # Refresh the status, and raise exception if the task is deleted
136         self.refresh(only_fields=['status'])
137
138         if self.deleted:
139             raise self.DeletedTask("Task was already deleted")
140
141         self.warrior.execute_command([self['id'], 'delete'], config_override={
142             'confirmation': 'no',
143         })
144
145         # Refresh the status again, so that we have updated info stored
146         self.refresh(only_fields=['status'])
147
148
149     def done(self):
150         # Refresh, and raise exception if task is already completed/deleted
151         self.refresh(only_fields=['status'])
152
153         if self.completed:
154             raise self.CompletedTask("Cannot complete a completed task")
155         elif self.deleted:
156             raise self.DeletedTask("Deleted task cannot be completed")
157
158         self.warrior.execute_command([self['id'], 'done'])
159
160         # Refresh the status again, so that we have updated info stored
161         self.refresh(only_fields=['status'])
162
163     def save(self):
164         args = [self['id'], 'modify'] if self['id'] else ['add']
165         args.extend(self._get_modified_fields_as_args())
166         self.warrior.execute_command(args)
167         self._modified_fields.clear()
168
169     def add_annotation(self, annotation):
170         args = [self['id'], 'annotate', annotation]
171         self.warrior.execute_command(args)
172         self.refresh(only_fields=['annotations'])
173
174     def remove_annotation(self, annotation):
175         if isinstance(annotation, TaskAnnotation):
176             annotation = annotation['description']
177         args = [self['id'], 'denotate', annotation]
178         self.warrior.execute_command(args)
179         self.refresh(only_fields=['annotations'])
180
181     def _get_modified_fields_as_args(self):
182         args = []
183         for field in self._modified_fields:
184             args.append('{}:{}'.format(field, self._data[field]))
185         return args
186
187     def refresh(self, only_fields=[]):
188         args = [self['uuid'], 'export']
189         new_data = json.loads(self.warrior.execute_command(args)[0])
190         if only_fields:
191             to_update = dict(
192                 [(k, new_data.get(k)) for k in only_fields])
193             self._data.update(to_update)
194         else:
195             self._data = new_data
196
197
198 class TaskFilter(object):
199     """
200     A set of parameters to filter the task list with.
201     """
202
203     def __init__(self, filter_params=[]):
204         self.filter_params = filter_params
205
206     def add_filter(self, filter_str):
207         self.filter_params.append(filter_str)
208
209     def add_filter_param(self, key, value):
210         key = key.replace('__', '.')
211
212         # Replace the value with empty string, since that is the
213         # convention in TW for empty values
214         value = value if value is not None else ''
215         self.filter_params.append('{0}:{1}'.format(key, value))
216
217     def get_filter_params(self):
218         return [f for f in self.filter_params if f]
219
220     def clone(self):
221         c = self.__class__()
222         c.filter_params = list(self.filter_params)
223         return c
224
225
226 class TaskQuerySet(object):
227     """
228     Represents a lazy lookup for a task objects.
229     """
230
231     def __init__(self, warrior=None, filter_obj=None):
232         self.warrior = warrior
233         self._result_cache = None
234         self.filter_obj = filter_obj or TaskFilter()
235
236     def __deepcopy__(self, memo):
237         """
238         Deep copy of a QuerySet doesn't populate the cache
239         """
240         obj = self.__class__()
241         for k, v in self.__dict__.items():
242             if k in ('_iter', '_result_cache'):
243                 obj.__dict__[k] = None
244             else:
245                 obj.__dict__[k] = copy.deepcopy(v, memo)
246         return obj
247
248     def __repr__(self):
249         data = list(self[:REPR_OUTPUT_SIZE + 1])
250         if len(data) > REPR_OUTPUT_SIZE:
251             data[-1] = "...(remaining elements truncated)..."
252         return repr(data)
253
254     def __len__(self):
255         if self._result_cache is None:
256             self._result_cache = list(self)
257         return len(self._result_cache)
258
259     def __iter__(self):
260         if self._result_cache is None:
261             self._result_cache = self._execute()
262         return iter(self._result_cache)
263
264     def __getitem__(self, k):
265         if self._result_cache is None:
266             self._result_cache = list(self)
267         return self._result_cache.__getitem__(k)
268
269     def __bool__(self):
270         if self._result_cache is not None:
271             return bool(self._result_cache)
272         try:
273             next(iter(self))
274         except StopIteration:
275             return False
276         return True
277
278     def __nonzero__(self):
279         return type(self).__bool__(self)
280
281     def _clone(self, klass=None, **kwargs):
282         if klass is None:
283             klass = self.__class__
284         filter_obj = self.filter_obj.clone()
285         c = klass(warrior=self.warrior, filter_obj=filter_obj)
286         c.__dict__.update(kwargs)
287         return c
288
289     def _execute(self):
290         """
291         Fetch the tasks which match the current filters.
292         """
293         return self.warrior.filter_tasks(self.filter_obj)
294
295     def all(self):
296         """
297         Returns a new TaskQuerySet that is a copy of the current one.
298         """
299         return self._clone()
300
301     def pending(self):
302         return self.filter(status=PENDING)
303
304     def completed(self):
305         return self.filter(status=COMPLETED)
306
307     def filter(self, *args, **kwargs):
308         """
309         Returns a new TaskQuerySet with the given filters added.
310         """
311         clone = self._clone()
312         for f in args:
313             clone.filter_obj.add_filter(f)
314         for key, value in kwargs.items():
315             clone.filter_obj.add_filter_param(key, value)
316         return clone
317
318     def get(self, **kwargs):
319         """
320         Performs the query and returns a single object matching the given
321         keyword arguments.
322         """
323         clone = self.filter(**kwargs)
324         num = len(clone)
325         if num == 1:
326             return clone._result_cache[0]
327         if not num:
328             raise Task.DoesNotExist(
329                 'Task matching query does not exist. '
330                 'Lookup parameters were {0}'.format(kwargs))
331         raise ValueError(
332             'get() returned more than one Task -- it returned {0}! '
333             'Lookup parameters were {1}'.format(num, kwargs))
334
335
336 class TaskWarrior(object):
337     def __init__(self, data_location='~/.task', create=True):
338         data_location = os.path.expanduser(data_location)
339         if create and not os.path.exists(data_location):
340             os.makedirs(data_location)
341         self.config = {
342             'data.location': os.path.expanduser(data_location),
343         }
344         self.tasks = TaskQuerySet(self)
345
346     def _get_command_args(self, args, config_override={}):
347         command_args = ['task', 'rc:/']
348         config = self.config.copy()
349         config.update(config_override)
350         for item in config.items():
351             command_args.append('rc.{0}={1}'.format(*item))
352         command_args.extend(map(str, args))
353         return command_args
354
355     def execute_command(self, args, config_override={}):
356         command_args = self._get_command_args(
357             args, config_override=config_override)
358         logger.debug(' '.join(command_args))
359         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
360                              stderr=subprocess.PIPE)
361         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
362         if p.returncode:
363             if stderr.strip():
364                 error_msg = stderr.strip().splitlines()[-1]
365             else:
366                 error_msg = stdout.strip()
367             raise TaskWarriorException(error_msg)
368         return stdout.strip().split('\n')
369
370     def filter_tasks(self, filter_obj):
371         args = ['export', '--'] + filter_obj.get_filter_params()
372         tasks = []
373         for line in self.execute_command(args):
374             if line:
375                 data = line.strip(',')
376                 try:
377                     tasks.append(Task(self, json.loads(data)))
378                 except ValueError:
379                     raise TaskWarriorException('Invalid JSON: %s' % data)
380         return tasks
381
382     def merge_with(self, path, push=False):
383         path = path.rstrip('/') + '/'
384         self.execute_command(['merge', path], config_override={
385             'merge.autopush': 'yes' if push else 'no',
386         })
387
388     def undo(self):
389         self.execute_command(['undo'], config_override={
390             'confirmation': 'no',
391         })