New way to check table and routes access
This commit is contained in:
parent
a5aad5b5ea
commit
b799d1312b
2 changed files with 166 additions and 61 deletions
42
dba.py
42
dba.py
|
|
@ -206,6 +206,20 @@ def get_user_by_access_token(conn: DBConnector, access_token: str | None):
|
|||
return None, None
|
||||
|
||||
|
||||
def get_user_permissions_for_table(conn: DBConnector, table_name:str, user: User) -> AccessType:
|
||||
try:
|
||||
groups = get_user_groups(conn, user.id)
|
||||
for g in groups:
|
||||
acl = get_table_access_level(conn, table_name, g)
|
||||
if acl is not None and acl != AccessType.NONE:
|
||||
return acl
|
||||
|
||||
return AccessType.NONE
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
return AccessType.NONE
|
||||
|
||||
|
||||
def check_user(conn: DBConnector, username: str, password: str):
|
||||
try:
|
||||
hashedPwd = hash_password(password)
|
||||
|
|
@ -294,6 +308,25 @@ def set_user_group(conn: DBConnector, user_id: int, group_id: int):
|
|||
return False, e
|
||||
|
||||
|
||||
def get_user_groups(conn: DBConnector, user_id: int) -> list[UserGroup]:
|
||||
try:
|
||||
_groups = conn.filterFromTable(
|
||||
USER_IN_USER_GROUP_JOIN_TABLE_NAME,
|
||||
["*"],
|
||||
[ColumnCondition("user_id", "eq", user_id)],
|
||||
)
|
||||
if len(_groups) == 0:
|
||||
logger.warning(f"User with id {user_id} not found, so no group")
|
||||
return []
|
||||
|
||||
u_groups = [UserInUserGroup.parse_obj(group) for group in _groups]
|
||||
groups = [get_group_by_id(conn, ug.user_group_id) for ug in u_groups]
|
||||
return [group for group in groups if group is not None]
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
return []
|
||||
|
||||
|
||||
def get_user_group(conn: DBConnector, user_id: int):
|
||||
try:
|
||||
grp_usr_joint = conn.filterFromTable(
|
||||
|
|
@ -353,13 +386,10 @@ def create_table(conn: DBConnector, table_name: str, schema: list[ColumnDefiniti
|
|||
|
||||
|
||||
def get_table_access_level(
|
||||
conn: DBConnector, table_name: str, user_id: int
|
||||
conn: DBConnector, table_name: str, group: UserGroup
|
||||
) -> AccessType:
|
||||
try:
|
||||
user_group = get_user_group(conn, user_id)
|
||||
if not user_group:
|
||||
return AccessType.NONE
|
||||
elif user_group.name == "admin":
|
||||
if group.id == 2 and group.name == "admin":
|
||||
return AccessType.READ_WRITE
|
||||
|
||||
access = conn.filterFromTable(
|
||||
|
|
@ -367,7 +397,7 @@ def get_table_access_level(
|
|||
["*"],
|
||||
[
|
||||
ColumnCondition("table_name", "eq", table_name),
|
||||
ColumnCondition("user_group_id", "eq", user_group.id),
|
||||
ColumnCondition("user_group_id", "eq", group.id),
|
||||
],
|
||||
)
|
||||
if not access:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue