import requests import json import base64 import hashlib import secrets from typing import Optional, Dict, Any from urllib.parse import urlencode class MeldRxAPI: def __init__(self, client_id: str, client_secret: str, workspace_id: str, redirect_uri: str): self.base_url = "https://app.meldrx.com" self.api_base_url = f"{self.base_url}/api" self.fhir_base_url = f"{self.api_base_url}/fhir/{workspace_id}" self.mips_base_url = f"{self.base_url}/mms-api" self.token_url = f"{self.base_url}/connect/token" self.authorize_url = f"{self.base_url}/connect/authorize" self.client_id = client_id self.client_secret = client_secret self.workspace_id = workspace_id self.redirect_uri = redirect_uri self.access_token = None self.code_verifier = None self.session = requests.Session() def _generate_code_verifier(self) -> str: self.code_verifier = secrets.token_urlsafe(32) # 43 characters return self.code_verifier def _generate_code_challenge(self, code_verifier: str) -> str: sha256_hash = hashlib.sha256(code_verifier.encode('utf-8')).digest() code_challenge = base64.urlsafe_b64encode(sha256_hash).decode('utf-8').rstrip('=') return code_challenge def authenticate(self) -> bool: payload = { "grant_type": "client_credentials", "client_id": self.client_id, "client_secret": self.client_secret } headers = {"Content-Type": "application/x-www-form-urlencoded"} try: response = self.session.post(self.token_url, data=payload, headers=headers) response.raise_for_status() token_data = response.json() self.access_token = token_data.get("access_token") if not self.access_token: raise ValueError("No access token received.") return True except requests.RequestException as e: print(f"Authentication failed: {e}") return False except ValueError as e: print(f"Authentication error: {e}") return False def _get_headers(self) -> Dict[str, str]: headers = {"Content-Type": "application/json"} if self.access_token: headers["Authorization"] = f"Bearer {self.access_token}" return headers def get_patients(self) -> Optional[Dict[str, Any]]: url = f"{self.fhir_base_url}/Patient" if not self.access_token and not self.authenticate(): print("Cannot proceed without authentication.") return None try: response = self.session.get(url, headers=self._get_headers()) response.raise_for_status() return response.json() if response.text else {} except requests.RequestException as e: print(f"Failed to retrieve patients: {e}") return None def get_authorization_url(self, scope: str = "patient/*.read openid profile", state: str = "random_state") -> str: code_verifier = self._generate_code_verifier() code_challenge = self._generate_code_challenge(code_verifier) params = { "response_type": "code", "client_id": self.client_id, "redirect_uri": self.redirect_uri, "scope": scope, "state": state, "aud": self.fhir_base_url, "code_challenge": code_challenge, "code_challenge_method": "S256" } query_string = "&".join(f"{k}={v}" for k, v in params.items()) print(f"Generated Authorization URL: {self.authorize_url}?{query_string}") # Debugging return f"{self.authorize_url}?{query_string}" def authenticate_with_code(self, auth_code: str) -> bool: if not self.code_verifier: print("Error: Code verifier not set. Generate an authorization URL first.") return False payload = { "grant_type": "authorization_code", "code": auth_code, "redirect_uri": self.redirect_uri, "client_id": self.client_id, "code_verifier": self.code_verifier } if self.client_secret: payload["client_secret"] = self.client_secret headers = {"Content-Type": "application/x-www-form-urlencoded"} print(f"Token Request Payload: {payload}") # Debugging try: response = self.session.post(self.token_url, data=payload, headers=headers) response.raise_for_status() token_data = response.json() print(f"Token Response: {token_data}") # Debugging self.access_token = token_data.get("access_token") if not self.access_token: raise ValueError("No access token received.") return True except requests.RequestException as e: print(f"Token Request Failed: {e}") print(f"Response Status: {e.response.status_code if e.response else 'No response'}") print(f"Response Text: {e.response.text if e.response else 'No response'}") return False except ValueError as e: print(f"Authentication error: {e}") return False def create_virtual_workspace(self, snapshot: str = "patient-prefetch", patient_id: str = "AutoPopulatedIfNotManuallySet", hook: str = "patient-view") -> bool: """ Create a virtual workspace in the specified workspace (FHIR API). Args: snapshot (str): The snapshot type (default: "patient-prefetch"). patient_id (str): The patient ID (default: "AutoPopulatedIfNotManuallySet"). hook (str): The hook type (default: "patient-view"). Returns: bool: True if the virtual workspace is created successfully, False otherwise. """ url = f"{self.fhir_base_url}/$virtual-workspace" if not self.access_token and not self.authenticate(): print("Cannot proceed without authentication.") return False payload = {"snapshot": snapshot, "patientId": patient_id, "hook": hook} try: response = self.session.post(url, data=json.dumps(payload), headers=self._get_headers()) response.raise_for_status() return True except requests.RequestException as e: print(f"Failed to create virtual workspace: {e}") return False def get_mips_patients(self) -> Optional[Dict[str, Any]]: """ Retrieve a list of patients from the MIPS API. Returns: Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise. """ url = f"{self.mips_base_url}/Patient" if not self.access_token and not self.authenticate(): print("Cannot proceed without authentication.") return None try: response = self.session.get(url, headers=self._get_headers()) response.raise_for_status() return response.json() if response.text else {} except requests.RequestException as e: print(f"Failed to retrieve MIPS patients: {e}") return None def get_mips_patient_by_id(self, patient_id: str) -> Optional[Dict[str, Any]]: """ Retrieve patient information by ID from the MIPS API. Args: patient_id (str): The ID of the patient to retrieve. Returns: Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise. """ url = f"{self.mips_base_url}/Patient/{patient_id}" if not self.access_token and not self.authenticate(): print("Cannot proceed without authentication.") return None try: response = self.session.get(url, headers=self._get_headers()) response.raise_for_status() return response.json() if response.text else {} except requests.RequestException as e: print(f"Failed to retrieve patient {patient_id}: {e}") return None def get_mips_encounters(self, patient_id: str = None) -> Optional[Dict[str, Any]]: """ Retrieve encounters from the MIPS API, optionally filtered by patient ID. Args: patient_id (str, optional): The ID of the patient to filter encounters by. Returns: Optional[Dict[str, Any]]: Encounter data as a dictionary if successful, None otherwise. """ url = f"{self.mips_base_url}/Encounter" if patient_id: url += f"?patient={patient_id}" if not self.access_token and not self.authenticate(): print("Cannot proceed without authentication.") return None try: response = self.session.get(url, headers=self._get_headers()) response.raise_for_status() return response.json() if response.text else {} except requests.RequestException as e: print(f"Failed to retrieve encounters: {e}") return None def update_fhir_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool: """ Update patient data in the FHIR API using a PUT request. Args: patient_id (str): The ID of the patient to update. patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format. Returns: bool: True if the patient is updated successfully, False otherwise. """ url = f"{self.fhir_base_url}/Patient/{patient_id}" if not self.access_token and not self.authenticate(): print("Cannot proceed without authentication.") return False try: response = self.session.put(url, data=json.dumps(patient_data), headers=self._get_headers()) response.raise_for_status() return True except requests.RequestException as e: print(f"Failed to update FHIR patient {patient_id}: {e}") return False def update_mips_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool: """ Update patient data in the MIPS API using a PUT request. Args: patient_id (str): The ID of the patient to update. patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format. Returns: bool: True if the patient is updated successfully, False otherwise. """ url = f"{self.mips_base_url}/Patient/{patient_id}" if not self.access_token and not self.authenticate(): print("Cannot proceed without authentication.") return False try: response = self.session.put(url, data=json.dumps(patient_data), headers=self._get_headers()) response.raise_for_status() return True except requests.RequestException as e: print(f"Failed to update MIPS patient {patient_id}: {e}") return False def close(self): """Close the session to free up resources.""" self.session.close() def __enter__(self): """Support for context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Support for context manager exit, ensuring session is closed.""" self.close() # # Example usage with patient update functionality # if __name__ == "__main__": # # Replace these with your actual credentials and workspace ID # CLIENT_ID = "your_client_id" # CLIENT_SECRET = "your_client_secret" # WORKSPACE_ID = "your_workspace_id" # PATIENT_ID = "example_patient_id" # Replace with an actual patient ID # with MeldRxAPI(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, workspace_id=WORKSPACE_ID) as meldrx: # # Authenticate # if meldrx.authenticate(): # print("Authentication successful!") # # Retrieve specific patient information from MIPS API # patient_info = meldrx.get_mips_patient_by_id(PATIENT_ID) # if patient_info is not None: # print(f"Original Patient {PATIENT_ID} Info:", json.dumps(patient_info, indent=2)) # # Example patient data to update (FHIR Patient resource format) # updated_patient_data = { # "resourceType": "Patient", # "id": PATIENT_ID, # "name": [{ # "family": "Doe", # "given": ["John", "Updated"] # }], # "gender": "male", # "birthDate": "1980-01-01" # } # # Update patient in FHIR API # if meldrx.update_fhir_patient(PATIENT_ID, updated_patient_data): # print(f"Successfully updated patient {PATIENT_ID} in FHIR API") # updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID) # if updated_info: # print(f"Updated Patient {PATIENT_ID} Info (FHIR):", json.dumps(updated_info, indent=2)) # # Update patient in MIPS API # if meldrx.update_mips_patient(PATIENT_ID, updated_patient_data): # print(f"Successfully updated patient {PATIENT_ID} in MIPS API") # updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID) # if updated_info: # print(f"Updated Patient {PATIENT_ID} Info (MIPS):", json.dumps(updated_info, indent=2)) # # Retrieve encounters for the patient from MIPS API # encounters = meldrx.get_mips_encounters(patient_id=PATIENT_ID) # if encounters is not None: # print(f"Encounters for Patient {PATIENT_ID}:", json.dumps(encounters, indent=2))