Source code for univention.admin.uldap

# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

"""
|UDM| wrapper around :py:mod:`univention.uldap` that replaces exceptions.
"""

from __future__ import absolute_import

import ldap
import time
from typing import Any, Callable, Dict, List, Optional, Tuple  # noqa: F401

import univention.debug as ud
import univention.uldap
from univention.admin import localization
import univention.admin.license
from univention.admin._ucr import configRegistry

translation = localization.translation('univention/admin')
_ = translation.translate

explodeDn = univention.uldap.explodeDn


[docs]class DN(object): """ A |LDAP| Distinguished Name. """ def __init__(self, dn): # type: (str) -> None self.dn = dn try: self._dn = ldap.dn.str2dn(self.dn) except ldap.DECODING_ERROR: raise ValueError('Malformed DN syntax: %r' % (self.dn,)) def __str__(self): return ldap.dn.dn2str(self._dn) def __unicode__(self): return unicode(str(self)) # noqa: F821 def __repr__(self): return '<%s %r>' % (type(self).__name__, str(self),) def __eq__(self, other): """ >>> DN('foo=1') == DN('foo=1') True >>> DN('foo=1') == DN('foo=2') False >>> DN('Foo=1') == DN('foo=1') True >>> DN('Foo=1') == DN('foo=2') False >>> DN('foo=1,bar=2') == DN('foo=1,bar=2') True >>> DN('bar=2,foo=1') == DN('foo=1,bar=2') False >>> DN('foo=1+bar=2') == DN('foo=1+bar=2') True >>> DN('bar=2+foo=1') == DN('foo=1+bar=2') True >>> DN('bar=2+Foo=1') == DN('foo=1+Bar=2') True >>> DN(r'foo=%s31' % chr(92)) == DN(r'foo=1') True """ return hash(self) == hash(other) def __ne__(self, other): return not self == other def __hash__(self): # TODO: attributes which's values are case insensitive should be respected return hash(tuple([tuple(sorted((x.lower(), y, z) for x, y, z in rdn)) for rdn in self._dn]))
[docs] @classmethod def set(cls, values): """ >>> len(DN.set(['CN=computers,dc=foo', 'cn=computers,dc=foo', 'cn = computers,dc=foo'])) 1 """ return set(map(cls, values))
[docs] @classmethod def values(cls, values): """ >>> DN.values(DN.set(['cn=foo', 'cn=bar']) - DN.set(['cn = foo'])) == {'cn=bar'} True """ return set(map(str, values))
[docs]def getBaseDN(host='localhost', port=None, uri=None): # type: (str, Optional[int], Optional[str]) -> str """ Return the naming context of the LDAP server. :param str host: The hostname of the LDAP server. :param int port: The TCP port number of the LDAP server. :param str uri: A complete LDAP URI. :returns: The distinguished name of the LDAP root. :rtype: str """ if not uri: if not port: port = int(configRegistry.get('ldap/server/port', 7389)) uri = "ldap://%s:%s" % (host, port) try: lo = ldap.ldapobject.ReconnectLDAPObject(uri, trace_stack_limit=None) result = lo.search_s('', ldap.SCOPE_BASE, 'objectClass=*', ['NamingContexts']) return result[0][1]['namingContexts'][0].decode('utf-8') except ldap.SERVER_DOWN: time.sleep(60) lo = ldap.ldapobject.ReconnectLDAPObject(uri, trace_stack_limit=None) result = lo.search_s('', ldap.SCOPE_BASE, 'objectClass=*', ['NamingContexts']) return result[0][1]['namingContexts'][0].decode('utf-8')
[docs]def getAdminConnection(start_tls=2, decode_ignorelist=[]): # type: (int, List[str]) -> Tuple[univention.admin.uldap.access, univention.admin.uldap.position] """ Open a LDAP connection using the admin credentials. :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful. :param decode_ignorelist: List of LDAP attribute names which shall be handled as binary attributes. :type decode_ignorelist: list[str] :return: A 2-tuple (LDAP-access, LDAP-position) :rtype: tuple[univention.admin.uldap.access, univention.admin.uldap.position] """ lo = univention.uldap.getAdminConnection(start_tls, decode_ignorelist=decode_ignorelist) pos = position(lo.base) return access(lo=lo), pos
[docs]def getMachineConnection(start_tls=2, decode_ignorelist=[], ldap_master=True): # type: (int, List[str], bool) -> Tuple[univention.admin.uldap.access, univention.admin.uldap.position] """ Open a LDAP connection using the machine credentials. :param int start_tls: Negotiate TLS with server. If `2` is given, the command will require the operation to be successful. :param decode_ignorelist: List of LDAP attribute names which shall be handled as binary attributes. :type decode_ignorelist: list[str] :param bool ldap_master: Open a connection to the Primary if True, to the preferred LDAP server otherwise. :return: A 2-tuple (LDAP-access, LDAP-position) :rtype: tuple[univention.admin.uldap.access, univention.admin.uldap.position] """ lo = univention.uldap.getMachineConnection(start_tls, decode_ignorelist=decode_ignorelist, ldap_master=ldap_master) pos = position(lo.base) return access(lo=lo), pos
def _err2str(err): # type: (Exception) -> str """ Convert exception arguments to string. :param Exception err: An exception instance. :returns: A concatenated string formatted from the exception :rtype: str """ msgs = [] for iarg in err.args: if isinstance(iarg, dict): msg = ': '.join([str(m) for m in (iarg.get('desc'), iarg.get('info')) if m]) else: msg = str(iarg) if msg: msgs.append(msg) if not msgs: msgs.append(': '.join([str(type(err).__name__), str(err)])) return '. '.join(msgs)
[docs]class domain(object): """ A |UDM| domain name. """ def __init__(self, lo, position): # type: (univention.admin.uldap.access, univention.admin.uldap.position) -> None """ :param univention.admin.uldap.access lo: A LDAP connection object. :param univention.admin.uldap.position position: A UDM position specifying the LDAP base container. """ self.lo = lo self.position = position self.domain = self.lo.get(self.position.getDomain(), attr=['sambaDomain', 'sambaSID', 'krb5RealmName'])
[docs] def getKerberosRealm(self): # type: () -> Optional[str] """ Return the name of the Kerberos realms. :returns: The name of the Kerberos realm. :rtype: str """ if 'krb5RealmName' not in self.domain: return None return self.domain['krb5RealmName'][0].decode('ASCII')
[docs]class position(object): """ The position of a |LDAP| container. Supports relative distinguished names. """ def __init__(self, base, loginDomain=u''): # type: (str, str) -> None """ :param str base: The base distinguished name. :param str loginDomain: The login domain name. """ if not base: raise univention.admin.uexceptions.insufficientInformation(_("There was no LDAP base specified.")) self.__loginDomain = loginDomain or base self.__base = base self.__pos = u"" self.__indomain = False
[docs] def setBase(self, base): # type: (str) -> None """ Set a new base distinguished name. :param str base: The new base distinguished name. """ self.__base = base
[docs] def setLoginDomain(self, loginDomain): # type: (str) -> None """ Set a new login domain name. :param str loginDomain: The new login domain name. """ self.__loginDomain = loginDomain
def __setPosition(self, pos): # type: (str) -> None self.__pos = pos self.__indomain = any('dc' == y[0] for x in ldap.dn.str2dn(self.__pos) for y in x)
[docs] def getDn(self): # type: () -> str """ Return the distinguished name. :returns: The absolute DN. :rtype: str """ return ldap.dn.dn2str(ldap.dn.str2dn(self.__pos) + ldap.dn.str2dn(self.__base))
[docs] def setDn(self, dn): # type: (str) -> None """ Set a new distinguished name. :param str dn: The new distinguished name. """ # strip out the trailing base from the DN; store relative dn dn = ldap.dn.str2dn(dn) base = ldap.dn.str2dn(self.getBase()) if dn[-len(base):] == base: dn = dn[:-len(base)] self.__setPosition(ldap.dn.dn2str(dn))
[docs] def getRdn(self): # type: () -> str """ Return the distinguished name relative to the LDAP base. :returns: The relative DN. :rtype: str """ return ldap.dn.explode_rdn(self.getDn())[0]
[docs] def getBase(self): # type: () -> str """ Return the LDAP base DN. :returns: The distinguished name of the LDAP base. :rtype: str """ return self.__base
[docs] def isBase(self): # type: () -> bool """ Check if the position equals the LDAP base DN. :returns: True if the position equals the base DN, False otherwise. :rtype: bool """ return access.compare_dn(self.getDn(), self.getBase())
[docs] def getDomain(self): # type: () -> str """ Return the distinguished name of the domain part of the position. :returns: The distinguished name. :rtype: str """ if not self.__indomain or self.getDn() == self.getBase(): return self.getBase() dn = [] for part in ldap.dn.str2dn(self.getDn())[::-1]: if not any('dc' == y[0] for y in part): break dn.append(part) return ldap.dn.dn2str(dn[::-1])
[docs] def getDomainConfigBase(self): # type: () -> str """ Return the distinguished name of the configuration container. :returns: The distinguished name. :rtype: str """ return u'cn=univention,' + self.getDomain()
[docs] def isDomain(self): # type: () -> bool """ Check if the position equals the domain DN. :returns: True if the position equals the domain DN, False otherwise. :rtype: bool """ return self.getDn() == self.getDomain()
[docs] def getLoginDomain(self): # type: () -> str """ Return the login domain name. :returns: The login domain name. :rtype: str """ return self.__loginDomain
[docs] def switchToParent(self): # type: () -> bool """ Switch position to parent container. :returns: False if already at the Base, True otherwise. :rtype: bool """ if self.isBase(): return False self.__setPosition(ldap.dn.dn2str(ldap.dn.str2dn(self.__pos)[1:])) return True
[docs]class access(object): """ A |UDM| class to access a |LDAP| server. """ @property def binddn(self): # type: () -> Optional[str] """ Return the distinguished name of the account. :returns: The distinguished name of the account (or `None` with |SAML|). :rtype: str """ return self.lo.binddn @property def bindpw(self): # type: () -> str """ Return the user password or credentials. :returns: The user password or credentials. :rtype: str """ return self.lo.bindpw @property def host(self): # type: () -> str """ Return the host name of the LDAP server. :returns: the host name of the LDAP server. :rtype: str """ return self.lo.host @property def port(self): # type: () -> int """ Return the TCP port number of the LDAP server. :returns: the TCP port number of the LDAP server. :rtype: int """ return self.lo.port @property def base(self): # type: () -> str """ Return the LDAP base of the LDAP server. :returns: the LDAP base of the LDAP server. :rtype: str """ return self.lo.base @property def start_tls(self): # type: () -> int return self.lo.start_tls def __init__(self, host='localhost', port=None, base=u'', binddn=u'', bindpw=u'', start_tls=2, lo=None, follow_referral=False): # type: (str, int, str, str, str, int, univention.uldap.access, bool) -> None """ :param str host: The hostname of the |LDAP| server. :param int port: The |TCP| port number of the |LDAP| server. :param str base: The base distinguished name. :param str binddn: The distinguished name of the account. :param str bindpw: The user password for simple authentication. :param int start_tls: Negotiate |TLS| with server. If `2` is given, the command will require the operation to be successful. :param univention.uldap.access lo: |LDAP| connection. :param:bool follow_referral: Follow |LDAP| referrals. """ if lo: self.lo = lo else: if not port: port = int(configRegistry.get('ldap/server/port', 7389)) try: self.lo = univention.uldap.access(host, port, base, binddn, bindpw, start_tls, follow_referral=follow_referral) except ldap.INVALID_CREDENTIALS: raise univention.admin.uexceptions.authFail(_("Authentication failed")) except ldap.UNWILLING_TO_PERFORM: raise univention.admin.uexceptions.authFail(_("Authentication failed")) self.require_license = False self.allow_modify = True self.licensetypes = ['UCS']
[docs] def bind(self, binddn, bindpw): # type: (str, str) -> None """ Do simple LDAP bind using DN and password. :param str binddn: The distinguished name of the account. :param str bindpw: The user password for simple authentication. """ try: self.lo.bind(binddn, bindpw) except ldap.INVALID_CREDENTIALS: raise univention.admin.uexceptions.authFail(_("Authentication failed")) except ldap.UNWILLING_TO_PERFORM: raise univention.admin.uexceptions.authFail(_("Authentication failed")) self.__require_licence()
[docs] def bind_saml(self, bindpw): # type: (str) -> None """ Do LDAP bind using SAML message. :param str bindpw: The SAML authentication cookie. """ try: return self.lo.bind_saml(bindpw) except (ldap.INVALID_CREDENTIALS, ldap.UNWILLING_TO_PERFORM): raise univention.admin.uexceptions.authFail(_("Authentication failed")) self.__require_licence()
def __require_licence(self): # type: () -> None if self.require_license: res = univention.admin.license.init_select(self.lo, 'admin') assert univention.admin.license._license self.licensetypes = univention.admin.license._license.types if res == 1: self.allow_modify = False raise univention.admin.uexceptions.licenseClients() elif res == 2: self.allow_modify = False raise univention.admin.uexceptions.licenseAccounts() elif res == 3: self.allow_modify = False raise univention.admin.uexceptions.licenseDesktops() elif res == 4: self.allow_modify = False raise univention.admin.uexceptions.licenseGroupware() elif res == 5: # Free for personal use edition raise univention.admin.uexceptions.freeForPersonalUse() # License Version 2: elif res == 6: self.allow_modify = False raise univention.admin.uexceptions.licenseUsers() elif res == 7: self.allow_modify = False raise univention.admin.uexceptions.licenseServers() elif res == 8: self.allow_modify = False raise univention.admin.uexceptions.licenseManagedClients() elif res == 9: self.allow_modify = False raise univention.admin.uexceptions.licenseCorporateClients() elif res == 10: self.allow_modify = False raise univention.admin.uexceptions.licenseDVSUsers() elif res == 11: self.allow_modify = False raise univention.admin.uexceptions.licenseDVSClients()
[docs] def unbind(self): # type: () -> None """ Unauthenticate. """ self.lo.unbind()
[docs] def whoami(self): # type: () -> str """ Return the distinguished name of the authenticated user. :returns: The distinguished name. :rtype: str """ return self.lo.whoami()
[docs] def requireLicense(self, require=True): # type: (bool) -> None """ Enable or disable the UCS licence check. :param bool require: `True` to require a valid licence. """ self.require_license = require
def _validateLicense(self): # type: () -> None """ Check if the UCS licence is valid. """ if self.require_license: univention.admin.license.select('admin')
[docs] def get_schema(self): # type: () -> ldap.schema.subentry.SubSchema """ Retrieve |LDAP| schema information from |LDAP| server. :returns: The |LDAP| schema. :rtype: ldap.schema.subentry.SubSchema """ return self.lo.get_schema()
[docs] @classmethod def compare_dn(cls, a, b): # type: (str, str) -> bool """ Compare two distinguished names for equality. :param str a: The first distinguished name. :param str b: A second distinguished name. :returns: True if the DNs are the same, False otherwise. :rtype: bool """ return univention.uldap.access.compare_dn(a, b)
[docs] def get(self, dn, attr=[], required=False, exceptions=False): # type: (str, List[str], bool, bool) -> Dict[str, List[bytes]] """ Return multiple attributes of a single LDAP object. :param str dn: The distinguished name of the object to lookup. :param attr: The list of attributes to fetch. :param bool required: Raise an exception instead of returning an empty dictionary. :param bool exceptions: Ignore. :returns: A dictionary mapping the requested attributes to a list of their values. :rtype: dict[str, list[str]] :raises ldap.NO_SUCH_OBJECT: If the LDAP object is not accessible. """ return self.lo.get(dn, attr, required)
[docs] def getAttr(self, dn, attr, required=False, exceptions=False): # type: (str, str, bool, bool) -> List[bytes] """ Return a single attribute of a single LDAP object. :param str dn: The distinguished name of the object to lookup. :param str attr: The attribute to fetch. :param bool required: Raise an exception instead of returning an empty dictionary. :param bool exceptions: Ignore. :returns: A list of values. :rtype: list[bytes] :raises ldap.NO_SUCH_OBJECT: If the LDAP object is not accessible. """ return self.lo.getAttr(dn, attr, required)
[docs] def search(self, filter=u'(objectClass=*)', base=u'', scope=u'sub', attr=[], unique=False, required=False, timeout=-1, sizelimit=0, serverctrls=None, response=None): # type: (str, str, str, List[str], bool, bool, int, int, Optional[List[ldap.controls.LDAPControl]], Optional[Dict[str, ldap.controls.LDAPControl]]) -> List[Tuple[str, Dict[str, List[bytes]]]] """ Perform LDAP search and return values. :param str filter: LDAP search filter. :param str base: the starting point for the search. :param str scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search. :param attr: The list of attributes to fetch. :type attr: list[str] :param bool unique: Raise an exception if more than one object matches. :param bool required: Raise an exception instead of returning an empty dictionary. :param int timeout: wait at most `timeout` seconds for a search to complete. `-1` for no limit. :param int sizelimit: retrieve at most `sizelimit` entries for a search. `0` for no limit. :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. :returns: A list of 2-tuples (dn, values) for each LDAP object, where values is a dictionary mapping attribute names to a list of values. :rtype: list[tuple[str, dict[str, list[str]]]] :raises univention.admin.uexceptions.noObject: Indicates the target object cannot be found. :raises univention.admin.uexceptions.insufficientInformation: Indicates that the matching rule specified in the search filter does not match a rule defined for the attribute's syntax. :raises univention.admin.uexceptions.ldapTimeout: Indicates that the time limit of the LDAP client was exceeded while waiting for a result. :raises univention.admin.uexceptions.ldapSizelimitExceeded: Indicates that in a search operation, the size limit specified by the client or the server has been exceeded. :raises univention.admin.uexceptions.ldapError: Indicates that the search method was called with an invalid search filter. :raises univention.admin.uexceptions.ldapError: Indicates that the syntax of the DN is incorrect. :raises univention.admin.uexceptions.ldapError: on any other LDAP error. """ try: return self.lo.search(filter, base, scope, attr, unique, required, timeout, sizelimit, serverctrls=serverctrls, response=response) except ldap.NO_SUCH_OBJECT as msg: raise univention.admin.uexceptions.noObject(_err2str(msg)) except ldap.INAPPROPRIATE_MATCHING as msg: raise univention.admin.uexceptions.insufficientInformation(_err2str(msg)) except (ldap.TIMEOUT, ldap.TIMELIMIT_EXCEEDED) as msg: raise univention.admin.uexceptions.ldapTimeout(_err2str(msg)) except (ldap.SIZELIMIT_EXCEEDED, ldap.ADMINLIMIT_EXCEEDED) as msg: raise univention.admin.uexceptions.ldapSizelimitExceeded(_err2str(msg)) except ldap.FILTER_ERROR as msg: raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), filter)) except ldap.INVALID_DN_SYNTAX as msg: raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), base), original_exception=msg) except ldap.LDAPError as msg: raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)
[docs] def searchDn(self, filter=u'(objectClass=*)', base=u'', scope=u'sub', unique=False, required=False, timeout=-1, sizelimit=0, serverctrls=None, response=None): # type: (str, str, str, bool, bool, int, int, Optional[List[ldap.controls.LDAPControl]], Optional[Dict[str, ldap.controls.LDAPControl]]) -> List[str] """ Perform LDAP search and return distinguished names only. :param str filter: LDAP search filter. :param str base: the starting point for the search. :param str scope: Specify the scope of the search to be one of `base`, `base+one`, `one`, `sub`, or `domain` to specify a base object, base plus one-level, one-level, subtree, or children search. :param bool unique: Raise an exception if more than one object matches. :param bool required: Raise an exception instead of returning an empty dictionary. :param int timeout: wait at most timeout seconds for a search to complete. `-1` for no limit. :param int sizelimit: retrieve at most sizelimit entries for a search. `0` for no limit. :param serverctrls: a list of :py:class:`ldap.controls.LDAPControl` instances sent to the server along with the LDAP request. :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. :returns: A list of distinguished names. :rtype: list[str] :raises univention.admin.uexceptions.noObject: Indicates the target object cannot be found. :raises univention.admin.uexceptions.insufficientInformation: Indicates that the matching rule specified in the search filter does not match a rule defined for the attribute's syntax. :raises univention.admin.uexceptions.ldapTimeout: Indicates that the time limit of the LDAP client was exceeded while waiting for a result. :raises univention.admin.uexceptions.ldapSizelimitExceeded: Indicates that in a search operation, the size limit specified by the client or the server has been exceeded. :raises univention.admin.uexceptions.ldapError: Indicates that the search method was called with an invalid search filter. :raises univention.admin.uexceptions.ldapError: Indicates that the syntax of the DN is incorrect. :raises univention.admin.uexceptions.ldapError: on any other LDAP error. """ try: return self.lo.searchDn(filter, base, scope, unique, required, timeout, sizelimit, serverctrls=serverctrls, response=response) except ldap.NO_SUCH_OBJECT as msg: raise univention.admin.uexceptions.noObject(_err2str(msg)) except ldap.INAPPROPRIATE_MATCHING as msg: raise univention.admin.uexceptions.insufficientInformation(_err2str(msg)) except (ldap.TIMEOUT, ldap.TIMELIMIT_EXCEEDED) as msg: raise univention.admin.uexceptions.ldapTimeout(_err2str(msg)) except (ldap.SIZELIMIT_EXCEEDED, ldap.ADMINLIMIT_EXCEEDED) as msg: raise univention.admin.uexceptions.ldapSizelimitExceeded(_err2str(msg)) except ldap.FILTER_ERROR as msg: raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), filter)) except ldap.INVALID_DN_SYNTAX as msg: raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), base), original_exception=msg) except ldap.LDAPError as msg: raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)
[docs] def getPolicies(self, dn, policies=None, attrs=None, result=None, fixedattrs=None): # type: (str, Optional[List[str]], Optional[Dict[str, List[Any]]], Any, Any) -> Dict[str, Dict[str, Any]] """ Return |UCS| policies for |LDAP| entry. :param str dn: The distinguished name of the |LDAP| entry. :param list policies: List of policy object classes... :param dict attrs: |LDAP| attributes. If not given, the data is fetched from LDAP. :param result: UNUSED! :param fixedattrs: UNUSED! :returns: A mapping of policy names to """ ud.debug(ud.ADMIN, ud.INFO, 'getPolicies modules dn %s result' % dn) return self.lo.getPolicies(dn, policies, attrs, result, fixedattrs)
[docs] def add(self, dn, al, exceptions=False, serverctrls=None, response=None): # type: (str, List[Tuple[str, Any]], bool, Optional[List[ldap.controls.LDAPControl]], Optional[Dict]) -> None """ Add LDAP entry at distinguished name and attributes in add_list=(attribute-name, old-values. new-values) or (attribute-name, new-values). :param str dn: The distinguished name of the object to add. :param al: The add-list of 2-tuples (attribute-name, new-values). :param bool exceptions: Raise the low level exception instead of the wrapping UDM exceptions. :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. :raises univention.admin.uexceptions.licenseDisableModify: if the UCS licence prohibits any modificcation :raises univention.admin.uexceptions.objectExists: if the LDAP object already exists. :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions. :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid. :raises univention.admin.uexceptions.ldapError: on any other LDAP error. """ self._validateLicense() if not self.allow_modify: ud.debug(ud.ADMIN, ud.ERROR, 'add dn: %s' % dn) raise univention.admin.uexceptions.licenseDisableModify() ud.debug(ud.LDAP, ud.ALL, 'add dn=%s al=%s' % (dn, al)) if exceptions: return self.lo.add(dn, al, serverctrls=serverctrls, response=response) try: return self.lo.add(dn, al, serverctrls=serverctrls, response=response) except ldap.ALREADY_EXISTS as msg: ud.debug(ud.LDAP, ud.ALL, 'add dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.objectExists(dn) except ldap.INSUFFICIENT_ACCESS as msg: ud.debug(ud.LDAP, ud.ALL, 'add dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.permissionDenied() except ldap.INVALID_DN_SYNTAX as msg: raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg) except ldap.LDAPError as msg: ud.debug(ud.LDAP, ud.ALL, 'add dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)
[docs] def modify(self, dn, changes, exceptions=False, ignore_license=False, serverctrls=None, response=None, rename_callback=None): # type: (str, List[Tuple[str, Any, Any]], bool, int, Optional[List[ldap.controls.LDAPControl]], Optional[Dict], Optional[Callable]) -> str """ Modify LDAP entry DN with attributes in changes=(attribute-name, old-values, new-values). :param str dn: The distinguished name of the object to modify. :param changes: The modify-list of 3-tuples (attribute-name, old-values, new-values). :param bool exceptions: Raise the low level exception instead of the wrapping UDM exceptions. :param bool ignore_license: Ignore license check if True. :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. :returns: The distinguished name. :rtype: str """ self._validateLicense() if not self.allow_modify and not ignore_license: ud.debug(ud.ADMIN, ud.ERROR, 'modify dn: %s' % dn) raise univention.admin.uexceptions.licenseDisableModify() ud.debug(ud.LDAP, ud.ALL, 'mod dn=%s ml=%s' % (dn, changes)) if exceptions: return self.lo.modify(dn, changes, serverctrls=serverctrls, response=response, rename_callback=rename_callback) try: return self.lo.modify(dn, changes, serverctrls=serverctrls, response=response, rename_callback=rename_callback) except ldap.NO_SUCH_OBJECT as msg: ud.debug(ud.LDAP, ud.ALL, 'mod dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.noObject(dn) except ldap.INSUFFICIENT_ACCESS as msg: ud.debug(ud.LDAP, ud.ALL, 'mod dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.permissionDenied() except ldap.INVALID_DN_SYNTAX as msg: raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg) except ldap.LDAPError as msg: ud.debug(ud.LDAP, ud.ALL, 'mod dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)
[docs] def rename(self, dn, newdn, move_childs=0, ignore_license=False, serverctrls=None, response=None): # type: (str, str, int, bool, Optional[List[ldap.controls.LDAPControl]], Optional[Dict]) -> None """ Rename a LDAP object. :param str dn: The old distinguished name of the object to rename. :param str newdn: The new distinguished name of the object to rename. :param int move_childs: Also rename the sub children. Must be `0` always as `1` is not implemented. :param bool ignore_license: Ignore license check if True. :param serverctrls: a list of ldap.controls.LDAPControl instances sent to the server along with the LDAP request :type serverctrls: list[ldap.controls.LDAPControl] :param dict response: An optional dictionary to receive the server controls of the result. """ if not move_childs == 0: raise univention.admin.uexceptions.noObject(_("Moving children is not supported.")) self._validateLicense() if not self.allow_modify and not ignore_license: ud.debug(ud.ADMIN, ud.WARN, 'move dn: %s' % dn) raise univention.admin.uexceptions.licenseDisableModify() ud.debug(ud.LDAP, ud.ALL, 'ren dn=%s newdn=%s' % (dn, newdn)) try: return self.lo.rename(dn, newdn, serverctrls=serverctrls, response=response) except ldap.NO_SUCH_OBJECT as msg: ud.debug(ud.LDAP, ud.ALL, 'ren dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.noObject(dn) except ldap.INSUFFICIENT_ACCESS as msg: ud.debug(ud.LDAP, ud.ALL, 'ren dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.permissionDenied() except ldap.INVALID_DN_SYNTAX as msg: raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg) except ldap.LDAPError as msg: ud.debug(ud.LDAP, ud.ALL, 'ren dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)
[docs] def delete(self, dn, exceptions=False): # type: (str, bool) -> None """ Delete a LDAP object. :param str dn: The distinguished name of the object to remove. :param bool exceptions: Raise the low level exception instead of the wrapping UDM exceptions. :raises univention.admin.uexceptions.noObject: if the object does not exist. :raises univention.admin.uexceptions.permissionDenied: if the user does not have the required permissions. :raises univention.admin.uexceptions.ldapError: if the syntax of the DN is invalid. :raises univention.admin.uexceptions.ldapError: on any other LDAP error. """ self._validateLicense() if exceptions: try: return self.lo.delete(dn) except ldap.INSUFFICIENT_ACCESS: raise univention.admin.uexceptions.permissionDenied() ud.debug(ud.LDAP, ud.ALL, 'del dn=%s' % (dn,)) try: return self.lo.delete(dn) except ldap.NO_SUCH_OBJECT as msg: ud.debug(ud.LDAP, ud.ALL, 'del dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.noObject(dn) except ldap.INSUFFICIENT_ACCESS as msg: ud.debug(ud.LDAP, ud.ALL, 'del dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.permissionDenied() except ldap.INVALID_DN_SYNTAX as msg: raise univention.admin.uexceptions.ldapError('%s: %s' % (_err2str(msg), dn), original_exception=msg) except ldap.LDAPError as msg: ud.debug(ud.LDAP, ud.ALL, 'del dn=%s err=%s' % (dn, msg)) raise univention.admin.uexceptions.ldapError(_err2str(msg), original_exception=msg)
[docs] def parentDn(self, dn): # type: (str) -> Optional[str] """ Return the parent container of a distinguished name. :param str dn: The distinguished name. :return: The parent distinguished name or None if the LDAP base is reached. :rtype: str or None """ return self.lo.parentDn(dn)
[docs] def explodeDn(self, dn, notypes=False): # type: (str, bool) -> List[str] """ Break up a DN into its component parts. :param str dn: The distinguished name. :param bool notypes: Return only the component's attribute values if True. Also the attribute types if False. :return: A list of relative distinguished names. :rtype: list[str] """ return self.lo.explodeDn(dn, notypes)