All patches and comments are welcome. Please squash your changes into logical
commits before using git-format-patch and git-send-email to send them to
patches@git.madduck.net.
If you'd read over the Git project's submission guidelines and adhered to them,
I'd be especially grateful.
11 class TaskWarriorException(Exception):
# Initializer: keeps a reference to the owning `warrior` object on the instance.
# NOTE(review): `data={}` is a mutable default argument, so every call that
# omits `data` receives the very same dict object; `data=None` with a fresh
# dict created inside the method would avoid accidental sharing.
# NOTE(review): the embedded numbering jumps from 18 to 21, so the rest of
# this method is not shown here. Presumably it stores `data` as `self._data`
# (other methods read that attribute) - TODO confirm against the full file.
17 def __init__(self, warrior, data={}):
18 self.warrior = warrior
21 def __getitem__(self, key):
22 return self._data.get(key)
# Item-assignment hook, invoked for `obj[key] = val`.
# NOTE(review): the body of this method (embedded line 25) is missing from
# this listing, so what it stores and where cannot be confirmed here.
24 def __setitem__(self, key, val):
27 def __unicode__(self):
28 return self._data.get('description')
def regenerate_uuid(self):
    """Give the task a brand-new identifier.

    A random (version 4) UUID is generated and its string form is stored
    under the ``'uuid'`` key via item assignment, replacing any previous
    value.
    """
    fresh = uuid.uuid4()
    self['uuid'] = str(fresh)
