# ruff: noqa: E722
import json
import os
import requests
import re
import markdownify
import mimetypes
import html
import puremagic
import tempfile
import copy
import mammoth
import pptx
import pandas as pd
import traceback

from urllib.parse import urlparse, parse_qs
from bs4 import BeautifulSoup
from typing import Any, Dict, List, Optional, Union

import pdfminer
import pdfminer.high_level
from youtube_transcript_api import YouTubeTranscriptApi


class DocumentConverterResult:
    """The result of converting a document to text."""

    def __init__(self, title: Union[str, None] = None, text_content: str = ""):
        self.title = title
        self.text_content = text_content


class DocumentConverter:
    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        raise NotImplementedError()


class PlainTextConverter(DocumentConverter):
    """Anything with content type text/plain"""

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        extension = kwargs.get("file_extension", "")
        if extension == "":
            return None

        content_type, encoding = mimetypes.guess_type("__placeholder" + extension)

        text_content = ""
        with open(local_path, "rt") as fh:
            text_content = fh.read()

        return DocumentConverterResult(
            title=None,
            text_content=text_content,
        )


class HtmlConverter(DocumentConverter):
    """Anything with content type text/html"""

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # Bail if not html
        extension = kwargs.get("file_extension", "")
        if extension.lower() not in [".html", ".htm"]:
            return None

        result = None
        with open(local_path, "rt") as fh:
            result = self._convert(fh.read())

        return result

    def _convert(self, html_content) -> Union[None, DocumentConverterResult]:
        """Helper function that converts an HTML string."""

        # Parse the string
        soup = BeautifulSoup(html_content, "html.parser")

        # Remove javascript and style blocks
        for script in soup(["script", "style"]):
            script.extract()

        # Print only the main content
        body_elm = soup.find("body")
        webpage_text = ""
        if body_elm:
            webpage_text = markdownify.MarkdownConverter().convert_soup(body_elm)
        else:
            webpage_text = markdownify.MarkdownConverter().convert_soup(soup)

        return DocumentConverterResult(
            title=None if soup.title is None else soup.title.string,
            text_content=webpage_text,
        )


class WikipediaConverter(DocumentConverter):
    """Handle Wikipedia pages separately, focusing only on the main document content."""

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # Bail if not Wikipedia
        extension = kwargs.get("file_extension", "")
        if extension.lower() not in [".html", ".htm"]:
            return None
        url = kwargs.get("url", "")
        if not re.search(r"^https?:\/\/[a-zA-Z]{2,3}\.wikipedia\.org\/", url):
            return None

        # Parse the file
        soup = None
        with open(local_path, "rt") as fh:
            soup = BeautifulSoup(fh.read(), "html.parser")

        # Remove javascript and style blocks
        for script in soup(["script", "style"]):
            script.extract()

        # Print only the main content
        body_elm = soup.find("div", {"id": "mw-content-text"})
        title_elm = soup.find("span", {"class": "mw-page-title-main"})

        webpage_text = ""
        if body_elm:
            # What's the title
            main_title = soup.title.string
            if title_elm and len(title_elm) > 0:
                main_title = title_elm.string

            # Convert the page
            webpage_text = "# " + main_title + "\n\n" + markdownify.MarkdownConverter().convert_soup(body_elm)
        else:
            webpage_text = markdownify.MarkdownConverter().convert_soup(soup)

        return DocumentConverterResult(
            title=soup.title.string,
            text_content=webpage_text,
        )
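
# Example usage of the converter API, as a minimal sketch. "page.html" is a
# hypothetical local file, not something this module provides:
#
#   converter = HtmlConverter()
#   result = converter.convert("page.html", file_extension=".html")
#   if result is not None:
#       print(result.title)
#       print(result.text_content[:200])
#
# Each converter returns None for files it does not handle, so a caller can
# try converters in sequence until one accepts the input.
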
class YouTubeConverter(DocumentConverter):
    """Handle YouTube specially, focusing on the video title, description, and transcript."""

    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # Bail if not YouTube
        extension = kwargs.get("file_extension", "")
        if extension.lower() not in [".html", ".htm"]:
            return None
        url = kwargs.get("url", "")
        if not url.startswith("https://www.youtube.com/watch?"):
            return None

        # Parse the file
        soup = None
        with open(local_path, "rt") as fh:
            soup = BeautifulSoup(fh.read(), "html.parser")

        # Read the meta tags
        metadata = {"title": soup.title.string}
        for meta in soup(["meta"]):
            for a in meta.attrs:
                if a in ["itemprop", "property", "name"]:
                    metadata[meta[a]] = meta.get("content", "")
                    break

        # We can also try to read the full description. This is more prone to
        # breaking, since it reaches into the page implementation.
        try:
            for script in soup(["script"]):
                content = script.text
                if "ytInitialData" in content:
                    lines = re.split(r"\r?\n", content)
                    obj_start = lines[0].find("{")
                    obj_end = lines[0].rfind("}")
                    if obj_start >= 0 and obj_end >= 0:
                        data = json.loads(lines[0][obj_start : obj_end + 1])
                        attrdesc = self._findKey(data, "attributedDescriptionBodyText")
                        if attrdesc:
                            metadata["description"] = attrdesc["content"]
                    break
        except:
            pass

        # Start preparing the page
        webpage_text = "# YouTube\n"

        title = self._get(metadata, ["title", "og:title", "name"])
        if title:
            webpage_text += f"\n## {title}\n"

        stats = ""
        views = self._get(metadata, ["interactionCount"])
        if views:
            stats += f"- **Views:** {views}\n"

        keywords = self._get(metadata, ["keywords"])
        if keywords:
            stats += f"- **Keywords:** {keywords}\n"

        runtime = self._get(metadata, ["duration"])
        if runtime:
            stats += f"- **Runtime:** {runtime}\n"

        if len(stats) > 0:
            webpage_text += f"\n### Video Metadata\n{stats}\n"

        description = self._get(metadata, ["description", "og:description"])
        if description:
            webpage_text += f"\n### Description\n{description}\n"

        transcript_text = ""
        parsed_url = urlparse(url)
        params = parse_qs(parsed_url.query)
        video_id = params["v"][0]
        # Must be a single transcript.
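
        # The call below relies on youtube_transcript_api, which the module
        # already imports. A minimal standalone sketch (hypothetical video id;
        # get_transcript raises if no transcript is available, and this code
        # does not guard against that):
        #
        #   parts = YouTubeTranscriptApi.get_transcript("dQw4w9WgXcQ")
        #   text = " ".join(p["text"] for p in parts)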
print("VIDDDD ID:", video_id) transcript = YouTubeTranscriptApi.get_transcript(video_id) transcript_text = " ".join([part["text"] for part in transcript]) # Alternative formatting: # formatter = TextFormatter() # formatter.format_transcript(transcript) if transcript_text: webpage_text += f"\n### Transcript\n{transcript_text}\n" return DocumentConverterResult( title=title if title else soup.title.string, text_content=webpage_text, ) def _get(self, json, keys, default=None): for k in keys: if k in json: return json[k] return default def _findKey(self, json, key): if isinstance(json, list): for elm in json: ret = self._findKey(elm, key) if ret is not None: return ret elif isinstance(json, dict): for k in json: if k == key: return json[k] else: ret = self._findKey(json[k], key) if ret is not None: return ret return None class PdfConverter(DocumentConverter): def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: # Bail if not a PDF extension = kwargs.get("file_extension", "") if extension.lower() != ".pdf": return None return DocumentConverterResult( title=None, text_content=pdfminer.high_level.extract_text(local_path), ) from huggingface_hub import InferenceClient class AudioConverter(DocumentConverter): def __init__(self): super().__init__() self.client = InferenceClient("distil-whisper/distil-large-v3") def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: # Bail if not an audio file extension = kwargs.get("file_extension", "") if extension.lower() not in [".wav", ".mp3", ".flac", ".m4a"]: return None try: result = self.client.automatic_speech_recognition(audio=local_path).text except Exception as e: print("Exception in decoding audio:", e) from openai import OpenAI oai_client = OpenAI() from pathlib import Path result = oai_client.audio.transcriptions.create( model="whisper-1", file=Path(local_path) ).text return DocumentConverterResult( title=None, text_content=result, ) class DocxConverter(HtmlConverter): def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: # Bail if not a DOCX extension = kwargs.get("file_extension", "") if extension.lower() != ".docx": return None result = None with open(local_path, "rb") as docx_file: result = mammoth.convert_to_html(docx_file) html_content = result.value result = self._convert(html_content) return result class XlsxConverter(HtmlConverter): def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]: # Bail if not a XLSX extension = kwargs.get("file_extension", "") if extension.lower() not in [".xlsx", ".xls"]: return None sheets = pd.read_excel(local_path, sheet_name=None) md_content = "" for s in sheets: md_content += f"## {s}\n" html_content = sheets[s].to_html(index=False) md_content += self._convert(html_content).text_content.strip() + "\n\n" return DocumentConverterResult( title=None, text_content=md_content.strip(), ) import xml.etree.ElementTree as ET class XmlConverter(DocumentConverter): def convert(self, local_path, **kwargs) -> None | DocumentConverterResult: # Parse the XML string extension = kwargs.get("file_extension", "") if extension.lower() not in [".xml"]: return None xml_string = "" with open(local_path, "rt") as fh: xml_string = fh.read() def extract_table_from_html_like(xml_root): table = xml_root.find('.//table') if table is None: raise ValueError("No table found in the XML") headers = [th.text for th in table.find('thead').findall('th')] rows = [[td.text for td in tr.findall('td')] for tr in table.find('tbody').findall('tr')] # 
            # Create markdown table
            markdown = "| " + " | ".join(headers) + " |\n"
            markdown += "| " + " | ".join(["---"] * len(headers)) + " |\n"
            for row in rows:
                markdown += "| " + " | ".join(row) + " |\n"
            return markdown

        def extract_table_from_wordml(xml_root, namespaces):
            # Extract text content from the WordprocessingML body
            body = xml_root.find("w:body", namespaces)
            paragraphs = body.findall(".//w:p", namespaces)
            text_content = []

            for para in paragraphs:
                texts = para.findall(".//w:t", namespaces)
                for text in texts:
                    text_content.append(text.text)

            return "\n".join(text_content)

        # Parse the XML string
        root = ET.fromstring(xml_string)
        namespaces = {"w": "http://schemas.microsoft.com/office/word/2003/wordml"}

        if root.tag.endswith("wordDocument"):
            markdown = extract_table_from_wordml(root, namespaces)
        else:
            markdown = extract_table_from_html_like(root)

        return DocumentConverterResult(
            title=None,
            text_content=markdown.strip(),
        )


class PptxConverter(HtmlConverter):
    def convert(self, local_path, **kwargs) -> Union[None, DocumentConverterResult]:
        # Bail if not a PPTX
        extension = kwargs.get("file_extension", "")
        if extension.lower() != ".pptx":
            return None

        md_content = ""

        presentation = pptx.Presentation(local_path)
        slide_num = 0
        for slide in presentation.slides:
            slide_num += 1

            md_content += f"\n\n<!-- Slide number: {slide_num} -->\n"

            title = slide.shapes.title
            for shape in slide.shapes:
                # Pictures
                if self._is_picture(shape):
                    # https://github.com/scanny/python-pptx/pull/512#issuecomment-1713100069
                    alt_text = ""
                    try:
                        alt_text = shape._element._nvXxPr.cNvPr.attrib.get("descr", "")
                    except:
                        pass

                    # A placeholder name
                    filename = re.sub(r"\W", "", shape.name) + ".jpg"
                    # try:
                    #     filename = shape.image.filename
                    # except:
                    #     pass
                    md_content += "\n![" + (alt_text if alt_text else shape.name) + "](" + filename + ")\n"
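
                # The table branch below serializes each table as an HTML
                # string and routes it through HtmlConverter._convert, so the
                # markdown rendering stays consistent with the other
                # HTML-based converters.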
" + html.escape(cell.text) + " | " else: html_table += "" + html.escape(cell.text) + " | " html_table += "
                    html_table += "</table></body></html>"
                    md_content += "\n" + self._convert(html_table).text_content.strip() + "\n"
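
# End-to-end usage sketch ("deck.pptx" is a hypothetical file):
#
#   converter = PptxConverter()
#   result = converter.convert("deck.pptx", file_extension=".pptx")
#   if result is not None:
#       print(result.text_content)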