http-monitoring/src/app.py
2023-02-05 09:54:59 +07:00

181 lines
5.8 KiB
Python

import flask
from configurations import MAILTRAP_CONFIG as mail_config
from configurations import EMAIL_RECIPIENTS_CONFIG as email_sender_config
from datetime import datetime
from typing import Union
from flask import Flask, jsonify, Response, request
from flask_mail import Mail, Message
from monitoring import HttpMonitoringServices
from queue_services import HttpThreading
from flask_apscheduler import APScheduler
from exceptions import NoContentException, BadGatewayException
app = Flask(__name__)
app.config.update(
MAIL_SERVER=mail_config.get("MAIL_SERVER"),
MAIL_PORT=mail_config.get("MAIL_PORT"),
MAIL_USERNAME=mail_config.get("MAIL_USERNAME"),
MAIL_PASSWORD=mail_config.get("MAIL_PASSWORD"),
MAIL_USE_TLS=mail_config.get("MAIL_USE_TLS"),
MAIL_USE_SSL=mail_config.get("MAIL_USE_SSL"),
)
mail = Mail(app)
scheduler = APScheduler()
scheduler.init_app(app)
def get_timestamp() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@app.errorhandler(NoContentException)
def handle_no_content_exception(exc):
return (
jsonify(
{
"message": exc.message,
"exception": exc.__class__.__name__,
"category": "error",
}
),
exc.code,
)
@app.errorhandler(BadGatewayException)
def bad_gateway_exception(exc):
return (
jsonify(
{
"message": exc.message,
"exception": exc.__class__.__name__,
"category": "email_sent_error",
}
),
exc.code,
)
# best bet is use singleton to ensure just
# one object that controlling the initial resources
http_services = HttpMonitoringServices()
# bare minimum REST-way just to return the response
# without any exceptions or validation...
@scheduler.task("interval", id="hit_apis", seconds=300)
@app.route("/status", methods=["GET", "POST"])
def get_http_status() -> Union[Response, tuple[Response, int]]:
request_body = request.json
create_endpoint = http_services.set_endpoints(data=request_body)
if flask.request.method == "POST":
if create_endpoint is not None:
return (
jsonify(
message="Success create new target",
data=create_endpoint,
category="success",
),
201,
)
return jsonify(message="Can't create new target", data=None, category="error")
if flask.request.method == "GET":
if http_services.mapping_status_endpoints is None:
raise NoContentException
return (
jsonify(
message="Success get the response",
data={
"title": http_services.get_title,
"endpoint": http_services.get_endpoint,
"expected_response": http_services.get_expected_response,
"actual_response": http_services.get_actual_response,
"elapsed": f"{http_services.get_elapsed_time:.2f} seconds",
"is_down": False
if http_services.get_expected_response == 200
else True,
"is_up": True
if http_services.get_expected_response == 200
else False,
"timestamp": get_timestamp(),
},
category="success",
),
200,
)
@app.route("/emails", methods=["GET"])
def send_emails() -> tuple[Response, int]:
get_sender = email_sender_config.get("EMAIL_SENDER")
get_subject = email_sender_config.get("EMAIL_SUBJECT")
get_recipient = email_sender_config.get("EMAIL_RECIPIENT")
message = Message(
subject=get_subject,
sender=get_sender,
recipients=[get_recipient],
)
message.body = (
"Your target endpoint was down! please check your site, the targeted endpoint that down is : {0}, "
"with url : {1}".format(http_services.get_title, http_services.get_endpoint)
)
if http_services.get_actual_response != 200:
return (
jsonify(
message="Success to send the notifications to related recipients",
data=mail.send(message),
category="email_sent_success",
),
200,
)
elif http_services.get_actual_response == 200:
return (
jsonify(
message="Your endpoint wasn't down",
data=f"Status code for that endpoint is : {http_services.get_actual_response}",
category="success",
),
200,
)
else:
raise BadGatewayException()
@app.route("/status/queue", methods=["GET", "POST"])
def get_http_status_async() -> tuple[Response, int]:
request_body = request.json
http_threading_services = HttpThreading(queue_size=request_body)
get_worker_status = http_threading_services.start_worker_queue(
worker_thread=request_body, hosts=[http_services.get_endpoint]
)
start_time = get_worker_status.get("start_time")
end_time = get_worker_status.get("end_time")
queue_status = get_worker_status.get("is_queue")
if flask.request.method == "GET":
return (
jsonify(
message="Success get the response",
data={
"start_time": f"{start_time:.2f} seconds",
"end_time": f"{end_time:.2f} seconds",
"queue_status": queue_status,
},
category="success",
),
200,
)
if flask.request.method == "POST":
return jsonify(message="success", category="success", data=request_body), 201
if __name__ == "__main__":
# change to False if tend to run it on production
app.run(host="0.0.0.0", port=105, debug=True)
scheduler.start()