181 lines
5.8 KiB
Python
181 lines
5.8 KiB
Python
import flask
|
|
from configurations import MAILTRAP_CONFIG as mail_config
|
|
from configurations import EMAIL_RECIPIENTS_CONFIG as email_sender_config
|
|
from datetime import datetime
|
|
from typing import Union
|
|
from flask import Flask, jsonify, Response, request
|
|
from flask_mail import Mail, Message
|
|
from monitoring import HttpMonitoringServices
|
|
from queue_services import HttpThreading
|
|
from flask_apscheduler import APScheduler
|
|
from exceptions import NoContentException, BadGatewayException
|
|
|
|
app = Flask(__name__)
|
|
app.config.update(
|
|
MAIL_SERVER=mail_config.get("MAIL_SERVER"),
|
|
MAIL_PORT=mail_config.get("MAIL_PORT"),
|
|
MAIL_USERNAME=mail_config.get("MAIL_USERNAME"),
|
|
MAIL_PASSWORD=mail_config.get("MAIL_PASSWORD"),
|
|
MAIL_USE_TLS=mail_config.get("MAIL_USE_TLS"),
|
|
MAIL_USE_SSL=mail_config.get("MAIL_USE_SSL"),
|
|
)
|
|
mail = Mail(app)
|
|
|
|
scheduler = APScheduler()
|
|
scheduler.init_app(app)
|
|
|
|
|
|
def get_timestamp() -> str:
|
|
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
@app.errorhandler(NoContentException)
|
|
def handle_no_content_exception(exc):
|
|
return (
|
|
jsonify(
|
|
{
|
|
"message": exc.message,
|
|
"exception": exc.__class__.__name__,
|
|
"category": "error",
|
|
}
|
|
),
|
|
exc.code,
|
|
)
|
|
|
|
|
|
@app.errorhandler(BadGatewayException)
|
|
def bad_gateway_exception(exc):
|
|
return (
|
|
jsonify(
|
|
{
|
|
"message": exc.message,
|
|
"exception": exc.__class__.__name__,
|
|
"category": "email_sent_error",
|
|
}
|
|
),
|
|
exc.code,
|
|
)
|
|
|
|
|
|
# best bet is use singleton to ensure just
|
|
# one object that controlling the initial resources
|
|
http_services = HttpMonitoringServices()
|
|
|
|
|
|
# bare minimum REST-way just to return the response
|
|
# without any exceptions or validation...
|
|
@scheduler.task("interval", id="hit_apis", seconds=300)
|
|
@app.route("/status", methods=["GET", "POST"])
|
|
def get_http_status() -> Union[Response, tuple[Response, int]]:
|
|
request_body = request.json
|
|
create_endpoint = http_services.set_endpoints(data=request_body)
|
|
|
|
if flask.request.method == "POST":
|
|
if create_endpoint is not None:
|
|
return (
|
|
jsonify(
|
|
message="Success create new target",
|
|
data=create_endpoint,
|
|
category="success",
|
|
),
|
|
201,
|
|
)
|
|
return jsonify(message="Can't create new target", data=None, category="error")
|
|
|
|
if flask.request.method == "GET":
|
|
if http_services.mapping_status_endpoints is None:
|
|
raise NoContentException
|
|
return (
|
|
jsonify(
|
|
message="Success get the response",
|
|
data={
|
|
"title": http_services.get_title,
|
|
"endpoint": http_services.get_endpoint,
|
|
"expected_response": http_services.get_expected_response,
|
|
"actual_response": http_services.get_actual_response,
|
|
"elapsed": f"{http_services.get_elapsed_time:.2f} seconds",
|
|
"is_down": False
|
|
if http_services.get_expected_response == 200
|
|
else True,
|
|
"is_up": True
|
|
if http_services.get_expected_response == 200
|
|
else False,
|
|
"timestamp": get_timestamp(),
|
|
},
|
|
category="success",
|
|
),
|
|
200,
|
|
)
|
|
|
|
|
|
@app.route("/emails", methods=["GET"])
|
|
def send_emails() -> tuple[Response, int]:
|
|
get_sender = email_sender_config.get("EMAIL_SENDER")
|
|
get_subject = email_sender_config.get("EMAIL_SUBJECT")
|
|
get_recipient = email_sender_config.get("EMAIL_RECIPIENT")
|
|
message = Message(
|
|
subject=get_subject,
|
|
sender=get_sender,
|
|
recipients=[get_recipient],
|
|
)
|
|
message.body = (
|
|
"Your target endpoint was down! please check your site, the targeted endpoint that down is : {0}, "
|
|
"with url : {1}".format(http_services.get_title, http_services.get_endpoint)
|
|
)
|
|
|
|
if http_services.get_actual_response != 200:
|
|
return (
|
|
jsonify(
|
|
message="Success to send the notifications to related recipients",
|
|
data=mail.send(message),
|
|
category="email_sent_success",
|
|
),
|
|
200,
|
|
)
|
|
elif http_services.get_actual_response == 200:
|
|
return (
|
|
jsonify(
|
|
message="Your endpoint wasn't down",
|
|
data=f"Status code for that endpoint is : {http_services.get_actual_response}",
|
|
category="success",
|
|
),
|
|
200,
|
|
)
|
|
else:
|
|
raise BadGatewayException()
|
|
|
|
|
|
@app.route("/status/queue", methods=["GET", "POST"])
|
|
def get_http_status_async() -> tuple[Response, int]:
|
|
request_body = request.json
|
|
http_threading_services = HttpThreading(queue_size=request_body)
|
|
get_worker_status = http_threading_services.start_worker_queue(
|
|
worker_thread=request_body, hosts=[http_services.get_endpoint]
|
|
)
|
|
start_time = get_worker_status.get("start_time")
|
|
end_time = get_worker_status.get("end_time")
|
|
queue_status = get_worker_status.get("is_queue")
|
|
|
|
if flask.request.method == "GET":
|
|
return (
|
|
jsonify(
|
|
message="Success get the response",
|
|
data={
|
|
"start_time": f"{start_time:.2f} seconds",
|
|
"end_time": f"{end_time:.2f} seconds",
|
|
"queue_status": queue_status,
|
|
},
|
|
category="success",
|
|
),
|
|
200,
|
|
)
|
|
|
|
if flask.request.method == "POST":
|
|
return jsonify(message="success", category="success", data=request_body), 201
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# change to False if tend to run it on production
|
|
app.run(host="0.0.0.0", port=105, debug=True)
|
|
scheduler.start()
|