Spaces:
Running
Running
import ast
import asyncio
import json
import os
import re

import aiohttp
import nest_asyncio
# Patch asyncio so asyncio.run() can be called while an event loop is already
# running (presumably required when this module is hosted inside Streamlit —
# confirm against the hosting app).
nest_asyncio.apply()

# API endpoints
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
SERPAPI_URL = "https://serpapi.com/search"
JINA_BASE_URL = "https://r.jina.ai/"

# API credentials. The request helpers in this module reference these names,
# so they must exist; read them from the environment. A hosting app may still
# overwrite them on the module after import.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")
SERPAPI_API_KEY = os.environ.get("SERPAPI_API_KEY", "")
JINA_API_KEY = os.environ.get("JINA_API_KEY", "")

# Model used for every OpenRouter call unless a caller overrides it.
DEFAULT_MODEL = "google/gemini-2.0-flash-lite-preview-02-05:free"  # Gemini 2.0 Flash Lite preview (free tier)
class SourcedContext:
    """A piece of extracted text paired with the URL it was extracted from.

    ``process_link`` creates these; the final report assigns one citation
    number per distinct ``source_url``.
    """

    def __init__(self, text, source_url):
        self.text = text  # relevant text extracted from the page
        self.source_url = source_url  # URL of the page the text came from

    def __repr__(self):
        preview = self.text
        # Extracts can be many KB long; keep log/debugger output readable.
        if isinstance(preview, str) and len(preview) > 60:
            preview = preview[:57] + "..."
        return f"{type(self).__name__}(text={preview!r}, source_url={self.source_url!r})"
