#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention Management Console
#  authentication mechanisms
#
# Copyright 2014-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
from __future__ import absolute_import
import traceback
import ldap
from ldap.filter import filter_format
import notifier
import notifier.signals as signals
import notifier.threads as threads
import univention.admin.uexceptions as udm_errors
from univention.lib.i18n import Locale
from univention.management.console.log import AUTH
from univention.management.console.config import ucr
from univention.management.console.ldap import get_machine_connection, reset_cache
from univention.management.console.pam import PamAuth, AuthenticationError, AuthenticationFailed, AuthenticationInformationMissing, PasswordExpired, AccountExpired, PasswordChangeFailed
try:
	from typing import Any, Dict, Optional, Tuple, Union  # noqa: F401
	from univention.management.console.protocol.meesage import Request  # noqa: F401
except ImportError:
	pass
class AuthenticationResult(object):
	"""Outcome of a single authentication attempt.

	The constructor sets the following attributes:

	* ``authenticated``: ``True`` unless *result* is an exception instance.
	* ``credentials``: *result* itself on success, otherwise ``None``.
	* ``status``: ``SUCCESS`` on success, ``BAD_REQUEST_UNAUTH`` for an
	  :class:`AuthenticationError`, ``500`` for any other exception.
	* ``message``: ``str(result)`` for exceptions (extended by the password
	  complexity hint where applicable), otherwise ``None``.
	* ``result``: detail flags for an :class:`AuthenticationError`,
	  ``{'username': ...}`` on success, ``None`` for any other exception.

	Instances are truthy exactly when ``authenticated`` is true.
	"""

	def __init__(self, result, locale):  # type: (Union[BaseException, Dict[str, str]], Optional[str]) -> None
		"""Classify *result* and derive status, message and response data.

		:param result: the credentials mapping of a successful login (its
			``'username'`` entry is read) or the exception the attempt raised.
		:param locale: handed unchanged to :class:`Locale` in order to select
			the localized password complexity message.
		"""
		# NOTE(review): function-level import, presumably to avoid a circular import — confirm
		from univention.management.console.protocol.definitions import SUCCESS, BAD_REQUEST_UNAUTH
		self.credentials = None
		self.status = SUCCESS
		self.authenticated = not isinstance(result, BaseException)
		if self.authenticated:
			self.credentials = result
		self.message = None
		self.result = None  # type: Optional[Dict[str, Any]]
		# NOTE(review): nothing in this class ever sets this to True, not even
		# for PasswordExpired — confirm whether any caller still reads it
		self.password_expired = False
		if isinstance(result, AuthenticationError):
			self.status = BAD_REQUEST_UNAUTH
			self.message = str(result)
			self.result = {}
			# at most one detail flag is set; the first matching type wins
			if isinstance(result, PasswordExpired):
				self.result['password_expired'] = True
			elif isinstance(result, AccountExpired):
				self.result['account_expired'] = True
			elif isinstance(result, AuthenticationInformationMissing):
				self.result['missing_prompts'] = result.missing_prompts
			elif isinstance(result, PasswordChangeFailed):
				self.result['password_change_failed'] = True
			if isinstance(result, (PasswordExpired, PasswordChangeFailed)):
				# append the password complexity hint configured in UCR for the
				# requested language, falling back to the English one
				_locale = Locale(locale)
				fallback = ucr.get('umc/login/password-complexity-message/en', '')
				hint = ucr.get('umc/login/password-complexity-message/%s' % (_locale.language,), fallback)
				# rstrip() drops the separating blank again if the hint is empty
				self.message += (' %s' % (hint,)).rstrip()
		elif isinstance(result, BaseException):
			# any exception that is not an AuthenticationError
			self.status = 500
			self.message = str(result)
		else:
			self.result = {'username': result['username']}

	def __bool__(self):  # type: () -> bool
		"""Return whether the attempt was successful."""
		return self.authenticated
	__nonzero__ = __bool__  # Python 2
