Source code for univention.appcenter.actions.install_base

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention App Center
#  univention-app base module for installing, uninstalling, upgrading an app
#
# Copyright 2015-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
#

import os.path
import shutil
from glob import glob
from getpass import getuser
import subprocess
from argparse import SUPPRESS, Action
from tempfile import NamedTemporaryFile
import traceback

from six import string_types
from six.moves import input

from univention.appcenter.app import App, DATA_DIR
from univention.appcenter.actions import StoreAppAction, get_action
from univention.appcenter.exceptions import Abort, NetworkError, AppCenterError, ParallelOperationInProgress
from univention.appcenter.actions.register import Register
from univention.appcenter.utils import get_locale, resolve_dependencies, call_process2
from univention.appcenter.ucr import ucr_get
from univention.appcenter.settings import SettingValueError
from univention.appcenter.packages import package_lock, LockError
from univention.appcenter.install_checks import get_requirement, check


[docs]class StoreConfigAction(Action): def __call__(self, parser, namespace, value, option_string=None): set_vars = {} for val in value: try: key, val = val.split('=', 1) except ValueError: parser.error('Could not parse %s. Use var=val. Skipping...' % val) else: set_vars[key] = val setattr(namespace, self.dest, set_vars)
[docs]class InstallRemoveUpgrade(Register): prescript_ext = None pre_readme = None post_readme = None
[docs] def setup_parser(self, parser): super(Register, self).setup_parser(parser) parser.add_argument('--set', nargs='+', action=StoreConfigAction, metavar='KEY=VALUE', dest='set_vars', help='Sets the configuration variable. Example: --set some/variable=value some/other/variable="value 2"') parser.add_argument('--skip-checks', nargs='*', choices=[req.name for req in App._requirements if self.get_action_name() in req.actions], help=SUPPRESS) parser.add_argument('--do-not-configure', action='store_false', dest='configure', help=SUPPRESS) parser.add_argument('--do-not-update-certificates', action='store_false', dest='update_certificates', help=SUPPRESS) parser.add_argument('--do-not-call-join-scripts', action='store_false', dest='call_join_scripts', help=SUPPRESS) parser.add_argument('--do-not-send-info', action='store_false', dest='send_info', help=SUPPRESS) parser.add_argument('--autoinstalled', nargs='+', dest='autoinstalled', help=SUPPRESS) parser.add_argument('--dry-run', action='store_true', dest='dry_run', help='Perform only a dry-run. App state is not touched') parser.add_argument('app', nargs='+', action=StoreAppAction, help='The ID of the App')
main = None # no action by itself def _write_start_event(self, app, args): pass def _write_success_event(self, app, context_id, args): pass def _write_fail_event(self, app, context_id, status, args): pass def _call_action_hooks(self, directory): """ abstract method is empty, because there is no default hook for any action. The implementation has to be done in each derived class if needed. """ pass def _run_parts(self, directory): """ in order to call hooks we use run-parts, so that administrators can better comprehend what is done behind the scenes and test their script folders manually using that tool. """ if os.path.isdir(directory): (retval, output) = call_process2(["run-parts", directory]) # self.log(output) is unnecessary, because call_process2 logs its # output, but if you are replacing call_process2 with something # different, please remember to to inform the user about the output # of the scripts! else: self.log('Potential script hook folder is unused: {folder}'.format(folder=directory))
[docs] def do_it(self, args): i = -1 apps = [] try: try: action = self.get_action_name() apps = resolve_dependencies(args.app, action) for app in apps: self.log('Going to %s %s (%s)' % (action, app.name, app.version)) new_apps = [app for app in apps if app.id not in [_a.id for _a in args.app]] new_apps_have_settings = False for app in new_apps: if app.get_settings(): new_apps_have_settings = True self.fatal('Automatically added App %s has its own settings. You should explicitely mention this App. This way, you may (or may not) set settings for this App via --set.' % app) if new_apps_have_settings: self.fatal('Unable to %s. Aborting...' % action) return False if not args.autoinstalled: # save the installed status for those apps that were not explicitely given # but where added by resolving the dependencies args.autoinstalled = [app.id for app in new_apps] errors, warnings = check(apps, action) can_continue = self._handle_errors(args, errors, True) can_continue = self._handle_errors(args, warnings, fatal=not can_continue) and can_continue if can_continue and self.needs_credentials(app) and not self.check_user_credentials(args): can_continue = False if not can_continue: self.fatal('Unable to %s. Aborting...' % action) return False for app in apps: try: self._show_license(app, args) self._show_pre_readme(app, args) except Abort: self.warn('Cancelled...') return except Exception: trace = traceback.format_exc() if apps: for app in apps: self._send_information_on_app(app, 502, trace, args) else: self._send_information_on_app(None, 502, trace, args) raise for i, app in enumerate(apps): args.app = app success = self.do_it_once(app, args) if not success: break else: return True return False finally: not_touched = apps[i + 1:] if not_touched: self.warn('Failure will leave these apps untouched: %s' % ', '.join(app.id for app in not_touched)) if i >= 0: for app in apps[:i]: try: self._show_post_readme(app, args) except Abort: pass upgrade_search = get_action('upgrade-search') upgrade_search.call_safe(app=apps, update=False)
def _send_information_on_app(self, app, status, status_details, args): if args.send_info: try: # do not send more than 500 char of status_details if isinstance(status_details, string_types): status_details = status_details[-5000:] self._send_information(app, status, status_details) except NetworkError: self.log('Ignoring this error...')
[docs] def do_it_once(self, app, args): status = 200 status_details = None context_id = self._write_start_event(app, args) try: self._configure(app, args, run_script='no', scope='outside') if not self._call_prescript(app, args): self.fatal('Running prescript of %s failed. Aborting...' % app) status = 0 else: try: try: with package_lock(): if args.dry_run: self.dry_run(app, args) else: self._do_it(app, args) except LockError: raise ParallelOperationInProgress() except (Abort, KeyboardInterrupt) as exc: msg = str(exc) if msg: self.fatal(msg) self.warn('Aborting...') if exc.__class__ is KeyboardInterrupt: status = 401 else: status = exc.code status_details = exc.get_exc_details() except Exception as exc: status = 500 status_details = repr(exc) raise except AppCenterError as exc: status = exc.code raise else: return status == 200 finally: if args.dry_run: pass elif status == 0: pass else: if status == 200: self._write_success_event(app, context_id, args) self._call_action_hooks( "{data_dir}/{app_id}/local/hooks/" "{when}-{action}.d".format( data_dir=DATA_DIR, app_id=app.id, action=self.get_action_name(), when="post" ) ) else: self._write_fail_event(app, context_id, status, args) if status != 200: self._revert(app, args) self._send_information_on_app(app, status, status_details, args) self._register_installed_apps_in_ucr()
[docs] def needs_credentials(self, app): if os.path.exists(app.get_cache_file(self.prescript_ext)): return True if os.path.exists(app.get_cache_file('schema')): return True if os.path.exists(app.get_cache_file('attributes')) or app.generic_user_activation: return True if app.docker and app.docker_script_setup: return True return False
def _handle_errors(self, args, errors, fatal): can_continue = True for error in errors: details = errors[error] try: requirement = get_requirement(error) try: message = requirement.__doc__ % details except TypeError: message = requirement.__doc__ except KeyError: message = '' message = '(%s) %s' % (error, message or '') if args.skip_checks is not None and (error in args.skip_checks or args.skip_checks == []): self.log(message) else: if fatal: self.fatal(message) else: self.warn(message) can_continue = False if not can_continue: if fatal: return False else: if args.noninteractive: return True try: aggreed = input('Do you want to %s anyway [y/N]? ' % self.get_action_name()) except (KeyboardInterrupt, EOFError): return False else: return aggreed.lower()[:1] in ['y', 'j'] return True def _call_prescript(self, app, args, **kwargs): ext = self.prescript_ext self.debug('Calling prescript (%s)' % ext) if not ext: return True script = app.get_cache_file(ext) # check here to not bother asking for a password # otherwise self._call_script could handle it, too if not os.path.exists(script): self.debug('%s does not exist' % script) return True with NamedTemporaryFile('r') as error_file: with self._get_password_file(args) as pwdfile: if not pwdfile: self.warn('Could not get password') return False kwargs['version'] = app.version kwargs['error_file'] = error_file.name kwargs['binddn'] = self._get_userdn(args) kwargs['bindpwdfile'] = pwdfile locale = get_locale() if locale: kwargs['locale'] = locale success = self._call_cache_script(app, ext, **kwargs) if not success: for line in error_file: self.fatal(line) return success def _revert(self, app, args): pass def _do_it(self, app, args): raise NotImplementedError() def _show_license(self, app, args): self.log('Showing License agreement for %s' % app) if self._show_file(app, 'license_agreement', args, agree=True) is False: raise Abort() def _show_pre_readme(self, app, args): self.log('Showing README for %s' % app) if self._show_file(app, self.pre_readme, args, confirm=True) is False: raise Abort() def _show_post_readme(self, app, args): self.log('Showing README for %s' % app) if self._show_file(app, self.post_readme, args, confirm=True) is False: raise Abort() def _show_file(self, app, attr, args, confirm=False, agree=False): filename = getattr(app.__class__, attr).get_filename(app.get_ini_file()) if not filename or not os.path.exists(filename): return None elinks_exists = subprocess.call(['which', 'elinks'], stdout=subprocess.PIPE) == 0 if not elinks_exists: return None self._subprocess(['elinks', '-dump', filename], 'readme') if not args.noninteractive: try: if agree: aggreed = input('Do you agree [y/N]? ') return aggreed.lower()[:1] in ['y', 'j'] elif confirm: input('Press [ENTER] to continue') return True except (KeyboardInterrupt, EOFError): return False return True def _call_unjoin_script(self, app, args): return self._call_join_script(app, args, unjoin=True) def _call_join_script(self, app, args, unjoin=False): if not args.call_join_scripts: return other_script = self._get_joinscript_path(app, unjoin=not unjoin) any_number_basename = os.path.basename(other_script) any_number_basename = '[0-9][0-9]%s' % any_number_basename[2:] any_number_scripts = os.path.join(os.path.dirname(other_script), any_number_basename) for other_script in glob(any_number_scripts): self.log('Uninstalling %s' % other_script) os.unlink(other_script) if unjoin: ext = 'uinst' else: ext = 'inst' joinscript = app.get_cache_file(ext) ret = dest = None if os.path.exists(joinscript): self.log('Installing join script %s' % joinscript) dest = self._get_joinscript_path(app, unjoin=unjoin) shutil.copy2(joinscript, dest) # change to UCS umask + +x: -rwxr-xr-x os.chmod(dest, 0o755) if ucr_get('server/role') == 'domaincontroller_master' and getuser() == 'root': ret = self._call_script('/usr/sbin/univention-run-join-scripts') else: with self._get_password_file(args) as password_file: if password_file: username = self._get_username(args) ret = self._call_script('/usr/sbin/univention-run-join-scripts', '-dcaccount', username, '-dcpwd', password_file) return ret def _get_configure_settings(self, app, filter_action=True): set_vars = {} phase = self.get_action_name().title() for setting in app.get_settings(): if setting.name in set_vars: continue if filter_action and phase not in setting.show: continue try: value = setting.get_value(app, phase=phase) except SettingValueError: value = setting.get_initial_value(app) set_vars[setting.name] = value return set_vars def _update_certificates(self, app, args): if not args.update_certificates: return uc = get_action('update-certificates') uc.call(apps=[app]) def _configure(self, app, args, run_script=None, scope=None): if not args.configure: return if run_script is None: run_script = self.get_action_name() configure = get_action('configure') set_vars = self._get_configure_settings(app) if args.set_vars: for setting in app.get_settings(): # we only take those settings from the args # that are used for our App # => installing two apps at once will sort the # settings correctly if setting.name in args.set_vars: set_vars[setting.name] = args.set_vars[setting.name] configure.call(app=app, run_script=run_script, set_vars=set_vars, scope=scope) def _reload_apache(self): self._call_script('/etc/init.d/apache2', 'reload')
[docs] def dry_run(self, app, args): ret = self._dry_run(app, args) if ret['install']: self.log('The following packages would be INSTALLED/UPGRADED:') for pkg in ret['install']: self.log(' * %s' % pkg) if ret['remove']: self.log('The following packages would be REMOVED:') for pkg in ret['remove']: self.log(' * %s' % pkg) if ret['broken']: self.log('The following packages are BROKEN:') for pkg in ret['broken']: self.log(' * %s' % pkg) return ret
def _dry_run(self, app, args): raise NotImplementedError()