async def call_openrouter_async(session, messages, model=DEFAULT_MODEL, temperature=0.7, max_tokens=4096):
    """Send a chat-completion request to OpenRouter and return the reply text.

    Args:
        session: An open ``aiohttp.ClientSession`` used for the request.
        messages: Chat messages, e.g. ``[{"role": "user", "content": "..."}]``.
        model: OpenRouter model identifier.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Upper bound on generated tokens forwarded to the API.

    Returns:
        The content of the first choice's message, or ``None`` when the HTTP
        request fails, the status is not 200, or the JSON body does not have
        the expected ``choices[0].message.content`` shape. Errors are printed,
        not raised; callers treat ``None`` as failure.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://github.com/Pygen",
        "X-Title": "Research Assistant",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    try:
        async with session.post(OPENROUTER_URL, headers=headers, json=payload) as resp:
            if resp.status != 200:
                text = await resp.text()
                print(f"OpenRouter API error: {resp.status} - {text}")
                return None
            result = await resp.json()
    except Exception as e:
        print("Error during OpenRouter call:", e)
        return None

    try:
        return result['choices'][0]['message']['content']
    except (KeyError, IndexError, TypeError):
        # e.g. an error payload without "choices", or "choices": null.
        print("Unexpected response structure from OpenRouter:", result)
        return None
def _parse_initial_queries(response):
    """Parse an LLM reply that should contain a Python list of query strings.

    Only the span from the first ``[`` to the last ``]`` is parsed, so Markdown
    fences (with any language tag) and surrounding prose are tolerated. The span
    goes through ``ast.literal_eval``, never ``eval``, so model output cannot
    execute code. Returns the non-empty string items, stripped, or ``[]``.
    """
    text = response.strip()
    start, end = text.find("["), text.rfind("]")
    if start == -1 or end < start:
        print("The LLM response is not a list. Response:", response)
        return []
    try:
        # A successful literal_eval of a "[...]" span is always a list.
        parsed = ast.literal_eval(text[start:end + 1])
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        print("Error interpreting search queries:", e, "\nResponse:", response)
        return []
    return [item.strip() for item in parsed if isinstance(item, str) and item.strip()]


async def generate_search_queries_async(session, user_query):
    """
    Use the LLM to produce up to four clear search queries based on the user's topic.

    Returns a list of query strings; ``[]`` if the LLM call fails or its reply
    cannot be parsed as a list.
    """
    prompt = (
        "You are a seasoned research assistant. Based on the user's topic, produce as many as four distinct and precise "
        "search queries that will help collect thorough information on the subject. "
        "Return a Python list of strings only, without any code formatting or backticks. "
        "For example: ['query1', 'query2', 'query3']"
    )
    messages = [
        {"role": "system", "content": "You are a precise and supportive research assistant."},
        {"role": "user", "content": f"User Topic: {user_query}\n\n{prompt}"},
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return []
    return _parse_initial_queries(response)
async def perform_search_async(session, query, result_limit=5):
    """Run a Google search through SerpAPI and return the result URLs.

    Args:
        session: Open aiohttp session used for the request.
        query: Search string.
        result_limit: Maximum number of links returned; also sent to SerpAPI
            as ``num``.

    Returns:
        Up to ``result_limit`` links taken from ``organic_results``, or ``[]``
        on a non-200 status, a response without organic results, or any
        exception (each case is printed).
    """
    search_params = {
        "q": query,
        "api_key": SERPAPI_API_KEY,
        "engine": "google",
        "num": result_limit,
    }
    try:
        async with session.get(SERPAPI_URL, params=search_params) as response:
            if response.status != 200:
                body = await response.text()
                print(f"SERPAPI error: {response.status} - {body}")
                return []
            payload = await response.json()
            if "organic_results" not in payload:
                print("No organic results found in SERPAPI response.")
                return []
            found = [entry.get("link") for entry in payload["organic_results"] if "link" in entry]
            # Trim locally as well, in case the API returns more than requested.
            return found[:result_limit]
    except Exception as err:
        print("Error during SERPAPI search:", err)
        return []
async def fetch_webpage_text_async(session, url):
    """Return the text of ``url`` as rendered by the Jina reader service.

    The page URL is appended to ``JINA_BASE_URL`` and requested with the Jina
    bearer token. Returns ``""`` on a non-200 status or on any exception; the
    problem is printed.
    """
    reader_url = f"{JINA_BASE_URL}{url}"
    auth_headers = {"Authorization": f"Bearer {JINA_API_KEY}"}
    try:
        async with session.get(reader_url, headers=auth_headers) as response:
            # The body is needed either way: as the result, or for the error log.
            body = await response.text()
            if response.status == 200:
                return body
            print(f"Jina fetch error for {url}: {response.status} - {body}")
            return ""
    except Exception as err:
        print("Error retrieving webpage text with Jina:", err)
        return ""
def _parse_relevance_verdict(answer):
    """Map a free-form LLM reply to ``"Yes"`` or ``"No"``.

    The first standalone word "yes" or "no" (case-insensitive) decides, so
    replies such as "yes", "Yes.", "**No**" or "Answer: Yes" are understood,
    while words that merely contain those letters ("Yesterday", "Nope") are not
    treated as a verdict. A reply with no verdict word counts as ``"No"``.
    """
    for word in re.findall(r"[a-z]+", answer.lower()):
        if word == "yes":
            return "Yes"
        if word == "no":
            return "No"
    return "No"


async def is_page_useful_async(session, user_query, page_text):
    """
    Ask the LLM whether the webpage content is pertinent to the user's topic.

    Only the first 20,000 characters of ``page_text`` are sent. Returns
    ``"Yes"`` or ``"No"``; a failed LLM call yields ``"No"``.
    """
    prompt = (
        "You are a discerning evaluator of research. Given the user's topic and a snippet of webpage content, "
        "decide if the page contains valuable information to address the query. "
        "Reply strictly with one word: 'Yes' if the content is useful, or 'No' if it is not. Provide no extra text."
    )
    messages = [
        {"role": "system", "content": "You are a concise and strict research relevance evaluator."},
        {"role": "user", "content": f"User Topic: {user_query}\n\nWebpage Snippet (up to 20000 characters):\n{page_text[:20000]}\n\n{prompt}"},
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return "No"
    return _parse_relevance_verdict(response)
async def extract_relevant_context_async(session, user_query, search_query, page_text):
    """Have the LLM pull out the parts of ``page_text`` relevant to ``user_query``.

    The search query that surfaced the page is included for context, and only
    the first 20,000 characters of the page are sent. Returns the stripped
    reply, or ``""`` when the LLM call yields nothing.
    """
    instructions = (
        "You are an expert extractor of information. Given the user's topic, the search query that produced this page, "
        "and the webpage text, extract all pertinent details needed to answer the inquiry. "
        "Return only the relevant text without any additional commentary."
    )
    snippet = page_text[:20000]
    user_message = (
        f"User Topic: {user_query}\nSearch Query: {search_query}\n\n"
        f"Webpage Snippet (up to 20000 characters):\n{snippet}\n\n{instructions}"
    )
    reply = await call_openrouter_async(session, [
        {"role": "system", "content": "You excel at summarizing and extracting relevant details."},
        {"role": "user", "content": user_message},
    ])
    return reply.strip() if reply else ""
def _parse_followup_queries(response, previous=()):
    """Interpret the research planner's reply.

    Returns ``""`` when the reply signals that research is complete: it is
    blank or a bare empty-string literal such as ``""``. Otherwise returns the
    list of new queries, or ``[]`` if the reply cannot be parsed. Only the span
    from the first ``[`` to the last ``]`` is parsed, with ``ast.literal_eval``
    (never ``eval``), so fenced or prose-wrapped lists work and model output
    cannot execute code. Non-string or empty items are dropped, as are queries
    already in ``previous`` or repeated within the reply (case-insensitive).
    """
    cleaned = response.strip()
    if cleaned in ("", '""', "''"):
        return ""
    start, end = cleaned.find("["), cleaned.rfind("]")
    if start == -1 or end < start:
        print("LLM response is not a list for extra search queries. Response:", response)
        return []
    try:
        # A successful literal_eval of a "[...]" span is always a list.
        parsed = ast.literal_eval(cleaned[start:end + 1])
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        print("Failed to parse additional search queries:", e, "\nResponse:", response)
        return []
    seen = {q.strip().lower() for q in previous if isinstance(q, str)}
    queries = []
    for item in parsed:
        if not isinstance(item, str):
            continue
        query = item.strip()
        key = query.lower()
        # Re-running a query that was already searched only wastes API calls.
        if query and key not in seen:
            seen.add(key)
            queries.append(query)
    return queries


async def get_new_search_queries_async(session, user_query, previous_search_queries, all_contexts):
    """
    Evaluate whether additional search queries are necessary, given research so far.

    Returns:
        ``""`` if the LLM says no further research is needed. Otherwise a list
        of new (not previously asked) query strings. The list may be empty when
        the LLM call fails or its reply cannot be parsed.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "You are a systematic research planner. Taking into account the original topic, prior search queries, "
        "and the extracted information from webpages, determine if more research is required. "
        "If so, produce up to four new search queries as a Python list "
        "(for example: ['new query1', 'new query2']). If no further research is needed, reply with an empty string."
        "\nReturn only a Python list or an empty string without extra commentary."
    )
    messages = [
        {"role": "system", "content": "You are methodical in planning further research steps."},
        {"role": "user", "content": f"User Topic: {user_query}\nPrevious Queries: {previous_search_queries}\n\nCollected Context:\n{context_combined}\n\n{prompt}"},
    ]
    response = await call_openrouter_async(session, messages)
    if not response:
        return []
    return _parse_followup_queries(response, previous_search_queries)
