Hjgugugjhuhjggg committed on
Commit
631e498
verified
1 Parent(s): bb6ace2

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +53 -52
app.py CHANGED
@@ -5,8 +5,8 @@ import boto3
5
  from fastapi import FastAPI, HTTPException
6
  from fastapi.responses import JSONResponse
7
  from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
8
- from huggingface_hub import hf_hub_download
9
- from tqdm import tqdm
10
 
11
  logging.basicConfig(level=logging.INFO)
12
  logger = logging.getLogger(__name__)
@@ -15,7 +15,8 @@ AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
15
  AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
16
  AWS_REGION = os.getenv("AWS_REGION")
17
  S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
18
- HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
 
19
 
20
  s3_client = boto3.client(
21
  's3',
@@ -47,7 +48,11 @@ class S3DirectStream:
47
  )
48
  self.bucket_name = bucket_name
49
 
50
- def stream_from_s3(self, key):
 
 
 
 
51
  try:
52
  response = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
53
  return response['Body']
@@ -56,7 +61,11 @@ class S3DirectStream:
56
  except Exception as e:
57
  raise HTTPException(status_code=500, detail=f"Error al descargar {key} desde S3: {str(e)}")
58
 
59
- def get_model_file_parts(self, model_name):
 
 
 
 
60
  try:
61
  model_prefix = model_name.lower()
62
  files = self.s3_client.list_objects_v2(Bucket=self.bucket_name, Prefix=model_prefix)
@@ -65,21 +74,17 @@ class S3DirectStream:
65
  except Exception as e:
66
  raise HTTPException(status_code=500, detail=f"Error al obtener archivos del modelo {model_name} desde S3: {e}")
67
 
68
- def load_model_from_s3(self, model_name):
69
  try:
70
  profile, model = model_name.split("/", 1) if "/" in model_name else ("", model_name)
71
 
72
  model_prefix = f"{profile}/{model}".lower()
73
- model_files = self.get_model_file_parts(model_prefix)
74
-
75
- if not model_files:
76
- self.download_and_upload_from_huggingface(model_name)
77
- model_files = self.get_model_file_parts(model_prefix)
78
 
79
  if not model_files:
80
  raise HTTPException(status_code=404, detail=f"Archivos del modelo {model_name} no encontrados en S3.")
81
 
82
- config_stream = self.stream_from_s3(f"{model_prefix}/config.json")
83
  config_data = config_stream.read()
84
 
85
  if not config_data:
@@ -94,18 +99,13 @@ class S3DirectStream:
94
  except HTTPException as e:
95
  raise e
96
  except Exception as e:
97
- try:
98
- logger.error(f"Error al cargar el modelo desde S3, intentando desde Hugging Face: {e}")
99
- model = AutoModelForCausalLM.from_pretrained(model_name)
100
- return model
101
- except Exception as hf_error:
102
- raise HTTPException(status_code=500, detail=f"Error al cargar el modelo desde Hugging Face: {hf_error}")
103
-
104
- def load_tokenizer_from_s3(self, model_name):
105
  try:
106
  profile, model = model_name.split("/", 1) if "/" in model_name else ("", model_name)
107
 
108
- tokenizer_stream = self.stream_from_s3(f"{profile}/{model}/tokenizer.json")
109
  tokenizer_data = tokenizer_stream.read().decode("utf-8")
110
 
111
  tokenizer = AutoTokenizer.from_pretrained(f"{profile}/{model}")
@@ -113,46 +113,42 @@ class S3DirectStream:
113
  except Exception as e:
114
  raise HTTPException(status_code=500, detail=f"Error al cargar el tokenizer desde S3: {e}")
115
 
