"""Asynchronous deep-research assistant.

Pipeline: an LLM (via OpenRouter) generates search queries, SERPAPI runs
Google searches, the Jina reader fetches page text, the LLM filters and
extracts relevant context, the loop repeats until the LLM says it has
enough, and finally a cited report is composed.
"""

import asyncio
import ast
import json  # noqa: F401 -- kept from original module-level imports
import os

import aiohttp
import nest_asyncio

# Allow re-entrant event loops (needed when embedded in Streamlit/Jupyter,
# which already run their own loop).
nest_asyncio.apply()

# API Endpoints
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
SERPAPI_URL = "https://serpapi.com/search"
JINA_BASE_URL = "https://r.jina.ai/"

# FIX: these keys were referenced throughout the module but never defined,
# which raised NameError on first use.  They are now read from the
# environment; empty-string fallbacks keep import side-effect free.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY", "")
JINA_API_KEY = os.getenv("JINA_API_KEY", "")

# Modify the default model selection
DEFAULT_MODEL = "google/gemini-2.0-flash-lite-preview-02-05:free"  # Gemini Flash 2.0 model identifier


# Helper class to hold extracted content along with its source URL
class SourcedContext:
    def __init__(self, text, source_url):
        self.text = text          # extracted context text
        self.source_url = source_url  # URL the text came from (used for citations)


def _parse_query_list(response):
    """Safely parse an LLM response expected to be a Python list of strings.

    Strips an optional leading Markdown code fence (``` or ```python),
    then parses with ast.literal_eval -- NEVER eval(), since the response
    is untrusted model output and must not be executed as code.

    Returns the parsed list, or None when the response does not parse to
    a list.  Raises ValueError/SyntaxError from literal_eval on malformed
    input; callers handle that.
    """
    cleaned = response.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned.split("```")[1]
        if cleaned.startswith("python"):
            cleaned = cleaned[len("python"):]
        cleaned = cleaned.strip()
    parsed = ast.literal_eval(cleaned)
    return parsed if isinstance(parsed, list) else None


async def call_openrouter_async(session, messages, model=DEFAULT_MODEL):
    """
    Make an asynchronous request to the OpenRouter chat completion API with the given messages.
    Returns the assistant's reply text, or None on any error.
    """
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "https://github.com/Pygen",
        "X-Title": "Research Assistant",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model,
        "messages": messages,
        "temperature": 0.7,
        "max_tokens": 4096
    }
    try:
        async with session.post(OPENROUTER_URL, headers=headers, json=payload) as resp:
            if resp.status == 200:
                result = await resp.json()
                try:
                    return result['choices'][0]['message']['content']
                except (KeyError, IndexError):
                    print("Unexpected response structure from OpenRouter:", result)
                    return None
            else:
                text = await resp.text()
                print(f"OpenRouter API error: {resp.status} - {text}")
                return None
    except Exception as e:
        # Network-level failures (timeouts, DNS, TLS); best-effort logging.
        print("Error during OpenRouter call:", e)
        return None


