# -*- coding: utf-8 -*-
#
# Copyright 2004-2022 Univention GmbH
#
# https://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <https://www.gnu.org/licenses/>.
"""
|UDM| module for |DNS| reverse pointer records (PTR)
"""
from univention.admin.layout import Tab, Group
import univention.admin
import univention.admin.handlers
import univention.admin.localization
from univention.admin.filter import (expression, conjunction)
from univention.admin.handlers.dns import ARPA_IP4, ARPA_IP6
import univention.debug as ud
import ipaddress
translation = univention.admin.localization.translation('univention.admin.handlers.dns')
_ = translation.translate
module = 'dns/ptr_record'
operations = ['add', 'edit', 'remove', 'search']
columns = ['ptr_record']
superordinate = 'dns/reverse_zone'
childs = False
short_description = _('DNS: Pointer record')
object_name = _('Pointer record')
object_name_plural = _('Pointer records')
long_description = _('Map IP addresses back to hostnames.')
options = {
	'default': univention.admin.option(
		short_description=short_description,
		default=True,
		objectClasses=['top', 'dNSZone'],
	),
}
property_descriptions = {
	'address': univention.admin.property(
		short_description=_('Reverse address'),
		long_description=_('The host part of the IP address in reverse notation (e.g. \"172.16.1.2/16\" -> \"2.1\" or \"2001:0db8:0100::0007:0008/96\" -> \"8.0.0.0.7.0.0.0\").'),
		syntax=univention.admin.syntax.dnsPTR,
		required=True,
		identifies=True,
	),
	'ip': univention.admin.property(
		short_description=_('IP Address'),
		long_description='',
		syntax=univention.admin.syntax.ipAddress,
		include_in_default_search=True,
	),
	'ptr_record': univention.admin.property(
		short_description=_('Pointer record'),
		long_description=_("FQDNs must end with a dot."),
		syntax=univention.admin.syntax.dnsName,
		multivalue=True,
		include_in_default_search=True,
		required=True,
	),
}
layout = [
	Tab(_('General'), _('Basic settings'), layout=[
		Group(_('General pointer record settings'), layout=[
			['ip', 'ptr_record'],
		]),
	]),
]
mapping = univention.admin.mapping.mapping()
mapping.register('address', 'relativeDomainName', None, univention.admin.mapping.ListToString, encoding='ASCII')
mapping.register('ptr_record', 'pTRRecord', encoding='ASCII')
[docs]def ipv6(string):
	"""
	>>> ipv6('0123456789abcdef0123456789abcdef')
	'0123:4567:89ab:cdef:0123:4567:89ab:cdef'
	"""
	assert len(string) == 32, string
	return ':'.join(string[i:i + 4] for i in range(0, 32, 4)) 
[docs]def calc_ip(rev, subnet):
	"""
	>>> calc_ip(rev='8.0.0.0.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0', subnet='0001:0002:0003:0').exploded
	'0001:0002:0003:0004:0005:0006:0007:0008'
	>>> calc_ip(rev='4.3', subnet='1.2').exploded
	'1.2.3.4'
	"""
	parts = rev.split('.')
	parts.reverse()
	if ':' in subnet:
		string = ''.join(subnet.split(':') + parts)
		ip = ipaddress.IPv6Address(u'%s' % (ipv6(string),))
	else:
		octets = subnet.split('.') + parts
		assert len(octets) == 4, octets
		addr = '.'.join(octets)
		ip = ipaddress.IPv4Address(u'%s' % (addr,))
	return ip 
