# ===========================================================================
# scheduler/app/faas_scheduler/models.py  (ScriptLog method)
# ===========================================================================

def to_dict(self, include_context_data=True, include_output=True):
    """Serialize this ScriptLog row into a plain dict.

    Args:
        include_context_data: when True, deserialize and include the JSON
            ``context_data`` column. List views should pass False, since the
            payload can be large.
        include_output: when True, include the raw script ``output`` text
            (can also be large).

    Returns:
        dict with the run's metadata; ``finished_at`` is falsy while the
        script is still running (column is NULL until completion).
    """
    from faas_scheduler.utils import datetime_to_isoformat_timestr

    entry = {
        "id": self.id,
        "dtable_uuid": self.dtable_uuid,
        "owner": self.owner,
        "script_name": self.script_name,
        "started_at": datetime_to_isoformat_timestr(self.started_at),
        # finished_at is NULL while the run is in progress
        "finished_at": self.finished_at
        and datetime_to_isoformat_timestr(self.finished_at),
        "success": self.success,
        "return_code": self.return_code,
        "operate_from": self.operate_from,
    }

    if include_context_data:
        entry["context_data"] = (
            json.loads(self.context_data) if self.context_data else None
        )

    if include_output:
        entry["output"] = self.output

    return entry


# ===========================================================================
# scheduler/app/faas_scheduler/utils.py
# ===========================================================================

def get_script_runs(
    db_session, org_id, base_uuid, start, end, page, per_page
) -> Tuple[List[ScriptLog], int]:
    """Return one page of script runs plus the total (unpaginated) count.

    Args:
        db_session: SQLAlchemy session.
        org_id: optional organization filter (falsy -> no filter).
        base_uuid: optional base (dtable) filter (falsy -> no filter).
        start / end: optional datetime bounds on ``started_at`` (inclusive).
        page: 1-based page number.
        per_page: page size.

    Returns:
        (runs, total_count) where total_count ignores pagination.
    """
    # Only load the columns the admin list view actually renders; the large
    # context_data/output columns stay unloaded.
    fields = [
        ScriptLog.id,
        ScriptLog.dtable_uuid,
        ScriptLog.owner,
        ScriptLog.org_id,
        ScriptLog.script_name,
        ScriptLog.started_at,
        ScriptLog.finished_at,
        ScriptLog.success,
        ScriptLog.return_code,
        ScriptLog.operate_from,
    ]
    query = (
        db_session.query(ScriptLog)
        .options(load_only(*fields))
        # Deterministic ordering is required for stable LIMIT/OFFSET
        # pagination; without an ORDER BY the database may return rows in
        # any order, duplicating or skipping rows across pages.
        .order_by(ScriptLog.id)
    )

    if org_id:
        query = query.filter_by(org_id=org_id)

    if base_uuid:
        query = query.filter_by(dtable_uuid=base_uuid)

    if start:
        query = query.filter(ScriptLog.started_at >= start)

    if end:
        query = query.filter(ScriptLog.started_at <= end)

    total_count = query.count()
    runs = query.limit(per_page).offset((page - 1) * per_page).all()

    return runs, total_count


def _run_statistics_fields(group_field):
    """Build the aggregate column list shared by the grouped statistics
    queries; ``group_field`` is the labelled grouping expression that leads
    the SELECT list."""
    # pylint: disable=E1102
    # False positive caused by https://github.com/pylint-dev/pylint/issues/8138
    return [
        group_field,
        func.count(ScriptLog.id).label("number_of_runs"),
        # MariaDB's TIMESTAMPDIFF() with microsecond precision prevents
        # rounding errors. Runs that haven't finished yet have
        # finished_at = NULL and are simply ignored by SUM().
        func.sum(
            func.timestampdiff(
                text("MICROSECOND"), ScriptLog.started_at, ScriptLog.finished_at
            )
            / 1_000_000
        ).label("total_run_time"),
        # NOTE(review): "manualy" looks misspelled, but presumably matches
        # the value actually stored in operate_from — confirm against the
        # writer before changing the spelling.
        func.count(case((ScriptLog.operate_from == "manualy", 1))).label(
            "triggered_manually"
        ),
        func.count(case((ScriptLog.operate_from == "automation-rule", 1))).label(
            "triggered_by_automation_rule"
        ),
        # .is_(True/False) renders IS TRUE / IS FALSE — the SQLAlchemy
        # idiom for the former `success == True` comparisons.
        func.count(case((ScriptLog.success.is_(True), 1))).label("successful_runs"),
        func.count(case((ScriptLog.success.is_(False), 1))).label(
            "unsuccessful_runs"
        ),
    ]


