feat(ldap): add functions to create/delete/get groups

This commit is contained in:
Julian Lobbes 2022-11-15 17:05:53 +01:00
parent 8c1f8775c4
commit 3531686572
2 changed files with 291 additions and 10 deletions

View File

@ -472,7 +472,7 @@ def _assert_is_valid_connection(connection: Connection) -> None:
Parameters Parameters
---------- ----------
Connection : ldap3.Connection connection : ldap3.Connection
Connection object to an LDAP server. Connection object to an LDAP server.
Raises Raises
@ -501,7 +501,7 @@ def ou_exists(connection: Connection, ou_dn: str) -> bool:
Parameters Parameters
---------- ----------
Connection : ldap3.Connection connection : ldap3.Connection
Bound Connection object to an LDAP server. Bound Connection object to an LDAP server.
ou_dn : str ou_dn : str
DN of an organizational unit (ou) directly below the DIT's root entry. DN of an organizational unit (ou) directly below the DIT's root entry.
@ -533,7 +533,7 @@ def create_ou(connection: Connection, ou_dn: str) -> None:
Parameters Parameters
---------- ----------
Connection : ldap3.Connection connection : ldap3.Connection
Bound Connection object to an LDAP server. Bound Connection object to an LDAP server.
ou_dn : str ou_dn : str
DN of an organizational unit (ou) directly below the DIT's root entry. DN of an organizational unit (ou) directly below the DIT's root entry.
@ -561,9 +561,9 @@ def user_exists(connection: Connection, user_dn: str) -> bool:
Parameters Parameters
---------- ----------
Connection : ldap3.Connection connection : ldap3.Connection
Bound Connection object to an LDAP server. Bound Connection object to an LDAP server.
ou_dn : str user_dn : str
DN of a user entry (uid) directly below the LDAP_USERS_OU entry. DN of a user entry (uid) directly below the LDAP_USERS_OU entry.
Raises Raises
@ -590,7 +590,7 @@ def get_user(connection: Connection, uid: str) -> User:
Parameters Parameters
---------- ----------
Connection : ldap3.Connection connection : ldap3.Connection
Bound Connection object to an LDAP server. Bound Connection object to an LDAP server.
uid : str uid : str
Username of the user to be retrieved. Username of the user to be retrieved.
@ -675,7 +675,7 @@ def create_user(connection: Connection, user: User) -> None:
Parameters Parameters
---------- ----------
Connection : ldap3.Connection connection : ldap3.Connection
Bound Connection object to an LDAP server. Bound Connection object to an LDAP server.
user : lumi2.usermodel.User user : lumi2.usermodel.User
The User object from which a user LDAP entry will be created. The User object from which a user LDAP entry will be created.
@ -696,8 +696,6 @@ def create_user(connection: Connection, user: User) -> None:
except EntryNotFoundException: except EntryNotFoundException:
pass pass
user_dn = f"uid={user.username},{current_app.config['LDAP_USERS_OU']}"
user_image_bytes = BytesIO() user_image_bytes = BytesIO()
user.picture.save(user_image_bytes, format="jpeg") user.picture.save(user_image_bytes, format="jpeg")
@ -711,4 +709,254 @@ def create_user(connection: Connection, user: User) -> None:
"jpegPhoto": user_image_bytes.getvalue(), "jpegPhoto": user_image_bytes.getvalue(),
} }
connection.add(user_dn, "inetOrgPerson", attributes) connection.add(user.get_dn(), "inetOrgPerson", attributes)
def delete_user(connection: Connection, uid: str) -> None:
"""Deletes the user with the specified uid (username) from the LDAP server.
Parameters
----------
connection : ldap3.Connection
Bound Connection object to an LDAP server.
uid : str
Username of the user to be deleted.
Raises
------
EntryNotFoundException
If no user with the specified uid is found.
TypeError
If uid is not of type string.
"""
_assert_is_valid_connection(connection)
if not isinstance(uid, str):
raise TypeError(f"Expected a string but got: '{type(uid)}'.")
user_dn = "uid=" + uid + ',' + current_app.config['LDAP_USERS_OU']
if not user_exists(connection, user_dn):
raise EntryNotFoundException(f"No such user entry: '{user_dn}'.")
# TODO Check if user is the sole member of any groups
# If so, delete that group here, because groups need to have at least one member
connection.delete(user_dn)
def get_users(connection: Connection) -> set[User]:
"""Retrieves a set containing all Users from the LDAP server.
Parameters
----------
connection : ldap3.Connection
Bound Connection object to an LDAP server.
"""
_assert_is_valid_connection(connection)
connection.search(
current_app.config['LDAP_USERS_OU'],
"(objectclass=inetOrgPerson)",
attributes=['uid'],
)
all_users = set()
for entry in connection.entries:
all_users.add(get_user(connection, str(entry.uid)))
return all_users
def group_exists(connection: Connection, group_dn: str) -> bool:
"""Checks whether the specified group entry exists in the DIT.
The group DN is expected to represent an entry which is a direct child of the
LDAP_GROUPS_OU specified in the app config.
Parameters
----------
connection : ldap3.Connection
Bound Connection object to an LDAP server.
group_dn : str
DN of a group entry (cn) directly below the LDAP_GROUPS_OU entry.
Raises
------
MissingParentEntryException
If the specified group DN's direct parent entry is not present in the DIT.
"""
_assert_is_valid_connection(connection)
_assert_is_valid_group_dn(group_dn)
connection.search(group_dn, '(objectclass=groupOfUniqueNames)')
return len(connection.entries) > 0
def create_group(connection: Connection, group: Group) -> None:
"""Creates the specified Group on the LDAP server.
All members of the group must already exist as users in the DIT.
Parameters
----------
connection : ldap3.Connection
Bound Connection object to an LDAP server.
group : lumi2.usermodel.Group
The Group for which an entry is to be created on the LDAP server.
Raises
------
TypeError
If group is not of type Group.
EntryNotFoundException
If a user who is a member of the Group does not exist in the DIT.
EntryExistsException
If a group with this name already exists on the LDAP server.
"""
_assert_is_valid_connection(connection)
if not isinstance(group, Group):
raise TypeError(f"Expected a lumi2.usermodel.Group but got: '{type(group)}'.")
if group_exists(connection, group.get_dn()):
raise EntryExistsException(
f"Failed to create group '{group.groupname}': entry exists already."
)
member_dn_list = []
for user in group.members:
user_dn = user.get_dn()
if not user_exists(connection, user_dn):
raise EntryNotFoundException(
f"Failed to create group '{group.groupname}': no entry found for " \
f"user '{user.username}'."
)
member_dn_list.append(user_dn)
connection.add(
group.get_dn(),
"groupOfUniqueNames",
{"uniqueMember": member_dn_list},
)
def delete_group(connection: Connection, group_cn: str) -> None:
"""Deletes the group with the specified CN from the LDAP server.
Parameters
----------
connection : ldap3.Connection
Bound Connection object to an LDAP server.
group_cn : str
The CN (common name) of the Group whose entry will be removed from the
LDAP server's DIT.
Raises
------
TypeError
If group_cn is not of type str.
EntryNotFoundException
If no group with the specified CN exists in the DIT.
"""
_assert_is_valid_connection(connection)
if not isinstance(group_cn, str):
raise TypeError(f"Expected a string but got: '{type(group_cn)}'.")
group_dn = f"cn={group_cn},{current_app.config['LDAP_GROUPS_OU']}"
_assert_is_valid_group_dn(group_dn)
if not group_exists(connection, group_dn):
raise EntryNotFoundException(
f"Failed to delete group '{group_cn}': no such entry found: " \
f"'{group_dn}'."
)
connection.delete(group_dn)
def get_group(connection: Connection, group_cn: str) -> Group:
"""Retrieves the group with the specified CN (common name) from the LDAP server.
Parameters
----------
connection : ldap3.Connection
Bound Connection object to an LDAP server.
group_cn : str
CN (group name) of the group to be retrieved.
Returns
-------
lumi2.usermodel.Group
The Group object representing the retrieved Group.
Raises
------
EntryNotFoundException
If no group with the specified group_cn is found.
TypeError
If group_cn is not of type string.
"""
_assert_is_valid_connection(connection)
if not isinstance(group_cn, str):
raise TypeError(f"Expected a string but got: '{type(group_cn)}'.")
group_dn = f"cn={group_cn},{current_app.config['LDAP_GROUPS_OU']}"
_assert_is_valid_group_dn(group_dn)
required_attributes = [
"cn",
"uniqueMember",
]
connection.search(
group_dn, '(objectclass=groupOfUniqueNames)', attributes=required_attributes,
)
if not connection.entries:
raise EntryNotFoundException(f"No such group found: '{group_cn}'.")
entry = connection.entries[0]
# Convert entry to JSON and load attributes into a dict
attributes = json.loads(entry.entry_to_json())['attributes']
for attribute in required_attributes:
if attribute not in attributes.keys():
raise AttributeNotFoundException(
f"Attribute '{attribute}' not set in entry '{group_dn}'."
)
groupname = attributes['cn'][0]
member_dn_list = attributes['uniqueMember']
members = set()
for user_dn in member_dn_list:
user_uid = user_dn.split(',')[0][4:]
members.add(get_user(connection, user_uid))
return Group(groupname, members)
def get_groups(connection: Connection) -> set[Group]:
"""Retrieves a set containing all Groups from the LDAP server.
Parameters
----------
connection : ldap3.Connection
Bound Connection object to an LDAP server.
"""
_assert_is_valid_connection(connection)
connection.search(
current_app.config['LDAP_GROUPS_OU'],
"(objectclass=groupOfUniqueNames)",
attributes=['cn'],
)
all_groups = set()
for entry in connection.entries:
all_groups.add(get_group(connection, str(entry.cn)))
return all_groups

