git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Merge branch 'release/0.5.0'
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
# strptime/strftime layout used by this module when parsing and formatting
# the date fields ('entry', 'due') exchanged with the ``task`` binary.
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
# Maximum number of tasks shown by TaskQuerySet.__repr__ before truncating.
REPR_OUTPUT_SIZE = 10
# Values of the ``status`` filter applied by TaskQuerySet.pending() and
# TaskQuerySet.completed() respectively.
PENDING = 'pending'
COMPLETED = 'completed'

# Module-level logger; TaskWarrior.execute_command() logs every command
# line it runs at DEBUG level.
logger = logging.getLogger(__name__)
16
17
class TaskWarriorException(Exception):
    """
    Error raised by this module when a ``task`` command exits with a
    non-zero status or when its export output is not valid JSON.
    """
20
21
class TaskResource(object):
    """
    Base class for objects backed by a plain dict of raw field values.

    Item access goes through optional per-field hooks defined on the
    subclass: ``deserialize_<field>(raw)`` is applied on read and
    ``serialize_<field>(value)`` on write.  Fields without a hook are
    passed through unchanged.  Subclasses are expected to provide
    ``__unicode__``, which ``__str__`` and ``__repr__`` rely on.
    """

    # Keys that __setitem__ refuses to modify; subclasses override this.
    read_only_fields = []

    def _load_data(self, data):
        """Use ``data`` (a dict of raw values) as the backing store."""
        self._data = data
        # __setitem__ records changed keys in ``_modified_fields``.  Create
        # the set here when the subclass has not done so, otherwise item
        # assignment would store the value and then fail with an
        # AttributeError.  An already existing set is left untouched.
        if not hasattr(self, '_modified_fields'):
            self._modified_fields = set()

    def __getitem__(self, key):
        """
        Return the value stored under ``key``, passed through
        ``deserialize_<key>`` when such a method exists.  A missing key
        is handed to the hook (or returned) as ``None``.
        """
        hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
                               lambda x: x)
        return hydrate_func(self._data.get(key))

    def __setitem__(self, key, value):
        """
        Store ``value`` under ``key``, passed through ``serialize_<key>``
        when such a method exists, and remember the key as modified.

        Raises RuntimeError if ``key`` is listed in ``read_only_fields``.
        """
        if key in self.read_only_fields:
            raise RuntimeError('Field \'%s\' is read-only' % key)
        dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
                                 lambda x: x)
        self._data[key] = dehydrate_func(value)
        self._modified_fields.add(key)

    def __str__(self):
        """Return ``__unicode__()`` as the native ``str`` type."""
        s = six.text_type(self.__unicode__())
        if not six.PY3:
            # Python 2: ``str`` is a byte string, so encode the text.
            s = s.encode('utf-8')
        return s

    def __repr__(self):
        return str(self)
49
50
class TaskAnnotation(TaskResource):
    """
    A single annotation attached to a task.

    Both the 'entry' and the 'description' field are read-only.
    """

    read_only_fields = ['entry', 'description']

    def __init__(self, task, data=None):
        """
        :param task: the owning task; ``remove()`` delegates to its
            ``remove_annotation`` method.
        :param data: dict of raw annotation values.  When omitted, a new
            empty dict is created for this instance (a shared ``{}``
            default would be stored by reference and leak state between
            instances).
        """
        self.task = task
        self._load_data({} if data is None else data)
        # Needed by TaskResource.__setitem__ to record changed keys.
        self._modified_fields = set()

    def deserialize_entry(self, data):
        """Parse a DATE_FORMAT string into a datetime; falsy -> None."""
        return datetime.datetime.strptime(data, DATE_FORMAT) if data else None

    def serialize_entry(self, date):
        """Format a datetime using DATE_FORMAT; falsy -> ''."""
        return date.strftime(DATE_FORMAT) if date else ''

    def remove(self):
        """Remove this annotation from its task."""
        self.task.remove_annotation(self)

    def __unicode__(self):
        return self['description']

    __repr__ = __unicode__
