import asyncio
import contextlib
import inspect
import json
import logging
import logging.handlers
import os
import tempfile
import time
import urllib.request
import uuid
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Tuple, List, Optional

import cv2
import gradio as gr
import moviepy.editor as mp
import numpy as np
from gtts import gTTS
from hercai import Hercai
from PIL import Image
|
|
|
|
|
# Send log records to the console and to a size-capped, rotating log file.
# Only ONE handler owns 'app.log': attaching a plain FileHandler next to the
# RotatingFileHandler would write every record twice and let the rotation
# rename the file underneath the plain handler's open stream.
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.handlers.RotatingFileHandler(
            'app.log',
            maxBytes=1024 * 1024,  # rotate after ~1 MiB
            backupCount=5,         # keep app.log.1 .. app.log.5
            encoding='utf-8',      # log messages contain emoji
        ),
    ],
)

# Module-level logger shared by everything in this file.
LOGGER = logging.getLogger(__name__)
|
|
|
class ResourceManager:
    """Track temporary file paths and delete them on request.

    Paths are kept in a set, so registering the same path twice is harmless.
    """

    def __init__(self) -> None:
        """Start with no tracked files."""
        # Paths of files that cleanup() should delete.
        self.temp_files: set = set()

    def add_temp_file(self, filepath: str) -> None:
        """Register *filepath* for deletion by the next cleanup() call."""
        self.temp_files.add(filepath)

    def cleanup(self) -> None:
        """Delete every tracked file and forget all tracked paths.

        Cleanup is best-effort: a file that is already gone is skipped
        silently, and any other failure is logged as a warning without
        interrupting the remaining deletions.
        """
        # Drain with pop() instead of iterating, so a path registered while
        # cleanup is running cannot break the loop.
        while self.temp_files:
            path = self.temp_files.pop()
            try:
                # EAFP: remove directly rather than exists()-then-remove(),
                # which leaves a window for the file to vanish in between.
                os.remove(path)
            except FileNotFoundError:
                continue
            except Exception as e:
                LOGGER.warning(f"Failed to remove temporary file {path}: {e}")
