#!/usr/share/ucs-test/runner python
# -*- coding: utf-8 -*-
## desc: Test the UMC user authentication and password change
## bugs: [34369]
## roles:
##  - domaincontroller_master
## exposure: dangerous

import sys
sys.path.insert(0, '.')
from TestUMCSystemModule import TestUMCSystem

import simplejson as json
from subprocess import Popen, PIPE
from time import sleep, time, localtime, strftime

import univention.testing.utils as utils
from univention.testing.udm import UCSTestUDM
from univention.testing.strings import random_username
from univention.config_registry import handler_set
from univention.lib.umc_connection import UMCConnection


class TestUMCUserAuthentication(TestUMCSystem):

    def __init__(self):
        """Initialise the base test class and blank out the per-test state."""
        super(TestUMCUserAuthentication, self).__init__()

        # UDM test helper; bound in main() via 'with UCSTestUDM() as self.UDM'
        self.UDM = None

        # value of the 'ldap/acl/user/password/change' UCR variable as found
        # before the test touched it, kept so that it can be restored later
        self.initial_ucr_password_setting = ''

        # name, DN and passwords of the throw-away test user (filled in later)
        self.test_username = ''
        self.test_user_dn = ''
        self.test_password = ''
        self.test_password_unicode = ''

    def create_user_group_and_policy(self):
        """
        Prepares the objects needed by the test via UDM: a group, the
        test user (with that group as primary group) and an UMC policy
        allowing the 'udm-self' operation set, which is then referenced
        by the group. The DN of the created user is stored in
        'self.test_user_dn'.
        """
        group_name = 'umc_test_group_' + random_username(6)
        policy_name = 'umc_test_policy_' + random_username(6)

        # the operation set that lets users change their own password
        udm_self_dn = ("cn=udm-self,cn=operations,cn=UMC,cn=univention,"
                       + self.UCR.get('ldap/base'))
        allowed_operations = [udm_self_dn]

        message = ("\nCreating a group '%s' and a user '%s' in it "
                   "for the test:\n")
        print(message % (group_name, self.test_username))
        group_dn = self.UDM.create_group(name=group_name)[0]
        utils.verify_ldap_object(group_dn)

        created_user = self.UDM.create_user(username=self.test_username,
                                            password=self.test_password,
                                            primaryGroup=group_dn)
        self.test_user_dn = created_user[0]
        utils.verify_ldap_object(self.test_user_dn)

        message = ("\nCreating a custom policy '%s' with 'udm-self' operation "
                   "set for the test and adding it to the test group '%s':\n")
        print(message % (policy_name, group_name))
        policy_dn = self.UDM.create_object('policies/umc',
                                           name=policy_name,
                                           allow=allowed_operations)
        utils.verify_ldap_object(policy_dn)

        # 'policy-reference' is not a valid Python identifier, so the
        # keyword arguments have to be handed over as an unpacked dict
        group_changes = {'dn': group_dn,
                         'policy-reference': policy_dn}
        self.UDM.modify_object('groups/group', **group_changes)

    def restart_ldap_server(self):
        """
        Invokes the 'service slapd restart' to restart the LDAP server
        (Needed for changes in the UCR to work).

        The test is failed (via 'utils.fail') when the command cannot be
        started, when it writes anything to stderr or when it finishes
        with a non-zero exit code.
        """
        print("\nRestarting the LDAP Server...")
        try:
            proc = Popen(("service", "slapd", "restart"),
                         stdout=PIPE,
                         stderr=PIPE)
            stdout, stderr = proc.communicate()
        except (ValueError, OSError) as exc:
            utils.fail("An exception occured during LDAP server restart: %r"
                       % exc)
        else:
            # only reached when the command was actually executed
            if stderr:
                utils.fail("An error occured while trying to restart LDAP "
                           "server: '%s'" % stderr)

            # a silent failure (nothing on stderr) is still a failure
            if proc.returncode != 0:
                utils.fail("The LDAP server restart finished with exit code "
                           "%s, output: '%s'" % (proc.returncode, stdout))

    def save_reset_initial_ucr_password_setting(self):
        """
        Remembers the current value of the 'ldap/acl/user/password/change'
        UCR variable in 'self.initial_ucr_password_setting', so that it
        can be restored after the test. When the value is empty or unset
        'self.return_code_result_skip()' is called to skip the test.
        Sets the variable to 'yes' (unless it already is) and restarts
        the LDAP server afterwards.
        """
        ucr_variable = 'ldap/acl/user/password/change'

        print("\nSaving initial UCR user's password change setting")
        initial_setting = self.UCR.get(ucr_variable)
        self.initial_ucr_password_setting = initial_setting

        if not initial_setting:
            print("\nFailed to get 'ldap/acl/user/password/change' "
                  "from the UCR or variable is unset, skipping test..")
            self.return_code_result_skip()

        if initial_setting != 'yes':
            print("\nAllowing user password change in the UCR:")
            handler_set([ucr_variable + '=yes'])

        self.restart_ldap_server()

    def authenticate_to_umc(self, password):
        """
        Authenticates to UMC using 'self.Connection' and given
        'password' with self.test_username.

        The session cookie in the connection headers is updated: it is
        replaced with the cookie from the response, or removed when the
        response did not carry one (so no stale or 'None' cookie is sent
        with the following requests).

        Returns 'True' when a cookie was received along with the status
        code 200 and 'False' in any other case. Any exception raised
        while talking to UMC fails the test via 'utils.fail'.
        """
        options = {"options": {"username": self.test_username,
                               "password": password}}
        umc_connection = None
        try:
            umc_connection = self.Connection.get_connection()
            umc_connection.request('POST',
                                   '/umcp/auth',
                                   json.dumps(options),
                                   self.Connection._headers)

            request_result = umc_connection.getresponse()
            cookie = request_result.getheader('set-cookie')
            status_code = request_result.status

            print("Response Code: %s" % status_code)
            print("Response Cookie: %s" % cookie)
            print("Response Message: %s" % request_result.read())

            if cookie:
                self.Connection._headers['Cookie'] = cookie
            else:
                # failed login: drop the cookie instead of storing 'None'
                self.Connection._headers.pop('Cookie', None)

            return bool(cookie) and status_code == 200
        except Exception as exc:
            utils.fail("An exception while trying to authenticate to UMC "
                       "with a 'username'=%s and 'password'=%s: %r"
                       % (self.test_username, password, exc))
        finally:
            # the response body was read above, the socket can be released
            if umc_connection is not None:
                umc_connection.close()

    def change_user_password(self, user_dn, new_password):
        """
        Makes a 'udm/put' UMC request with a 'new_password' and
        a 'user_dn' in options to change the password.

        Fails the test (via 'utils.fail') when the response is empty or
        its first entry does not report 'success'. On success waits
        30 seconds to give the new password time to be synchronised.
        """
        print("\nChanging '%s' user password to '%s'"
              % (user_dn, new_password))
        options = [{"object": {"password": new_password,
                               "$dn$": user_dn},
                    "options": None}]

        request_result = self.make_udm_request("put", options, "users/self")

        # an empty (or missing) response must be reported as a failed
        # password change, not as an IndexError / TypeError traceback
        if not request_result or not request_result[0].get('success'):
            print("Change password UMC response: '%s'" % request_result)
            utils.fail("The UMC request to change 'user'=%s password to '%s' "
                       "failed, as there are no 'success'=True in response"
                       % (user_dn, new_password))
        sleep(30)  # wait for new password to sync

    def check_random_and_valid_passwords(self):
        """
        Verifies that an UMC login of the test user is rejected with a
        wrong (randomly extended) password and accepted with the valid
        initial password. Fails the test otherwise.
        """
        username = self.test_username
        valid_password = self.test_password

        print("\nTrying to authenticate to UMC with username '%s' and "
              "a random password:" % username)
        wrong_password = valid_password + random_username(4)
        accepted = self.authenticate_to_umc(wrong_password)
        if accepted:
            utils.fail("Authentication with a valid 'username'=%s and "
                       "random password succeeded." % username)

        print("\nTrying to authenticate to UMC with username '%s' and "
              "a valid initial password '%s':" % (username, valid_password))
        accepted = self.authenticate_to_umc(valid_password)
        if not accepted:
            utils.fail("Authentication with a valid 'username'=%s and "
                       "'password'=%s failed." % (username, valid_password))

    def check_change_old_new_unicode_passwords(self):
        """
        Changes the test user password and checks that the UMC login
        fails with the old and works with the new password. Afterwards
        changes the password to one with unicode characters (stored in
        'self.test_password_unicode') and checks the login with it.
        """
        username = self.test_username
        old_password = self.test_password
        new_password = old_password + '1'

        self.change_user_password(self.test_user_dn, new_password)

        print("\nTrying to authenticate to UMC with username '%s' and "
              "an old password '%s':" % (username, old_password))
        if self.authenticate_to_umc(old_password):
            utils.fail("Authentication with old password '%s' succeeded "
                       "after password change." % old_password)

        print("\nTrying to authenticate to UMC with username '%s' and "
              "a new valid password '%s':" % (username, new_password))
        if not self.authenticate_to_umc(new_password):
            utils.fail("Authentication with a 'username'=%s and a new "
                       "'password'=%s failed." % (username, new_password))

        # New password with (Latin-1 Latin-A Latin-B unicode chars)
        unicode_password = old_password + '_ÃŎǦ'
        self.test_password_unicode = unicode_password
        self.change_user_password(self.test_user_dn, unicode_password)

        print("\nTrying to authenticate to UMC with username '%s' and "
              "a new valid unicode password '%s':"
              % (username, unicode_password))
        if not self.authenticate_to_umc(unicode_password):
            utils.fail("Authentication with a new unicode password '%s' "
                       "did not succeeded." % unicode_password)

    def generate_expiry_date(self, time_seconds):
        """
        Builds an account expiry date string ('YYYY-MM-DD', local time)
        which lies 'time_seconds' away from now; a negative value gives
        a date in the past.
        """
        shifted_timestamp = time() + time_seconds
        return strftime('%Y-%m-%d', localtime(shifted_timestamp))

    def check_expired_deactivated_removed_account(self):
        """
        Checks that the UMC login of the test user is rejected when the
        account:
         - has an expiry date in the past (the date is moved back to
           the future afterwards);
         - is deactivated;
         - is removed.
        """
        month_seconds = 2630000  # approx. amount of seconds in 1 month
        username = self.test_username
        user_dn = self.test_user_dn
        password = self.test_password_unicode

        past_date = self.generate_expiry_date(-month_seconds)
        print("\nSetting an expiry date in the past for the "
              "test user '%s' account and trying to authenticate:\n"
              % username)
        self.UDM.modify_object('users/user', dn=user_dn, userexpiry=past_date)

        self.wait_for_samba_replication(username)
        if self.authenticate_to_umc(password):
            utils.fail("Authentication with an expired user account "
                       "succeeded.")

        # move the expiry date back to the future before deactivating
        future_date = self.generate_expiry_date(month_seconds)
        self.UDM.modify_object('users/user',
                               dn=user_dn,
                               userexpiry=future_date)

        print("\nDeactivating the test user '%s' account and "
              "trying to authenticate:\n" % username)
        # a separate modification is needed, since disabling the account
        # and changing its expiry date at the same time does not work:
        self.UDM.modify_object('users/user', dn=user_dn, disabled="all")

        self.wait_for_samba_replication(username)
        if self.authenticate_to_umc(password):
            utils.fail("Authentication with a deactivated user account "
                       "succeeded.")

        print("\nRemoving the test user '%s' account and "
              "trying to authenticate:\n" % username)
        self.UDM.remove_object('users/user', dn=user_dn)

        sleep(50)  # should be changed after Bug #35634 is resolved
        if self.authenticate_to_umc(password):
            utils.fail("Authentication with a removed user account "
                       "succeeded.")

    def main(self):
        """
        Runs the whole test: creates the test user, group and policy,
        allows the password change in the UCR, performs the UMC
        authentication / password change checks and finally restores
        the initial UCR setting.
        """
        self.test_username = 'umc_test_user_' + random_username(6)
        self.test_password = 'univention'

        with UCSTestUDM() as self.UDM:
            self.reload_ucr()
            self.create_user_group_and_policy()

            try:
                self.save_reset_initial_ucr_password_setting()
                self.Connection = UMCConnection(self.UCR['hostname'])

                # the checks build on each other, the order matters
                checks = (self.check_random_and_valid_passwords,
                          self.check_change_old_new_unicode_passwords,
                          self.check_expired_deactivated_removed_account)
                for check in checks:
                    check()
            finally:
                self._restore_initial_ucr_password_setting()

    def _restore_initial_ucr_password_setting(self):
        """
        Writes the saved 'ldap/acl/user/password/change' value back to
        the UCR and restarts the LDAP server. Does nothing when no
        initial value was saved.
        """
        if not self.initial_ucr_password_setting:
            return

        print("\nRestoring initial UCR user password setting:")
        handler_set(['ldap/acl/user/password/change=' +
                     self.initial_ucr_password_setting])
        self.restart_ldap_server()


if __name__ == '__main__':
    # run the test; main() result is handed over as the exit status
    sys.exit(TestUMCUserAuthentication().main())
