feat(ldap): add function to update Group

This commit is contained in:
Julian Lobbes 2022-11-16 14:02:22 +01:00
parent 9b16ae6178
commit 8925b00109
2 changed files with 63 additions and 0 deletions

View File

@ -924,6 +924,52 @@ def delete_group(connection: Connection, group_cn: str) -> None:
connection.delete(group_dn)
def update_group(connection: Connection, group: Group) -> None:
"""Updates the specified Group on the LDAP server.
Parameters
----------
connection : ldap3.Connection
Bound Connection object to an LDAP server.
group : lumi2.usermodel.Group
The Group for which the LDAP entry is to be updated on the server.
Raises
------
TypeError
If group is not of type Group.
EntryNotFoundException
If the specified group does not exist in the DIT, or if a user who is a
member of the Group does not exist in the DIT.
"""
_assert_is_valid_connection(connection)
if not isinstance(group, Group):
raise TypeError(f"Expected a lumi2.usermodel.Group but got: '{type(group)}'.")
if not group_exists(connection, group.get_dn()):
raise EntryNotFoundException(
f"Failed to update group '{group.groupname}': no such entry found."
)
member_dn_list = []
for user in group.members:
user_dn = user.get_dn()
if not user_exists(connection, user_dn):
raise EntryNotFoundException(
f"Failed to create group '{group.groupname}': no entry found for " \
f"user '{user.username}'."
)
member_dn_list.append(user_dn)
connection.modify(
group.get_dn(),
{
"uniqueMember": [(MODIFY_REPLACE, member_dn_list)],
}
)
def get_group(connection: Connection, group_cn: str) -> Group:
"""Retrieves the group with the specified CN (common name) from the LDAP server.

View File

@ -322,6 +322,23 @@ def test_delete_group(app, connection):
ll.delete_group(connection, "employees")
def test_update_group(app, connection):
with app.app_context():
employees = ll.get_group(connection, "employees")
user0 = lu.User(
username="user0",
password_hash=lu.User.generate_password_hash("password"),
email="valid@example.com",
first_name="Test",
last_name="User",
)
employees.members.add(user0)
with pytest.raises(ll.EntryNotFoundException):
ll.update_group(connection, employees)
ll.create_user(connection, user0)
ll.update_group(connection, employees)
assert user0 in ll.get_group(connection, "employees").members
def test_get_group(app, connection):
with app.app_context():