feat(ldap): add get_user function

This commit is contained in:
Julian Lobbes 2022-11-10 00:15:23 +01:00
parent 4c9151e93c
commit 1cbf2ce99b

View File

@ -8,10 +8,15 @@ https://ldap3.readthedocs.io/en/latest/
"""
from string import ascii_lowercase, ascii_uppercase, digits
import json
from base64 import b64decode, b64encode
from io import BytesIO
from flask import current_app
from ldap3 import Connection, Server, ALL, Reader, Writer, ObjectDef
from PIL import Image
from lumi2.usermodel import User
from lumi2.exceptions import MissingConfigKeyError
@ -30,6 +35,20 @@ class EntryExistsException(Exception):
pass
class EntryNotFoundException(Exception):
"""Exception raised when an entry is unexpectedly not found."""
pass
class AttributeNotFoundException(Exception):
"""Exception raised when an entry's is unexpectedly not set."""
pass
class InvalidAttributeException(Exception):
"""Exception raised when an entry's is unexpectedly not set."""
pass
def _assert_is_valid_base_dn(input_str) -> None:
"""Checks whether the input string is a valid LDAP base DN.
@ -532,3 +551,94 @@ def create_ou(connection: Connection, ou_dn: str) -> None:
raise EntryExistsException(f"Cannot create '{ou_dn}': entry already exists.")
connection.add(ou_dn, 'organizationalUnit')
def get_user(connection: Connection, uid: str) -> User:
"""Retrieves the user with the specified uid (username) from the LDAP server.
The DIT is queried for an entry of class 'inetOrgPerson' directly under the
users ou specified in the app config.
The retrieved attributes are parsed and used to create a User object.
The follwing attributes are expected to be set for the entry:
'cn', 'sn', 'displayName', 'uid', 'userPassword', 'jpegPhoto', 'mail'.
Parameters
----------
Connection : ldap3.Connection
Bound Connection object to an LDAP server.
uid : str
Username of the user to be retrieved.
Returns
-------
lumi2.usermodel.User
The User object representing the retrieved user.
Raises
------
EntryNotFoundException
If no user with the specified uid is found.
AttributeNotFoundException
If one of the above-mentioned attributes for the user is unset.
TypeError
If uid is not of type string.
InvalidAttributeException
If an attribute's value is in an invalid format.
"""
_assert_is_valid_connection(connection)
if not isinstance(uid, str):
raise TypeError(f"Expected a string but got: '{type(uid)}'.")
user_dn = "uid=" + uid + ',' + current_app.config['LDAP_USERS_OU']
required_attributes = [
'cn',
'sn',
'displayName',
'uid',
'password',
'jpegPhoto',
'mail',
]
connection.search(
user_dn, '(objectclass=inetOrgPerson)', attributes=required_attributes,
)
if not connection.entries:
raise EntryNotFoundException(f"No such user found: '{user_dn}'.")
entry = connection.entries[0]
# Convert entry to JSON and load attributes into a dict
attributes = json.loads(entry.entry_to_json())['attributes']
for attribute in required_attributes:
if attribute not in attributes.keys():
raise AttributeNotFoundException(
f"Attribute '{attribute}' not set in entry '{user_dn}'."
)
username = attributes['uid'][0]
email = attributes['mail'][0]
first_name = attributes['cn'][0]
last_name = attributes['sn'][0]
display_name = attributes['displayName'][0]
# Retrieve base64-encoded password hash prefixed with '{SHA512}'
password_hash = attributes['userPassword'][0]
expected_hash_type = '{SHA512}'
if not password_hash.startswith(expected_hash_type):
raise InvalidAttributeException(
f"Unexpected password hash in entry '{user_dn}': expected " \
f"'{expected_hash_type}' but got: '{password_hash}'."
)
# Strip prefix
password_hash = password_hash[len(expected_hash_type):]
picture_encoded = attributes['jpegPhoto'][0]['encoded']
# Decode base64-encoded picture and create a PIL Image object
picture = Image.open(BytesIO(b64decode(picture_encoded)))
return User(
username, password_hash, email,
first_name, last_name, display_name,
picture
)