git.madduck.net Git - etc/taskwarrior.git/blobdiff - tasklib/backends.py

madduck's git repository

Every one of the projects in this repository is available at the canonical URL git://git.madduck.net/madduck/pub/<projectpath> — see each project's metadata for the exact URL.

All patches and comments are welcome. Please squash your changes to logical commits before using git-format-patch and git-send-email to patches@git.madduck.net. If you'd read over the Git project's submission guidelines and adhered to them, I'd be especially grateful.

SSH access, as well as push access, can be individually arranged.

If you use my repositories frequently, consider adding the following snippet to ~/.gitconfig and using the third clone URL listed for each project:

[url "git://git.madduck.net/madduck/"]
  insteadOf = madduck:

TaskWarrior: Implement sync method
[etc/taskwarrior.git] / tasklib / backends.py
index 6bf11bb3340c6e0228977880b18e2285853d66ba..80928f1648c25cb1a6099cceb078beeb88c6f6f4 100644 (file)
@@ -1,8 +1,10 @@
 import abc
 import abc
+import json
 import os
 import re
 import subprocess
 
 import os
 import re
 import subprocess
 
+from tasklib.task import TaskFilter
 
 VERSION_2_1_0 = six.u('2.1.0')
 VERSION_2_2_0 = six.u('2.2.0')
 
 VERSION_2_1_0 = six.u('2.1.0')
 VERSION_2_2_0 = six.u('2.2.0')
@@ -17,6 +19,8 @@ VERSION_2_4_5 = six.u('2.4.5')
 
 class Backend(object):
 
 
 class Backend(object):
 
+    filter_class = TaskFilter
+
     @abc.abstractmethod
     def filter_tasks(self, filter_obj):
         """Returns a list of Task objects matching the given filter"""
     @abc.abstractmethod
     def filter_tasks(self, filter_obj):
         """Returns a list of Task objects matching the given filter"""
@@ -38,6 +42,18 @@ class Backend(object):
     def stop_task(self, task):
         pass
 
     def stop_task(self, task):
         pass
 
+    @abc.abstractmethod
+    def complete_task(self, task):
+        pass
+
+    @abc.abstractmethod
+    def refresh_task(self, task, after_save=False):
+        """
+        Refreshes the given task. Returns new data dict with serialized
+        attributes.
+        """
+        pass
+
     @abc.abstractmethod
     def sync(self):
         """Syncs the backend database with the taskd server"""
     @abc.abstractmethod
     def sync(self):
         """Syncs the backend database with the taskd server"""
@@ -204,3 +220,57 @@ class TaskWarrior(object):
     def delete_task(self, task):
         self.execute_command([task['uuid'], 'delete'])
 
     def delete_task(self, task):
         self.execute_command([task['uuid'], 'delete'])
 
+    def start_task(self, task):
+        self.execute_command([task['uuid'], 'start'])
+
+    def stop_task(self, task):
+        self.execute_command([task['uuid'], 'stop'])
+
+    def complete_task(self, task):
+        # Older versions of TW do not stop active task at completion
+        if self.version < VERSION_2_4_0 and task.active:
+            task.stop()
+
+        self.execute_command([task['uuid'], 'done'])
+
+    def refresh_task(self, task, after_save=False):
+        # We need to use ID as backup for uuid here for the refreshes
+        # of newly saved tasks. Any other place in the code is fine
+        # with using UUID only.
+        args = [task['uuid'] or task['id'], 'export']
+        output = self.execute_command(args)
+
+        def valid(output):
+            return len(output) == 1 and output[0].startswith('{')
+
+        # For older TW versions attempt to uniquely locate the task
+        # using the data we have if it has been just saved.
+        # This can happen when adding a completed task on older TW versions.
+        if (not valid(output) and self.version < VERSION_2_4_5
+                and after_save):
+
+            # Make a copy, removing ID and UUID. It's most likely invalid
+            # (ID 0) if it failed to match a unique task.
+            data = copy.deepcopy(task._data)
+            data.pop('id', None)
+            data.pop('uuid', None)
+
+            taskfilter = self.filter_class(self)
+            for key, value in data.items():
+                taskfilter.add_filter_param(key, value)
+
+            output = self.execute_command(['export', '--'] +
+                taskfilter.get_filter_params())
+
+        # If more than 1 task has been matched still, raise an exception
+        if not valid(output):
+            raise TaskWarriorException(
+                "Unique identifiers {0} with description: {1} matches "
+                "multiple tasks: {2}".format(
+                task['uuid'] or task['id'], task['description'], output)
+            )
+
+        return json.loads(output[0])
+
+    def sync(self):
+        self.execute_command(['sync'])