View File

@ -250,6 +250,7 @@ class User:
return b64encode(hash_bytes.digest()).decode("ASCII") return b64encode(hash_bytes.digest()).decode("ASCII")
@staticmethod
def _get_default_picture() -> Image.Image: def _get_default_picture() -> Image.Image:
"""Returns the default user picture as a PIL Image object. """Returns the default user picture as a PIL Image object.
@ -303,6 +304,22 @@ class User:
self.picture = User._get_default_picture() self.picture = User._get_default_picture()
def get_dn(self) -> str:
"""Returns the LDAP DN for the DIT entry representing this User.
The method does not check whether or not an entry with this DN actually
exists, it merely returns the DN as a string.
Returns
-------
str
The LDAP DN (distinguished name) uniquely identifying this User in
the DIT.
"""
return "uid=" + self.username + ',' + current_app.config['LDAP_USERS_OU']
def __eq__(self, other): def __eq__(self, other):
return self.username == other.username return self.username == other.username
@ -388,6 +405,22 @@ class Group:
self.members = members self.members = members
def get_dn(self) -> str:
"""Returns the LDAP DN for the DIT entry representing this Group.
The method does not check whether or not an entry with this DN actually
exists, it merely returns the DN as a string.
Returns
-------
str
The LDAP DN (distinguished name) uniquely identifying this Group in
the DIT.
"""
return "cn=" + self.groupname + ',' + current_app.config['LDAP_GROUPS_OU']
def __eq__(self, other): def __eq__(self, other):
return self.groupname == other.groupname return self.groupname == other.groupname