]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/backends.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

f0c73ef4f0012a34c24a7550ef5584f806852f52
[etc/taskwarrior.git] / tasklib / backends.py
1 import abc
2 import json
3 import os
4 import re
5 import subprocess
6
7 from tasklib.task import TaskFilter
8
9 VERSION_2_1_0 = six.u('2.1.0')
10 VERSION_2_2_0 = six.u('2.2.0')
11 VERSION_2_3_0 = six.u('2.3.0')
12 VERSION_2_4_0 = six.u('2.4.0')
13 VERSION_2_4_1 = six.u('2.4.1')
14 VERSION_2_4_2 = six.u('2.4.2')
15 VERSION_2_4_3 = six.u('2.4.3')
16 VERSION_2_4_4 = six.u('2.4.4')
17 VERSION_2_4_5 = six.u('2.4.5')
18
19
20 class Backend(object):
21
22     filter_class = TaskFilter
23
24     @abc.abstractmethod
25     def filter_tasks(self, filter_obj):
26         """Returns a list of Task objects matching the given filter"""
27         pass
28
29     @abc.abstractmethod
30     def save_task(self, task):
31         pass
32
33     @abc.abstractmethod
34     def delete_task(self, task):
35         pass
36
37     @abc.abstractmethod
38     def start_task(self, task):
39         pass
40
41     @abc.abstractmethod
42     def stop_task(self, task):
43         pass
44
45     @abc.abstractmethod
46     def complete_task(self, task):
47         pass
48
49     @abc.abstractmethod
50     def refresh_task(self, task, after_save=False):
51         """
52         Refreshes the given task. Returns new data dict with serialized
53         attributes.
54         """
55         pass
56
57     @abc.abstractmethod
58     def annotate_task(self, task, annotation):
59         pass
60
61     @abc.abstractmethod
62     def denotate_task(self, task, annotation):
63         pass
64
65     @abc.abstractmethod
66     def sync(self):
67         """Syncs the backend database with the taskd server"""
68         pass
69
70
71 class TaskWarriorException(Exception):
72     pass
73
74
75 class TaskWarrior(object):
76     def __init__(self, data_location=None, create=True, taskrc_location='~/.taskrc'):
77         self.taskrc_location = os.path.expanduser(taskrc_location)
78
79         # If taskrc does not exist, pass / to use defaults and avoid creating
80         # dummy .taskrc file by TaskWarrior
81         if not os.path.exists(self.taskrc_location):
82             self.taskrc_location = '/'
83
84         self.version = self._get_version()
85         self.config = {
86             'confirmation': 'no',
87             'dependency.confirmation': 'no',  # See TW-1483 or taskrc man page
88             'recurrence.confirmation': 'no',  # Necessary for modifying R tasks
89
90             # Defaults to on since 2.4.5, we expect off during parsing
91             'json.array': 'off',
92
93             # 2.4.3 onwards supports 0 as infite bulk, otherwise set just
94             # arbitrary big number which is likely to be large enough
95             'bulk': 0 if self.version >= VERSION_2_4_3 else 100000,
96         }
97
98         # Set data.location override if passed via kwarg
99         if data_location is not None:
100             data_location = os.path.expanduser(data_location)
101             if create and not os.path.exists(data_location):
102                 os.makedirs(data_location)
103             self.config['data.location'] = data_location
104
105         self.tasks = TaskQuerySet(self)
106
107     def _get_command_args(self, args, config_override=None):
108         command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
109         config = self.config.copy()
110         config.update(config_override or dict())
111         for item in config.items():
112             command_args.append('rc.{0}={1}'.format(*item))
113         command_args.extend(map(six.text_type, args))
114         return command_args
115
116     def _get_version(self):
117         p = subprocess.Popen(
118                 ['task', '--version'],
119                 stdout=subprocess.PIPE,
120                 stderr=subprocess.PIPE)
121         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
122         return stdout.strip('\n')
123
124     def get_config(self):
125         raw_output = self.execute_command(
126                 ['show'],
127                 config_override={'verbose': 'nothing'}
128             )
129
130         config = dict()
131         config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].+$)')
132
133         for line in raw_output:
134             match = config_regex.match(line)
135             if match:
136                 config[match.group('key')] = match.group('value').strip()
137
138         return config
139
140     def execute_command(self, args, config_override=None, allow_failure=True,
141                         return_all=False):
142         command_args = self._get_command_args(
143             args, config_override=config_override)
144         logger.debug(' '.join(command_args))
145         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
146                              stderr=subprocess.PIPE)
147         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
148         if p.returncode and allow_failure:
149             if stderr.strip():
150                 error_msg = stderr.strip()
151             else:
152                 error_msg = stdout.strip()
153             raise TaskWarriorException(error_msg)
154
155         # Return all whole triplet only if explicitly asked for
156         if not return_all:
157             return stdout.rstrip().split('\n')
158         else:
159             return (stdout.rstrip().split('\n'),
160                     stderr.rstrip().split('\n'),
161                     p.returncode)
162
163     def enforce_recurrence(self):
164         # Run arbitrary report command which will trigger generation
165         # of recurrent tasks.
166
167         # Only necessary for TW up to 2.4.1, fixed in 2.4.2.
168         if self.version < VERSION_2_4_2:
169             self.execute_command(['next'], allow_failure=False)
170
171     def merge_with(self, path, push=False):
172         path = path.rstrip('/') + '/'
173         self.execute_command(['merge', path], config_override={
174             'merge.autopush': 'yes' if push else 'no',
175         })
176
177     def undo(self):
178         self.execute_command(['undo'])
179
180     # Backend interface implementation
181
182     def filter_tasks(self, filter_obj):
183         self.enforce_recurrence()
184         args = ['export', '--'] + filter_obj.get_filter_params()
185         tasks = []
186         for line in self.execute_command(args):
187             if line:
188                 data = line.strip(',')
189                 try:
190                     filtered_task = Task(self)
191                     filtered_task._load_data(json.loads(data))
192                     tasks.append(filtered_task)
193                 except ValueError:
194                     raise TaskWarriorException('Invalid JSON: %s' % data)
195         return tasks
196
197     def save_task(self, task):
198         """Save a task into TaskWarrior database using add/modify call"""
199
200         args = [task['uuid'], 'modify'] if task.saved else ['add']
201         args.extend(task._get_modified_fields_as_args())
202         output = self.execute_command(args)
203
204         # Parse out the new ID, if the task is being added for the first time
205         if not task.saved:
206             id_lines = [l for l in output if l.startswith('Created task ')]
207
208             # Complain loudly if it seems that more tasks were created
209             # Should not happen
210             if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
211                 raise TaskWarriorException("Unexpected output when creating "
212                                            "task: %s" % '\n'.join(id_lines))
213
214             # Circumvent the ID storage, since ID is considered read-only
215             identifier = id_lines[0].split(' ')[2].rstrip('.')
216
217             # Identifier can be either ID or UUID for completed tasks
218             try:
219                 task._data['id'] = int(identifier)
220             except ValueError:
221                 task._data['uuid'] = identifier
222
223         # Refreshing is very important here, as not only modification time
224         # is updated, but arbitrary attribute may have changed due hooks
225         # altering the data before saving
226         task.refresh(after_save=True)
227
228     def delete_task(self, task):
229         self.execute_command([task['uuid'], 'delete'])
230
231     def start_task(self, task):
232         self.execute_command([task['uuid'], 'start'])
233
234     def stop_task(self, task):
235         self.execute_command([task['uuid'], 'stop'])
236
237     def complete_task(self, task):
238         # Older versions of TW do not stop active task at completion
239         if self.version < VERSION_2_4_0 and task.active:
240             task.stop()
241
242         self.execute_command([task['uuid'], 'done'])
243
244     def annotate_task(self, task, annotation):
245         args = [task['uuid'], 'annotate', annotation]
246         self.execute_command(args)
247
248     def denotate_task(self, task, annotation):
249         args = [task['uuid'], 'denotate', annotation]
250         self.execute_command(args)
251
252     def refresh_task(self, task, after_save=False):
253         # We need to use ID as backup for uuid here for the refreshes
254         # of newly saved tasks. Any other place in the code is fine
255         # with using UUID only.
256         args = [task['uuid'] or task['id'], 'export']
257         output = self.execute_command(args)
258
259         def valid(output):
260             return len(output) == 1 and output[0].startswith('{')
261
262         # For older TW versions attempt to uniquely locate the task
263         # using the data we have if it has been just saved.
264         # This can happen when adding a completed task on older TW versions.
265         if (not valid(output) and self.version < VERSION_2_4_5
266                 and after_save):
267
268             # Make a copy, removing ID and UUID. It's most likely invalid
269             # (ID 0) if it failed to match a unique task.
270             data = copy.deepcopy(task._data)
271             data.pop('id', None)
272             data.pop('uuid', None)
273
274             taskfilter = self.filter_class(self)
275             for key, value in data.items():
276                 taskfilter.add_filter_param(key, value)
277
278             output = self.execute_command(['export', '--'] +
279                 taskfilter.get_filter_params())
280
281         # If more than 1 task has been matched still, raise an exception
282         if not valid(output):
283             raise TaskWarriorException(
284                 "Unique identifiers {0} with description: {1} matches "
285                 "multiple tasks: {2}".format(
286                 task['uuid'] or task['id'], task['description'], output)
287             )
288
289         return json.loads(output[0])
290
291     def sync(self):
292         self.execute_command(['sync'])