File size: 4,043 Bytes
f51bb92 1e2550f 4fc2bf8 1e2550f f51bb92 4fc2bf8 f51bb92 4fc2bf8 f51bb92 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
import aiohttp
from aiohttp import ClientSession
import asyncio
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urldefrag
from modules.config.constants import TIMEOUT
class WebpageCrawler:
    """Crawl a site level by level, collecting links located under a base URL.

    The entry point is ``get_all_pages``; ``clean_url_list`` can then split
    the collected URLs into HTML pages and other files.
    """

    def __init__(self) -> None:
        # Every normalized URL accepted by ``get_links`` so far. Used as an
        # insertion-ordered set (values are always None). It is never reset,
        # so a link is returned at most once per crawler instance.
        self.dict_href_links = {}

    async def fetch(self, session: ClientSession, url: str) -> str:
        """Download ``url`` with ``session`` and return the body as text.

        If decoding the body raises ``UnicodeDecodeError``, the body is
        decoded again as latin1 instead.
        """
        async with session.get(url) as response:
            try:
                return await response.text()
            except UnicodeDecodeError:
                return await response.text(encoding="latin1")

    def url_exists(self, url: str) -> bool:
        """Return True if a HEAD request to ``url`` answers with status 200.

        Any ``requests`` failure (connection error, timeout, malformed URL,
        ...) is reported as False so that a single bad link cannot abort the
        crawl. This matches the error handling in ``is_webpage``.
        """
        try:
            response = requests.head(url, timeout=TIMEOUT)
        except requests.RequestException:
            return False
        return response.status_code == 200

    async def get_links(
        self, session: ClientSession, website_link: str, base_url: str
    ) -> list:
        """Return the not-yet-seen child links found on ``website_link``.

        Each ``<a href>`` on the page is made absolute, stripped of its
        fragment, and kept only if it has not been accepted before, is
        located under ``base_url`` and answers a HEAD request with 200.
        Accepted links are recorded in ``self.dict_href_links``.
        """
        html_data = await self.fetch(session, website_link)
        soup = BeautifulSoup(html_data, "html.parser")

        list_links = []
        for anchor in soup.find_all("a", href=True):
            href = anchor["href"].strip()
            # A relative href is relative to the page it appears on, not to
            # the crawl root; resolving against base_url would produce wrong
            # URLs for pages nested below the root.
            full_url = urljoin(website_link, href)
            normalized_url = self.normalize_url(full_url)  # fragment removed

            # Cheap checks first, so the HEAD request is only issued for
            # unseen URLs that are inside the crawl scope.
            if normalized_url in self.dict_href_links:
                continue
            if not self.is_child_url(normalized_url, base_url):
                continue
            # NOTE: url_exists performs a blocking HTTP request, so the event
            # loop is stalled while it runs. Because there is no await between
            # the membership test above and the insert below, the
            # check-and-record step cannot interleave with other tasks.
            if not self.url_exists(normalized_url):
                continue

            self.dict_href_links[normalized_url] = None
            list_links.append(normalized_url)
        return list_links

    async def get_subpage_links(
        self, session: ClientSession, urls: list, base_url: str
    ) -> list:
        """Run ``get_links`` on every URL in ``urls`` and flatten the results.

        The returned list keeps the order of ``urls`` and, within each page,
        the order in which the links were found.
        """
        tasks = [self.get_links(session, url, base_url) for url in urls]
        results = await asyncio.gather(*tasks)
        return [link for page_links in results for link in page_links]

    async def get_all_pages(self, url: str, base_url: str) -> list:
        """Crawl from ``url`` and return every page URL that was visited.

        Pages are processed level by level: all links discovered on the
        current level form the next level, until no new links turn up. The
        result starts with ``url`` and lists the remaining URLs in discovery
        order.
        """
        async with aiohttp.ClientSession() as session:
            # dict used as an insertion-ordered set of URLs queued or crawled.
            visited = {url: None}
            frontier = [url]
            while frontier:
                new_links = await self.get_subpage_links(
                    session, frontier, base_url
                )
                frontier = []
                for link in new_links:
                    if link not in visited:
                        visited[link] = None
                        frontier.append(link)
            return list(visited)

    def is_webpage(self, url: str) -> bool:
        """Return True if a HEAD request reports an HTML ``Content-Type``.

        Redirects are followed. Any ``requests`` failure yields False.
        """
        try:
            response = requests.head(url, allow_redirects=True, timeout=TIMEOUT)
        except requests.RequestException:
            return False
        content_type = response.headers.get("Content-Type", "").lower()
        return "text/html" in content_type

    def clean_url_list(self, urls) -> tuple:
        """Split ``urls`` into ``(files, webpages)`` using ``is_webpage``.

        URLs for which ``is_webpage`` is False (including those whose request
        failed) end up in ``files``. Input order is kept within each list.
        """
        files, webpages = [], []
        for url in urls:
            target = webpages if self.is_webpage(url) else files
            target.append(url)
        return files, webpages

    def is_child_url(self, url, base_url) -> bool:
        """Return True if ``url`` begins with ``base_url`` (plain prefix test)."""
        return url.startswith(base_url)

    def normalize_url(self, url: str) -> str:
        """Return ``url`` without its ``#fragment`` part."""
        defragged_url, _ = urldefrag(url)
        return defragged_url
|