|
|
|
class Text2Video:
    """Turn a ",,"-separated comic script into a narrated slideshow video.

    Every scene prompt yields one generated image (Hercai) and one spoken
    narration (gTTS). The scenes are concatenated into a single MP4 with
    moviepy and exposed through a Gradio UI.

    NOTE(review): ``resource_manager`` is shared by the instance, so two
    overlapping ``generate_video`` calls would clean up each other's
    intermediate files - confirm requests are serialized by the Gradio queue.
    """

    def __init__(self) -> None:
        """Initialize the Text2Video class."""
        LOGGER.info("Initializing Text2Video application...")
        self.herc = Hercai()
        self.resource_manager = ResourceManager()
        # Upper bound on the number of scenes processed at the same time.
        self.max_workers = min(os.cpu_count() or 1, 4)
        LOGGER.info("Initialization complete")

    def _enhance_prompt(self, prompt: str) -> str:
        """Enhance the prompt with comic-style elements."""
        return (
            f"{prompt}, comic book style, full scene composition, "
            "vibrant colors, clear speech bubbles with text, "
            "dramatic lighting, high contrast, detailed backgrounds, "
            "comic book panel layout, professional illustration"
        )

    @staticmethod
    def _create_unique_filename(prefix: str, suffix: str) -> str:
        """Create a unique filename with given prefix and suffix."""
        return f"{prefix}_{uuid.uuid4().hex[:8]}{suffix}"

    @staticmethod
    async def _run_blocking(func, *args):
        """Run a blocking callable in the event loop's default thread pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(func, *args))

    @staticmethod
    async def _gather_all(*awaitables):
        """Await all awaitables, then re-raise the first failure, if any.

        Waiting for every task before raising guarantees that no worker is
        still writing files when the caller starts cleaning up.
        """
        results = await asyncio.gather(*awaitables, return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException):
                raise result
        return results

    async def get_image(self, img_prompt: str) -> Optional[str]:
        """Generate an image for *img_prompt* and return its URL.

        Raises:
            Exception: whatever the Hercai client raises, after logging it.
        """
        try:
            LOGGER.info(f"🎨 Starting image generation for prompt: {img_prompt}")
            enhanced_prompt = self._enhance_prompt(img_prompt)

            draw = partial(
                self.herc.draw_image,
                model="v3",
                prompt=enhanced_prompt,
                negative_prompt="blurry, cropped, low quality, dark, gloomy",
            )
            # The synchronous Hercai client blocks on HTTP, so call it in a
            # worker thread instead of awaiting its (non-awaitable) result.
            image_result = await self._run_blocking(draw)
            # Stay compatible with an async client, whose call returns a
            # coroutine that still has to be awaited.
            if inspect.isawaitable(image_result):
                image_result = await image_result

            return image_result["url"]
        except Exception as e:
            LOGGER.error(f"❌ Error generating image: {str(e)}")
            raise

    def download_img_from_url(self, image_url: str, image_path: str) -> str:
        """Download an image, letterbox it to 1024x1024 and save it as PNG.

        Args:
            image_url: Location of the source image.
            image_path: Destination file; registered as a temporary file.

        Returns:
            ``image_path``.

        Raises:
            Exception: any download or image-processing error, after logging.
        """
        temp_name = None  # bound before the try so ``finally`` is always safe
        try:
            LOGGER.info(f"📥 Downloading image from: {image_url}")

            # Reserve a temp file name and close the handle before the
            # download: an open handle blocks rewriting the file on Windows.
            with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                temp_name = temp_file.name
            urllib.request.urlretrieve(image_url, temp_name)

            with Image.open(temp_name) as raw:
                img = raw if raw.mode == 'RGB' else raw.convert('RGB')

                # Shrink (never enlarge) to fit, keeping the aspect ratio.
                target_size = (1024, 1024)
                img.thumbnail(target_size, Image.Resampling.LANCZOS)

                # Centre the image on a white square canvas.
                new_img = Image.new('RGB', target_size, (255, 255, 255))
                offset = ((target_size[0] - img.size[0]) // 2,
                          (target_size[1] - img.size[1]) // 2)
                new_img.paste(img, offset)

                # Register first so a partially written file is cleaned up.
                self.resource_manager.add_temp_file(image_path)
                new_img.save(image_path, 'PNG', optimize=True)

            return image_path
        except Exception as e:
            LOGGER.error(f"❌ Error processing image: {str(e)}")
            raise
        finally:
            if temp_name and os.path.exists(temp_name):
                os.unlink(temp_name)

    def text_to_audio(self, img_prompt: str, audio_path: str) -> str:
        """Synthesize English speech for *img_prompt* into *audio_path*.

        Args:
            img_prompt: Text to speak.
            audio_path: Destination MP3 file; registered as a temporary file.

        Returns:
            ``audio_path``.

        Raises:
            Exception: any gTTS error, after logging it.
        """
        try:
            LOGGER.info(f"Generating audio for prompt: {img_prompt}")
            # Register first so a partially written file is cleaned up.
            self.resource_manager.add_temp_file(audio_path)
            gTTS(text=img_prompt, lang="en", slow=False).save(audio_path)
            return audio_path
        except Exception as e:
            LOGGER.error(f"❌ Error generating audio: {str(e)}")
            raise

    async def _fetch_scene_image(self, prompt: str, image_path: str) -> str:
        """Generate the scene image and store it at *image_path*."""
        image_url = await self.get_image(prompt)
        return await self._run_blocking(
            self.download_img_from_url, image_url, image_path
        )

    async def process_scene(self, prompt: str, idx: int) -> Tuple[str, str]:
        """Process a single scene (image and audio) concurrently.

        Returns:
            ``(image_path, audio_path)`` of the generated files.
        """
        try:
            image_path = self._create_unique_filename(f"scene_{idx}", ".png")
            audio_path = self._create_unique_filename(f"audio_{idx}", ".mp3")

            # Image generation and narration are independent; overlap them.
            image_path, audio_path = await self._gather_all(
                self._fetch_scene_image(prompt, image_path),
                self._run_blocking(self.text_to_audio, prompt, audio_path),
            )
            return image_path, audio_path
        except Exception as e:
            LOGGER.error(f"Error processing scene {idx}: {e}")
            raise

    def _render_video(self, images_files: List[str], audio_files: List[str],
                      output_path: str) -> None:
        """Blocking part of video assembly: one still per narration clip."""
        opened = []
        try:
            scene_clips = []
            for image_file, audio_file in zip(images_files, audio_files):
                audio_clip = mp.AudioFileClip(audio_file)
                opened.append(audio_clip)
                # Show the still image exactly as long as its narration.
                scene_clip = (
                    mp.ImageClip(image_file)
                    .set_duration(audio_clip.duration)
                    .set_audio(audio_clip)
                )
                opened.append(scene_clip)
                scene_clips.append(scene_clip)

            final_clip = mp.concatenate_videoclips(scene_clips, method="compose")
            opened.append(final_clip)
            final_clip.write_videofile(
                output_path, fps=24, codec="libx264", audio_codec="aac"
            )
        finally:
            # Release ffmpeg readers so the intermediate files can be deleted.
            for clip in reversed(opened):
                with contextlib.suppress(Exception):
                    clip.close()

    async def create_video_from_images_and_audio(
        self, images_files: List[str], audio_files: List[str], output_path: str
    ) -> None:
        """Combine scene images and narrations into one video file.

        Args:
            images_files: Scene image paths, in playback order.
            audio_files: Narration paths, one per image, in the same order.
            output_path: Destination MP4 file.

        Raises:
            ValueError: if the lists are empty or differ in length.
        """
        if not images_files or len(images_files) != len(audio_files):
            raise ValueError(
                "Need the same non-zero number of images and audio files "
                f"(got {len(images_files)} images, {len(audio_files)} audio files)"
            )
        LOGGER.info(f"Rendering {len(images_files)} scene(s) to {output_path}")
        # Encoding is CPU/IO heavy; keep it off the event loop.
        await self._run_blocking(
            self._render_video, images_files, audio_files, output_path
        )

    async def generate_video(self, text: str) -> str:
        """Main function to generate video from text with improved concurrency.

        Args:
            text: Comic script; scenes are separated by ``,,``.

        Returns:
            Path of the rendered MP4. The file is NOT tracked as temporary:
            the caller (Gradio) still has to read it after this returns.

        Raises:
            ValueError: if *text* contains no scene.
            Exception: any generation/rendering error, after logging it.
        """
        output_path = None
        try:
            LOGGER.info("🎬 Starting video generation process")
            list_prompts = [s.strip() for s in (text or "").split(",,") if s.strip()]
            if not list_prompts:
                raise ValueError(
                    "No scenes found: enter at least one scene "
                    "(separate scenes with ,,)"
                )

            output_path = self._create_unique_filename("comic_video", ".mp4")

            # Process scenes concurrently, at most ``max_workers`` at a time.
            limiter = asyncio.Semaphore(self.max_workers)

            async def bounded_scene(prompt: str, idx: int) -> Tuple[str, str]:
                async with limiter:
                    return await self.process_scene(prompt, idx)

            # Results come back in submission order, i.e. script order.
            scenes = await self._gather_all(
                *(bounded_scene(prompt, idx)
                  for idx, prompt in enumerate(list_prompts, 1))
            )

            img_list = [scene[0] for scene in scenes]
            audio_paths = [scene[1] for scene in scenes]

            await self.create_video_from_images_and_audio(
                img_list, audio_paths, output_path
            )
            return output_path
        except Exception as e:
            LOGGER.error(f"❌ Error in video generation: {str(e)}")
            # Do not leave a half-written video behind.
            if output_path and os.path.exists(output_path):
                with contextlib.suppress(OSError):
                    os.remove(output_path)
            raise
        finally:
            # Removes only the intermediate images/narrations.
            self.resource_manager.cleanup()

    def gradio_interface(self):
        """Create Gradio interface with improved styling and launch it."""
        LOGGER.info("🚀 Initializing Gradio interface")

        css = """
        .gradio-container {
            font-family: 'Arial', sans-serif;
            max-width: 1200px;
            margin: auto;
        }
        .header {
            text-align: center;
            padding: 2rem;
            background: linear-gradient(135deg, #6e8efb, #a777e3);
            color: white;
            border-radius: 10px;
            margin-bottom: 2rem;
        }
        """

        with gr.Blocks(theme=gr.themes.Soft(), css=css) as demo:
            gr.HTML("""
                <div class="header">
                    <h1>🎬 Comic Video Generator</h1>
                    <p>Transform your story into an animated comic!</p>
                </div>
            """)

            with gr.Row():
                input_text = gr.Textbox(
                    label="Comic Script",
                    placeholder="Enter your story (separate scenes with ,,)",
                    lines=5
                )

            with gr.Row():
                generate_btn = gr.Button("🎬 Generate Video", variant="primary")

            with gr.Row():
                output = gr.Video(label="Generated Comic Video")

            example_txt = """Once upon a time in a magical forest,,
A brave knight discovered a mysterious crystal,,
The crystal began to glow with incredible power"""
            gr.Examples([[example_txt]], [input_text])

            # Gradio awaits coroutine handlers itself.
            generate_btn.click(
                fn=self.generate_video,
                inputs=[input_text],
                outputs=[output],
                api_name="generate_video"
            )

        LOGGER.info("✅ Gradio interface initialized")
        demo.launch(debug=True, show_error=True)
|
|
|
if __name__ == "__main__":
    # Script entry point: build the application, then serve the Gradio UI.
    app = Text2Video()
    app.gradio_interface()