34 self.warrior.delete_task(self['uuid'])
37 self.warrior.complete_task(self['uuid'])
# Persist this task through the owning warrior's import mechanism.
# `delete_first` (default True) takes part in both conditions below.
# NOTE(review): the statement guarded by the first `if` (embedded line 41) is
# missing from this listing; presumably it removes the existing task before
# re-importing - TODO confirm against the full file.
39 def save(self, delete_first=True):
40 if self['uuid'] and delete_first:
# A new UUID is requested when the task has none yet, or whenever
# `delete_first` is set.
42 if not self['uuid'] or delete_first:
43 self.regenerate_uuid()
# The raw field dict is handed to the warrior as a one-element import batch.
44 self.warrior.import_tasks([self._data])
# repr() is served by the same function as __unicode__, i.e. it yields
# whatever is stored under the 'description' field.
46 __repr__ = __unicode__
49 class TaskWarrior(object):
# Initializer: takes the taskwarrior data directory (default '~/.task') and a
# `create` flag, and creates the directory when the given path does not exist.
# NOTE(review): `os.path.exists` / `os.makedirs` receive `data_location`
# as passed in; only the 'data.location' value below goes through
# `os.path.expanduser`. With the default '~/.task' the check and the mkdir
# would therefore act on a literal '~' path - confirm and expand first.
# NOTE(review): `create` is not referenced in the visible lines, and embedded
# lines 57 and 59 are missing from this listing (presumably the opening and
# closing of the dict that holds the 'data.location' entry - TODO confirm).
54 def __init__(self, data_location='~/.task', create=True):
55 if not os.path.exists(data_location):
56 os.makedirs(data_location)
58 'data.location': os.path.expanduser(data_location),
# Start assembling the `task` command line for `command`.
# The argument list begins with the fixed prefix 'task' 'rc:/' and then gets
# one 'rc.<key>=<value>' override per entry of self.config.
# NOTE(review): the lines after embedded line 64 are missing from this
# listing, so how `command` is appended and what is returned cannot be seen.
61 def _generate_command(self, command):
62 args = ['task', 'rc:/']
63 for item in self.config.items():
# `item` is a (key, value) pair, unpacked into the two format slots.
64 args.append('rc.{0}={1}'.format(*item))
# Run `command` (expanded by _generate_command) in a subprocess and return
# its standard output as a list of lines.
# Both stdout and stderr are captured; communicate() waits for the process
# to finish.
# Raises TaskWarriorException carrying the stripped stderr text; the
# condition that triggers it (embedded line 72) is missing from this listing.
# NOTE(review): shell=True means the command text is interpreted by the
# shell. Callers in this file build that text from values such as task
# descriptions, so unquoted input can inject shell syntax - consider passing
# an argument list without a shell.
68 def _execute(self, command):
69 p = subprocess.Popen(self._generate_command(command), shell=True,
70 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
71 stdout, stderr = p.communicate()
73 raise TaskWarriorException(stderr.strip())
# Surrounding whitespace is removed before splitting on newlines.
74 return stdout.strip().split('\n')
# Render one filter as 'key:value' text for the task command line.
# `kwarg` is indexed as a pair: item 0 is the key, item 1 the value.
# NOTE(review): embedded line 79 (the body of the tag/tags branch) is missing
# from this listing, so how tag filters are rendered cannot be seen here.
76 def _format_filter_kwarg(self, kwarg):
77 key, val = kwarg[0], kwarg[1]
78 if key in ['tag', 'tags']:
# Double underscores in the key are rewritten to dots.
80 key = key.replace('__', '.')
81 return '{0}:{1}'.format(key, val)
# Export the tasks matching self.DEFAULT_FILTERS overlaid with the caller's
# keyword filters, wrapping each decoded record in a Task.
# NOTE(review): embedded lines 87, 89, 91 and 93 are missing from this
# listing, so the rest of the filter expression, the initialisation of
# `tasks`, any per-line guard and the return statement cannot be seen here.
83 def get_tasks(self, **filter_kwargs):
# Work on a copy so the update below does not modify self.DEFAULT_FILTERS.
84 filters = dict(self.DEFAULT_FILTERS)
85 filters.update(filter_kwargs)
# Each filter is rendered by _format_filter_kwarg; pieces are space-joined.
86 filter_commands = ' '.join(map(self._format_filter_kwarg,
88 command = '{0} export'.format(filter_commands)
90 for line in self._execute(command):
# Commas are stripped from both ends of the line before JSON-decoding it.
92 tasks.append(Task(self, json.loads(line.strip(','))))
def get_task(self, task_id):
    """Fetch a single task by its identifier.

    Runs ``<task_id> export``, JSON-decodes the first line of the output
    and wraps the result in a ``Task`` bound to this warrior.
    """
    output_lines = self._execute('{0} export'.format(task_id))
    first_line = output_lines[0]
    return Task(self, json.loads(first_line))
def add_task(self, description, project=None):
    """Create a new task via the ``add`` command.

    The command text is ``add <description>``, followed by
    ``project:<project>`` when ``project`` is not ``None``; the words are
    joined with single spaces and passed to ``_execute``.
    """
    project_part = [] if project is None else ['project:{0}'.format(project)]
    self._execute(' '.join(['add', description] + project_part))
def delete_task(self, task_id):
    """Delete the task identified by ``task_id``.

    Issues ``<task_id> rc.confirmation:no delete`` through ``_execute``.
    """
    command = '{0} rc.confirmation:no delete'.format(task_id)
    self._execute(command)
def complete_task(self, task_id):
    """Mark the task identified by ``task_id`` as done.

    Issues ``<task_id> done`` through ``_execute``.
    """
    command = '{0} done'.format(task_id)
    self._execute(command)
def import_tasks(self, tasks):
    """Import a batch of tasks through the ``import`` command.

    ``tasks`` is serialised to JSON in a temporary file and the file's
    path is handed to ``import <path>`` via ``_execute``.

    Args:
        tasks: JSON-serialisable collection of task records.

    Raises:
        Whatever ``_execute`` raises for a failed command; the temporary
        file is removed in that case as well.
    """
    import os  # local import keeps this block self-contained

    fd, path = tempfile.mkstemp()
    try:
        # Wrap the descriptor mkstemp() already opened instead of opening
        # the path a second time, so the descriptor is closed, not leaked.
        with os.fdopen(fd, 'w') as f:
            f.write(json.dumps(tasks))
        # Run the import only once the file has been closed and flushed.
        self._execute('import {0}'.format(path))
    finally:
        # mkstemp() leaves deletion to the caller.
        os.remove(path)