403Webshell
Server IP : 104.21.25.180  /  Your IP : 162.159.115.42
Web Server : Apache/2.4.37
System : Linux almalinux.duckdns.org 4.18.0-553.111.1.el8_10.x86_64 #1 SMP Sun Mar 8 20:06:07 EDT 2026 x86_64
User : ricodeal ( 1046)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib64/python3.6/site-packages/gnome_abrt/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib64/python3.6/site-packages/gnome_abrt/problems.py
## Copyright (C) 2012 ABRT team <[email protected]>
## Copyright (C) 2001-2005 Red Hat, Inc.

## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.

## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.

## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 51 Franklin Street, Suite 500, Boston, MA  02110-1335  USA

import datetime
import logging
import re

# gnome-abrt
import gnome_abrt.url
from gnome_abrt.application import find_application
from gnome_abrt.errors import (InvalidProblem,
                               UnavailableSource)
from gnome_abrt.l10n import _

class ProblemSource:
    NEW_PROBLEM = 0
    DELETED_PROBLEM = 1
    CHANGED_PROBLEM = 2

    def __init__(self):
        self._observers = set()

    def get_items(self, problem_id, *args):
        pass

    def get_problems(self):
        pass

    def chown_problem(self, problem_id):
        pass

    def delete_problem(self, problem_id):
        pass

    def attach(self, observer):
        if observer not in self._observers:
            self._observers.add(observer)

    def detach(self, observer):
        try:
            self._observers.remove(observer)
        except ValueError as ex:
            logging.debug(ex)

    def notify(self, change_type=None, problem=None):
        logging.debug("{0} : Notify".format(self.__class__.__name__))
        for observer in self._observers:
            observer.changed(self, change_type, problem)

    def refresh(self):
        pass

