Source code for univention.testing.ucs_samba

from __future__ import print_function

import contextlib
import re
import socket
import sqlite3
import subprocess
import time
from typing import Any, Dict, Iterator, List, Optional, Union  # noqa: F401

import ldap
import six

import univention.config_registry as config_registry
from univention.testing.utils import package_installed

if not six.PY2:
	import ldb
	from samba.auth import system_session
	from samba.samdb import SamDB
	from samba.param import LoadParm
else:
	class ldb(object):
		LdbError = None
		SCOPE_SUBTREE = 2
		ERR_NO_SUCH_OBJECT = 32
		ERR_INVALID_DN_SYNTAX = 34


[docs]class DRSReplicationFailed(Exception): pass
[docs]class WaitForS4ConnectorTimeout(Exception): pass
[docs]@contextlib.contextmanager def password_policy(complexity=False, minimum_password_age=0, maximum_password_age=3): # type: (bool, int, int) -> Iterator[None] if not package_installed('univention-samba4'): print('skipping samba password policy adjustment') yield return min_pwd_age = subprocess.check_output('samba-tool domain passwordsettings show | grep "Minimum password age" | sed s/[^0-9]*/""/', shell=True).strip() max_pwd_age = subprocess.check_output('samba-tool domain passwordsettings show | grep "Maximum password age" | sed s/[^0-9]*/""/', shell=True).strip() pwd_complexity = subprocess.check_output('samba-tool domain passwordsettings show | grep complexity | sed "s/Password complexity: //"', shell=True).strip() if complexity != pwd_complexity or str(minimum_password_age) != min_pwd_age or str(maximum_password_age) != max_pwd_age: subprocess.call(['samba-tool', 'domain', 'passwordsettings', 'set', '--min-pwd-age', str(minimum_password_age), '--max-pwd-age', str(maximum_password_age), '--complexity', 'on' if complexity else 'off']) yield if complexity != pwd_complexity or str(minimum_password_age) != min_pwd_age: subprocess.call(['samba-tool', 'domain', 'passwordsettings', 'set', '--min-pwd-age', min_pwd_age, '--max-pwd-age', max_pwd_age, '--complexity', pwd_complexity])
[docs]def wait_for_drs_replication(*args, **kwargs): # type: (*Any, **Any) -> None if six.PY2: process = subprocess.Popen(['/usr/bin/python3', '-'], stdin=subprocess.PIPE) stdout, stderr = process.communicate(b''' import ldb import sys from univention.testing.ucs_samba import wait_for_drs_replication, DRSReplicationFailed try: wait_for_drs_replication(*%s, **%s) except DRSReplicationFailed as exc: print(repr(exc), file=sys.stderr) sys.exit(2) except ldb.LdbError as exc: print(repr(exc), file=sys.stderr) sys.exit(3) ''' % (repr(args).encode('UTF-8'), repr(kwargs).encode('UTF-8'))) if process.returncode == 2: raise DRSReplicationFailed((stderr or b'').decode('UTF-8', 'replace')) elif process.returncode: raise Exception((stderr or b'').decode('UTF-8', 'replace')) return return _wait_for_drs_replication(*args, **kwargs)
def _wait_for_drs_replication(ldap_filter, attrs=None, base=None, scope=ldb.SCOPE_SUBTREE, lp=None, timeout=360, delta_t=1, verbose=True, should_exist=True, controls=None): # type: (str, Union[List[str], None, str], Optional[str], int, Optional[LoadParm], int, int, bool, bool, Optional[List[str]]) -> None if not package_installed('univention-samba4'): if package_installed('univention-samba'): time.sleep(15) print('Sleeping 15 seconds as a workaround for http://forge.univention.org/bugzilla/show_bug.cgi?id=52145') elif verbose: print('wait_for_drs_replication(): skip, univention-samba4 not installed.') return if not attrs: attrs = ['dn'] elif not isinstance(attrs, list): attrs = [attrs] if not lp: lp = LoadParm() lp.load('/etc/samba/smb.conf') samdb = SamDB("tdb://%s" % lp.private_path("sam.ldb"), session_info=system_session(lp), lp=lp) if not controls: controls = ["domain_scope:0"] if base is None: ucr = config_registry.ConfigRegistry() ucr.load() base = ucr['samba4/ldap/base'] else: if len(ldap.dn.str2dn(base)[0]) > 1: if verbose: print('wait_for_drs_replication(): skip, multiple RDNs are not supported') return if not base: if verbose: print('wait_for_drs_replication(): skip, no samba domain found') return if verbose: print("Waiting for DRS replication, filter: %r, base: %r, scope: %r, should_exist: %r" % (ldap_filter, base, scope, should_exist), end=' ') t = t0 = time.time() while t < t0 + timeout: try: res = samdb.search(base=base, scope=scope, expression=ldap_filter, attrs=attrs, controls=controls) if bool(res) is bool(should_exist): if verbose: print("\nDRS replication took %d seconds" % (t - t0, )) return # res except ldb.LdbError as exc: (_num, msg) = exc.args if _num == ldb.ERR_INVALID_DN_SYNTAX: raise if _num == ldb.ERR_NO_SUCH_OBJECT and not should_exist: if verbose: print("\nDRS replication took %d seconds" % (t - t0, )) return print("Error during samdb.search: %s" % (msg, )) print('.', end=' ') time.sleep(delta_t) t = time.time() raise DRSReplicationFailed("DRS replication for filter: %r failed due to timeout after %d sec." % (ldap_filter, t - t0))
[docs]def get_available_s4connector_dc(): # type: () -> str cmd = ("/usr/bin/univention-ldapsearch", "-LLL", "(univentionService=S4 Connector)", "uid") p = subprocess.Popen(cmd, stdout=subprocess.PIPE) stdout, _stderr = p.communicate() if not stdout: print("WARNING: Automatic S4 Connector host detection failed") return "" matches = re.compile(r'^uid: (.*)\$$', re.M).findall(stdout.decode('utf-8', 'replace')) if len(matches) == 1: return matches[0] elif len(matches) == 0: print("WARNING: Automatic S4 Connector host detection failed") return "" # check if this is UCS@school cmd = ("/usr/bin/univention-ldapsearch", "-LLL", "(univentionService=UCS@school)", "dn") p = subprocess.Popen(cmd, stdout=subprocess.PIPE) stdout, _stderr = p.communicate() if not stdout: print("ERROR: Automatic S4 Connector host detection failed: Found %s S4 Connector services" % len(matches)) return "" # Look for replicating DCs dcs_replicating_with_this_one = [] for s4c in matches: cmd = ("/usr/bin/samba-tool", "drs", "showrepl", s4c) p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, _stderr = p.communicate() if p.returncode != 0: continue dcs_replicating_with_this_one.append(s4c) if len(dcs_replicating_with_this_one) == 1: return dcs_replicating_with_this_one[0] else: print("ERROR: Automatic S4 Connector host detection failed: Replicating with %s S4 Connector services" % len(dcs_replicating_with_this_one)) return ""
[docs]def force_drs_replication(source_dc=None, destination_dc=None, partition_dn=None, direction="in"): # type: (Optional[str], Optional[str], Optional[str], str) -> int if not package_installed('univention-samba4'): print('force_drs_replication(): skip, univention-samba4 not installed.') return 0 src = source_dc or get_available_s4connector_dc() if not src: return 1 dst = destination_dc or socket.gethostname() if src == dst: return 0 if not partition_dn: ucr = config_registry.ConfigRegistry() ucr.load() partition_dn = str(ucr.get('samba4/ldap/base')) print("USING partition_dn:", partition_dn) cmd = ("/usr/bin/samba-tool", "drs", "replicate", dst, src, partition_dn) return subprocess.call(cmd)
def _ldap_replication_complete(verbose=True): # type: (bool) -> bool kwargs = {} # type: Dict[str, Any] if not verbose: kwargs = {'stdout': open('/dev/null', 'w'), 'stderr': subprocess.STDOUT} return subprocess.call('/usr/lib/nagios/plugins/check_univention_replication', **kwargs) == 0
[docs]def wait_for_s4connector(timeout=360, delta_t=1, s4cooldown_t=5): # type: (int, int, int) -> int ucr = config_registry.ConfigRegistry() ucr.load() if not package_installed('univention-s4-connector'): print('wait_for_s4connector(): skip, univention-s4-connector not installed.') return 0 if ucr.is_false('connector/s4/autostart'): print('wait_for_s4connector(): skip, connector/s4/autostart is set to false.') return 0 conn = sqlite3.connect('/etc/univention/connector/s4internal.sqlite') c = conn.cursor() static_count = 0 replication_complete = False highestCommittedUSN = -1 lastUSN = -1 t = t0 = time.time() while t < t0 + timeout: time.sleep(delta_t) if not _ldap_replication_complete(verbose=False): continue else: if not replication_complete: print('Start waiting for S4-Connector replication') replication_complete = True previous_highestCommittedUSN = highestCommittedUSN ldbresult = subprocess.Popen([ 'ldbsearch', '--url', '/var/lib/samba/private/sam.ldb', '--scope', 'base', '--basedn', '', 'highestCommittedUSN', ], stdout=subprocess.PIPE) assert ldbresult.stdout for chunk in ldbresult.stdout: line = chunk.decode('utf-8').strip() if line.startswith('highestCommittedUSN: '): highestCommittedUSN = int(line[len('highestCommittedUSN: '):]) break else: raise KeyError('No highestCommittedUSN in ldbsearch') previous_lastUSN = lastUSN c.execute('select value from S4 where key=="lastUSN"') lastUSN = int(c.fetchone()[0]) if not (lastUSN == highestCommittedUSN and lastUSN == previous_lastUSN and highestCommittedUSN == previous_highestCommittedUSN): static_count = 0 print('Reset counter') else: static_count += 1 print('Counter: {}; highestCommittedUSN: {!r}; lastUSN: {!r}'.format(static_count, highestCommittedUSN, lastUSN)) if static_count * delta_t >= s4cooldown_t: return 0 t = time.time() conn.close() raise WaitForS4ConnectorTimeout()
[docs]def append_dot(verify_list): # type: (List[str]) -> List[str] """The S4-Connector appends dots to various dns records. Helper function to adjust a list.""" if not package_installed('univention-s4-connector'): return verify_list return ['%s.' % (x,) for x in verify_list]