]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/backends.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

backends: Add complete_task to the backend interface
[etc/taskwarrior.git] / tasklib / backends.py
1 import abc
2 import os
3 import re
4 import subprocess
5
6
7 VERSION_2_1_0 = six.u('2.1.0')
8 VERSION_2_2_0 = six.u('2.2.0')
9 VERSION_2_3_0 = six.u('2.3.0')
10 VERSION_2_4_0 = six.u('2.4.0')
11 VERSION_2_4_1 = six.u('2.4.1')
12 VERSION_2_4_2 = six.u('2.4.2')
13 VERSION_2_4_3 = six.u('2.4.3')
14 VERSION_2_4_4 = six.u('2.4.4')
15 VERSION_2_4_5 = six.u('2.4.5')
16
17
18 class Backend(object):
19
20     @abc.abstractmethod
21     def filter_tasks(self, filter_obj):
22         """Returns a list of Task objects matching the given filter"""
23         pass
24
25     @abc.abstractmethod
26     def save_task(self, task):
27         pass
28
29     @abc.abstractmethod
30     def delete_task(self, task):
31         pass
32
33     @abc.abstractmethod
34     def start_task(self, task):
35         pass
36
37     @abc.abstractmethod
38     def stop_task(self, task):
39         pass
40
41     @abc.abstractmethod
42     def complete_task(self, task):
43         pass
44
45     @abc.abstractmethod
46     def sync(self):
47         """Syncs the backend database with the taskd server"""
48         pass
49
50
51 class TaskWarriorException(Exception):
52     pass
53
54
55 class TaskWarrior(object):
56     def __init__(self, data_location=None, create=True, taskrc_location='~/.taskrc'):
57         self.taskrc_location = os.path.expanduser(taskrc_location)
58
59         # If taskrc does not exist, pass / to use defaults and avoid creating
60         # dummy .taskrc file by TaskWarrior
61         if not os.path.exists(self.taskrc_location):
62             self.taskrc_location = '/'
63
64         self.version = self._get_version()
65         self.config = {
66             'confirmation': 'no',
67             'dependency.confirmation': 'no',  # See TW-1483 or taskrc man page
68             'recurrence.confirmation': 'no',  # Necessary for modifying R tasks
69
70             # Defaults to on since 2.4.5, we expect off during parsing
71             'json.array': 'off',
72
73             # 2.4.3 onwards supports 0 as infite bulk, otherwise set just
74             # arbitrary big number which is likely to be large enough
75             'bulk': 0 if self.version >= VERSION_2_4_3 else 100000,
76         }
77
78         # Set data.location override if passed via kwarg
79         if data_location is not None:
80             data_location = os.path.expanduser(data_location)
81             if create and not os.path.exists(data_location):
82                 os.makedirs(data_location)
83             self.config['data.location'] = data_location
84
85         self.tasks = TaskQuerySet(self)
86
87     def _get_command_args(self, args, config_override=None):
88         command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
89         config = self.config.copy()
90         config.update(config_override or dict())
91         for item in config.items():
92             command_args.append('rc.{0}={1}'.format(*item))
93         command_args.extend(map(six.text_type, args))
94         return command_args
95
96     def _get_version(self):
97         p = subprocess.Popen(
98                 ['task', '--version'],
99                 stdout=subprocess.PIPE,
100                 stderr=subprocess.PIPE)
101         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
102         return stdout.strip('\n')
103
104     def get_config(self):
105         raw_output = self.execute_command(
106                 ['show'],
107                 config_override={'verbose': 'nothing'}
108             )
109
110         config = dict()
111         config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].+$)')
112
113         for line in raw_output:
114             match = config_regex.match(line)
115             if match:
116                 config[match.group('key')] = match.group('value').strip()
117
118         return config
119
120     def execute_command(self, args, config_override=None, allow_failure=True,
121                         return_all=False):
122         command_args = self._get_command_args(
123             args, config_override=config_override)
124         logger.debug(' '.join(command_args))
125         p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
126                              stderr=subprocess.PIPE)
127         stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
128         if p.returncode and allow_failure:
129             if stderr.strip():
130                 error_msg = stderr.strip()
131             else:
132                 error_msg = stdout.strip()
133             raise TaskWarriorException(error_msg)
134
135         # Return all whole triplet only if explicitly asked for
136         if not return_all:
137             return stdout.rstrip().split('\n')
138         else:
139             return (stdout.rstrip().split('\n'),
140                     stderr.rstrip().split('\n'),
141                     p.returncode)
142
143     def enforce_recurrence(self):
144         # Run arbitrary report command which will trigger generation
145         # of recurrent tasks.
146
147         # Only necessary for TW up to 2.4.1, fixed in 2.4.2.
148         if self.version < VERSION_2_4_2:
149             self.execute_command(['next'], allow_failure=False)
150
151     def merge_with(self, path, push=False):
152         path = path.rstrip('/') + '/'
153         self.execute_command(['merge', path], config_override={
154             'merge.autopush': 'yes' if push else 'no',
155         })
156
157     def undo(self):
158         self.execute_command(['undo'])
159
160     # Backend interface implementation
161
162     def filter_tasks(self, filter_obj):
163         self.enforce_recurrence()
164         args = ['export', '--'] + filter_obj.get_filter_params()
165         tasks = []
166         for line in self.execute_command(args):
167             if line:
168                 data = line.strip(',')
169                 try:
170                     filtered_task = Task(self)
171                     filtered_task._load_data(json.loads(data))
172                     tasks.append(filtered_task)
173                 except ValueError:
174                     raise TaskWarriorException('Invalid JSON: %s' % data)
175         return tasks
176
177     def save_task(self, task):
178         """Save a task into TaskWarrior database using add/modify call"""
179
180         args = [task['uuid'], 'modify'] if task.saved else ['add']
181         args.extend(task._get_modified_fields_as_args())
182         output = self.execute_command(args)
183
184         # Parse out the new ID, if the task is being added for the first time
185         if not task.saved:
186             id_lines = [l for l in output if l.startswith('Created task ')]
187
188             # Complain loudly if it seems that more tasks were created
189             # Should not happen
190             if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
191                 raise TaskWarriorException("Unexpected output when creating "
192                                            "task: %s" % '\n'.join(id_lines))
193
194             # Circumvent the ID storage, since ID is considered read-only
195             identifier = id_lines[0].split(' ')[2].rstrip('.')
196
197             # Identifier can be either ID or UUID for completed tasks
198             try:
199                 task._data['id'] = int(identifier)
200             except ValueError:
201                 task._data['uuid'] = identifier
202
203         # Refreshing is very important here, as not only modification time
204         # is updated, but arbitrary attribute may have changed due hooks
205         # altering the data before saving
206         task.refresh(after_save=True)
207
208     def delete_task(self, task):
209         self.execute_command([task['uuid'], 'delete'])
210
211     def start_task(self, task):
212         self.execute_command([task['uuid'], 'start'])
213
214     def stop_task(self, task):
215         self.execute_command([task['uuid'], 'stop'])