class Problem:
    INITIAL_ELEMENTS = ['component', 'executable', 'cmdline', 'count', 'type',
                        'last_occurrence', 'time', 'reason', 'pkg_arch',
                        'pkg_epoch', 'pkg_name', 'pkg_release', 'pkg_version',
                        'environ', 'pid']

    class Submission:
        URL = "URL"
        MSG = "MSG"
        BTHASH = "BTHASH"

        def __init__(self, problem, name, rtype, data):
            self._problem = problem
            self._name = name
            self._title = name
            self._rtype = rtype
            self._data = data
            self._url_done = False

        def _update_title_async_cb(self, result, unused_userdata):
            if result[1]:
                self._title = result[1]
                self._problem.source.notify(ProblemSource.CHANGED_PROBLEM,
                                            self._problem)

        @property
        def name(self):
            return self._name

        @property
        def rtype(self):
            return self._rtype

        @property
        def data(self):
            return self._data

        @data.setter
        def data(self, value):
            self._data = value

        @property
        def title(self):
            if self._rtype == Problem.Submission.URL and not self._url_done:
                self._url_done = True
                gnome_abrt.url.get_url_title_async(self._data,
                        self._update_title_async_cb, None)

            return self._title


    def __init__(self, problem_id, source):
        self.problem_id = problem_id
        self.source = source
        self.app = None
        self.submission = None
        self.data = None
        self._deleted = False

    def __str__(self):
        return self.problem_id

    def __eq__(self, other):
        if not other:
            return False
        elif isinstance(other, str):
            return self.problem_id == other
        elif isinstance(other, Problem):
            return self.problem_id == other.problem_id

        raise TypeError('Not allowed type in __eq__: '
                         + other.__class__.__name__)

    def __loaditems__(self, *args):
        if self._deleted:
            logging.debug("Accessing deleted problem '{0}'"
                    .format(self.problem_id))
            return {}

        if self.data is None:
            self.data = {}

        items = self.source.get_items(self.problem_id, *args)
        for item in args:
            self.data[item] = items.get(item)

        return items

    #pylint: disable=R0911
    def __getitem__(self, item, cached=True):
        def datetime_from_stamp(stamp):
            try:
                return datetime.datetime.fromtimestamp(float(stamp))
            except TypeError:
                raise InvalidProblem(self.problem_id, "Empty time stamp")
            except ValueError:
                raise InvalidProblem(self.problem_id,
                                     "Invalid value in time stamp")

        if self.data is None:
            # Load initial problem data into cache
            self.__loaditems__(*Problem.INITIAL_ELEMENTS)

        if item == 'date':
            return datetime_from_stamp(self['time'])
        if item == 'date_last':
            last_ocr = self['last_occurrence']
            if last_ocr:
                return datetime_from_stamp(last_ocr)
            else:
                return datetime_from_stamp(self['time'])
        elif item == 'application':
            return self.get_application()
        elif item == 'is_reported':
            return self.is_reported()
        elif item == 'submission':
            return self.get_submission()
        elif item == 'human_type':
            return self.get_human_type()
        elif item == 'package_name':
            return self.get_package_name()
        elif item == 'package_version':
            return self.get_package_version()

        if cached and item in self.data:
            retval = self.data[item]
            if retval is None and item == "count":
                return "1"
            return retval

        loaded = self.__loaditems__(item)
        if item in loaded:
            return loaded[item]

        if item == 'count':
            return "1"

        return None

    def __setitem__(self, _, unused):
        raise RuntimeError("Problems are readonly")

    def __delitem__(self, _):
        raise RuntimeError("Problems are readonly")

    def __len__(self):
        return 1

    def refresh(self):
        if self._deleted:
            logging.debug("Not refreshing deleted problem '{0}'"
                            .format(self.problem_id))
            return

        logging.debug("Refreshing problem '{0}'".format(self.problem_id))
        self.data = None
        self.submission = None
        self.source.notify(ProblemSource.CHANGED_PROBLEM, self)

    def delete(self):
        self.source.delete_problem(self.problem_id)
        self._deleted = True

    def is_reported(self):
        return self['reported_to'] is not None

    def get_human_type(self):
        typ = self['type']

        if not typ:
            raise InvalidProblem(self.problem_id,
                    "Missing or corrupted 'type' file")

        if typ == "CCpp":
            return "C/C++"

        return typ

    def get_package_name(self):
        name = self['pkg_name']

        if not name:
            name = self['component']

        return name

    def get_package_version(self):
        epoch_value = self['pkg_epoch']

        epoch = ""
        if epoch_value and epoch_value != "0":
            epoch = "{0}:".format(epoch_value)

        version = self['pkg_version']
        release = self['pkg_release']
        arch = self['pkg_arch']

        if version and release and arch:
            return "{0}{1}-{2}.{3}".format(epoch, version, release, arch)

        return self['package']

    def assure_ownership(self):
        return self.source.chown_problem(self.problem_id)

    def get_application(self):
        if not self.app:
            self.app = find_application(self['cmdline'],
                                        self['environ'],
                                        self['pid'],
                                        self['component'])

        return self.app

    def get_submission(self):
        if not self.submission:
            reg = re.compile(r'^(?P<pfx>.*?):\s*(?P<typ>\S*?)=(?P<data>.*)')
            self.submission = []
            if self['reported_to']:
                # Most common type of line in reported_to file
                # Bugzilla: URL=http://bugzilla.com/?=123456
                for line in self['reported_to'].split('\n'):
                    if not line:
                        continue

                    parsed = reg.match(line)
                    if parsed:
                        pfx = parsed.group('pfx')
                        typ = parsed.group('typ')
                        data = parsed.group('data')
                    else:
                        continue

                    sbm = next((s for s in self.submission
                                if s.rtype == typ and s.name == pfx), None)

                    if sbm is not None:
                        sbm.data = data
                    else:
                        self.submission.append(
                            Problem.Submission(self, pfx, typ, data))

        return self.submission


