import base64
import html
import re
import tempfile
import time
from urllib.parse import urlparse

import advertools as adv
import pandas as pd
import requests
import streamlit as st
from bs4 import BeautifulSoup


def get_seo_powersuite_data(domains, api_key, timeout=30):
    """Fetch InLink Rank and referring-domain counts from the SEO PowerSuite API.

    Domains are sent in batches of 100. For every batch the
    ``get-domain-inlink-rank`` endpoint is queried first and its ``pages``
    entries are indexed by ``url``; the ``get-refdomains-count`` endpoint
    (``mode="domain"``) is queried next and each of its ``metrics`` entries
    becomes one result row.

    Args:
        domains: Iterable of domains to look up.
        api_key: API key, sent as the ``apikey`` query parameter.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A DataFrame with the columns ``target``, ``domain_inlink_rank`` and
        ``refdomains`` (empty, but still carrying those columns, when there is
        nothing to report), or ``None`` when a request fails. Failures are
        shown to the user through ``st.error`` before ``None`` is returned.

    Raises:
        KeyError: If a successful response lacks one of the keys read here.
    """
    url_domain_inlink_rank = "https://api.seopowersuite.com/backlinks/v1.0/get-domain-inlink-rank"
    url_refdomains_count = "https://api.seopowersuite.com/backlinks/v1.0/get-refdomains-count"
    headers = {"Content-Type": "application/json"}
    params = {"apikey": api_key, "output": "json"}
    columns = ["target", "domain_inlink_rank", "refdomains"]
    batch_size = 100

    def post_batch(label, endpoint, url, payload):
        """POST one batch; return the decoded JSON body, or None after reporting the failure."""
        started = time.time()
        try:
            response = requests.post(url, json=payload, headers=headers, params=params, timeout=timeout)
        except requests.exceptions.RequestException as exc:
            # Only the exception type is shown: the message can contain the
            # request URL, which carries the API key in its query string.
            st.error(f"Error fetching {label} data from SEO PowerSuite API: {type(exc).__name__}")
            return None
        duration = time.time() - started
        print(f"{endpoint} API call for {len(payload['target'])} domains took {duration:.2f} seconds")

        if response.status_code != 200:
            st.error(f"Error fetching {label} data from SEO PowerSuite API: {response.status_code}")
            st.error("Error Response:")
            st.write(response.text)
            return None
        try:
            return response.json()
        except ValueError:
            st.error(f"Error fetching {label} data from SEO PowerSuite API: response was not valid JSON")
            return None

    domains = list(domains)
    results = []
    for start in range(0, len(domains), batch_size):
        batch_domains = domains[start:start + batch_size]

        rank_data = post_batch(
            "domain inlink rank",
            "get-domain-inlink-rank",
            url_domain_inlink_rank,
            {"target": batch_domains},
        )
        if rank_data is None:
            return None
        rank_by_url = {page["url"]: page["domain_inlink_rank"] for page in rank_data["pages"]}

        refdomains_data = post_batch(
            "refdomains count",
            "get-refdomains-count",
            url_refdomains_count,
            {"target": batch_domains, "mode": "domain"},
        )
        if refdomains_data is None:
            return None
        for metric in refdomains_data["metrics"]:
            results.append({
                "target": metric["target"],
                "domain_inlink_rank": rank_by_url.get(metric["target"]),
                "refdomains": metric["refdomains"],
            })

    # Passing the columns explicitly keeps the schema stable when no rows were
    # collected, so callers can merge on "target" unconditionally.
    return pd.DataFrame(results, columns=columns)


