git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/backends.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

backends: Add method for annotations to the backend interface
[etc/taskwarrior.git] / tasklib / backends.py
import abc
import copy
import json
import logging
import os
import re
import subprocess

from tasklib.task import TaskFilter
8
# TaskWarrior release identifiers. They are compared against the text
# reported by ``task --version`` to switch version-specific behaviour.
#
# Written as u'' literals (the text type on both Python 2 and Python 3.3+)
# rather than six.u(...): six is not imported in this module, so calling it
# here raised NameError at import time. The resulting values are identical.
#
# NOTE: these are plain strings, so the comparisons are lexicographic.
VERSION_2_1_0 = u'2.1.0'
VERSION_2_2_0 = u'2.2.0'
VERSION_2_3_0 = u'2.3.0'
VERSION_2_4_0 = u'2.4.0'
VERSION_2_4_1 = u'2.4.1'
VERSION_2_4_2 = u'2.4.2'
VERSION_2_4_3 = u'2.4.3'
VERSION_2_4_4 = u'2.4.4'
VERSION_2_4_5 = u'2.4.5'
18
19
# Python 2/3 compatible spelling of "class with metaclass=ABCMeta": calling
# the metaclass directly yields a base class whose subclasses get their
# @abc.abstractmethod declarations enforced at instantiation time. Without
# an ABCMeta metaclass the decorators below are silently ignored.
_AbstractBackendBase = abc.ABCMeta('_AbstractBackendBase', (object,), {})


class Backend(_AbstractBackendBase):
    """
    Abstract interface every task storage backend has to implement.

    Concrete backends must override all methods marked abstract below;
    a subclass leaving any of them out cannot be instantiated.
    """

    # Filter implementation used to build queries for this backend
    filter_class = TaskFilter

    @abc.abstractmethod
    def filter_tasks(self, filter_obj):
        """Returns a list of Task objects matching the given filter"""
        pass

    @abc.abstractmethod
    def save_task(self, task):
        """Saves the given task in the backend"""
        pass

    @abc.abstractmethod
    def delete_task(self, task):
        """Deletes the given task"""
        pass

    @abc.abstractmethod
    def start_task(self, task):
        """Starts the given task"""
        pass

    @abc.abstractmethod
    def stop_task(self, task):
        """Stops the given task"""
        pass

    @abc.abstractmethod
    def complete_task(self, task):
        """Marks the given task as completed"""
        pass

    @abc.abstractmethod
    def refresh_task(self, task, after_save=False):
        """
        Refreshes the given task. Returns new data dict with serialized
        attributes.
        """
        pass

    @abc.abstractmethod
    def annotate_task(self, task, annotation):
        """Adds the given annotation to the given task"""
        pass

    @abc.abstractmethod
    def denotate_task(self, task, annotation):
        """Removes the given annotation from the given task"""
        pass

    @abc.abstractmethod
    def sync(self):
        """Syncs the backend database with the taskd server"""
        pass
69
70
class TaskWarriorException(Exception):
    """Raised when a TaskWarrior command fails or yields unexpected output."""
