import gradio as gr
import torch
from diffusers import StableDiffusionPipeline
import boto3
from io import BytesIO
import os
import botocore
from time import sleep

AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
S3_BUCKET_NAME = 'pineblogs101145-dev'

model_id = "CompVis/stable-diffusion-v1-4"
device = "cuda" if torch.cuda.is_available() else "cpu"

pipe = StableDiffusionPipeline.from_pretrained(
    model_id, torch_dtype=torch.float32)
pipe = pipe.to(device)
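# Note: float32 keeps the pipeline loadable on CPU. If the Space runs on a
# CUDA device, half precision is the usual memory/speed trade-off; a sketch
# of that optional variant (not part of the original code):
#
#   if device == "cuda":
#       pipe = StableDiffusionPipeline.from_pretrained(
#           model_id, torch_dtype=torch.float16).to(device)
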
s3 = boto3.resource('s3',
                    aws_access_key_id=AWS_ACCESS_KEY_ID,
                    aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

s3_client = boto3.client('s3',
                         aws_access_key_id=AWS_ACCESS_KEY_ID,
                         aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

bucket_name = 'pineblogs101145-dev'
folder = 'public/mdx/'

def text_to_image(prompt, save_as, key_id):
    # Simple shared-secret check: the caller must supply the access key id.
    if AWS_ACCESS_KEY_ID != key_id:
        return "permission denied"

    # Create an instance of the S3 client
    s3 = boto3.client('s3',
                      aws_access_key_id=AWS_ACCESS_KEY_ID,
                      aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

    image_name = '-'.join(save_as.split()) + ".webp"

    def save_image_to_s3(image):
        # Create a BytesIO object to store the image.
        image_buffer = BytesIO()
        image.save(image_buffer, format='WEBP')
        image_buffer.seek(0)

        # Full path of the file in the bucket
        s3_key = "public/" + image_name

        print('Saving image to s3')
        # Upload the image to the S3 bucket
        s3.upload_fileobj(image_buffer, S3_BUCKET_NAME, s3_key)
        print('Image saved to s3')

    def generator_image(prompt):
        print('Starting to generate the image ...')
        try:
            image = pipe(prompt).images[0]
        except Exception as e:
            print('Error: ', e)
            return
        print('Image generation completed')
        # Save the image in S3
        save_image_to_s3(image)

    generator_image(prompt)
    return image_name

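# Quick way to exercise text_to_image on its own (the prompt and file name
# below are made-up examples; the AWS credentials above must be set):
#
#   text_to_image("a pine forest at sunrise, watercolor", "pine forest hero", AWS_ACCESS_KEY_ID)
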
def check_if_exist(bucket_name, key):
    try:
        s3.Object(bucket_name, key).load()
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            # The object does not exist.
            return False
        else:
            # Something else has gone wrong.
            raise
    else:
        return True

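# Example (hypothetical key): returns True only if the rendered image is
# already in the bucket:
#
#   check_if_exist(S3_BUCKET_NAME, 'public/my-post.webp')
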
def list_s3_files(bucket_name, folder):
    my_bucket = s3.Bucket(bucket_name)

    for objects in my_bucket.objects.filter(Prefix=folder):
        print(objects.key)
        filename_ext = '%s' % os.path.basename(objects.key)
        filename = os.path.splitext(filename_ext)[0]
        s3image = 'public/%s.webp' % filename

        if check_if_exist(bucket_name, s3image):
            print('Image %s already exists!' % s3image)
        else:
            response = s3_client.head_object(Bucket=bucket_name, Key=objects.key)
            metadata = response['Metadata']
            print(metadata)
            if 'resumen' in metadata:
                print('Has resumen metadata, ready to create image!')
                print('Start creating image.. %s' % s3image)
                resumen = metadata['resumen']
                sleep(500 / 1000)  # brief pause before generating
                text_to_image(resumen, filename, AWS_ACCESS_KEY_ID)
            else:
                print('No resumen metadata, skipping..')

# Run one pass over the configured bucket at startup.
list_s3_files(bucket_name, folder)

iface = gr.Interface(fn=list_s3_files,
                     inputs=[gr.Textbox(label="bucket_name"), gr.Textbox(label="folder")],
                     outputs="text")
iface.launch()
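
# Once the Space is up, the same flow can be triggered remotely with
# gradio_client (a sketch; "user/space-name" is a placeholder, not the real
# Space id):
#
#   from gradio_client import Client
#   client = Client("user/space-name")
#   print(client.predict("pineblogs101145-dev", "public/mdx/", api_name="/predict"))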