Source code for univention.updater.tools

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright 2008-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
Univention Update tools.
"""

from __future__ import absolute_import
from __future__ import print_function
try:
    import univention.debug as ud
except ImportError:
    import univention.debug2 as ud  # type: ignore

# TODO: Convert to absolute imports only AFTER the unit tests have been adapted
from .commands import (
    cmd_dist_upgrade,
    cmd_dist_upgrade_sim,
    cmd_update,
)
from .errors import (
    UnmetDependencyError,
    CannotResolveComponentServerError,
    ConfigurationError,
    DownloadError,
    PreconditionError,
    RequiredComponentError,
    ProxyError,
    VerificationError,
)
from .repo_url import UcsRepoUrl
from univention.lib.ucs import UCS_Version

import errno
import sys
import re
import os
import copy
from six.moves import http_client as httplib
import socket
from univention.config_registry import ConfigRegistry
from six.moves import urllib_request as urllib2, urllib_error
import json
import subprocess
import tempfile
import logging
import functools
import six
import base64
try:
    from typing import Any, AnyStr, Dict, Generator, Iterable, Iterator, List, Optional, Sequence, Set, Text, Tuple, Type, TypeVar, Union  # noqa: F401
    from typing_extensions import Literal  # noqa: F401
    _TS = TypeVar("_TS", bound="_UCSServer")
except ImportError:
    pass

if six.PY2:
    from new import instancemethod
    from backports.tempfile import TemporaryDirectory
else:
    from tempfile import TemporaryDirectory

RE_ALLOWED_DEBIAN_PKGNAMES = re.compile('^[a-z0-9][a-z0-9.+-]+$')
RE_SPLIT_MULTI = re.compile('[ ,]+')
RE_COMPONENT = re.compile(r'^repository/online/component/([^/]+)$')
RE_CREDENTIALS = re.compile(r'^repository/credentials/(?:(?P<realm>[^/]+)/)?(?P<key>[^/]+)$')

MIN_GZIP = 100  # size of non-empty gzip file
UUID_NULL = '00000000-0000-0000-0000-000000000000'


def verify_script(script, signature):
    # type: (bytes, bytes) -> Optional[bytes]
    """
    Verify detached signature of script:

    .. code-block:: sh

        gpg -a -u 6B6E7E3259A9F44F1452D1BE36602BA86B8BFD3C --passphrase-file /etc/archive-keys/ucs4.0.txt -o script.sh.gpg -b script.sh
        repo-ng-sign-release-file --debug -k 6B6E7E3259A9F44F1452D1BE36602BA86B8BFD3C -p /etc/archive-keys/ucs4.0.txt -i script.sh -o script.sh.gpg

    .. code-block:: python

        verify_script(open("script.sh", "rb").read(), open("script.sh.gpg", "rb").read())

    :param bytes script: The script text to verify.
    :param bytes signature: The detached signature.
    :return: None or the error output.
    :rtype: None or bytes
    """
    # `apt-key verify` needs the signature as a file; the script itself is fed via stdin.
    sig_fd, sig_name = tempfile.mkstemp()
    try:
        try:
            os.write(sig_fd, signature)
        finally:
            os.close(sig_fd)

        # verify script
        cmd = ["apt-key", "verify", sig_name, "-"]
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
        # stderr is merged into stdout, so the second element is always None
        stdout, _stderr = proc.communicate(script)
        ret = proc.wait()
        return stdout if ret != 0 else None
    finally:
        # mkstemp() leaves deletion to the caller; remove the file on success and on error
        os.remove(sig_name)
class _UCSRepo(UCS_Version):
    """
    Common base class of the APT repository layouts; builds the URLs and paths.
    """

    ARCHS = {'all', 'amd64'}

    def __init__(self, release=None, **kwargs):
        # type: (Optional[UCS_Version], **Any) -> None
        """
        Store all keyword arguments as instance attributes.

        String values containing a `%(` placeholder are wrapped in
        :py:class:`_substitution`, so they are expanded lazily against the
        instance dictionary.

        :param release: Optional version used to initialize the base class.
        :param kwargs: Attributes to set on the instance.
        """
        if release:
            super(_UCSRepo, self).__init__(release)
        attrs = self.__dict__
        for (name, value) in kwargs.items():
            is_template = isinstance(value, str) and '%(' in value
            attrs[name] = _UCSRepo._substitution(value, attrs) if is_template else value

    def __repr__(self):
        # type: () -> str
        return '%s(**%r)' % (self.__class__.__name__, self.__dict__)

    def __eq__(self, other):
        # type: (object) -> bool
        if not isinstance(other, _UCSRepo):
            return False
        return self.path() == other.path()

    def __ne__(self, other):
        # type: (object) -> bool
        if not isinstance(other, _UCSRepo):
            return True
        return self.path() != other.path()

    def _format(self, format):
        # type: (str) -> str
        """
        Format longest path for directory/file access.

        The template is shortened at the first placeholder that cannot be
        resolved and then cut back to the last complete path segment.
        """
        while True:
            try:
                return format % self
            except KeyError as ex:
                (missing,) = ex.args
                # strip the unresolved placeholder and everything following it
                format = format[:format.index('%%(%s)' % missing)]
                # strip the partial segment; rfind() yields -1 without any '/', i.e. cut at 0
                format = format[:format.rfind('/') + 1]

    class _substitution(object):
        """
        Helper to print dynamically substituted variable.

        >>> h={'major':2}
        >>> h['version'] = _UCSRepo._substitution('%(major)d.%(minor)d', h)
        >>> h['minor'] = 3
        >>> '%(version)s' % h
        '2.3'
        """

        def __init__(self, format, values):
            # type: (str, Any) -> None
            self.format = format
            self.values = values

        def __str__(self):
            # type: () -> str
            try:
                return self.format % self.values
            except KeyError as exc:
                # report the key this substitution is stored under instead of the inner one
                for (name, candidate) in self.values.items():
                    if self == candidate:
                        raise KeyError(name)
                raise exc

        def __repr__(self):
            # type: () -> str
            return repr(self.format)

    def deb(self, server, type="deb"):
        # type: (_UCSServer, str) -> str
        """
        Format for :file:`/etc/apt/sources.list`.

        :param str server: The URL of the repository server.
        :param str type: The repository type, e.g. `deb` for a binary and `deb-src` for source package repository.
        :returns: The APT repository stanza.
        :rtype: str
        """
        raise NotImplementedError()

    def path(self, filename=None):
        # type: (str) -> str
        """
        Format pool for directory/file access.

        :param filename: The name of a file in the repository.
        :returns: relative path.
        :rtype: str
        """
        raise NotImplementedError()

    def clean(self, server):
        # type: (_UCSServer) -> str
        """
        Format for :file:`/etc/apt/mirror.list`

        :param str server: The URL of the repository server.
        :returns: The APT repository stanza.
        :rtype: str
        """
        raise NotImplementedError()
class UCSRepoPool5(_UCSRepo):
    """
    APT repository using the debian pool structure (ucs5 and above).
    """

    def __init__(self, release=None, **kwargs):
        # type: (UCS_Version, **Any) -> None
        """
        Fill in the default `version`, `patch` and `errata` attributes unless given.
        """
        defaults = (
            ('version', UCS_Version.FORMAT),
            ('patch', UCS_Version.FULLFORMAT),
            ('errata', False),
        )
        for name, value in defaults:
            kwargs.setdefault(name, value)
        super(UCSRepoPool5, self).__init__(release, **kwargs)

    @property
    def _suite(self):
        # type: () -> str
        """
        Format suite.

        :returns: UCS suite name.
        :rtype: str

        >>> UCSRepoPool5(major=5, minor=1, patchlevel=0)._suite
        'ucs510'
        >>> UCSRepoPool5(major=5, minor=1, patchlevel=0, errata=True)._suite
        'errata510'
        """
        kind = "errata" if self.errata else "ucs"
        return "{1}{0.major}{0.minor}{0.patchlevel}".format(self, kind)

    def deb(self, server, type="deb", mirror=False):
        # type: (_UCSServer, str, bool) -> str
        """
        Format for :file:`/etc/apt/sources.list`.

        :param str server: The URL of the repository server.
        :param str type: The repository type, e.g. `deb` for a binary and `deb-src` for source package repository.
        :param bool mirror: Also mirror files for Debian installer.
        :returns: The APT repository stanza.
        :rtype: str

        >>> r=UCSRepoPool5(major=5, minor=1, patchlevel=0)
        >>> r.deb('https://updates.software-univention.de/')
        'deb https://updates.software-univention.de/ ucs510 main'
        >>> r.deb('https://updates.software-univention.de/', mirror=True)
        'deb https://updates.software-univention.de/ ucs510 main main/debian-installer'
        >>> r=UCSRepoPool5(major=5, minor=1, patchlevel=0, errata=True)
        >>> r.deb('https://updates.software-univention.de/')
        'deb https://updates.software-univention.de/ errata510 main'
        """
        # the installer component is only added for binary, non-errata mirror entries
        if mirror and not self.errata and type == "deb":
            components = "main main/debian-installer"
        else:
            components = "main"
        return "%s %s %s %s" % (type, server, self._suite, components)

    def path(self, filename=None):
        # type: (str) -> str
        """
        Format pool for directory/file access.

        :param filename: The name of a file in the repository.
        :returns: relative path.
        :rtype: str

        >>> UCSRepoPool5(major=5, minor=1, patchlevel=0).path()
        'dists/ucs510/InRelease'
        >>> UCSRepoPool5(major=5, minor=1, patchlevel=0, errata=True).path()
        'dists/errata510/InRelease'
        """
        target = filename or 'InRelease'
        return "dists/{}/{}".format(self._suite, target)
class UCSRepoPool(_UCSRepo):
    """
    Flat Debian APT repository.
    """

    def __init__(self, **kw):
        # type: (**Any) -> None
        """
        Fill in the default `version` and `patch` templates unless given.
        """
        for name, template in (('version', UCS_Version.FORMAT), ('patch', UCS_Version.FULLFORMAT)):
            kw.setdefault(name, template)
        super(UCSRepoPool, self).__init__(**kw)

    def deb(self, server, type="deb"):
        # type: (_UCSServer, str) -> str
        """
        Format for :file:`/etc/apt/sources.list`.

        :param str server: The URL of the repository server.
        :param str type: The repository type, e.g. `deb` for a binary and `deb-src` for source package repository.
        :returns: The APT repository stanza.
        :rtype: str

        >>> r=UCSRepoPool(major=2,minor=3,patchlevel=1,part='maintained',arch='amd64')
        >>> r.deb('https://updates.software-univention.de/')
        'deb https://updates.software-univention.de/2.3/maintained/ 2.3-1/amd64/'
        """
        location = super(UCSRepoPool, self)._format("%(version)s/%(part)s/ %(patch)s/%(arch)s/")
        return "%s %s%s" % (type, server, location)

    def path(self, filename=None):
        # type: (str) -> str
        """
        Format pool for directory/file access.

        :param filename: The name of a file in the repository.
        :returns: relative path.
        :rtype: str

        >>> UCSRepoPool(major=2,minor=3).path()
        '2.3/'
        >>> UCSRepoPool(major=2,minor=3,part='maintained').path()
        '2.3/maintained/'
        >>> UCSRepoPool(major=2,minor=3,patchlevel=1,part='maintained').path()
        '2.3/maintained/2.3-1/'
        >>> UCSRepoPool(major=2,minor=3,patchlevel=1,part='maintained',arch='amd64').path()
        '2.3/maintained/2.3-1/amd64/Packages.gz'
        """
        leaf = filename or 'Packages.gz'
        return super(UCSRepoPool, self)._format("%(version)s/%(part)s/%(patch)s/%(arch)s/" + leaf)

    def clean(self, server):
        # type: (_UCSServer) -> str
        """
        Format for :file:`/etc/apt/mirror.list`

        :param str server: The URL of the repository server.
        :returns: The APT repository stanza.
        :rtype: str
        """
        # the architecture directory is intentionally not part of the clean path
        location = super(UCSRepoPool, self)._format("%(version)s/%(part)s/%(patch)s/")
        return "clean %s%s" % (server, location)
class UCSRepoPoolNoArch(_UCSRepo):
    """
    Flat Debian APT repository without explicit architecture subdirectory.
    """

    ARCHS = {''}

    def __init__(self, **kw):
        # type: (**Any) -> None
        """
        Fill in the default `version` and `patch` templates unless given.
        """
        for name, template in (('version', UCS_Version.FORMAT), ('patch', UCS_Version.FULLFORMAT)):
            kw.setdefault(name, template)
        super(UCSRepoPoolNoArch, self).__init__(**kw)

    def deb(self, server, type="deb"):
        # type: (_UCSServer, str) -> str
        """
        Format for :file:`/etc/apt/sources.list`.

        :param str server: The URL of the repository server.
        :param str type: The repository type, e.g. `deb` for a binary and `deb-src` for source package repository.
        :returns: The APT repository stanza.
        :rtype: str

        >>> r=UCSRepoPoolNoArch(major=2,minor=3,patch='comp',part='maintained/component',arch='all')
        >>> r.deb('https://updates.software-univention.de/')
        'deb https://updates.software-univention.de/2.3/maintained/component/comp/ ./'
        """
        location = super(UCSRepoPoolNoArch, self)._format("%(version)s/%(part)s/%(patch)s/ ./")
        return "%s %s%s" % (type, server, location)

    def path(self, filename=None):
        # type: (str) -> str
        """
        Format pool for directory/file access. Returns relative path.

        :param filename: The name of a file in the repository.
        :returns: relative path.
        :rtype: str

        >>> UCSRepoPoolNoArch(major=2,minor=3).path()
        '2.3/'
        >>> UCSRepoPoolNoArch(major=2,minor=3,part='maintained/component').path()
        '2.3/maintained/component/'
        >>> UCSRepoPoolNoArch(major=2,minor=3,part='maintained/component',patch='comp').path()
        '2.3/maintained/component/comp/Packages.gz'
        >>> UCSRepoPoolNoArch(major=2,minor=3,part='maintained/component',patch='comp',arch='all').path()
        '2.3/maintained/component/comp/Packages.gz'
        """
        leaf = filename or 'Packages.gz'
        return super(UCSRepoPoolNoArch, self)._format("%(version)s/%(part)s/%(patch)s/" + leaf)

    def clean(self, server):
        # type: (_UCSServer) -> str
        """
        Format for :file:`/etc/apt/mirror.list`

        :param str server: The URL of the repository server.
        :returns: The APT repository stanza.
        :rtype: str
        """
        location = super(UCSRepoPoolNoArch, self)._format("%(version)s/%(part)s/%(patch)s/")
        return "clean %s%s" % (server, location)
class _UCSServer(object):
    """
    Abstract base class to access UCS compatible update server.
    """

    @classmethod
    def load_credentials(cls, ucr):
        # type: (ConfigRegistry) -> None
        """
        Load credentials from UCR.

        The base implementation does nothing.

        :param ConfigRegistry ucr: An UCR instance.
        """

    def join(self, rel):
        # type: (str) -> str
        """
        Return joined URI without credential.

        :param str rel: relative URI.
        :return: The joined URI.
        :rtype: str
        """
        raise NotImplementedError()

    def access(self, repo, filename=None, get=False):
        # type: (Optional[_UCSRepo], str, bool) -> Tuple[int, int, bytes]
        """
        Access URI and optionally get data.

        :param _UCSRepo repo: the URI to access as an instance of :py:class:`_UCSRepo`.
        :param str filename: An optional relative path.
        :param bool get: Fetch data if True - otherwise check only.
        :return: a 3-tuple (code, size, content) or None on errors.
        :rtype: tuple(int, int, bytes)
        :raises DownloadError: if the server is unreachable.
        :raises ValueError: if the credentials use an invalid encoding.
        :raises ConfigurationError: if a permanent error in the configuration occurs, e.g. the credentials are invalid or the host is unresolvable.
        :raises ProxyError: if the HTTP proxy returned an error.
        """
        raise NotImplementedError()

    def __add__(self, rel):
        # type: (_TS, str) -> _TS
        """
        Append relative path component.

        :param str rel: Relative path.
        :return: A clone of this instance using the new base path.
        :rtype: UCSHttpServer
        """
        raise NotImplementedError()

    @property
    def prefix(self):
        # type: () -> str
        raise NotImplementedError()

    def __eq__(self, other):
        # type: (object) -> bool
        if not isinstance(other, _UCSServer):
            return False
        return self.prefix == other.prefix

    def __ne__(self, other):
        # type: (object) -> bool
        if not isinstance(other, _UCSServer):
            return True
        return self.prefix != other.prefix
class UCSHttpServer(_UCSServer):
    """
    Access to UCS compatible remote update server.

    The URL opener, the password manager, the HEAD/GET method state and the
    set of failed hosts are class attributes and therefore shared by all
    instances.
    """

    class HTTPHeadHandler(urllib2.BaseHandler):
        """
        Handle fallback from HEAD to GET if unimplemented.
        """

        def http_error_501(self, req, fp, code, msg, headers):  # httplib.NOT_IMPLEMENTED
            # type: (urllib2.Request, Any, int, str, Dict) -> Any
            m = req.get_method()
            if m == 'HEAD' == UCSHttpServer.http_method:
                ud.debug(ud.NETWORK, ud.INFO, "HEAD not implemented at %s, switching to GET." % req)
                # switch all further requests of every instance to GET and retry this one
                UCSHttpServer.http_method = 'GET'
                return self.parent.open(req, timeout=req.timeout)
            else:
                return None

    def __init__(self, baseurl, user_agent=None, timeout=None):
        # type: (UcsRepoUrl, str, float) -> None
        """
        Setup URL handler for accessing a UCS repository server.

        :param UcsRepoUrl baseurl: the base URL.
        :param str user_agent: optional user agent string.
        :param int timeout: optional timeout for network access.
        """
        self.baseurl = baseurl
        self.user_agent = user_agent
        self.timeout = timeout

    log = logging.getLogger('updater.UCSHttp')
    # Attach the NullHandler once per class instead of once per instance,
    # otherwise the shared logger collects one more handler for every server object.
    log.addHandler(logging.NullHandler())

    http_method = 'HEAD'
    head_handler = HTTPHeadHandler()
    password_manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
    auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
    proxy_handler = urllib2.ProxyHandler()
    # No need for ProxyBasicAuthHandler, since ProxyHandler parses netloc for @
    opener = urllib2.build_opener(head_handler, auth_handler, proxy_handler)
    failed_hosts = set()  # type: Set[str]

    @property
    def prefix(self):
        # type: () -> str
        return self.baseurl.path.lstrip('/')

    @classmethod
    def reinit(cls):
        # type: () -> None
        """
        Reload proxy settings and reset failed hosts.
        """
        cls.proxy_handler = urllib2.ProxyHandler()
        cls.opener = urllib2.build_opener(cls.head_handler, cls.auth_handler, cls.proxy_handler)
        cls.failed_hosts.clear()

    @classmethod
    def load_credentials(cls, ucr):
        # type: (ConfigRegistry) -> None
        """
        Load credentials from UCR.

        All variables matching `repository/credentials/[<realm>/]<key>` are
        grouped by realm; each group must provide `uris` and may provide
        `username` and `password`, which default to the license UUID.

        :param ConfigRegistry ucr: An UCR instance.
        """
        uuid = ucr.get('uuid/license', UUID_NULL)

        groups = {}  # type: Dict[str, Dict[str, str]]
        for key, value in ucr.items():
            match = RE_CREDENTIALS.match(key)
            if match:
                realm, key = match.groups()
                cfg = groups.setdefault(realm, {})
                cfg[key] = value

        for realm, cfg in groups.items():
            try:
                uris = cfg.pop('uris').split()
            except KeyError:
                cls.log.error('Incomplete credentials for realm "%s": %r', realm, cfg)
                continue
            username = cfg.pop('username', uuid)
            password = cfg.pop('password', uuid)
            if cfg:
                # Logger.warn() is a deprecated alias of Logger.warning()
                cls.log.warning('Extra credentials for realm "%s": %r', realm, cfg)

            cls.password_manager.add_password(realm, uris, username, password)
            cls.log.info('Loaded credentials for realm "%s"', realm)

    def __str__(self):
        # type: () -> str
        """
        URI with credentials.
        """
        return self.baseurl.private()

    def __repr__(self):
        # type: () -> str
        """
        Return canonical string representation.
        """
        return '%s(%r, timeout=%r)' % (
            self.__class__.__name__,
            self.baseurl,
            self.timeout,
        )

    def __add__(self, rel):
        # type: (str) -> UCSHttpServer
        """
        Append relative path component.

        :param str rel: Relative path.
        :return: A clone of this instance using the new base path.
        :rtype: UCSHttpServer
        """
        uri = copy.copy(self)
        uri.baseurl += rel
        return uri

    def join(self, rel):
        # type: (str) -> str
        """
        Return joined URI without credential.

        :param str rel: relative URI.
        :return: The joined URI.
        :rtype: str
        """
        return (self.baseurl + rel).public()

    def access(self, repo, filename=None, get=False):
        # type: (Optional[_UCSRepo], str, bool) -> Tuple[int, int, bytes]
        """
        Access URI and optionally get data.

        :param _UCSRepo repo: the URI to access as an instance of :py:class:`_UCSRepo`.
        :param str filename: An optional relative path.
        :param bool get: Fetch data if True - otherwise check only.
        :return: a 3-tuple (code, size, content)
        :rtype: tuple(int, int, bytes)
        :raises DownloadError: if the server is unreachable.
        :raises ValueError: if the credentials use an invalid encoding.
        :raises ConfigurationError: if a permanent error in the configuration occurs, e.g. the credentials are invalid or the host is unresolvable.
        :raises ProxyError: if the HTTP proxy returned an error.
        """
        rel = filename if repo is None else repo.path(filename)
        assert rel is not None
        if self.user_agent:
            UCSHttpServer.opener.addheaders = [('User-agent', self.user_agent)]
        uri = self.join(rel)
        if self.baseurl.username and self.baseurl.password:
            UCSHttpServer.password_manager.add_password(realm=None, uri=uri, user=self.baseurl.username, passwd=self.baseurl.password)
        req = urllib2.Request(uri)

        def get_host():
            # type: () -> str
            return req.host if six.PY3 else req.get_host()  # type: ignore

        # hosts which failed once are not tried again until reinit() is called
        if get_host() in self.failed_hosts:
            self.log.error('Already failed %s', get_host())
            raise DownloadError(uri, -1)
        if not get and UCSHttpServer.http_method != 'GET':
            # Overwrite get_method() to return "HEAD"
            def get_method(self, orig=req.get_method):
                method = orig()
                if method == 'GET':
                    return UCSHttpServer.http_method
                else:
                    return method
            req.get_method = functools.partial(get_method, req) if six.PY3 else instancemethod(get_method, req, urllib2.Request)  # type: ignore

        self.log.info('Requesting %s', req.get_full_url())
        ud.debug(ud.NETWORK, ud.ALL, "updater: %s %s" % (req.get_method(), req.get_full_url()))
        try:
            res = UCSHttpServer.opener.open(req, timeout=self.timeout)
            assert res
            try:
                # <http://tools.ietf.org/html/rfc2617#section-2>
                # remember the credentials which the opener actually sent
                try:
                    auth = req.unredirected_hdrs['Authorization']
                    scheme, credentials = auth.split(' ', 1)
                    if scheme.lower() != 'basic':
                        raise ValueError('Only "Basic" authorization is supported')
                    try:
                        basic = base64.b64decode(credentials).decode('ISO8859-1')
                    except Exception:
                        raise ValueError('Invalid base64')
                    self.baseurl.username, self.baseurl.password = basic.split(':', 1)
                except KeyError:
                    pass
                except ValueError as ex:
                    # NOTE(review): this writes the raw Authorization header to the log - confirm this is intended
                    self.log.info("Failed to decode %s: %s", auth, ex)
                code = res.getcode()
                assert code
                size = int(res.info().get('content-length', 0))
                content = res.read()
                self.log.info("Got %s %s: %d %d", req.get_method(), req.get_full_url(), code, size)
                return (code, size, content)
            finally:
                res.close()
        # direct   | proxy                                        | Error cause
        #          | valid     closed   filter   DNS     auth     |
        # HTTP:200 | HTTP:200  URL:111  URL:110  GAI:-2  HTTP:407 | OK
        # HTTP:404 | HTTP:404  URL:111  URL:110  GAI:-2  HTTP:407 | Path unknown
        # ---------+----------------------------------------------+----------------------
        # URL:111  | HTTP:404  URL:111  URL:110  GAI:-2  HTTP:407 | Port closed
        # URL:110  | HTTP:404  URL:111  URL:110  GAI:-2  HTTP:407 | Port filtered
        # GAI:-2   | HTTP:502/4URL:111  URL:110  GAI:-2  HTTP:407 | Host name unknown
        # HTTP:401 | HTTP:401  URL:111  URL:110  GAI:-2  HTTP:407 | Authorization required
        except urllib_error.HTTPError as res:
            self.log.debug("Failed %s %s: %s", req.get_method(), req.get_full_url(), res, exc_info=True)
            if res.code == httplib.UNAUTHORIZED:  # 401
                raise ConfigurationError(uri, 'credentials not accepted')
            if res.code == httplib.PROXY_AUTHENTICATION_REQUIRED:  # 407
                raise ProxyError(uri, 'credentials not accepted')
            if res.code in (httplib.BAD_GATEWAY, httplib.GATEWAY_TIMEOUT):  # 502 504
                self.failed_hosts.add(get_host())
                raise ConfigurationError(uri, 'host is unresolvable')
            raise DownloadError(uri, res.code)
        except urllib_error.URLError as e:
            self.log.debug("Failed %s %s: %s", req.get_method(), req.get_full_url(), e, exc_info=True)
            if isinstance(e.reason, six.string_types):
                reason = e.reason
            elif isinstance(e.reason, socket.timeout):
                raise ConfigurationError(uri, 'timeout in network connection')
            else:
                args = e.reason.args
                try:
                    reason = args[1]  # default value for error message
                except IndexError:
                    reason = str(e)  # unknown
                # an exception without arguments has no error number to inspect
                err = args[0] if args else None
                if isinstance(e.reason, socket.gaierror):
                    if err == socket.EAI_NONAME:  # -2
                        reason = 'host is unresolvable'
                else:
                    if err == errno.ETIMEDOUT:  # 110
                        reason = 'port is blocked'
                    elif err == errno.ECONNREFUSED:  # 111
                        reason = 'port is closed'
            # a relative selector means the request went directly to the host, a full URL means via proxy
            selector = req.selector if six.PY3 else req.get_selector()  # type: ignore
            if selector.startswith('/'):  # direct
                self.failed_hosts.add(get_host())
                raise ConfigurationError(uri, reason)
            else:  # proxy
                raise ProxyError(uri, reason)
        except socket.timeout as ex:
            self.log.debug("Failed %s %s: %s", req.get_method(), req.get_full_url(), ex, exc_info=True)
            raise ConfigurationError(uri, 'timeout in network connection')
class UCSLocalServer(_UCSServer):
    """
    Access to UCS compatible local update server.
    """

    def __init__(self, prefix):
        # type: (str) -> None
        """
        Setup URL handler for accessing a UCS repository server.

        :param str prefix: The local path of the repository.
        """
        self.log = logging.getLogger('updater.UCSFile')
        # getLogger() always returns the same logger object; only add the
        # NullHandler once instead of once per instance.
        if not any(isinstance(handler, logging.NullHandler) for handler in self.log.handlers):
            self.log.addHandler(logging.NullHandler())
        # normalize to "relative/path/" or the empty string
        prefix = str(prefix).strip('/')
        self._prefix = '%s/' % prefix if prefix else ''

    @property
    def prefix(self):
        # type: () -> str
        return self._prefix

    def __str__(self):
        # type: () -> str
        """
        Absolute file-URI.
        """
        return 'file:///%s' % self.prefix

    def __repr__(self):
        # type: () -> str
        """
        Return canonical string representation.
        """
        return 'UCSLocalServer(prefix=%r)' % (self.prefix,)

    def __add__(self, rel):
        # type: (str) -> UCSLocalServer
        """
        Append relative path component.

        :param str rel: Relative path.
        :return: A clone of this instance using the new base path.
        :rtype: UCSLocalServer
        """
        uri = copy.copy(self)
        uri._prefix += str(rel).lstrip('/')
        return uri

    def join(self, rel):
        # type: (str) -> str
        """
        Return joined URI without credential.

        :param str rel: relative URI.
        :return: The joined URI.
        :rtype: str
        """
        uri = self.__str__()
        uri += str(rel).lstrip('/')
        return uri

    def access(self, repo, filename=None, get=False):
        # type: (Optional[_UCSRepo], str, bool) -> Tuple[int, int, bytes]
        """
        Access URI and optionally get data.

        Directories are reported with size 0 and empty content; regular files
        are always read completely, independent of `get`.

        :param _UCSRepo repo: the URI to access as an instance of :py:class:`_UCSRepo`.
        :param str filename: An optional relative path.
        :param bool get: Unused by this implementation.
        :return: a 3-tuple (code, size, content)
        :rtype: tuple(int, int, bytes)
        :raises DownloadError: if the path is neither an existing directory nor a regular file.
        """
        rel = filename if repo is None else repo.path(filename)
        assert rel is not None
        uri = self.join(rel)
        ud.debug(ud.NETWORK, ud.ALL, "updater: %s" % (uri,))
        # urllib2.urlopen() doesn't work for directories
        assert uri.startswith('file://')
        path = uri[len('file://'):]
        if os.path.exists(path):
            if os.path.isdir(path):
                self.log.info("Got %s", path)
                return (httplib.OK, 0, b'')  # 200
            elif os.path.isfile(path):
                with open(path, 'rb') as f:
                    data = f.read()
                self.log.info("Got %s: %d", path, len(data))
                return (httplib.OK, len(data), data)  # 200
        self.log.error("Failed %s", path)
        raise DownloadError(uri, -1)
[docs]class Component(object): FN_APTSOURCES = '/etc/apt/sources.list.d/20_ucs-online-component.list' UCRV = "repository/online/component/{}/{}" AVAILABLE = 'available' NOT_FOUND = 'not_found' DISABLED = 'disabled' UNKNOWN = 'unknown' PERMISSION_DENIED = 'permission_denied' def __init__(self, updater, name): # type: (UniventionUpdater, str) -> None self.updater = updater self.name = name def __lt__(self, other): return self.name < other.name if isinstance(other, Component) else NotImplemented def __le__(self, other): return self.name <= other.name if isinstance(other, Component) else NotImplemented def __eq__(self, other): return isinstance(other, Component) and self.name == other.name def __ne__(self, other): return not isinstance(other, Component) or self.name != other.name def __ge__(self, other): return self.name >= other.name if isinstance(other, Component) else NotImplemented def __gt__(self, other): return self.name > other.name if isinstance(other, Component) else NotImplemented def __hash__(self): return hash(self.name) def __str__(self): # type: () -> str return "Component({0.name})".format(self)
[docs] def ucrv(self, key=""): # type: (str) -> str return "/".join(filter(None, ("repository", "online", "component", self.name, key)))
def __getitem__(self, key): # type: (str) -> str return self.updater.configRegistry.get(self.ucrv(key)) or "" def __bool__(self): # type: () -> bool return self.updater.configRegistry.is_true(self.ucrv()) __nonzero__ = __bool__ def _versions(self, start=None, end=None): # type: (Optional[UCS_Version], Optional[UCS_Version]) -> Set[UCS_Version] version = self["version"] versions = set(RE_SPLIT_MULTI.split(version)) return { UCS_Version(ver.mm + (0,)) for ver, _data in self.updater.get_releases(start, end) if {ver.FORMAT % ver, "current", ""} & versions } @property def current(self): # type: () -> bool version = self["version"] versions = set(RE_SPLIT_MULTI.split(version)) return bool(versions & {"current"}) @property def default_packages(self): # type: () -> Set[str] """ Returns a set of (meta) package names to be installed for this component. :returns: a set of package names. """ return set(( pkg for var in ('defaultpackages', 'defaultpackage') for pkg in RE_SPLIT_MULTI.split(self[var]) )) - {""}
[docs] def defaultpackage_installed(self, ignore_invalid_package_names=True): # type: (bool) -> Optional[bool] """ Returns installation status of component's default packages :param bool ignore_invalid_package_names: Ignore invalid package names. :returns: On of the values: None no default packages are defined True all default packages are installed False at least one package is not installed :rtype: None or bool :raises ValueError: if UCR variable contains invalid package names if ignore_invalid_package_names=False """ pkglist = self.default_packages if not pkglist: return None # check package names for pkg in pkglist: match = RE_ALLOWED_DEBIAN_PKGNAMES.search(pkg) if not match: if ignore_invalid_package_names: continue raise ValueError('invalid package name (%s)' % pkg) cmd = ['/usr/bin/dpkg-query', '-W', '-f', '${Status}\\n'] cmd.extend(pkglist) p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = (data.decode("UTF-8", errors="replace") for data in p.communicate()) # count number of "Status: install ok installed" lines installed_correctly = [x for x in stdout.splitlines() if x.endswith(' ok installed')] # if pkg count and number of counted lines match, all packages are installed return len(pkglist) == len(installed_correctly)
[docs] def baseurl(self, for_mirror_list=False): # type: (bool) -> UcsRepoUrl r""" Calculate the base URL for a component. :param bool for_mirror_list: Use external or local repository. CS (component server) value of `repository/online/component/%s/server` MS (mirror server) value of `repository/mirror/server` RS (repository server) value of `repository/online/server` \- value is unset or no entry /blank/ value is irrelevant +-------------+----------+------------+---------+------------+------------+-------------+ | UCR configuration |Result | | +-------------+----------+------------+---------+------------+------------+ | | isRepoServer|enabled |localmirror |server |sources.list mirror.list | | +=============+==========+============+=========+============+============+=============+ | False |False |False |\- |\- |\- |no | + +----------+------------+---------+------------+------------+local | | |True | |\- |RS |\- |repository | + +----------+------------+---------+------------+------------+mirror | | |True | |CS |CS |\- | | +-------------+----------+------------+---------+------------+------------+-------------+ | True |False |False | |\- |\- |local | + +----------+------------+---------+------------+------------+repository | | |False |True |\- |\- |MS |mirror | + +----------+------------+---------+------------+------------+ | | |False |True |CS |\- |CS | | + +----------+------------+---------+------------+------------+ | | |True |False |\- |MS |\- | | + +----------+------------+---------+------------+------------+ | | |True |False |CS |CS |\- | | + +----------+------------+---------+------------+------------+ | | |True |True |\- |RS |MS | | + +----------+------------+---------+------------+------------+ | | |True |True |CS |RS |CS | | + +----------+------------+---------+------------+------------+-------------+ | |False |\- |\- |\- |\- |backward | + +----------+ +---------+------------+------------+compabibility| | |True | |\- |RS |MS |[1]_ | + +----------+ 
+---------+------------+------------+ | | |True | |CS |RS |CS | | +-------------+----------+------------+---------+------------+------------+-------------+ .. [1] if `repository/online/component/%s/localmirror` is unset, then the value of `repository/online/component/%s` will be used to achieve backward compatibility. """ c_prefix = self.ucrv() if self.updater.is_repository_server: m_url = UcsRepoUrl(self.updater.configRegistry, 'repository/mirror') c_enabled = bool(self) c_localmirror = self.updater.configRegistry.is_true(self.ucrv("localmirror"), c_enabled) if for_mirror_list: # mirror.list if c_localmirror: return UcsRepoUrl(self.updater.configRegistry, c_prefix, m_url) else: # sources.list if c_enabled: if c_localmirror: return self.updater.repourl else: return UcsRepoUrl(self.updater.configRegistry, c_prefix, m_url) else: return UcsRepoUrl(self.updater.configRegistry, c_prefix, self.updater.repourl) raise CannotResolveComponentServerError(self.name, for_mirror_list)
[docs] def server(self, for_mirror_list=False): # type: (bool) -> UCSHttpServer """ Return :py:class:`UCSHttpServer` for component as configures via UCR. :param bool for_mirror_list: component entries for `mirror.list` will be returned, otherwise component entries for local `sources.list`. :returns: The repository server for the component. :rtype: UCSHttpServer :raises ConfigurationError: if the configured server is not usable. """ c_url = copy.copy(self.baseurl(for_mirror_list)) c_url.path = '' prefix = self["prefix"] user_agent = self.updater._get_user_agent_string() server = UCSHttpServer( baseurl=c_url, user_agent=user_agent, timeout=self.updater.timeout, ) try: # if prefix.lower() == 'none' ==> use no prefix if prefix and prefix.lower() == 'none': try: assert server.access(None, '') except DownloadError as e: uri, code = e.args raise ConfigurationError(uri, 'absent prefix forced - component %s not found: %s' % (self.name, uri)) else: # FIXME: PMH stop iterating for testserver in [ server + '/univention-repository/', server + self.updater.repourl.path if self.updater.repourl.path else None, server, ]: if not testserver: continue if prefix: # append prefix if defined testserver = testserver + '%s/' % (prefix.strip('/'),) try: assert testserver.access(None, '') return testserver except DownloadError as e: ud.debug(ud.NETWORK, ud.ALL, "%s" % e) uri, code = e.args raise ConfigurationError(uri, 'non-existing component prefix: %s' % (uri,)) except ConfigurationError: if self.updater.check_access: raise return server
def versions(self, start, end, for_mirror_list=False):
    # type: (UCS_Version, UCS_Version, bool) -> Iterator[Tuple[UCSHttpServer, _UCSRepo]]
    """
    Iterate component versions.

    The same repository object is yielded every time; only its `mmp` and
    `part` attributes are updated between iterations.

    :param start: Minimum required version.
    :param end: Maximum allowed version.
    :param bool for_mirror_list: component entries for `mirror.list` will be returned, otherwise component entries for local `sources.list`.
    :returns: A iterator returning 2-tuples (server, ver).
    """
    repo_server = self.server(for_mirror_list=for_mirror_list)
    repo = self.layout(prefix=repo_server, patch=self.name)
    found = self._versions(start, end)
    sub_dirs = self._parts
    for release in sorted(found):
        repo.mmp = release.mmp
        for sub_dir in sub_dirs:
            repo.part = sub_dir
            yield repo_server, repo
def repositories(self, start, end, clean=False, for_mirror_list=False, failed=None):
    # type: (UCS_Version, UCS_Version, bool, bool, Optional[Set[Tuple[Component, str]]]) -> Iterator[str]
    """
    Return list of Debian repository statements for requested component.

    :param start: Minimum required version.
    :param end: Maximum allowed version.
    :param bool clean: Add additional `clean` statements for `apt-mirror`.
    :param bool for_mirror_list: component entries for `mirror.list` will be returned, otherwise component entries for local `sources.list`.
    :param failed: A set to receive the failed component names. If `None`, download errors are raised instead of being collected.
    :returns: A list of strings with APT statements.
    :raises DownloadError: if a repository index is not accessible and `failed` is `None`.
    """
    for server, struct in self.versions(start, end, for_mirror_list):
        try:
            for struct.arch in sorted(struct.ARCHS):
                # Failure is signalled by DownloadError; calling without
                # `assert` keeps the check alive under `python -O`.
                server.access(struct, "Packages.gz")
                yield struct.deb(server)
                if clean:
                    yield struct.clean(server)
            if self.updater.sources:
                struct.arch = "source"
                server.access(struct, "Sources.gz")
                yield struct.deb(server, "deb-src")
        except DownloadError as ex:
            if failed is not None:
                failed.add((self, str(ex)))
            else:
                raise
def status(self):
    # type: () -> str
    """
    Returns the current status of specified component based on :file:`/etc/apt/sources.list.d/20_ucs-online-component.list`

    :returns: One of the strings:

        :py:const:`DISABLED`
            component has been disabled via UCR
        :py:const:`AVAILABLE`
            component is enabled and at least one valid repo string has been found in .list file
        :py:const:`NOT_FOUND`
            component is enabled but no valid repo string has been found in .list file
        :py:const:`PERMISSION_DENIED`
            component is enabled but authentication failed
        :py:const:`UNKNOWN`
            component's status is unknown

    :rtype: str
    """
    if not bool(self):
        return self.DISABLED

    # The component name is literal text, not a pattern: escape it so that
    # characters like `.` or `+` only match themselves.
    name = re.escape(self.name)
    re_path = re.compile('(un)?maintained/component/ ?%s/' % name)
    re_denied = re.compile('credentials not accepted: %s$' % name)

    try:
        comp_file = open(self.FN_APTSOURCES, 'r')
    except IOError:
        return self.UNKNOWN

    with comp_file:
        # default: file contains no valid repo entry
        result = self.NOT_FOUND
        for line in comp_file:
            if line.startswith('deb ') and re_path.search(line):
                # at least one repo has been found
                result = self.AVAILABLE
            elif re_denied.search(line):
                # stop immediately if at least one repo has authentication problems
                return self.PERMISSION_DENIED
        return result
@property
def layout(self):
    # type: () -> Type[_UCSRepo]
    """
    Return the repository class selected by the `layout` setting of this component.

    :raises ValueError: if the configured layout is not one of the known names.
    """
    known = {
        "": UCSRepoPool,
        "arch": UCSRepoPool,
        "flat": UCSRepoPoolNoArch,
    }  # type: Dict[str, Type[_UCSRepo]]
    configured = self["layout"]
    if configured not in known:
        raise ValueError(configured)
    return known[configured]


@property
def _parts(self):
    # type: () -> List[str]
    """
    Return the repository sub-directories of this component.

    `maintained/component` is always included; `unmaintained/component` only
    if the `unmaintained` setting of the component is true.
    """
    with_unmaintained = self.updater.configRegistry.is_true(self.ucrv('unmaintained'))
    # Slicing by the boolean keeps zero or one element of the list.
    optional = ["unmaintained"][:with_unmaintained]
    return ['%s/component' % (section,) for section in ["maintained"] + optional]
class UniventionUpdater(object):
    """
    Handle UCS package repositories.
    """

    def __init__(self, check_access=True):
        # type: (bool) -> None
        """
        Create new updater with settings from UCR.

        :param bool check_access: Check if repository server is reachable on init.
        :raises ConfigurationError: if configured server is not available immediately.
        """
        logger = logging.getLogger('updater.Updater')
        logger.addHandler(logging.NullHandler())
        self.log = logger
        self.check_access = check_access
        self.connection = None
        self.configRegistry = ConfigRegistry()
        # Loads UCR and derives all remaining settings from it.
        self.ucr_reinit()
def config_repository(self):
    # type: () -> None
    """
    Read the `repository/online` settings from UCR.

    Overridden by :py:class:`univention.updater.UniventionMirror`.
    """
    ucr = self.configRegistry
    self.online_repository = ucr.is_true('repository/online', True)
    self.repourl = UcsRepoUrl(ucr, 'repository/online')
    self.sources = ucr.is_true('repository/online/sources', False)
    self.timeout = float(ucr.get('repository/online/timeout', 30))
    self.script_verify = ucr.is_true('repository/online/verify', True)
    # The HTTP method is stored on the class, not on this instance.
    http_method = ucr.get('repository/online/httpmethod', 'HEAD')
    UCSHttpServer.http_method = http_method.upper()
def ucr_reinit(self):
    # type: () -> None
    """
    Re-initialize settings.

    Reloads UCR, exports the configured proxies into the process
    environment, reads the current UCS version and sets up the repository
    server.
    """
    ucr = self.configRegistry
    ucr.load()

    self.is_repository_server = ucr.is_true('local/repository', False)

    proxy_changed = False
    http_proxy = ucr.get('proxy/http')
    if http_proxy:
        # The HTTP proxy is also the default for HTTPS ...
        os.environ['http_proxy'] = http_proxy
        os.environ['https_proxy'] = http_proxy
        proxy_changed = True
    https_proxy = ucr.get('proxy/https')
    if https_proxy:
        # ... unless a dedicated HTTPS proxy is configured.
        os.environ['https_proxy'] = https_proxy
        proxy_changed = True
    no_proxy = ucr.get('proxy/no_proxy')
    if no_proxy:
        os.environ['no_proxy'] = no_proxy
        proxy_changed = True
    if proxy_changed:
        UCSHttpServer.reinit()

    # UCS version
    self.current_version = UCS_Version("%(version/version)s-%(version/patchlevel)s" % ucr)
    self.erratalevel = int(ucr.get('version/erratalevel', 0))

    # UniventionMirror needs to provide its own settings
    self.config_repository()

    if not self.online_repository:
        self.log.info('Disabled')
        self.server = UCSLocalServer('')  # type: _UCSServer
        self.releases = {"error": "offline"}
        return

    # generate user agent string
    agent = self._get_user_agent_string()
    UCSHttpServer.load_credentials(self.configRegistry)

    self.server = UCSHttpServer(
        baseurl=self.repourl,
        user_agent=agent,
        timeout=self.timeout,
    )
    self._get_releases()
def _get_releases(self):
    # type: () -> None
    """
    Detect server prefix and download `ucs-releases.json` file.

    The parsed JSON is stored in `self.releases`. If the download or the
    parsing fails, `self.releases` is set to a dictionary with a single
    `error` key instead, unless `self.check_access` is set, in which case a
    :py:class:`ConfigurationError` is re-raised.

    :raises ConfigurationError: if the releases file cannot be downloaded and `check_access` is set.
    """
    try:
        if not self.repourl.path:
            # No path configured: probe for the default prefix and switch the
            # server over to it when the releases file is found there.
            try:
                _code, _size, data = self.server.access(None, '/univention-repository/ucs-releases.json', get=True)
                self.server += '/univention-repository/'
                self.log.info('Using detected prefix /univention-repository/')
                self.releases = json.loads(data)
            except DownloadError as e:
                self.log.info('No prefix /univention-repository/ detected, using /')
                ud.debug(ud.NETWORK, ud.ALL, "%s" % e)
        # Validate server settings
        try:
            _code, _size, data = self.server.access(None, 'ucs-releases.json', get=True)
            self.log.info('Using configured prefix %s', self.repourl.path)
            self.releases = json.loads(data)
        except DownloadError as e:
            self.log.error('Failed configured prefix %s', self.repourl.path, exc_info=True)
            uri, code = e.args
            # Converted so that the outer handler below decides whether to
            # re-raise or to record the error.
            raise ConfigurationError(uri, 'non-existing prefix "%s": %s' % (self.repourl.path, uri))
    except ConfigurationError as e:
        if self.check_access:
            self.log.fatal('Failed server detection: %s', e, exc_info=True)
            raise
        self.releases = {"error": str(e)}
    except (ValueError, LookupError) as exc:
        # NOTE(review): presumably covers malformed JSON from json.loads() - confirm.
        ud.debug(ud.NETWORK, ud.ERROR, 'Querying maintenance information failed: %s' % (exc,))
        self.releases = {"error": str(exc)}
def get_releases(self, start=None, end=None):
    # type: (Optional[UCS_Version], Optional[UCS_Version]) -> Iterator[Tuple[UCS_Version, Dict[str, Any]]]
    """
    Return UCS releases in range.

    :param start: Minimum required version.
    :param end: Maximum allowed version.
    :returns: Iterator of 2-tuples (UCS_Version, data).
    """
    for major in self.releases.get('releases', []):
        for minor in major['minors']:
            for patchlevel in minor['patchlevels']:
                ver = UCS_Version((
                    major['major'],
                    minor['minor'],
                    patchlevel['patchlevel'],
                ))
                # An unset bound does not restrict the range.
                if (start and ver < start) or (end and ver > end):
                    continue
                # Copy the patch level data and add the major and minor number.
                data = dict(patchlevel)
                data.update(major=major['major'], minor=minor['minor'])
                yield (ver, data)
def get_next_version(self, version, components=(), errorsto='stderr'):
    # type: (UCS_Version, Iterable[Component], Literal["stderr", "exception", "none"]) -> Optional[UCS_Version]
    """
    Check if a new patchlevel, minor or major release is available for the given version.
    Components must be available for the same major.minor version.

    :param UCS_Version version: A UCS release version.
    :param components: A list of components, which must be available for the next release.
    :param str errorsto: Select method of reporting errors; one of 'stderr', 'exception', 'none'.
    :returns: The next UCS release or None.
    :rtype: UCS_Version or None
    :raises RequiredComponentError: if a required component is missing and `errorsto` is 'exception'.
    """
    try:
        ver = min(ver for ver, _data in self.get_releases() if ver > version)
    except ValueError:
        # min() of an empty sequence: no newer release exists.
        return None

    self.log.info('Found version %s', ver)
    failed = set()  # type: Set[Tuple[Component, str]]
    for component in components:
        self.log.info('Checking for component %s', component.name)
        # any() only drives the generator; failures of components not marked
        # as "current" are collected in a throw-away set and thus ignored.
        any(component.repositories(ver, ver, failed=failed if component.current else set()))

    if failed:
        error = RequiredComponentError(str(ver), {comp.name for comp, _reason in failed})
        if errorsto == 'exception':
            raise error
        elif errorsto == 'stderr':
            print(error, file=sys.stderr)
        return None

    self.log.info('Going for version %s', ver)
    return ver
def get_all_available_release_updates(self, ucs_version=None):
    # type: (Optional[UCS_Version]) -> Tuple[List[UCS_Version], Optional[Set[str]]]
    """
    Collect all release updates following the given version.

    Required components are taken into account: the search stops at the
    first release for which a required component is missing.

    :param ucs_version: the version to start from; defaults to the currently installed version.
    :type ucs_version: UCS_Version or None
    :returns: a 2-tuple `(versions, blocking_component)`, where `versions` is a list of UCS release and `blocking_component` is the first missing component blocking the update.
    :rtype: tuple(list[str], str or None)
    """
    cursor = ucs_version or self.current_version
    required = self.get_components(only_current=True)

    found = []  # type: List[UCS_Version]
    while cursor:
        try:
            cursor = self.get_next_version(cursor, required, errorsto='exception')
        except RequiredComponentError as blocked:
            self.log.warning('Update blocked by components %s', ', '.join(blocked.components))
            # The components block the update to the next version: return
            # what has been collected so far together with the blockers.
            return found, blocked.components
        if cursor:
            found.append(cursor)

    self.log.info('Found release updates %r', found)
    return found, None
def release_update_available(self, ucs_version=None, errorsto='stderr'):
    # type: (Optional[UCS_Version], Literal["stderr", "exception", "none"]) -> Optional[UCS_Version]
    """
    Check if an update is available for the `ucs_version`.

    :param str ucs_version: The UCS release to check; defaults to the currently installed version.
    :param str errorsto: Select method of reporting errors; one of 'stderr', 'exception', 'none'.
    :returns: The next UCS release or None.
    :rtype: str or None
    """
    if not ucs_version:
        ucs_version = self.current_version
    required = self.get_components(only_current=True)
    base = UCS_Version(ucs_version)
    return self.get_next_version(base, required, errorsto)
def release_update_temporary_sources_list(self, version):
    # type: (UCS_Version) -> List[str]
    """
    Return list of Debian repository statements for the release update including all enabled components.

    :param version: The UCS release.
    :returns: A list of Debian APT `sources.list` lines.
    :rtype: list[str]
    """
    lines = [UCSRepoPool5(version).deb(self.server)]
    for component in self.get_components():
        try:
            # Materialize first so a failing component contributes nothing.
            entries = list(component.repositories(version, version, failed=set()))
        except (ConfigurationError, ProxyError):
            # Only components marked as "current" are fatal.
            if component.current:
                raise
        else:
            lines.extend(entries)
    return lines
def component(self, name):
    # type: (str) -> Component
    """
    Create the :py:class:`Component` with the given name for this updater.

    :param str name: The name of the component.
    :returns: A new component instance bound to this updater.
    """
    comp = Component(self, name)
    return comp
def get_components(self, only_localmirror_enabled=False, all=False, only_current=False):
    # type: (bool, bool, bool) -> Set[Component]
    """
    Retrieve all (enabled) components from registry as set().
    By default, only "enabled" components will be returned (repository/online/component/%s=$TRUE).

    :param bool only_localmirror_enabled:
        Only the components enabled for local mirroring.
        If only_`localmirror`_enabled is `True`, then all components with `repository/online/component/%s/localmirror=$TRUE` will be returned.
        If `repository/online/component/%s/localmirror` is not set, then the value of `repository/online/component/%s` is used for backward compatibility.
    :param bool all: Also return not enabled components.
    :param bool only_current: Only return components marked as "current".
    :returns: The set of (enabled) components.
    """
    found = set()
    for key in self.configRegistry:
        matched = RE_COMPONENT.match(key)
        if matched is None:
            continue
        (name,) = matched.groups()
        comp = self.component(name)
        enabled = bool(comp)
        if only_localmirror_enabled:
            # Fall back to the general "enabled" state when unset.
            enabled = self.configRegistry.is_true(comp.ucrv("localmirror"), enabled)
        skip = only_current and not comp.current
        if not skip and (all or enabled):
            found.add(comp)
    return found
def component_update_get_packages(self):
    # type: () -> Tuple[List[Tuple[Text, Text]], List[Tuple[Text, Text, Text]], List[Tuple[Text, Text]]]
    """
    Return tuple with list of (new, upgradeable, removed) packages.

    The APT sources file of the components is re-committed, the package
    index is updated and a simulated dist-upgrade is parsed.

    :return: A 3-tuple (new, upgraded, removed).
    :rtype: tuple(list[str], list[str], list[str])
    :raises UnmetDependencyError: if the simulated dist-upgrade exits with status 100.
    """
    def run(cmd, **popen_args):
        """Run `cmd`, log its decoded output and return (process, stdout, stderr)."""
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **popen_args)
        out, err = (data.decode("UTF-8", errors="replace") for data in process.communicate())
        if err:
            ud.debug(ud.NETWORK, ud.PROCESS, 'stderr=%s' % err)
        if out:
            ud.debug(ud.NETWORK, ud.INFO, 'stdout=%s' % out)
        return process, out, err

    apt_env = dict(os.environ, LC_ALL="C.UTF-8")

    # FIXME: error handling
    run(("univention-config-registry", "commit", Component.FN_APTSOURCES))
    # FIXME: error handling
    run(cmd_update, shell=True, env=apt_env)

    proc, stdout, stderr = run(cmd_dist_upgrade_sim, shell=True, env=apt_env)
    if proc.returncode == 100:
        raise UnmetDependencyError(stderr)

    new_packages = []  # type: List[Tuple[Text, Text]]
    upgraded_packages = []  # type: List[Tuple[Text, Text, Text]]
    removed_packages = []  # type: List[Tuple[Text, Text]]
    for line in stdout.splitlines():
        fields = line.split(' ')
        if line.startswith('Inst '):
            # upgrade:
            #    Inst univention-updater [3.1.1-5] (3.1.1-6.408.200810311159 192.168.0.10)
            # inst:
            #    Inst mc (1:4.6.1-6.12.200710211124 oxae-update.open-xchange.com)
            if len(fields) <= 3:
                ud.debug(ud.NETWORK, ud.WARN, 'unable to parse the update line: %s' % line)
                continue
            if fields[2].startswith('[') and fields[2].endswith(']'):
                ud.debug(ud.NETWORK, ud.PROCESS, 'Added %s to the list of upgraded packages' % fields[1])
                old_version = fields[2].replace('[', '').replace(']', '')
                new_version = fields[3].replace('(', '')
                upgraded_packages.append((fields[1], old_version, new_version))
            else:
                ud.debug(ud.NETWORK, ud.PROCESS, 'Added %s to the list of new packages' % fields[1])
                new_packages.append((fields[1], fields[2].replace('(', '')))
        elif line.startswith('Remv '):
            if len(fields) > 3:
                ud.debug(ud.NETWORK, ud.PROCESS, 'Added %s to the list of removed packages' % fields[1])
                removed_packages.append((fields[1], fields[2].replace('(', '')))
            elif len(fields) > 2:
                ud.debug(ud.NETWORK, ud.PROCESS, 'Added %s to the list of removed packages' % fields[1])
                removed_packages.append((fields[1], 'unknown'))
            else:
                ud.debug(ud.NETWORK, ud.WARN, 'unable to parse the update line: %s' % line)
                continue

    return (new_packages, upgraded_packages, removed_packages)
def run_dist_upgrade(self):
    # type: () -> int
    """
    Run `apt-get dist-upgrade` command.

    The output of the command is appended to :file:`/var/log/univention/updater.log`.

    :returns: the exit code of the command.
    :rtype: int
    """
    environment = dict(os.environ)
    environment["DEBIAN_FRONTEND"] = "noninteractive"
    with open("/var/log/univention/updater.log", "a") as logfile:
        exit_code = subprocess.call(cmd_dist_upgrade, shell=True, env=environment, stdout=logfile, stderr=logfile)
    return exit_code
def print_component_repositories(self, clean=False, start=None, end=None, for_mirror_list=False):
    # type: (bool, Optional[UCS_Version], Optional[UCS_Version], bool) -> str
    """
    Return a string of Debian repository statements for all enabled components.

    Components which failed are reported as trailing comment lines.

    :param bool clean: Add additional `clean` statements for `apt-mirror` if enabled by UCRV `online/repository/clean`.
    :param UCS_Version start: optional smallest UCS release to return.
    :param UCS_Version end: optional largest UCS release to return.
    :param bool for_mirror_list: component entries for `mirror.list` will be returned, otherwise component entries for local `sources.list`.
    :returns: A string with APT statement lines.
    :rtype: str
    """
    if not self.online_repository:
        return ''

    # A requested clean-up additionally has to be enabled through UCR.
    want_clean = clean and self.configRegistry.is_true('online/repository/clean', False)

    lines = []  # type: List[str]
    problems = set()  # type: Set[Tuple[Component, str]]
    enabled = self.get_components(only_localmirror_enabled=for_mirror_list)
    for comp in sorted(enabled):
        lines.extend(comp.repositories(start, end, clean=want_clean, for_mirror_list=for_mirror_list, failed=problems))

    for comp, reason in problems:
        lines.append("# Component %s: %s" % (comp.name, reason))
    return '\n'.join(lines)
def _get_user_agent_string(self):
    # type: () -> str
    """
    Build the HTTP user agent string from UCR.

    It encodes the product identifier, the UCS version including errata
    level, the system and license UUIDs, the installed apps and the
    statistics setting.

    :returns: A HTTP user agent string.
    :rtype: str
    """
    # USER_AGENT='updater/identify - version/version-version/patchlevel errata version/erratalevel - uuid/system - uuid/license'
    # USER_AGENT='UCS updater - 3.1-0 errata28 - 77e6406d-7a3e-40b3-a398-81cf119c9ef7 - 4c52d2da-d04d-4b05-a593-1974ee851fc8'
    # USER_AGENT='UCS updater - 3.1-0 errata28 - 77e6406d-7a3e-40b3-a398-81cf119c9ef7 - 00000000-0000-0000-0000-000000000000'
    ucr = self.configRegistry
    identify = ucr.get('updater/identify', 'UCS')
    version = ucr.get('version/version')
    patchlevel = ucr.get('version/patchlevel')
    erratalevel = ucr.get('version/erratalevel')
    system_uuid = ucr.get('uuid/system', UUID_NULL)
    license_uuid = ucr.get('uuid/license', UUID_NULL)
    # The installed apps are stored dash-separated but sent comma-separated.
    apps = ucr.get('repository/app_center/installed', '').replace('-', ',')
    statistics = ucr.get('updater/statistics', '')
    fields = (identify, version, patchlevel, erratalevel, system_uuid, license_uuid, apps, statistics)
    return '%s - %s-%s errata%s - %s - %s - %s - %s' % fields
@staticmethod
def call_sh_files(scripts, logname, *args):
    # type: (Iterable[Tuple[_UCSServer, _UCSRepo, Optional[str], str, bytes]], str, *str) -> Iterator[Tuple[str, str]]
    """
    Get pre- and postup.sh files and call them in the right order::

        u = UniventionUpdater()
        ver = u.get_next_version(u.current_version)
        scripts = u.get_sh_files(ver, ver)
        for phase, order in u.call_sh_files(scripts, '/var/log/univention/updater.log', ver):
            if (phase, order) == ('update', 'main'):
                pass

    This is a generator: each step is announced by yielding its
    `(phase, part)` tuple before the scripts of that step are run, so the
    caller can do its own work (e.g. the actual update at
    `('update', 'main')`) in between.

    :param scripts: A generator returning the script to call, e.g. :py:meth:`get_sh_files`
    :param str logname: The file name of the log file.
    :param args: Additional arguments to pass through to the scripts.
    :returns: A generator returning 2-tuples (phase, part)
    :raises PreconditionError: if one of the scripts exits with a non-zero status.
    """
    def call(*cmd):
        # type: (*str) -> int
        """
        Execute script.

        The combined output of the script is piped through `tee -a` into the log file.

        :param cmd: The command to execute in a sub-process.
        :type cmd: list(str)
        :returns: The exit code of the child process.
        :rtype: int
        """
        # Shell-quoted form of the command, used for the debug message only.
        commandline = ' '.join(["'%s'" % a.replace("'", "'\\''") for a in cmd])
        ud.debug(ud.NETWORK, ud.INFO, "Calling %s" % commandline)
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        tee = subprocess.Popen(('tee', '-a', logname), stdin=p.stdout)
        # Order is important! See bug #16454
        tee.wait()
        p.wait()
        return p.returncode

    # download scripts
    yield "update", "pre"
    # Saved scripts as (file name, patch) tuples per phase, separated into
    # release scripts (main) and component scripts (comp).
    main = {'preup': [], 'postup': []}  # type: Dict[str, List[Tuple[str, str]]]
    comp = {'preup': [], 'postup': []}  # type: Dict[str, List[Tuple[str, str]]]
    # save scripts to temporary files
    with TemporaryDirectory() as tempdir:
        for server, struct, phase, path, data in scripts:
            # Entries without a phase are not saved for execution.
            if phase is None:
                continue
            assert data is not None
            uri = server.join(path)
            name = os.path.join(tempdir, uri.replace("/", "_"))
            try:
                with open(name, "wb") as fd:
                    fd.write(data)
                    os.fchmod(fd.fileno(), 0o744)
                ud.debug(ud.NETWORK, ud.INFO, "%s saved to %s" % (uri, name))
                is_component = hasattr(struct, 'part') and struct.part.endswith('/component')
                memo = comp if is_component else main
                memo[phase].append((name, str(struct.patch)))
            except EnvironmentError as ex:
                # A script which cannot be saved is logged and skipped.
                ud.debug(ud.NETWORK, ud.ERROR, "Error saving %s to %s: %s" % (uri, name, ex))

        # call component/preup.sh pre $args
        yield "preup", "pre"
        for (script, patch) in comp['preup']:
            if call(script, 'pre', *args) != 0:
                raise PreconditionError('preup', 'pre', patch, script)

        # call $next_version/preup.sh
        yield "preup", "main"
        for (script, patch) in main['preup']:
            if call(script, *args) != 0:
                raise PreconditionError('preup', 'main', patch, script)

        # call component/preup.sh post $args
        yield "preup", "post"
        for (script, patch) in comp['preup']:
            if call(script, 'post', *args) != 0:
                raise PreconditionError('preup', 'post', patch, script)

        # call $update/commands/distupgrade or $update/commands/upgrade
        yield "update", "main"

        # call component/postup.sh pre $args
        yield "postup", "pre"
        for (script, patch) in comp['postup']:
            if call(script, 'pre', *args) != 0:
                raise PreconditionError('postup', 'pre', patch, script)

        # call $next_version/postup.sh
        yield "postup", "main"
        for (script, patch) in main['postup']:
            if call(script, *args) != 0:
                raise PreconditionError('postup', 'main', patch, script)

        # call component/postup.sh post $args
        yield "postup", "post"
        for (script, patch) in comp['postup']:
            if call(script, 'post', *args) != 0:
                raise PreconditionError('postup', 'post', patch, script)

    # clean up
    yield "update", "post"
def get_sh_files(self, start, end, mirror=False):
    # type: (UCS_Version, UCS_Version, bool) -> Iterator[Tuple[_UCSServer, _UCSRepo, Optional[str], str, bytes]]
    """
    Return all preup- and postup-scripts of repositories.

    For verified scripts the signature is yielded first with `phase` set to
    `None`, followed by the script itself.

    :param UCS_Version start: The UCS release to start from.
    :param UCS_Version end: The UCS release where to stop.
    :param bool mirror: Use the settings for mirroring.
    :returns: iterable (server, struct, phase, path, script)
    :raises VerificationError: if the PGP signature is invalid.
    :raises ProxyError: if a download through a proxy does not look like the expected file.

    See :py:meth:`call_sh_files` for an example.
    """
    def all_repos():
        # type: () -> Iterator[Tuple[_UCSServer, _UCSRepo, bool]]
        """Yield (server, struct, critical) for all releases first and all components afterwards."""
        self.log.info('Searching releases [%s..%s]', start, end)
        for ver, _data in self.get_releases(start, end):
            # Releases are always critical.
            yield self.server, UCSRepoPool5(release=ver, prefix=self.server), True

        self.log.info('Searching components [%s..%s]', start, end)
        components = self.get_components(only_localmirror_enabled=mirror)
        for comp in components:
            for server, struct in comp.versions(start, end, mirror):
                struct.arch = "all"
                self.log.info('Component %s from %s versions %r', comp.name, server, struct)
                # Components are only critical if marked as "current".
                yield server, struct, comp.current

    for server, struct, critical in all_repos():
        uses_proxy = hasattr(server, "proxy_handler") and server.proxy_handler.proxies  # type: ignore
        for phase in ('preup', 'postup'):
            name = '%s.sh' % phase
            path = struct.path(name)
            ud.debug(ud.NETWORK, ud.ALL, "Accessing %s" % path)
            try:
                _code, _size, script = server.access(struct, name, get=True)
                # Bug #37031: dansguarding is lying and returns 200 even for blocked content
                if not script.startswith(b'#!') and uses_proxy:
                    uri = server.join(path)
                    raise ProxyError(uri, "download blocked by proxy?")
                if self.script_verify and struct >= UCS_Version((3, 2, 0)):
                    name_gpg = name + '.gpg'
                    path_gpg = struct.path(name_gpg)
                    try:
                        _code, _size, signature = server.access(struct, name_gpg, get=True)
                        if not signature.startswith(b"-----BEGIN PGP SIGNATURE-----") and uses_proxy:
                            uri = server.join(path_gpg)
                            raise ProxyError(uri, "download blocked by proxy?")
                    except DownloadError:
                        # A script without a downloadable signature is not accepted.
                        raise VerificationError(path_gpg, "Signature download failed")
                    error = verify_script(script, signature)
                    if error is not None:
                        raise VerificationError(path, "Invalid signature: %r" % error)
                    yield server, struct, None, path_gpg, signature
                yield server, struct, phase, path, script
            except DownloadError as e:
                # A script which cannot be downloaded is only logged.
                ud.debug(ud.NETWORK, ud.ALL, "%s" % e)
            except ConfigurationError:
                if critical:
                    raise
class LocalUpdater(UniventionUpdater):
    """
    Direct file access to local repository.
    """

    def __init__(self):
        # type: () -> None
        """
        Create an updater using the local mirror below `repository/mirror/basepath` as server.
        """
        UniventionUpdater.__init__(self)
        logger = logging.getLogger('updater.LocalUpdater')
        logger.addHandler(logging.NullHandler())
        self.log = logger
        base_path = self.configRegistry.get('repository/mirror/basepath', '/var/lib/univention-repository')
        # Replaces the server set up by the base class.
        self.server = UCSLocalServer("%s/mirror/" % base_path)  # type: _UCSServer