]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Update README to reflect new filtering API
[etc/taskwarrior.git] / tasklib / task.py
1 import copy
2 import datetime
3 import json
4 import os
5 import subprocess
6 import tempfile
7 import uuid
8
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11
12 REPR_OUTPUT_SIZE = 10
13
14 PENDING = 'pending'
15
16
17 class TaskWarriorException(Exception):
18     pass
19
20
21 class Task(object):
22
23     class DoesNotExist(Exception):
24         pass
25
26     def __init__(self, warrior, data={}):
27         self.warrior = warrior
28         self._data = data
29
30     def __getitem__(self, key):
31         return self._get_field(key)
32
33     def __setitem__(self, key, val):
34         self._data[key] = val
35
36     def __unicode__(self):
37         return self._data.get('description')
38
39     def _get_field(self, key):
40         hydrate_func = getattr(self, 'deserialize_{0}'.format(key), lambda x:x)
41         return hydrate_func(self._data.get(key))
42
43     def _set_field(self, key, value):
44         dehydrate_func = getattr(self, 'serialize_{0}'.format(key), lambda x:x)
45         self._data[key] = dehydrate_func(value)
46
47     def serialize_due(self, date):
48         return date.strftime(DATE_FORMAT)
49
50     def deserialize_due(self, date_str):
51         if not date_str:
52             return None
53         return datetime.datetime.strptime(date_str, DATE_FORMAT)
54
55     def serialize_annotations(self, annotations):
56         ann_list = list(annotations)
57         for ann in ann_list:
58             ann['entry'] = ann['entry'].strftime(DATE_FORMAT)
59         return ann_list
60
61     def deserialize_annotations(self, annotations):
62         ann_list = list(annotations)
63         for ann in ann_list:
64             ann['entry'] = datetime.datetime.strptime(
65                 ann['entry'], DATE_FORMAT)
66         return ann_list
67
68     def regenerate_uuid(self):
69         self['uuid'] = str(uuid.uuid4())
70
71     def delete(self):
72         self.warrior.delete_task(self['uuid'])
73
74     def done(self):
75         self.warrior.complete_task(self['uuid'])
76
77     def save(self, delete_first=True):
78         if self['uuid'] and delete_first:
79             self.delete()
80         if not self['uuid'] or delete_first:
81             self.regenerate_uuid()
82         self.warrior.import_tasks([self._data])
83
84     __repr__ = __unicode__
85
86
87 class TaskFilter(object):
88     """
89     A set of parameters to filter the task list with.
90     """
91
92     def __init__(self, filter_params=[]):
93         self.filter_params = filter_params
94
95     def add_filter(self, filter_str):
96         self.filter_params.append(filter_str)
97
98     def add_filter_param(self, key, value):
99         key = key.replace('__', '.')
100         self.filter_params.append('{0}:{1}'.format(key, value))
101
102     def get_filter_params(self):
103         return [f for f in self.filter_params if f]
104
105     def clone(self):
106         c = self.__class__()
107         c.filter_params = list(self.filter_params)
108         return c
109
110
111 class TaskQuerySet(object):
112     """
113     Represents a lazy lookup for a task objects.
114     """
115
116     def __init__(self, warrior=None, filter_obj=None):
117         self.warrior = warrior
118         self._result_cache = None
119         self.filter_obj = filter_obj or TaskFilter()
120
121     def __deepcopy__(self, memo):
122         """
123         Deep copy of a QuerySet doesn't populate the cache
124         """
125         obj = self.__class__()
126         for k,v in self.__dict__.items():
127             if k in ('_iter','_result_cache'):
128                 obj.__dict__[k] = None
129             else:
130                 obj.__dict__[k] = copy.deepcopy(v, memo)
131         return obj
132
133     def __repr__(self):
134         data = list(self[:REPR_OUTPUT_SIZE + 1])
135         if len(data) > REPR_OUTPUT_SIZE:
136             data[-1] = "...(remaining elements truncated)..."
137         return repr(data)
138
139     def __len__(self):
140         if self._result_cache is None:
141             self._result_cache = list(self)
142         return len(self._result_cache)
143
144     def __iter__(self):
145         if self._result_cache is None:
146             self._result_cache = self._execute()
147         return iter(self._result_cache)
148
149     def __getitem__(self, k):
150         if self._result_cache is None:
151             self._result_cache = list(self)
152         return self._result_cache.__getitem__(k)
153
154     def __bool__(self):
155         if self._result_cache is not None:
156             return bool(self._result_cache)
157         try:
158             next(iter(self))
159         except StopIteration:
160             return False
161         return True
162
163     def __nonzero__(self):
164         return type(self).__bool__(self)
165
166     def _clone(self, klass=None, **kwargs):
167         if klass is None:
168             klass = self.__class__
169         filter_obj = self.filter_obj.clone()
170         c = klass(warrior=self.warrior, filter_obj=filter_obj)
171         c.__dict__.update(kwargs)
172         return c
173
174     def _execute(self):
175         """
176         Fetch the tasks which match the current filters.
177         """
178         return self.warrior.filter_tasks(self.filter_obj)
179
180     def all(self):
181         """
182         Returns a new TaskQuerySet that is a copy of the current one.
183         """
184         return self._clone()
185
186     def pending(self):
187         return self.filter(status=PENDING)
188
189     def filter(self, *args, **kwargs):
190         """
191         Returns a new TaskQuerySet with the given filters added.
192         """
193         clone = self._clone()
194         for f in args:
195             clone.filter_obj.add_filter(f)
196         for key, value in kwargs.items():
197             clone.filter_obj.add_filter_param(key, value)
198         return clone
199
200     def get(self, **kwargs):
201         """
202         Performs the query and returns a single object matching the given
203         keyword arguments.
204         """
205         clone = self.filter(**kwargs)
206         num = len(clone)
207         if num == 1:
208             return clone._result_cache[0]
209         if not num:
210             raise Task.DoesNotExist(
211                 'Task matching query does not exist. '
212                 'Lookup parameters were {0}'.format(kwargs))
213         raise ValueError(
214             'get() returned more than one Task -- it returned {0}! '
215             'Lookup parameters were {1}'.format(num, kwargs))
216
217
218 class TaskWarrior(object):
219     DEFAULT_FILTERS = {
220         'status': 'pending',
221     }
222
223     def __init__(self, data_location='~/.task', create=True):
224         if not os.path.exists(data_location):
225             os.makedirs(data_location)
226         self.config = {
227             'data.location': os.path.expanduser(data_location),
228         }
229         self.tasks = TaskQuerySet(self)
230
231     def _get_command_args(self, args):
232         command_args = ['task', 'rc:/']
233         for item in self.config.items():
234             command_args.append('rc.{0}={1}'.format(*item))
235         command_args.extend(args)
236         return command_args
237
238     def _execute_command(self, args):
239         p = subprocess.Popen(self._get_command_args(args),
240                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
241         stdout, stderr = p.communicate()
242         if p.returncode:
243             raise TaskWarriorException(stderr.strip())
244         return stdout.strip().split('\n')
245
246     def filter_tasks(self, filter_obj):
247         args = ['export', '--'] + filter_obj.get_filter_params()
248         tasks = []
249         for line in self._execute_command(args):
250             if line:
251                 tasks.append(Task(self, json.loads(line.strip(','))))
252         return tasks
253
254     def add_task(self, description, project=None):
255         args = ['add', description]
256         if project is not None:
257             args.append('project:{0}'.format(project))
258         self._execute_command(args)
259
260     def delete_task(self, task_id):
261         args = [task_id, 'rc.confirmation:no', 'delete']
262         self._execute_command(args)
263
264     def complete_task(self, task_id):
265         args = [task_id, 'done']
266         self._execute_command(args)
267
268     def import_tasks(self, tasks):
269         fd, path = tempfile.mkstemp()
270         with open(path, 'w') as f:
271             f.write(json.dumps(tasks))
272         args = ['import', path]
273         self._execute_command(args)