]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Allow passing custom string filters to TaskQuerySet and make Popen secure
[etc/taskwarrior.git] / tasklib / task.py
1 import copy
2 import datetime
3 import json
4 import os
5 import subprocess
6 import tempfile
7 import uuid
8
9
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11
12 REPR_OUTPUT_SIZE = 10
13
14 PENDING = 'pending'
15
16
17 class TaskWarriorException(Exception):
18     pass
19
20
21 class Task(object):
22
23     class DoesNotExist(Exception):
24         pass
25
26     def __init__(self, warrior, data={}):
27         self.warrior = warrior
28         self._data = data
29
30     def __getitem__(self, key):
31         return self._get_field(key)
32
33     def __setitem__(self, key, val):
34         self._data[key] = val
35
36     def __unicode__(self):
37         return self._data.get('description')
38
39     def _get_field(self, key):
40         hydrate_func = getattr(self, 'deserialize_{0}'.format(key), lambda x:x)
41         return hydrate_func(self._data.get(key))
42
43     def _set_field(self, key, value):
44         dehydrate_func = getattr(self, 'serialize_{0}'.format(key), lambda x:x)
45         self._data[key] = dehydrate_func(value)
46
47     def serialize_due(self, date):
48         return date.strftime(DATE_FORMAT)
49
50     def deserialize_due(self, date_str):
51         if not date_str:
52             return None
53         return datetime.datetime.strptime(date_str, DATE_FORMAT)
54
55     def regenerate_uuid(self):
56         self['uuid'] = str(uuid.uuid4())
57
58     def delete(self):
59         self.warrior.delete_task(self['uuid'])
60
61     def done(self):
62         self.warrior.complete_task(self['uuid'])
63
64     def save(self, delete_first=True):
65         if self['uuid'] and delete_first:
66             self.delete()
67         if not self['uuid'] or delete_first:
68             self.regenerate_uuid()
69         self.warrior.import_tasks([self._data])
70
71     __repr__ = __unicode__
72
73
74 class TaskFilter(object):
75     """
76     A set of parameters to filter the task list with.
77     """
78
79     def __init__(self, filter_params=[]):
80         self.filter_params = filter_params
81
82     def add_filter(self, filter_str):
83         self.filter_params.append(filter_str)
84
85     def add_filter_param(self, key, value):
86         key = key.replace('__', '.')
87         self.filter_params.append('{0}:{1}'.format(key, value))
88
89     def get_filter_params(self):
90         return ['({})'.format(f) for f in self.filter_params if f]
91
92     def clone(self):
93         c = self.__class__()
94         c.filter_params = list(self.filter_params)
95         return c
96
97
98 class TaskQuerySet(object):
99     """
100     Represents a lazy lookup for a task objects.
101     """
102
103     def __init__(self, warrior=None, filter_obj=None):
104         self.warrior = warrior
105         self._result_cache = None
106         self.filter_obj = filter_obj or TaskFilter()
107
108     def __deepcopy__(self, memo):
109         """
110         Deep copy of a QuerySet doesn't populate the cache
111         """
112         obj = self.__class__()
113         for k,v in self.__dict__.items():
114             if k in ('_iter','_result_cache'):
115                 obj.__dict__[k] = None
116             else:
117                 obj.__dict__[k] = copy.deepcopy(v, memo)
118         return obj
119
120     def __repr__(self):
121         data = list(self[:REPR_OUTPUT_SIZE + 1])
122         if len(data) > REPR_OUTPUT_SIZE:
123             data[-1] = "...(remaining elements truncated)..."
124         return repr(data)
125
126     def __len__(self):
127         if self._result_cache is None:
128             self._result_cache = list(self)
129         return len(self._result_cache)
130
131     def __iter__(self):
132         if self._result_cache is None:
133             self._result_cache = self._execute()
134         return iter(self._result_cache)
135
136     def __getitem__(self, k):
137         if self._result_cache is None:
138             self._result_cache = list(self)
139         return self._result_cache.__getitem__(k)
140
141     def __bool__(self):
142         if self._result_cache is not None:
143             return bool(self._result_cache)
144         try:
145             next(iter(self))
146         except StopIteration:
147             return False
148         return True
149
150     def __nonzero__(self):
151         return type(self).__bool__(self)
152
153     def _clone(self, klass=None, **kwargs):
154         if klass is None:
155             klass = self.__class__
156         filter_obj = self.filter_obj.clone()
157         c = klass(warrior=self.warrior, filter_obj=filter_obj)
158         c.__dict__.update(kwargs)
159         return c
160
161     def _execute(self):
162         """
163         Fetch the tasks which match the current filters.
164         """
165         return self.warrior._execute_filter(self.filter_obj)
166
167     def all(self):
168         """
169         Returns a new TaskQuerySet that is a copy of the current one.
170         """
171         return self._clone()
172
173     def pending(self):
174         return self.filter(status=PENDING)
175
176     def filter(self, *args, **kwargs):
177         """
178         Returns a new TaskQuerySet with the given filters added.
179         """
180         clone = self._clone()
181         for f in args:
182             clone.filter_obj.add_filter(f)
183         for key, value in kwargs.items():
184             clone.filter_obj.add_filter_param(key, value)
185         return clone
186
187     def get(self, **kwargs):
188         """
189         Performs the query and returns a single object matching the given
190         keyword arguments.
191         """
192         clone = self.filter(**kwargs)
193         num = len(clone)
194         if num == 1:
195             return clone._result_cache[0]
196         if not num:
197             raise Task.DoesNotExist(
198                 'Task matching query does not exist. '
199                 'Lookup parameters were {0}'.format(kwargs))
200         raise ValueError(
201             'get() returned more than one Task -- it returned {0}! '
202             'Lookup parameters were {1}'.format(num, kwargs))
203
204
205 class TaskWarrior(object):
206     DEFAULT_FILTERS = {
207         'status': 'pending',
208     }
209
210     def __init__(self, data_location='~/.task', create=True):
211         if not os.path.exists(data_location):
212             os.makedirs(data_location)
213         self.config = {
214             'data.location': os.path.expanduser(data_location),
215         }
216         self.tasks = TaskQuerySet(self)
217
218     def _get_command_args(self, args):
219         command_args = ['task', 'rc:/']
220         for item in self.config.items():
221             command_args.append('rc.{0}={1}'.format(*item))
222         command_args.extend(args)
223         return command_args
224
225     def _execute_command(self, args):
226         p = subprocess.Popen(self._get_command_args(args),
227                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
228         stdout, stderr = p.communicate()
229         if p.returncode:
230             raise TaskWarriorException(stderr.strip())
231         return stdout.strip().split('\n')
232
233     def _execute_filter(self, filter_obj):
234         args = filter_obj.get_filter_params() + ['export']
235         tasks = []
236         for line in self._execute_command(args):
237             if line:
238                 tasks.append(Task(self, json.loads(line.strip(','))))
239         return tasks
240
241     def add_task(self, description, project=None):
242         args = ['add', description]
243         if project is not None:
244             args.append('project:{0}'.format(project))
245         self._execute_command(args)
246
247     def delete_task(self, task_id):
248         args = [task_id, 'rc.confirmation:no', 'delete']
249         self._execute_command(args)
250
251     def complete_task(self, task_id):
252         args = [task_id, 'done']
253         self._execute_command(args)
254
255     def import_tasks(self, tasks):
256         fd, path = tempfile.mkstemp()
257         with open(path, 'w') as f:
258             f.write(json.dumps(tasks))
259         args = ['import', path]
260         self._execute_command(args)