All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you read over the Git project's submission guidelines and adhere to them,
I'll be especially grateful.
10 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
17 class TaskWarriorException(Exception):
23 class DoesNotExist(Exception):
26 def __init__(self, warrior, data={}):
27 self.warrior = warrior
30 def __getitem__(self, key):
31 return self._get_field(key)
33 def __setitem__(self, key, val):
36 def __unicode__(self):
37 return self._data.get('description')
39 def _get_field(self, key):
40 hydrate_func = getattr(self, 'deserialize_{0}'.format(key), lambda x:x)
41 return hydrate_func(self._data.get(key))
43 def _set_field(self, key, value):
44 dehydrate_func = getattr(self, 'serialize_{0}'.format(key), lambda x:x)
45 self._data[key] = dehydrate_func(value)
def serialize_due(self, date):
    """
    Render ``date`` as a string by calling its ``strftime`` with the
    module-level ``DATE_FORMAT`` pattern.
    """
    formatted = date.strftime(DATE_FORMAT)
    return formatted
50 def deserialize_due(self, date_str):
53 return datetime.datetime.strptime(date_str, DATE_FORMAT)
def regenerate_uuid(self):
    """
    Assign a freshly generated random (version 4) UUID, as a string, to
    this object's ``'uuid'`` item.
    """
    fresh = uuid.uuid4()
    self['uuid'] = str(fresh)
59 self.warrior.delete_task(self['uuid'])
62 self.warrior.complete_task(self['uuid'])
64 def save(self, delete_first=True):
65 if self['uuid'] and delete_first:
67 if not self['uuid'] or delete_first:
68 self.regenerate_uuid()
69 self.warrior.import_tasks([self._data])
71 __repr__ = __unicode__
74 class TaskFilter(object):
76 A set of parameters to filter the task list with.
79 def __init__(self, filter_params=[]):
80 self.filter_params = filter_params
def add_filter(self, filter_str):
    """
    Append the raw filter string ``filter_str`` to ``self.filter_params``.
    """
    params = self.filter_params
    params.append(filter_str)
def add_filter_param(self, key, value):
    """
    Append a ``key:value`` filter to ``self.filter_params``.

    Every double underscore in ``key`` is turned into a dot first, so
    ``due__before`` is stored as ``due.before``.
    """
    dotted_key = '.'.join(key.split('__'))
    entry = '{0}:{1}'.format(dotted_key, value)
    self.filter_params.append(entry)
def get_filter_params(self):
    """
    Return the stored filters, each wrapped in parentheses.

    Falsy entries (such as empty strings) are dropped. The result is a new
    list; ``self.filter_params`` is not modified.
    """
    # Explicit ``{0}`` matches every other format string in this file and
    # also works on interpreters without auto-numbered ``{}`` fields.
    return ['({0})'.format(param) for param in self.filter_params if param]
