228 lines
8.4 KiB
Python
228 lines
8.4 KiB
Python
import time
|
|
import json
|
|
import requests
|
|
import urllib.parse
|
|
from datetime import datetime, timedelta
|
|
from requests import Response
|
|
from urllib3.util.retry import Retry # noqa
|
|
from requests.adapters import HTTPAdapter
|
|
from typing import Any, Union, Optional, Generator
|
|
from configurations import HTTP_MONITORING_CONFIG
|
|
from utils.logger import http_logging
|
|
from utils.event_emitter import HttpEventEmitter
|
|
|
|
|
|
class TimeoutAdapter(HTTPAdapter):
|
|
def __init__(self, *args, **kwargs) -> None:
|
|
self.timeout = None
|
|
if "timeout" in kwargs:
|
|
self.timeout = kwargs["timeout"]
|
|
del kwargs["timeout"]
|
|
super(TimeoutAdapter, self).__init__(*args, **kwargs)
|
|
|
|
def send(self, request, **kwargs) -> requests.Response:
|
|
timeout = kwargs.get("timeout")
|
|
if timeout is None:
|
|
kwargs["timeout"] = self.timeout
|
|
return super().send(request, **kwargs)
|
|
|
|
|
|
class HttpMonitoringServices:
|
|
get_http_config = HTTP_MONITORING_CONFIG
|
|
event_emitter = HttpEventEmitter()
|
|
|
|
def __init__(self) -> None:
|
|
self.session_request = requests.Session()
|
|
self.log = http_logging()
|
|
self.start_time = time.time()
|
|
self.end_time = self.start_time - time.time()
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.session_request.close()
|
|
|
|
def get(self, url: str) -> Union[Response, Response]:
|
|
response = self.session_request.get(url=url, allow_redirects=False)
|
|
self.event_emitter.handle_on_request(
|
|
"emitted_http_request"
|
|
) # for now, just use immutable argument
|
|
if response.status_code != self.get_http_config.get("DEFAULT_HTTP_STATUS_CODE"):
|
|
response.raise_for_status()
|
|
self.log.error(
|
|
"something went wrong during the request, see the full response => {0}, {1}, {2}".format(
|
|
response.status_code, response.reason, response.headers
|
|
)
|
|
)
|
|
# is this a correct setup to handle the event emitter?
|
|
self.event_emitter.handle_emit_request("emitted_http_request")
|
|
return response
|
|
|
|
@staticmethod
|
|
def http_response(
|
|
title: str,
|
|
endpoint: str,
|
|
actual_response: int,
|
|
expected_response: int,
|
|
elapsed_time: float,
|
|
) -> dict:
|
|
return {
|
|
"title": title,
|
|
"endpoint": endpoint,
|
|
"actual_response": actual_response,
|
|
"expected_response": expected_response,
|
|
"elapsed_time": elapsed_time,
|
|
}
|
|
|
|
def get_endpoints(self) -> Any:
|
|
with open(self.get_http_config.get("DEFAULT_HTTP_CONFIG"), "r") as file:
|
|
endpoint_targets = json.load(file)
|
|
targets = endpoint_targets.get("targets", [])
|
|
|
|
if len(endpoint_targets["targets"]) >= self.get_http_config.get(
|
|
"MAX_HTTP_TARGET"
|
|
):
|
|
raise OverflowError("Maximum target of endpoint reached the limit")
|
|
|
|
for item in targets:
|
|
if item.get("title") not in targets[0]["title"]:
|
|
raise ValueError("title not in the settings")
|
|
elif item.get("endpoint") not in targets[0]["endpoint"]:
|
|
raise ValueError("endpoint not in the settings")
|
|
return item
|
|
|
|
def set_endpoints(self, data: dict) -> str:
|
|
if not isinstance(data, dict):
|
|
raise TypeError("parameter of data must be a pair of key-value object")
|
|
|
|
with open(
|
|
self.get_http_config.get("DEFAULT_HTTP_CONFIG"), "w", encoding="utf-8"
|
|
) as file:
|
|
json.dump(data, file, ensure_ascii=False, indent=4)
|
|
return str(file)
|
|
|
|
@staticmethod
|
|
def retries_request(
|
|
retries: int, backoff_factor: int, error_codes: list
|
|
) -> Union[HTTPAdapter, HTTPAdapter]:
|
|
max_retries = 3
|
|
if retries > max_retries:
|
|
raise OverflowError("number of retries per session can't be more than 3")
|
|
|
|
allowed_error_codes = [500, 502, 503, 504]
|
|
if error_codes not in allowed_error_codes:
|
|
raise TypeError(
|
|
"error codes that allowed should be between 500, 502, 503 or 504"
|
|
)
|
|
|
|
retries = Retry(
|
|
total=retries,
|
|
backoff_factor=backoff_factor,
|
|
status_forcelist=error_codes,
|
|
method_whitelist=["GET", "POST"],
|
|
)
|
|
retries_adapter = HTTPAdapter(max_retries=retries)
|
|
return retries_adapter
|
|
|
|
def get_status_for_endpoints(self, target: dict, **kwargs) -> http_response:
|
|
title = target.get("title")
|
|
endpoint = target.get("endpoint")
|
|
expected_http_code = target.get(
|
|
"expected_http_code"
|
|
) or self.get_http_config.get("DEFAULT_HTTP_STATUS_CODE")
|
|
timeout = self.get_http_config.get("DEFAULT_HTTP_TIMEOUT") or kwargs["timeout"]
|
|
max_retries = self.get_http_config.get("DEFAULT_HTTP_RETRIES")
|
|
backoff_factor = self.get_http_config.get("DEFAULT_BACKOFF_FACTOR")
|
|
error_codes = self.get_http_config.get("DEFAULT_ERROR_CODES")
|
|
|
|
try:
|
|
timeout_adapter = TimeoutAdapter(timeout=timeout)
|
|
retries_adapter = self.retries_request(
|
|
max_retries, backoff_factor, error_codes
|
|
)
|
|
if urllib.parse.urlparse(endpoint).scheme == "https":
|
|
# to be fair, im not sure if we can mounted the adapter consecutively
|
|
self.session_request.mount("https", timeout_adapter)
|
|
self.session_request.mount("https", retries_adapter)
|
|
else:
|
|
raise ValueError(
|
|
"Can't mounted into that url because still using HTTP scheme"
|
|
)
|
|
response = self.get(url=endpoint)
|
|
except requests.ConnectTimeout as e:
|
|
raise Exception(
|
|
"HTTP connection got timeout, see the trace :: {}".format(e)
|
|
)
|
|
except requests.ConnectionError as e:
|
|
raise Exception("HTTP connection error, see the trace :: {}".format(e))
|
|
elapsed_time = time.time() - self.start_time
|
|
|
|
return self.http_response(
|
|
title=title,
|
|
endpoint=endpoint,
|
|
actual_response=response.status_code,
|
|
expected_response=expected_http_code,
|
|
elapsed_time=elapsed_time,
|
|
)
|
|
|
|
@property
|
|
def mapping_status_endpoints(self) -> dict:
|
|
targets = self.get_endpoints()
|
|
get_all_status = self.get_status_for_endpoints(targets)
|
|
return get_all_status
|
|
|
|
@property
|
|
def get_actual_response(self) -> Optional[Any]:
|
|
return self.mapping_status_endpoints.get("actual_response")
|
|
|
|
@property
|
|
def get_expected_response(self) -> Optional[Any]:
|
|
return self.mapping_status_endpoints.get("expected_response")
|
|
|
|
@property
|
|
def get_title(self) -> Optional[str]:
|
|
return self.mapping_status_endpoints.get("title")
|
|
|
|
@property
|
|
def get_endpoint(self) -> Optional[str]:
|
|
return self.mapping_status_endpoints.get("endpoint")
|
|
|
|
@property
|
|
def get_elapsed_time(self) -> Optional[Any]:
|
|
return self.mapping_status_endpoints.get("elapsed_time")
|
|
|
|
@property
|
|
def past_24_hours(self) -> int:
|
|
# tricky solution, particular i want to convert a
|
|
# datetime within past 24 hours to get how many
|
|
# APIs that success or failure, and then count them altogether
|
|
start = datetime.now()
|
|
int_convert = start.strftime("%Y%m%d%H%M%S")
|
|
end = timedelta(days=1).total_seconds()
|
|
return 1 * int(int_convert) - int(end)
|
|
|
|
def success_generator(self) -> Generator:
|
|
# need generator expression in order to iterate
|
|
# throughout response status code that success
|
|
is_success = [True if self.get_actual_response == 200 else False]
|
|
for index in range(5):
|
|
yield is_success
|
|
|
|
def failed_generator(self) -> Generator:
|
|
is_failed = [True if self.get_actual_response != 200 else False]
|
|
for index in range(self.past_24_hours):
|
|
yield is_failed
|
|
|
|
@property
|
|
def success_metrics(self) -> float:
|
|
map_success_events = [index for index in self.success_generator()]
|
|
threshold_of_success = 100
|
|
return (len(map_success_events) / threshold_of_success) * 100
|
|
|
|
@property
|
|
def false_metrics(self) -> float:
|
|
map_failed_events = [index for index in self.failed_generator()]
|
|
threshold_of_failed = 100
|
|
return (len(map_failed_events) / threshold_of_failed) * 100
|