Hjgugugjhuhjggg committed (verified)
Commit 972e5ee
1 parent: 0b9d8bf

Update app.py

Files changed (1):
  app.py +52 -79
app.py CHANGED
@@ -6,22 +6,19 @@ import boto3
 from dotenv import load_dotenv
 import os
 import uvicorn
-from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
+from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer, AutoConfig, TextIteratorStreamer
 import safetensors.torch
+import torch
 from fastapi.responses import StreamingResponse
-from tqdm import tqdm
 
-# Load the environment variables from the .env file
 load_dotenv()
 
-# Load the AWS credentials and the Hugging Face token from the environment variables
 AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
 AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
 AWS_REGION = os.getenv("AWS_REGION")
-S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")  # Name of the S3 bucket
-HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")  # Hugging Face token
+S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
+HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
 
-# Amazon S3 client
 s3_client = boto3.client(
     's3',
     aws_access_key_id=AWS_ACCESS_KEY_ID,
@@ -31,12 +28,11 @@ s3_client = boto3.client(
 
 app = FastAPI()
 
-# Pydantic model for the body of the /download_model/ request
 class DownloadModelRequest(BaseModel):
     model_name: str
     pipeline_task: str
     input_text: str
-    revision: str = "main"  # Default revision
+    revision: str = "main"
 
 class S3DirectStream:
     def __init__(self, bucket_name):
@@ -50,11 +46,10 @@ class S3DirectStream:
 
     def stream_from_s3(self, key):
         try:
-            print(f"Downloading file {key} from S3...")
             response = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
-            return response['Body']  # Return the body directly for the StreamingResponse
+            return response['Body']
         except self.s3_client.exceptions.NoSuchKey:
-            raise HTTPException(status_code=404, detail=f"The file {key} does not exist in the S3 bucket.")
+            raise HTTPException(status_code=404, detail=f"File {key} not found in S3")
 
     def file_exists_in_s3(self, key):
         try:
@@ -65,127 +60,105 @@
 
     def load_model_from_stream(self, model_prefix, revision):
         try:
-            print(f"Loading the model {model_prefix} from S3...")
             if self.file_exists_in_s3(f"{model_prefix}/config.json") and \
                (self.file_exists_in_s3(f"{model_prefix}/pytorch_model.bin") or self.file_exists_in_s3(f"{model_prefix}/model.safetensors")):
-                print(f"Model {model_prefix} already exists in S3. No need to download it.")
                 return self.load_model_from_existing_s3(model_prefix)
-
-            print(f"Model {model_prefix} not found. Proceeding to download...")
-            self.download_and_upload_to_s3(model_prefix, revision)  # We pass 'revision' here
+
+            self.download_and_upload_to_s3(model_prefix, revision)
             return self.load_model_from_stream(model_prefix, revision)
         except HTTPException as e:
-            print(f"Error loading the model: {e}")
             return None
 
     def load_model_from_existing_s3(self, model_prefix):
-        # Load the model and the required files from S3
-        print(f"Loading the {model_prefix} files from S3...")
         config_stream = self.stream_from_s3(f"{model_prefix}/config.json")
-        config_data = config_stream.read().decode("utf-8")
-
-        print(f"Loading the language model {model_prefix}...")
+        config = AutoConfig.from_pretrained(config_stream)  # Directly from stream
 
-        # Check whether the file is a safetensors file or a binary file
         if self.file_exists_in_s3(f"{model_prefix}/model.safetensors"):
-            # Use safetensors if the file is of the safetensors type
             model_stream = self.stream_from_s3(f"{model_prefix}/model.safetensors")
-            model = AutoModelForCausalLM.from_config(config_data)
-            model.load_state_dict(safetensors.torch.load_stream(model_stream))  # Load the model using safetensors
-        else:
-            # Load the model using PyTorch if the file is a .bin
+            model = AutoModelForCausalLM.from_config(config)
+            model.load_state_dict(safetensors.torch.load_stream(model_stream))
+        elif self.file_exists_in_s3(f"{model_prefix}/pytorch_model.bin"):
             model_stream = self.stream_from_s3(f"{model_prefix}/pytorch_model.bin")
-            model = AutoModelForCausalLM.from_config(config_data)
-            model.load_state_dict(torch.load(model_stream, map_location="cpu"))
-
+            model = AutoModelForCausalLM.from_config(config)
+            state_dict = torch.load(model_stream, map_location="cpu")  # Load directly
+            model.load_state_dict(state_dict)
+        else:
+            raise EnvironmentError(f"No model file found for {model_prefix} in S3")
         return model
 
+
+
     def load_tokenizer_from_stream(self, model_prefix):
         try:
             if self.file_exists_in_s3(f"{model_prefix}/tokenizer.json"):
-                print(f"Tokenizer for {model_prefix} already exists in S3. No need to download it.")
                 return self.load_tokenizer_from_existing_s3(model_prefix)
-
-            print(f"Tokenizer for {model_prefix} not found. Proceeding to download...")
-            self.download_and_upload_to_s3(model_prefix)  # We pass 'revision' here too
+            self.download_and_upload_to_s3(model_prefix)
             return self.load_tokenizer_from_stream(model_prefix)
         except HTTPException as e:
-            print(f"Error loading the tokenizer: {e}")
             return None
 
     def load_tokenizer_from_existing_s3(self, model_prefix):
-        print(f"Loading the tokenizer for {model_prefix} from S3...")
         tokenizer_stream = self.stream_from_s3(f"{model_prefix}/tokenizer.json")
-        tokenizer = AutoTokenizer.from_pretrained(tokenizer_stream)
+        tokenizer = AutoTokenizer.from_pretrained(tokenizer_stream)  # Directly from stream
        return tokenizer
 
-    def download_and_upload_to_s3(self, model_prefix, revision):
-        # URLs of the Hugging Face files
+
+    def download_and_upload_to_s3(self, model_prefix, revision="main"):
         model_url = f"https://huggingface.co/{model_prefix}/resolve/{revision}/pytorch_model.bin"
         safetensors_url = f"https://huggingface.co/{model_prefix}/resolve/{revision}/model.safetensors"
         tokenizer_url = f"https://huggingface.co/{model_prefix}/resolve/{revision}/tokenizer.json"
         config_url = f"https://huggingface.co/{model_prefix}/resolve/{revision}/config.json"
 
-        print(f"Downloading and uploading the files for model {model_prefix} to S3...")
         self.download_and_upload_to_s3_url(model_url, f"{model_prefix}/pytorch_model.bin")
         self.download_and_upload_to_s3_url(safetensors_url, f"{model_prefix}/model.safetensors")
         self.download_and_upload_to_s3_url(tokenizer_url, f"{model_prefix}/tokenizer.json")
         self.download_and_upload_to_s3_url(config_url, f"{model_prefix}/config.json")
 
-    def download_and_upload_to_s3_url(self, url: str, s3_key: str):
-        print(f"Downloading file from {url}...")
-        response = requests.get(url)
+
+    def download_and_upload_to_s3_url(self, url, s3_key):
+        response = requests.get(url, stream=True)
         if response.status_code == 200:
-            # Upload the file to S3
-            print(f"Uploading file to S3 with key {s3_key}...")
-            self.s3_client.put_object(Bucket=self.bucket_name, Key=s3_key, Body=response.content)
+            self.s3_client.upload_fileobj(response.raw, self.bucket_name, s3_key)  # Direct upload
+        elif response.status_code == 404:
+            raise HTTPException(status_code=404, detail=f"Error downloading file from {url}. File not found.")
+
         else:
-            raise HTTPException(status_code=500, detail=f"Error downloading the file from {url}")
+            raise HTTPException(status_code=500, detail=f"Error downloading file from {url}")
+
 
 
 @app.post("/predict/")
 async def predict(model_request: DownloadModelRequest):
     try:
-        print(f"Receiving request to predict with the model {model_request.model_name}...")
-
         model_name = model_request.model_name
         revision = model_request.revision
 
-        # Load the model and tokenizer from S3
         streamer = S3DirectStream(S3_BUCKET_NAME)
         model = streamer.load_model_from_stream(model_name, revision)
         tokenizer = streamer.load_tokenizer_from_stream(model_name)
 
-        # Get the appropriate pipeline for the request
         task = model_request.pipeline_task
-        if task not in ["text-generation", "sentiment-analysis", "translation", "fill-mask", "question-answering", "text-to-speech", "text-to-image", "text-to-audio", "text-to-video"]:
-            raise HTTPException(status_code=400, detail="Pipeline task not supported")
-
-        # Create the pipeline dynamically based on the task type
-        nlp_pipeline = pipeline(task, model=model, tokenizer=tokenizer, use_auth_token=HUGGINGFACE_TOKEN, revision=revision)
-
-        # Run the pipeline with the input_text
-        outputs = nlp_pipeline(model_request.input_text)
-
-        # Store the result in S3 depending on the task type
-        if task == "text-to-image":
-            s3_key = f"{model_request.model_name}/generated_image.png"
-            return StreamingResponse(streamer.stream_from_s3(s3_key), media_type="image/png")
-
-        elif task == "text-to-speech":
-            s3_key = f"{model_request.model_name}/generated_audio.wav"
-            return StreamingResponse(streamer.stream_from_s3(s3_key), media_type="audio/wav")
-
-        elif task == "text-to-video":
-            s3_key = f"{model_request.model_name}/generated_video.mp4"
-            return StreamingResponse(streamer.stream_from_s3(s3_key), media_type="video/mp4")
-
-        # Return text results or other task types
-        return {"result": outputs}
+        if task not in ["text-generation", "sentiment-analysis", "translation", "fill-mask", "question-answering", "summarization", "zero-shot-classification"]:
+            raise HTTPException(status_code=400, detail="Unsupported pipeline task")
+
+        if task == "text-generation":
+            text_streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
+            inputs = tokenizer(model_request.input_text, return_tensors="pt").to(model.device)
+            generation_kwargs = dict(inputs, streamer=text_streamer)
+            model.generate(**generation_kwargs)
+            return StreamingResponse(iter([tokenizer.decode(token) for token in text_streamer]), media_type="text/event-stream")
+
+        else:
+            nlp_pipeline = pipeline(task, model=model, tokenizer=tokenizer, device_map="auto", trust_remote_code=True)
+            outputs = nlp_pipeline(model_request.input_text)
+            return {"result": outputs}
+
 
     except Exception as e:
-        raise HTTPException(status_code=500, detail=f"Error processing the request: {str(e)}")
+        print(f"Complete Error: {e}")
+        raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}")
+
 
 
 if __name__ == "__main__":
-    uvicorn.run(app, host="0.0.0.0", port=7860)
+    uvicorn.run(app, host="0.0.0.0", port=7860)
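A few review notes on the new version of app.py.

AutoConfig.from_pretrained(config_stream) and AutoTokenizer.from_pretrained(tokenizer_stream) are handed boto3 StreamingBody objects, but both methods expect a local path or a Hub repo id, so these calls will most likely raise. A minimal sketch of one workaround is to materialize the S3 objects into a temporary directory and load from there (the materialize_to_tmpdir helper below is hypothetical, not part of this commit):

    import tempfile
    from pathlib import Path

    from transformers import AutoConfig, AutoTokenizer

    def materialize_to_tmpdir(streamer, model_prefix, filenames):
        # Hypothetical helper: copy the named S3 objects into a temp dir so
        # transformers can load them through its normal path-based API.
        tmpdir = Path(tempfile.mkdtemp())
        for name in filenames:
            body = streamer.stream_from_s3(f"{model_prefix}/{name}")
            (tmpdir / name).write_bytes(body.read())
        return tmpdir

    # Usage sketch (depending on the model, tokenizer_config.json and
    # special_tokens_map.json may also be needed for a clean load):
    # tmpdir = materialize_to_tmpdir(streamer, model_name, ["config.json", "tokenizer.json"])
    # config = AutoConfig.from_pretrained(tmpdir)
    # tokenizer = AutoTokenizer.from_pretrained(tmpdir)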
 
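As far as I can tell, safetensors.torch exposes load (from bytes) and load_file (from a path), but no load_stream, so the safetensors branch would raise AttributeError. A sketch of the same idea with the documented API, at the cost of buffering the whole file in memory:

    import safetensors.torch

    # model_stream is the boto3 StreamingBody for model.safetensors.
    state_dict = safetensors.torch.load(model_stream.read())  # deserialize from bytes
    model.load_state_dict(state_dict)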
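Similarly, torch.load wants a seekable file-like object, and a boto3 StreamingBody is not seekable, so the pytorch_model.bin branch will likely fail as written. Buffering through io.BytesIO is the usual fix:

    import io
    import torch

    # Read the S3 body fully, then hand torch.load a seekable buffer.
    buffer = io.BytesIO(model_stream.read())
    state_dict = torch.load(buffer, map_location="cpu")
    model.load_state_dict(state_dict)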
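HUGGINGFACE_TOKEN is read from the environment but never attached to the requests.get calls, so private or gated repos will fail to download. Also, most repos ship either pytorch_model.bin or model.safetensors, not both, so one of the four downloads will usually 404, and the new 404 branch turns that into an exception that aborts the whole upload. A sketch that authenticates and treats a missing file as optional (assuming the standard Bearer-token scheme for huggingface.co resolve URLs):

    import requests

    def fetch_optional(url, token=None):
        # Returns a streaming response, or None when the file is absent (404).
        headers = {"Authorization": f"Bearer {token}"} if token else {}
        response = requests.get(url, stream=True, headers=headers)
        if response.status_code == 404:
            return None
        response.raise_for_status()
        return response

    # In download_and_upload_to_s3, an upload would then simply be skipped
    # whenever fetch_optional(...) returns None.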
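In the text-generation branch, model.generate(**generation_kwargs) runs to completion before the response is built, so nothing is actually streamed, and TextIteratorStreamer yields decoded text chunks (strings), which makes tokenizer.decode(token) a type error. The pattern documented for TextIteratorStreamer runs generation in a background thread and iterates the streamer directly; a sketch (max_new_tokens=256 is an illustrative choice, not from this commit):

    from threading import Thread

    text_streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    inputs = tokenizer(model_request.input_text, return_tensors="pt").to(model.device)
    generation_kwargs = dict(inputs, streamer=text_streamer, max_new_tokens=256)

    # Generate in the background so the HTTP response can stream chunks as they arrive.
    Thread(target=model.generate, kwargs=generation_kwargs).start()
    return StreamingResponse(text_streamer, media_type="text/event-stream")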
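Finally, load_model_from_stream and load_tokenizer_from_stream call themselves again after download_and_upload_to_s3; if the download keeps failing, that recursion never terminates, and the except branch returning None pushes the failure into predict, which then crashes on a None model. A minimal non-recursive sketch (_model_files_present is a hypothetical helper wrapping the existing file_exists_in_s3 checks):

    def load_model_from_stream(self, model_prefix, revision):
        # Download at most once, then load; avoids unbounded recursion.
        if not self._model_files_present(model_prefix):
            self.download_and_upload_to_s3(model_prefix, revision)
        if not self._model_files_present(model_prefix):
            raise HTTPException(status_code=404, detail=f"Model {model_prefix} unavailable in S3")
        return self.load_model_from_existing_s3(model_prefix)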