#!/usr/bin/env python # -*- coding: utf-8 -*- import argparse import os import shutil import subprocess import glob import re from pathlib import Path def create_video(input_dir, sample, temp_dir, step_multiplier, repeat, max_images): output_filename = f"{os.path.basename(os.getcwd())}_sample{sample}.mp4" print(f"Processing sample {sample}. Output filename: {output_filename}") # Create repeated images print(f"Creating repeated images for sample {sample}...") for img in glob.glob(f"{input_dir}/{sample}_*.png"): for i in range(repeat): base = os.path.splitext(os.path.basename(img))[0] shutil.copy(img, f"{temp_dir}/{base}_{i+1}.png") # Prepare ffmpeg options vf_options = "scale=1024x1024" if step_multiplier: vf_options += f",drawtext=fontfile=/usr/share/fonts/TTF/Inconsolata-Light.ttf:text='Steps\\: %{{expr\\:trunc(n*{step_multiplier}/{repeat})}}':x=10:y=h-th-10:fontsize=24:fontcolor=white" if max_images: vf_options = f"select='not(mod(n\\,{max_images}))',{vf_options}" # Run first ffmpeg command temp_output = f"{temp_dir}/temp_{sample}.mp4" ffmpeg_cmd = [ "ffmpeg", "-framerate", "60", "-pattern_type", "glob", "-i", f"{temp_dir}/{sample}_*.png", "-vf", vf_options, "-crf", "18", "-c:v", "libx264", "-b:v", "12M", "-pix_fmt", "yuv420p", "-y", temp_output ] try: subprocess.run(ffmpeg_cmd, check=True) except subprocess.CalledProcessError: print(f"Error: ffmpeg command failed for sample {sample}.") return False # Get duration and process final video try: duration_cmd = ["ffmpeg", "-i", temp_output] result = subprocess.run(duration_cmd, capture_output=True, text=True) duration_match = re.search(r'Duration: (\d{2}):(\d{2}):(\d{2})', result.stderr) if duration_match: hours, minutes, seconds = map(float, duration_match.groups()) duration = hours * 3600 + minutes * 60 + seconds final_cmd = [ "ffmpeg", "-i", temp_output, "-vf", "tpad=stop_mode=clone:stop_duration=8", "-c:v", "libx264", "-b:v", "12M", "-crf", "18", "-pix_fmt", "yuv420p", "-y", output_filename ] subprocess.run(final_cmd, check=True) else: print("Error: Could not determine video duration.") return False except subprocess.CalledProcessError: print(f"Error: Final ffmpeg processing failed for sample {sample}.") return False # Clean up temporary files for this sample for f in glob.glob(f"{temp_dir}/{sample}_*.png"): os.remove(f) os.remove(temp_output) return True def get_step_size_from_filenames(sample): files = sorted(glob.glob(f"{sample}_*.png")) if len(files) < 2: return None # Extract step numbers from first two files pattern = r'_(\d{5})_' first_match = re.search(pattern, files[0]) second_match = re.search(pattern, files[1]) if first_match and second_match: first_step = int(first_match.group(1)) second_step = int(second_match.group(1)) return second_step - first_step return None def main(): parser = argparse.ArgumentParser(description='Convert PNG sequence to MP4') parser.add_argument('--max', type=int, help='Maximum number of images') parser.add_argument('--step', type=int, help='Step multiplier') parser.add_argument('--repeat', type=int, default=1, help='Repeat count') parser.add_argument('--steps-from-filename', action='store_true', help='Calculate steps from filename') args = parser.parse_args() # Create temporary directory temp_dir = os.path.expanduser("~/.local/tmp") os.makedirs(temp_dir, exist_ok=True) print("Created temporary directory...") # Check for PNG files png_files = glob.glob("*.png") if not png_files: print("Error: No PNG files found in the current directory.") return 1 # Find all unique sample numbers sample_pattern = r'([a-zA-Z]+)_\d{5}_' samples = sorted(set(re.findall(sample_pattern, ' '.join(png_files)))) for sample in samples: if args.steps_from_filename: step_multiplier = get_step_size_from_filenames(sample) if step_multiplier: print(f"Detected step size: {step_multiplier}") else: print("Error: Could not determine step size from filenames") continue else: step_multiplier = args.step success = create_video(".", sample, temp_dir, step_multiplier, args.repeat, args.max) if not success: shutil.rmtree(temp_dir) return 1 # Clean up print("Cleaning up temporary directory...") shutil.rmtree(temp_dir) print("All samples processed successfully.") return 0 if __name__ == "__main__": exit(main())