summaryrefslogtreecommitdiffstats
path: root/config/chroot_local-includes/usr/local/lib/python3/dist-packages/unlock_veracrypt_volumes/volume_manager.py
blob: 764beaeb711d6b91edea8d20296b4b7467786a82 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import subprocess
import time
import os
from logging import getLogger
from typing import List, Union
from threading import Lock

from gi.repository import Gtk, Gio, UDisks, GUdev, GLib

from unlock_veracrypt_volumes import _
from unlock_veracrypt_volumes.volume_list import ContainerList, DeviceList
from unlock_veracrypt_volumes.volume import Volume
from unlock_veracrypt_volumes.exceptions import UdisksObjectNotFoundError, VolumeNotFoundError
from unlock_veracrypt_volumes.config import APP_NAME, MAIN_UI_FILE


WAIT_FOR_LOOP_SETUP_TIMEOUT = 1


logger = getLogger(__name__)


class VolumeManager(object):
    def __init__(self, application: Gtk.Application):
        self.udisks_client = UDisks.Client.new_sync()
        self.udisks_manager = self.udisks_client.get_manager()
        self.gio_volume_monitor = Gio.VolumeMonitor.get()
        self.gio_volume_monitor.connect("volume-changed", self.on_volume_changed)
        self.gio_volume_monitor.connect("volume-added", self.on_volume_added)
        self.gio_volume_monitor.connect("volume-removed", self.on_volume_removed)
        self.udev_client = GUdev.Client()
        self.mount_op_lock = Lock()

        self.builder = Gtk.Builder.new_from_file(MAIN_UI_FILE)
        self.builder.set_translation_domain(APP_NAME)
        self.builder.connect_signals(self)

        self.window = self.builder.get_object("window")  # type: Gtk.ApplicationWindow
        self.window.set_application(application)
        self.window.set_title("Unlock VeraCrypt Volumes")

        self.container_list = ContainerList()
        self.device_list = DeviceList()

        containers_frame = self.builder.get_object("containers_frame")
        containers_frame.add(self.container_list.list_box)
        devices_frame = self.builder.get_object("devices_frame")
        devices_frame.add(self.device_list.list_box)

        self.add_tcrypt_volumes()

        logger.debug("showing window")
        self.window.show_all()
        self.window.present()

    def add_tcrypt_volumes(self):
        logger.debug("in add_tcrypt_volumes")
        for volume in self.get_tcrypt_volumes():
            self.add_volume(volume)

    def add_volume(self, volume: Volume):
        logger.info("Adding volume %s", volume.device_file)
        if volume.is_file_container:
            self.container_list.add(volume)
        else:
            self.device_list.add(volume)

    def remove_volume(self, volume: Volume):
        logger.info("Removing volume %s", volume.device_file)
        if volume in self.container_list:
            self.container_list.remove(volume)
        elif volume in self.device_list:
            self.device_list.remove(volume)

    def update_volume(self, volume: Volume):
        logger.debug("Updating volume %s", volume.device_file)
        if volume.is_file_container:
            self.container_list.remove(volume)
            self.container_list.add(volume)
        else:
            self.device_list.remove(volume)
            self.device_list.add(volume)

    def get_tcrypt_volumes(self) -> List[Volume]:
        """Returns all connected TCRYPT volumes"""
        return [volume for volume in self.get_all_volumes() if volume.is_tcrypt]

    def get_all_volumes(self) -> List[Volume]:
        """Returns all connected volumes"""
        volumes = list()
        gio_volumes = self.gio_volume_monitor.get_volumes()

        for gio_volume in gio_volumes:
            device_file = gio_volume.get_identifier(Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE)
            if not device_file:
                continue

            logger.debug("volume: %s", device_file)

            try:
                volumes.append(Volume(self, gio_volume))
                logger.debug("is_file_container: %s", volumes[-1].is_file_container)
                logger.debug("is_tcrypt: %s", volumes[-1].is_tcrypt)
                logger.debug("is_unlocked: %s", volumes[-1].is_unlocked)
            except UdisksObjectNotFoundError as e:
                logger.exception(e)

        return volumes

    def on_add_file_container_button_clicked(self, button, data=None):
        path = self.choose_container_path()

        if path in self.container_list.backing_file_paths:
            self.show_warning(title=_("Container already added"),
                              body=_("The file container %s should already be listed.") % path)
            return

        if path:
            self.unlock_file_container(path)

    def attach_file_container(self, path: str) -> Union[Volume, None]:
        logger.debug("attaching file %s. backing_file_paths: %s", path, self.container_list.backing_file_paths)
        warning = dict()

        try:
            fd = os.open(path, os.O_RDWR)
        except PermissionError as e:
            # Try opening read-only
            try:
                fd = os.open(path, os.O_RDONLY)
                warning["title"] = _("Container opened read-only")
                warning["body"] = _("The file container {path} could not be opened with write access. "
                                    "It was opened read-only instead. You will not be able to modify the "
                                    "content of the container.\n"
                                    "{error_message}").format(path=path, error_message=str(e))
            except PermissionError as e:
                self.show_warning(title=_("Error opening file"), body=str(e))
                return None

        fd_list = Gio.UnixFDList()
        fd_list.append(fd)
        udisks_path, __ = self.udisks_manager.call_loop_setup_sync(GLib.Variant('h', 0),  # fd index
                                                                   GLib.Variant('a{sv}', {}),  # options
                                                                   fd_list,  # the fd list
                                                                   None)  # cancellable
        logger.debug("Created loop device %s", udisks_path)

        volume = self._wait_for_loop_setup(path)
        if volume:
            if warning:
                self.show_warning(title=warning["title"], body=warning["body"])
            return volume
        elif not self._udisks_object_is_tcrypt(udisks_path):
            # Remove the loop device
            self.udisks_client.get_object(udisks_path).get_loop().call_delete(GLib.Variant('a{sv}', {}),  # options
                                                                              None,  # cancellable
                                                                              None,  # callback
                                                                              None)  # user data
            self.show_warning(title=_("Not a VeraCrypt container"),
                              body=_("The file %s does not seem to be a VeraCrypt container.") % path)
        else:
            self.show_warning(title=_("Failed to add container"),
                              body=_("Could not add file container %s: Timeout while waiting for loop setup."
                                     "Please try using the <i>Disks</i> application instead.") % path)

    def _wait_for_loop_setup(self, path: str) -> Union[Volume, None]:
        start_time = time.perf_counter()
        while time.perf_counter() - start_time < WAIT_FOR_LOOP_SETUP_TIMEOUT:
            try:
                return self.container_list.find_by_backing_file(path)
            except VolumeNotFoundError:
                self.process_mainloop_events()
                time.sleep(0.1)

    def _udisks_object_is_tcrypt(self, path: str) -> bool:
        if not path:
            return False

        udisks_object = self.udisks_client.get_object(path)
        if not udisks_object:
            return False

        return Volume(self, udisks_object=udisks_object).is_tcrypt

    @staticmethod
    def process_mainloop_events():
        context = GLib.MainLoop().get_context()
        while context.pending():
            context.iteration()

    def open_file_container(self, path: str):
        volume = self.ensure_file_container_is_attached(path)
        if volume:
            volume.open()

    def unlock_file_container(self, path: str, open_after_unlock=False):
        volume = self.ensure_file_container_is_attached(path)
        if volume:
            volume.unlock(open_after_unlock=open_after_unlock)

    def ensure_file_container_is_attached(self, path: str) -> Volume:
        try:
            return self.container_list.find_by_backing_file(path)
        except VolumeNotFoundError:
            return self.attach_file_container(path)

    def choose_container_path(self):
        dialog = Gtk.FileChooserDialog(_("Choose File Container"),
                                       self.window,
                                       Gtk.FileChooserAction.OPEN,
                                       (Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
                                        Gtk.STOCK_OPEN, Gtk.ResponseType.ACCEPT))
        result = dialog.run()
        if result != Gtk.ResponseType.ACCEPT:
            dialog.destroy()
            return

        path = dialog.get_filename()
        dialog.destroy()
        return path

    def on_volume_changed(self, volume_monitor: Gio.VolumeMonitor, gio_volume: Gio.Volume):
        logger.debug("in on_volume_changed. volume: %s",
                     gio_volume.get_identifier(Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE))
        try:
            volume = Volume(self, gio_volume)
            if volume.is_tcrypt:
                self.update_volume(volume)
        except UdisksObjectNotFoundError:
            self.remove_volume(Volume(self, gio_volume, with_udisks=False))

    def on_volume_added(self, volume_monitor: Gio.VolumeMonitor, gio_volume: Gio.Volume):
        logger.debug("in on_volume_added. volume: %s",
                     gio_volume.get_identifier(Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE))
        volume = Volume(self, gio_volume)
        if volume.is_tcrypt:
            self.add_volume(volume)

    def on_volume_removed(self, volume_monitor: Gio.VolumeMonitor, gio_volume: Gio.Volume):
        logger.debug("in on_volume_removed. volume: %s",
                     gio_volume.get_identifier(Gio.VOLUME_IDENTIFIER_KIND_UNIX_DEVICE))
        self.remove_volume(Volume(self, gio_volume, with_udisks=False))

    def open_uri(self, uri: str):
        # This is the recommended way, but it turns the cursor into wait status for up to
        # 10 seconds after the file manager was already opened.
        # Gtk.show_uri_on_window(self.window, uri, Gtk.get_current_event_time())
        subprocess.Popen(["xdg-open", uri])

    def show_warning(self, title: str, body: str):
        dialog = Gtk.MessageDialog(self.window,
                                   Gtk.DialogFlags.DESTROY_WITH_PARENT,
                                   Gtk.MessageType.WARNING,
                                   Gtk.ButtonsType.CLOSE,
                                   title)
        dialog.format_secondary_markup(body)
        dialog.run()
        dialog.close()

    def acquire_mount_op_lock(self):
        while True:
            if self.mount_op_lock.acquire(timeout=0.1):
                return
            self.process_mainloop_events()