git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/task.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

Add serialize/deserialize for fields and implement for due date
[etc/taskwarrior.git] / tasklib / task.py
1 import copy
2 import datetime
3 import json
4 import os
5 import subprocess
6 import tempfile
7 import uuid
8
9
# strftime/strptime pattern used by Task.serialize_due and
# Task.deserialize_due to convert the 'due' field.
DATE_FORMAT = '%Y%m%dT%H%M%SZ'

# TaskQuerySet.__repr__ shows at most this many items before truncating.
REPR_OUTPUT_SIZE = 10

# Status value that TaskQuerySet.pending() filters on.
PENDING = 'pending'
15
16
class TaskWarriorException(Exception):
    """Error raised when a Taskwarrior command exits with a non-zero status."""
19
20
class Task(object):
    """
    A single task, backed by a dict of serialized field values.

    Field values live in ``self._data`` in the form they are imported and
    exported in. Reading ``task[key]`` passes the stored value through an
    optional ``deserialize_<key>`` method; writing ``task[key] = value``
    passes the value through an optional ``serialize_<key>`` method. Fields
    without such methods are stored and returned unchanged.
    """

    class DoesNotExist(Exception):
        """Raised by TaskQuerySet.get when no task matches the lookup."""

    def __init__(self, warrior, data=None):
        """
        :param warrior: object providing delete_task, complete_task and
            import_tasks; used by delete(), done() and save().
        :param data: dict of serialized field values. It is used as-is (not
            copied). When omitted, the task starts with its own empty dict.
        """
        self.warrior = warrior
        # A fresh dict per instance: a shared mutable default would leak
        # fields written through __setitem__ into every other Task that was
        # also created without data.
        self._data = {} if data is None else data

    def __getitem__(self, key):
        """Return the deserialized value of ``key`` (None if it is unset)."""
        return self._get_field(key)

    def __setitem__(self, key, val):
        """Store ``val`` under ``key`` in its serialized form."""
        # Go through _set_field so that values written here round-trip
        # through __getitem__ and stay JSON-serializable for save().
        self._set_field(key, val)

    def __unicode__(self):
        """Return the raw 'description' field, or None if it is unset."""
        return self._data.get('description')

    def __repr__(self):
        """Return the description; fall back to a placeholder if unset."""
        description = self.__unicode__()
        # repr() raises TypeError when __repr__ returns a non-string, so a
        # task without a description needs a string stand-in.
        if description is None:
            return '<Task: no description>'
        return description

    def _get_field(self, key):
        """Look up ``key`` and apply ``deserialize_<key>`` if it exists."""
        hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
                               lambda x: x)
        return hydrate_func(self._data.get(key))

    def _set_field(self, key, value):
        """Apply ``serialize_<key>`` if it exists and store the result."""
        dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
                                 lambda x: x)
        self._data[key] = dehydrate_func(value)

    def serialize_due(self, date):
        """
        Format a date/datetime with DATE_FORMAT.

        Any other value (None, an already formatted string) is returned
        unchanged, so raw values assigned with ``task['due'] = ...`` keep
        working.
        """
        if isinstance(date, datetime.date):
            return date.strftime(DATE_FORMAT)
        return date

    def deserialize_due(self, date_str):
        """Parse a DATE_FORMAT string into a datetime; None if empty."""
        if not date_str:
            return None
        return datetime.datetime.strptime(date_str, DATE_FORMAT)

    def regenerate_uuid(self):
        """Assign a new random (version 4) uuid string to this task."""
        self['uuid'] = str(uuid.uuid4())

    def delete(self):
        """Ask the warrior to delete the task with this task's uuid."""
        self.warrior.delete_task(self['uuid'])

    def done(self):
        """Ask the warrior to complete the task with this task's uuid."""
        self.warrior.complete_task(self['uuid'])

    def save(self, delete_first=True):
        """
        Import this task's data through the warrior.

        With ``delete_first`` true, a task that already has a uuid is
        deleted first and then imported under a newly generated uuid. A
        task without a uuid always gets one before the import.
        """
        if self['uuid'] and delete_first:
            self.delete()
        if not self['uuid'] or delete_first:
            self.regenerate_uuid()
        self.warrior.import_tasks([self._data])