94 c.filter_params = list(self.filter_params)
98 class TaskQuerySet(object):
100 Represents a lazy lookup for a task objects.
103 def __init__(self, warrior=None, filter_obj=None):
104 self.warrior = warrior
105 self._result_cache = None
106 self.filter_obj = filter_obj or TaskFilter()
108 def __deepcopy__(self, memo):
110 Deep copy of a QuerySet doesn't populate the cache
112 obj = self.__class__()
113 for k,v in self.__dict__.items():
114 if k in ('_iter','_result_cache'):
115 obj.__dict__[k] = None
117 obj.__dict__[k] = copy.deepcopy(v, memo)
121 data = list(self[:REPR_OUTPUT_SIZE + 1])
122 if len(data) > REPR_OUTPUT_SIZE:
123 data[-1] = "...(remaining elements truncated)..."
127 if self._result_cache is None:
128 self._result_cache = list(self)
129 return len(self._result_cache)
132 if self._result_cache is None:
133 self._result_cache = self._execute()
134 return iter(self._result_cache)
136 def __getitem__(self, k):
137 if self._result_cache is None:
138 self._result_cache = list(self)
139 return self._result_cache.__getitem__(k)
142 if self._result_cache is not None:
143 return bool(self._result_cache)
146 except StopIteration:
150 def __nonzero__(self):
151 return type(self).__bool__(self)
153 def _clone(self, klass=None, **kwargs):
155 klass = self.__class__
156 filter_obj = self.filter_obj.clone()
157 c = klass(warrior=self.warrior, filter_obj=filter_obj)
158 c.__dict__.update(kwargs)
163 Fetch the tasks which match the current filters.
165 return self.warrior._execute_filter(self.filter_obj)
169 Returns a new TaskQuerySet that is a copy of the current one.
174 return self.filter(status=PENDING)
176 def filter(self, *args, **kwargs):
178 Returns a new TaskQuerySet with the given filters added.
180 clone = self._clone()
182 clone.filter_obj.add_filter(f)
183 for key, value in kwargs.items():
184 clone.filter_obj.add_filter_param(key, value)
187 def get(self, **kwargs):
189 Performs the query and returns a single object matching the given
192 clone = self.filter(**kwargs)
195 return clone._result_cache[0]
197 raise Task.DoesNotExist(
198 'Task matching query does not exist. '
199 'Lookup parameters were {0}'.format(kwargs))
201 'get() returned more than one Task -- it returned {0}! '
202 'Lookup parameters were {1}'.format(num, kwargs))
205 class TaskWarrior(object):
210 def __init__(self, data_location='~/.task', create=True):
211 if not os.path.exists(data_location):
212 os.makedirs(data_location)
214 'data.location': os.path.expanduser(data_location),
216 self.tasks = TaskQuerySet(self)
218 def _get_command_args(self, args):
219 command_args = ['task', 'rc:/']
220 for item in self.config.items():
221 command_args.append('rc.{0}={1}'.format(*item))
222 command_args.extend(args)
225 def _execute_command(self, args):
226 p = subprocess.Popen(self._get_command_args(args),
227 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
228 stdout, stderr = p.communicate()
230 raise TaskWarriorException(stderr.strip())
231 return stdout.strip().split('\n')
233 def _execute_filter(self, filter_obj):
234 args = filter_obj.get_filter_params() + ['export']
236 for line in self._execute_command(args):
238 tasks.append(Task(self, json.loads(line.strip(','))))
def add_task(self, description, project=None):
    """
    Run the ``add`` command with ``description``.

    When ``project`` is not ``None`` a ``project:<project>`` argument is
    appended. The argument list is handed to ``self._execute_command``.
    """
    if project is None:
        project_args = []
    else:
        project_args = ['project:{0}'.format(project)]
    self._execute_command(['add', description] + project_args)
def delete_task(self, task_id):
    """
    Run the ``delete`` command for ``task_id`` with the
    ``rc.confirmation:no`` override, via ``self._execute_command``.
    """
    self._execute_command([task_id, 'rc.confirmation:no', 'delete'])
def complete_task(self, task_id):
    """
    Run the ``done`` command for ``task_id`` via
    ``self._execute_command``.
    """
    self._execute_command([task_id, 'done'])
def import_tasks(self, tasks):
    """
    Write ``tasks`` as JSON to a temporary file and run the ``import``
    command on that file via ``self._execute_command``.

    ``tasks`` must be serializable by ``json.dumps``. The temporary file
    is removed afterwards, whether or not the command succeeds.
    """
    fd, path = tempfile.mkstemp()
    try:
        # mkstemp() returns an already-open descriptor; wrapping it (rather
        # than opening the path a second time) ensures it gets closed.
        with os.fdopen(fd, 'w') as f:
            f.write(json.dumps(tasks))
        # The file is closed (and therefore flushed) before the command
        # reads it.
        self._execute_command(['import', path])
    finally:
        # Do not leave one stray temp file behind per import.
        os.remove(path)