def get_peter_lowe_domains(timeout=30):
    """Download Peter Lowe's ad-server list and return its domains as a set.

    The list is requested in Adblock Plus host format; every line that starts
    with ``||`` is kept, with the surrounding ``|`` and ``^`` characters
    removed. The HTTP status code is not checked, so a response without such
    lines simply produces an empty set.

    Args:
        timeout: Seconds to wait for the download before giving up.

    Returns:
        A set of domain strings.

    Raises:
        requests.exceptions.RequestException: If the download itself fails.
    """
    url = "https://pgl.yoyo.org/adservers/serverlist.php?hostformat=adblockplus&mimetype=plaintext"
    response = requests.get(url, timeout=timeout)

    domains = set()
    # splitlines() plus strip() copes with CRLF line endings, which would
    # otherwise leave a "\r" at the end and keep the trailing "^" in place.
    for raw_line in response.text.splitlines():
        line = raw_line.strip()
        if line.startswith('||'):
            domains.add(line.strip('|^'))
    return domains


def extract_hostname(url):
    """Return the network-location part (``netloc``) of *url*.

    The value is whatever ``urllib.parse.urlparse`` reports as ``netloc``, so
    it is an empty string for links without a host (relative paths,
    ``mailto:`` links, fragments).

    Args:
        url: The URL string to inspect.

    Returns:
        The ``netloc`` string, or ``''`` when *url* cannot be parsed.
    """
    try:
        return urlparse(url).netloc
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced "[" in
        # the host). Scraped hrefs are untrusted, so treat those as host-less
        # instead of aborting the caller.
        return ''


def remove_subdomain(domain):
    """Reduce a host name to its last two dot-separated labels.

    The input is normalised first: surrounding whitespace is removed, the name
    is lower-cased, a trailing numeric ``:port`` is dropped and trailing dots
    are stripped. Host names are case-insensitive, so this keeps look-ups
    against a lower-case block list from missing ``Ads.Example.COM`` or
    ``ads.example.com:8080``.

    NOTE: only the label count is considered, so a name under a multi-label
    suffix such as ``example.co.uk`` is reduced to ``co.uk``.

    Args:
        domain: Host name, optionally with subdomains and a port.

    Returns:
        The normalised name cut down to its last two labels, or the normalised
        name itself when it has two labels or fewer.
    """
    host = domain.strip().lower()

    # Drop ":port" only when the tail is numeric, so bracketed IPv6 literals
    # without a port are left untouched.
    name, separator, port = host.rpartition(':')
    if separator and port.isdigit():
        host = name

    host = host.rstrip('.')
    labels = host.split('.')
    if len(labels) > 2:
        return '.'.join(labels[-2:])
    return host


def domain_matches_blacklist(domain, regex_patterns):
    """Report whether *domain* matches any of the given regex patterns.

    Patterns are searched case-insensitively with ``re.search``. Surrounding
    whitespace is stripped from each pattern and blank patterns are skipped:
    an empty regex matches every string, so a stray blank line in the
    blacklist would otherwise flag every domain.

    Args:
        domain: The domain string to test.
        regex_patterns: Iterable of regular-expression strings.

    Returns:
        ``'Yes'`` if at least one non-blank pattern matches, otherwise ``'No'``.

    Raises:
        re.error: If a non-blank pattern is not a valid regular expression.
    """
    candidates = (pattern.strip() for pattern in regex_patterns)
    matched = any(
        re.search(pattern, domain, re.IGNORECASE)
        for pattern in candidates
        if pattern
    )
    return 'Yes' if matched else 'No'


