From ead5400796b2d1ecb9af4161fec7664df8d4ddfb Mon Sep 17 00:00:00 2001 From: Nathaniel Russell Date: Fri, 18 Apr 2025 21:18:54 +0000 Subject: [PATCH] Update 5 files - /gsecrets/provider/pkcs11_provider.py - /gsecrets/provider/yubikey_provider.py - /gsecrets/meson.build - /gsecrets/provider/providers.py - /meson.build --- gsecrets/meson.build | 1 - gsecrets/provider/pkcs11_provider.py | 324 -------------------------- gsecrets/provider/providers.py | 4 +- gsecrets/provider/yubikey_provider.py | 277 ---------------------- meson.build | 2 - 5 files changed, 1 insertion(+), 607 deletions(-) delete mode 100644 gsecrets/provider/pkcs11_provider.py delete mode 100644 gsecrets/provider/yubikey_provider.py diff --git a/gsecrets/meson.build b/gsecrets/meson.build index b3da53d4a..3ddd35411 100644 --- a/gsecrets/meson.build +++ b/gsecrets/meson.build @@ -8,7 +8,6 @@ conf.set('PYTHON', python_bin.full_path()) conf.set('DATA_DIR', DATA_DIR) conf.set('PROFILE', profile) conf.set('APPID', application_id) -conf.set('PKCS11_LIB', join_paths(get_option('prefix'), get_option('libdir'), 'opensc-pkcs11.so')) configure_file( input: 'const.py.in', diff --git a/gsecrets/provider/pkcs11_provider.py b/gsecrets/provider/pkcs11_provider.py deleted file mode 100644 index 7054206a6..000000000 --- a/gsecrets/provider/pkcs11_provider.py +++ /dev/null @@ -1,324 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-only -from __future__ import annotations - -import atexit -import logging -from gettext import gettext as _ -from typing import TYPE_CHECKING - -from gi.repository import Adw, Gio, GLib, GObject, Gtk - -# pylint: disable=import-error -from PyKCS11 import ( - CKA_CLASS, - CKA_KEY_TYPE, - CKA_LABEL, - CKA_SIGN, - CKK_RSA, - CKO_PRIVATE_KEY, - CKR_USER_ALREADY_LOGGED_IN, - CKS_RW_USER_FUNCTIONS, - PyKCS11, - PyKCS11Lib, -) - -import gsecrets.config_manager as config -from gsecrets import const -from gsecrets.provider.base_provider import BaseProvider - -if TYPE_CHECKING: - from gsecrets.database_manager import DatabaseManager - from gsecrets.utils import LazyValue - -# KeePass-Smart-Certificate-Key-Provider -# https://github.com/BodnarSoft/KeePass-Smart-Certificate-Key-Provider/ -# Source/SmartCertificateKeyProvider.cs -# Static string: do not change as otherwise we are no longer compatible -KEEPASS_SCKP_TEXT = ( - "Data text for KeePass Password Safe Plugin" - " - {F3EF424C-7517-4D58-A3FB-C1FB458FDDB6}!" -) - - -class Pkcs11Provider(BaseProvider): - def __init__(self, window): - super().__init__() - self.window = window - - self._pkcs11 = None - self._session = None - - self._active_certificate = None - - atexit.register(self._cleanup) - - def _cleanup(self): - if self._session: - try: - self._session.logout() - except PyKCS11.PyKCS11Error: - logging.exception("Could not cleanup") - - self._session.closeSession() - - def scan_slots(self): - slot_list = self._pkcs11.getSlotList(tokenPresent=True) - if slot_list: - self._session = self._pkcs11.openSession( - slot_list[0], - PyKCS11.CKF_RW_SESSION, - ) - return True - - return False - - @property - def available(self): - return True - - def logout(self): - try: - self._session.logout() - except PyKCS11.PyKCS11Error: - logging.exception("Could not logout") - - def login(self, pin: str) -> bool: - try: - self._session.login(pin) - except PyKCS11.PyKCS11Error as err: - # Check if we were already logged in (due to some race condition) - if err.value != CKR_USER_ALREADY_LOGGED_IN: - logging.exception("Could not login") - return False - - logging.debug("Successfully logged in") - return True - - def _create_model(self): - model = Gtk.StringList() - model.append(_("No Smartcard")) - - if not self._pkcs11: - return model - - if not self.scan_slots(): - return model - - model.splice(0, 1, [_("No RSA Certificate")]) - objects = self._session.findObjects( - [(CKA_CLASS, CKO_PRIVATE_KEY), (CKA_KEY_TYPE, CKK_RSA), (CKA_SIGN, True)], - ) - for obj in objects: - value = self._session.getAttributeValue(obj, [CKA_LABEL])[0] - model.append(value) - - return model - - def create_unlock_widget(self, database_manager: DatabaseManager) -> Gtk.Widget: - row = Adw.ComboRow() - row.set_title(_("Smartcard")) - row.set_subtitle(_("Select certificate")) - row.connect("notify::selected", self._on_unlock_row_selected) - - self.refresh_stack = Gtk.Stack() - self.refresh_button = get_refresh_button() - self.refresh_button.connect("clicked", self._on_refresh_button_clicked, row) - self.refresh_stack.add_named(self.refresh_button, "button") - - self.refresh_spinner = Adw.Spinner() - self.refresh_stack.add_named(self.refresh_spinner, "spinner") - row.add_suffix(self.refresh_stack) - self.refresh_stack.set_visible_child(self.refresh_button) - - self.fill_data(row, database_manager) - - row.set_selected(0) - - return row - - def _on_unlock_row_selected( - self, - widget: Adw.ComboRow, - _param: GObject.ParamSpec, - ) -> None: - if selected_item := widget.get_selected_item(): - self._active_certificate = selected_item.get_string() - - def _refresh_pkcs11_async( - self, - callback: Gio.AsyncReadyCallback, - row: Adw.ComboRow, - ) -> None: - def pkcs11_refresh(task, _source_object, _task_data, _cancellable): - self._pkcs11 = None - if not self._pkcs11: - self._pkcs11 = PyKCS11Lib() - - try: - self._pkcs11.load(const.PKCS11_LIB) - except PyKCS11.PyKCS11Error as err: - logging.warning("Could not load pkcs11 library: %s", err) - task.return_error(err) - return - - present = self.scan_slots() - task.return_boolean(present) - - self.refresh_stack.set_visible_child(self.refresh_spinner) - - task = Gio.Task.new(self, None, callback, row) - task.run_in_thread(pkcs11_refresh) - - def _refresh_pkcs11_finish(self, result): - self.refresh_stack.set_visible_child(self.refresh_button) - return result.propagate_boolean() - - def _on_refresh_button_clicked( - self, - _button: Gtk.Button, - row: Adw.ComboRow, - ) -> None: - self._refresh_pkcs11_async(self._on_pkcs11_refresh, row) - - def _on_pkcs11_refresh( - self, - _provider: Pkcs11Provider, - result: Gio.AsyncResult, - row: Adw.ComboRow, - ) -> None: - try: - present = self._refresh_pkcs11_finish(result) - except GLib.Error: - present = False - - if not present: - dialog = Adw.AlertDialog.new( - _("No smartcard present"), - _("Please insert smartcard and retry."), - ) - - dialog.add_response("ok", _("_OK")) - dialog.present(self.window) - return - - session_info = self._session.getSessionInfo() - if session_info.state >= CKS_RW_USER_FUNCTIONS: - # We are already authorized to perform R/W user functions. - # This means that we are already considered 'logged in' and - # do not need to ask for a password. - logging.debug("Already logged in") - self.fill_data(row) - return - - entry = Adw.PasswordEntryRow(activates_default=True, title=_("Passphrase")) - entry.add_css_class("card") - - dialog = Adw.AlertDialog.new(_("Unlock"), _("Unlock your smartcard")) - dialog.add_response("cancel", _("_Cancel")) - dialog.add_response("unlock", _("_Unlock")) - dialog.set_default_response("unlock") - dialog.set_extra_child(entry) - dialog.set_focus(entry) - dialog.connect("response", self._on_pin_dialog_response, entry, row) - dialog.present(self.window) - - def fill_data( - self, - row: Adw.ComboRow, - database_manager: DatabaseManager | None = None, - ) -> None: - model = self._create_model() - row.set_model(model) - - if not database_manager: - return - - row_select = 0 - if ( - cfg := config.get_provider_config(database_manager.path, "Pkcs11Provider") - ) and "serial" in cfg: - model = row.get_model() - - for pos, info in enumerate(model): - if info == cfg["serial"]: - row_select = pos - break - - row.set_selected(row_select) - - def _on_pin_dialog_response( - self, - _dialog: Adw.AlertDialog, - response: str, - entry: Gtk.Entry, - row: Adw.ComboRow, - ) -> None: - if response == "unlock": - pin = entry.get_text() - - ret = self.login(pin) - if not ret: - self.window.send_notification(_("Failed to unlock Smartcard")) - return - - self.fill_data(row) - - def create_database_row(self): - row = Adw.ComboRow() - row.set_title(_("Smartcard")) - row.set_subtitle(_("Use a smartcard")) - row.connect("notify::selected", self._on_unlock_row_selected) - - self.refresh_stack = Gtk.Stack() - self.refresh_button = get_refresh_button() - self.refresh_button.connect("clicked", self._on_refresh_button_clicked, row) - self.refresh_stack.add_named(self.refresh_button, "button") - - self.refresh_spinner = Adw.Spinner() - self.refresh_stack.add_named(self.refresh_spinner, "spinner") - row.add_suffix(self.refresh_stack) - self.refresh_stack.set_visible_child(self.refresh_button) - - self.fill_data(row) - - row.set_selected(0) - - return row - - def generate_key(self, _salt: LazyValue[bytes]) -> bool: - if not self._session: - return False - - objs = self._session.findObjects( - [(CKA_CLASS, CKO_PRIVATE_KEY), (CKA_LABEL, self._active_certificate)], - ) - if len(objs) == 0: - return False - - priv_key = objs[0] - mecha = PyKCS11.Mechanism(PyKCS11.CKM_SHA1_RSA_PKCS, None) - - try: - signed = self._session.sign(priv_key, KEEPASS_SCKP_TEXT, mecha) - except PyKCS11.PyKCS11Error as err: - logging.exception("Could not sign data, abort") - msg = "Could not sign data" - raise ValueError(msg) from err - - signed_bytes = bytearray(signed) - immutable_bytes = bytes(signed_bytes) - - self.raw_key = immutable_bytes - - return True - - def config(self) -> dict: - return {"label": self._active_certificate} - - -def get_refresh_button(): - button = Gtk.Button.new_from_icon_name("view-refresh-symbolic") - button.set_valign(Gtk.Align.CENTER) - button.add_css_class("flat") - button.set_tooltip_text(_("Refresh Certificate List")) - return button diff --git a/gsecrets/provider/providers.py b/gsecrets/provider/providers.py index 62a0bb13e..febb6dc0f 100644 --- a/gsecrets/provider/providers.py +++ b/gsecrets/provider/providers.py @@ -7,13 +7,11 @@ from typing import TYPE_CHECKING from gi.repository import Adw, Gio, GLib, GObject from gsecrets.provider.file_provider import FileProvider -from gsecrets.provider.pkcs11_provider import Pkcs11Provider -from gsecrets.provider.yubikey_provider import YubiKeyProvider if TYPE_CHECKING: from gsecrets.utils import LazyValue -KEY_PROVIDERS = [FileProvider, Pkcs11Provider, YubiKeyProvider] +KEY_PROVIDERS = [FileProvider] class Providers(GObject.Object): diff --git a/gsecrets/provider/yubikey_provider.py b/gsecrets/provider/yubikey_provider.py deleted file mode 100644 index 685b6a7c9..000000000 --- a/gsecrets/provider/yubikey_provider.py +++ /dev/null @@ -1,277 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-only -from __future__ import annotations - -import logging -from gettext import gettext as _ -from typing import TYPE_CHECKING - -import usb -import yubico -from gi.repository import Adw, Gio, GObject, Gtk - -import gsecrets.config_manager as config -from gsecrets.provider.base_provider import BaseProvider - -if TYPE_CHECKING: - from gsecrets.database_manager import DatabaseManager - from gsecrets.utils import LazyValue - -NOT_INIT = "Provider not initialized." - - -class YubiKeyInfo(GObject.Object): - __gtype_name__ = "YubiKeyInfo" - - def __init__(self, description: str | None = None, serial: int = 0, slot: int = 0): - super().__init__() - - self._description = description - self._serial = serial - self._slot = slot - self._raw_key = None - - @GObject.Property(type=str) - def label(self): - if not self._description: - return _("No Key") - - # TRANSLATORS For example: YubiKey 4 [123456] - Slot 2 - return _("{description} [{serial}] - Slot {slot}").format( - description=self._description, - serial=str(self._serial), - slot=self._slot, - ) - - @GObject.Property(type=str) - def description(self): - return self._description - - @GObject.Property(type=str) - def serial(self): - return self._serial - - @GObject.Property(type=str) - def slot(self): - return self._slot - - -class YubiKeyProvider(BaseProvider): - def __init__(self, _window: Adw.Window) -> None: - super().__init__() - - self.active_key: YubiKeyInfo | None = None - self.unlock_row: Adw.ComboRow | None = None - self.create_row: Adw.ComboRow | None = None - - # NOTE: There is currently a bug which prevents yubikey rescan - # running under flatpak, that's why we can simply collect all yubikeys - # here in one go and set available according to the list length - # Needs to be fixed asap - self.yubikeys = self.get_all_yubikeys(False) - - def get_all_yubikeys(self, debug: bool) -> list: - """Look for YubiKeys. - - We look with ever increasing `skip' value until an error is returned. - - Return all instances of class YubiKey we got before failing. - """ - res = [] - for _idx in range(4): - try: - yubikey = yubico.find_yubikey(debug=debug, skip=_idx) - except yubico.yubikey.YubiKeyError: - break - else: - try: - serial = yubikey.serial() - except yubico.yubikey.YubiKeyTimeout: - logging.exception("Timeout getting yubikey serial") - except yubico.yubikey.YubiKeyError: - logging.exception("Could not read yubikey serial") - else: - logging.debug( - "Found %s [%s]", - yubikey.description, - str(serial), - ) - - res += [ - YubiKeyInfo(yubikey.description, serial, slot) - for slot in yubikey.status().valid_configs() - ] - finally: - # This must be done as otherwise usb access is broken - del yubikey - - return res - - def get_yubikey(self, serial: int, debug: bool = False) -> yubico.YubiKey: - """Get a specific yubikey based on it's serial.""" - try: - for _idx in range(4): - yubikey = yubico.find_yubikey(debug=debug, skip=_idx) - - if yubikey.serial() == serial: - return yubikey - - del yubikey - except yubico.yubikey.YubiKeyError: - pass - - return None - - @property - def available(self): - return len(self.yubikeys) != 0 - - def _create_model(self): - model = Gio.ListStore.new(YubiKeyInfo) - model.append(YubiKeyInfo()) - - for this in self.yubikeys: - model.append(this) - - return model - - def create_unlock_widget(self, database_manager: DatabaseManager) -> Gtk.Widget: - factory = Gtk.SignalListItemFactory() - factory.connect("setup", self._on_factory_setup) - factory.connect("bind", self._on_factory_bind) - - row = Adw.ComboRow() - row.set_title(_("YubiKey")) - row.set_subtitle(_("Select key")) - row.set_factory(factory) - row.connect("notify::selected", self._on_unlock_row_selected) - self.unlock_row = row - - refresh_button = Gtk.Button() - refresh_button.set_valign(Gtk.Align.CENTER) - refresh_button.add_css_class("flat") - refresh_button.set_icon_name("view-refresh-symbolic") - # TRANSLATORS YubiKey is a proper name key, see https://en.wikipedia.org/wiki/YubiKey. - refresh_button.set_tooltip_text(_("Select YubiKey slot")) - refresh_button.connect("clicked", self._on_refresh_button_clicked) - row.add_suffix(refresh_button) - - self._on_refresh_button_clicked(row) - row.set_selected(0) - - if ( - cfg := config.get_provider_config(database_manager.path, "YubiKeyProvider") - ) and "serial" in cfg: - model = row.get_model() - - for pos, info in enumerate(model): - if info.serial == cfg["serial"] and info.slot == cfg["slot"]: - row.set_selected(pos) - break - - return row - - def _on_unlock_row_selected( - self, - widget: Adw.ComboRow, - _param: GObject.ParamSpec, - ) -> None: - self.active_key = widget.get_selected_item() - - def _on_factory_setup(self, _factory, list_item): - label = Gtk.Label() - label.set_xalign(0.0) - list_item.set_child(label) - - def _on_factory_bind(self, _factory, list_item): - label = list_item.get_child() - info = list_item.get_item() - label.props.label = info.props.label - - def _on_refresh_button_clicked(self, _row: Adw.ComboRow) -> None: - model = self._create_model() - - if self.unlock_row is None: - raise ValueError(NOT_INIT) - - self.unlock_row.set_model(model) - self.unlock_row.set_selected(len(model) - 1) - - def create_database_row(self) -> None: - factory = Gtk.SignalListItemFactory() - factory.connect("setup", self._on_factory_setup) - factory.connect("bind", self._on_factory_bind) - - self.create_row = Adw.ComboRow() - self.create_row.set_title(_("YubiKey")) - self.create_row.set_factory(factory) - self.create_row.connect("notify::selected", self._on_create_row_selected) - - refresh_button = Gtk.Button() - refresh_button.set_valign(Gtk.Align.CENTER) - refresh_button.add_css_class("flat") - refresh_button.set_icon_name("view-refresh-symbolic") - refresh_button.set_tooltip_text(_("Select YubiKey slot")) - refresh_button.connect( - "clicked", - self._on_yubikey_create_refresh_button_clicked, - ) - self.create_row.add_suffix(refresh_button) - - self._on_yubikey_create_refresh_button_clicked(self.create_row) - - return self.create_row - - def _on_create_row_selected( - self, - widget: Adw.ComboRow, - _param: GObject.ParamSpec, - ) -> None: - self.active_key = widget.get_selected_item() - - def _on_yubikey_create_refresh_button_clicked(self, _button: Gtk.Button) -> None: - model = self._create_model() - - if self.create_row is None: - raise ValueError(NOT_INIT) - - self.create_row.set_model(model) - self.create_row.set_selected(0) - - def generate_key(self, salt: LazyValue[bytes]) -> bool: - if self.active_key is None: - return False - - try: - if yubikey := self.get_yubikey(self.active_key.serial): - self.emit(self.show_message, _("Touch YubiKey")) - try: - self.raw_key = yubikey.challenge_response( - salt.value, - slot=self.active_key.slot, - ) - except yubico.yubikey_base.YubiKeyTimeout as ex: - self.emit(self.hide_message) - logging.debug("Timeout waiting for challenge response: %s", ex) - msg = "Timeout waiting for challenge response" - raise ValueError(msg) from ex - - self.emit(self.hide_message) - - # This must be done as otherwise usb access is broken - del yubikey - - if self.raw_key is not None: - return True - - except usb.core.USBError as ex: - logging.warning("USB error during yubikey key generation") - msg = "USB error during yubikey key generation" - raise ValueError(msg) from ex - - return False - - def config(self) -> dict: - if self.active_key is None: - raise ValueError(NOT_INIT) - - return {"serial": self.active_key.serial, "slot": self.active_key.slot} diff --git a/meson.build b/meson.build index d95687da2..a08533ccd 100644 --- a/meson.build +++ b/meson.build @@ -17,8 +17,6 @@ python_bin = python.find_installation('python3', modules:[ 'pyotp', 'validators', 'zxcvbn_rs_py', - 'PyKCS11', - 'yubico', ]) if not python_bin.found() error('No valid python3 binary found') -- GitLab