git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

5a223e222d91c70313d480be74441409a6e56a85
[etc/taskwarrior.git] / tasklib / task.py
1 import copy
2 import datetime
3 import json
4 import os
5 import subprocess
6 import tempfile
7 import uuid
8
9
# strftime/strptime pattern used to (de)serialize the 'due' field and the
# 'entry' value of each annotation.
DATE_FORMAT = '%Y%m%dT%H%M%SZ'

# Maximum number of items TaskQuerySet.__repr__ lists before truncating.
REPR_OUTPUT_SIZE = 10

# Status value that TaskQuerySet.pending() filters on.
PENDING = 'pending'
15
16
class TaskWarriorException(Exception):
    """Error raised when an invoked ``task`` command reports a failure."""
19
20
class Task(object):
    """
    A single task record bound to the warrior object that stores it.

    Field values live in ``self._data`` in their serialized (JSON-ready)
    form.  Item access converts values on the way in and out through the
    optional ``serialize_<field>`` / ``deserialize_<field>`` methods; fields
    without such methods are stored and returned unchanged.
    """

    class DoesNotExist(Exception):
        """Raised when a lookup for a single task matches nothing."""

    def __init__(self, warrior, data=None):
        """
        :param warrior: object providing ``delete_task``, ``complete_task``
            and ``import_tasks``.
        :param data: dict of serialized field values.  It is stored by
            reference, not copied.  Defaults to a new empty dict.
        """
        self.warrior = warrior
        # A fresh dict per instance: a ``{}`` default argument would be
        # shared by every Task created without explicit data.
        self._data = {} if data is None else data

    def __getitem__(self, key):
        """Return the deserialized value of ``key`` (None when absent)."""
        return self._get_field(key)

    def __setitem__(self, key, val):
        """
        Store ``val`` under ``key`` in serialized form.

        Goes through the same serializers that ``__getitem__`` reverses, so
        a value read from a task can be assigned back without corrupting
        the stored data.
        """
        self._set_field(key, val)

    def __unicode__(self):
        # Fall back to an empty string: ``__repr__`` is an alias of this
        # method and must never return None.
        return self._data.get('description') or ''

    def _get_field(self, key):
        """Look up ``key`` and pass it through ``deserialize_<key>``."""
        hydrate_func = getattr(
            self, 'deserialize_{0}'.format(key), lambda x: x)
        return hydrate_func(self._data.get(key))

    def _set_field(self, key, value):
        """Pass ``value`` through ``serialize_<key>`` and store it."""
        dehydrate_func = getattr(
            self, 'serialize_{0}'.format(key), lambda x: x)
        self._data[key] = dehydrate_func(value)

    def serialize_due(self, date):
        """
        Format a date/datetime with DATE_FORMAT.

        Values without ``strftime`` (already formatted strings, None) are
        returned unchanged.
        """
        if hasattr(date, 'strftime'):
            return date.strftime(DATE_FORMAT)
        return date

    def deserialize_due(self, date_str):
        """Parse a DATE_FORMAT string; empty or missing values give None."""
        if not date_str:
            return None
        return datetime.datetime.strptime(date_str, DATE_FORMAT)

    def serialize_annotations(self, annotations):
        """
        Return copies of the annotation dicts with ``entry`` formatted.

        The dicts passed in are left untouched.  ``entry`` values that are
        already strings are kept as they are.
        """
        serialized = []
        for ann in annotations or []:
            ann = dict(ann)  # copy, so the caller's dict is not rewritten
            if hasattr(ann['entry'], 'strftime'):
                ann['entry'] = ann['entry'].strftime(DATE_FORMAT)
            serialized.append(ann)
        return serialized

    def deserialize_annotations(self, annotations):
        """
        Return copies of the annotation dicts with ``entry`` parsed into a
        datetime.  Missing or empty annotations give an empty list.
        """
        if not annotations:
            return []
        hydrated = []
        for ann in annotations:
            # Copy before parsing: rewriting the stored dict in place would
            # leave datetime objects in ``self._data`` and make the next
            # read (and JSON export) fail.
            ann = dict(ann)
            ann['entry'] = datetime.datetime.strptime(
                ann['entry'], DATE_FORMAT)
            hydrated.append(ann)
        return hydrated

    def regenerate_uuid(self):
        """Assign a newly generated random UUID string to this task."""
        self['uuid'] = str(uuid.uuid4())

    def delete(self):
        """Ask the warrior to delete the task with this task's uuid."""
        self.warrior.delete_task(self['uuid'])

    def done(self):
        """Ask the warrior to complete the task with this task's uuid."""
        self.warrior.complete_task(self['uuid'])

    def save(self, delete_first=True):
        """
        Write this task through the warrior's ``import_tasks``.

        With ``delete_first`` (the default) an existing task is deleted
        under its current uuid and re-imported under a new one.  A task
        without a uuid always receives a new one before the import.
        """
        if self['uuid'] and delete_first:
            self.delete()
        if not self['uuid'] or delete_first:
            self.regenerate_uuid()
        self.warrior.import_tasks([self._data])

    __repr__ = __unicode__
