]> git.madduck.net Git - etc/taskwarrior.git/blob - tasklib/backends.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

backends: Move TaskWarrior class into the backends file
[etc/taskwarrior.git] / tasklib / backends.py
1 import abc
2 import os
3 import re
4 import subprocess
5
6
# TaskWarrior release version strings; code in this module compares the
# detected ``task --version`` output against these to gate
# version-dependent behaviour.
VERSION_2_1_0 = six.u('2.1.0')
VERSION_2_2_0 = six.u('2.2.0')
VERSION_2_3_0 = six.u('2.3.0')
VERSION_2_4_0 = six.u('2.4.0')
VERSION_2_4_1 = six.u('2.4.1')
VERSION_2_4_2 = six.u('2.4.2')
VERSION_2_4_3 = six.u('2.4.3')
VERSION_2_4_4 = six.u('2.4.4')
VERSION_2_4_5 = six.u('2.4.5')
16
17
# ``abc.abstractmethod`` is only enforced when the class is created by
# ``abc.ABCMeta``.  Building an empty base through the metaclass call works
# identically on Python 2 and Python 3, without version-specific syntax.
_AbstractBase = abc.ABCMeta('_AbstractBase', (object,), {})


class Backend(_AbstractBase):
    """Abstract interface that every task storage backend must implement.

    The class cannot be instantiated directly, and neither can a subclass
    that leaves any of the abstract methods below unimplemented (a
    ``TypeError`` is raised on instantiation).
    """

    @abc.abstractmethod
    def filter_tasks(self, filter_obj):
        """Returns a list of Task objects matching the given filter"""
        pass

    @abc.abstractmethod
    def save_task(self, task):
        """Save the given task; exact semantics are up to the backend."""
        pass

    @abc.abstractmethod
    def delete_task(self, task):
        """Delete the given task; exact semantics are up to the backend."""
        pass

    @abc.abstractmethod
    def start_task(self, task):
        """Start the given task; exact semantics are up to the backend."""
        pass

    @abc.abstractmethod
    def stop_task(self, task):
        """Stop the given task; exact semantics are up to the backend."""
        pass

    @abc.abstractmethod
    def sync(self):
        """Syncs the backend database with the taskd server"""
        pass