def find_sitemap(url, timeout=10):
    """Locate a sitemap URL for the site that *url* belongs to.

    ``/robots.txt`` is consulted first and the value of its first ``Sitemap``
    line is returned (the field name is matched case-insensitively and leading
    whitespace is tolerated). If that yields nothing, a fixed list of common
    sitemap paths is probed in order and the first one answering with HTTP 200
    is returned. Requests that fail are ignored and the search continues.

    Args:
        url: Any URL on the target site; only its scheme and host are used.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        The sitemap URL as a string, or ``None`` when none was found or *url*
        has no scheme/host to build requests from.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        return None
    if not parsed.scheme or not parsed.netloc:
        # Without a scheme and host every request below would be rejected as
        # an invalid URL, so skip them altogether.
        return None
    base_url = f"{parsed.scheme}://{parsed.netloc}"

    try:
        robots_response = requests.get(f"{base_url}/robots.txt", timeout=timeout)
        if robots_response.status_code == 200:
            for line in robots_response.text.splitlines():
                field, separator, value = line.strip().partition(":")
                value = value.strip()
                if separator and value and field.strip().lower() == "sitemap":
                    return value
    except requests.exceptions.RequestException:
        # Best effort: an unreachable robots.txt just means we fall back to
        # probing the well-known locations.
        pass

    sitemap_paths = [
        "/sitemap.xml", "/wp-sitemap.xml", "/?sitemap=1", "/sitemap_index.xml",
        "/sitemap-index.xml", "/sitemap.php", "/sitemap.txt", "/sitemap.xml.gz",
        "/sitemap/", "/sitemap/sitemap.xml", "/sitemapindex.xml", "/sitemap/index.xml", "/sitemap1.xml",
    ]

    for sitemap_path in sitemap_paths:
        candidate = f"{base_url}{sitemap_path}"
        try:
            sitemap_response = requests.get(candidate, timeout=timeout)
        except requests.exceptions.RequestException:
            continue
        if sitemap_response.status_code == 200:
            return candidate

    return None


def crawl_posts(df, page_count, timeout=30):
    """Download pages listed in a sitemap frame and extract their metadata.

    The first *page_count* non-null URLs of the ``loc`` column are requested.
    For every page answering with HTTP 200 the ``<title>`` text, the
    ``<meta name="description">`` content and all ``<a href>`` links are
    collected. Pages that cannot be fetched or that return another status are
    skipped.

    Args:
        df: DataFrame with a ``loc`` column holding page URLs.
        page_count: Maximum number of URLs to request.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A DataFrame with the columns ``url``, ``title``, ``meta_desc`` and
        ``links``; ``links`` holds a list of dicts with the keys ``url``,
        ``text`` and ``nofollow``. The frame has no rows when nothing could be
        crawled.
    """
    columns = ['url', 'title', 'meta_desc', 'links']
    crawl_results = []

    for url in df['loc'].dropna().head(int(page_count)):
        try:
            response = requests.get(url, timeout=timeout)
        except requests.exceptions.RequestException:
            # Best effort: one unreachable page must not stop the crawl.
            continue
        if response.status_code != 200:
            continue

        soup = BeautifulSoup(response.text, 'html.parser')
        title = soup.title.text if soup.title else ''

        # A description tag may exist without a "content" attribute, so use
        # .get() rather than indexing.
        meta_tag = soup.find('meta', attrs={'name': 'description'})
        meta_desc = meta_tag.get('content', '') if meta_tag else ''

        links = [
            {
                'url': anchor['href'],
                'text': anchor.text.strip(),
                'nofollow': 'nofollow' in (anchor.get('rel') or []),
            }
            for anchor in soup.find_all('a', href=True)
        ]

        crawl_results.append({
            'url': url,
            'title': title,
            'meta_desc': meta_desc,
            'links': links,
        })

    # Explicit columns keep the schema stable even when no page was crawled.
    return pd.DataFrame(crawl_results, columns=columns)


def download_csv(df, filename):
    """Build an HTML download link that embeds *df* as a base64 CSV data URI.

    The frame is serialised with ``to_csv(index=False)``. The data URI uses
    the ``text/csv`` media type, and *filename* is HTML-escaped before it is
    placed in the ``download`` attribute and the link text.

    Args:
        df: The DataFrame to export.
        filename: Base name of the download (``.csv`` is appended); also used
            in the visible link text.

    Returns:
        An ``<a>`` element as a string, intended for
        ``st.markdown(..., unsafe_allow_html=True)``.
    """
    csv_text = df.to_csv(index=False)
    b64 = base64.b64encode(csv_text.encode()).decode()
    # Escape the name so quotes or angle brackets cannot break out of the
    # attribute or inject markup.
    safe_name = html.escape(str(filename), quote=True)
    return f'<a href="data:text/csv;base64,{b64}" download="{safe_name}.csv">Download {safe_name} CSV</a>'


def _parse_patterns(raw_text, label):
    """Split a text-area value into usable regex patterns, one per non-blank line.

    Blank lines are dropped because an empty regex matches every domain;
    patterns that do not compile are reported with ``st.warning`` and skipped.
    """
    patterns = []
    for line in (raw_text or '').splitlines():
        pattern = line.strip()
        if not pattern:
            continue
        try:
            re.compile(pattern)
        except re.error as exc:
            st.warning(f"Ignoring invalid regex '{pattern}' in {label}: {exc}")
            continue
        patterns.append(pattern)
    return patterns


def _count_values(values, label):
    """Return a two-column frame (*label*, 'Count') with the frequency of each value."""
    counts = values.value_counts()
    # Built by hand because the column names produced by
    # value_counts().reset_index() differ between pandas versions.
    return pd.DataFrame({label: counts.index.tolist(), 'Count': counts.tolist()})


def _concat_frames(frames):
    """Concatenate *frames* row-wise, or return an empty frame when there are none."""
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


def _load_peter_lowe_domains():
    """Fetch Peter Lowe's list, falling back to an empty set (with a warning) on failure."""
    try:
        return get_peter_lowe_domains()
    except requests.exceptions.RequestException:
        st.warning("Could not download Peter Lowe's list; 'In Peter Lowe List' will be 'No' for every domain.")
        return set()


