git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Emulate django's QuerySet API for filtering tasks
[etc/taskwarrior.git] / tasklib / task.py
1 import copy
2 import json
3 import os
4 import subprocess
5 import tempfile
6 import uuid
7
8
# Maximum number of tasks TaskQuerySet.__repr__ lists before truncating.
REPR_OUTPUT_SIZE = 10
# Status value used by TaskQuerySet.pending() to select pending tasks.
PENDING = 'pending'
11
12
class TaskWarriorException(Exception):
    """Raised when an invocation of the ``task`` command fails."""
15
16
class Task(object):
    """
    A single task, stored as a plain dict of attributes.

    Item access (``task['key']``) reads and writes the underlying dict;
    reading a missing key yields ``None`` instead of raising ``KeyError``.
    Persistence is delegated to the ``warrior`` object given at construction.
    """

    class DoesNotExist(Exception):
        """Raised when a lookup that must return a task matches none."""

    def __init__(self, warrior, data=None):
        """
        :param warrior: backend object providing ``delete_task``,
            ``complete_task`` and ``import_tasks``.
        :param data: dict of task attributes. It is stored by reference,
            not copied. When omitted, a fresh empty dict is created.
        """
        self.warrior = warrior
        # A literal ``{}`` default would be shared by every Task created
        # without data, so writes to one task would show up in all others.
        self._data = {} if data is None else data

    def __getitem__(self, key):
        """Return the attribute ``key``, or ``None`` when it is not set."""
        return self._data.get(key)

    def __setitem__(self, key, val):
        """Set the attribute ``key`` to ``val``."""
        self._data[key] = val

    def __unicode__(self):
        """Return the task description, or an empty string if unset."""
        # repr()/unicode() must return a string; returning None would make
        # them raise TypeError for tasks without a description.
        return self._data.get('description') or ''

    def regenerate_uuid(self):
        """Assign a new random UUID (as a string) to this task."""
        self['uuid'] = str(uuid.uuid4())

    def delete(self):
        """Ask the backend to delete the task with this task's UUID."""
        self.warrior.delete_task(self['uuid'])

    def done(self):
        """Ask the backend to mark the task with this task's UUID as done."""
        self.warrior.complete_task(self['uuid'])

    def save(self, delete_first=True):
        """
        Store the task by importing its data through the backend.

        :param delete_first: when true and the task already has a UUID, the
            existing task is deleted first and the data is imported under a
            newly generated UUID. When false, an existing UUID is kept.
        """
        if self['uuid'] and delete_first:
            self.delete()
        # A UUID is needed for the import: generate one when it is missing
        # or when the previous incarnation was just deleted.
        if not self['uuid'] or delete_first:
            self.regenerate_uuid()
        self.warrior.import_tasks([self._data])

    __repr__ = __unicode__
52
53
class TaskFilter(object):
    """
    Collects the (param, value) pairs used to narrow down the task list.
    """

    def __init__(self, filter_params=()):
        self.filter_params = filter_params

    def add_filter(self, param, value):
        """Append one (param, value) pair to the stored parameters."""
        pair = (param, value)
        self.filter_params += (pair,)

    def get_filter_params(self):
        """Return the parameters accumulated so far."""
        return self.filter_params

    def clone(self):
        """Return a new filter of the same class holding the same pairs."""
        duplicate = self.__class__()
        duplicate.filter_params = tuple(self.filter_params)
        return duplicate
