"""Multilingual document-QA chatbot.

Indexes .txt documents from PDF_DIRECTORY into a Whoosh full-text index,
serves a Flask chat endpoint that retrieves matching documents for a query,
and optionally translates the response with Google Translate.
"""

import os
import shutil

import torch
from deep_translator import GoogleTranslator
from flask import Flask, jsonify, render_template, request
from transformers import AutoModel, AutoTokenizer
from whoosh.fields import Schema, TEXT
from whoosh.index import create_in
from whoosh.qparser import QueryParser

# Ensure the necessary directories exist.
PERSIST_DIR = "db"
PDF_DIRECTORY = 'data'
os.makedirs(PDF_DIRECTORY, exist_ok=True)
os.makedirs(PERSIST_DIR, exist_ok=True)

# Load the XLM-R tokenizer and model (used by get_embeddings).
# NOTE(review): this downloads weights on first run — network required.
tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-base")
model = AutoModel.from_pretrained("xlm-roberta-base")

# Whoosh schema: both fields stored so search hits can return full content.
schema = Schema(title=TEXT(stored=True), content=TEXT(stored=True))

# BUG FIX: this was originally named `index`, which the Flask view function
# `index()` below silently shadowed at module level — every call to
# retrieve_documents() then hit a function object, not the Whoosh index,
# raising AttributeError. Renamed to `search_index` (internal name only).
search_index = create_in(PERSIST_DIR, schema)


def load_documents():
    """Load all .txt files from PDF_DIRECTORY.

    Returns:
        list[dict]: one dict per file with 'title' (filename) and
        'content' (full file text) keys.
    """
    documents = []
    for filename in os.listdir(PDF_DIRECTORY):
        if filename.endswith(".txt"):  # Assuming documents are in .txt format
            path = os.path.join(PDF_DIRECTORY, filename)
            with open(path, 'r', encoding='utf-8') as file:
                content = file.read()
            documents.append({'title': filename, 'content': content})
    return documents


def index_documents(documents):
    """Add the given documents to the Whoosh index and commit.

    Args:
        documents: iterable of dicts with 'title' and 'content' keys.
    """
    writer = search_index.writer()
    for doc in documents:
        writer.add_document(title=doc['title'], content=doc['content'])
    writer.commit()


def data_ingestion_from_directory():
    """Rebuild the index from scratch using the documents in PDF_DIRECTORY."""
    global search_index
    # Clear previous data by removing the persist directory and its contents.
    if os.path.exists(PERSIST_DIR):
        shutil.rmtree(PERSIST_DIR)
    os.makedirs(PERSIST_DIR, exist_ok=True)
    # BUG FIX: the original kept writing through the stale index object whose
    # backing files were just deleted; recreate the index in the fresh dir.
    search_index = create_in(PERSIST_DIR, schema)
    # Load and index the new documents.
    index_documents(load_documents())


def retrieve_documents(query):
    """Search the index for documents matching the query string.

    Args:
        query: free-text query, parsed against the 'content' field.

    Returns:
        list[tuple[str, str]]: (title, content) pairs for each hit.
    """
    with search_index.searcher() as searcher:
        query_parser = QueryParser("content", search_index.schema)
        query_object = query_parser.parse(query)
        results = searcher.search(query_object)
        # Materialize inside the `with` block — results are invalid after
        # the searcher closes.
        return [(result['title'], result['content']) for result in results]


def get_embeddings(text):
    """Return a mean-pooled XLM-R embedding for `text` as a numpy array."""
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        outputs = model(**inputs)
    embeddings = outputs.last_hidden_state.mean(dim=1)  # Average pooling
    return embeddings.squeeze().numpy()


def handle_query(query):
    """Retrieve documents for `query` and format a textual response.

    Returns an apology string when nothing matches.
    """
    retrieved_docs = retrieve_documents(query)
    if not retrieved_docs:
        return "Sorry, I couldn't find an answer."
    # Construct a response using the retrieved documents (first 100 chars each).
    response = "Here are some insights based on your query:\n" + "\n".join(
        [f"Title: {title}\nContent: {content[:100]}..."
         for title, content in retrieved_docs]
    )
    return response


# Initialize Flask app.
app = Flask(__name__)

# Data ingestion at startup.
data_ingestion_from_directory()


def generate_response(query, language):
    """Answer `query` and translate the answer into `language` if supported.

    Args:
        query: the user's question.
        language: lowercase language name (key of `supported_languages`);
            anything else (including English) returns the untranslated answer.

    Returns:
        str: translated (or original) response text, or an error message.
    """
    try:
        # Call the handle_query function to get the response.
        bot_response = handle_query(query)

        # Map of supported languages -> Google Translate target codes.
        supported_languages = {
            "hindi": "hi", "bengali": "bn", "telugu": "te", "marathi": "mr",
            "tamil": "ta", "gujarati": "gu", "kannada": "kn",
            "malayalam": "ml", "punjabi": "pa", "odia": "or", "urdu": "ur",
            "assamese": "as", "sanskrit": "sa", "arabic": "ar",
            "australian": "en-AU", "bangla-india": "bn-IN",
            "chinese": "zh-CN", "dutch": "nl", "french": "fr",
            "filipino": "tl", "greek": "el", "indonesian": "id",
            "italian": "it", "japanese": "ja", "korean": "ko", "latin": "la",
            "nepali": "ne", "portuguese": "pt", "romanian": "ro",
            "russian": "ru", "spanish": "es", "swedish": "sv", "thai": "th",
            "ukrainian": "uk", "turkish": "tr"
        }

        # Default to the untranslated response.
        translated_text = bot_response

        # Translate only if the language is supported and not English.
        try:
            if language in supported_languages:
                target_lang = supported_languages[language]
                translated_text = GoogleTranslator(
                    source='auto', target=target_lang
                ).translate(bot_response)
            else:
                print(f"Unsupported language: {language}")
        except Exception as e:
            # Best-effort translation: fall back to a fixed message on failure.
            print(f"Translation error: {e}")
            translated_text = "Sorry, I couldn't translate the response."

        return translated_text
    except Exception as e:
        return f"Error fetching the response: {str(e)}"


# Route for the homepage.
@app.route('/')
def index():
    return render_template('index.html')


# Route to handle chatbot messages.
@app.route('/chat', methods=['POST'])
def chat():
    try:
        user_message = request.json.get("message")
        language = request.json.get("language")
        if not user_message:
            return jsonify({"response": "Please say something!"})
        bot_response = generate_response(user_message, language)
        return jsonify({"response": bot_response})
    except Exception as e:
        return jsonify({"response": f"An error occurred: {str(e)}"})


if __name__ == '__main__':
    app.run(debug=True)