def _statistics_row_to_dict(row) -> dict:
    """Convert one aggregated result row into the common response fields."""
    return {
        "number_of_runs": row.number_of_runs,
        # int() is required since MariaDB returns total_run_time as a
        # string. SUM() yields SQL NULL when no run in the group has
        # finished, so guard against None instead of crashing.
        "total_run_time": (
            int(row.total_run_time) if row.total_run_time is not None else 0
        ),
        "triggered_manually": row.triggered_manually,
        "triggered_by_automation_rule": row.triggered_by_automation_rule,
        "successful_runs": row.successful_runs,
        "unsuccessful_runs": row.unsuccessful_runs,
    }


def get_statistics_grouped_by_base(
    db_session,
    org_id: int,
    start: Optional[datetime],
    end: Optional[datetime],
    page: int,
    per_page: int,
) -> Tuple[List[dict], int]:
    """Aggregate run statistics per base (dtable) for one organization.

    Returns:
        (results, total_count) — ``results`` is one page of per-base stat
        dicts, ``total_count`` is the number of groups before pagination.
    """
    query = (
        db_session.query(*_run_statistics_fields(ScriptLog.dtable_uuid))
        .filter_by(org_id=org_id)
        .group_by(ScriptLog.dtable_uuid)
        # Stable group ordering so LIMIT/OFFSET pagination is deterministic
        .order_by(ScriptLog.dtable_uuid)
    )

    if start:
        query = query.filter(ScriptLog.started_at >= start)

    if end:
        query = query.filter(ScriptLog.started_at <= end)

    total_count = query.count()
    rows = query.limit(per_page).offset((page - 1) * per_page).all()

    results = []
    for row in rows:
        entry = _statistics_row_to_dict(row)
        entry["base_uuid"] = row.dtable_uuid
        results.append(entry)

    return results, total_count


def get_statistics_grouped_by_day(
    db_session,
    org_id: int,
    base_uuid: Optional[str],
    start: Optional[datetime],
    end: Optional[datetime],
    page: int,
    per_page: int,
) -> Tuple[List[dict], int]:
    """Aggregate run statistics per calendar day for one organization,
    optionally restricted to a single base.

    Returns:
        (results, total_count) — ``results`` is one page of per-day stat
        dicts, ``total_count`` is the number of groups before pagination.
    """
    day = func.date(ScriptLog.started_at)

    query = (
        db_session.query(*_run_statistics_fields(day.label("date")))
        .filter_by(org_id=org_id)
        .group_by(day)
        # Stable group ordering so LIMIT/OFFSET pagination is deterministic
        .order_by(day)
    )

    if base_uuid:
        query = query.filter(ScriptLog.dtable_uuid == base_uuid)

    if start:
        query = query.filter(ScriptLog.started_at >= start)

    if end:
        query = query.filter(ScriptLog.started_at <= end)

    total_count = query.count()
    rows = query.limit(per_page).offset((page - 1) * per_page).all()

    results = []
    for row in rows:
        entry = _statistics_row_to_dict(row)
        entry["date"] = row.date.strftime("%Y-%m-%d")
        results.append(entry)

    return results, total_count


def is_date_yyyy_mm_dd(value: str) -> bool:
    """Return True if ``value`` is exactly a plain YYYY-MM-DD date string
    (no time component)."""
    try:
        datetime.strptime(value, "%Y-%m-%d")
        return True
    except ValueError:
        return False


# ===========================================================================
# scheduler/app/flask_server.py
# ===========================================================================

