Task: Move TW-specific saving logic into the backend
[etc/taskwarrior.git] / tasklib / backends.py
import abc
import json
import logging
import os
import re
import subprocess

import six

# Task and TaskQuerySet are assumed to live in the sibling task module.
from .task import Task, TaskQuerySet

logger = logging.getLogger(__name__)

VERSION_2_1_0 = six.u('2.1.0')
VERSION_2_2_0 = six.u('2.2.0')
VERSION_2_3_0 = six.u('2.3.0')
VERSION_2_4_0 = six.u('2.4.0')
VERSION_2_4_1 = six.u('2.4.1')
VERSION_2_4_2 = six.u('2.4.2')
VERSION_2_4_3 = six.u('2.4.3')
VERSION_2_4_4 = six.u('2.4.4')
VERSION_2_4_5 = six.u('2.4.5')

# ABCMeta (applied via six for Python 2/3 compatibility) is what makes the
# abstractmethod declarations below enforceable on subclasses.
@six.add_metaclass(abc.ABCMeta)
class Backend(object):

    @abc.abstractmethod
    def filter_tasks(self, filter_obj):
        """Returns a list of Task objects matching the given filter"""
        pass

    @abc.abstractmethod
    def save_task(self, task):
        """Creates or updates the given task in the backend"""
        pass

    @abc.abstractmethod
    def delete_task(self, task):
        """Deletes the given task from the backend"""
        pass

    @abc.abstractmethod
    def start_task(self, task):
        """Marks the given task as started"""
        pass

    @abc.abstractmethod
    def stop_task(self, task):
        """Marks the given task as stopped"""
        pass

    @abc.abstractmethod
    def sync(self):
        """Syncs the backend database with the taskd server"""
        pass

class TaskWarriorException(Exception):
    pass

class TaskWarrior(object):
    def __init__(self, data_location=None, create=True, taskrc_location='~/.taskrc'):
        self.taskrc_location = os.path.expanduser(taskrc_location)

        # If taskrc does not exist, pass / to use defaults and avoid
        # TaskWarrior creating a dummy .taskrc file
        if not os.path.exists(self.taskrc_location):
            self.taskrc_location = '/'

        self.version = self._get_version()
        self.config = {
            'confirmation': 'no',
            'dependency.confirmation': 'no',  # See TW-1483 or taskrc man page
            'recurrence.confirmation': 'no',  # Necessary for modifying R tasks

            # Defaults to 'on' since 2.4.5; we expect 'off' during parsing
            'json.array': 'off',

            # 2.4.3 onwards supports 0 as infinite bulk; otherwise set an
            # arbitrarily large number which is likely to be big enough
            'bulk': 0 if self.version >= VERSION_2_4_3 else 100000,
        }

        # Set data.location override if passed via kwarg
        if data_location is not None:
            data_location = os.path.expanduser(data_location)
            if create and not os.path.exists(data_location):
                os.makedirs(data_location)
            self.config['data.location'] = data_location

        self.tasks = TaskQuerySet(self)

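    # _get_command_args assembles the full CLI invocation: the 'task' binary,
    # the rc:<taskrc> pointer, one rc.<key>=<value> override per config entry,
    # and finally the caller-supplied arguments. Illustrative result (the
    # exact overrides depend on self.config and config_override), e.g.:
    #   task rc:/home/user/.taskrc rc.confirmation=no rc.json.array=off export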
    def _get_command_args(self, args, config_override=None):
        command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
        config = self.config.copy()
        config.update(config_override or dict())
        for item in config.items():
            command_args.append('rc.{0}={1}'.format(*item))
        command_args.extend(map(six.text_type, args))
        return command_args

    def _get_version(self):
        p = subprocess.Popen(
                ['task', '--version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        return stdout.strip('\n')

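    # 'task show' (run with verbose=nothing to suppress extra output) prints
    # the effective configuration roughly as '<name>  <value>' pairs, e.g.
    # 'data.location  /home/user/.task'; the regex below captures the first
    # whitespace-free token as the key and the rest of the line as the value.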
    def get_config(self):
        raw_output = self.execute_command(
                ['show'],
                config_override={'verbose': 'nothing'}
            )

        config = dict()
        config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].+$)')

        for line in raw_output:
            match = config_regex.match(line)
            if match:
                config[match.group('key')] = match.group('value').strip()

        return config

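    # execute_command runs the assembled 'task' invocation. With the default
    # allow_failure=True a non-zero exit status raises TaskWarriorException,
    # using stderr (or stdout as a fallback) as the message; allow_failure=False
    # ignores failures. By default only stdout is returned, split into lines;
    # return_all=True yields (stdout_lines, stderr_lines, returncode).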
    def execute_command(self, args, config_override=None, allow_failure=True,
                        return_all=False):
        command_args = self._get_command_args(
            args, config_override=config_override)
        logger.debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode and allow_failure:
            if stderr.strip():
                error_msg = stderr.strip()
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)

        # Return the whole (stdout, stderr, returncode) triplet only if
        # explicitly asked for
        if not return_all:
            return stdout.rstrip().split('\n')
        else:
            return (stdout.rstrip().split('\n'),
                    stderr.rstrip().split('\n'),
                    p.returncode)

    def enforce_recurrence(self):
        # Run an arbitrary report command which will trigger generation
        # of recurrent tasks.

        # Only necessary for TW up to 2.4.1, fixed in 2.4.2.
        if self.version < VERSION_2_4_2:
            self.execute_command(['next'], allow_failure=False)

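    # With rc.json.array=off, 'task export' emits one JSON object per line
    # (possibly with a trailing comma, which is stripped below), e.g. roughly:
    #   {"id":1,"description":"Buy milk","status":"pending","uuid":"..."}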
    def filter_tasks(self, filter_obj):
        self.enforce_recurrence()
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if line:
                data = line.strip(',')
                try:
                    filtered_task = Task(self)
                    filtered_task._load_data(json.loads(data))
                    tasks.append(filtered_task)
                except ValueError:
                    raise TaskWarriorException('Invalid JSON: %s' % data)
        return tasks

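    # save_task dispatches to 'task add ...' for new tasks and to
    # 'task <uuid> modify ...' for tasks that already exist. On creation,
    # TaskWarrior reports a line like 'Created task 42.' (or a UUID instead of
    # the numeric id for tasks that are immediately completed), which is
    # parsed below so the Task object learns its new id/uuid.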
    def save_task(self, task):
        """Save a task into the TaskWarrior database using an add/modify call"""

        args = [task['uuid'], 'modify'] if task.saved else ['add']
        args.extend(task._get_modified_fields_as_args())
        output = self.execute_command(args)

        # Parse out the new ID, if the task is being added for the first time
        if not task.saved:
            id_lines = [l for l in output if l.startswith('Created task ')]

            # Complain loudly if it seems that multiple tasks were created
            # Should not happen
            if len(id_lines) != 1 or len(id_lines[0].split(' ')) != 3:
                raise TaskWarriorException("Unexpected output when creating "
                                           "task: %s" % '\n'.join(id_lines))

            # Circumvent the ID storage, since ID is considered read-only
            identifier = id_lines[0].split(' ')[2].rstrip('.')

            # Identifier can be either ID or UUID for completed tasks
            try:
                task._data['id'] = int(identifier)
            except ValueError:
                task._data['uuid'] = identifier

        # Refreshing is very important here: not only is the modification time
        # updated, but arbitrary attributes may have changed due to hooks
        # altering the data before saving
        task.refresh(after_save=True)

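    # 'task merge' (available only in TaskWarrior releases that still ship the
    # merge command) merges another task database located at the given path or
    # URI; merge.autopush controls whether the result is pushed back afterwards.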
    def merge_with(self, path, push=False):
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        self.execute_command(['undo'])
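
# Example usage (an illustrative sketch, not part of the original module;
# assumes the 'task' CLI is installed and that Task/TaskQuerySet behave as in
# tasklib's task module):
#
#     tw = TaskWarrior(data_location='~/.task', create=True)
#     new_task = Task(tw, description='Write the backend docs')
#     tw.save_task(new_task)            # runs 'task add ...', stores the new id
#     for task in tw.tasks.pending():   # TaskQuerySet built on filter_tasks()
#         print(task['description'])
#     tw.undo()                         # roll back the last change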