|
import json |
|
import time |
|
import requests |
|
import jwt |
|
|
|
def get_access_token(project_id, client_email, private_key, timeout=30):
    """Exchange a self-signed service-account JWT for an OAuth2 access token.

    Builds a JWT whose issuer is ``client_email``, scoped to
    ``https://www.googleapis.com/auth/cloud-platform`` and valid for 600
    seconds, signs it with ``private_key`` using RS256, and posts it to
    ``https://oauth2.googleapis.com/token`` with the JWT-bearer grant type.

    Args:
        project_id: Accepted for interface compatibility; this function does
            not use it.
        client_email: Value placed in the ``iss`` claim.
        private_key: Key handed to ``jwt.encode`` for RS256 signing.
        timeout: Seconds to wait for the token endpoint before giving up.
            Without a timeout a stalled connection would block indefinitely.

    Returns:
        A ``(ok, value)`` tuple. On success ``(True, access_token)``. On
        failure ``(False, detail)`` where ``detail`` is the exception raised
        while signing or sending, or the response body text when the endpoint
        answered with a non-200 status or an unusable 200 body.
    """
    token_url = 'https://oauth2.googleapis.com/token'
    issued_at = int(time.time())

    claims = {
        'iss': client_email,
        'scope': 'https://www.googleapis.com/auth/cloud-platform',
        'aud': token_url,
        'exp': issued_at + 600,
        'iat': issued_at,
    }

    # Signing failures (e.g. a malformed key) are reported, not raised.
    try:
        signed_jwt = jwt.encode(claims, private_key, algorithm='RS256')
    except Exception as e:
        return False, e

    # Report transport failures (DNS, connection, timeout) the same way as
    # signing failures so callers only have to inspect the returned tuple.
    try:
        response = requests.post(
            token_url,
            data={
                'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
                'assertion': signed_jwt,
            },
            timeout=timeout,
        )
    except requests.RequestException as e:
        return False, e

    if response.status_code != 200:
        return False, response.text

    # A 200 whose body is not JSON, or lacks the token, is still a failure.
    try:
        return True, response.json()['access_token']
    except (ValueError, KeyError, TypeError):
        return False, response.text
|
|
|
async def send_gcp_request(session, project_id, access_token, payload, region='us-east5', model='claude-3-5-sonnet@20240620'):
    """POST ``payload`` to the Vertex AI ``streamRawPredict`` endpoint.

    The URL is built from ``region``, ``project_id`` and ``model`` under the
    ``publishers/anthropic`` path, and the request carries a bearer
    ``Authorization`` header plus a JSON ``Content-Type``.

    Args:
        session: Object whose ``post(url=..., headers=..., data=...)`` returns
            an async context manager yielding a response with ``status``,
            ``text()`` and ``json()`` (presumably an ``aiohttp.ClientSession``
            -- confirm against the caller).
        project_id: Project identifier interpolated into the URL.
        access_token: Token sent as ``Bearer <token>``.
        payload: Request body, passed to ``session.post`` unchanged as
            ``data``.
        region: Location used in both the host name and the URL path.
        model: Model identifier used in the URL path.

    Returns:
        The decoded JSON body. For a non-200 response the body text is parsed
        as JSON; if it is not valid JSON (for example an HTML gateway error
        page), ``{'error': {'code': <status>, 'message': <body text>}}`` is
        returned instead so the status and body are not lost.
    """
    vertex_url = (
        f'https://{region}-aiplatform.googleapis.com/v1/projects/{project_id}'
        f'/locations/{region}/publishers/anthropic/models/{model}:streamRawPredict'
    )
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json; charset=utf-8',
    }

    async with session.post(url=vertex_url, headers=headers, data=payload) as response:
        if response.status == 200:
            return await response.json()

        # Error bodies are read as text and parsed by hand; fall back to a
        # structured error when the body is not JSON rather than raising.
        raw_body = await response.text()
        try:
            return json.loads(raw_body)
        except ValueError:
            return {'error': {'code': response.status, 'message': raw_body}}