def _collect_outbound_links(crawl_results, site_url):
    """Flatten the crawled links and keep those pointing at another host.

    Returns ``(link_df, hosts)``: the outbound links with an
    'Originating Domain' column, and an index-aligned Series of their
    lower-cased hosts.
    """
    records = [
        link
        for page_links in crawl_results['links']
        if isinstance(page_links, list)
        for link in page_links
    ]
    link_df = pd.DataFrame(records, columns=['url', 'text', 'nofollow'])
    link_df['url'] = link_df['url'].astype(str)

    site_host = extract_hostname(site_url).lower()
    hosts = [extract_hostname(link_url).lower() for link_url in link_df['url']]

    def is_outbound(host):
        # Host-less links (relative paths, fragments, mailto:) stay on the
        # site; so do the site's own host and its subdomains.
        if not host:
            return False
        if site_host and (host == site_host or host.endswith('.' + site_host)):
            return False
        return True

    keep = pd.Series([is_outbound(host) for host in hosts], index=link_df.index, dtype=bool)
    hosts = pd.Series(hosts, index=link_df.index, dtype=object)

    link_df = link_df[keep].copy()
    link_df.insert(0, 'Originating Domain', site_url)
    return link_df, hosts[keep]


def _summarise_domains(site_url, link_df, hosts, peter_lowe_domains,
                       filter_patterns, blacklist_patterns, api_key):
    """Build the per-domain table and the analytics rows for one site.

    Returns ``(final_df, analysis_df)``, or ``(None, None)`` after showing a
    warning when no domain is left to report. *api_key* is ``None`` when the
    SEO PowerSuite lookup is disabled.
    """
    domain_df = _count_values(hosts, 'Domain')
    if domain_df.empty:
        st.warning(f"No unique outbound domains found for {site_url}.")
        return None, None

    if filter_patterns:
        filter_regex = '|'.join(filter_patterns)
        excluded = domain_df['Domain'].str.contains(filter_regex, case=False, regex=True)
        domain_df = domain_df[~excluded].reset_index(drop=True)
    if domain_df.empty:
        st.warning(f"No unique outbound domains found for {site_url} after filtering.")
        return None, None

    # A domain counts as DoFollow when at least one link to it lacks rel="nofollow".
    followed_hosts = set(hosts[~link_df['nofollow'].astype(bool)])

    domain_df.insert(0, 'Originating Domain', site_url)
    domain_df['In Peter Lowe List'] = [
        'Yes' if remove_subdomain(domain) in peter_lowe_domains else 'No'
        for domain in domain_df['Domain']
    ]
    domain_df['DoFollow'] = domain_df['Domain'].isin(followed_hosts)
    domain_df['Blacklist'] = [
        domain_matches_blacklist(domain, blacklist_patterns) == 'Yes'
        for domain in domain_df['Domain']
    ]

    total_domains = len(domain_df)
    peter_lowe_percentage = round(
        float((domain_df['In Peter Lowe List'] == 'No').sum()) / total_domains * 100, 2)
    blacklist_percentage = round(
        float(domain_df['Blacklist'].sum()) / total_domains * 100, 2)
    metrics = [
        ("Percentage of domains not in Peter Lowe's list", f"{peter_lowe_percentage}%"),
        ('Percentage of domains in the Blacklist', f"{blacklist_percentage}%"),
    ]

    final_columns = ['Originating Domain', 'Domain', 'Count', 'In Peter Lowe List', 'DoFollow', 'Blacklist']
    if api_key:
        seo_powersuite_df = get_seo_powersuite_data(domain_df['Domain'].tolist(), api_key)
        seo_columns = ['target', 'domain_inlink_rank', 'refdomains']
        # When the lookup failed (None) or returned no usable columns, fall
        # back to the basic table instead of failing on missing columns.
        if seo_powersuite_df is not None and all(c in seo_powersuite_df.columns for c in seo_columns):
            domain_df = pd.merge(domain_df, seo_powersuite_df, left_on='Domain', right_on='target', how='left')
            domain_df = domain_df.drop(columns='target')

            rank = pd.to_numeric(domain_df['domain_inlink_rank'], errors='coerce')
            refdomains = pd.to_numeric(domain_df['refdomains'], errors='coerce')
            metrics += [
                ('Average domain inlink rank', round(rank.mean(), 2)),
                ('Average domain inlink rank (< 70)', round(rank[rank < 70].mean(), 2)),
                ('Average number of refdomains', round(refdomains.mean(), 2)),
            ]
            final_columns += ['domain_inlink_rank', 'refdomains']

    analysis_df = pd.DataFrame({
        'Originating Domain': [site_url] * len(metrics),
        'Metric': [name for name, _ in metrics],
        'Value': [value for _, value in metrics],
    })
    return domain_df[final_columns], analysis_df


