# http-monitoring/src/monitoring.py
# 2023-03-11 15:06:35 +07:00
#
# 228 lines
# 8.4 KiB
# Python

import time
import json
import requests
import urllib.parse
from datetime import datetime, timedelta
from requests import Response
from urllib3.util.retry import Retry # noqa
from requests.adapters import HTTPAdapter
from typing import Any, Union, Optional, Generator
from configurations import HTTP_MONITORING_CONFIG
from utils.logger import http_logging
from utils.event_emitter import HttpEventEmitter
class TimeoutAdapter(HTTPAdapter):
    """HTTPAdapter that falls back to a stored timeout when none is given.

    The optional ``timeout`` keyword is consumed by this class; every other
    positional and keyword argument is forwarded to ``HTTPAdapter``.
    """

    def __init__(self, *args, **kwargs) -> None:
        # Strip our own keyword before delegating so the parent initializer
        # never sees it; absent keyword means "no stored timeout".
        self.timeout = kwargs.pop("timeout", None)
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs) -> requests.Response:
        """Send ``request``, filling in the stored timeout if the call has none."""
        caller_supplied_timeout = kwargs.get("timeout") is not None
        if not caller_supplied_timeout:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)
class HttpMonitoringServices:
    """Check a configured HTTPS endpoint and derive simple status metrics.

    Settings are read from ``HTTP_MONITORING_CONFIG``; the list of targets is
    read from the JSON file named by its ``DEFAULT_HTTP_CONFIG`` entry.  The
    class is a context manager that closes its ``requests`` session on exit.
    """

    # Project settings mapping; always read through ``.get``.
    get_http_config = HTTP_MONITORING_CONFIG
    # Instantiated once at class-definition time, so every instance shares it.
    event_emitter = HttpEventEmitter()
    # Number of events each outcome generator yields.
    _SAMPLE_COUNT = 5

    def __init__(self) -> None:
        self.session_request = requests.Session()
        self.log = http_logging()
        self.start_time = time.time()
        # NOTE(review): this is "start minus now", i.e. zero or slightly
        # negative, and nothing in this class reads it.  Kept unchanged because
        # the intended meaning cannot be determined from this file.
        self.end_time = self.start_time - time.time()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session_request.close()

    def get(self, url: str, expected_status_code: Optional[int] = None) -> Response:
        """Issue a GET request (redirects are not followed) and return the response.

        Args:
            url: Address to request.
            expected_status_code: Status considered healthy.  Defaults to the
                ``DEFAULT_HTTP_STATUS_CODE`` setting when omitted.

        Returns:
            The response object, including when the status differs from the
            expected one but is not a 4xx/5xx.

        Raises:
            requests.HTTPError: The status differs from the expected one and
                is a 4xx/5xx (raised by ``raise_for_status``).
        """
        if expected_status_code is None:
            expected_status_code = self.get_http_config.get(
                "DEFAULT_HTTP_STATUS_CODE"
            )
        response = self.session_request.get(url=url, allow_redirects=False)
        # NOTE(review): the event name is a fixed string for now.
        self.event_emitter.handle_on_request("emitted_http_request")
        if response.status_code != expected_status_code:
            # Log first: raise_for_status() aborts on 4xx/5xx, which would
            # otherwise leave exactly the worst responses unlogged.
            self.log.error(
                "something went wrong during the request, see the full response => {0}, {1}, {2}".format(
                    response.status_code, response.reason, response.headers
                )
            )
            response.raise_for_status()
        # NOTE(review): emitted only when no exception was raised above;
        # confirm this is the intended emitter protocol.
        self.event_emitter.handle_emit_request("emitted_http_request")
        return response

    @staticmethod
    def http_response(
        title: str,
        endpoint: str,
        actual_response: int,
        expected_response: int,
        elapsed_time: float,
    ) -> dict:
        """Bundle the outcome of one endpoint check into a plain dict."""
        return {
            "title": title,
            "endpoint": endpoint,
            "actual_response": actual_response,
            "expected_response": expected_response,
            "elapsed_time": elapsed_time,
        }

    def get_endpoints(self) -> Any:
        """Load and validate the configured targets, returning the first one.

        Returns:
            The first target dict, or ``None`` when no targets are configured.
            NOTE(review): only the first target is returned, matching the
            previous behaviour that callers rely on; multi-target support
            would need an interface change.

        Raises:
            OverflowError: The number of targets is at or above
                ``MAX_HTTP_TARGET``.
            ValueError: A target lacks a ``title`` or an ``endpoint``.
        """
        config_path = self.get_http_config.get("DEFAULT_HTTP_CONFIG")
        # The writer (set_endpoints) uses UTF-8, so read with the same codec.
        with open(config_path, "r", encoding="utf-8") as file:
            endpoint_targets = json.load(file)
        targets = endpoint_targets.get("targets", [])
        if len(targets) >= self.get_http_config.get("MAX_HTTP_TARGET"):
            raise OverflowError("Maximum target of endpoint reached the limit")
        for item in targets:
            if not item.get("title"):
                raise ValueError("title not in the settings")
            if not item.get("endpoint"):
                raise ValueError("endpoint not in the settings")
        return targets[0] if targets else None

    def set_endpoints(self, data: dict) -> str:
        """Overwrite the targets file with ``data`` serialized as JSON.

        Returns:
            The path of the file that was written.

        Raises:
            TypeError: ``data`` is not a dict.
        """
        if not isinstance(data, dict):
            raise TypeError("parameter of data must be a pair of key-value object")
        config_path = self.get_http_config.get("DEFAULT_HTTP_CONFIG")
        with open(config_path, "w", encoding="utf-8") as file:
            json.dump(data, file, ensure_ascii=False, indent=4)
        # Return the path rather than the repr of the already-closed file.
        return str(config_path)

    @staticmethod
    def retries_request(
        retries: int, backoff_factor: float, error_codes: list
    ) -> HTTPAdapter:
        """Build an ``HTTPAdapter`` that retries GET/POST on given statuses.

        Args:
            retries: Total retry count, at most 3.
            backoff_factor: Back-off multiplier handed to ``Retry``.
            error_codes: Status codes that trigger a retry; each must be one
                of 500, 502, 503 or 504.  A single int is also accepted.

        Raises:
            OverflowError: ``retries`` is greater than 3.
            TypeError: ``error_codes`` is empty or contains another code.
        """
        max_retries = 3
        if retries > max_retries:
            raise OverflowError("number of retries per session can't be more than 3")
        allowed_error_codes = (500, 502, 503, 504)
        if isinstance(error_codes, int):
            error_codes = [error_codes]
        # Validate element-wise; testing the list itself for membership
        # rejects every list, including fully valid ones.
        if not error_codes or any(
            code not in allowed_error_codes for code in error_codes
        ):
            raise TypeError(
                "error codes that allowed should be between 500, 502, 503 or 504"
            )
        retry_options = {
            "total": retries,
            "backoff_factor": backoff_factor,
            "status_forcelist": list(error_codes),
        }
        # NOTE(review): POST is not idempotent; retrying it may repeat side
        # effects on the server.
        retried_methods = ["GET", "POST"]
        try:
            retry_policy = Retry(allowed_methods=retried_methods, **retry_options)
        except TypeError:
            # urllib3 < 1.26 only knows the older keyword name.
            retry_policy = Retry(method_whitelist=retried_methods, **retry_options)
        return HTTPAdapter(max_retries=retry_policy)

    def get_status_for_endpoints(self, target: dict, **kwargs) -> dict:
        """Request one target and report actual versus expected status.

        Args:
            target: Mapping with ``title``, ``endpoint`` and optionally
                ``expected_http_code``.
            **kwargs: ``timeout`` overrides the ``DEFAULT_HTTP_TIMEOUT``
                setting for this call.

        Returns:
            The dict produced by ``http_response``; ``elapsed_time`` is the
            duration of this request in seconds.

        Raises:
            ValueError: The endpoint does not use the ``https`` scheme.
            Exception: The connection timed out or failed; the original
                ``requests`` exception is chained as the cause.
        """
        config = self.get_http_config
        title = target.get("title")
        endpoint = target.get("endpoint")
        expected_http_code = target.get("expected_http_code") or config.get(
            "DEFAULT_HTTP_STATUS_CODE"
        )
        # An explicit per-call timeout wins over the configured default.
        timeout = kwargs.get("timeout")
        if timeout is None:
            timeout = config.get("DEFAULT_HTTP_TIMEOUT")

        if urllib.parse.urlparse(endpoint).scheme != "https":
            raise ValueError(
                "Can't mounted into that url because still using HTTP scheme"
            )

        retries_adapter = self.retries_request(
            config.get("DEFAULT_HTTP_RETRIES"),
            config.get("DEFAULT_BACKOFF_FACTOR"),
            config.get("DEFAULT_ERROR_CODES"),
        )
        # A session keeps one adapter per prefix, so timeout and retry policy
        # must live on the same adapter instead of two competing mounts.
        adapter = TimeoutAdapter(
            timeout=timeout, max_retries=retries_adapter.max_retries
        )
        # Mount on "https://": the session picks the longest matching prefix,
        # so a shorter "https" prefix would lose to the built-in adapter.
        previous_adapter = self.session_request.adapters.get("https://")
        self.session_request.mount("https://", adapter)
        if previous_adapter is not None:
            # Release the connection pool of the adapter being replaced.
            previous_adapter.close()

        started = time.perf_counter()
        try:
            response = self.get(
                url=endpoint, expected_status_code=expected_http_code
            )
        except requests.ConnectTimeout as e:
            raise Exception(
                "HTTP connection got timeout, see the trace :: {}".format(e)
            ) from e
        except requests.ConnectionError as e:
            raise Exception(
                "HTTP connection error, see the trace :: {}".format(e)
            ) from e
        elapsed_time = time.perf_counter() - started

        return self.http_response(
            title=title,
            endpoint=endpoint,
            actual_response=response.status_code,
            expected_response=expected_http_code,
            elapsed_time=elapsed_time,
        )

    @property
    def mapping_status_endpoints(self) -> dict:
        """Check the configured target now and return its status dict.

        Every access re-reads the targets file and performs a new request.

        Raises:
            ValueError: No target is configured.
        """
        target = self.get_endpoints()
        if target is None:
            raise ValueError("no endpoint targets configured")
        return self.get_status_for_endpoints(target)

    # Each accessor below reads ``mapping_status_endpoints`` and therefore
    # triggers its own request.

    @property
    def get_actual_response(self) -> Optional[Any]:
        """Status code returned by a fresh check."""
        return self.mapping_status_endpoints.get("actual_response")

    @property
    def get_expected_response(self) -> Optional[Any]:
        """Status code the target is expected to return."""
        return self.mapping_status_endpoints.get("expected_response")

    @property
    def get_title(self) -> Optional[str]:
        """Title of the checked target."""
        return self.mapping_status_endpoints.get("title")

    @property
    def get_endpoint(self) -> Optional[str]:
        """URL of the checked target."""
        return self.mapping_status_endpoints.get("endpoint")

    @property
    def get_elapsed_time(self) -> Optional[Any]:
        """Duration in seconds of a fresh check."""
        return self.mapping_status_endpoints.get("elapsed_time")

    @property
    def past_24_hours(self) -> int:
        """Local time 24 hours ago, encoded as the integer ``YYYYmmddHHMMSS``."""
        # Subtract on the datetime, not on the formatted digits: the digits
        # are not a count of seconds, so integer subtraction gives nonsense.
        cutoff = datetime.now() - timedelta(days=1)
        return int(cutoff.strftime("%Y%m%d%H%M%S"))

    def _latest_check_passed(self) -> bool:
        """Run one check and report whether actual matches expected status."""
        status = self.mapping_status_endpoints
        return status.get("actual_response") == status.get("expected_response")

    def success_generator(self) -> Generator:
        """Yield a one-element list holding the success flag, a fixed number of times.

        A single check is performed on first iteration and its result is
        repeated for every event.
        """
        is_success = [self._latest_check_passed()]
        for _ in range(self._SAMPLE_COUNT):
            yield is_success

    def failed_generator(self) -> Generator:
        """Yield a one-element list holding the failure flag, a fixed number of times.

        Bounded like ``success_generator``; looping over the timestamp integer
        from ``past_24_hours`` would never finish in practice.
        """
        is_failed = [not self._latest_check_passed()]
        for _ in range(self._SAMPLE_COUNT):
            yield is_failed

    @property
    def success_metrics(self) -> float:
        """Percentage of successful events relative to a threshold of 100.

        NOTE(review): with five events per check the value is capped at 5.0;
        confirm the intended threshold.
        """
        threshold_of_success = 100
        # Count only events whose flag is set, so the metric tracks the outcome.
        successes = sum(1 for event in self.success_generator() if event[0])
        return (successes / threshold_of_success) * 100

    @property
    def false_metrics(self) -> float:
        """Percentage of failed events relative to a threshold of 100.

        NOTE(review): with five events per check the value is capped at 5.0;
        confirm the intended threshold.
        """
        threshold_of_failed = 100
        failures = sum(1 for event in self.failed_generator() if event[0])
        return (failures / threshold_of_failed) * 100