async def generate_final_report_async(session, user_query, sourced_contexts):
    """Write the final report, with citations, from the collected contexts.

    Each distinct ``source_url`` gets a citation number in order of first
    appearance. Every context is tagged with its number before being sent to
    the LLM, and a numbered "References" list is appended to the LLM's text.

    Returns:
        The report followed by the reference list, or an error message string
        if the LLM call produced nothing.
    """
    citation_of = {}
    tagged = []
    for ctx in sourced_contexts:
        # The default is evaluated before insertion, so a new URL gets the next number.
        number = citation_of.setdefault(ctx.source_url, len(citation_of) + 1)
        tagged.append(f"{ctx.text} [{number}]")
    context_combined = "\n".join(tagged)

    # Dicts keep insertion order, which is exactly ascending citation order.
    reference_section = "\n\nReferences:\n" + "\n".join(
        f"[{number}] {url}" for url, number in citation_of.items()
    )

    prompt = (
        "You are a proficient academic report writer. Using the compiled contexts below and the original topic, "
        "compose a comprehensive, well-organized, and in-depth report that fully addresses the inquiry. "
        "Ensure that each piece of evidence is tagged with citation numbers in square brackets (e.g., [1], [2]). "
        "Maintain these tags in your final report to show the references. "
        "The style should be academic with proper in-text citations. Do not alter or add citation numbers."
    )
    messages = [
        {"role": "system", "content": "You are an expert academic report composer."},
        {"role": "user", "content": f"User Topic: {user_query}\n\nCollected Context:\n{context_combined}\n\n{prompt}"},
    ]
    report = await call_openrouter_async(session, messages)
    if not report:
        return "Error occurred while generating the report."
    return report + reference_section
