# http-monitoring/src/queue_services.py
# 2023-01-27 10:28:07 +07:00
#
# 54 lines
# 1.8 KiB
# Python

import logging
import threading
from queue import Queue
from typing import Any

import requests

from monitoring import HttpMonitoringServices
class HttpThreading(HttpMonitoringServices):
    """Fan HTTP checks for a list of hosts out over worker threads.

    URLs are handed to the workers through a ``queue.Queue``; every worker
    calls the inherited ``get`` for each URL it pulls off the queue.
    """

    # Fallbacks used when the caller supplies no (or a zero) setting.
    _DEFAULT_QUEUE_SIZE = 5
    _DEFAULT_WORKER_THREADS = 3

    _queue_logger = logging.getLogger(__name__)

    def __init__(self, queue_size: dict = None) -> None:
        """Create the task queue.

        Args:
            queue_size: Optional mapping whose ``"queue_size"`` entry is the
                maximum number of pending URLs. ``None``, a missing entry,
                or ``0`` selects the default of 5. The mapping is only read,
                never modified.
        """
        super().__init__()
        # ``or`` covers None, a missing key and 0 in one step; a negative
        # value is passed through unchanged (Queue treats it as unbounded).
        configured = (queue_size or {}).get("queue_size")
        self.queue = Queue(maxsize=configured or self._DEFAULT_QUEUE_SIZE)

    def fetch_multiple_urls(self, tasks_queue: Queue) -> Any:
        """Worker loop: take URLs from ``tasks_queue`` forever and fetch each.

        Failures are logged instead of raised. An exception escaping this
        loop cannot be caught by the code that started the thread; it would
        only end the worker, and once every worker is gone ``Queue.join``
        (and ``put`` on a bounded queue) would block forever.

        Args:
            tasks_queue: Queue of URLs to fetch. Each retrieved item is
                acknowledged with ``task_done`` whether or not it succeeded.
        """
        log = self._queue_logger
        while True:
            # Kept outside the ``try`` so ``task_done`` is only ever called
            # for an item that was actually retrieved.
            url = tasks_queue.get()
            try:
                self.get(url=url)
            except requests.ConnectTimeout as e:
                # ConnectTimeout subclasses ConnectionError, so it must be
                # handled first to keep its more specific message.
                log.error("HTTP connection got timeout, see the trace :: %s", e)
            except requests.ConnectionError as e:
                log.error("HTTP connection error, see the trace :: %s", e)
            except Exception:
                # Thread top-level boundary: keep the worker alive for the
                # remaining URLs and record the full traceback.
                log.exception("Unexpected error while fetching %s", url)
            finally:
                tasks_queue.task_done()

    def mapping(self, hosts: list[str]) -> None:
        """Enqueue every host, then block until all of them are processed.

        Worker threads must already be consuming ``self.queue``; otherwise
        this call never returns.

        Args:
            hosts: URLs to put on the queue.
        """
        for host in hosts:
            self.queue.put(host)
        self.queue.join()

    def start_worker_queue(self, worker_thread: dict, hosts: list[str]) -> dict:
        """Start the worker threads, process ``hosts`` and report the result.

        Args:
            worker_thread: Mapping whose ``"worker_thread"`` entry is the
                number of threads to start. ``None``, a missing entry, ``0``
                or a negative value selects the default of 3. The mapping is
                only read, never modified.
            hosts: URLs to fetch.

        Returns:
            A dict with ``"start_time"`` and ``"end_time"`` (read from the
            instance; presumably set by the inherited ``get`` - confirm
            against ``HttpMonitoringServices``) and ``"is_queue"``, which is
            True only if items are still waiting in the queue.

        Raises:
            ValueError: If ``hosts`` is None.
        """
        if hosts is None:
            raise ValueError("hosts of url can't be empty or None-type object")

        requested = (worker_thread or {}).get("worker_thread")
        # Zero workers would leave ``mapping`` blocked forever, so anything
        # that is not a positive count falls back to the default.
        if requested and requested > 0:
            thread_count = requested
        else:
            thread_count = self._DEFAULT_WORKER_THREADS

        for _ in range(thread_count):
            # Daemon threads: the workers loop forever and must not keep the
            # interpreter alive at shutdown.
            threading.Thread(
                target=self.fetch_multiple_urls,
                args=(self.queue,),
                daemon=True,
            ).start()

        self.mapping(hosts=hosts)

        # ``Queue.not_empty`` is a Condition object and therefore always
        # truthy; ``empty()`` is the call that reflects the queue's state.
        return {
            "start_time": self.start_time,
            "end_time": self.end_time,
            "is_queue": not self.queue.empty(),
        }