tuuli_backend/app.py
2023-04-12 03:04:27 +07:00

382 lines
11 KiB
Python

import io
from typing import Any
from fastapi import FastAPI, status, Header, UploadFile, Response
from starlette.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from based import db
import psycopg
from secrets import token_hex
from minio import Minio
from minio.helpers import ObjectWriteResult
from urllib3 import HTTPResponse
import uvicorn
from dba import *
from models import (
AuthModel,
ColumnsDefinitionList,
ErrorResponseDefinition,
ItemDeletionDefinitionList,
ItemsFieldSelectorList,
TableDefinition,
TableListDefinition,
UserDefinition,
)
from utils import (
check_if_admin_access_token,
parse_columns_from_definition,
)
conninfo = "postgresql://postgres:asarch6122@localhost"
connector = db.DBConnector(conninfo)
bootstrapDB(connector)
BUCKET_NAME = "tuuli-files"
minioClient = Minio(
"localhost:8090",
access_key="mxR0F5PK8CpCM8SA",
secret_key="yFJsG70xLU3BiIMslinz6dhqKHqNpUc6",
secure=False,
)
found = minioClient.bucket_exists(BUCKET_NAME)
if found:
print(f"Bucket '{BUCKET_NAME}' already exists")
else:
minioClient.make_bucket(BUCKET_NAME)
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/api/getAccessToken")
async def getAccessToken(userData: AuthModel):
user = check_user(connector, userData.username, userData.password)
if not user:
return {"error": "Wrong username or password"}
return {"access_token": user.access_token}
@app.get("/api/listTables")
async def listTables(
response: Response,
access_token: str | None = Header(default=None),
) -> TableListDefinition | ErrorResponseDefinition:
is_admin = check_if_admin_access_token(connector, access_token)
if not is_admin:
return ErrorResponseDefinition(error="Not allowed")
return TableListDefinition(
tables=[TableDefinition.parse_obj(table) for table in connector.tables()]
)
@app.post("/api/createTable/{tableName}")
async def createTable(
tableName: str,
columns: ColumnsDefinitionList,
access_token: str | None = Header(default=None),
):
is_admin = check_if_admin_access_token(connector, access_token)
if not is_admin:
return {"error": "Not allowed"}
try:
columnsDefinition = parse_columns_from_definition(",".join(columns.columns))
create_table(connector, tableName, columnsDefinition)
except psycopg.errors.UniqueViolation:
return {"error": "Username already exists"}
except Exception as e:
return {"error": str(e)}
return {"ok": True}
@app.post("/api/dropTable/{tableName}")
async def dropTable(
tableName: str,
access_token: str | None = Header(default=None),
):
is_admin = check_if_admin_access_token(connector, access_token)
if not is_admin:
return {"error": "Not allowed"}
try:
drop_table(connector, tableName)
except Exception as e:
return {"error": str(e)}
return {"ok": True}
@app.post("/api/createUser")
async def createUser(
user: UserDefinition,
access_token: str | None = Header(default=None),
):
is_admin = check_if_admin_access_token(connector, access_token)
if not is_admin:
return {"error": "Not allowed"}
try:
create_user(connector, user.username, user.password)
except psycopg.errors.UniqueViolation:
return {"error": "Username already exists"}
except Exception as e:
return {"error": str(e)}
return {"ok": True}
@app.post("/api/updateUser")
async def updateUser(
user: UserDefinition,
access_token: str | None = Header(default=None),
):
is_admin = check_if_admin_access_token(connector, access_token)
if not is_admin:
return {"error": "Not allowed"}
if not user.user_id or not user.password or not user.access_token:
return {"error": "Malformed request"}
try:
update_user(connector, user.user_id, user.password, user.access_token)
except Exception as e:
return {"error": str(e)}
return {"ok": True}
@app.post("/items/{tableName}")
async def items(
tableName: str,
selector: ItemsFieldSelectorList,
access_token: str | None = Header(default=None),
):
table_info = connector.getTable(tableName)
if not table_info:
return {"error": "Not allowed"}
is_admin = check_if_admin_access_token(connector, access_token)
if table_info["system"] and not is_admin:
return {"error": "Not allowed"}
columns = parse_columns_from_definition(table_info["columns"])
columnsNames = set(column.name for column in columns)
userSelectedColumns = list(set(selector.fields)) if selector.fields else ["*"]
if userSelectedColumns != ["*"]:
for column in userSelectedColumns:
if column not in columnsNames:
return {"error": f"Column {column} not found on table {tableName}"}
else:
userSelectedColumns = columnsNames
user, group = get_user_by_access_token(connector, access_token)
if not user:
return {"error": "Not allowed"}
if not is_admin:
allowedColumns = get_allowed_columns_for_group(
connector, tableName, group.id if group else -1
)
if not allowedColumns:
return {"error": "Not allowed"}
elif len(allowedColumns) == 1 and allowedColumns[0] == "*":
pass
else:
for column in userSelectedColumns:
if column not in allowedColumns:
return {"error": "Not allowed"}
table_items = connector.selectFromTable(
tableName, selector.fields if selector.fields else ["*"]
)
return {"items": table_items}
@app.post("/items/{tableName}/+")
async def itemsCreate(
tableName: str,
item: dict[str, str],
access_token: str | None = Header(default=None),
):
table_info = connector.getTable(tableName)
if not table_info:
return {"error": "Not found"}
is_admin = check_if_admin_access_token(connector, access_token)
if table_info["system"] and not is_admin:
return {"error": "Not allowed"}
user, group = get_user_by_access_token(connector, access_token)
if not is_admin:
allowedColumns = get_allowed_columns_for_group(
connector, tableName, group.id if group else -1
)
if not allowedColumns:
return {"error": "Not allowed"}
elif len(allowedColumns) == 1 and allowedColumns[0] == "*":
pass
else:
for column in item:
if column not in allowedColumns:
return {"error": "Not allowed"}
try:
connector.insertIntoTable(tableName, item)
except psycopg.errors.UndefinedColumn:
return {"error": "Column not found"}
except psycopg.errors.UniqueViolation:
return {"error": "Unique constraint violation"}
except Exception as e:
return {"error": str(e)}
return {"ok": True}
@app.post("/items/{tableName}/*")
async def itemsUpdate(
tableName: str,
item: dict[str, str],
oldItem: dict[str, str],
access_token: str | None = Header(default=None),
):
table_info = connector.getTable(tableName)
if not table_info:
return {"error": "Not found"}
is_admin = check_if_admin_access_token(connector, access_token)
if table_info["system"] and not is_admin:
return {"error": "Not allowed"}
user, group = get_user_by_access_token(connector, access_token)
if not is_admin:
allowedColumns = get_allowed_columns_for_group(
connector, tableName, group.id if group else -1
)
if not allowedColumns:
return {"error": "Not allowed"}
elif len(allowedColumns) == 1 and allowedColumns[0] == "*":
pass
else:
for column in item:
if column not in allowedColumns:
return {"error": "Not allowed"}
try:
connector.updateDataInTable(
tableName,
[ColumnUpdate(column=c, value=item[c]) for c in item],
[ColumnCondition(column=c, value=oldItem[c]) for c in oldItem],
)
except psycopg.errors.UndefinedColumn:
return {"error": "Column not found"}
except Exception as e:
return {"error": str(e)}
return {"ok": True}
@app.post("/items/{tableName}/-")
async def itemsDelete(
tableName: str,
deleteWhere: ItemDeletionDefinitionList,
access_token: str | None = Header(default=None),
):
table_info = connector.getTable(tableName)
if not table_info:
return {"error": "Not found"}
is_admin = check_if_admin_access_token(connector, access_token)
if table_info["system"] and not is_admin:
return {"error": "Not allowed"}
user, group = get_user_by_access_token(connector, access_token)
if not is_admin:
allowedColumns = get_allowed_columns_for_group(
connector, tableName, group.id if group else -1
)
if not allowedColumns:
return {"error": "Not allowed"}
elif len(allowedColumns) == 1 and allowedColumns[0] == "*":
pass
else:
return {"error": "Not allowed"}
try:
connector.deleteFromTable(
tableName,
[
ColumnCondition(where.name, where.value, where.isString, where.isLike)
for where in deleteWhere.defs
],
)
except Exception as e:
return {"error": str(e)}
return {"ok": True}
@app.get("/assets/{fid}")
async def getAsset(fid: str, access_token: str | None = Header(default=None)):
asset = get_asset(connector, access_token, fid)
if not asset:
return status.HTTP_404_NOT_FOUND
response: HTTPResponse | None = None
try:
response = minioClient.get_object(BUCKET_NAME, asset.name, version_id=asset.fid)
if response is None:
return status.HTTP_404_NOT_FOUND
return StreamingResponse(
content=io.BytesIO(response.data),
media_type=response.getheader("Content-Type"),
status_code=status.HTTP_200_OK,
)
finally:
if response is not None:
response.close()
response.release_conn()
@app.post("/assets/+")
async def createAsset(
asset: UploadFile,
access_token: str | None = Header(default=None),
):
user, _ = get_user_by_access_token(connector, access_token)
if not user:
return {"error": "Not allowed"}
filename = asset.filename
if not filename:
filename = f"unnamed"
filename = f"{token_hex()}_{filename}"
result: ObjectWriteResult = minioClient.put_object(
BUCKET_NAME,
filename,
data=asset.file,
content_type=(
asset.content_type if asset.content_type else "application/octet-stream"
),
length=asset.size,
)
if not create_asset(connector, filename, "", str(result.version_id)):
return {"error": "Failed to create asset"}
return {"ok": True, "fid": result.version_id}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)