72
73
74 class TaskQuerySet(object):
75     """
76     Represents a lazy lookup for a task objects.
77     """
78
79     def __init__(self, warrior=None, filter_obj=None):
80         self.warrior = warrior
81         self._result_cache = None
82         self.filter_obj = filter_obj or TaskFilter()
83
84     def __deepcopy__(self, memo):
85         """
86         Deep copy of a QuerySet doesn't populate the cache
87         """
88         obj = self.__class__()
89         for k,v in self.__dict__.items():
90             if k in ('_iter','_result_cache'):
91                 obj.__dict__[k] = None
92             else:
93                 obj.__dict__[k] = copy.deepcopy(v, memo)
94         return obj
95
96     def __repr__(self):
97         data = list(self[:REPR_OUTPUT_SIZE + 1])
98         if len(data) > REPR_OUTPUT_SIZE:
99             data[-1] = "...(remaining elements truncated)..."
100         return repr(data)
101
102     def __len__(self):
103         if self._result_cache is None:
104             self._result_cache = list(self)
105         return len(self._result_cache)
106
107     def __iter__(self):
108         if self._result_cache is None:
109             self._result_cache = self._execute()
110         return iter(self._result_cache)
111
112     def __getitem__(self, k):
113         if self._result_cache is None:
114             self._result_cache = list(self)
115         return self._result_cache.__getitem__(k)
116
117     def __bool__(self):
118         if self._result_cache is not None:
119             return bool(self._result_cache)
120         try:
121             next(iter(self))
122         except StopIteration:
123             return False
124         return True
125
126     def __nonzero__(self):
127         return type(self).__bool__(self)
128
129     def _clone(self, klass=None, **kwargs):
130         if klass is None:
131             klass = self.__class__
132         filter_obj = self.filter_obj.clone()
133         c = klass(warrior=self.warrior, filter_obj=filter_obj)
134         c.__dict__.update(kwargs)
135         return c
136
137     def _execute(self):
138         """
139         Fetch the tasks which match the current filters.
140         """
141         return self.warrior._execute_filter(self.filter_obj)
142
143     def all(self):
144         """
145         Returns a new TaskQuerySet that is a copy of the current one.
146         """
147         return self._clone()
148
149     def pending(self):
150         return self.filter(status=PENDING)
151
152     def filter(self, **kwargs):
153         """
154         Returns a new TaskQuerySet with the given filters added.
155         """
156         clone = self._clone()
157         for param, value in kwargs.items():
158             clone.filter_obj.add_filter(param, value)
159         return clone
160
161     def get(self, **kwargs):
162         """
163         Performs the query and returns a single object matching the given
164         keyword arguments.
165         """
166         clone = self.filter(**kwargs)
167         num = len(clone)
168         if num == 1:
169             return clone._result_cache[0]
170         if not num:
171             raise Task.DoesNotExist(
172                 'Task matching query does not exist. '
173                 'Lookup parameters were {0}'.format(kwargs))
174         raise ValueError(
175             'get() returned more than one Task -- it returned {0}! '
176             'Lookup parameters were {1}'.format(num, kwargs))
177
178
class TaskWarrior(object):
    """
    Wrapper that drives the ``task`` command line program.

    Every invocation starts with ``task rc:/`` followed by one
    ``rc.<key>=<value>`` override per entry in ``self.config``.
    """

    DEFAULT_FILTERS = {
        'status': 'pending',
    }

    def __init__(self, data_location='~/.task', create=True):
        """
        :param data_location: directory holding the task data; a leading
            ``~`` is expanded to the user's home directory.
        :param create: when true, create the directory if it is missing.
        """
        # Expand before touching the filesystem, otherwise a literal
        # directory named "~" would be checked and created.
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)

    def _base_args(self):
        """Return the argument list shared by every ``task`` invocation."""
        args = ['task', 'rc:/']
        for item in self.config.items():
            args.append('rc.{0}={1}'.format(*item))
        return args

    def _generate_command(self, command):
        """Return the full command line for ``command`` as one string."""
        return ' '.join(self._base_args() + [command])

    @staticmethod
    def _to_text(data):
        """Return subprocess output as ``str`` (bytes are decoded as UTF-8)."""
        if isinstance(data, str):
            return data
        return data.decode('utf-8', 'replace')

    def _execute_command(self, command):
        """
        Run ``task`` with the given sub-command and return its output lines.

        :param command: either a list/tuple of arguments, passed to the
            program verbatim, or a single string, which is split into
            arguments using shell-like quoting rules.
        :raises TaskWarriorException: if the program cannot be started or
            exits with a non-zero status.
        """
        if isinstance(command, (list, tuple)):
            extra = list(command)
        else:
            import shlex
            extra = shlex.split(command)
        # An argument list without a shell keeps spaces and shell
        # metacharacters in values from being interpreted.
        try:
            p = subprocess.Popen(self._base_args() + extra,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        except OSError as e:
            raise TaskWarriorException(str(e))
        stdout, stderr = p.communicate()
        if p.returncode:
            raise TaskWarriorException(self._to_text(stderr).strip())
        return self._to_text(stdout).strip().split('\n')

    def _format_filter_kwarg(self, kwarg):
        """Turn a ``(key, value)`` pair into ``key:value``; ``__`` becomes ``.``."""
        key, val = kwarg[0], kwarg[1]
        key = key.replace('__', '.')
        return '{0}:{1}'.format(key, val)

    def _execute_filter(self, filter_obj):
        """
        Run ``export`` restricted by ``filter_obj`` and return the tasks.

        Each non-empty output line is parsed as one JSON object after
        stripping surrounding commas.
        """
        args = [self._format_filter_kwarg(kwarg)
                for kwarg in filter_obj.get_filter_params()]
        args.append('export')
        tasks = []
        for line in self._execute_command(args):
            if line:
                tasks.append(Task(self, json.loads(line.strip(','))))
        return tasks

    def add_task(self, description, project=None):
        """Add a task with ``description`` and, optionally, a project."""
        args = ['add', description]
        if project is not None:
            args.append('project:{0}'.format(project))
        self._execute_command(args)

    def delete_task(self, task_id):
        """Delete the task identified by ``task_id`` without confirmation."""
        self._execute_command(
            ['{0}'.format(task_id), 'rc.confirmation:no', 'delete'])

    def complete_task(self, task_id):
        """Mark the task identified by ``task_id`` as done."""
        self._execute_command(['{0}'.format(task_id), 'done'])

    def import_tasks(self, tasks):
        """
        Import ``tasks`` (a JSON-serialisable list) through a temporary file.

        The temporary file is removed afterwards, even if the import fails.
        """
        fd, path = tempfile.mkstemp()
        try:
            # Wrap the descriptor mkstemp already opened so it gets closed,
            # rather than opening the path a second time and leaking it.
            with os.fdopen(fd, 'w') as f:
                f.write(json.dumps(tasks))
            self._execute_command(['import', path])
        finally:
            os.remove(path)