[docs]def calc_rev(ip, subnet):
	"""
	>>> calc_rev(ip='1.2.3.4', subnet='1.2')
	'4.3'
	>>> calc_rev(ip='0001:0002:0003:0004:0005:0006:0007:0008', subnet='0001:0002:0003:0')
	'8.0.0.0.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0'
	>>> calc_rev(ip='1:2:3:4:5:6:7:8', subnet='0001:0002:0003:0')
	'8.0.0.0.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0'
	"""
	if ':' in subnet:
		string = ''.join(subnet.split(':'))
		prefix = len(string)
		assert 1 <= prefix < 32
		string += '0' * (32 - prefix)
		net = ipaddress.IPv6Network(u'%s/%d' % (ipv6(string), 4 * prefix), strict=False)
		addr = ipaddress.IPv6Address(u'%s' % (ip,))
		host = ''.join(addr.exploded.split(':'))
	else:
		octets = subnet.split('.')
		prefix = len(octets)
		assert 1 <= prefix < 4
		octets += ['0'] * (4 - prefix)
		net = ipaddress.IPv4Network(u'%s/%d' % ('.'.join(octets), 8 * prefix), strict=False)
		addr = ipaddress.IPv4Address(u'%s' % (ip,))
		host = addr.exploded.split('.')
	if addr not in net:
		raise ValueError()
	return '.'.join(reversed(host[prefix:])) 
[docs]class object(univention.admin.handlers.simpleLdap):
	module = module
[docs]	def description(self):
		try:
			return calc_ip(self.info['address'], self.superordinate.info['subnet']).compressed
		except (LookupError, ValueError, AssertionError) as ex:
			ud.debug(ud.ADMIN, ud.WARN, 'Failed to parse dn=%s: (%s)' % (self.dn, ex))
			return super(object, self).description() 
[docs]	def open(self):
		super(object, self).open()
		try:
			self.info['ip'] = calc_ip(self.info['address'], self.superordinate.info['subnet']).compressed
			self.save()
		except (LookupError, ValueError, AssertionError) as ex:
			ud.debug(ud.ADMIN, ud.WARN, 'Failed to parse dn=%s: (%s)' % (self.dn, ex)) 
[docs]	def ready(self):
		old_ip = self.oldinfo.get('ip')
		new_ip = self.info.get('ip')
		if old_ip != new_ip:
			try:
				self.info['address'] = calc_rev(new_ip, self.superordinate.info['subnet'])
			except (LookupError, ValueError, AssertionError) as ex:
				ud.debug(ud.ADMIN, ud.WARN, 'Failed to handle address: dn=%s addr=%r (%s)' % (self.dn, new_ip, ex))
				raise univention.admin.uexceptions.InvalidDNS_Information(_('Reverse zone and IP address are incompatible.'))
		super(object, self).ready() 
	def _updateZone(self):
		if self.update_zone:
			self.superordinate.open()
			self.superordinate.modify()
	def __init__(self, co, lo, position, dn='', superordinate=None, attributes=[], update_zone=True):
		self.update_zone = update_zone
		univention.admin.handlers.simpleLdap.__init__(self, co, lo, position, dn, superordinate, attributes=attributes)
	def _ldap_addlist(self):
		return super(object, self)._ldap_addlist() + [
			(self.superordinate.mapping.mapName('subnet'), self.superordinate.mapping.mapValue('subnet', self.superordinate['subnet'])),
		]
	def _ldap_post_modify(self):
		super(object, self)._ldap_post_modify()
		if self.hasChanged(self.descriptions.keys()):
			self._updateZone()
	def _ldap_post_create(self):
		super(object, self)._ldap_post_create()
		self._updateZone()
	def _ldap_post_remove(self):
		super(object, self)._ldap_post_remove()
		self._updateZone()
[docs]	@classmethod
	def lookup_filter_superordinate(cls, filter, superordinate):
		filter.expressions.append(univention.admin.filter.expression('zoneName', superordinate.mapping.mapValueDecoded('subnet', superordinate['subnet']), escape=True))
		filter = rewrite_rev(filter, superordinate.info['subnet'])
		return filter 
[docs]	@classmethod
	def unmapped_lookup_filter(cls):
		return univention.admin.filter.conjunction('&', [
			univention.admin.filter.expression('objectClass', 'dNSZone'),
			univention.admin.filter.conjunction('!', [univention.admin.filter.expression('relativeDomainName', '@')]),
			univention.admin.filter.conjunction('|', [
				univention.admin.filter.expression('zoneName', '*.in-addr.arpa', escape=False),
				univention.admin.filter.expression('zoneName', '*.ip6.arpa', escape=False),
			]),
		])  
