Source code for univention.admin.password

# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

"""
|UDM| password encryption methods.
"""

from __future__ import absolute_import

import re
import bcrypt
import hashlib
from typing import List, Optional, Tuple  # noqa: F401

import heimdal
import passlib.hash

import univention.debug as ud
from univention.admin._ucr import configRegistry

RE_PASSWORD_SCHEME = re.compile(r'^{(\w+)}(!?)(.*)', re.I)


[docs]def crypt(password, method_id=None, salt=None): # type: (str, Optional[str], Optional[str]) -> str """ Return crypt hash. :param password: password string. :param method_id: optional hash type, MD5, SHA256/SHA-256, SHA512/SHA-512. :param salt: salt for randomize the hashing. :returns: the hashed password string. """ hashing_method = configRegistry.get('password/hashing/method', 'sha-512').upper() if salt is None: salt = '' valid = [ '.', '/', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'] urandom = open("/dev/urandom", "rb") for i in range(0, 16): # up to 16 bytes of salt are evaluated by crypt(3), overhead is ignored o = ord(urandom.read(1)) while not o < 256 // len(valid) * len(valid): # make sure not to skew the distribution when using modulo o = ord(urandom.read(1)) salt = salt + valid[(o % len(valid))] urandom.close() if method_id is None: method_id = { 'MD5': '1', 'SHA256': '5', 'SHA-256': '5', 'SHA512': '6', 'SHA-512': '6', }.get(hashing_method, '6') from crypt import crypt as _crypt return _crypt(password, '$%s$%s$' % (method_id, salt, ))
[docs]def bcrypt_hash(password): # type: (str) -> str """ Return bcrypt hash. :param password: password string. :returns: the hashed password string. """ cost_factor = int(configRegistry.get('password/hashing/bcrypt/cost_factor', '12')) prefix = configRegistry.get('password/hashing/bcrypt/prefix', '2b').encode('utf8') salt = bcrypt.gensalt(rounds=cost_factor, prefix=prefix) return bcrypt.hashpw(password.encode('utf-8'), salt).decode('ASCII')
[docs]def ntlm(password): # type: (str) -> Tuple[str, str] """ Return tuple with NT and LanMan hash. :param password: password string. :returns: 2-tuple (NT, LanMan) """ nt = passlib.hash.nthash.hash(password).upper() if configRegistry.is_true('password/samba/lmhash', False): lm = passlib.hash.lmhash.hash(password).upper() else: lm = '' return (nt, lm)
[docs]def krb5_asn1(principal, password, krb5_context=None): # type: (str, str, Optional[heimdal.context]) -> List[bytes] """ Generate Kerberos password hashes. :param principal: Kerberos principal name. :param password: password string. :param krb5_context: optional Kerberos context. :returns: list of ASN1 encoded Kerberos hashes. """ list = [] if not krb5_context: krb5_context = heimdal.context() for krb5_etype in krb5_context.get_permitted_enctypes(): if str(krb5_etype) == 'des3-cbc-md5' and configRegistry.is_false('password/krb5/enctype/des3-cbc-md5', True): continue krb5_principal = heimdal.principal(krb5_context, principal) krb5_keyblock = heimdal.keyblock(krb5_context, krb5_etype, password, krb5_principal) krb5_salt = heimdal.salt(krb5_context, krb5_principal) list.append(heimdal.asn1_encode_key(krb5_keyblock, krb5_salt, 0)) return list
[docs]def is_locked(password): # type: (str) -> bool """ Check is the password (hash) is locked :param password: password hash. :returns: `True` when locked, `False` otherwise. >>> is_locked('foo') False >>> is_locked('{crypt}$1$foo') False >>> is_locked('{crypt}!$1$foo') True >>> is_locked('{KINIT}') False >>> is_locked('{LANMAN}!') True """ match = RE_PASSWORD_SCHEME.match(password or '') return match is not None and '!' == match.group(2)
[docs]def unlock_password(password): # type: (str) -> str """ Remove prefix from password used for locking. :param password: password hash. :returns: the unlocked password hash. >>> unlock_password('{crypt}!$1$foo') '{crypt}$1$foo' >>> unlock_password('{LANMAN}!') '{LANMAN}' >>> unlock_password('{SASL}!') '{SASL}' >>> unlock_password('{KINIT}!') '{KINIT}' >>> unlock_password('{BCRYPT}!') '{BCRYPT}' """ if is_locked(password): match = RE_PASSWORD_SCHEME.match(password).groups() password = '{%s}%s' % (match[0], match[2]) return password
[docs]def lock_password(password): # type: (str) -> str """ Add prefix to password used for locking. :param password: password hash. :returns: the locked password hash. >>> lock_password('{crypt}$1$foo') '{crypt}!$1$foo' >>> lock_password('{LANMAN}') '{LANMAN}!' >>> lock_password('{SASL}') '{SASL}!' >>> lock_password('{KINIT}') '{KINIT}!' >>> lock_password('{BCRYPT}') '{BCRYPT}!' >>> lock_password('foo').startswith('{crypt}!$') True """ # cleartext password? if not RE_PASSWORD_SCHEME.match(password): if configRegistry.is_true('password/hashing/bcrypt'): return "{BCRYPT}!%s" % (bcrypt_hash(password)) return "{crypt}!%s" % (crypt(password)) if not is_locked(password): match = RE_PASSWORD_SCHEME.match(password).groups() password = '{%s}!%s' % (match[0], match[2]) return password
[docs]def password_is_auth_saslpassthrough(password): # type: (str) -> bool """ Check if the password hash indicates the use of |SASL|. :param apssword: password hash. :returns: `True` is |SASL| shall be used, `False` otherwise. """ return password.startswith('{SASL}') and configRegistry.get('directory/manager/web/modules/users/user/auth/saslpassthrough', 'no').lower() == 'keep'
[docs]def get_password_history(password, pwhistory, pwhlen): # type: (str, str, int) -> str """ Append the given password as hash to the history of password hashes :param password: the new password. :param pwhistory: history of previous password hashes. :param pwhlen: length of the password history. :returns: modified password hash history. >>> get_password_history("a", "b", 0) 'b' >>> len(get_password_history("a", "", 1).split(' ')) 1 >>> len(get_password_history("a", "b", 1).split(' ')) 1 >>> len(get_password_history("a", "b", 2).split(' ')) 2 """ # create hash if configRegistry.is_true('password/hashing/bcrypt'): newpwhash = "{BCRYPT}%s" % (bcrypt_hash(password)) else: newpwhash = crypt(password) # this preserves a temporary disabled history if pwhlen > 0: # split the history pwlist = pwhistory.strip().split(' ') # append new hash pwlist.append(newpwhash) # strip old hashes pwlist = pwlist[-pwhlen:] # build history pwhistory = ' '.join(pwlist) return pwhistory
[docs]def password_already_used(password, pwhistory): # type: (str, str) -> bool """ Check if the password is already used in the password hash history. :param password: new password hash. :param pwhistory: history of previous password hashes. :returns: `True` when already used, `False` otherwise, >>> password_already_used('a', '') False >>> password_already_used('a', 'b') False >>> password_already_used('a', 'b ' + crypt('a')) True """ for line in pwhistory.split(" "): linesplit = line.split("$") # $method_id$salt$password_hash try: if linesplit[0] == '{BCRYPT}': password_hash = line[len('{BCRYPT}'):] if bcrypt.checkpw(password.encode('utf-8'), password_hash.encode('ASCII')): return True else: password_hash = crypt(password, linesplit[1], linesplit[2]) except IndexError: # old style password history entry, no method id/salt in there hash_algorithm = hashlib.new("sha1") hash_algorithm.update(password.encode("utf-8")) password_hash = hash_algorithm.hexdigest().upper() if password_hash == line: return True return False
[docs]class PasswortHistoryPolicy(object): """ Policy for handling history of password hashes. """ def __init__(self, pwhistoryPolicy): super(PasswortHistoryPolicy, self).__init__() self.pwhistoryPolicy = pwhistoryPolicy self.pwhistoryLength = None self.pwhistoryPasswordLength = 0 self.pwhistoryPasswordCheck = False self.expiryInterval = 0 if pwhistoryPolicy: try: self.pwhistoryLength = max(0, int(pwhistoryPolicy['length'] or 0)) except ValueError: ud.debug(ud.ADMIN, ud.WARN, 'Corrupt Password history policy (history length): %r' % (pwhistoryPolicy.dn,)) try: self.pwhistoryPasswordLength = max(0, int(pwhistoryPolicy['pwLength'] or 0)) except ValueError: ud.debug(ud.ADMIN, ud.WARN, 'Corrupt Password history policy (password length): %r' % (pwhistoryPolicy.dn,)) self.pwhistoryPasswordCheck = (pwhistoryPolicy['pwQualityCheck'] or '').lower() in ['true', '1'] try: self.expiryInterval = max(0, int(pwhistoryPolicy['expiryInterval'] or 0)) except ValueError: ud.debug(ud.ADMIN, ud.WARN, 'Corrupt Password history policy (expiry interval): %r' % (pwhistoryPolicy.dn,))