feat(ldap): add functions to read and update OUs

This commit is contained in:
Julian Lobbes 2022-11-09 14:44:38 +01:00
parent 8e478b7740
commit aeefa61018
2 changed files with 145 additions and 44 deletions

View File

@ -19,9 +19,9 @@ def create_app(test_config=None):
LDAP_HOSTNAME='ldap://openldap', LDAP_HOSTNAME='ldap://openldap',
LDAP_BIND_USER_DN='cn=admin,dc=example,dc=com', LDAP_BIND_USER_DN='cn=admin,dc=example,dc=com',
LDAP_BIND_USER_PASSWORD='admin', LDAP_BIND_USER_PASSWORD='admin',
LDAP_BASE_DN='cn=example,cn=com', LDAP_BASE_DN='dc=example,dc=com',
LDAP_USER_SEARCH_BASE='ou=users,cn=example,cn=com', LDAP_USERS_OU='ou=users,dc=example,dc=com',
LDAP_GROUPS_SEARCH_BASE='ou=groups,cn=example,cn=com', LDAP_GROUPS_OU='ou=groups,dc=example,dc=com',
LDAP_USER_OBJECT_CLASS='inetOrgPerson', LDAP_USER_OBJECT_CLASS='inetOrgPerson',
LDAP_GROUP_OBJECT_CLASS='groupOfUniqueNames', LDAP_GROUP_OBJECT_CLASS='groupOfUniqueNames',
) )

View File