class AuthHandler(signals.Provider):
	"""Authenticates requests via PAM and reports the outcome through the ``authenticated`` signal.

	The signal is emitted with an :class:`AuthenticationResult` and the
	originating request as arguments.
	"""

	def __init__(self):  # type: () -> None
		"""Initialize the signal provider and register the ``authenticated`` signal."""
		signals.Provider.__init__(self)
		self.signal_new('authenticated')

	def authenticate(self, msg):  # type: (Request) -> None
		"""Authenticate the user described by the body of *msg*.

		The body is copied, so the request itself is not modified.
		``username`` and ``password`` default to ``''``, ``new_password`` to
		``None``. All remaining entries are forwarded as keyword arguments to
		the worker (``auth_type`` and custom PAM prompts).

		:param msg: request whose ``body`` mapping carries the credentials.
		"""
		# PAM MUST be initialized outside of a thread. Otherwise it segfaults e.g. with pam_saml.so.
		# See http://pam-python.sourceforge.net/doc/html/#bugs
		args = msg.body.copy()
		locale = args.pop('locale', None)
		# the PAM object is passed positionally below; a 'pam' entry in the
		# request body must not collide with that parameter
		args.pop('pam', None)
		args.setdefault('new_password', None)
		args.setdefault('username', '')
		args.setdefault('password', '')
		pam = PamAuth(locale)
		thread = threads.Simple('pam', notifier.Callback(self.__authenticate_thread, pam, **args), notifier.Callback(self.__authentication_result, pam, msg, locale))
		thread.run()

	def __authenticate_thread(self, pam, username, password, new_password, auth_type=None, **custom_prompts):  # type: (PamAuth, str, str, Optional[str], Optional[str], **Optional[str]) -> Tuple[str, str]
		"""Authenticate against PAM and change an expired password if requested.

		:param pam: the PAM object created by :meth:`authenticate`.
		:param username: user name as supplied; canonicalized before use.
		:param password: the current password.
		:param new_password: replacement for an expired password, or ``None``
			if no password change has been requested.
		:param auth_type: only used for logging here.
		:param custom_prompts: forwarded to ``pam.authenticate()``.
		:returns: the canonicalized user name and the password which is valid
			now (*new_password* after a successful change).
		:raises AuthenticationFailed: re-raised after logging.
		:raises PasswordExpired: if the password is expired and no
			*new_password* has been given.
		:raises PasswordChangeFailed: re-raised after logging.
		"""
		AUTH.info('Trying to authenticate user %r (auth_type: %r)' % (username, auth_type))
		username = self.__canonicalize_username(username)
		try:
			pam.authenticate(username, password, **custom_prompts)
		except AuthenticationFailed as auth_failed:
			AUTH.error(str(auth_failed))
			raise
		except PasswordExpired as pass_expired:
			AUTH.info(str(pass_expired))
			if new_password is None:
				# nothing to change the password to: report the expiry
				raise
			try:
				pam.change_password(username, password, new_password)
			except PasswordChangeFailed as change_failed:
				AUTH.error(str(change_failed))
				raise
			else:
				AUTH.info('Password change for %r was successful' % (username,))
				return (username, new_password)
		else:
			AUTH.info('Authentication for %r was successful' % (username,))
			return (username, password)

	def __canonicalize_username(self, username):  # type: (str) -> str
		"""Map *username* to the ``uid`` stored in LDAP, if it can be looked up.

		A name containing ``@`` is matched against ``mailPrimaryAddress``,
		any other name against ``uid``. This is best effort: if there is no
		connection, no match or an error occurs, *username* is returned
		unchanged.

		:param username: the user name as entered.
		:returns: the ``uid`` of the matching person object or *username*.
		"""
		try:
			lo, po = get_machine_connection(write=False)
			result = None
			if lo:
				attr = 'mailPrimaryAddress' if '@' in username else 'uid'
				result = lo.search(filter_format('(&(%s=%s)(objectClass=person))', (attr, username)), attr=['uid'], unique=True)
			if result and result[0][1].get('uid'):
				# attribute values are bytes
				username = result[0][1]['uid'][0].decode('utf-8')
				AUTH.info('Canonicalized username: %r' % (username,))
		except (ldap.LDAPError, udm_errors.ldapError) as exc:
			# /etc/machine.secret missing or LDAP server not reachable
			AUTH.warn('Canonicalization of username was not possible: %s' % (exc,))
			reset_cache()
		except Exception:
			# deliberately broad: a failed lookup must not abort the authentication
			AUTH.error('Canonicalization of username failed: %s' % (traceback.format_exc(),))
		return username

	def __authentication_result(self, thread, result, pam, request, locale):  # type: (threads.Simple, Union[BaseException, Tuple[str, str], Dict[str, str]], PamAuth, Request, Optional[str]) -> None
		"""Finish the PAM transaction and emit the ``authenticated`` signal.

		:param thread: the worker; its ``trace`` and ``exc_info`` are logged
			for unexpected exceptions.
		:param result: what the worker returned (``(username, password)``) or
			the exception it raised.
		:param pam: the PAM object used by the worker; ended here.
		:param request: the originating request; its ``auth_type`` is added to
			the credentials and it is passed along with the signal.
		:param locale: forwarded to :class:`AuthenticationResult`.
		"""
		pam.end()
		# the listed authentication errors are expected outcomes; everything
		# else is logged including the traceback of the worker
		if isinstance(result, BaseException) and not isinstance(result, (AuthenticationFailed, AuthenticationInformationMissing, PasswordExpired, PasswordChangeFailed, AccountExpired)):
			msg = ''.join(thread.trace + traceback.format_exception_only(*thread.exc_info[:2]))
			AUTH.error(msg)
		if isinstance(result, tuple):
			# successful login: turn the worker's tuple into the credentials mapping
			username, password = result
			result = {'username': username, 'password': password, 'auth_type': request.body.get('auth_type')}
		auth_result = AuthenticationResult(result, locale)
		self.signal_emit('authenticated', auth_result, request)