85
86
class TaskFilter(object):
    """
    A set of parameters to filter the task list with.
    """

    def __init__(self, filter_params=None):
        """
        :param filter_params: optional iterable of initial filter strings.
            It is copied, so later additions never touch the caller's
            sequence.
        """
        # Build a new list per instance: a ``[]`` default argument would be
        # shared by every filter created without arguments, so a filter
        # added to one of them would show up in all the others.
        if filter_params is None:
            self.filter_params = []
        else:
            self.filter_params = list(filter_params)

    def add_filter(self, filter_str):
        """Append a raw filter string."""
        self.filter_params.append(filter_str)

    def add_filter_param(self, key, value):
        """
        Append a ``key:value`` filter.

        Double underscores in ``key`` are turned into dots, so
        ``due__before`` becomes ``due.before``.
        """
        key = key.replace('__', '.')
        self.filter_params.append('{0}:{1}'.format(key, value))

    def get_filter_params(self):
        """Return the stored filters as a new list, skipping empty ones."""
        return [f for f in self.filter_params if f]

    def clone(self):
        """Return a new filter of the same class with a copy of the list."""
        c = self.__class__()
        c.filter_params = list(self.filter_params)
        return c
109
110
111 class TaskQuerySet(object):
112     """
113     Represents a lazy lookup for a task objects.
114     """
115
116     def __init__(self, warrior=None, filter_obj=None):
117         self.warrior = warrior
118         self._result_cache = None
119         self.filter_obj = filter_obj or TaskFilter()
120
121     def __deepcopy__(self, memo):
122         """
123         Deep copy of a QuerySet doesn't populate the cache
124         """
125         obj = self.__class__()
126         for k,v in self.__dict__.items():
127             if k in ('_iter','_result_cache'):
128                 obj.__dict__[k] = None
129             else:
130                 obj.__dict__[k] = copy.deepcopy(v, memo)
131         return obj
132
133     def __repr__(self):
134         data = list(self[:REPR_OUTPUT_SIZE + 1])
135         if len(data) > REPR_OUTPUT_SIZE:
136             data[-1] = "...(remaining elements truncated)..."
137         return repr(data)
138
139     def __len__(self):
140         if self._result_cache is None:
141             self._result_cache = list(self)
142         return len(self._result_cache)
143
144     def __iter__(self):
145         if self._result_cache is None:
146             self._result_cache = self._execute()
147         return iter(self._result_cache)
148
149     def __getitem__(self, k):
150         if self._result_cache is None:
151             self._result_cache = list(self)
152         return self._result_cache.__getitem__(k)
153
154     def __bool__(self):
155         if self._result_cache is not None:
156             return bool(self._result_cache)
157         try:
158             next(iter(self))
159         except StopIteration:
160             return False
161         return True
162
163     def __nonzero__(self):
164         return type(self).__bool__(self)
165
166     def _clone(self, klass=None, **kwargs):
167         if klass is None:
168             klass = self.__class__
169         filter_obj = self.filter_obj.clone()
170         c = klass(warrior=self.warrior, filter_obj=filter_obj)
171         c.__dict__.update(kwargs)
172         return c
173
174     def _execute(self):
175         """
176         Fetch the tasks which match the current filters.
177         """
178         return self.warrior.filter_tasks(self.filter_obj)
179
180     def all(self):
181         """
182         Returns a new TaskQuerySet that is a copy of the current one.
183         """
184         return self._clone()
185
186     def pending(self):
187         return self.filter(status=PENDING)
188
189     def filter(self, *args, **kwargs):
190         """
191         Returns a new TaskQuerySet with the given filters added.
192         """
193         clone = self._clone()
194         for f in args:
195             clone.filter_obj.add_filter(f)
196         for key, value in kwargs.items():
197             clone.filter_obj.add_filter_param(key, value)
198         return clone
199
200     def get(self, **kwargs):
201         """
202         Performs the query and returns a single object matching the given
203         keyword arguments.
204         """
205         clone = self.filter(**kwargs)
206         num = len(clone)
207         if num == 1:
208             return clone._result_cache[0]
209         if not num:
210             raise Task.DoesNotExist(
211                 'Task matching query does not exist. '
212                 'Lookup parameters were {0}'.format(kwargs))
213         raise ValueError(
214             'get() returned more than one Task -- it returned {0}! '
215             'Lookup parameters were {1}'.format(num, kwargs))
216
217
class TaskWarrior(object):
    """
    Wrapper that runs the ``task`` command line program in a subprocess.

    ``self.config`` holds settings passed to every invocation as
    ``rc.<key>=<value>`` arguments; ``self.tasks`` is a TaskQuerySet bound
    to this instance.
    """

    def __init__(self, data_location='~/.task', create=True):
        """
        :param data_location: data directory; a leading ``~`` is expanded.
        :param create: create the directory when it does not exist yet.
        """
        # Expand before touching the filesystem: checking the raw value
        # would create a directory literally named "~" under the current
        # working directory.
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)

    def _get_command_args(self, args, config_override=None):
        """
        Build the full argument list for one ``task`` invocation.

        :param args: command specific arguments, appended last.
        :param config_override: optional dict whose entries replace or
            extend ``self.config`` for this call only.
        """
        # NOTE(review): 'rc:/' presumably keeps the user's own taskrc from
        # being loaded -- confirm against the taskwarrior documentation.
        command_args = ['task', 'rc:/']
        config = self.config.copy()
        if config_override:
            config.update(config_override)
        for item in config.items():
            command_args.append('rc.{0}={1}'.format(*item))
        command_args.extend(args)
        return command_args

    def execute_command(self, args, config_override=None):
        """
        Run ``task`` with ``args`` and return its stdout as a list of lines.

        :raises TaskWarriorException: when the exit status is non-zero.
            The message is the last line of stderr, or the whole stdout
            when stderr is empty.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # The pipes deliver bytes; decode once here so the text handling
        # below works on Python 3 as well.
        # NOTE(review): assumes the output is UTF-8 encoded -- confirm.
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode:
            error_lines = stderr.strip().splitlines()
            # A failing command may leave stderr empty; fall back to
            # stdout instead of failing with an IndexError.
            if error_lines:
                error_msg = error_lines[-1]
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)
        return stdout.strip().split('\n')

    def filter_tasks(self, filter_obj):
        """
        Export the tasks matching ``filter_obj`` and return them as a list
        of Task objects.
        """
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if line:
                # Surrounding commas are dropped before parsing; each
                # non-empty line is parsed as one JSON document.
                tasks.append(Task(self, json.loads(line.strip(','))))
        return tasks

    def add_task(self, description, project=None):
        """Add a task, optionally assigning it to ``project``."""
        args = ['add', description]
        if project is not None:
            args.append('project:{0}'.format(project))
        self.execute_command(args)

    def delete_task(self, task_id):
        """Run the ``delete`` command for ``task_id`` with
        ``rc.confirmation:no``."""
        args = [task_id, 'rc.confirmation:no', 'delete']
        self.execute_command(args)

    def complete_task(self, task_id):
        """Run the ``done`` command for ``task_id``."""
        args = [task_id, 'done']
        self.execute_command(args)

    def import_tasks(self, tasks):
        """
        Import ``tasks`` (JSON-serializable data) through a temporary file.

        The file is removed again once the import command has finished,
        whether it succeeded or not.
        """
        fd, path = tempfile.mkstemp()
        try:
            # Wrap the descriptor returned by mkstemp instead of opening
            # the path a second time, so the descriptor gets closed.
            with os.fdopen(fd, 'w') as f:
                f.write(json.dumps(tasks))
            self.execute_command(['import', path])
        finally:
            os.remove(path)

    def merge_with(self, path, push=False):
        """
        Run the ``merge`` command against ``path``.

        ``path`` is normalized to end in exactly one slash; ``push`` sets
        ``merge.autopush`` to ``yes`` or ``no`` for this call.
        """
        path = path.rstrip('/') + '/'
        args = ['merge', path]
        self.execute_command(args, config_override={
            'merge.autopush': 'yes' if push else 'no',
        })