73
74
class TaskWarrior(object):
    """
    Backend implementation driving the ``task`` command line binary through
    subprocess calls.

    NOTE(review): ``Task`` and ``TaskQuerySet`` are referenced below but are
    not imported in the visible import block of this module -- confirm they
    are in scope at runtime.
    """

    def __init__(self, data_location=None, create=True, taskrc_location='~/.taskrc'):
        """
        Detects the installed TaskWarrior version and prepares the rc
        overrides passed along with every command.

        data_location -- optional path used as the data.location override
                         (``~`` is expanded)
        create -- create data_location if it does not exist yet
        taskrc_location -- path to the taskrc file (``~`` is expanded)
        """
        self.taskrc_location = os.path.expanduser(taskrc_location)

        # If taskrc does not exist, pass / to use defaults and avoid creating
        # dummy .taskrc file by TaskWarrior
        if not os.path.exists(self.taskrc_location):
            self.taskrc_location = '/'

        self.version = self._get_version()
        self.config = {
            'confirmation': 'no',
            'dependency.confirmation': 'no',  # See TW-1483 or taskrc man page
            'recurrence.confirmation': 'no',  # Necessary for modifying R tasks

            # Defaults to on since 2.4.5, we expect off during parsing
            'json.array': 'off',

            # 2.4.3 onwards supports 0 as infinite bulk, otherwise set just
            # arbitrary big number which is likely to be large enough
            'bulk': 0 if self.version >= VERSION_2_4_3 else 100000,
        }

        # Set data.location override if passed via kwarg
        if data_location is not None:
            data_location = os.path.expanduser(data_location)
            if create and not os.path.exists(data_location):
                os.makedirs(data_location)
            self.config['data.location'] = data_location

        self.tasks = TaskQuerySet(self)

    def _get_command_args(self, args, config_override=None):
        """
        Builds the full argv list: the binary, the taskrc location, one
        ``rc.key=value`` override per config entry and finally the given
        arguments converted to text.

        Entries in config_override take precedence over self.config, which
        itself is left untouched.
        """
        command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
        config = self.config.copy()
        config.update(config_override or dict())
        for item in config.items():
            command_args.append('rc.{0}={1}'.format(*item))

        # type(u'') is the text type on both Python 2 (unicode) and
        # Python 3 (str). Used in place of six.text_type, as six is not
        # imported in this module.
        text_type = type(u'')
        command_args.extend(map(text_type, args))
        return command_args

    def _get_version(self):
        """Returns the output of ``task --version`` without newlines."""
        p = subprocess.Popen(
                ['task', '--version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        return stdout.strip('\n')

    def get_config(self):
        """
        Returns the configuration reported by ``task show`` as a dict
        mapping setting names to their (stripped) string values.
        """
        raw_output = self.execute_command(
                ['show'],
                config_override={'verbose': 'nothing'}
            )

        config = dict()
        # A key, whitespace, then a value starting with a non-blank
        # character. The value uses '.*' rather than '.+' so that
        # single-character values (e.g. "bulk 0") are not dropped.
        config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].*$)')

        for line in raw_output:
            match = config_regex.match(line)
            if match:
                config[match.group('key')] = match.group('value').strip()

        return config

    def execute_command(self, args, config_override=None, allow_failure=True,
                        return_all=False):
        """
        Runs ``task`` with the given arguments and returns its output.

        args -- command arguments, appended after the rc overrides
        config_override -- dict of rc settings overriding self.config for
                           this call only
        allow_failure -- despite the name, when true a non-zero exit status
                         raises TaskWarriorException; pass False to get the
                         output back regardless of the exit status
        return_all -- return a (stdout lines, stderr lines, return code)
                      triplet instead of just the stdout lines

        Raises TaskWarriorException carrying stderr (or stdout, if stderr
        is blank) when the command fails and allow_failure is true.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        # Look the logger up here: the module defines no ``logger`` name, so
        # referring to one raised NameError on every call.
        logging.getLogger(__name__).debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode and allow_failure:
            if stderr.strip():
                error_msg = stderr.strip()
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)

        # Return all whole triplet only if explicitly asked for
        if not return_all:
            return stdout.rstrip().split('\n')
        else:
            return (stdout.rstrip().split('\n'),
                    stderr.rstrip().split('\n'),
                    p.returncode)

    def enforce_recurrence(self):
        """Triggers generation of recurrent tasks on TaskWarrior < 2.4.2."""
        # Run arbitrary report command which will trigger generation
        # of recurrent tasks.

        # Only necessary for TW up to 2.4.1, fixed in 2.4.2.
        if self.version < VERSION_2_4_2:
            self.execute_command(['next'], allow_failure=False)

    def merge_with(self, path, push=False):
        """
        Runs ``task merge`` against the given path (normalized to end with
        exactly one slash), with merge.autopush set according to push.
        """
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        """Runs ``task undo``."""
        self.execute_command(['undo'])

    # Backend interface implementation

    def filter_tasks(self, filter_obj):
        """
        Returns a list of Task objects built from the ``task export``
        output for the given filter.

        Raises TaskWarriorException if an exported line is not valid JSON.
        """
        self.enforce_recurrence()
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if line:
                # Drop the commas separating the exported JSON objects
                data = line.strip(',')
                try:
                    filtered_task = Task(self)
                    filtered_task._load_data(json.loads(data))
                    tasks.append(filtered_task)
                except ValueError:
                    raise TaskWarriorException('Invalid JSON: %s' % data)
        return tasks

    def save_task(self, task):
        """Save a task into TaskWarrior database using add/modify call"""

        args = [task['uuid'], 'modify'] if task.saved else ['add']
        args.extend(task._get_modified_fields_as_args())
        output = self.execute_command(args)

        # Parse out the new ID, if the task is being added for the first time
        if not task.saved:
            id_lines = [line for line in output
                        if line.startswith('Created task ')]

            # Complain loudly if it seems that more tasks were created
            # Should not happen
            if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
                raise TaskWarriorException("Unexpected output when creating "
                                           "task: %s" % '\n'.join(id_lines))

            # Circumvent the ID storage, since ID is considered read-only
            identifier = id_lines[0].split(' ')[2].rstrip('.')

            # Identifier can be either ID or UUID for completed tasks
            try:
                task._data['id'] = int(identifier)
            except ValueError:
                task._data['uuid'] = identifier

        # Refreshing is very important here, as not only modification time
        # is updated, but arbitrary attribute may have changed due hooks
        # altering the data before saving
        task.refresh(after_save=True)

    def delete_task(self, task):
        """Runs ``task <uuid> delete`` for the given task."""
        self.execute_command([task['uuid'], 'delete'])

    def start_task(self, task):
        """Runs ``task <uuid> start`` for the given task."""
        self.execute_command([task['uuid'], 'start'])

    def stop_task(self, task):
        """Runs ``task <uuid> stop`` for the given task."""
        self.execute_command([task['uuid'], 'stop'])

    def complete_task(self, task):
        """Runs ``task <uuid> done``, stopping an active task first on
        TaskWarrior older than 2.4.0."""
        # Older versions of TW do not stop active task at completion
        if self.version < VERSION_2_4_0 and task.active:
            task.stop()

        self.execute_command([task['uuid'], 'done'])

    def refresh_task(self, task, after_save=False):
        """
        Refreshes the given task. Returns new data dict with serialized
        attributes.

        Raises TaskWarriorException if the export does not yield exactly
        one JSON object.
        """
        # We need to use ID as backup for uuid here for the refreshes
        # of newly saved tasks. Any other place in the code is fine
        # with using UUID only.
        args = [task['uuid'] or task['id'], 'export']
        output = self.execute_command(args)

        def valid(output):
            return len(output) == 1 and output[0].startswith('{')

        # For older TW versions attempt to uniquely locate the task
        # using the data we have if it has been just saved.
        # This can happen when adding a completed task on older TW versions.
        if (not valid(output) and self.version < VERSION_2_4_5
                and after_save):

            # Make a copy, removing ID and UUID. It's most likely invalid
            # (ID 0) if it failed to match a unique task.
            data = copy.deepcopy(task._data)
            data.pop('id', None)
            data.pop('uuid', None)

            # This class does not define filter_class itself, so fall back
            # to TaskFilter instead of failing with AttributeError. The
            # 'or' keeps the fallback lazy.
            filter_class = getattr(self, 'filter_class', None) or TaskFilter
            taskfilter = filter_class(self)
            for key, value in data.items():
                taskfilter.add_filter_param(key, value)

            output = self.execute_command(
                ['export', '--'] + taskfilter.get_filter_params())

        # If more than 1 task has been matched still, raise an exception
        if not valid(output):
            raise TaskWarriorException(
                "Unique identifiers {0} with description: {1} matches "
                "multiple tasks: {2}".format(
                    task['uuid'] or task['id'], task['description'], output)
            )

        return json.loads(output[0])

    def sync(self):
        """Runs ``task sync``."""
        self.execute_command(['sync'])