def _render_section(title, frame, download_links, show=None):
    """Show *frame* under a subheader, either as a CSV download link or inline."""
    st.subheader(title)
    if download_links:
        st.markdown(download_csv(frame, title), unsafe_allow_html=True)
    else:
        (st.write if show is None else show)(frame)


def main():
    """Run the Streamlit "Website Crawler" page.

    Collects the site URLs and options from the user and, when "Crawl" is
    pressed, processes each site: finds its sitemap, crawls up to the
    requested number of pages, and gathers the links pointing at other hosts.
    Four sections are then shown (or offered as CSV downloads): all outbound
    links, unique outbound links, unique outbound domains (with block-list,
    DoFollow, blacklist and optional SEO PowerSuite columns) and per-site
    analytics.
    """
    st.title("Website Crawler")

    urls = st.text_area("Enter the website URLs (one per line):", value="")
    page_count = st.number_input("Enter the number of pages to crawl:", value=2000, min_value=1, step=1)

    col1, col2 = st.columns(2)
    with col1:
        domain_filter_regex_input = st.text_area("Filter out Unique Outbound Domains:", help="This uses a regex filter to find domains in the unique outbound domains list. Enter one regex per line.", value="instagram\nfacebook\ntwitter\nlinkedin\nsnapchat\ntiktok\nreddit\npinterest\namazon\ncdn\nyoutube\nyoutu.be")
    with col2:
        domain_match_regex_input = st.text_area("Domain Blacklist:", help="This uses a regex filter to match domains in the Unique Outbound Domains to the blacklist entered. Enter one regex per line.", value="xyz\ncasino\ncbd\nessay")
    use_seo_powersuite = st.checkbox("Use SEO PowerSuite")
    api_key = None
    if use_seo_powersuite:
        api_key = st.text_input("Enter the SEO PowerSuite API key:", type="password")
    download_links = st.checkbox("Show Download Links")

    if not st.button("Crawl"):
        return
    if not urls:
        st.warning("Please enter website URLs.")
        return
    # dict.fromkeys de-duplicates while keeping the entered order; a repeated
    # URL would be crawled twice and break the analytics pivot.
    url_list = list(dict.fromkeys(line.strip() for line in urls.splitlines() if line.strip()))
    if not url_list:
        st.warning("Please enter at least one website URL.")
        return

    filter_patterns = _parse_patterns(domain_filter_regex_input, "the domain filter")
    blacklist_patterns = _parse_patterns(domain_match_regex_input, "the domain blacklist")
    seo_api_key = api_key if use_seo_powersuite and api_key else None
    peter_lowe_domains = None  # downloaded once, on first use

    link_frames = []
    unique_link_frames = []
    domain_frames = []
    analysis_frames = []

    for url in url_list:
        with st.spinner(f"Finding sitemap for {url}..."):
            sitemap_url = find_sitemap(url)
        if not sitemap_url:
            st.warning(f"Sitemap not found for {url}.")
            continue

        with st.spinner(f"Crawling {url}..."):
            sitemap_df = adv.sitemap_to_df(sitemap_url)
            crawl_results = crawl_posts(sitemap_df, page_count)
            if crawl_results.empty:
                st.warning(f"No posts found in the sitemap for {url}.")
                continue

            link_df, hosts = _collect_outbound_links(crawl_results, url)
            unique_outbound_links_df = _count_values(link_df['url'], 'Link')
            unique_outbound_links_df.insert(0, 'Originating Domain', url)
            link_frames.append(link_df)
            unique_link_frames.append(unique_outbound_links_df)

            if peter_lowe_domains is None and not hosts.empty:
                peter_lowe_domains = _load_peter_lowe_domains()

            final_df, analysis_df = _summarise_domains(
                url, link_df, hosts, peter_lowe_domains or set(),
                filter_patterns, blacklist_patterns, seo_api_key,
            )
            # Only sites that produced a domain table contribute to the
            # domain and analytics sections.
            if final_df is not None:
                domain_frames.append(final_df)
                analysis_frames.append(analysis_df)

    _render_section("Outbound Links", _concat_frames(link_frames), download_links)
    _render_section("Unique Outbound Links", _concat_frames(unique_link_frames), download_links)
    _render_section("Unique Outbound Domains", _concat_frames(domain_frames), download_links)

    all_analysis_df = _concat_frames(analysis_frames)
    if not all_analysis_df.empty:
        all_analysis_df = all_analysis_df.pivot(index='Originating Domain', columns='Metric', values='Value').reset_index()
        all_analysis_df.columns.name = None
        numeric_columns = ['Average domain inlink rank', 'Average domain inlink rank (< 70)', 'Average number of refdomains']
        for column in numeric_columns:
            if column not in all_analysis_df.columns:
                continue
            # Averages are shown as whole numbers (fraction dropped). The
            # nullable Int64 dtype lets sites without a value stay empty
            # instead of failing the conversion.
            numeric = pd.to_numeric(all_analysis_df[column], errors='coerce')
            truncated = numeric.apply(lambda value: value if pd.isna(value) else float(int(value)))
            all_analysis_df[column] = truncated.astype('Int64')
    _render_section("Analytics", all_analysis_df, download_links, show=st.table)


# Run the Streamlit page only when this file is executed as a script.
if __name__ == '__main__':
    main()