async def generate_search_queries_async(session, user_query):
    """
    Use the LLM to produce up to four clear search queries based on the user's topic.
    Returns a list of query strings; empty list on any failure.
    """
    prompt = (
        "You are a seasoned research assistant. Based on the user's topic, produce as many as four distinct and precise "
        "search queries that will help collect thorough information on the subject. "
        "Return a Python list of strings only, without any code formatting or backticks. "
        "For example: ['query1', 'query2', 'query3']"
    )
    messages = [
        {"role": "system", "content": "You are a precise and supportive research assistant."},
        {"role": "user", "content": f"User Topic: {user_query}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if response:
        try:
            # FIX: was eval() on untrusted LLM output; now ast.literal_eval
            # via the shared helper.
            search_queries = _parse_query_list(response)
            if search_queries is not None:
                return search_queries
            print("The LLM response is not a list. Response:", response)
            return []
        except Exception as e:
            print("Error interpreting search queries:", e, "\nResponse:", response)
            return []
    return []


async def perform_search_async(session, query, result_limit=5):
    """
    Make an asynchronous SERPAPI call to perform a Google search for the provided query.

    result_limit: Maximum number of search results to return.
    Returns a list of result URLs (possibly empty).
    """
    params = {
        "q": query,
        "api_key": SERPAPI_API_KEY,
        "engine": "google",
        "num": result_limit  # ask SERPAPI itself to limit results
    }
    try:
        async with session.get(SERPAPI_URL, params=params) as resp:
            if resp.status == 200:
                results = await resp.json()
                if "organic_results" in results:
                    links = [item.get("link") for item in results["organic_results"] if "link" in item]
                    return links[:result_limit]  # Ensure we don't exceed the limit
                else:
                    print("No organic results found in SERPAPI response.")
                    return []
            else:
                text = await resp.text()
                print(f"SERPAPI error: {resp.status} - {text}")
                return []
    except Exception as e:
        print("Error during SERPAPI search:", e)
        return []


async def fetch_webpage_text_async(session, url):
    """
    Fetch the textual content of a webpage asynchronously using the Jina
    reader service (r.jina.ai renders the page and returns plain text).
    Returns the page text, or "" on any error.
    """
    full_url = f"{JINA_BASE_URL}{url}"
    headers = {
        "Authorization": f"Bearer {JINA_API_KEY}"
    }
    try:
        async with session.get(full_url, headers=headers) as resp:
            if resp.status == 200:
                return await resp.text()
            else:
                text = await resp.text()
                print(f"Jina fetch error for {url}: {resp.status} - {text}")
                return ""
    except Exception as e:
        print("Error retrieving webpage text with Jina:", e)
        return ""


async def is_page_useful_async(session, user_query, page_text):
    """
    Request the LLM to determine if the provided webpage content is pertinent
    to answering the user's topic.  Returns the string "Yes" or "No"
    (defaulting to "No" on ambiguous or failed responses).
    """
    prompt = (
        "You are a discerning evaluator of research. Given the user's topic and a snippet of webpage content, "
        "decide if the page contains valuable information to address the query. "
        "Reply strictly with one word: 'Yes' if the content is useful, or 'No' if it is not. Provide no extra text."
    )
    messages = [
        {"role": "system", "content": "You are a concise and strict research relevance evaluator."},
        {"role": "user", "content": f"User Topic: {user_query}\n\nWebpage Snippet (up to 20000 characters):\n{page_text[:20000]}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if response:
        answer = response.strip()
        if answer in ["Yes", "No"]:
            return answer
        # Model added extra text around the verdict; salvage it.
        if "Yes" in answer:
            return "Yes"
        elif "No" in answer:
            return "No"
    return "No"


async def extract_relevant_context_async(session, user_query, search_query, page_text):
    """
    Derive and return the important details from the webpage text to address
    the user's topic.  Returns "" on failure.
    """
    prompt = (
        "You are an expert extractor of information. Given the user's topic, the search query that produced this page, "
        "and the webpage text, extract all pertinent details needed to answer the inquiry. "
        "Return only the relevant text without any additional commentary."
    )
    messages = [
        {"role": "system", "content": "You excel at summarizing and extracting relevant details."},
        {"role": "user", "content": f"User Topic: {user_query}\nSearch Query: {search_query}\n\nWebpage Snippet (up to 20000 characters):\n{page_text[:20000]}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if response:
        return response.strip()
    return ""


async def get_new_search_queries_async(session, user_query, previous_search_queries, all_contexts):
    """
    Evaluate if additional search queries are necessary based on the current
    research progress.

    Returns: a list of new queries, "" (empty string) when the LLM decides no
    further research is needed, or [] on parse failure.  Callers distinguish
    "" from [] to decide whether to stop or just skip the iteration.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "You are a systematic research planner. Taking into account the original topic, prior search queries, "
        "and the extracted information from webpages, determine if more research is required. "
        "If so, produce up to four new search queries as a Python list "
        "(for example: ['new query1', 'new query2']). If no further research is needed, reply with an empty string."
        "\nReturn only a Python list or an empty string without extra commentary."
    )
    messages = [
        {"role": "system", "content": "You are methodical in planning further research steps."},
        {"role": "user", "content": f"User Topic: {user_query}\nPrevious Queries: {previous_search_queries}\n\nCollected Context:\n{context_combined}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if response:
        cleaned = response.strip()
        if cleaned == "":
            return ""
        try:
            # FIX: was eval() on untrusted LLM output; now ast.literal_eval
            # via the shared helper.
            new_queries = _parse_query_list(cleaned)
            if new_queries is not None:
                return new_queries
            print("LLM response is not a list for extra search queries. Response:", response)
            return []
        except Exception as e:
            print("Failed to parse additional search queries:", e, "\nResponse:", response)
            return []
    return []


async def generate_final_report_async(session, user_query, sourced_contexts):
    """
    Construct the ultimate detailed report including proper citations and references.

    sourced_contexts: list of SourcedContext.  Each distinct source URL is
    assigned a citation number in first-seen order; the LLM is instructed to
    keep the [n] tags, and a References section is appended afterwards.
    """
    # Assign citation numbers to contexts based on source URL
    references = {}
    ref_number = 1
    formatted_contexts = []
    for ctx in sourced_contexts:
        if ctx.source_url not in references:
            references[ctx.source_url] = ref_number
            ref_number += 1
        formatted_contexts.append(f"{ctx.text} [{references[ctx.source_url]}]")

    context_combined = "\n".join(formatted_contexts)

    # Build the reference section, ordered by citation number
    reference_list = [f"[{num}] {url}" for url, num in sorted(references.items(), key=lambda x: x[1])]
    reference_section = "\n\nReferences:\n" + "\n".join(reference_list)

    prompt = (
        "You are a proficient academic report writer. Using the compiled contexts below and the original topic, "
        "compose a comprehensive, well-organized, and in-depth report that fully addresses the inquiry. "
        "Ensure that each piece of evidence is tagged with citation numbers in square brackets (e.g., [1], [2]). "
        "Maintain these tags in your final report to show the references. "
        "The style should be academic with proper in-text citations. Do not alter or add citation numbers."
    )
    messages = [
        {"role": "system", "content": "You are an expert academic report composer."},
        {"role": "user", "content": f"User Topic: {user_query}\n\nCollected Context:\n{context_combined}\n\n{prompt}"}
    ]
    report = await call_openrouter_async(session, messages)
    if report:
        return report + reference_section
    return "Error occurred while generating the report."


async def process_link(session, link, user_query, search_query):
    """
    Handle a single URL: fetch its content, assess its relevance, and if it
    qualifies, extract the associated context.
    Returns a SourcedContext object upon success, or None otherwise.
    """
    print(f"Retrieving content from: {link}")
    page_text = await fetch_webpage_text_async(session, link)
    if not page_text:
        return None
    usefulness = await is_page_useful_async(session, user_query, page_text)
    print(f"Relevance of {link}: {usefulness}")
    if usefulness == "Yes":
        context = await extract_relevant_context_async(session, user_query, search_query, page_text)
        if context:
            print(f"Context extracted from {link} (first 200 characters): {context[:200]}")
            return SourcedContext(context, link)
    return None


async def research_flow(user_query, iteration_limit, search_limit=5):
    """
    Primary research procedure intended for integration with Streamlit.

    user_query: the topic to research.
    iteration_limit: maximum number of search/extract iterations.
    search_limit: Maximum number of search results per query.
    Returns the final report string (or an error message string).
    """
    sourced_contexts = []
    all_search_queries = []
    iteration = 0

    async with aiohttp.ClientSession() as session:
        new_search_queries = await generate_search_queries_async(session, user_query)
        if not new_search_queries:
            return "No search queries were generated by the LLM. Terminating process."
        all_search_queries.extend(new_search_queries)

        while iteration < iteration_limit:
            print(f"\n--- Iteration {iteration + 1} ---")
            iteration_contexts = []

            # Run all searches for this iteration concurrently.
            search_tasks = [perform_search_async(session, query, search_limit) for query in new_search_queries]
            search_results = await asyncio.gather(*search_tasks)

            # De-duplicate links across queries; remember which query found each.
            unique_links = {}
            for idx, links in enumerate(search_results):
                query = new_search_queries[idx]
                for link in links:
                    if link not in unique_links:
                        unique_links[link] = query

            print(f"Collected {len(unique_links)} distinct links in this iteration.")

            # Fetch/evaluate/extract each unique link concurrently.
            link_tasks = [
                process_link(session, link, user_query, unique_links[link])
                for link in unique_links
            ]
            link_results = await asyncio.gather(*link_tasks)

            for res in link_results:
                if res:
                    iteration_contexts.append(res)

            if iteration_contexts:
                sourced_contexts.extend(iteration_contexts)
            else:
                print("No relevant information was found in this iteration.")

            context_texts = [ctx.text for ctx in sourced_contexts]
            new_search_queries = await get_new_search_queries_async(
                session, user_query, all_search_queries, context_texts
            )
            # "" means the planner is satisfied; a non-empty list continues;
            # [] (parse failure / no queries) also ends the loop.
            if new_search_queries == "":
                print("LLM has determined that additional research is unnecessary.")
                break
            elif new_search_queries:
                print("LLM provided extra search queries:", new_search_queries)
                all_search_queries.extend(new_search_queries)
            else:
                print("LLM returned no further search queries. Concluding the loop.")
                break

            iteration += 1

        final_report = await generate_final_report_async(session, user_query, sourced_contexts)
        return final_report


def main():
    """
    CLI entry point for testing this research module.
    """
    user_query = input("Enter your research topic/question: ").strip()
    iter_limit_input = input("Enter the maximum number of iterations (default is 10): ").strip()
    iteration_limit = int(iter_limit_input) if iter_limit_input.isdigit() else 10

    final_report = asyncio.run(research_flow(user_query, iteration_limit))
    print("\n==== FINAL REPORT ====\n")
    print(final_report)


if __name__ == "__main__":
    main()