116
- def download_and_upload_from_huggingface(self, model_name):
117
- try:
118
- files_to_download = hf_hub_download(repo_id=model_name, use_auth_token=HUGGINGFACE_TOKEN, local_dir=model_name)
119
-
120
- for file in tqdm(files_to_download, desc="Subiendo archivos a S3"):
121
- file_name = os.path.basename(file)
122
- profile, model = model_name.split("/", 1) if "/" in model_name else ("", model_name)
123
- s3_key = f"{profile}/{model}/{file_name}"
124
- if not self.file_exists_in_s3(s3_key):
125
- self.upload_file_to_s3(file, s3_key)
126
-
127
- except Exception as e:
128
- raise HTTPException(status_code=500, detail=f"Error al descargar y subir modelo desde Hugging Face: {e}")
129
-
130
- def upload_file_to_s3(self, file_path, s3_key):
131
- try:
132
- self.create_s3_folders(s3_key)
133
- s3_client.put_object(Bucket=self.bucket_name, Key=s3_key, Body=open(file_path, 'rb'))
134
- os.remove(file_path)
135
- except Exception as e:
136
- raise HTTPException(status_code=500, detail=f"Error al subir archivo a S3: {e}")
137
-
138
- def create_s3_folders(self, s3_key):
139
  try:
140
  folder_keys = s3_key.split('/')
141
  for i in range(1, len(folder_keys)):
142
  folder_key = '/'.join(folder_keys[:i]) + '/'
143
- if not self.file_exists_in_s3(folder_key):
144
  self.s3_client.put_object(Bucket=self.bucket_name, Key=folder_key, Body='')
145
 
146
  except Exception as e:
147
  raise HTTPException(status_code=500, detail=f"Error al crear carpetas en S3: {e}")
148
 
149
- def file_exists_in_s3(self, s3_key):
150
  try:
151
  self.s3_client.head_object(Bucket=self.bucket_name, Key=s3_key)
152
  return True
153
  except self.s3_client.exceptions.ClientError:
154
  return False
155
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
  @app.post("/predict/")
157
  async def predict(model_request: dict):
158
  try:
@@ -164,18 +160,23 @@ async def predict(model_request: dict):
164
  raise HTTPException(status_code=400, detail="Faltan par谩metros en la solicitud.")
165
 
166
  streamer = S3DirectStream(S3_BUCKET_NAME)
167
- model = streamer.load_model_from_s3(model_name)
168
- tokenizer = streamer.load_tokenizer_from_s3(model_name)
169
 
170
  if task not in PIPELINE_MAP:
171
  raise HTTPException(status_code=400, detail="Pipeline task no soportado")
172
 
173
  nlp_pipeline = pipeline(PIPELINE_MAP[task], model=model, tokenizer=tokenizer)
174
 
175
- result = nlp_pipeline(input_text)
 
 
176
 
177
- if isinstance(result, dict) and 'file' in result:
178
- return JSONResponse(content={"file": result['file']})
 
 
 
179
  else:
180
  return JSONResponse(content={"result": result})
181
 
@@ -184,4 +185,4 @@ async def predict(model_request: dict):
184
 
185
  if __name__ == "__main__":
186
  import uvicorn
187
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
5
  from fastapi import FastAPI, HTTPException
6
  from fastapi.responses import JSONResponse
7
  from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
8
+ import asyncio
9
+ import concurrent.futures
10
 
11
  logging.basicConfig(level=logging.INFO)
12
  logger = logging.getLogger(__name__)
 
15
  AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
16
  AWS_REGION = os.getenv("AWS_REGION")
17
  S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
18
+
19
+ MAX_TOKENS = 1024 # Limite de tokens por fragmento
20
 
21
  s3_client = boto3.client(
22
  's3',
 
48
  )
49
  self.bucket_name = bucket_name
50
 
51
async def stream_from_s3(self, key):
    """Fetch an S3 object without blocking the event loop.

    Method of ``S3DirectStream``. The synchronous ``self._stream_from_s3``
    is run in the default executor and its result is returned unchanged.

    Args:
        key: S3 object key to fetch.

    Returns:
        Whatever ``self._stream_from_s3(key)`` returns.

    Raises:
        Any exception raised by ``self._stream_from_s3`` is propagated.
    """
    # Inside a coroutine the running loop is the one to use;
    # get_event_loop() is deprecated for this purpose.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._stream_from_s3, key)
54
+
55
+ def _stream_from_s3(self, key):
56
  try:
57
  response = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
58
  return response['Body']
 
61
  except Exception as e:
62
  raise HTTPException(status_code=500, detail=f"Error al descargar {key} desde S3: {str(e)}")
63
 
