Source code for univention.s4connector.s4.dc

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Univention S4 Connector
#  dc sync
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.

import ldap
import univention.debug2 as ud
import univention.s4connector.s4
import univention.admin.uldap
from univention.s4connector.s4 import format_escaped, decode_sid

import univention.admin.handlers
import univention.admin.handlers.settings.sambadomain
import univention.admin.handlers.container.dc


def _unixTimeInverval2seconds(unixTime):
	if not isinstance(unixTime, list):
		return unixTime

	if len(unixTime) != 2:
		ud.debug(ud.LDAP, ud.WARN, 'dc _unixTimeInverval2seconds: Not a valid time format: %s' % unixTime)
		return 0

	if int(unixTime[0]) < 0:
		return 0

	if unixTime[1] == 'seconds':
		return int(unixTime[0])
	elif unixTime[1] == 'minutes':
		return int(unixTime[0]) * 60
	elif unixTime[1] == 'hours':
		return int(unixTime[0]) * 3600  # 60 * 60
	elif unixTime[1] == 'days':
		return int(unixTime[0]) * 86400  # 60 * 60 * 24
	else:
		ud.debug(ud.LDAP, ud.WARN, 'dc _unixTimeInverval2seconds: Not a valid time unit: %s' % unixTime)
		return 0

# Time interval in S4 / AD is often 100-nanosecond intervals:
# http://msdn.microsoft.com/en-us/library/windows/desktop/ms676863%28v=vs.85%29.aspx


def _s2nano(seconds):
	return seconds * 10000000


def _nano2s(nanoseconds):
	return int(nanoseconds / 10000000)


[docs]def ucs2con(s4connector, key, object): ud.debug(ud.LDAP, ud.INFO, 'dc ucs2con: Object (%s): %s' % (object['dn'], object)) s4base_dn, s4base_attr = s4connector.lo_s4.lo.search_s(s4connector.s4_ldap_base, ldap.SCOPE_BASE, '(objectClass=*)')[0] ud.debug(ud.LDAP, ud.INFO, 'dc ucs2con: S4 object: %r' % (s4base_dn,)) ud.debug(ud.LDAP, ud.INFO, 'dc ucs2con: S4 object: %r' % (s4base_attr,)) if b'univentionBase' in object['attributes'].get('objectClass'): # DC object → sync GPO if s4connector.configRegistry.is_true('connector/s4/mapping/gpo', True): ucs_val = object['attributes'].get('msGPOLink', [None])[0] # msGPOLink is a single value s4_val = s4base_attr.get('msGPOLink') if ucs_val != s4_val: s4connector.lo_s4.lo.modify_s(s4connector.s4_ldap_base, [(ldap.MOD_REPLACE, 'gPLink', ucs_val)]) elif b'sambaDomain' in object['attributes'].get('objectClass'): # Samba Domain object ml = [] sync_times = [('sambaMaxPwdAge', 'maxPwdAge'), ('sambaMinPwdAge', 'minPwdAge'), ('sambaLockoutDuration', 'lockoutDuration')] for (ucs_attr, s4_attr) in sync_times: ucs_time = int(object['attributes'].get(ucs_attr, [0])[0]) s4_time = _nano2s(int(s4base_attr.get(s4_attr, [0])[0]) * -1) ud.debug(ud.LDAP, ud.INFO, 'dc ucs2con: ucs_time (%s): %s' % (ucs_attr, ucs_time)) ud.debug(ud.LDAP, ud.INFO, 'dc ucs2con: s4-time (%s): %s' % (s4_attr, s4_time)) if ucs_time != s4_time: if ucs_time > 0: s4_time = str(_s2nano(ucs_time) * -1) else: s4_time = "0" ml.append((ldap.MOD_REPLACE, s4_attr, [s4_time.encode('ASCII')])) sync_integers = [('sambaPwdHistoryLength', 'pwdHistoryLength'), ('sambaMinPwdLength', 'minPwdLength'), ('univentionSamba4pwdProperties', 'pwdProperties')] for (ucs_attr, s4_attr) in sync_integers: ucs_val = object['attributes'].get(ucs_attr, b'0') s4_val = s4base_attr.get(s4_attr, [b'0'])[0] if ucs_val != s4_val: ml.append((ldap.MOD_REPLACE, s4_attr, ucs_val)) if ml: ud.debug(ud.LDAP, ud.INFO, 'dc ucs2con: S4 object modlist: %s' % (ml)) s4connector.lo_s4.lo.modify_s(s4connector.s4_ldap_base, ml) return True
[docs]def con2ucs(s4connector, key, object): ud.debug(ud.LDAP, ud.INFO, 'dc con2ucs: Object (%s): %s' % (object['dn'], object)) # Search sambaDomainname object via sambaSID object_sid = decode_sid(object['attributes']['objectSid'][0]) sambadomainnameObject = univention.admin.handlers.settings.sambadomain.lookup(None, s4connector.lo, format_escaped('sambaSID={0!e}', object_sid)) if len(sambadomainnameObject) > 1: ud.debug(ud.LDAP, ud.WARN, 'dc con2ucs: Found more than one sambaDomainname object with sambaSID %r' % (object_sid,)) elif len(sambadomainnameObject) == 1: # Use the first sambaDomain sambadomainnameObject = sambadomainnameObject[0] # Do we modify this UCS object modify = False sync_times = [('maxPasswordAge', 'maxPwdAge'), ('minPasswordAge', 'minPwdAge'), ('lockoutDuration', 'lockoutDuration')] for (ucs_attr, s4_attr) in sync_times: ucs_time = _unixTimeInverval2seconds(sambadomainnameObject.get(ucs_attr, 0)) s4_time = _nano2s(int(object['attributes'].get(s4_attr, [0])[0]) * -1) if ucs_time != s4_time: sambadomainnameObject[ucs_attr] = [str(s4_time), 'seconds'] modify = True sync_integers = [('passwordHistory', 'pwdHistoryLength'), ('passwordLength', 'minPwdLength'), ('domainPwdProperties', 'pwdProperties')] for (ucs_attr, s4_attr) in sync_integers: ucs_val = sambadomainnameObject.get(ucs_attr, 0) s4_val = object['attributes'].get(s4_attr, [None])[0] if ucs_val != s4_val: sambadomainnameObject[ucs_attr] = s4_val.decode('UTF-8') modify = True if modify: sambadomainnameObject.modify() if s4connector.configRegistry.is_true('connector/s4/mapping/gpo', True): # Search DC object via ldap search dn, attr = s4connector.lo.search('objectClass=*', scope='base')[0] ml = [] ucs_val = attr.get('msGPOLink') s4_val = object['attributes'].get('gPLink') if ucs_val != s4_val: if b'msGPO' not in attr.get('objectClass', []): ml.append(('objectClass', b'', b'msGPO')) ml.append(('msGPOLink', ucs_val, s4_val)) if ml: s4connector.lo.modify(dn, ml) return True
[docs]def identify(dn, attr, canonical=0): return bool({b'univentionBase', b'sambaDomain'} & set(attr.get('objectClass', [])))