feat(ldap): add functions to update users and get their groups
This commit is contained in:
parent
3531686572
commit
3d2462a784
@ -13,7 +13,7 @@ from base64 import b64decode, b64encode
|
|||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from ldap3 import Connection, Server, ALL, Reader, Writer, ObjectDef
|
from ldap3 import Connection, Server, ALL, Reader, Writer, ObjectDef, MODIFY_REPLACE
|
||||||
from PIL import Image
|
from PIL import Image
|
||||||
|
|
||||||
from lumi2.usermodel import User, Group
|
from lumi2.usermodel import User, Group
|
||||||
@ -712,6 +712,53 @@ def create_user(connection: Connection, user: User) -> None:
|
|||||||
connection.add(user.get_dn(), "inetOrgPerson", attributes)
|
connection.add(user.get_dn(), "inetOrgPerson", attributes)
|
||||||
|
|
||||||
|
|
||||||
|
def update_user(connection: Connection, user: User) -> None:
|
||||||
|
"""Updates the LDAP entry for the specified User.
|
||||||
|
|
||||||
|
All attributes of the user's entry on the LDAP server are overwritten using
|
||||||
|
the User object's instance attributes.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
connection : ldap3.Connection
|
||||||
|
Bound Connection object to an LDAP server.
|
||||||
|
user : lumi2.usermodel.User
|
||||||
|
User object whose LDAP entry will be updated.
|
||||||
|
|
||||||
|
Raises
|
||||||
|
------
|
||||||
|
TypeError
|
||||||
|
If user is not of type User.
|
||||||
|
EntryNotFoundException
|
||||||
|
If no entry for the specified user's uid (username) exists in the LDAP
|
||||||
|
server's DIT.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_assert_is_valid_connection(connection)
|
||||||
|
if not isinstance(user, User):
|
||||||
|
raise TypeError(f"Expected a User but got: '{type(user)}'.")
|
||||||
|
user_dn = user.get_dn()
|
||||||
|
if not user_exists(connection, user_dn):
|
||||||
|
raise EntryNotFoundException(
|
||||||
|
f"Failed to update user '{user.username}': no entry exists for " \
|
||||||
|
f"'{user_dn}'."
|
||||||
|
)
|
||||||
|
|
||||||
|
new_picture_bytes = BytesIO()
|
||||||
|
user.picture.save(new_picture_bytes, format="jpeg")
|
||||||
|
|
||||||
|
new_attributes = {
|
||||||
|
"userPassword": [(MODIFY_REPLACE, ["{SHA512}" + user.password_hash])],
|
||||||
|
"mail": [(MODIFY_REPLACE, [user.email])],
|
||||||
|
"cn": [(MODIFY_REPLACE, [user.first_name])],
|
||||||
|
"sn": [(MODIFY_REPLACE, [user.last_name])],
|
||||||
|
"displayName": [(MODIFY_REPLACE, [user.display_name])],
|
||||||
|
"jpegPhoto": [(MODIFY_REPLACE, [new_picture_bytes.getvalue()])],
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.modify(user.get_dn(), new_attributes)
|
||||||
|
|
||||||
|
|
||||||
def delete_user(connection: Connection, uid: str) -> None:
|
def delete_user(connection: Connection, uid: str) -> None:
|
||||||
"""Deletes the user with the specified uid (username) from the LDAP server.
|
"""Deletes the user with the specified uid (username) from the LDAP server.
|
||||||
|
|
||||||
@ -739,8 +786,14 @@ def delete_user(connection: Connection, uid: str) -> None:
|
|||||||
if not user_exists(connection, user_dn):
|
if not user_exists(connection, user_dn):
|
||||||
raise EntryNotFoundException(f"No such user entry: '{user_dn}'.")
|
raise EntryNotFoundException(f"No such user entry: '{user_dn}'.")
|
||||||
|
|
||||||
# TODO Check if user is the sole member of any groups
|
# Check if user is the sole member of any groups
|
||||||
# If so, delete that group here, because groups need to have at least one member
|
# If so, delete that group here, because groups need to have at least one member
|
||||||
|
groups_to_delete = set()
|
||||||
|
for group in get_groups_of_user(connection, get_user(connection, uid)):
|
||||||
|
if len(group.members) == 1:
|
||||||
|
groups_to_delete.add(group)
|
||||||
|
for group in groups_to_delete:
|
||||||
|
delete_group(connection, group.groupname)
|
||||||
|
|
||||||
connection.delete(user_dn)
|
connection.delete(user_dn)
|
||||||
|
|
||||||
@ -960,3 +1013,42 @@ def get_groups(connection: Connection) -> set[Group]:
|
|||||||
all_groups.add(get_group(connection, str(entry.cn)))
|
all_groups.add(get_group(connection, str(entry.cn)))
|
||||||
|
|
||||||
return all_groups
|
return all_groups
|
||||||
|
|
||||||
|
def get_groups_of_user(connection: Connection, user: User) -> set[Group]:
|
||||||
|
"""Retrieves the set of Groups of which the specified user is a member.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
connection : ldap3.Connection
|
||||||
|
Bound Connection object to an LDAP server.
|
||||||
|
user : lumi2.usermodel.User
|
||||||
|
The User whose Groups to retrieve.
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
set[Groups]
|
||||||
|
A set containing all Groups of which the user is a member.
|
||||||
|
If the specified User is not present in any groups, returns an empty set.
|
||||||
|
|
||||||
|
Raises
|
||||||
|
------
|
||||||
|
EntryNotFoundException
|
||||||
|
If no entry for the specified User exists.
|
||||||
|
TypeError
|
||||||
|
If user is not of type User.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_assert_is_valid_connection(connection)
|
||||||
|
if not user_exists(connection, user.get_dn()):
|
||||||
|
raise EntryNotFoundException(
|
||||||
|
f"Failed to get Groups of User '{user.username}': the entry " \
|
||||||
|
f"'{user.get_dn()}' does not exist."
|
||||||
|
)
|
||||||
|
|
||||||
|
all_groups = get_groups(connection)
|
||||||
|
result = set()
|
||||||
|
for group in all_groups:
|
||||||
|
if user in group.members:
|
||||||
|
result.add(group)
|
||||||
|
|
||||||
|
return result
|
||||||
|
Loading…
x
Reference in New Issue
Block a user