AZLABS committed on
Commit
49f4a30
·
verified ·
1 Parent(s): bd0d6f3

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +142 -146
app.py CHANGED
@@ -10,225 +10,221 @@ from hercai import Hercai
10
  import uuid
11
  import time
12
  import gradio as gr
13
- from typing import Tuple, List, Optional
14
  import numpy as np
15
- from concurrent.futures import ThreadPoolExecutor
16
- from functools import partial
17
- import tempfile
18
- import contextlib
19
 
20
- # Configure logging with console output and rotation
21
  logging.basicConfig(
22
  level=logging.INFO,
23
- format='[%(asctime)s] [%(levelname)s] %(message)s',
24
  handlers=[
25
- logging.FileHandler('app.log', mode='a'),
26
- logging.StreamHandler(),
27
- logging.handlers.RotatingFileHandler(
28
- 'app.log', maxBytes=1024*1024, backupCount=5
29
- )
30
  ]
31
  )
32
  LOGGER = logging.getLogger(__name__)
33
 
34
- class ResourceManager:
35
- """Manage temporary resources and cleanup."""
36
-
37
- def __init__(self):
38
- self.temp_files = set()
39
-
40
- def add_temp_file(self, filepath: str) -> None:
41
- self.temp_files.add(filepath)
42
-
43
- def cleanup(self) -> None:
44
- for file in self.temp_files:
45
- try:
46
- if os.path.exists(file):
47
- os.remove(file)
48
- except Exception as e:
49
- LOGGER.warning(f"Failed to remove temporary file {file}: {e}")
50
- self.temp_files.clear()
51
-
52
  class Text2Video:
53
  def __init__(self) -> None:
54
  """Initialize the Text2Video class."""
55
  LOGGER.info("Initializing Text2Video application...")
56
  self.herc = Hercai()
57
- self.resource_manager = ResourceManager()
58
- self.max_workers = min(os.cpu_count() or 1, 4) # Limit concurrent tasks
59
- LOGGER.info("Initialization complete")
60
-
61
- def _enhance_prompt(self, prompt: str) -> str:
62
- """Enhance the prompt with comic-style elements."""
63
- return (
64
- f"{prompt}, comic book style, full scene composition, "
65
- "vibrant colors, clear speech bubbles with text, "
66
- "dramatic lighting, high contrast, detailed backgrounds, "
67
- "comic book panel layout, professional illustration"
68
- )
69
-
70
- @staticmethod
71
- def _create_unique_filename(prefix: str, suffix: str) -> str:
72
- """Create a unique filename with given prefix and suffix."""
73
- return f"{prefix}_{uuid.uuid4().hex[:8]}{suffix}"
74
-
75
- async def get_image(self, img_prompt: str) -> Optional[str]:
76
  """Generate an image based on the provided text prompt."""
77
  try:
78
  LOGGER.info(f"🎨 Starting image generation for prompt: {img_prompt}")
79
- enhanced_prompt = self._enhance_prompt(img_prompt)
80
 
81
- image_result = await self.herc.draw_image(
82
- model="v3",
83
- prompt=enhanced_prompt,
 
 
 
 
 
 
 
 
 
 
 
84
  negative_prompt="blurry, cropped, low quality, dark, gloomy"
85
  )
86
 
87
- return image_result["url"]
 
 
 
88
  except Exception as e:
89
  LOGGER.error(f"❌ Error generating image: {str(e)}")
90
  raise
91
 
92
  def download_img_from_url(self, image_url: str, image_path: str) -> str:
93
- """Download and process image from URL with improved error handling."""
94
  try:
95
- LOGGER.info(f"📥 Downloading image from: {image_url}")
96
 