64
async def get_model_file_parts(self, model_name):
    """List the S3 objects of a model without blocking the event loop.

    Method of ``S3DirectStream``. The synchronous
    ``self._get_model_file_parts`` is run in the default executor and its
    result is returned unchanged.

    Args:
        model_name: Model identifier / key prefix forwarded to the helper.

    Returns:
        Whatever ``self._get_model_file_parts(model_name)`` returns.

    Raises:
        Any exception raised by ``self._get_model_file_parts`` is propagated.
    """
    # Inside a coroutine the running loop is the one to use;
    # get_event_loop() is deprecated for this purpose.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._get_model_file_parts, model_name)
67
+
68
+ def _get_model_file_parts(self, model_name):
69
  try:
70
  model_prefix = model_name.lower()
71
  files = self.s3_client.list_objects_v2(Bucket=self.bucket_name, Prefix=model_prefix)
 
74
  except Exception as e:
75
  raise HTTPException(status_code=500, detail=f"Error al obtener archivos del modelo {model_name} desde S3: {e}")
76
 
77
+ async def load_model_from_s3(self, model_name):
78
  try:
79
  profile, model = model_name.split("/", 1) if "/" in model_name else ("", model_name)
80
 
81
  model_prefix = f"{profile}/{model}".lower()
82
+ model_files = await self.get_model_file_parts(model_prefix)
 
 
 
 
83
 
84
  if not model_files:
85
  raise HTTPException(status_code=404, detail=f"Archivos del modelo {model_name} no encontrados en S3.")
86
 
87
+ config_stream = await self.stream_from_s3(f"{model_prefix}/config.json")
88
  config_data = config_stream.read()
89
 
90
  if not config_data:
 
99
  except HTTPException as e:
100
  raise e
101
  except Exception as e:
102
+ raise HTTPException(status_code=500, detail=f"Error al cargar el modelo desde S3: {e}")
103
+
104
+ async def load_tokenizer_from_s3(self, model_name):
 
 
 
 
 
105
  try:
106
  profile, model = model_name.split("/", 1) if "/" in model_name else ("", model_name)
107
 
108
+ tokenizer_stream = await self.stream_from_s3(f"{profile}/{model}/tokenizer.json")
109
  tokenizer_data = tokenizer_stream.read().decode("utf-8")
110
 
111
  tokenizer = AutoTokenizer.from_pretrained(f"{profile}/{model}")
 
113
  except Exception as e:
114
  raise HTTPException(status_code=500, detail=f"Error al cargar el tokenizer desde S3: {e}")
115
 
116
async def create_s3_folders(self, s3_key):
    """Create an empty "folder" placeholder object for every parent prefix.

    Method of ``S3DirectStream``. For a key such as ``a/b/c.txt`` the
    prefixes ``a/`` and ``a/b/`` are checked with ``self.file_exists_in_s3``
    and, when missing, created as zero-length objects in
    ``self.bucket_name``.

    Args:
        s3_key: Full object key whose parent prefixes should exist.

    Raises:
        HTTPException: 500 if checking or creating any prefix fails.
    """
    try:
        segments = s3_key.split('/')
        # The last segment is the object name itself, so stop before it.
        for depth in range(1, len(segments)):
            folder_key = '/'.join(segments[:depth]) + '/'
            if not await self.file_exists_in_s3(folder_key):
                # put_object is a blocking network call; run it off the
                # event loop so other requests keep being served.
                await asyncio.to_thread(
                    self.s3_client.put_object,
                    Bucket=self.bucket_name,
                    Key=folder_key,
                    Body='',
                )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error al crear carpetas en S3: {e}")
126
 
127
async def file_exists_in_s3(self, s3_key):
    """Report whether an object exists in the configured bucket.

    Method of ``S3DirectStream``. Issues a ``head_object`` request for
    ``s3_key`` in ``self.bucket_name``.

    Args:
        s3_key: Object key to probe.

    Returns:
        ``True`` if ``head_object`` succeeds, ``False`` if it raises the
        client's ``ClientError``.
    """
    try:
        # head_object is a blocking network call; run it in a worker thread
        # so this coroutine does not stall the event loop.
        await asyncio.to_thread(
            self.s3_client.head_object,
            Bucket=self.bucket_name,
            Key=s3_key,
        )
        return True
    except self.s3_client.exceptions.ClientError:
        # NOTE(review): any ClientError (not only "not found") is reported as
        # missing, as in the original — confirm this is intended.
        return False
