All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
8 DATE_FORMAT = '%Y%m%dT%H%M%SZ'
11 COMPLETED = 'completed'
13 logger = logging.getLogger(__name__)
16 class TaskWarriorException(Exception):
22 class DoesNotExist(Exception):
25 def __init__(self, warrior, data={}):
26 self.warrior = warrior
28 self._modified_fields = set()
def __unicode__(self):
    """
    Return the text form of this object: whatever item access on self
    yields for the 'description' key.
    """
    description = self['description']
    return description
33 def __getitem__(self, key):
34 hydrate_func = getattr(self, 'deserialize_{0}'.format(key),
36 return hydrate_func(self._data.get(key))
38 def __setitem__(self, key, value):
39 dehydrate_func = getattr(self, 'serialize_{0}'.format(key),
41 self._data[key] = dehydrate_func(value)
42 self._modified_fields.add(key)
def serialize_due(self, date):
    """
    Convert a date-like object into its string form by formatting it
    with the module-level DATE_FORMAT pattern.
    """
    formatted = date.strftime(DATE_FORMAT)
    return formatted
47 def deserialize_due(self, date_str):
50 return datetime.datetime.strptime(date_str, DATE_FORMAT)
52 def serialize_annotations(self, annotations):
53 ann_list = list(annotations)
55 ann['entry'] = ann['entry'].strftime(DATE_FORMAT)
58 def deserialize_annotations(self, annotations):
59 ann_list = list(annotations)
61 ann['entry'] = datetime.datetime.strptime(
62 ann['entry'], DATE_FORMAT)
65 def deserialize_tags(self, tags):
66 if isinstance(tags, basestring):
67 return tags.split(',') if tags else []
def serialize_tags(self, tags):
    """
    Join an iterable of tag strings into one comma-separated string.

    A falsy value (None, empty list, ...) serializes to the empty string.
    """
    if not tags:
        return ''
    return ','.join(tags)
74 self.warrior.execute_command([self['id'], 'delete'], config_override={
79 self.warrior.execute_command([self['id'], 'done'])
82 args = [self['id'], 'modify'] if self['id'] else ['add']
83 args.extend(self._get_modified_fields_as_args())
84 self.warrior.execute_command(args)
85 self._modified_fields.clear()
87 def _get_modified_fields_as_args(self):
89 for field in self._modified_fields:
90 args.append('{}:{}'.format(field, self._data[field]))
93 __repr__ = __unicode__
96 class TaskFilter(object):
98 A set of parameters to filter the task list with.
def __init__(self, filter_params=None):
    """
    Set up the list of filter parameters held by this filter.

    filter_params -- optional list of filter strings; it is stored by
    reference, so later additions are visible to the caller. When
    omitted, a new empty list is created for this instance.
    """
    # A literal [] default would be created once and shared by every
    # instance built without arguments, so filters appended to one
    # instance would leak into all the others.
    self.filter_params = [] if filter_params is None else filter_params
def add_filter(self, filter_str):
    """
    Append a filter string, unchanged, to this filter's parameter list.
    """
    params = self.filter_params
    params.append(filter_str)
def add_filter_param(self, key, value):
    """
    Append a 'key:value' entry to this filter's parameter list.

    Every double underscore in key is turned into a dot first, so
    'due__before' is stored as 'due.before'.
    """
    self.filter_params.append(
        '{0}:{1}'.format(key.replace('__', '.'), value))
def get_filter_params(self):
    """
    Return a new list holding the stored filter parameters, with falsy
    entries (empty strings, None, ...) left out.
    """
    return list(filter(None, self.filter_params))
116 c.filter_params = list(self.filter_params)
120 class TaskQuerySet(object):
122 Represents a lazy lookup for a task objects.
def __init__(self, warrior=None, filter_obj=None):
    """
    Store the warrior, start with an empty result cache, and keep the
    given filter object, or a fresh TaskFilter when none (or a falsy
    one) is supplied.
    """
    if not filter_obj:
        filter_obj = TaskFilter()
    self.filter_obj = filter_obj
    self._result_cache = None
    self.warrior = warrior
130 def __deepcopy__(self, memo):
132 Deep copy of a QuerySet doesn't populate the cache
134 obj = self.__class__()
135 for k, v in self.__dict__.items():
136 if k in ('_iter', '_result_cache'):
137 obj.__dict__[k] = None
139 obj.__dict__[k] = copy.deepcopy(v, memo)
143 data = list(self[:REPR_OUTPUT_SIZE + 1])
144 if len(data) > REPR_OUTPUT_SIZE:
145 data[-1] = "...(remaining elements truncated)..."
149 if self._result_cache is None:
150 self._result_cache = list(self)
151 return len(self._result_cache)
154 if self._result_cache is None:
155 self._result_cache = self._execute()
156 return iter(self._result_cache)
def __getitem__(self, k):
    """
    Return the item or slice k of the cached results.

    If nothing is cached yet, the cache is first filled with a list
    built by iterating over self.
    """
    cached = self._result_cache
    if cached is None:
        cached = self._result_cache = list(self)
    return cached.__getitem__(k)
164 if self._result_cache is not None:
165 return bool(self._result_cache)
168 except StopIteration:
def __nonzero__(self):
    """
    Python 2 truth-value hook; hands the decision to the __bool__
    defined on this object's class.
    """
    truth_hook = type(self).__bool__
    return truth_hook(self)
175 def _clone(self, klass=None, **kwargs):
177 klass = self.__class__
178 filter_obj = self.filter_obj.clone()
179 c = klass(warrior=self.warrior, filter_obj=filter_obj)
180 c.__dict__.update(kwargs)
185 Fetch the tasks which match the current filters.
187 return self.warrior.filter_tasks(self.filter_obj)
191 Returns a new TaskQuerySet that is a copy of the current one.
196 return self.filter(status=PENDING)
199 return self.filter(status=COMPLETED)
201 def filter(self, *args, **kwargs):
203 Returns a new TaskQuerySet with the given filters added.
205 clone = self._clone()
207 clone.filter_obj.add_filter(f)
208 for key, value in kwargs.items():
209 clone.filter_obj.add_filter_param(key, value)
212 def get(self, **kwargs):
214 Performs the query and returns a single object matching the given
217 clone = self.filter(**kwargs)
220 return clone._result_cache[0]
222 raise Task.DoesNotExist(
223 'Task matching query does not exist. '
224 'Lookup parameters were {0}'.format(kwargs))
226 'get() returned more than one Task -- it returned {0}! '
227 'Lookup parameters were {1}'.format(num, kwargs))
230 class TaskWarrior(object):
231 def __init__(self, data_location='~/.task', create=True):
232 data_location = os.path.expanduser(data_location)
233 if create and not os.path.exists(data_location):
234 os.makedirs(data_location)
236 'data.location': os.path.expanduser(data_location),
238 self.tasks = TaskQuerySet(self)
240 def _get_command_args(self, args, config_override={}):
241 command_args = ['task', 'rc:/']
242 config = self.config.copy()
243 config.update(config_override)
244 for item in config.items():
245 command_args.append('rc.{0}={1}'.format(*item))
246 command_args.extend(map(str, args))
249 def execute_command(self, args, config_override={}):
250 command_args = self._get_command_args(
251 args, config_override=config_override)
252 logger.debug(' '.join(command_args))
253 p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
254 stderr=subprocess.PIPE)
255 stdout, stderr = [x.decode() for x in p.communicate()]
258 error_msg = stderr.strip().splitlines()[-1]
260 error_msg = stdout.strip()
261 raise TaskWarriorException(error_msg)
262 return stdout.strip().split('\n')
264 def filter_tasks(self, filter_obj):
265 args = ['export', '--'] + filter_obj.get_filter_params()
267 for line in self.execute_command(args):
269 data = line.strip(',')
271 tasks.append(Task(self, json.loads(data)))
273 raise TaskWarriorException('Invalid JSON: %s' % data)
276 def merge_with(self, path, push=False):
277 path = path.rstrip('/') + '/'
278 self.execute_command(['merge', path], config_override={
279 'merge.autopush': 'yes' if push else 'no',
283 self.execute_command(['undo'], config_override={
284 'confirmation': 'no',