45
46
class TaskWarrior(object):
    """Drives the ``task`` command line binary through ``subprocess``.

    Attributes set by ``__init__``:
      taskrc_location -- path passed to ``task`` as ``rc:<path>``
      version         -- string printed by ``task --version``
      config          -- dict of ``rc.<key>=<value>`` overrides sent with
                         every command
      tasks           -- ``TaskQuerySet`` bound to this instance
    """

    def __init__(self, data_location=None, create=True,
                 taskrc_location='~/.taskrc'):
        """Detect the installed version and prepare the default overrides.

        data_location   -- optional data directory; ``~`` is expanded and the
                           result is sent as the ``data.location`` override
        create          -- when true, create ``data_location`` if missing
        taskrc_location -- path of the taskrc file; ``~`` is expanded
        """
        self.taskrc_location = os.path.expanduser(taskrc_location)

        # If taskrc does not exist, pass / to use defaults and avoid creating
        # dummy .taskrc file by TaskWarrior
        if not os.path.exists(self.taskrc_location):
            self.taskrc_location = '/'

        self.version = self._get_version()
        self.config = {
            'confirmation': 'no',
            'dependency.confirmation': 'no',  # See TW-1483 or taskrc man page
            'recurrence.confirmation': 'no',  # Necessary for modifying R tasks

            # Defaults to on since 2.4.5, we expect off during parsing
            'json.array': 'off',

            # 2.4.3 onwards supports 0 as infinite bulk, otherwise set just
            # arbitrary big number which is likely to be large enough
            'bulk': 0 if self._version_at_least(VERSION_2_4_3) else 100000,
        }

        # Set data.location override if passed via kwarg
        if data_location is not None:
            data_location = os.path.expanduser(data_location)
            if create and not os.path.exists(data_location):
                os.makedirs(data_location)
            self.config['data.location'] = data_location

        self.tasks = TaskQuerySet(self)

    @staticmethod
    def _parse_version(version):
        """Return the first dotted number in ``version`` as a tuple of ints.

        An empty tuple is returned when no number is found, which sorts
        below every real version.
        """
        found = re.search(r'\d+(?:\.\d+)*', version or '')
        if found is None:
            return ()
        return tuple(int(part) for part in found.group(0).split('.'))

    def _version_at_least(self, minimum):
        """Tell whether ``self.version`` is numerically >= ``minimum``.

        Components are compared as integers; comparing the raw strings would
        order '2.10.0' before '2.4.3'.
        """
        current = self._parse_version(self.version)
        return current >= self._parse_version(minimum)

    def _get_command_args(self, args, config_override=None):
        """Build the argv list for one ``task`` invocation.

        The result is ``task rc:<taskrc>`` followed by one ``rc.<key>=<value>``
        item per entry of ``self.config`` updated with ``config_override``,
        followed by ``args`` converted to text.
        """
        command_args = ['task', 'rc:{0}'.format(self.taskrc_location)]
        config = self.config.copy()
        config.update(config_override or dict())
        for item in config.items():
            command_args.append('rc.{0}={1}'.format(*item))
        command_args.extend(map(six.text_type, args))
        return command_args

    def _get_version(self):
        """Return the output of ``task --version`` without newlines at the ends."""
        p = subprocess.Popen(
                ['task', '--version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        return stdout.strip('\n')

    def get_config(self):
        """Return the settings printed by ``task show`` as a dict.

        Each output line of the form ``<key><whitespace><value>`` becomes one
        entry; lines without a value are ignored.
        """
        raw_output = self.execute_command(
                ['show'],
                config_override={'verbose': 'nothing'}
            )

        config = dict()
        # The value is one non-space character followed by anything, so that
        # single-character values (e.g. "3") are captured too.
        config_regex = re.compile(r'^(?P<key>[^\s]+)\s+(?P<value>[^\s].*$)')

        for line in raw_output:
            match = config_regex.match(line)
            if match:
                config[match.group('key')] = match.group('value').strip()

        return config

    def execute_command(self, args, config_override=None, allow_failure=True,
                        return_all=False):
        """Run ``task`` with the given arguments and return its output.

        args            -- command line arguments appended after the overrides
        config_override -- extra ``rc.`` overrides for this call only
        allow_failure   -- NOTE: despite the name, when true (the default) a
                           non-zero exit status raises TaskWarriorException;
                           when false the exit status is not checked
        return_all      -- when true, return a triple
                           ``(stdout_lines, stderr_lines, returncode)``
                           instead of just the stdout lines

        Raises TaskWarriorException carrying stderr (or stdout if stderr is
        blank) under the condition described for ``allow_failure``.
        """
        command_args = self._get_command_args(
            args, config_override=config_override)
        logger.debug(' '.join(command_args))
        p = subprocess.Popen(command_args, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = [x.decode('utf-8') for x in p.communicate()]
        if p.returncode and allow_failure:
            if stderr.strip():
                error_msg = stderr.strip()
            else:
                error_msg = stdout.strip()
            raise TaskWarriorException(error_msg)

        # Return the whole triplet only if explicitly asked for
        if not return_all:
            return stdout.rstrip().split('\n')
        else:
            return (stdout.rstrip().split('\n'),
                    stderr.rstrip().split('\n'),
                    p.returncode)

    def enforce_recurrence(self):
        """Trigger generation of recurrent tasks on old TaskWarrior versions."""
        # Run arbitrary report command which will trigger generation
        # of recurrent tasks.

        # Only necessary for TW up to 2.4.1, fixed in 2.4.2.
        if not self._version_at_least(VERSION_2_4_2):
            # allow_failure=False: the exit status of the report is ignored
            self.execute_command(['next'], allow_failure=False)

    def filter_tasks(self, filter_obj):
        """Export the tasks selected by ``filter_obj`` and return them as a list.

        Every non-empty line of the ``export`` output is parsed as one JSON
        document (surrounding commas stripped) and loaded into a new ``Task``.
        Raises TaskWarriorException if a line is not valid JSON.
        """
        self.enforce_recurrence()
        args = ['export', '--'] + filter_obj.get_filter_params()
        tasks = []
        for line in self.execute_command(args):
            if line:
                data = line.strip(',')
                try:
                    filtered_task = Task(self)
                    filtered_task._load_data(json.loads(data))
                    tasks.append(filtered_task)
                except ValueError:
                    raise TaskWarriorException('Invalid JSON: %s' % data)
        return tasks

    def merge_with(self, path, push=False):
        """Run the ``merge`` command against ``path``.

        ``path`` is normalised to end in exactly one ``/``; ``push`` selects
        the value of the ``merge.autopush`` override ('yes' or 'no').
        """
        path = path.rstrip('/') + '/'
        self.execute_command(['merge', path], config_override={
            'merge.autopush': 'yes' if push else 'no',
        })

    def undo(self):
        """Run the ``undo`` command."""
        self.execute_command(['undo'])