133
 
134
def split_text_by_tokens(text, tokenizer, max_tokens=MAX_TOKENS):
    """Split ``text`` into pieces of at most ``max_tokens`` tokens each.

    Args:
        text: Text to split.
        tokenizer: Object exposing ``encode`` and ``decode`` (presumably a
            Hugging Face tokenizer — verify against caller).
        max_tokens: Maximum number of tokens per piece; must be positive.

    Returns:
        List of decoded text pieces in order; empty if ``text`` encodes to
        no tokens.

    Raises:
        ValueError: If ``max_tokens`` is not positive.
    """
    if max_tokens <= 0:
        raise ValueError("max_tokens must be a positive integer")
    # Encode without BOS/EOS-style special tokens so they are not decoded
    # back into the text of the first and last pieces.
    token_ids = tokenizer.encode(text, add_special_tokens=False)
    return [
        tokenizer.decode(token_ids[start:start + max_tokens])
        for start in range(0, len(token_ids), max_tokens)
    ]
141
+
142
def continue_generation(input_text, model, tokenizer, max_tokens=MAX_TOKENS):
    """Run ``model.generate`` over ``input_text`` in windows of ``max_tokens``.

    The input is tokenized once, cut into consecutive windows of at most
    ``max_tokens`` tokens, and each window is fed to the model. The decoded
    outputs are concatenated in order.

    Args:
        input_text: Prompt text; an empty string yields an empty result.
        model: Object exposing ``generate(input_ids=...)``.
        tokenizer: Object exposing ``encode`` and ``decode`` (presumably a
            Hugging Face tokenizer — verify against caller).
        max_tokens: Maximum number of prompt tokens per ``generate`` call;
            must be positive.

    Returns:
        Concatenation of the decoded generations for every window.

    Raises:
        ValueError: If ``max_tokens`` is not positive.
    """
    if not input_text:
        return ""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be a positive integer")

    # Window over raw token ids (no special tokens) so that window
    # boundaries are not shifted by BOS/EOS markers.
    token_ids = tokenizer.encode(input_text, add_special_tokens=False)

    pieces = []
    for start in range(0, len(token_ids), max_tokens):
        window_text = tokenizer.decode(token_ids[start:start + max_tokens])
        # encode(..., return_tensors="pt") already returns the id tensor;
        # it has no ``.input_ids`` attribute.
        input_ids = tokenizer.encode(window_text, return_tensors="pt")
        output = model.generate(input_ids=input_ids)
        pieces.append(tokenizer.decode(output[0], skip_special_tokens=True))

    return "".join(pieces)
151
+
152
  @app.post("/predict/")
153
  async def predict(model_request: dict):
154
  try:
 
160
  raise HTTPException(status_code=400, detail="Faltan par谩metros en la solicitud.")
161
 
162
  streamer = S3DirectStream(S3_BUCKET_NAME)
163
+ model = await streamer.load_model_from_s3(model_name)
164
+ tokenizer = await streamer.load_tokenizer_from_s3(model_name)
165
 
166
  if task not in PIPELINE_MAP:
167
  raise HTTPException(status_code=400, detail="Pipeline task no soportado")
168
 
169
  nlp_pipeline = pipeline(PIPELINE_MAP[task], model=model, tokenizer=tokenizer)
170
 
171
+ result = await asyncio.to_thread(nlp_pipeline, input_text)
172
+
173
+ chunks = split_text_by_tokens(result, tokenizer)
174
 
175
+ if len(chunks) > 1:
176
+ full_result = ""
177
+ for chunk in chunks:
178
+ full_result += continue_generation(chunk, model, tokenizer)
179
+ return JSONResponse(content={"result": full_result})
180
  else:
181
  return JSONResponse(content={"result": result})
182
 
 
185
 
186
  if __name__ == "__main__":
187
  import uvicorn
188
+ uvicorn.run(app, host="0.0.0.0", port=8000)