72
73
class TaskFilter(object):
    """
    Accumulates the (param, value) pairs a task list is filtered by.
    """

    def __init__(self, filter_params=()):
        self.filter_params = filter_params

    def add_filter(self, param, value):
        """Append one (param, value) pair to the stored parameters."""
        entry = (param, value)
        self.filter_params += (entry,)

    def get_filter_params(self):
        """Return the stored parameters."""
        return self.filter_params

    def clone(self):
        """Return a new filter of the same class with the same pairs."""
        duplicate = type(self)()
        duplicate.filter_params = tuple(self.filter_params)
        return duplicate
92
93
94 class TaskQuerySet(object):
95     """
96     Represents a lazy lookup for a task objects.
97     """
98
99     def __init__(self, warrior=None, filter_obj=None):
100         self.warrior = warrior
101         self._result_cache = None
102         self.filter_obj = filter_obj or TaskFilter()
103
104     def __deepcopy__(self, memo):
105         """
106         Deep copy of a QuerySet doesn't populate the cache
107         """
108         obj = self.__class__()
109         for k,v in self.__dict__.items():
110             if k in ('_iter','_result_cache'):
111                 obj.__dict__[k] = None
112             else:
113                 obj.__dict__[k] = copy.deepcopy(v, memo)
114         return obj
115
116     def __repr__(self):
117         data = list(self[:REPR_OUTPUT_SIZE + 1])
118         if len(data) > REPR_OUTPUT_SIZE:
119             data[-1] = "...(remaining elements truncated)..."
120         return repr(data)
121
122     def __len__(self):
123         if self._result_cache is None:
124             self._result_cache = list(self)
125         return len(self._result_cache)
126
127     def __iter__(self):
128         if self._result_cache is None:
129             self._result_cache = self._execute()
130         return iter(self._result_cache)
131
132     def __getitem__(self, k):
133         if self._result_cache is None:
134             self._result_cache = list(self)
135         return self._result_cache.__getitem__(k)
136
137     def __bool__(self):
138         if self._result_cache is not None:
139             return bool(self._result_cache)
140         try:
141             next(iter(self))
142         except StopIteration:
143             return False
144         return True
145
146     def __nonzero__(self):
147         return type(self).__bool__(self)
148
149     def _clone(self, klass=None, **kwargs):
150         if klass is None:
151             klass = self.__class__
152         filter_obj = self.filter_obj.clone()
153         c = klass(warrior=self.warrior, filter_obj=filter_obj)
154         c.__dict__.update(kwargs)
155         return c
156
157     def _execute(self):
158         """
159         Fetch the tasks which match the current filters.
160         """
161         return self.warrior._execute_filter(self.filter_obj)
162
163     def all(self):
164         """
165         Returns a new TaskQuerySet that is a copy of the current one.
166         """
167         return self._clone()
168
169     def pending(self):
170         return self.filter(status=PENDING)
171
172     def filter(self, **kwargs):
173         """
174         Returns a new TaskQuerySet with the given filters added.
175         """
176         clone = self._clone()
177         for param, value in kwargs.items():
178             clone.filter_obj.add_filter(param, value)
179         return clone
180
181     def get(self, **kwargs):
182         """
183         Performs the query and returns a single object matching the given
184         keyword arguments.
185         """
186         clone = self.filter(**kwargs)
187         num = len(clone)
188         if num == 1:
189             return clone._result_cache[0]
190         if not num:
191             raise Task.DoesNotExist(
192                 'Task matching query does not exist. '
193                 'Lookup parameters were {0}'.format(kwargs))
194         raise ValueError(
195             'get() returned more than one Task -- it returned {0}! '
196             'Lookup parameters were {1}'.format(num, kwargs))
197
198
class TaskWarrior(object):
    """
    Thin wrapper that drives the ``task`` command line tool.

    Commands are assembled as one string and run through the shell
    (``shell=True``). NOTE: shell metacharacters inside descriptions, filter
    values, task ids or paths are therefore interpreted by the shell; do not
    pass untrusted text to these methods.
    """

    DEFAULT_FILTERS = {
        'status': 'pending',
    }

    def __init__(self, data_location='~/.task', create=True):
        """
        :param data_location: Taskwarrior data directory; a leading ``~`` is
            expanded to the user's home directory.
        :param create: create the directory when it does not exist yet.
        """
        # Expand before touching the filesystem: checking or creating the
        # unexpanded path would act on a literal '~' directory relative to
        # the current working directory.
        data_location = os.path.expanduser(data_location)
        if create and not os.path.exists(data_location):
            os.makedirs(data_location)
        self.config = {
            'data.location': data_location,
        }
        self.tasks = TaskQuerySet(self)

    def _generate_command(self, command):
        """Return the full shell command line for ``command``."""
        # 'rc:/' is passed in place of a user rc file; every entry of
        # self.config is then handed over as an 'rc.<key>=<value>' override.
        args = ['task', 'rc:/']
        for item in self.config.items():
            args.append('rc.{0}={1}'.format(*item))
        args.append(command)
        return ' '.join(args)

    def _execute_command(self, command):
        """
        Run ``command`` and return its standard output as a list of lines.

        Raises TaskWarriorException carrying the standard error text when
        the process exits with a non-zero status.
        """
        p = subprocess.Popen(self._generate_command(command), shell=True,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        if p.returncode:
            raise TaskWarriorException(
                stderr.decode('utf-8', 'replace').strip())
        # The pipes deliver bytes; decode before splitting so this works on
        # Python 3 as well as Python 2.
        return stdout.decode('utf-8').strip().split('\n')

    def _format_filter_kwarg(self, kwarg):
        """Turn a ``(key, value)`` pair into a ``key:value`` filter term."""
        key, val = kwarg[0], kwarg[1]
        # Double underscores in keyword names stand for dots.
        key = key.replace('__', '.')
        return '{0}:{1}'.format(key, val)

    def _execute_filter(self, filter_obj):
        """Export the tasks matching ``filter_obj`` as a list of Task."""
        filter_commands = ' '.join(
            self._format_filter_kwarg(param)
            for param in filter_obj.get_filter_params())
        command = '{0} export'.format(filter_commands)
        tasks = []
        for line in self._execute_command(command):
            if line:
                # Each non-empty line is parsed as one JSON object after
                # stripping any leading/trailing commas.
                tasks.append(Task(self, json.loads(line.strip(','))))
        return tasks

    def add_task(self, description, project=None):
        """Add a task with ``description`` and, optionally, a project."""
        args = ['add', description]
        if project is not None:
            args.append('project:{0}'.format(project))
        self._execute_command(' '.join(args))

    def delete_task(self, task_id):
        """Delete the task identified by ``task_id`` without confirmation."""
        self._execute_command('{0} rc.confirmation:no delete'.format(task_id))

    def complete_task(self, task_id):
        """Mark the task identified by ``task_id`` as done."""
        self._execute_command('{0} done'.format(task_id))

    def import_tasks(self, tasks):
        """
        Import ``tasks`` (a JSON-serializable list) through a temporary file.
        """
        fd, path = tempfile.mkstemp()
        try:
            # Wrap the descriptor mkstemp opened instead of opening the path
            # a second time, so the descriptor is closed and not leaked.
            with os.fdopen(fd, 'w') as f:
                f.write(json.dumps(tasks))
            self._execute_command('import {0}'.format(path))
        finally:
            # The file is only a hand-over to the import command.
            os.remove(path)