def _parse_list_query_args(args):
    """Parse and validate the start/end/page/per_page query parameters shared
    by the admin runs/statistics endpoints.

    Args:
        args: ``request.args`` of the current request.

    Returns:
        ((start, end, page, per_page), None) on success, or
        (None, (error_body, status_code)) on a validation failure — the
        second element can be returned directly from a Flask view.
    """
    start = None
    if args.get("start"):
        try:
            start = datetime.fromisoformat(args.get("start"))
        except ValueError:
            return None, ({"error": "Invalid value for start parameter"}, 400)

    end = None
    if args.get("end"):
        try:
            end = datetime.fromisoformat(args.get("end"))
        except ValueError:
            return None, ({"error": "Invalid value for end parameter"}, 400)

        if is_date_yyyy_mm_dd(args.get("end")):
            # If a plain date was passed in (i.e. without time information),
            # we need to add 1 day to ensure that the results for the last
            # day are included
            end += timedelta(days=1)

    try:
        page = int(args.get("page", "1"))
    except ValueError:
        return None, ({"error": "page must be an integer"}, 400)
    # page < 1 would produce a negative OFFSET and a database error (500)
    if page < 1:
        return None, ({"error": "page must be a positive integer"}, 400)

    try:
        per_page = int(args.get("per_page", "100"))
    except ValueError:
        return None, ({"error": "per_page must be an integer"}, 400)
    if per_page < 1:
        return None, ({"error": "per_page must be a positive integer"}, 400)
    if per_page > 1000:
        return None, ({"error": "per_page cannot be greater than 1000"}, 400)

    return (start, end, page, per_page), None


# List all runs
@app.route("/admin/runs/", methods=["GET"])
def list_runs():
    """Admin endpoint: list script runs, optionally filtered by org, base
    and started_at range. Large columns (context_data, output) are omitted."""
    if not check_auth_token(request):
        return make_response(("Forbidden: the auth token is not correct.", 403))

    # org_id and base_uuid are optional
    org_id = request.args.get("org_id")
    base_uuid = request.args.get("base_uuid")

    parsed, error = _parse_list_query_args(request.args)
    if error:
        return error
    start, end, page, per_page = parsed

    db_session = DBSession()

    try:
        runs, total_count = get_script_runs(
            db_session, org_id, base_uuid, start, end, page, per_page
        )
    except Exception as e:
        logger.exception(e)
        return make_response(("Internal server error", 500))
    finally:
        db_session.close()

    # Keep the list payload small: skip context_data and output
    runs = [r.to_dict(include_context_data=False, include_output=False) for r in runs]

    return {"runs": runs, "total_count": total_count}


# Get run statistics grouped by base UUID
@app.route("/admin/statistics/by-base/", methods=["GET"])
def get_run_statistics_grouped_by_base():
    """Admin endpoint: per-base run statistics for one organization."""
    if not check_auth_token(request):
        return make_response(("Forbidden: the auth token is not correct.", 403))

    org_id = request.args.get("org_id")
    if not org_id:
        return {"error": "org_id is required"}, 400

    parsed, error = _parse_list_query_args(request.args)
    if error:
        return error
    start, end, page, per_page = parsed

    db_session = DBSession()

    try:
        results, total_count = get_statistics_grouped_by_base(
            db_session, org_id, start, end, page, per_page
        )
    except Exception as e:
        logger.exception(e)
        return make_response(("Internal server error", 500))
    finally:
        db_session.close()

    return {"results": results, "total_count": total_count}


# Get run statistics grouped by day
@app.route("/admin/statistics/by-day/", methods=["GET"])
def get_run_statistics_grouped_by_day():
    """Admin endpoint: per-day run statistics for one organization,
    optionally restricted to a single base."""
    if not check_auth_token(request):
        return make_response(("Forbidden: the auth token is not correct.", 403))

    org_id = request.args.get("org_id")
    if not org_id:
        return {"error": "org_id is required"}, 400

    # base_uuid is optional
    base_uuid = request.args.get("base_uuid")

    parsed, error = _parse_list_query_args(request.args)
    if error:
        return error
    start, end, page, per_page = parsed

    db_session = DBSession()

    try:
        results, total_count = get_statistics_grouped_by_day(
            db_session=db_session,
            org_id=org_id,
            base_uuid=base_uuid,
            start=start,
            end=end,
            page=page,
            per_page=per_page,
        )
    except Exception as e:
        logger.exception(e)
        return make_response(("Internal server error", 500))
    finally:
        db_session.close()

    return {"results": results, "total_count": total_count}