97
- with tempfile.NamedTemporaryFile(delete=False) as temp_file:
98
- urllib.request.urlretrieve(image_url, temp_file.name)
99
-
100
- with Image.open(temp_file.name) as img:
101
- # Convert to RGB if necessary
102
- if img.mode != 'RGB':
103
- img = img.convert('RGB')
104
-
105
- # Resize with proper aspect ratio
106
- target_size = (1024, 1024)
107
- img.thumbnail(target_size, Image.Resampling.LANCZOS)
108
-
109
- # Create new image with padding
110
- new_img = Image.new('RGB', target_size, (255, 255, 255))
111
- offset = ((target_size[0] - img.size[0]) // 2,
112
- (target_size[1] - img.size[1]) // 2)
113
- new_img.paste(img, offset)
114
-
115
- # Save with optimization
116
- new_img.save(image_path, 'PNG', optimize=True)
117
-
118
- self.resource_manager.add_temp_file(image_path)
119
  return image_path
120
 
121
  except Exception as e:
122
- LOGGER.error(f"❌ Error processing image: {str(e)}")
123
  raise
124
- finally:
125
- if os.path.exists(temp_file.name):
126
- os.unlink(temp_file.name)
127
 
128
- async def process_scene(self, prompt: str, idx: int) -> Tuple[str, str]:
129
- """Process a single scene (image and audio) concurrently."""
130
  try:
131
- image_path = self._create_unique_filename(f"scene_{idx}", ".png")
132
- audio_path = self._create_unique_filename(f"audio_{idx}", ".mp3")
 
 
 
133
 
134
- # Generate image
135
- image_url = await self.get_image(prompt)
136
- image_path = self.download_img_from_url(image_url, image_path)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
137
 
138
- # Generate audio
139
- audio_path = self.text_to_audio(prompt, audio_path)
140
 
141
- return image_path, audio_path
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
142
 
 
 
143
  except Exception as e:
144
- LOGGER.error(f"Error processing scene {idx}: {e}")
145
  raise
146
 
147
- async def generate_video(self, text: str) -> str:
148
- """Main function to generate video from text with improved concurrency."""
149
  try:
150
  LOGGER.info("🎬 Starting video generation process")
151
- list_prompts = [s.strip() for s in text.split(",,") if s.strip()]
152
 
153
- output_path = self._create_unique_filename("comic_video", ".mp4")
154
- self.resource_manager.add_temp_file(output_path)
 
155
 
156
- # Process scenes concurrently
157
- scenes = []
158
- async with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
159
- for idx, prompt in enumerate(list_prompts, 1):
160
- scene = await self.process_scene(prompt, idx)
161
- scenes.append(scene)
162
 
163
- # Create video
164
- img_list = [scene[0] for scene in scenes]
165
- audio_paths = [scene[1] for scene in scenes]
166
 
167
- await self.create_video_from_images_and_audio(img_list, audio_paths, output_path)
 
168
 
 
169
  return output_path
170
-
171
  except Exception as e:
172
  LOGGER.error(f"❌ Error in video generation: {str(e)}")
173
  raise
174
- finally:
175
- self.resource_manager.cleanup()
176
 
177
  def gradio_interface(self):
178
- """Create Gradio interface with improved styling."""
179
  LOGGER.info("🌐 Initializing Gradio interface")
180
 
181
- css = """
182
- .gradio-container {
183
- font-family: 'Arial', sans-serif;
184
- max-width: 1200px;
185
- margin: auto;
186
- }
187
- .header {
188
- text-align: center;
189
- padding: 2rem;
190
- background: linear-gradient(135deg, #6e8efb, #a777e3);
191
- color: white;
192
- border-radius: 10px;
193
- margin-bottom: 2rem;
194
- }
195
- """
196
-
197
- with gr.Blocks(theme=gr.themes.Soft(), css=css) as demo:
198
  gr.HTML("""
199
- <div class="header">
200
- <h1>🎬 Comic Video Generator</h1>
201
- <p>Transform your story into an animated comic!</p>
202
- </div>
203
  """)
204
 
205
  with gr.Row():
206
  input_text = gr.Textbox(
207
  label="Comic Script",
208
- placeholder="Enter your story (separate scenes with ,,)",
209
- lines=5
210
  )
211
 
212
  with gr.Row():
213
- generate_btn = gr.Button("🎬 Generate Video", variant="primary")
214
 
215
  with gr.Row():
216
  output = gr.Video(label="Generated Comic Video")
217
 
218
- example_txt = """Once upon a time in a magical forest,,
219
- A brave knight discovered a mysterious crystal,,
220
- The crystal began to glow with incredible power"""
221
  gr.Examples([[example_txt]], [input_text])
222
 
223
- generate_btn.click(
224
- fn=self.generate_video,
225
- inputs=[input_text],
226
- outputs=[output],
227
- api_name="generate_video"
228
- )
229
 
230
  LOGGER.info("✅ Gradio interface initialized")
231
- demo.launch(debug=True, show_error=True)
232
 
233
  if __name__ == "__main__":
234
  text2video = Text2Video()
 
10
  import uuid
11
  import time
12
  import gradio as gr
13
+ from typing import Tuple, List
14
  import numpy as np
 
 
 
 
15
 
16
+ # Configure logging with console output
17
  logging.basicConfig(
18
  level=logging.INFO,
19
+ format='[%(asctime)s] %(message)s',
20
  handlers=[
21
+ logging.FileHandler('app.log'),
22
+ logging.StreamHandler() # This will print to console
 
 
 
23
  ]
24
  )
25
  LOGGER = logging.getLogger(__name__)
26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
  class Text2Video:
28
  def __init__(self) -> None:
29
  """Initialize the Text2Video class."""
30
  LOGGER.info("Initializing Text2Video application...")
31
  self.herc = Hercai()
32
+ LOGGER.info("Hercai API initialized successfully")
33
+
34
+ def get_image(self, img_prompt: str) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  """Generate an image based on the provided text prompt."""
36
  try:
37
  LOGGER.info(f"🎨 Starting image generation for prompt: {img_prompt}")
 
38
 
39
+ # Enhanced prompt for better comic-style results
40
+ comic_style_prompt = (
41
+ f"{img_prompt}, comic book style, full scene composition, "
42
+ "vibrant colors, clear speech bubbles with text, "
43
+ "dramatic lighting, high contrast, detailed backgrounds, "
44
+ "comic book panel layout, professional illustration"
45
+ )
46
+
47
+ LOGGER.info("📝 Enhanced prompt with comic style elements")
48
+ LOGGER.info(f"🔄 Sending request to Hercai API...")
49
+
50
+ image_result = self.herc.draw_image(
51
+ model="simurg",
52
+ prompt=comic_style_prompt,
53
  negative_prompt="blurry, cropped, low quality, dark, gloomy"
54
  )
55
 
56
+ image_url = image_result["url"]
57
+ LOGGER.info(f"✅ Image generated successfully: {image_url}")
58
+ return image_url
59
+
60
  except Exception as e:
61
  LOGGER.error(f"❌ Error generating image: {str(e)}")
62
  raise
63
 
64
  def download_img_from_url(self, image_url: str, image_path: str) -> str:
65
+ """Download and process image from URL."""
66
  try:
67
+ urllib.request.urlretrieve(image_url, image_path)
68
 
69
+ # Image processing for consistent quality
70
+ img = Image.open(image_path)
71
+ target_size = (1792, 1024)
72
+ img = img.resize(target_size, Image.Resampling.LANCZOS)
73
+ img.save(image_path, quality=95)
74
+
75
+ LOGGER.info(f"Successfully downloaded and processed image: {image_path}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
  return image_path
77
 
78
  except Exception as e:
79
+ LOGGER.error(f"Error downloading image: {e}")
80
  raise
 
 
 
81
 
82
+ def text_to_audio(self, img_prompt: str, audio_path: str) -> str:
83
+ """Convert text to speech with enhanced quality."""
84
  try:
85
+ LOGGER.info(f"🔊 Converting text to audio: {img_prompt}")
86
+
87
+ # Create audio with enhanced parameters
88
+ tts = gTTS(text=img_prompt, lang='en', slow=False)
89
+ LOGGER.info("📝 Audio conversion complete")
90
 
91
+ # Save audio file
92
+ tts.save(audio_path)
93
+ LOGGER.info(f"✅ Audio saved to: {audio_path}")
94
+
95
+ return audio_path
96
+
97
+ except Exception as e:
98
+ LOGGER.error(f"❌ Error in audio conversion: {str(e)}")
99
+ raise
100
+
101
+ def get_images_and_audio(self, list_prompts: List[str]) -> Tuple[List[str], List[str]]:
102
+ """Process multiple prompts to generate images and audio."""
103
+ img_list = []
104
+ audio_paths = []
105
+
106
+ LOGGER.info(f"🎬 Starting batch processing of {len(list_prompts)} prompts")
107
+
108
+ for idx, img_prompt in enumerate(list_prompts, 1):
109
+ try:
110
+ LOGGER.info(f"📝 Processing prompt {idx}/{len(list_prompts)}")
111
+
112
+ # Generate unique identifier
113
+ unique_id = uuid.uuid4().hex[:8]
114
+
115
+ # Process image
116
+ image_path = f"scene_{idx}_{unique_id}.png"
117
+ img_url = self.get_image(img_prompt)
118
+ image = self.download_img_from_url(img_url, image_path)
119
+ img_list.append(image)
120
+
121
+ # Process audio
122
+ audio_path = f"audio_{idx}_{unique_id}.mp3"
123
+ audio = self.text_to_audio(img_prompt, audio_path)
124
+ audio_paths.append(audio)
125
+
126
+ LOGGER.info(f"✅ Completed processing prompt {idx}")
127
+
128
+ except Exception as e:
129
+ LOGGER.error(f"❌ Error processing prompt {idx}: {str(e)}")
130
+ raise
131
+
132
+ return img_list, audio_paths
133
+
134
+ def create_video_from_images_and_audio(self, image_files: List[str],
135
+ audio_files: List[str],
136
+ output_path: str) -> None:
137
+ """Create final video with enhanced quality."""
138
+ try:
139
+ LOGGER.info("🎥 Starting video creation process")
140
 
141
+ if len(image_files) != len(audio_files):
142
+ raise ValueError("Number of images and audio files don't match")
143
 
144
+ video_clips = []
145
+ for idx, (image_file, audio_file) in enumerate(zip(image_files, audio_files), 1):
146
+ LOGGER.info(f"🔄 Processing scene {idx}/{len(image_files)}")
147
+
148
+ # Load audio and create video clip
149
+ audio_clip = mp.AudioFileClip(audio_file)
150
+ video_clip = mp.ImageClip(image_file).set_duration(audio_clip.duration)
151
+ video_clip = video_clip.set_audio(audio_clip)
152
+ video_clips.append(video_clip)
153
+
154
+ LOGGER.info(f"✅ Scene {idx} processed successfully")
155
+
156
+ LOGGER.info("🔄 Concatenating all scenes")
157
+ final_clip = mp.concatenate_videoclips(video_clips)
158
+
159
+ LOGGER.info("💾 Writing final video file")
160
+ final_clip.write_videofile(
161
+ output_path,
162
+ codec='libx264',
163
+ fps=24,
164
+ audio_codec='aac',
165
+ audio_bitrate='192k',
166
+ preset='medium'
167
+ )
168
 
169
+ LOGGER.info("✅ Video created successfully")
170
+
171
  except Exception as e:
172
+ LOGGER.error(f"❌ Error in video creation: {str(e)}")
173
  raise
174
 
175
+ def generate_video(self, text: str) -> str:
176
+ """Main function to generate video from text."""
177
  try:
178
  LOGGER.info("🎬 Starting video generation process")
 
179
 
180
+ # Split text into prompts
181
+ list_prompts = [sentence.strip() for sentence in text.split(",,") if sentence.strip()]
182
+ LOGGER.info(f"📝 Processed {len(list_prompts)} scenes from input text")
183
 
184
+ output_path = f"comic_video_{uuid.uuid4().hex[:8]}.mp4"
 
 
 
 
 
185
 
186
+ # Generate images and audio
187
+ img_list, audio_paths = self.get_images_and_audio(list_prompts)
 
188
 
189
+ # Create final video
190
+ self.create_video_from_images_and_audio(img_list, audio_paths, output_path)
191
 
192
+ LOGGER.info(f"✅ Video generation completed: {output_path}")
193
  return output_path
194
+
195
  except Exception as e:
196
  LOGGER.error(f"❌ Error in video generation: {str(e)}")
197
  raise
 
 
198
 
199
  def gradio_interface(self):
200
+ """Create Gradio interface."""
201
  LOGGER.info("🌐 Initializing Gradio interface")
202
 
203
+ with gr.Blocks(theme='abidlabs/dracula_revamped') as demo:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
  gr.HTML("""
205
+ <center><h1 style="color:#fff">Comic Video Generator</h1></center>
 
 
 
206
  """)
207
 
208
  with gr.Row():
209
  input_text = gr.Textbox(
210
  label="Comic Script",
211
+ placeholder="Enter your story (separate scenes with ,,)"
 
212
  )
213
 
214
  with gr.Row():
215
+ generate_btn = gr.Button("🎬 Generate Video")
216
 
217
  with gr.Row():
218
  output = gr.Video(label="Generated Comic Video")
219
 
220
+ # Example text
221
+ example_txt = """Once upon a time in a magical forest,, A brave knight discovered a mysterious crystal,, The crystal began to glow with incredible power"""
 
222
  gr.Examples([[example_txt]], [input_text])
223
 
224
+ generate_btn.click(self.generate_video, inputs=[input_text], outputs=[output])
 
 
 
 
 
225
 
226
  LOGGER.info("✅ Gradio interface initialized")
227
+ demo.launch(debug=True)
228
 
229
  if __name__ == "__main__":
230
  text2video = Text2Video()