async def process_link(session, link, user_query, search_query):
    """Fetch one URL, judge its relevance, and extract context from it.

    Returns:
        A ``SourcedContext`` holding the extracted text and ``link`` when the
        page was fetched, judged relevant (``"Yes"``), and yielded non-empty
        context; otherwise ``None``.
    """
    print(f"Retrieving content from: {link}")
    page_text = await fetch_webpage_text_async(session, link)
    if not page_text:
        return None

    verdict = await is_page_useful_async(session, user_query, page_text)
    print(f"Relevance of {link}: {verdict}")
    if verdict != "Yes":
        return None

    extracted = await extract_relevant_context_async(session, user_query, search_query, page_text)
    if not extracted:
        return None
    print(f"Context extracted from {link} (first 200 characters): {extracted[:200]}")
    return SourcedContext(extracted, link)
async def research_flow(user_query, iteration_limit, search_limit=5):
    """
    Primary research procedure intended for integration with Streamlit.

    Each iteration does the following:
      1. Searches every pending query, taking at most ``search_limit`` results
         per query.
      2. Concurrently processes every link not handled in an earlier iteration.
      3. Asks the LLM whether more research is needed.

    The loop stops after ``iteration_limit`` iterations, or as soon as the LLM
    supplies no further queries.

    Args:
        user_query: The research topic/question.
        iteration_limit: Maximum number of search iterations.
        search_limit: Maximum number of search results per query.

    Returns:
        The final report with references. If no initial search queries could
        be generated, an explanatory message instead.
    """
    sourced_contexts = []
    all_search_queries = []
    # Links fetched/evaluated in any iteration; without this, later iterations
    # re-fetch and re-evaluate the same pages and duplicate their contexts.
    processed_links = set()
    iteration = 0
    async with aiohttp.ClientSession() as session:
        new_search_queries = await generate_search_queries_async(session, user_query)
        if not new_search_queries:
            return "No search queries were generated by the LLM. Terminating process."
        all_search_queries.extend(new_search_queries)

        while iteration < iteration_limit:
            print(f"\n--- Iteration {iteration + 1} ---")
            search_results = await asyncio.gather(
                *(perform_search_async(session, query, search_limit) for query in new_search_queries)
            )

            # Map each new link to the first query that surfaced it.
            unique_links = {}
            for query, links in zip(new_search_queries, search_results):
                for link in links:
                    if link not in unique_links and link not in processed_links:
                        unique_links[link] = query
            processed_links.update(unique_links)
            print(f"Collected {len(unique_links)} distinct links in this iteration.")

            link_results = await asyncio.gather(
                *(process_link(session, link, user_query, query) for link, query in unique_links.items())
            )
            iteration_contexts = [res for res in link_results if res]
            if iteration_contexts:
                sourced_contexts.extend(iteration_contexts)
            else:
                print("No relevant information was found in this iteration.")

            if iteration + 1 >= iteration_limit:
                # Last iteration: new queries could never be searched, so skip the planner call.
                break

            context_texts = [ctx.text for ctx in sourced_contexts]
            new_search_queries = await get_new_search_queries_async(
                session, user_query, all_search_queries, context_texts
            )
            if new_search_queries == "":
                print("LLM has determined that additional research is unnecessary.")
                break
            if not new_search_queries:
                print("LLM returned no further search queries. Concluding the loop.")
                break
            print("LLM provided extra search queries:", new_search_queries)
            all_search_queries.extend(new_search_queries)
            iteration += 1

        final_report = await generate_final_report_async(session, user_query, sourced_contexts)
    return final_report
def main():
    """
    CLI entry point for testing this research module.

    Prompts for a topic and a maximum iteration count, then runs
    ``research_flow`` and prints the report. The count defaults to 10 when the
    input is blank, not an integer, or less than 1.
    """
    user_query = input("Enter your research topic/question: ").strip()
    if not user_query:
        # An empty topic would still spend LLM and search API calls.
        print("No research topic provided. Exiting.")
        return
    iter_limit_input = input("Enter the maximum number of iterations (default is 10): ").strip()
    try:
        # int() rather than str.isdigit(): isdigit() accepts characters such as
        # "²" that int() then rejects with ValueError.
        iteration_limit = int(iter_limit_input)
    except ValueError:
        iteration_limit = 10
    if iteration_limit < 1:
        iteration_limit = 10
    final_report = asyncio.run(research_flow(user_query, iteration_limit))
    print("\n==== FINAL REPORT ====\n")
    print(final_report)


if __name__ == "__main__":
    main()