71
72
class Task(TaskResource):
    """
    A single task, backed by the dict of raw values exported by
    TaskWarrior.  Changes made through item assignment are only sent to
    TaskWarrior when ``save()`` is called.
    """

    read_only_fields = ['id', 'entry', 'urgency']

    class DoesNotExist(Exception):
        """Raised by TaskQuerySet.get() when no task matches the query."""

    def __init__(self, warrior, data=None):
        """
        :param warrior: object whose ``execute_command`` runs ``task``
            commands on behalf of this task.
        :param data: dict of raw task values.  When omitted, a new empty
            dict is created for this instance (a shared ``{}`` default
            would be mutated by item assignment and leak between tasks).
        """
        self.warrior = warrior
        self._load_data({} if data is None else data)
        self._modified_fields = set()

    def __unicode__(self):
        return self['description']

    def serialize_due(self, date):
        """Format a datetime using DATE_FORMAT; falsy -> ''."""
        # Mirrors TaskAnnotation.serialize_entry so that assigning None to
        # 'due' no longer raises AttributeError.
        return date.strftime(DATE_FORMAT) if date else ''

    def deserialize_due(self, date_str):
        """Parse a DATE_FORMAT string into a datetime; falsy -> None."""
        if not date_str:
            return None
        return datetime.datetime.strptime(date_str, DATE_FORMAT)

    def deserialize_annotations(self, data):
        """Wrap each raw annotation dict in a TaskAnnotation."""
        return [TaskAnnotation(self, d) for d in data] if data else []

    def deserialize_tags(self, tags):
        """
        Return the tags as a list when they are stored as a
        comma-separated string; any other value is returned unchanged.
        """
        # six.string_types instead of ``basestring``: the latter does not
        # exist on Python 3 and raised NameError there.
        if isinstance(tags, six.string_types):
            return tags.split(',') if tags else []
        return tags

    def serialize_tags(self, tags):
        """Join the tags into a comma-separated string; falsy -> ''."""
        return ','.join(tags) if tags else ''

    def delete(self):
        """Run ``<id> delete`` with the 'confirmation' setting set to 'no'."""
        self.warrior.execute_command([self['id'], 'delete'], config_override={
            'confirmation': 'no',
        })

    def done(self):
        """Run ``<id> done`` for this task."""
        self.warrior.execute_command([self['id'], 'done'])

    def save(self):
        """
        Send the modified fields to TaskWarrior: ``<id> modify ...`` when
        the task has an id, ``add ...`` otherwise.  The set of modified
        fields is cleared afterwards.
        """
        args = [self['id'], 'modify'] if self['id'] else ['add']
        args.extend(self._get_modified_fields_as_args())
        self.warrior.execute_command(args)
        self._modified_fields.clear()

    def add_annotation(self, annotation):
        """Run ``<id> annotate <annotation>`` and reload the annotations."""
        args = [self['id'], 'annotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def remove_annotation(self, annotation):
        """
        Run ``<id> denotate <text>`` and reload the annotations.

        :param annotation: the annotation text, or a TaskAnnotation whose
            'description' is used as the text.
        """
        if isinstance(annotation, TaskAnnotation):
            annotation = annotation['description']
        args = [self['id'], 'denotate', annotation]
        self.warrior.execute_command(args)
        self.refresh(only_fields=['annotations'])

    def _get_modified_fields_as_args(self):
        """Return one ``field:raw_value`` string per modified field."""
        return ['{0}:{1}'.format(field, self._data[field])
                for field in self._modified_fields]

    def refresh(self, only_fields=None):
        """
        Reload this task's data by running ``<uuid> export`` and parsing
        the first output line as JSON.

        :param only_fields: when non-empty, only these keys are updated
            (keys absent from the export are set to None); otherwise the
            whole backing dict is replaced.
        """
        args = [self['uuid'], 'export']
        new_data = json.loads(self.warrior.execute_command(args)[0])
        if only_fields:
            to_update = dict((k, new_data.get(k)) for k in only_fields)
            self._data.update(to_update)
        else:
            self._data = new_data
147
148
class TaskFilter(object):
    """
    A set of parameters to filter the task list with.
    """

    def __init__(self, filter_params=None):
        """
        :param filter_params: optional list of filter strings to start
            from; it is stored as given.  When omitted, every instance
            gets its own new list (a shared ``[]`` default would collect
            the filters appended to any default-constructed instance).
        """
        self.filter_params = [] if filter_params is None else filter_params

    def add_filter(self, filter_str):
        """Append a raw filter string."""
        self.filter_params.append(filter_str)

    def add_filter_param(self, key, value):
        """
        Append a ``key:value`` filter.  A double underscore in ``key`` is
        turned into a dot (``due__before`` becomes ``due.before``).
        """
        key = key.replace('__', '.')

        # Replace the value with empty string, since that is the
        # convention in TW for empty values
        value = value if value is not None else ''
        self.filter_params.append('{0}:{1}'.format(key, value))

    def get_filter_params(self):
        """Return the non-empty filter strings as a new list."""
        return [f for f in self.filter_params if f]

    def clone(self):
        """Return a new filter of the same class with a copied list."""
        c = self.__class__()
        c.filter_params = list(self.filter_params)
        return c
175
176
177 class TaskQuerySet(object):
178     """
179     Represents a lazy lookup for a task objects.
180     """
181
182     def __init__(self, warrior=None, filter_obj=None):
183         self.warrior = warrior
184         self._result_cache = None
185         self.filter_obj = filter_obj or TaskFilter()
186
187     def __deepcopy__(self, memo):
188         """
189         Deep copy of a QuerySet doesn't populate the cache
190         """
191         obj = self.__class__()
192         for k, v in self.__dict__.items():
193             if k in ('_iter', '_result_cache'):
194                 obj.__dict__[k] = None
195             else:
196                 obj.__dict__[k] = copy.deepcopy(v, memo)
197         return obj
198
199     def __repr__(self):
200         data = list(self[:REPR_OUTPUT_SIZE + 1])
201         if len(data) > REPR_OUTPUT_SIZE:
202             data[-1] = "...(remaining elements truncated)..."
203         return repr(data)
204
205     def __len__(self):
206         if self._result_cache is None:
207             self._result_cache = list(self)
208         return len(self._result_cache)
209
210     def __iter__(self):
211         if self._result_cache is None:
212             self._result_cache = self._execute()
213         return iter(self._result_cache)
214
215     def __getitem__(self, k):
216         if self._result_cache is None:
217             self._result_cache = list(self)
218         return self._result_cache.__getitem__(k)
219
220     def __bool__(self):
221         if self._result_cache is not None:
222             return bool(self._result_cache)
223         try:
224             next(iter(self))
225         except StopIteration:
226             return False
227         return True
228
229     def __nonzero__(self):
230         return type(self).__bool__(self)
231
232     def _clone(self, klass=None, **kwargs):
233         if klass is None:
234             klass = self.__class__
235         filter_obj = self.filter_obj.clone()
236         c = klass(warrior=self.warrior, filter_obj=filter_obj)
237         c.__dict__.update(kwargs)
238         return c
239
240     def _execute(self):
241         """
242         Fetch the tasks which match the current filters.
243         """
244         return self.warrior.filter_tasks(self.filter_obj)
245
246     def all(self):
247         """
248         Returns a new TaskQuerySet that is a copy of the current one.
249         """
250         return self._clone()
251
252     def pending(self):
253         return self.filter(status=PENDING)
254
255     def completed(self):
256         return self.filter(status=COMPLETED)
257
258     def filter(self, *args, **kwargs):
259         """
260         Returns a new TaskQuerySet with the given filters added.
261         """
262         clone = self._clone()
263         for f in args:
264             clone.filter_obj.add_filter(f)
265         for key, value in kwargs.items():
266             clone.filter_obj.add_filter_param(key, value)
267         return clone
268
269     def get(self, **kwargs):
270         """
271         Performs the query and returns a single object matching the given
272         keyword arguments.
273         """
274         clone = self.filter(**kwargs)
275         num = len(clone)
276         if num == 1:
277             return clone._result_cache[0]
278         if not num:
279             raise Task.DoesNotExist(
280                 'Task matching query does not exist. '
281                 'Lookup parameters were {0}'.format(kwargs))
282         raise ValueError(
283             'get() returned more than one Task -- it returned {0}! '
284             'Lookup parameters were {1}'.format(num, kwargs))
285
286
class TaskWarrior(object):
    """
    Thin wrapper around the ``task`` command line program, bound to one
    data directory.  ``tasks`` is a TaskQuerySet over that data.
    """

    def __init__(self, data_location='~/.task', create=True):
        """
        :param data_location: data directory; a leading ``~`` is expanded.
        :param create: create the directory when it does not exist.
        """
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        # Settings passed to every command as ``rc.<key>=<value>``.
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)

    def _get_command_args(self, args, config_override=None):
        """
        Build the full argument list for a ``task`` invocation.

        :param args: command arguments; each one is converted with str().
        :param config_override: optional dict of settings that extend or
            replace ``self.config`` for this call only.
        :returns: ``['task', 'rc:/', 'rc.<key>=<value>', ..., *args]``
        """
        # NOTE(review): 'rc:/' presumably keeps the user's own taskrc from
        # being read - confirm against the TaskWarrior documentation.
        command_args = ['task', 'rc:/']
        config = self.config.copy()
        if config_override:
            config.update(config_override)
        for item in config.items():
            command_args.append('rc.{0}={1}'.format(*item))
        command_args.extend(map(str, args))
        return command_args

    def execute_command(self, args, config_override=None):
        """
        Run ``task`` with the given arguments and return its output.

        :param args: command arguments (see ``_get_command_args``).
        :param config_override: optional per-call settings.
        :returns: stripped standard output, split into a list of lines.
        :raises TaskWarriorException: when the process exits with a
            non-zero status; the message is the last line of stderr or,
            if stderr is empty, the stripped stdout.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        logger.debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode:
            if stderr.strip():
                error_msg = stderr.strip().splitlines()[-1]
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)
        return stdout.strip().split('\n')

    def filter_tasks(self, filter_obj):
        """
        Run ``export -- <filters>`` and return the matching tasks.

        Every non-empty output line is stripped of surrounding commas and
        parsed as one JSON object.

        :param filter_obj: object providing ``get_filter_params()``.
        :returns: list of Task objects.
        :raises TaskWarriorException: when a line is not valid JSON.
        """
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if line:
                data = line.strip(',')
                try:
                    tasks.append(Task(self, json.loads(data)))
                except ValueError:
                    raise TaskWarriorException('Invalid JSON: %s' % data)
        return tasks

    def merge_with(self, path, push=False):
        """
        Run ``merge <path>/`` with 'merge.autopush' set to 'yes' or 'no'
        according to ``push``.
        """
        # Ensure the path ends with exactly one slash.
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        """Run ``undo`` with the 'confirmation' setting set to 'no'."""
        self.execute_command(['undo'], config_override={
            'confirmation': 'no',
        })
342         })