AZLABS committed on
Commit
bd0d6f3
·
verified ·
1 Parent(s): 62ead92

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +143 -152
app.py CHANGED
@@ -10,234 +10,225 @@ from hercai import Hercai
10
  import uuid
11
  import time
12
  import gradio as gr
13
- from typing import Tuple, List
14
  import numpy as np
 
 
 
 
15
 
16
# Logging setup: every record is written to app.log and echoed to the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler("app.log"),  # persistent log file
        logging.StreamHandler(),         # console output
    ],
    format="[%(asctime)s] %(message)s",
    level=logging.INFO,
)
# Module-wide logger used by the rest of this file.
LOGGER = logging.getLogger(__name__)
26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
27
class Text2Video:
    """Turn a ',,'-separated comic script into a narrated slideshow video.

    For every scene an image is requested from Hercai and a speech track is
    produced with gTTS; moviepy then joins the scenes into one MP4 that the
    Gradio UI returns to the user.
    """

    def __init__(self) -> None:
        """Initialize the Text2Video class."""
        LOGGER.info("Initializing Text2Video application...")
        self.herc = Hercai()
        LOGGER.info("Hercai API initialized successfully")

    @staticmethod
    def _split_scenes(text: str) -> List[str]:
        """Split *text* on ',,' and return the non-empty, stripped scenes."""
        return [sentence.strip() for sentence in (text or "").split(",,") if sentence.strip()]

    def get_image(self, img_prompt: str) -> str:
        """Generate an image for *img_prompt* and return its URL.

        The prompt is extended with comic-style keywords before it is sent to
        ``self.herc.draw_image``; the ``"url"`` entry of the result is returned.
        Any exception is logged and re-raised.
        """
        try:
            LOGGER.info(f"🎨 Starting image generation for prompt: {img_prompt}")

            # Enhanced prompt for better comic-style results
            comic_style_prompt = (
                f"{img_prompt}, comic book style, full scene composition, "
                "vibrant colors, clear speech bubbles with text, "
                "dramatic lighting, high contrast, detailed backgrounds, "
                "comic book panel layout, professional illustration"
            )

            LOGGER.info("πŸ“ Enhanced prompt with comic style elements")
            LOGGER.info("πŸ”„ Sending request to Hercai API...")

            image_result = self.herc.draw_image(
                model="v3",
                prompt=comic_style_prompt,
                negative_prompt="blurry, cropped, low quality, dark, gloomy"
            )

            image_url = image_result["url"]
            LOGGER.info(f"βœ… Image generated successfully: {image_url}")
            return image_url

        except Exception as e:
            LOGGER.error(f"❌ Error generating image: {str(e)}")
            raise

    def download_img_from_url(self, image_url: str, image_path: str) -> str:
        """Download *image_url* to *image_path* and pad it to 1024x1024.

        The downloaded picture is shrunk to fit a 1024x1024 box (aspect ratio
        kept), centred on a white canvas and written back to *image_path*.
        Returns *image_path*; any exception is logged and re-raised.
        """
        try:
            LOGGER.info(f"πŸ“₯ Downloading image from: {image_url}")

            urllib.request.urlretrieve(image_url, image_path)

            target_size = (1024, 1024)
            new_img = Image.new('RGB', target_size, (255, 255, 255))

            # Close the downloaded file before overwriting it: the original
            # kept the handle open while saving to the same path.
            with Image.open(image_path) as img:
                img.thumbnail(target_size, Image.Resampling.LANCZOS)
                offset = ((target_size[0] - img.size[0]) // 2,
                          (target_size[1] - img.size[1]) // 2)
                new_img.paste(img, offset)

            new_img.save(image_path, quality=95)

            LOGGER.info(f"βœ… Image processed and saved to: {image_path}")
            return image_path

        except Exception as e:
            LOGGER.error(f"❌ Error processing image: {str(e)}")
            raise

    def text_to_audio(self, img_prompt: str, audio_path: str) -> str:
        """Convert *img_prompt* to English speech with gTTS and save it.

        Returns *audio_path*; any exception is logged and re-raised.
        """
        try:
            LOGGER.info(f"πŸ”Š Converting text to audio: {img_prompt}")

            tts = gTTS(text=img_prompt, lang='en', slow=False)
            LOGGER.info("πŸ“ Audio conversion complete")

            tts.save(audio_path)
            LOGGER.info(f"βœ… Audio saved to: {audio_path}")

            return audio_path

        except Exception as e:
            LOGGER.error(f"❌ Error in audio conversion: {str(e)}")
            raise

    def get_images_and_audio(self, list_prompts: List[str]) -> Tuple[List[str], List[str]]:
        """Create one image file and one audio file per prompt.

        Returns ``(image_paths, audio_paths)`` in prompt order. The first
        failing prompt is logged and its exception re-raised.
        """
        img_list = []
        audio_paths = []

        LOGGER.info(f"🎬 Starting batch processing of {len(list_prompts)} prompts")

        for idx, img_prompt in enumerate(list_prompts, 1):
            try:
                LOGGER.info(f"πŸ“ Processing prompt {idx}/{len(list_prompts)}")

                # Image and audio of one scene share the same random suffix.
                unique_id = uuid.uuid4().hex[:8]

                image_path = f"scene_{idx}_{unique_id}.png"
                img_url = self.get_image(img_prompt)
                img_list.append(self.download_img_from_url(img_url, image_path))

                audio_path = f"audio_{idx}_{unique_id}.mp3"
                audio_paths.append(self.text_to_audio(img_prompt, audio_path))

                LOGGER.info(f"βœ… Completed processing prompt {idx}")

            except Exception as e:
                LOGGER.error(f"❌ Error processing prompt {idx}: {str(e)}")
                raise

        return img_list, audio_paths

    def create_video_from_images_and_audio(self, image_files: List[str],
                                           audio_files: List[str],
                                           output_path: str) -> None:
        """Join image/audio pairs into one video written to *output_path*.

        Each image is shown for the duration of its audio track. Raises
        ``ValueError`` when the two lists differ in length; every exception
        is logged and re-raised.
        """
        audio_clips = []
        final_clip = None
        try:
            LOGGER.info("πŸŽ₯ Starting video creation process")

            if len(image_files) != len(audio_files):
                raise ValueError("Number of images and audio files don't match")

            video_clips = []
            for idx, (image_file, audio_file) in enumerate(zip(image_files, audio_files), 1):
                LOGGER.info(f"πŸ”„ Processing scene {idx}/{len(image_files)}")

                audio_clip = mp.AudioFileClip(audio_file)
                audio_clips.append(audio_clip)
                video_clip = mp.ImageClip(image_file).set_duration(audio_clip.duration)
                video_clips.append(video_clip.set_audio(audio_clip))

                LOGGER.info(f"βœ… Scene {idx} processed successfully")

            LOGGER.info("πŸ”„ Concatenating all scenes")
            final_clip = mp.concatenate_videoclips(video_clips)

            LOGGER.info("πŸ’Ύ Writing final video file")
            final_clip.write_videofile(
                output_path,
                codec='libx264',
                fps=24,
                audio_codec='aac',
                audio_bitrate='192k',
                preset='medium'
            )

            LOGGER.info("βœ… Video created successfully")

        except Exception as e:
            LOGGER.error(f"❌ Error in video creation: {str(e)}")
            raise
        finally:
            # The original never closed its clips, leaving the audio readers
            # open after every request.
            if final_clip is not None:
                final_clip.close()
            for audio_clip in audio_clips:
                audio_clip.close()

    def generate_video(self, text: str) -> str:
        """Generate a video from *text* and return the path of the MP4.

        Scenes are separated by ',,'. Raises ``ValueError`` when the script
        contains no scene; other failures are logged and re-raised.
        """
        try:
            LOGGER.info("🎬 Starting video generation process")

            list_prompts = self._split_scenes(text)
            LOGGER.info(f"πŸ“ Processed {len(list_prompts)} scenes from input text")
            if not list_prompts:
                # Fail early with a readable message instead of letting
                # moviepy choke on an empty clip list later.
                raise ValueError("Script contains no scenes; separate scenes with ',,'")

            output_path = f"comic_video_{uuid.uuid4().hex[:8]}.mp4"

            img_list, audio_paths = self.get_images_and_audio(list_prompts)
            self.create_video_from_images_and_audio(img_list, audio_paths, output_path)

            LOGGER.info(f"βœ… Video generation completed: {output_path}")
            return output_path

        except Exception as e:
            LOGGER.error(f"❌ Error in video generation: {str(e)}")
            raise

    def gradio_interface(self):
        """Build the Gradio UI, wire the button to generate_video and launch it."""
        LOGGER.info("🌐 Initializing Gradio interface")

        with gr.Blocks(theme='abidlabs/dracula_revamped') as demo:
            gr.HTML("""
                <center><h1 style="color:#fff">Comic Video Generator</h1></center>
            """)

            with gr.Row():
                input_text = gr.Textbox(
                    label="Comic Script",
                    placeholder="Enter your story (separate scenes with ,,)"
                )

            with gr.Row():
                generate_btn = gr.Button("🎬 Generate Video")

            with gr.Row():
                output = gr.Video(label="Generated Comic Video")

            # Example text
            example_txt = """Once upon a time in a magical forest,, A brave knight discovered a mysterious crystal,, The crystal began to glow with incredible power"""
            gr.Examples([[example_txt]], [input_text])

            generate_btn.click(self.generate_video, inputs=[input_text], outputs=[output])

        LOGGER.info("βœ… Gradio interface initialized")
        demo.launch(debug=True)
241
 
242
  if __name__ == "__main__":
243
  text2video = Text2Video()
 
10
  import uuid
11
  import time
12
  import gradio as gr
13
+ from typing import Tuple, List, Optional
14
  import numpy as np
15
+ from concurrent.futures import ThreadPoolExecutor
16
+ from functools import partial
17
+ import tempfile
18
+ import contextlib
19
 
20
# Configure logging: console output plus a size-rotated log file.
# The handlers submodule is not loaded by "import logging" alone.
import logging.handlers

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(),
        # The rotating handler is the only writer of app.log. A second plain
        # FileHandler on the same path would log every record twice and keep
        # its own handle on the file being rolled over.
        logging.handlers.RotatingFileHandler(
            'app.log', maxBytes=1024 * 1024, backupCount=5
        ),
    ],
)
LOGGER = logging.getLogger(__name__)
33
 