@ -1,27 +1,39 @@
"""Interactions with an OpenLDAP server. """This module is an API used to interact with an OpenLDAP server.
Interactions include setting up authenticated connections, querying the DIT and Interactions include setting up authenticated connections, querying the DIT and
creating/reading/updating/deleting DIT entries. creating/reading/updating/deleting DIT entries.
All function calls within this module rely heavily on the `ldap3 module <https://ldap3.readthedocs.io/en/latest/>`_. Functions within this module rely heavily on the ldap3 module:
https://ldap3.readthedocs.io/en/latest/
""" """
from string import ascii_lowercase, ascii_uppercase, digits from string import ascii_lowercase, ascii_uppercase, digits
from flask import current_app from flask import current_app
from ldap3 import Connection, Server, ALL from ldap3 import Connection, Server, ALL, Reader, Writer, ObjectDef
from lumi2.exceptions import MissingConfigKeyError
from exceptions import MissingConfigKeyError
class InvalidStringFormatException(Exception): class InvalidStringFormatException(Exception):
"""Exception raised when an invalid string format is encountered.""" """Exception raised when an invalid string format is encountered."""
pass pass
class InvalidConnectionException(Exception):
"""Exception raised when an invalid Connection is encountered."""
pass
class EntryExistsException(Exception):
"""Exception raised when an entry unexpectedly already exists."""
pass
def _assert_is_valid_base_dn(input_str) -> None: def _assert_is_valid_base_dn(input_str) -> None:
"""Checks whether the input string is a valid LDAP base DN. """Checks whether the input string is a valid LDAP base DN.
A valid base DN is a string of the format 'cn=<SLD>,cn=<TLD>', whereby: A valid base DN is a string of the format 'dc=<SLD>,dc=<TLD>', whereby:
<SLD> (second level domain) is a substring containing only lowercase <SLD> (second level domain) is a substring containing only lowercase
alphanumeric characters with minimum length 1. alphanumeric characters with minimum length 1.
<TLD> (top level domain) is a substring containing only lowercase latin <TLD> (top level domain) is a substring containing only lowercase latin
@ -51,12 +63,13 @@ def _assert_is_valid_base_dn(input_str) -> None:
if len(tokens) != 2: if len(tokens) != 2:
raise InvalidStringFormatException("Expected exactly one ',' character.") raise InvalidStringFormatException("Expected exactly one ',' character.")
for token in tokens: for token in tokens:
if not token.startswith("cn="): if not token.startswith("dc="):
raise InvalidStringFormatException( raise InvalidStringFormatException(
"Expected DN component to start with 'cn='." "Expected DN component to start with 'dc='."
) )
token_sld, token_tld = tokens token_sld = tokens[0][3:]
token_tld = tokens[1][3:]
if not len(token_sld): if not len(token_sld):
raise InvalidStringFormatException( raise InvalidStringFormatException(
"Expected at least one character in second level domain." "Expected at least one character in second level domain."
@ -79,10 +92,11 @@ def _assert_is_valid_base_dn(input_str) -> None:
f"domain but got: '{char}'." f"domain but got: '{char}'."
) )
def _assert_is_valid_search_base(input_str) -> None:
"""Checks whether the input string is a valid LDAP search base string.
A valid search base is a string of the format 'ou=<NAME>,<BASE_DN>', whereby: def _assert_is_valid_ou_dn(input_str) -> None:
"""Checks whether the input string is a valid organizational unit DN.
A valid ou DN is a string of the format 'ou=<NAME>,<BASE_DN>', whereby:
<NAME> (name of organizational unit) is a substring containing only <NAME> (name of organizational unit) is a substring containing only
alphanumeric characters with minimum length 1. alphanumeric characters with minimum length 1.
<BASE_DN> is a substring representing an LDAP base DN as expected by <BASE_DN> is a substring representing an LDAP base DN as expected by
@ -102,7 +116,7 @@ def _assert_is_valid_search_base(input_str) -> None:
TypeError TypeError
If input_str is not of type string. If input_str is not of type string.
InvalidStringFormatException InvalidStringFormatException
If input_str is not a valid LDAP search base string. If input_str is not a valid LDAP ou DN string.
""" """
if not isinstance(input_str, str): if not isinstance(input_str, str):
@ -114,7 +128,9 @@ def _assert_is_valid_search_base(input_str) -> None:
if not tokens[0].startswith("ou="): if not tokens[0].startswith("ou="):
raise InvalidStringFormatException("Expected 'ou='.") raise InvalidStringFormatException("Expected 'ou='.")
token_ou = tokens[0][:3] token_ou = tokens[0][3:]
if not len(token_ou):
raise InvalidStringFormatException("OU name cannot be empty.")
token_base_dn = tokens[1] + "," + tokens[2] token_base_dn = tokens[1] + "," + tokens[2]
_assert_is_valid_base_dn(token_base_dn) _assert_is_valid_base_dn(token_base_dn)
@ -125,6 +141,7 @@ def _assert_is_valid_search_base(input_str) -> None:
f"but got: '{char}'." f"but got: '{char}'."
) )
def _assert_is_valid_user_object_class(input_str) -> None: def _assert_is_valid_user_object_class(input_str) -> None:
"""Checks whether the input string is a valid LDAP user object class. """Checks whether the input string is a valid LDAP user object class.
@ -155,6 +172,7 @@ def _assert_is_valid_user_object_class(input_str) -> None:
f"Expected 'inetOrgPerson' but got: '{input_str}'." f"Expected 'inetOrgPerson' but got: '{input_str}'."
) )
def _assert_is_valid_group_object_class(input_str) -> None: def _assert_is_valid_group_object_class(input_str) -> None:
"""Checks whether the input string is a valid LDAP group object class. """Checks whether the input string is a valid LDAP group object class.
@ -185,6 +203,7 @@ def _assert_is_valid_group_object_class(input_str) -> None:
f"Expected 'groupOfUniqueNames' but got: '{input_str}'." f"Expected 'groupOfUniqueNames' but got: '{input_str}'."
) )
def _assert_is_valid_bind_user_dn(input_str) -> None: def _assert_is_valid_bind_user_dn(input_str) -> None:
"""Checks whether the input string is a valid LDAP bind user DN. """Checks whether the input string is a valid LDAP bind user DN.
@ -232,6 +251,7 @@ def _assert_is_valid_bind_user_dn(input_str) -> None:
f"but got: '{char}'." f"but got: '{char}'."
) )
def _assert_app_config_is_valid() -> None: def _assert_app_config_is_valid() -> None:
"""Checks the app config's LDAP settings for completeness and correctness. """Checks the app config's LDAP settings for completeness and correctness.
@ -242,8 +262,8 @@ def _assert_app_config_is_valid() -> None:
Ensures that the following config keys are present and contain valid values: Ensures that the following config keys are present and contain valid values:
- 'LDAP_BIND_USER_DN' - 'LDAP_BIND_USER_DN'
- 'LDAP_BASE_DN' - 'LDAP_BASE_DN'
- 'LDAP_USER_SEARCH_BASE' - 'LDAP_USERS_OU'
- 'LDAP_GROUPS_SEARCH_BASE' - 'LDAP_GROUPS_OU'
- 'LDAP_USER_OBJECT_CLASS' - 'LDAP_USER_OBJECT_CLASS'
- 'LDAP_GROUP_OBJECT_CLASS' - 'LDAP_GROUP_OBJECT_CLASS'
@ -266,8 +286,8 @@ def _assert_app_config_is_valid() -> None:
'LDAP_BIND_USER_DN', 'LDAP_BIND_USER_DN',
'LDAP_BIND_USER_PASSWORD', 'LDAP_BIND_USER_PASSWORD',
'LDAP_BASE_DN', 'LDAP_BASE_DN',
'LDAP_USER_SEARCH_BASE', 'LDAP_USERS_OU',
'LDAP_GROUPS_SEARCH_BASE', 'LDAP_GROUPS_OU',
'LDAP_USER_OBJECT_CLASS', 'LDAP_USER_OBJECT_CLASS',
'LDAP_GROUP_OBJECT_CLASS', 'LDAP_GROUP_OBJECT_CLASS',
] ]
@ -284,38 +304,33 @@ def _assert_app_config_is_valid() -> None:
_assert_is_valid_base_dn(current_app.config['LDAP_BASE_DN']) _assert_is_valid_base_dn(current_app.config['LDAP_BASE_DN'])
_assert_is_valid_bind_user_dn(current_app.config['LDAP_BIND_USER_DN']) _assert_is_valid_bind_user_dn(current_app.config['LDAP_BIND_USER_DN'])
for base in ['LDAP_USER_SEARCH_BASE', 'LDAP_GROUPS_SEARCH_BASE']: for base in ['LDAP_USERS_OU', 'LDAP_GROUPS_OU']:
_assert_is_valid_search_base(current_app.config[base]) _assert_is_valid_ou_dn(current_app.config[base])
_assert_is_valid_user_object_class(current_app.config['LDAP_USER_OBJECT_CLASS']) _assert_is_valid_user_object_class(current_app.config['LDAP_USER_OBJECT_CLASS'])
_assert_is_valid_group_object_class(current_app.config['LDAP_GROUP_OBJECT_CLASS']) _assert_is_valid_group_object_class(current_app.config['LDAP_GROUP_OBJECT_CLASS'])
def get_authenticated_connection(
hostname=current_app.config['LDAP_HOSTNAME'],
user=current_app.config['LDAP_BIND_USER_DN'],
password=current_app.config['LDAP_BIND_USER_PASSWORD'],
) -> Connection:
"""Returns a Connection object to the LDAP server using bind credentials.
The bind credentials and server hostname are read from the :mod:`core.settings` def get_connection() -> Connection:
module. """Returns a Connection to the LDAP server specified in the app config.
Attributes A connection attempt is made to the LDAP server at the specified hostname
---------- using the specified credentials.
hostname : str
Hostname at which the LDAP server can be reached. Returns
user : str -------
DN of the bind user used to authenticate to the server. ldap3.Connection
password : str A bound (i.e. authenticated), active Connection object to the server.
Password of the bind user authenticating to the server.
Raises Raises
------ ------
:class:`ldap3.core.exceptions.LDAPSocketOpenError` ldap3.core.exceptions.LDAPSocketOpenError
If the server specified by the ``hostname`` cannot be reached. If the server specified by the hostname cannot be reached.
:class:`ldap3.core.exceptions.LDAPBindError` ldap3.core.exceptions.LDAPBindError
If the bind credentials ``user`` and/or ``password`` are If the bind credentials (user DN and/or password) are invalid.
invalid.
""" """
hostname = current_app.config['LDAP_HOSTNAME']
user = current_app.config['LDAP_BIND_USER_DN']
password = current_app.config['LDAP_BIND_USER_PASSWORD']
return Connection( return Connection(
Server(hostname, get_info=ALL), Server(hostname, get_info=ALL),
@ -323,3 +338,89 @@ def get_authenticated_connection(
password=password, password=password,
auto_bind=True, auto_bind=True,
) )
def _assert_is_valid_connection(connection: Connection) -> None:
"""Ensures that the connection is valid, open and bound.
Parameters
----------
Connection : ldap3.Connection
Connection object to an LDAP server.
Raises
------
TypeError
If connection is not of type ldap3.Connection
InvalidConnectionException
If the connection is not bound.
If the connection is closed.
"""
if not isinstance(connection, Connection):
raise TypeError(f"Expected a Connection object but got '{type(connection)}'.")
if not connection.open:
raise InvalidConnectionException("Connection is not open.")
if not connection.bound:
raise InvalidConnectionException("Connection is not bound.")
def ou_exists(connection: Connection, ou_dn: str) -> bool:
"""Checks whether the specified ou entry currently exists in the DIT.
The DN must be a direct child of the DIT's base DN.
The check is carried out by querying the LDAP server for the existence of
an entry at the specified DN.
Parameters
----------
Connection : ldap3.Connection
Bound Connection object to an LDAP server.
ou_dn : str
DN of an organizational unit (ou) directly below the DIT's root entry.
Returns
-------
Bool
True if the specified ou exists in the DIT, and False otherwise.
"""
_assert_is_valid_connection(connection)
_assert_is_valid_ou_dn(ou_dn)
ou_name = ou_dn.split(',')[0][3:]
ou_objectdef = ObjectDef('organizationalUnit', connection)
reader = Reader(connection, ou_objectdef, current_app.config['LDAP_BASE_DN'])
search_results = reader.search()
for result in search_results:
if result.ou == ou_name:
return True
return False
def create_ou(connection: Connection, ou_dn: str) -> None:
"""Creates an entry for the specified organizational unit.
The ou entry must be a direct child of the DIT's base DN.
Parameters
----------
Connection : ldap3.Connection
Bound Connection object to an LDAP server.
ou_dn : str
DN of an organizational unit (ou) directly below the DIT's root entry.
Raises
------
EntryExistsException
If an entry for the specified ou already exists in the DIT.
"""
_assert_is_valid_connection(connection)
_assert_is_valid_ou_dn(ou_dn)
if ou_exists(connection, ou_dn):
raise EntryExistsException(f"Cannot create '{ou_dn}': entry already exists.")
connection.add(ou_dn, 'organizationalUnit')