Hjgugugjhuhjggg commited on
Commit
f6a64dd
·
verified ·
1 Parent(s): 3a145aa

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +96 -83
app.py CHANGED
@@ -10,16 +10,18 @@ import torch
10
  import safetensors.torch
11
  from fastapi.responses import StreamingResponse
12
  from tqdm import tqdm
13
- import re
14
 
 
15
  load_dotenv()
16
 
 
17
  AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
18
  AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
19
  AWS_REGION = os.getenv("AWS_REGION")
20
- S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
21
- HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
22
 
 
23
  s3_client = boto3.client(
24
  's3',
25
  aws_access_key_id=AWS_ACCESS_KEY_ID,
@@ -29,6 +31,7 @@ s3_client = boto3.client(
29
 
30
  app = FastAPI()
31
 
 
32
  class DownloadModelRequest(BaseModel):
33
  model_name: str
34
  pipeline_task: str
@@ -46,135 +49,145 @@ class S3DirectStream:
46
 
47
  def stream_from_s3(self, key):
48
  try:
 
49
  response = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
50
- return response['Body']
51
  except self.s3_client.exceptions.NoSuchKey:
52
  raise HTTPException(status_code=404, detail=f"El archivo {key} no existe en el bucket S3.")
53
- except Exception as e:
54
- raise HTTPException(status_code=500, detail=f"Error al descargar de S3: {e}")
55
 
56
  def file_exists_in_s3(self, key):
57
  try:
58
  self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
59
  return True
60
- except self.s3_client.exceptions.ClientError as e:
61
- if e.response['Error']['Code'] == '404':
62
- return False
63
- raise HTTPException(status_code=500, detail=f"Error al verificar archivo en S3: {e}")
64
 
65
  def load_model_from_stream(self, model_prefix):
66
  try:
67
- model_files = self.list_model_files(model_prefix)
68
- if not model_files:
69
- self.download_and_upload_to_s3(model_prefix)
70
- return self.load_model_from_stream(model_prefix)
71
-
72
- config_stream = self.stream_from_s3(f"{model_prefix}/config.json")
73
- config_data = config_stream.read().decode("utf-8")
74
-
75
- model_path = f"{model_prefix}/model.safetensors"
76
- if self.file_exists_in_s3(model_path):
77
- model_stream = self.stream_from_s3(model_path)
78
- model = AutoModelForCausalLM.from_config(config_data)
79
- model.load_state_dict(safetensors.torch.load_stream(model_stream))
80
- elif model_files:
81
- model = AutoModelForCausalLM.from_config(config_data)
82
- state_dict = {}
83
- for file_name in model_files:
84
- file_stream = self.stream_from_s3(f"{model_prefix}/{file_name}")
85
- tmp = torch.load(file_stream, map_location="cpu")
86
- state_dict.update(tmp)
87
- model.load_state_dict(state_dict)
88
- else:
89
- raise HTTPException(status_code=500, detail="Modelo no encontrado")
90
-
91
- return model
92
  except HTTPException as e:
93
- raise
94
- except Exception as e:
95
- raise HTTPException(status_code=500, detail=f"Error al cargar el modelo: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
 
97
- def list_model_files(self, model_prefix):
98
- try:
99
- response = self.s3_client.list_objects_v2(Bucket=self.bucket_name, Prefix=f"{model_prefix}/pytorch_model-")
100
- model_files = []
101
- if 'Contents' in response:
102
- for obj in response['Contents']:
103
- if re.match(r"pytorch_model-\d+-of-\d+", obj['Key'].split('/')[-1]):
104
- model_files.append(obj['Key'].split('/')[-1])
105
- return model_files
106
- except Exception as e:
107
- return None
108
 
109
  def load_tokenizer_from_stream(self, model_prefix):
110
  try:
111
- tokenizer_path = f"{model_prefix}/tokenizer.json"
112
- if self.file_exists_in_s3(tokenizer_path):
113
- tokenizer_stream = self.stream_from_s3(tokenizer_path)
114
- tokenizer = AutoTokenizer.from_pretrained(tokenizer_stream)
115
- return tokenizer
116
- else:
117
- self.download_and_upload_to_s3(model_prefix)
118
- return self.load_tokenizer_from_stream(model_prefix)
119
  except HTTPException as e:
120
- raise
121
- except Exception as e:
122
- raise HTTPException(status_code=500, detail=f"Error al cargar el tokenizer: {e}")
123
 
 
 
 
 
 
124
 
125
  def download_and_upload_to_s3(self, model_prefix):
126
- urls = {
127
- "pytorch_model.bin": f"https://huggingface.co/{model_prefix}/resolve/main/pytorch_model.bin",
128
- "model.safetensors": f"https://huggingface.co/{model_prefix}/resolve/main/model.safetensors",
129
- "tokenizer.json": f"https://huggingface.co/{model_prefix}/resolve/main/tokenizer.json",
130
- "config.json": f"https://huggingface.co/{model_prefix}/resolve/main/config.json"
131
- }
132
-
133
- for filename, url in urls.items():
134
- try:
135
- response = requests.get(url, stream=True)
136
- response.raise_for_status()
137
- self.s3_client.upload_fileobj(response.raw, self.bucket_name, f"{model_prefix}/{filename}")
138
- except requests.exceptions.RequestException as e:
139
- raise HTTPException(status_code=500, detail=f"Error al descargar {filename}: {e}")
140
- except Exception as e:
141
- raise HTTPException(status_code=500, detail=f"Error al subir {filename} a S3: {e}")
 
 
 
 
 
142
 
143
 
144
  @app.post("/predict/")
145
  async def predict(model_request: DownloadModelRequest):
146
  try:
 
 
147
  streamer = S3DirectStream(S3_BUCKET_NAME)
148
  model = streamer.load_model_from_stream(model_request.model_name)
149
  tokenizer = streamer.load_tokenizer_from_stream(model_request.model_name)
150
 
 
151
  task = model_request.pipeline_task
152
  if task not in ["text-generation", "sentiment-analysis", "translation", "fill-mask", "question-answering", "text-to-speech", "text-to-image", "text-to-audio", "text-to-video"]:
153
  raise HTTPException(status_code=400, detail="Pipeline task no soportado")
154
 
 
155
  nlp_pipeline = pipeline(task, model=model, tokenizer=tokenizer)
 
 
156
  input_text = model_request.input_text
157
  outputs = nlp_pipeline(input_text)
158
 
 
159
  if task in ["text-generation", "translation", "fill-mask", "sentiment-analysis", "question-answering"]:
160
  return {"response": outputs}
 
161
  elif task == "text-to-image":
162
- s3_key = f"{model_request.model_name}/generated_image.png"
 
163
  return StreamingResponse(streamer.stream_from_s3(s3_key), media_type="image/png")
 
164
  elif task == "text-to-audio":
165
- s3_key = f"{model_request.model_name}/generated_audio.wav"
 
166
  return StreamingResponse(streamer.stream_from_s3(s3_key), media_type="audio/wav")
 
167
  elif task == "text-to-video":
168
- s3_key = f"{model_request.model_name}/generated_video.mp4"
 
169
  return StreamingResponse(streamer.stream_from_s3(s3_key), media_type="video/mp4")
 
170
  else:
171
  raise HTTPException(status_code=400, detail="Tipo de tarea desconocido")
172
 
173
- except HTTPException as e:
174
- raise
175
  except Exception as e:
176
- raise HTTPException(status_code=500, detail=f"Error inesperado: {str(e)}")
177
 
178
 
179
  if __name__ == "__main__":
180
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
10
  import safetensors.torch
11
  from fastapi.responses import StreamingResponse
12
  from tqdm import tqdm
 
13
 
14
+ # Cargar las variables de entorno desde el archivo .env
15
  load_dotenv()
16
 
17
+ # Cargar las credenciales de AWS y el token de Hugging Face desde las variables de entorno
18
  AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
19
  AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
20
  AWS_REGION = os.getenv("AWS_REGION")
21
+ S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME") # Nombre del bucket de S3
22
+ HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN") # Token de Hugging Face
23
 
24
+ # Cliente S3 de Amazon
25
  s3_client = boto3.client(
26
  's3',
27
  aws_access_key_id=AWS_ACCESS_KEY_ID,
 
31
 
32
  app = FastAPI()
33
 
34
+ # Pydantic Model para el cuerpo de la solicitud del endpoint /download_model/
35
  class DownloadModelRequest(BaseModel):
36
  model_name: str
37
  pipeline_task: str
 
49
 
50
  def stream_from_s3(self, key):
51
  try:
52
+ print(f"Descargando archivo {key} desde S3...")
53
  response = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
54
+ return response['Body'] # Devolver el cuerpo directamente para el StreamingResponse
55
  except self.s3_client.exceptions.NoSuchKey:
56
  raise HTTPException(status_code=404, detail=f"El archivo {key} no existe en el bucket S3.")
 
 
57
 
58
  def file_exists_in_s3(self, key):
59
  try:
60
  self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
61
  return True
62
+ except self.s3_client.exceptions.ClientError:
63
+ return False
 
 
64
 
65
  def load_model_from_stream(self, model_prefix):
66
  try:
67
+ print(f"Cargando el modelo {model_prefix} desde S3...")
68
+ if self.file_exists_in_s3(f"{model_prefix}/config.json") and \
69
+ (self.file_exists_in_s3(f"{model_prefix}/pytorch_model.bin") or self.file_exists_in_s3(f"{model_prefix}/model.safetensors")):
70
+ print(f"Modelo {model_prefix} ya existe en S3. No es necesario descargarlo.")
71
+ return self.load_model_from_existing_s3(model_prefix)
72
+
73
+ print(f"Modelo {model_prefix} no encontrado. Procediendo a descargar...")
74
+ self.download_and_upload_to_s3(model_prefix)
75
+ return self.load_model_from_stream(model_prefix)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
76
  except HTTPException as e:
77
+ print(f"Error al cargar el modelo: {e}")
78
+ return None
79
+
80
+ def load_model_from_existing_s3(self, model_prefix):
81
+ # Cargar el modelo y los archivos necesarios desde S3
82
+ print(f"Cargando los archivos {model_prefix} desde S3...")
83
+ config_stream = self.stream_from_s3(f"{model_prefix}/config.json")
84
+ config_data = config_stream.read().decode("utf-8")
85
+
86
+ print(f"Cargando el modelo de lenguaje {model_prefix}...")
87
+
88
+ # Verificar si el archivo es un safetensor o un archivo binario
89
+ if self.file_exists_in_s3(f"{model_prefix}/model.safetensors"):
90
+ # Usar safetensors si el archivo es de tipo safetensors
91
+ model_stream = self.stream_from_s3(f"{model_prefix}/model.safetensors")
92
+ model = AutoModelForCausalLM.from_config(config_data)
93
+ model.load_state_dict(safetensors.torch.load_stream(model_stream)) # Cargar el modelo utilizando safetensors
94
+ else:
95
+ # Cargar el modelo utilizando pytorch si el archivo es .bin
96
+ model_stream = self.stream_from_s3(f"{model_prefix}/pytorch_model.bin")
97
+ model = AutoModelForCausalLM.from_config(config_data)
98
+ model.load_state_dict(torch.load(model_stream, map_location="cpu"))
99
 
100
+ return model
 
 
 
 
 
 
 
 
 
 
101
 
102
  def load_tokenizer_from_stream(self, model_prefix):
103
  try:
104
+ if self.file_exists_in_s3(f"{model_prefix}/tokenizer.json"):
105
+ print(f"Tokenizer para {model_prefix} ya existe en S3. No es necesario descargarlo.")
106
+ return self.load_tokenizer_from_existing_s3(model_prefix)
107
+
108
+ print(f"Tokenizer para {model_prefix} no encontrado. Procediendo a descargar...")
109
+ self.download_and_upload_to_s3(model_prefix)
110
+ return self.load_tokenizer_from_stream(model_prefix)
 
111
  except HTTPException as e:
112
+ print(f"Error al cargar el tokenizer: {e}")
113
+ return None
 
114
 
115
+ def load_tokenizer_from_existing_s3(self, model_prefix):
116
+ print(f"Cargando el tokenizer para {model_prefix} desde S3...")
117
+ tokenizer_stream = self.stream_from_s3(f"{model_prefix}/tokenizer.json")
118
+ tokenizer = AutoTokenizer.from_pretrained(tokenizer_stream)
119
+ return tokenizer
120
 
121
  def download_and_upload_to_s3(self, model_prefix):
122
+ # URLs de los archivos de Hugging Face
123
+ model_url = f"https://huggingface.co/{model_prefix}/resolve/main/pytorch_model.bin"
124
+ safetensors_url = f"https://huggingface.co/{model_prefix}/resolve/main/model.safetensors"
125
+ tokenizer_url = f"https://huggingface.co/{model_prefix}/resolve/main/tokenizer.json"
126
+ config_url = f"https://huggingface.co/{model_prefix}/resolve/main/config.json"
127
+
128
+ print(f"Descargando y subiendo archivos para el modelo {model_prefix} a S3...")
129
+ self.download_and_upload_to_s3_url(model_url, f"{model_prefix}/pytorch_model.bin")
130
+ self.download_and_upload_to_s3_url(safetensors_url, f"{model_prefix}/model.safetensors")
131
+ self.download_and_upload_to_s3_url(tokenizer_url, f"{model_prefix}/tokenizer.json")
132
+ self.download_and_upload_to_s3_url(config_url, f"{model_prefix}/config.json")
133
+
134
+ def download_and_upload_to_s3_url(self, url: str, s3_key: str):
135
+ print(f"Descargando archivo desde {url}...")
136
+ response = requests.get(url)
137
+ if response.status_code == 200:
138
+ # Subir archivo a S3
139
+ print(f"Subiendo archivo a S3 con key {s3_key}...")
140
+ self.s3_client.put_object(Bucket=self.bucket_name, Key=s3_key, Body=response.content)
141
+ else:
142
+ raise HTTPException(status_code=500, detail=f"Error al descargar el archivo desde {url}")
143
 
144
 
145
  @app.post("/predict/")
146
  async def predict(model_request: DownloadModelRequest):
147
  try:
148
+ print(f"Recibiendo solicitud para predecir con el modelo {model_request.model_name}...")
149
+ # Cargar el modelo y tokenizer desde S3
150
  streamer = S3DirectStream(S3_BUCKET_NAME)
151
  model = streamer.load_model_from_stream(model_request.model_name)
152
  tokenizer = streamer.load_tokenizer_from_stream(model_request.model_name)
153
 
154
+ # Obtener el pipeline adecuado según la solicitud
155
  task = model_request.pipeline_task
156
  if task not in ["text-generation", "sentiment-analysis", "translation", "fill-mask", "question-answering", "text-to-speech", "text-to-image", "text-to-audio", "text-to-video"]:
157
  raise HTTPException(status_code=400, detail="Pipeline task no soportado")
158
 
159
+ # Crear el pipeline dinámicamente basado en el tipo de tarea
160
  nlp_pipeline = pipeline(task, model=model, tokenizer=tokenizer)
161
+
162
+ # Ejecutar el pipeline con el input_text
163
  input_text = model_request.input_text
164
  outputs = nlp_pipeline(input_text)
165
 
166
+ # Procesar los diferentes tipos de respuestas según el pipeline
167
  if task in ["text-generation", "translation", "fill-mask", "sentiment-analysis", "question-answering"]:
168
  return {"response": outputs}
169
+
170
  elif task == "text-to-image":
171
+ # Asumir que outputs es la imagen generada
172
+ s3_key = f"{model_request.model_name}/generated_image.png" # Definir el key del archivo de imagen
173
  return StreamingResponse(streamer.stream_from_s3(s3_key), media_type="image/png")
174
+
175
  elif task == "text-to-audio":
176
+ # Asumir que outputs es el audio generado
177
+ s3_key = f"{model_request.model_name}/generated_audio.wav" # Definir el key del archivo de audio
178
  return StreamingResponse(streamer.stream_from_s3(s3_key), media_type="audio/wav")
179
+
180
  elif task == "text-to-video":
181
+ # Asumir que outputs es el video generado
182
+ s3_key = f"{model_request.model_name}/generated_video.mp4" # Definir el key del archivo de video
183
  return StreamingResponse(streamer.stream_from_s3(s3_key), media_type="video/mp4")
184
+
185
  else:
186
  raise HTTPException(status_code=400, detail="Tipo de tarea desconocido")
187
 
 
 
188
  except Exception as e:
189
+ raise HTTPException(status_code=500, detail=f"Error al procesar la solicitud: {str(e)}")
190
 
191
 
192
  if __name__ == "__main__":
193
+ uvicorn.run(app, host="0.0.0.0", port=8000)