34
class ResourceManager:
    """Remember temporary file paths and delete them on demand."""

    def __init__(self):
        # Paths registered for removal; a set, so duplicates collapse.
        self.temp_files = set()

    def add_temp_file(self, filepath: str) -> None:
        """Register *filepath* for removal by the next cleanup() call."""
        self.temp_files.add(filepath)

    def cleanup(self) -> None:
        """Remove every registered file that exists, then forget all paths.

        A failure to remove one file is logged as a warning and does not
        stop the remaining files from being processed.
        """
        for tracked_path in self.temp_files:
            self._discard(tracked_path)
        self.temp_files.clear()

    @staticmethod
    def _discard(tracked_path) -> None:
        """Delete *tracked_path* if present; log instead of raising."""
        try:
            if os.path.exists(tracked_path):
                os.remove(tracked_path)
        except Exception as e:
            LOGGER.warning(f"Failed to remove temporary file {tracked_path}: {e}")
51
+
52
class Text2Video:
    """Turn a ',,'-separated comic script into a narrated slideshow video.

    Every scene gets one image (requested through ``self.herc``) and one
    gTTS speech track; moviepy joins the scenes into a single MP4 that the
    Gradio UI returns. Blocking work runs on a small thread pool so the
    async entry point does not stall the event loop.
    """

    def __init__(self) -> None:
        """Initialize the Text2Video class."""
        LOGGER.info("Initializing Text2Video application...")
        self.herc = Hercai()
        self.resource_manager = ResourceManager()
        self.max_workers = min(os.cpu_count() or 1, 4)  # Limit concurrent tasks
        # Shared pool for blocking calls (HTTP, image processing, TTS, encoding).
        self._executor = ThreadPoolExecutor(max_workers=self.max_workers)
        LOGGER.info("Initialization complete")

    async def _run_blocking(self, func, *args, **kwargs):
        """Run *func* on the worker pool and return its result.

        If the call hands back an awaitable (i.e. *func* was a coroutine
        function after all), that awaitable is awaited on the event loop.
        """
        # Local imports: asyncio/inspect are not imported in the visible
        # part of this file.
        import asyncio
        import inspect

        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            self._executor, partial(func, *args, **kwargs)
        )
        if inspect.isawaitable(result):
            result = await result
        return result

    @staticmethod
    async def _gather_all(*awaitables):
        """Await all *awaitables* and return their results in order.

        The first failure is raised only after every awaitable has finished,
        so no worker is still writing files when the caller cleans up.
        """
        import asyncio

        results = await asyncio.gather(*awaitables, return_exceptions=True)
        for result in results:
            if isinstance(result, BaseException):
                raise result
        return results

    def _enhance_prompt(self, prompt: str) -> str:
        """Enhance the prompt with comic-style elements."""
        return (
            f"{prompt}, comic book style, full scene composition, "
            "vibrant colors, clear speech bubbles with text, "
            "dramatic lighting, high contrast, detailed backgrounds, "
            "comic book panel layout, professional illustration"
        )

    @staticmethod
    def _create_unique_filename(prefix: str, suffix: str) -> str:
        """Create a unique filename with given prefix and suffix."""
        return f"{prefix}_{uuid.uuid4().hex[:8]}{suffix}"

    @staticmethod
    def _split_scenes(text: str) -> List[str]:
        """Split *text* on ',,' and return the non-empty, stripped scenes."""
        return [s.strip() for s in (text or "").split(",,") if s.strip()]

    async def get_image(self, img_prompt: str) -> Optional[str]:
        """Generate an image for *img_prompt* and return its URL.

        Returns the ``"url"`` entry of the ``draw_image`` result. Any
        exception is logged and re-raised.
        """
        try:
            LOGGER.info(f"🎨 Starting image generation for prompt: {img_prompt}")
            enhanced_prompt = self._enhance_prompt(img_prompt)

            # The result is indexed like a mapping below, so the return value
            # of draw_image must not be awaited directly; the call is pushed
            # to the worker pool instead.
            image_result = await self._run_blocking(
                self.herc.draw_image,
                model="v3",
                prompt=enhanced_prompt,
                negative_prompt="blurry, cropped, low quality, dark, gloomy",
            )

            return image_result["url"]
        except Exception as e:
            LOGGER.error(f"❌ Error generating image: {str(e)}")
            raise

    def download_img_from_url(self, image_url: str, image_path: str) -> str:
        """Download *image_url* and save it as a padded 1024x1024 PNG.

        The download goes to a scratch file first; the picture is converted
        to RGB, shrunk to fit 1024x1024 (aspect ratio kept), centred on a
        white canvas and written to *image_path*, which is registered with
        the resource manager. Returns *image_path*; any exception is logged
        and re-raised.
        """
        # Stays None until the scratch file exists, so the finally block
        # never touches an unbound name.
        temp_path = None
        try:
            LOGGER.info(f"πŸ“₯ Downloading image from: {image_url}")

            fd, temp_path = tempfile.mkstemp()
            os.close(fd)  # urlretrieve reopens the path itself
            urllib.request.urlretrieve(image_url, temp_path)

            # Registered before writing so a half-written file is removed too.
            self.resource_manager.add_temp_file(image_path)

            with Image.open(temp_path) as source:
                # Convert to RGB if necessary
                img = source if source.mode == 'RGB' else source.convert('RGB')

                # Resize with proper aspect ratio
                target_size = (1024, 1024)
                img.thumbnail(target_size, Image.Resampling.LANCZOS)

                # Create new image with padding
                new_img = Image.new('RGB', target_size, (255, 255, 255))
                offset = ((target_size[0] - img.size[0]) // 2,
                          (target_size[1] - img.size[1]) // 2)
                new_img.paste(img, offset)

                # Save with optimization
                new_img.save(image_path, 'PNG', optimize=True)

            return image_path

        except Exception as e:
            LOGGER.error(f"❌ Error processing image: {str(e)}")
            raise
        finally:
            if temp_path is not None and os.path.exists(temp_path):
                os.unlink(temp_path)

    def text_to_audio(self, img_prompt: str, audio_path: str) -> str:
        """Convert *img_prompt* to English speech with gTTS and save it.

        *audio_path* is registered with the resource manager. Returns
        *audio_path*; any exception is logged and re-raised.

        NOTE(review): process_scene calls this method, but the class did not
        define it; gTTS is assumed to be imported above the visible part of
        the file -- confirm.
        """
        try:
            LOGGER.info(f"Converting text to audio: {img_prompt}")
            self.resource_manager.add_temp_file(audio_path)
            tts = gTTS(text=img_prompt, lang='en', slow=False)
            tts.save(audio_path)
            return audio_path
        except Exception as e:
            LOGGER.error(f"Error in audio conversion: {str(e)}")
            raise

    def create_video_from_images_and_audio(self, image_files: List[str],
                                           audio_files: List[str],
                                           output_path: str) -> None:
        """Join image/audio pairs into one video written to *output_path*.

        Each image is shown for the duration of its audio track. Raises
        ``ValueError`` when the two lists differ in length; every exception
        is logged and re-raised.

        NOTE(review): generate_video calls this method, but the class did
        not define it; moviepy is assumed to be imported as ``mp`` above the
        visible part of the file -- confirm.
        """
        audio_clips = []
        final_clip = None
        try:
            if len(image_files) != len(audio_files):
                raise ValueError("Number of images and audio files don't match")

            video_clips = []
            for image_file, audio_file in zip(image_files, audio_files):
                audio_clip = mp.AudioFileClip(audio_file)
                audio_clips.append(audio_clip)
                video_clip = mp.ImageClip(image_file).set_duration(audio_clip.duration)
                video_clips.append(video_clip.set_audio(audio_clip))

            final_clip = mp.concatenate_videoclips(video_clips)
            final_clip.write_videofile(
                output_path,
                codec='libx264',
                fps=24,
                audio_codec='aac',
                audio_bitrate='192k',
                preset='medium',
            )
        except Exception as e:
            LOGGER.error(f"Error in video creation: {str(e)}")
            raise
        finally:
            # Release the readers so the source files can be deleted.
            if final_clip is not None:
                final_clip.close()
            for audio_clip in audio_clips:
                audio_clip.close()

    async def process_scene(self, prompt: str, idx: int) -> Tuple[str, str]:
        """Process a single scene (image and audio) concurrently.

        Returns ``(image_path, audio_path)``. Any exception is logged and
        re-raised.
        """
        try:
            image_path = self._create_unique_filename(f"scene_{idx}", ".png")
            audio_path = self._create_unique_filename(f"audio_{idx}", ".mp3")

            async def build_image() -> str:
                # Generate image, then download and pad it off the event loop.
                image_url = await self.get_image(prompt)
                return await self._run_blocking(
                    self.download_img_from_url, image_url, image_path
                )

            image_path, audio_path = await self._gather_all(
                build_image(),
                self._run_blocking(self.text_to_audio, prompt, audio_path),
            )
            return image_path, audio_path

        except Exception as e:
            LOGGER.error(f"Error processing scene {idx}: {e}")
            raise

    async def generate_video(self, text: str) -> str:
        """Generate a video from *text* and return the path of the MP4.

        Scenes are separated by ',,' and processed concurrently, bounded by
        the worker pool. Intermediate image/audio files are always removed;
        the output file is kept on success so the caller can serve it.
        Raises ``ValueError`` when the script contains no scene; other
        failures are logged and re-raised.
        """
        output_path = None
        try:
            LOGGER.info("🎬 Starting video generation process")
            list_prompts = self._split_scenes(text)
            if not list_prompts:
                raise ValueError("Script contains no scenes; separate scenes with ',,'")

            output_path = self._create_unique_filename("comic_video", ".mp4")

            # Process scenes concurrently.
            # NOTE(review): assumes the Hercai client tolerates concurrent
            # calls -- confirm.
            scenes = await self._gather_all(
                *(self.process_scene(prompt, idx)
                  for idx, prompt in enumerate(list_prompts, 1))
            )

            # Create video
            img_list = [scene[0] for scene in scenes]
            audio_paths = [scene[1] for scene in scenes]

            await self._run_blocking(
                self.create_video_from_images_and_audio,
                img_list, audio_paths, output_path,
            )

            return output_path

        except Exception as e:
            LOGGER.error(f"❌ Error in video generation: {str(e)}")
            # Only a failed run may delete the (partial) output file.
            # Registering it up front removed the finished video before it
            # could be served.
            if output_path is not None:
                self.resource_manager.add_temp_file(output_path)
            raise
        finally:
            self.resource_manager.cleanup()

    def gradio_interface(self):
        """Create Gradio interface with improved styling."""
        LOGGER.info("🌐 Initializing Gradio interface")

        css = """
        .gradio-container {
            font-family: 'Arial', sans-serif;
            max-width: 1200px;
            margin: auto;
        }
        .header {
            text-align: center;
            padding: 2rem;
            background: linear-gradient(135deg, #6e8efb, #a777e3);
            color: white;
            border-radius: 10px;
            margin-bottom: 2rem;
        }
        """

        with gr.Blocks(theme=gr.themes.Soft(), css=css) as demo:
            gr.HTML("""
                <div class="header">
                    <h1>🎬 Comic Video Generator</h1>
                    <p>Transform your story into an animated comic!</p>
                </div>
            """)

            with gr.Row():
                input_text = gr.Textbox(
                    label="Comic Script",
                    placeholder="Enter your story (separate scenes with ,,)",
                    lines=5
                )

            with gr.Row():
                generate_btn = gr.Button("🎬 Generate Video", variant="primary")

            with gr.Row():
                output = gr.Video(label="Generated Comic Video")

            example_txt = (
                "Once upon a time in a magical forest,,\n"
                "A brave knight discovered a mysterious crystal,,\n"
                "The crystal began to glow with incredible power"
            )
            gr.Examples([[example_txt]], [input_text])

            generate_btn.click(
                fn=self.generate_video,
                inputs=[input_text],
                outputs=[output],
                api_name="generate_video"
            )

        LOGGER.info("βœ… Gradio interface initialized")
        demo.launch(debug=True, show_error=True)
232
 
233
  if __name__ == "__main__":
234
  text2video = Text2Video()