class MultipleSources(ProblemSource):

    def __init__(self, sources):
        super(MultipleSources, self).__init__()

        if not sources:
            raise ValueError("At least one source must be passed")

        self.sources = sources

        class SourceObserver:
            def __init__(self, parent):
                self.parent = parent

            #pylint: disable=W0613
            def changed(self, source, change_type=None, problem=None):
                self.parent.notify(change_type, problem)

        for src in self.sources:
            src.attach(SourceObserver(self))

        self._disable_notify = False

    def __eq__(self, other):
        # override __eq__ to be able to find component source's master source
        # in a list of sources
        if isinstance(other, ProblemSource):
            # check if the other is a component
            if other in self.sources:
                return True
            elif not isinstance(other, MultipleSources):
                # the other source is not a component and cannot be self because
                # it is not an instance of MultipleSources
                return False

        # fall back to built-in behaviour (self == other)
        return NotImplemented

    def _pop_source(self, index):
        self.sources.pop(index)
        if not self.sources:
            raise UnavailableSource(self, None)

    def get_items(self, problem_id, *args):
        pass

    def _foreach_source(self, callback):
        i = 0
        while i != len(self.sources):
            try:
                callback(self.sources[i])
                i += 1
            except UnavailableSource as ex:
                logging.debug("{0}".format(str(ex)))
                if not ex.temporary:
                    self._pop_source(i)
                else:
                    i += 1

    def get_problems(self):
        result = []

        extend_result = lambda source: result.extend(source.get_problems())
        self._foreach_source(extend_result)

        return result

    def chown_problem(self, problem_id):
        pass

    def delete_problem(self, problem_id):
        pass

    def notify(self, change_type=None, problem=None):
        if self._disable_notify:
            return

        super(MultipleSources, self).notify(change_type, problem)

    def refresh(self):
        self._disable_notify = True
        try:
            self._foreach_source(lambda source: source.refresh())
        finally:
            self._disable_notify = False

        self.notify()

class CachedSource(ProblemSource):

    def __init__(self):
        super(CachedSource, self).__init__()

        self._cache = None

    def get_problems(self):
        if not self._cache:
            self._cache = []
            for prblmid in self._get_problems():
                try:
                    self._cache.append(self._create_new_problem(prblmid))
                except InvalidProblem as ex:
                    logging.warning(str(ex))

        return self._cache if self._cache else []

    def refresh(self):
        self._cache = None
        self.notify()

    def _get_problems(self):
        """Overwrite this function in ancestor"""
        raise NotImplementedError("")

    def _delete_problem(self, problem_id):
        """Overwrite this function in ancestor"""
        raise NotImplementedError("")

    def _problem_is_in_cache(self, problem_id):
        return self._cache is not None and problem_id in self._cache

    def _insert_to_cache(self, problem):
        if not self._problem_is_in_cache(problem):
            self._cache.append(problem)

    def _remove_from_cache(self, problem_id):
        if not self._problem_is_in_cache(problem_id):
            return

        prblm = self._cache[self._cache.index(problem_id)]
        self._cache.remove(problem_id)
        self.notify(ProblemSource.DELETED_PROBLEM, prblm)

    def delete_problem(self, problem_id):
        if not self._delete_problem(problem_id):
            return

        try:
            self._remove_from_cache(problem_id)
        except ValueError as ex:
            logging.warning(_('Not found in cache but deleted: {0}'),
                    ex)
            self._cache = None
            self.notify()

    def _create_new_problem(self, problem_id):
        return Problem(problem_id, self)

    def process_new_problem_id(self, problem_id):
        if self._problem_is_in_cache(problem_id):
            prblm = self._cache[self._cache.index(problem_id)]
            prblm.refresh()
        else:
            prblm = self._create_new_problem(problem_id)
            self._insert_to_cache(prblm)
            self.notify(ProblemSource.NEW_PROBLEM, prblm)

Youez - 2016 - github.com/yon3zu
LinuXploit