import base64
import hashlib
import json
import secrets
from typing import Optional, Dict, Any
from urllib.parse import urlencode

import requests


class MeldRxAPI:
    """Thin HTTP client for the MeldRx OAuth2, FHIR and MIPS endpoints.

    The client keeps a single ``requests.Session`` and, once authenticated,
    a bearer token that is attached to every API request. Two ways of
    obtaining a token are supported:

    * ``authenticate()`` - OAuth2 ``client_credentials`` grant.
    * ``get_authorization_url()`` followed by ``authenticate_with_code()`` -
      OAuth2 ``authorization_code`` grant with PKCE (S256).

    Failures are reported with ``print`` and signalled through the return
    value (``False`` / ``None``) rather than by raising.

    NOTE: requests are issued without an explicit timeout, so a stalled
    connection blocks the caller until the peer closes it.

    The instance can be used as a context manager; the session is closed on
    exit.
    """

    # Keys whose values must never be echoed to stdout.
    _SENSITIVE_FIELDS = frozenset({
        "client_secret",
        "code",
        "code_verifier",
        "access_token",
        "refresh_token",
        "id_token",
    })

    def __init__(self, client_id: str, client_secret: str, workspace_id: str, redirect_uri: str):
        """Store credentials and derive every endpoint URL from the base URL.

        Args:
            client_id: OAuth2 client identifier.
            client_secret: OAuth2 client secret; may be empty, in which case
                it is omitted from the authorization-code token request.
            workspace_id: Workspace identifier used as the last segment of
                the FHIR base URL.
            redirect_uri: Redirect URI sent in the authorization request and
                in the authorization-code token request.
        """
        self.base_url = "https://app.meldrx.com"
        self.api_base_url = f"{self.base_url}/api"
        self.fhir_base_url = f"{self.api_base_url}/fhir/{workspace_id}"
        self.mips_base_url = f"{self.base_url}/mms-api"
        self.token_url = f"{self.base_url}/connect/token"
        self.authorize_url = f"{self.base_url}/connect/authorize"
        self.client_id = client_id
        self.client_secret = client_secret
        self.workspace_id = workspace_id
        self.redirect_uri = redirect_uri
        self.access_token: Optional[str] = None
        self.code_verifier: Optional[str] = None
        self.session = requests.Session()

    # ------------------------------------------------------------------
    # PKCE helpers
    # ------------------------------------------------------------------

    def _generate_code_verifier(self) -> str:
        """Create a fresh random PKCE code verifier, remember it and return it."""
        self.code_verifier = secrets.token_urlsafe(32)
        return self.code_verifier

    def _generate_code_challenge(self, code_verifier: str) -> str:
        """Return the S256 challenge for *code_verifier*.

        The challenge is the URL-safe base64 encoding of the SHA-256 digest
        of the verifier, with the trailing ``=`` padding removed.
        """
        sha256_hash = hashlib.sha256(code_verifier.encode("utf-8")).digest()
        return base64.urlsafe_b64encode(sha256_hash).decode("utf-8").rstrip("=")

    # ------------------------------------------------------------------
    # Internal request helpers
    # ------------------------------------------------------------------

    @classmethod
    def _redact(cls, data: Any) -> Any:
        """Return a copy of *data* that is safe to print.

        Values of the keys listed in ``_SENSITIVE_FIELDS`` are replaced by
        ``"***"``. Anything that is not a dict is returned unchanged.
        """
        if not isinstance(data, dict):
            return data
        return {
            key: ("***" if key in cls._SENSITIVE_FIELDS else value)
            for key, value in data.items()
        }

    def _get_headers(self) -> Dict[str, str]:
        """Build the JSON request headers, adding the bearer token when one is held."""
        headers = {"Content-Type": "application/json"}
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"
        return headers

    def _ensure_authenticated(self) -> bool:
        """Return True when a token is held or ``authenticate()`` obtains one."""
        if self.access_token or self.authenticate():
            return True
        print("Cannot proceed without authentication.")
        return False

    def _get_json(self, url: str, failure_message: str) -> Optional[Dict[str, Any]]:
        """GET *url* and return the decoded JSON body.

        Returns ``{}`` for an empty body and ``None`` (after printing
        ``failure_message``) when the request fails or the body is not JSON.
        """
        try:
            response = self.session.get(url, headers=self._get_headers())
            response.raise_for_status()
            return response.json() if response.text else {}
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decode errors on requests versions where
            # the decode error is not a RequestException subclass.
            print(f"{failure_message}: {e}")
            return None

    def _send_json(self, send: Any, url: str, body: Dict[str, Any], failure_message: str) -> bool:
        """Send *body* as JSON using *send* (``session.post`` or ``session.put``).

        Returns True on a successful status and False (after printing
        ``failure_message``) when the request fails.
        """
        try:
            response = send(url, data=json.dumps(body), headers=self._get_headers())
            response.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"{failure_message}: {e}")
            return False

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def authenticate(self) -> bool:
        """Obtain an access token with the ``client_credentials`` grant.

        Returns:
            bool: True when a token was received and stored in
            ``self.access_token``, False otherwise.
        """
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        try:
            response = self.session.post(self.token_url, data=payload, headers=headers)
            response.raise_for_status()
            token_data = response.json()
            self.access_token = token_data.get("access_token")
            if not self.access_token:
                raise ValueError("No access token received.")
            return True
        except requests.RequestException as e:
            print(f"Authentication failed: {e}")
            return False
        except ValueError as e:
            print(f"Authentication error: {e}")
            return False

    def get_authorization_url(self, scope: str = "patient/*.read openid profile", state: str = "random_state") -> str:
        """Build the authorization URL for the authorization-code + PKCE flow.

        A new code verifier is generated and stored on the instance; it is
        required later by ``authenticate_with_code()``.

        Args:
            scope (str): Space-separated scopes to request.
            state (str): Opaque value echoed back on the redirect. The
                default is a fixed placeholder; pass an unpredictable
                per-request value if the redirect handler validates it.

        Returns:
            str: The authorization URL with a percent-encoded query string.
        """
        code_verifier = self._generate_code_verifier()
        code_challenge = self._generate_code_challenge(code_verifier)
        params = {
            "response_type": "code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": scope,
            "state": state,
            "aud": self.fhir_base_url,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        }
        # urlencode escapes spaces, '/', ':' and '&' inside values; joining
        # the raw values would yield a malformed URL.
        authorization_url = f"{self.authorize_url}?{urlencode(params)}"
        print(f"Generated Authorization URL: {authorization_url}")
        return authorization_url

    def authenticate_with_code(self, auth_code: str) -> bool:
        """Exchange an authorization code for an access token.

        ``get_authorization_url()`` must have been called first so that the
        PKCE code verifier is available.

        Args:
            auth_code (str): The authorization code received on the redirect.

        Returns:
            bool: True when a token was received and stored in
            ``self.access_token``, False otherwise.
        """
        if not self.code_verifier:
            print("Error: Code verifier not set. Generate an authorization URL first.")
            return False

        payload = {
            "grant_type": "authorization_code",
            "code": auth_code,
            "redirect_uri": self.redirect_uri,
            "client_id": self.client_id,
            "code_verifier": self.code_verifier,
        }
        if self.client_secret:
            payload["client_secret"] = self.client_secret
        headers = {"Content-Type": "application/x-www-form-urlencoded"}

        # Diagnostics are kept, but secrets and tokens are masked.
        print(f"Token Request Payload: {self._redact(payload)}")
        try:
            response = self.session.post(self.token_url, data=payload, headers=headers)
            response.raise_for_status()
            token_data = response.json()
            print(f"Token Response: {self._redact(token_data)}")
            self.access_token = token_data.get("access_token")
            if not self.access_token:
                raise ValueError("No access token received.")
            return True
        except requests.RequestException as e:
            print(f"Token Request Failed: {e}")
            # A Response is falsy for 4xx/5xx statuses, so test against None
            # instead of relying on truthiness.
            has_response = e.response is not None
            print(f"Response Status: {e.response.status_code if has_response else 'No response'}")
            print(f"Response Text: {e.response.text if has_response else 'No response'}")
            return False
        except ValueError as e:
            print(f"Authentication error: {e}")
            return False

    # ------------------------------------------------------------------
    # FHIR API
    # ------------------------------------------------------------------

    def get_patients(self) -> Optional[Dict[str, Any]]:
        """Retrieve the Patient resources of the workspace from the FHIR API.

        Returns:
            Optional[Dict[str, Any]]: The decoded response (``{}`` for an
            empty body) if successful, None otherwise.
        """
        url = f"{self.fhir_base_url}/Patient"
        if not self._ensure_authenticated():
            return None
        return self._get_json(url, "Failed to retrieve patients")

    def create_virtual_workspace(self, snapshot: str = "patient-prefetch",
                                 patient_id: str = "AutoPopulatedIfNotManuallySet",
                                 hook: str = "patient-view") -> bool:
        """
        Create a virtual workspace in the specified workspace (FHIR API).

        Args:
            snapshot (str): The snapshot type (default: "patient-prefetch").
            patient_id (str): The patient ID (default: "AutoPopulatedIfNotManuallySet").
            hook (str): The hook type (default: "patient-view").

        Returns:
            bool: True if the virtual workspace is created successfully, False otherwise.
        """
        url = f"{self.fhir_base_url}/$virtual-workspace"
        if not self._ensure_authenticated():
            return False
        payload = {"snapshot": snapshot, "patientId": patient_id, "hook": hook}
        return self._send_json(self.session.post, url, payload,
                               "Failed to create virtual workspace")

    def update_fhir_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
        """
        Update patient data in the FHIR API using a PUT request.

        Args:
            patient_id (str): The ID of the patient to update.
            patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.

        Returns:
            bool: True if the patient is updated successfully, False otherwise.
        """
        url = f"{self.fhir_base_url}/Patient/{patient_id}"
        if not self._ensure_authenticated():
            return False
        return self._send_json(self.session.put, url, patient_data,
                               f"Failed to update FHIR patient {patient_id}")

    # ------------------------------------------------------------------
    # MIPS API
    # ------------------------------------------------------------------

    def get_mips_patients(self) -> Optional[Dict[str, Any]]:
        """
        Retrieve a list of patients from the MIPS API.

        Returns:
            Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
        """
        url = f"{self.mips_base_url}/Patient"
        if not self._ensure_authenticated():
            return None
        return self._get_json(url, "Failed to retrieve MIPS patients")

    def get_mips_patient_by_id(self, patient_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve patient information by ID from the MIPS API.

        Args:
            patient_id (str): The ID of the patient to retrieve.

        Returns:
            Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
        """
        url = f"{self.mips_base_url}/Patient/{patient_id}"
        if not self._ensure_authenticated():
            return None
        return self._get_json(url, f"Failed to retrieve patient {patient_id}")

    def get_mips_encounters(self, patient_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Retrieve encounters from the MIPS API, optionally filtered by patient ID.

        Args:
            patient_id (str, optional): The ID of the patient to filter encounters by.

        Returns:
            Optional[Dict[str, Any]]: Encounter data as a dictionary if successful, None otherwise.
        """
        url = f"{self.mips_base_url}/Encounter"
        if patient_id:
            # Encode the value so reserved characters cannot alter the query.
            url += "?" + urlencode({"patient": patient_id})
        if not self._ensure_authenticated():
            return None
        return self._get_json(url, "Failed to retrieve encounters")

    def update_mips_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
        """
        Update patient data in the MIPS API using a PUT request.

        Args:
            patient_id (str): The ID of the patient to update.
            patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.

        Returns:
            bool: True if the patient is updated successfully, False otherwise.
        """
        url = f"{self.mips_base_url}/Patient/{patient_id}"
        if not self._ensure_authenticated():
            return False
        return self._send_json(self.session.put, url, patient_data,
                               f"Failed to update MIPS patient {patient_id}")

    # ------------------------------------------------------------------
    # Resource management
    # ------------------------------------------------------------------

    def close(self):
        """Close the session to free up resources."""
        self.session.close()

    def __enter__(self):
        """Support for context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Support for context manager exit, ensuring session is closed."""
        self.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|