[docs]def rewrite_rev(filter, subnet):
	"""Rewrite LDAP filter expression and convert (ip) -> (zone,reversed)
	>>> rewrite_rev(expression('ip', '1.2.3.4'), subnet='1.2')
	conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '4.3', '=')])
	>>> rewrite_rev(expression('ip', '1.2.3.*', escape=False), subnet='1.2')
	conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '*.3', '=')])
	>>> rewrite_rev(expression('ip', '1.2.*.*', escape=False), subnet='1.2')
	conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '*.*', '=')])
	>>> rewrite_rev(expression('ip', '1.2.*.4', escape=False), subnet='1.2')
	conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '4.*', '=')])
	>>> rewrite_rev(expression('ip', '1.2.*', escape=False), subnet='1.2')
	conjunction('&', [expression('zoneName', '2.1.in-addr.arpa', '='), expression('relativeDomainName', '*', '=')])
	>>> rewrite_rev(expression('ip', '1:2:3:4:5:6:7:8'), subnet='0001:0002')
	conjunction('&', [expression('zoneName', '2.0.0.0.1.0.0.0.ip6.arpa', '='), expression('relativeDomainName', '8.0.0.0.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0.0.3.0.0.0', '=')])
	>>> rewrite_rev(expression('ip', '1:2:3:4:5:6:7:*', escape=False), subnet='0001:0002')
	conjunction('&', [expression('zoneName', '2.0.0.0.1.0.0.0.ip6.arpa', '='), expression('relativeDomainName', '*.7.0.0.0.6.0.0.0.5.0.0.0.4.0.0.0.3.0.0.0', '=')])
	>>> rewrite_rev(expression('ip', '1:2:3:4:5:6:*:8', escape=False), subnet='0001:0002')
	conjunction('&', [expression('zoneName', '2.0.0.0.1.0.0.0.ip6.arpa', '='), expression('relativeDomainName', '8.0.0.0.*.6.0.0.0.5.0.0.0.4.0.0.0.3.0.0.0', '=')])
	>>> rewrite_rev(expression('ip', '1:2:3:*', escape=False), subnet='0001:0002')
	conjunction('&', [expression('zoneName', '2.0.0.0.1.0.0.0.ip6.arpa', '='), expression('relativeDomainName', '*.3.0.0.0', '=')])
	"""
	if isinstance(filter, conjunction):
		filter.expressions = [rewrite_rev(expr, subnet) for expr in filter.expressions]
	if isinstance(filter, expression) and filter.variable == 'ip':
		if ':' in subnet:
			string = ''.join(subnet.split(':'))
			prefix = len(string)
			assert 1 <= prefix < 32
			addr = ''.join(
				part if '*' in part else part.rjust(4, '0')[-4:]
				for part in filter.value.split(':')
			)
			suffix = '.ip6.arpa'
		else:
			octets = subnet.split('.')
			prefix = len(octets)
			assert 1 <= prefix < 4
			addr = filter.value.split('.')
			suffix = '.in-addr.arpa'
		addr_net, addr_host = ['.'.join(reversed(_)) for _ in (addr[:prefix], addr[prefix:])]
		filter = conjunction('&', [
			expression('zoneName', addr_net + suffix),
			expression('relativeDomainName', addr_host or '*', escape=False),
		])
	return filter 
lookup = object.lookup
lookup_filter = object.lookup_filter
[docs]def identify(dn, attr):
	return all([
		b'dNSZone' in attr.get('objectClass', []),
		b'@' not in attr.get('relativeDomainName', []),
		(attr.get('zoneName', [b''])[0].decode('ASCII').endswith(ARPA_IP4) or attr.get('zoneName', [b''])[0].decode('ASCII').endswith(ARPA_IP6))
	])