]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Merge branch 'release-0.3.1'
[etc/taskwarrior.git] / tasklib / task.py
1 import copy
2 import datetime
3 import json
4 import logging
5 import os
6 import subprocess
7
8 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
9 REPR_OUTPUT_SIZE = 10
10 PENDING = 'pending'
11 COMPLETED = 'completed'
12
13 logger = logging.getLogger(__name__)
14
15
16 class TaskWarriorException(Exception):
17     pass
18
19
20 class TaskResource(object):
21     read_only_fields = []
22
23     def _load_data(self, data):
24         self._data = data
25
26     def __getitem__(self, key):
27         hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
28                                lambda x: x)
29         return hydrate_func(self._data.get(key))
30
31     def __setitem__(self, key, value):
32         if key in self.read_only_fields:
33             raise RuntimeError('Field \'%s\' is read-only' % key)
34         dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
35                                  lambda x: x)
36         self._data[key] = dehydrate_func(value)
37         self._modified_fields.add(key)
38
39     def __repr__(self):
40         return self.__unicode__()
41
42
43 class TaskAnnotation(TaskResource):
44     read_only_fields = ['entry', 'description']
45
46     def __init__(self, task, data={}):
47         self.task = task
48         self._load_data(data)
49
50     def deserialize_entry(self, data):
51         return datetime.datetime.strptime(data, DATE_FORMAT) if data else None
52
53     def serialize_entry(self, date):
54         return date.strftime(DATE_FORMAT) if date else ''
55
56     def remove(self):
57         self.task.remove_annotation(self)
58
59     def __unicode__(self):
60         return self['description']
61
62     __repr__ = __unicode__
63
64
65 class Task(TaskResource):
66     read_only_fields = ['id', 'entry', 'urgency']
67
68     class DoesNotExist(Exception):
69         pass
70
71     def __init__(self, warrior, data={}):
72         self.warrior = warrior
73         self._load_data(data)
74         self._modified_fields = set()
75
76     def __unicode__(self):
77         return self['description']
78
79     def serialize_due(self, date):
80         return date.strftime(DATE_FORMAT)
81
82     def deserialize_due(self, date_str):
83         if not date_str:
84             return None
85         return datetime.datetime.strptime(date_str, DATE_FORMAT)
86
87     def deserialize_annotations(self, data):
88         return [TaskAnnotation(self, d) for d in data] if data else []
89
90     def deserialize_tags(self, tags):
91         if isinstance(tags, basestring):
92             return tags.split(',') if tags else []
93         return tags
94
95     def serialize_tags(self, tags):
96         return ','.join(tags) if tags else ''
97
98     def delete(self):
99         self.warrior.execute_command([self['id'], 'delete'], config_override={
100             'confirmation': 'no',
101         })
102
103     def done(self):
104         self.warrior.execute_command([self['id'], 'done'])
105
106     def save(self):
107         args = [self['id'], 'modify'] if self['id'] else ['add']
108         args.extend(self._get_modified_fields_as_args())
109         self.warrior.execute_command(args)
110         self._modified_fields.clear()
111
112     def add_annotation(self, annotation):
113         args = [self['id'], 'annotate', annotation]
114         self.warrior.execute_command(args)
115         self.refresh(only_fields=['annotations'])
116
117     def remove_annotation(self, annotation):
118         if isinstance(annotation, TaskAnnotation):
119             annotation = annotation['description']
120         args = [self['id'], 'denotate', annotation]
121         self.warrior.execute_command(args)
122         self.refresh(only_fields=['annotations'])
123
124     def _get_modified_fields_as_args(self):
125         args = []
126         for field in self._modified_fields:
127             args.append('{}:{}'.format(field, self._data[field]))
128         return args
129
130     def refresh(self, only_fields=[]):
131         args = [self['uuid'], 'export']
132         new_data = json.loads(self.warrior.execute_command(args)[0])
133         if only_fields:
134             to_update = dict(
135                 [(k, new_data.get(k)) for k in only_fields])
136             self._data.update(to_update)
137         else:
138             self._data = new_data
139
140
141 class TaskFilter(object):
142     """
143     A set of parameters to filter the task list with.
144     """
145
146     def __init__(self, filter_params=[]):
147         self.filter_params = filter_params
148
149     def add_filter(self, filter_str):
150         self.filter_params.append(filter_str)
151
152     def add_filter_param(self, key, value):
153         key = key.replace('__', '.')
154         self.filter_params.append('{0}:{1}'.format(key, value))
155
156     def get_filter_params(self):
157         return [f for f in self.filter_params if f]
158
159     def clone(self):
160         c = self.__class__()
161         c.filter_params = list(self.filter_params)
162         return c
163
164
165 class TaskQuerySet(object):
166     """
167     Represents a lazy lookup for a task objects.
168     """
169
170     def __init__(self, warrior=None, filter_obj=None):
171         self.warrior = warrior
172         self._result_cache = None
173         self.filter_obj = filter_obj or TaskFilter()
174
175     def __deepcopy__(self, memo):
176         """
177         Deep copy of a QuerySet doesn't populate the cache
178         """
179         obj = self.__class__()
180         for k, v in self.__dict__.items():
181             if k in ('_iter', '_result_cache'):
182                 obj.__dict__[k] = None
183             else:
184                 obj.__dict__[k] = copy.deepcopy(v, memo)
185         return obj
186
187     def __repr__(self):
188         data = list(self[:REPR_OUTPUT_SIZE + 1])
189         if len(data) > REPR_OUTPUT_SIZE:
190             data[-1] = "...(remaining elements truncated)..."
191         return repr(data)
192
193     def __len__(self):
194         if self._result_cache is None:
195             self._result_cache = list(self)
196         return len(self._result_cache)
197
198     def __iter__(self):
199         if self._result_cache is None:
200             self._result_cache = self._execute()
201         return iter(self._result_cache)
202
203     def __getitem__(self, k):
204         if self._result_cache is None:
205             self._result_cache = list(self)
206         return self._result_cache.__getitem__(k)
207
208     def __bool__(self):
209         if self._result_cache is not None:
210             return bool(self._result_cache)
211         try:
212             next(iter(self))
213         except StopIteration:
214             return False
215         return True
216
217     def __nonzero__(self):
218         return type(self).__bool__(self)
219
220     def _clone(self, klass=None, **kwargs):
221         if klass is None:
222             klass = self.__class__
223         filter_obj = self.filter_obj.clone()
224         c = klass(warrior=self.warrior, filter_obj=filter_obj)
225         c.__dict__.update(kwargs)
226         return c
227
228     def _execute(self):
229         """
230         Fetch the tasks which match the current filters.
231         """
232         return self.warrior.filter_tasks(self.filter_obj)
233
234     def all(self):
235         """
236         Returns a new TaskQuerySet that is a copy of the current one.
237         """
238         return self._clone()
239
240     def pending(self):
241         return self.filter(status=PENDING)
242
243     def completed(self):
244         return self.filter(status=COMPLETED)
245
246     def filter(self, *args, **kwargs):
247         """
248         Returns a new TaskQuerySet with the given filters added.
249         """
250         clone = self._clone()
251         for f in args:
252             clone.filter_obj.add_filter(f)
253         for key, value in kwargs.items():
254             clone.filter_obj.add_filter_param(key, value)
255         return clone
256
257     def get(self, **kwargs):
258         """
259         Performs the query and returns a single object matching the given
260         keyword arguments.
261         """
262         clone = self.filter(**kwargs)
263         num = len(clone)
264         if num == 1:
265             return clone._result_cache[0]
266         if not num:
267             raise Task.DoesNotExist(
268                 'Task matching query does not exist. '
269                 'Lookup parameters were {0}'.format(kwargs))
270         raise ValueError(
271             'get() returned more than one Task -- it returned {0}! '
272             'Lookup parameters were {1}'.format(num, kwargs))
273
274
275 class TaskWarrior(object):
276     def __init__(self, data_location='~/.task', create=True):
277         data_location = os.path.expanduser(data_location)
278         if create and not os.path.exists(data_location):
279             os.makedirs(data_location)
280         self.config = {
281             'data.location': os.path.expanduser(data_location),
282         }
283         self.tasks = TaskQuerySet(self)
284
285     def _get_command_args(self, args, config_override={}):
286         command_args = ['task', 'rc:/']
287         config = self.config.copy()
288         config.update(config_override)
289         for item in config.items():
290             command_args.append('rc.{0}={1}'.format(*item))
291         command_args.extend(map(str, args))
292         return command_args
293
294     def execute_command(self, args, config_override={}):
295         command_args = self._get_command_args(
296             args, config_override=config_override)
297         logger.debug(' '.join(command_args))
298         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
299                              stderr=subprocess.PIPE)
300         stdout, stderr = [x.decode() for x in p.communicate()]
301         if p.returncode:
302             if stderr.strip():
303                 error_msg = stderr.strip().splitlines()[-1]
304             else:
305                 error_msg = stdout.strip()
306             raise TaskWarriorException(error_msg)
307         return stdout.strip().split('\n')
308
309     def filter_tasks(self, filter_obj):
310         args = ['export', '--'] + filter_obj.get_filter_params()
311         tasks = []
312         for line in self.execute_command(args):
313             if line:
314                 data = line.strip(',')
315                 try:
316                     tasks.append(Task(self, json.loads(data)))
317                 except ValueError:
318                     raise TaskWarriorException('Invalid JSON: %s' % data)
319         return tasks
320
321     def merge_with(self, path, push=False):
322         path = path.rstrip('/') + '/'
323         self.execute_command(['merge', path], config_override={
324             'merge.autopush': 'yes' if push else 'no',
325         })
326
327     def undo(self):
328         self.execute_command(['undo'], config_override={
329             'confirmation': 'no',
330         })