]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

The next version will be 0.4.1
[etc/taskwarrior.git] / tasklib / task.py
1 from __future__ import print_function
2 import copy
3 import datetime
4 import json
5 import logging
6 import os
7 import six
8 import subprocess
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11 REPR_OUTPUT_SIZE = 10
12 PENDING = 'pending'
13 COMPLETED = 'completed'
14
15 logger = logging.getLogger(__name__)
16
17
18 class TaskWarriorException(Exception):
19     pass
20
21
22 class TaskResource(object):
23     read_only_fields = []
24
25     def _load_data(self, data):
26         self._data = data
27
28     def __getitem__(self, key):
29         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
30                                lambda x: x)
31         return hydrate_func(self._data.get(key))
32
33     def __setitem__(self, key, value):
34         if key in self.read_only_fields:
35             raise RuntimeError('Field \'%s\' is read-only' % key)
36         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
37                                  lambda x: x)
38         self._data[key] = dehydrate_func(value)
39         self._modified_fields.add(key)
40
41     def __str__(self):
42         s = six.text_type(self.__unicode__())
43         if not six.PY3:
44             s = s.encode('utf-8')
45         return s
46
47     def __repr__(self):
48         return str(self)
49
50
51 class TaskAnnotation(TaskResource):
52     read_only_fields = ['entry', 'description']
53
54     def __init__(self, task, data={}):
55         self.task = task
56         self._load_data(data)
57
58     def deserialize_entry(self, data):
59         return datetime.datetime.strptime(data, DATE_FORMAT) if data else None
60
61     def serialize_entry(self, date):
62         return date.strftime(DATE_FORMAT) if date else ''
63
64     def remove(self):
65         self.task.remove_annotation(self)
66
67     def __unicode__(self):
68         return self['description']
69
70     __repr__ = __unicode__
71
72
73 class Task(TaskResource):
74     read_only_fields = ['id', 'entry', 'urgency']
75
76     class DoesNotExist(Exception):
77         pass
78
79     def __init__(self, warrior, data={}):
80         self.warrior = warrior
81         self._load_data(data)
82         self._modified_fields = set()
83
84     def __unicode__(self):
85         return self['description']
86
87     def serialize_due(self, date):
88         return date.strftime(DATE_FORMAT)
89
90     def deserialize_due(self, date_str):
91         if not date_str:
92             return None
93         return datetime.datetime.strptime(date_str, DATE_FORMAT)
94
95     def deserialize_annotations(self, data):
96         return [TaskAnnotation(self, d) for d in data] if data else []
97
98     def deserialize_tags(self, tags):
99         if isinstance(tags, basestring):
100             return tags.split(',') if tags else []
101         return tags
102
103     def serialize_tags(self, tags):
104         return ','.join(tags) if tags else ''
105
106     def delete(self):
107         self.warrior.execute_command([self['id'], 'delete'], config_override={
108             'confirmation': 'no',
109         })
110
111     def done(self):
112         self.warrior.execute_command([self['id'], 'done'])
113
114     def save(self):
115         args = [self['id'], 'modify'] if self['id'] else ['add']
116         args.extend(self._get_modified_fields_as_args())
117         self.warrior.execute_command(args)
118         self._modified_fields.clear()
119
120     def add_annotation(self, annotation):
121         args = [self['id'], 'annotate', annotation]
122         self.warrior.execute_command(args)
123         self.refresh(only_fields=['annotations'])
124
125     def remove_annotation(self, annotation):
126         if isinstance(annotation, TaskAnnotation):
127             annotation = annotation['description']
128         args = [self['id'], 'denotate', annotation]
129         self.warrior.execute_command(args)
130         self.refresh(only_fields=['annotations'])
131
132     def _get_modified_fields_as_args(self):
133         args = []
134         for field in self._modified_fields:
135             args.append('{}:{}'.format(field, self._data[field]))
136         return args
137
138     def refresh(self, only_fields=[]):
139         args = [self['uuid'], 'export']
140         new_data = json.loads(self.warrior.execute_command(args)[0])
141         if only_fields:
142             to_update = dict(
143                 [(k, new_data.get(k)) for k in only_fields])
144             self._data.update(to_update)
145         else:
146             self._data = new_data
147
148
149 class TaskFilter(object):
150     """
151     A set of parameters to filter the task list with.
152     """
153
154     def __init__(self, filter_params=[]):
155         self.filter_params = filter_params
156
157     def add_filter(self, filter_str):
158         self.filter_params.append(filter_str)
159
160     def add_filter_param(self, key, value):
161         key = key.replace('__', '.')
162         self.filter_params.append('{0}:{1}'.format(key, value))
163
164     def get_filter_params(self):
165         return [f for f in self.filter_params if f]
166
167     def clone(self):
168         c = self.__class__()
169         c.filter_params = list(self.filter_params)
170         return c
171
172
173 class TaskQuerySet(object):
174     """
175     Represents a lazy lookup for a task objects.
176     """
177
178     def __init__(self, warrior=None, filter_obj=None):
179         self.warrior = warrior
180         self._result_cache = None
181         self.filter_obj = filter_obj or TaskFilter()
182
183     def __deepcopy__(self, memo):
184         """
185         Deep copy of a QuerySet doesn't populate the cache
186         """
187         obj = self.__class__()
188         for k, v in self.__dict__.items():
189             if k in ('_iter', '_result_cache'):
190                 obj.__dict__[k] = None
191             else:
192                 obj.__dict__[k] = copy.deepcopy(v, memo)
193         return obj
194
195     def __repr__(self):
196         data = list(self[:REPR_OUTPUT_SIZE + 1])
197         if len(data) > REPR_OUTPUT_SIZE:
198             data[-1] = "...(remaining elements truncated)..."
199         return repr(data)
200
201     def __len__(self):
202         if self._result_cache is None:
203             self._result_cache = list(self)
204         return len(self._result_cache)
205
206     def __iter__(self):
207         if self._result_cache is None:
208             self._result_cache = self._execute()
209         return iter(self._result_cache)
210
211     def __getitem__(self, k):
212         if self._result_cache is None:
213             self._result_cache = list(self)
214         return self._result_cache.__getitem__(k)
215
216     def __bool__(self):
217         if self._result_cache is not None:
218             return bool(self._result_cache)
219         try:
220             next(iter(self))
221         except StopIteration:
222             return False
223         return True
224
225     def __nonzero__(self):
226         return type(self).__bool__(self)
227
228     def _clone(self, klass=None, **kwargs):
229         if klass is None:
230             klass = self.__class__
231         filter_obj = self.filter_obj.clone()
232         c = klass(warrior=self.warrior, filter_obj=filter_obj)
233         c.__dict__.update(kwargs)
234         return c
235
236     def _execute(self):
237         """
238         Fetch the tasks which match the current filters.
239         """
240         return self.warrior.filter_tasks(self.filter_obj)
241
242     def all(self):
243         """
244         Returns a new TaskQuerySet that is a copy of the current one.
245         """
246         return self._clone()
247
248     def pending(self):
249         return self.filter(status=PENDING)
250
251     def completed(self):
252         return self.filter(status=COMPLETED)
253
254     def filter(self, *args, **kwargs):
255         """
256         Returns a new TaskQuerySet with the given filters added.
257         """
258         clone = self._clone()
259         for f in args:
260             clone.filter_obj.add_filter(f)
261         for key, value in kwargs.items():
262             clone.filter_obj.add_filter_param(key, value)
263         return clone
264
265     def get(self, **kwargs):
266         """
267         Performs the query and returns a single object matching the given
268         keyword arguments.
269         """
270         clone = self.filter(**kwargs)
271         num = len(clone)
272         if num == 1:
273             return clone._result_cache[0]
274         if not num:
275             raise Task.DoesNotExist(
276                 'Task matching query does not exist. '
277                 'Lookup parameters were {0}'.format(kwargs))
278         raise ValueError(
279             'get() returned more than one Task -- it returned {0}! '
280             'Lookup parameters were {1}'.format(num, kwargs))
281
282
283 class TaskWarrior(object):
284     def __init__(self, data_location='~/.task', create=True):
285         data_location = os.path.expanduser(data_location)
286         if create and not os.path.exists(data_location):
287             os.makedirs(data_location)
288         self.config = {
289             'data.location': os.path.expanduser(data_location),
290         }
291         self.tasks = TaskQuerySet(self)
292
293     def _get_command_args(self, args, config_override={}):
294         command_args = ['task', 'rc:/']
295         config = self.config.copy()
296         config.update(config_override)
297         for item in config.items():
298             command_args.append('rc.{0}={1}'.format(*item))
299         command_args.extend(map(str, args))
300         return command_args
301
302     def execute_command(self, args, config_override={}):
303         command_args = self._get_command_args(
304             args, config_override=config_override)
305         logger.debug(' '.join(command_args))
306         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
307                              stderr=subprocess.PIPE)
308         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
309         if p.returncode:
310             if stderr.strip():
311                 error_msg = stderr.strip().splitlines()[-1]
312             else:
313                 error_msg = stdout.strip()
314             raise TaskWarriorException(error_msg)
315         return stdout.strip().split('\n')
316
317     def filter_tasks(self, filter_obj):
318         args = ['export', '--'] + filter_obj.get_filter_params()
319         tasks = []
320         for line in self.execute_command(args):
321             if line:
322                 data = line.strip(',')
323                 try:
324                     tasks.append(Task(self, json.loads(data)))
325                 except ValueError:
326                     raise TaskWarriorException('Invalid JSON: %s' % data)
327         return tasks
328
329     def merge_with(self, path, push=False):
330         path = path.rstrip('/') + '/'
331         self.execute_command(['merge', path], config_override={
332             'merge.autopush': 'yes' if push else 'no',
333         })
334
335     def undo(self):
336         self.execute_command(['undo'], config_override={
337             'confirmation': 'no',
338         })