All patches and comments are welcome. Please squash your changes to logical
commits before using git-format-patch and git-send-email to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
17 class TaskWarriorException(Exception):
23 class DoesNotExist(Exception):
26 def __init__(self, warrior, data={}):
27 self.warrior = warrior
30 def __getitem__(self, key):
31 return self._get_field(key)
33 def __setitem__(self, key, val):
36 def __unicode__(self):
37 return self._data.get('description')
39 def _get_field(self, key):
40 hydrate_func = getattr(self, 'deserialize_{0}'.format(key), lambda x:x)
41 return hydrate_func(self._data.get(key))
43 def _set_field(self, key, value):
44 dehydrate_func = getattr(self, 'serialize_{0}'.format(key), lambda x:x)
45 self._data[key] = dehydrate_func(value)
47 def serialize_due(self, date):
48 return date.strftime(DATE_FORMAT)
50 def deserialize_due(self, date_str):
53 return datetime.datetime.strptime(date_str, DATE_FORMAT)
55 def regenerate_uuid(self):
56 self['uuid'] = str(uuid.uuid4())
59 self.warrior.delete_task(self['uuid'])
62 self.warrior.complete_task(self['uuid'])
64 def save(self, delete_first=True):
65 if self['uuid'] and delete_first:
67 if not self['uuid'] or delete_first:
68 self.regenerate_uuid()
69 self.warrior.import_tasks([self._data])
71 __repr__ = __unicode__
74 class TaskFilter(object):
76 A set of parameters to filter the task list with.
79 def __init__(self, filter_params=()):
80 self.filter_params = filter_params
82 def add_filter(self, param, value):
83 self.filter_params += ((param, value),)
85 def get_filter_params(self):
86 return self.filter_params
90 c.filter_params = tuple(self.filter_params)
94 class TaskQuerySet(object):
96 Represents a lazy lookup for a task objects.
99 def __init__(self, warrior=None, filter_obj=None):
100 self.warrior = warrior
101 self._result_cache = None
102 self.filter_obj = filter_obj or TaskFilter()
104 def __deepcopy__(self, memo):
106 Deep copy of a QuerySet doesn't populate the cache
108 obj = self.__class__()
109 for k,v in self.__dict__.items():
110 if k in ('_iter','_result_cache'):
111 obj.__dict__[k] = None
113 obj.__dict__[k] = copy.deepcopy(v, memo)
117 data = list(self[:REPR_OUTPUT_SIZE + 1])
118 if len(data) > REPR_OUTPUT_SIZE:
119 data[-1] = "...(remaining elements truncated)..."
123 if self._result_cache is None:
124 self._result_cache = list(self)
125 return len(self._result_cache)
128 if self._result_cache is None:
129 self._result_cache = self._execute()
130 return iter(self._result_cache)
132 def __getitem__(self, k):
133 if self._result_cache is None:
134 self._result_cache = list(self)
135 return self._result_cache.__getitem__(k)
138 if self._result_cache is not None:
139 return bool(self._result_cache)
142 except StopIteration:
146 def __nonzero__(self):
147 return type(self).__bool__(self)
149 def _clone(self, klass=None, **kwargs):
151 klass = self.__class__
152 filter_obj = self.filter_obj.clone()
153 c = klass(warrior=self.warrior, filter_obj=filter_obj)
154 c.__dict__.update(kwargs)
159 Fetch the tasks which match the current filters.
161 return self.warrior._execute_filter(self.filter_obj)
165 Returns a new TaskQuerySet that is a copy of the current one.
170 return self.filter(status=PENDING)
172 def filter(self, **kwargs):
174 Returns a new TaskQuerySet with the given filters added.
176 clone = self._clone()
177 for param, value in kwargs.items():
178 clone.filter_obj.add_filter(param, value)
181 def get(self, **kwargs):
183 Performs the query and returns a single object matching the given
186 clone = self.filter(**kwargs)
189 return clone._result_cache[0]
191 raise Task.DoesNotExist(
192 'Task matching query does not exist. '
193 'Lookup parameters were {0}'.format(kwargs))
195 'get() returned more than one Task -- it returned {0}! '
196 'Lookup parameters were {1}'.format(num, kwargs))
199 class TaskWarrior(object):
204 def __init__(self, data_location='~/.task', create=True):
205 if not os.path.exists(data_location):
206 os.makedirs(data_location)
208 'data.location': os.path.expanduser(data_location),
210 self.tasks = TaskQuerySet(self)
212 def _generate_command(self, command):
213 args = ['task', 'rc:/']
214 for item in self.config.items():
215 args.append('rc.{0}={1}'.format(*item))
217 return ' '.join(args)
219 def _execute_command(self, command):
220 p = subprocess.Popen(self._generate_command(command), shell=True,
221 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
222 stdout, stderr = p.communicate()
224 raise TaskWarriorException(stderr.strip())
225 return stdout.strip().split('\n')
227 def _format_filter_kwarg(self, kwarg):
228 key, val = kwarg[0], kwarg[1]
229 key = key.replace('__', '.')
230 return '{0}:{1}'.format(key, val)
232 def _execute_filter(self, filter_obj):
233 filter_commands = ' '.join(map(self._format_filter_kwarg,
234 filter_obj.get_filter_params()))
235 command = '{0} export'.format(filter_commands)
237 for line in self._execute_command(command):
239 tasks.append(Task(self, json.loads(line.strip(','))))
242 def add_task(self, description, project=None):
243 args = ['add', description]
244 if project is not None:
245 args.append('project:{0}'.format(project))
246 self._execute_command(' '.join(args))
248 def delete_task(self, task_id):
249 self._execute_command('{0} rc.confirmation:no delete'.format(task_id))
251 def complete_task(self, task_id):
252 self._execute_command('{0} done'.format(task_id))
254 def import_tasks(self, tasks):
255 fd, path = tempfile.mkstemp()
256 with open(path, 'w') as f:
257 f.write(json.dumps(tasks))
258 self._execute_command('import {0}'.format(path))