| Server IP : 172.67.134.114 / Your IP : 162.159.115.41 Web Server : Apache/2.4.37 System : Linux almalinux.duckdns.org 4.18.0-553.111.1.el8_10.x86_64 #1 SMP Sun Mar 8 20:06:07 EDT 2026 x86_64 User : ricodeal ( 1046) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /lib64/python3.6/site-packages/pyanaconda/core/ |
Upload File : |
#
# DBus connections
#
# Copyright (C) 2019 Red Hat, Inc. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
from dasbus.connection import SystemMessageBus, SessionMessageBus, MessageBus
from dasbus.constants import DBUS_STARTER_ADDRESS
from dasbus.error import ErrorMapper, get_error_decorator
from pyanaconda.anaconda_loggers import get_module_logger
from pyanaconda.core.constants import DBUS_ANACONDA_SESSION_ADDRESS, ANACONDA_BUS_ADDR_FILE
from pyanaconda.modules.common.errors import register_errors
log = get_module_logger(__name__)
__all__ = ["DBus", "SystemBus", "SessionBus", "error_mapper", "dbus_error"]
class AnacondaMessageBus(MessageBus):
"""Representation of an Anaconda bus connection."""
@property
def address(self):
"""The bus address."""
return self._find_bus_address()
def _get_connection(self):
"""Get a connection to a bus at the specified address."""
bus_address = self._find_bus_address()
log.info("Connecting to the Anaconda bus at %s.", bus_address)
return self._provider.get_addressed_bus_connection(bus_address)
def _find_bus_address(self):
"""Get the address of the Anaconda bus."""
if DBUS_ANACONDA_SESSION_ADDRESS in os.environ:
return os.environ.get(DBUS_ANACONDA_SESSION_ADDRESS)
if os.path.exists(ANACONDA_BUS_ADDR_FILE):
with open(ANACONDA_BUS_ADDR_FILE, 'rt') as f:
return f.read().strip()
raise ConnectionError("Can't find Anaconda bus address!")
class DefaultMessageBus(AnacondaMessageBus):
"""Representation of a default bus connection."""
def _find_bus_address(self):
"""Get the address of the default bus.
Connect to the bus specified by the environmental variable
DBUS_STARTER_ADDRESS. If it is not specified, connect to
the Anaconda bus.
"""
if DBUS_STARTER_ADDRESS in os.environ:
return os.environ.get(DBUS_STARTER_ADDRESS)
return super()._find_bus_address()
# The mapper of DBus errors.
error_mapper = ErrorMapper()
# Default bus. Anaconda uses this connection.
DBus = DefaultMessageBus(
error_mapper=error_mapper
)
# System bus.
SystemBus = SystemMessageBus(
error_mapper=error_mapper
)
# Session bus.
SessionBus = SessionMessageBus(
error_mapper=error_mapper
)
# The decorator for DBus errors.
dbus_error = get_error_decorator(error_mapper)
# Register all DBus errors.
register_errors()