File size: 13,949 Bytes
9909dba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8db749a
 
9909dba
 
 
 
8db749a
9909dba
 
 
 
 
 
 
 
 
 
 
8db749a
9909dba
 
 
 
8db749a
9909dba
 
 
 
 
8db749a
 
 
9909dba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import requests
import json
import base64
import hashlib
import secrets
from typing import Optional, Dict, Any
from urllib.parse import urlencode

class MeldRxAPI:
    """Small client for the MeldRx HTTP APIs (FHIR and MIPS endpoints).

    The client wraps a ``requests.Session`` and stores the OAuth access token
    obtained either through the client-credentials grant (``authenticate``) or
    through the authorization-code grant with PKCE (``get_authorization_url``
    followed by ``authenticate_with_code``).

    Failures are reported with ``print`` and signalled through ``False`` /
    ``None`` return values instead of being raised.  The instance can be used
    as a context manager; the underlying session is closed on exit.
    """

    # Keys whose values are masked before token payloads/responses are printed.
    _SENSITIVE_KEYS = frozenset({
        "client_secret",
        "code",
        "code_verifier",
        "access_token",
        "refresh_token",
        "id_token",
    })

    def __init__(self, client_id: str, client_secret: str, workspace_id: str, redirect_uri: str,
                 timeout: Optional[float] = None):
        """
        Store credentials, derive the endpoint URLs and open an HTTP session.

        Args:
            client_id (str): OAuth client identifier.
            client_secret (str): OAuth client secret (may be empty for public clients
                using the authorization-code flow).
            workspace_id (str): Workspace ID used to build the FHIR base URL.
            redirect_uri (str): Redirect URI sent in the authorization-code flow.
            timeout (float, optional): Timeout in seconds passed to every HTTP call.
                The default ``None`` keeps the previous behaviour (no timeout).
        """
        self.base_url = "https://app.meldrx.com"
        self.api_base_url = f"{self.base_url}/api"
        self.fhir_base_url = f"{self.api_base_url}/fhir/{workspace_id}"
        self.mips_base_url = f"{self.base_url}/mms-api"
        self.token_url = f"{self.base_url}/connect/token"
        self.authorize_url = f"{self.base_url}/connect/authorize"
        self.client_id = client_id
        self.client_secret = client_secret
        self.workspace_id = workspace_id
        self.redirect_uri = redirect_uri
        self.timeout = timeout
        self.access_token = None
        self.code_verifier = None
        self.session = requests.Session()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _generate_code_verifier(self) -> str:
        """Create, remember and return a fresh random PKCE code verifier."""
        self.code_verifier = secrets.token_urlsafe(32)  # 43 characters
        return self.code_verifier

    def _generate_code_challenge(self, code_verifier: str) -> str:
        """Return the S256 PKCE challenge: unpadded base64url of SHA-256(verifier)."""
        sha256_hash = hashlib.sha256(code_verifier.encode('utf-8')).digest()
        return base64.urlsafe_b64encode(sha256_hash).decode('utf-8').rstrip('=')

    @classmethod
    def _redact(cls, data: Any) -> Any:
        """Return a copy of a dict with credential/token values masked for logging."""
        if not isinstance(data, dict):
            return data
        return {
            key: ("***REDACTED***" if key in cls._SENSITIVE_KEYS else value)
            for key, value in data.items()
        }

    def _store_token(self, token_data: Any) -> None:
        """
        Save the ``access_token`` field of a token response on the instance.

        Raises:
            ValueError: If the response carries no (non-empty) access token.
        """
        token = token_data.get("access_token") if isinstance(token_data, dict) else None
        self.access_token = token
        if not token:
            raise ValueError("No access token received.")

    def _get_headers(self) -> Dict[str, str]:
        """Build JSON request headers, adding the bearer token when one is held."""
        headers = {"Content-Type": "application/json"}
        if self.access_token:
            headers["Authorization"] = f"Bearer {self.access_token}"
        return headers

    def _ensure_authenticated(self) -> bool:
        """Return True if a token is held or a client-credentials login succeeds."""
        if self.access_token or self.authenticate():
            return True
        print("Cannot proceed without authentication.")
        return False

    def _get_json(self, url: str, failure_message: str,
                  params: Optional[Dict[str, str]] = None) -> Optional[Dict[str, Any]]:
        """
        GET ``url`` and decode the JSON body.

        Returns:
            The decoded body, ``{}`` for an empty body, or ``None`` when
            authentication, the request, or JSON decoding fails.
        """
        if not self._ensure_authenticated():
            return None
        try:
            response = self.session.get(url, params=params, headers=self._get_headers(),
                                        timeout=self.timeout)
            response.raise_for_status()
            return response.json() if response.text else {}
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decode errors that are not RequestException
            # subclasses in older versions of requests.
            print(f"{failure_message}: {e}")
            return None

    def _send_json(self, method: str, url: str, payload: Dict[str, Any], failure_message: str) -> bool:
        """Send ``payload`` as a JSON body with ``method``; True on a non-error status."""
        if not self._ensure_authenticated():
            return False
        try:
            response = self.session.request(method, url, data=json.dumps(payload),
                                            headers=self._get_headers(), timeout=self.timeout)
            response.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"{failure_message}: {e}")
            return False

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    def authenticate(self) -> bool:
        """
        Obtain an access token with the OAuth client-credentials grant.

        Returns:
            bool: True if a token was received and stored, False otherwise.
        """
        payload = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        try:
            response = self.session.post(self.token_url, data=payload, headers=headers,
                                         timeout=self.timeout)
            response.raise_for_status()
            self._store_token(response.json())
            return True
        except requests.RequestException as e:
            print(f"Authentication failed: {e}")
            return False
        except ValueError as e:
            print(f"Authentication error: {e}")
            return False

    def get_authorization_url(self, scope: str = "patient/*.read openid profile", state: str = "random_state") -> str:
        """
        Build the authorization URL for the authorization-code flow with PKCE.

        A new code verifier is generated and kept on the instance so that
        ``authenticate_with_code`` can send it with the token request.

        Args:
            scope (str): Space-separated scopes to request.
            state (str): Opaque value echoed back by the authorization server.
                NOTE(review): the default is a fixed string; callers relying on
                ``state`` for CSRF protection should pass an unpredictable value.

        Returns:
            str: The authorization URL with a properly URL-encoded query string.
        """
        code_verifier = self._generate_code_verifier()
        code_challenge = self._generate_code_challenge(code_verifier)
        params = {
            "response_type": "code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": scope,
            "state": state,
            "aud": self.fhir_base_url,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256"
        }
        # urlencode escapes spaces, "&", "=", "/" etc.; joining raw values would
        # yield a malformed query (the scope contains spaces, the URIs "://").
        authorization_url = f"{self.authorize_url}?{urlencode(params)}"
        print(f"Generated Authorization URL: {authorization_url}")  # Debugging
        return authorization_url

    def authenticate_with_code(self, auth_code: str) -> bool:
        """
        Exchange an authorization code (plus the stored PKCE verifier) for a token.

        Args:
            auth_code (str): The code returned to the redirect URI.

        Returns:
            bool: True if a token was received and stored, False otherwise
            (including when no authorization URL was generated beforehand).
        """
        if not self.code_verifier:
            print("Error: Code verifier not set. Generate an authorization URL first.")
            return False
        payload = {
            "grant_type": "authorization_code",
            "code": auth_code,
            "redirect_uri": self.redirect_uri,
            "client_id": self.client_id,
            "code_verifier": self.code_verifier
        }
        if self.client_secret:
            payload["client_secret"] = self.client_secret
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        # Secrets and tokens are masked so debug output cannot leak credentials.
        print(f"Token Request Payload: {self._redact(payload)}")  # Debugging
        try:
            response = self.session.post(self.token_url, data=payload, headers=headers,
                                         timeout=self.timeout)
            response.raise_for_status()
            token_data = response.json()
            print(f"Token Response: {self._redact(token_data)}")  # Debugging
            self._store_token(token_data)
            return True
        except requests.RequestException as e:
            # A Response is falsy for 4xx/5xx statuses, so compare against None
            # explicitly; a truthiness test would hide the server's error details.
            error_response = e.response
            has_response = error_response is not None
            print(f"Token Request Failed: {e}")
            print(f"Response Status: {error_response.status_code if has_response else 'No response'}")
            print(f"Response Text: {error_response.text if has_response else 'No response'}")
            return False
        except ValueError as e:
            print(f"Authentication error: {e}")
            return False

    # ------------------------------------------------------------------
    # FHIR API
    # ------------------------------------------------------------------

    def get_patients(self) -> Optional[Dict[str, Any]]:
        """
        Retrieve the patients of the workspace from the FHIR API.

        Returns:
            Optional[Dict[str, Any]]: Decoded response if successful, None otherwise.
        """
        return self._get_json(f"{self.fhir_base_url}/Patient", "Failed to retrieve patients")

    def create_virtual_workspace(self, snapshot: str = "patient-prefetch",
                            patient_id: str = "AutoPopulatedIfNotManuallySet",
                            hook: str = "patient-view") -> bool:
        """
        Create a virtual workspace in the specified workspace (FHIR API).

        Args:
            snapshot (str): The snapshot type (default: "patient-prefetch").
            patient_id (str): The patient ID (default: "AutoPopulatedIfNotManuallySet").
            hook (str): The hook type (default: "patient-view").

        Returns:
            bool: True if the virtual workspace is created successfully, False otherwise.
        """
        payload = {"snapshot": snapshot, "patientId": patient_id, "hook": hook}
        return self._send_json("POST", f"{self.fhir_base_url}/$virtual-workspace", payload,
                               "Failed to create virtual workspace")

    def update_fhir_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
        """
        Update patient data in the FHIR API using a PUT request.

        Args:
            patient_id (str): The ID of the patient to update.
            patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.

        Returns:
            bool: True if the patient is updated successfully, False otherwise.
        """
        return self._send_json("PUT", f"{self.fhir_base_url}/Patient/{patient_id}", patient_data,
                               f"Failed to update FHIR patient {patient_id}")

    # ------------------------------------------------------------------
    # MIPS API
    # ------------------------------------------------------------------

    def get_mips_patients(self) -> Optional[Dict[str, Any]]:
        """
        Retrieve a list of patients from the MIPS API.

        Returns:
            Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
        """
        return self._get_json(f"{self.mips_base_url}/Patient", "Failed to retrieve MIPS patients")

    def get_mips_patient_by_id(self, patient_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve patient information by ID from the MIPS API.

        Args:
            patient_id (str): The ID of the patient to retrieve.

        Returns:
            Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
        """
        return self._get_json(f"{self.mips_base_url}/Patient/{patient_id}",
                              f"Failed to retrieve patient {patient_id}")

    def get_mips_encounters(self, patient_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
        """
        Retrieve encounters from the MIPS API, optionally filtered by patient ID.

        Args:
            patient_id (str, optional): The ID of the patient to filter encounters by.

        Returns:
            Optional[Dict[str, Any]]: Encounter data as a dictionary if successful, None otherwise.
        """
        # Passing the filter via ``params`` lets requests URL-encode the value.
        params = {"patient": patient_id} if patient_id else None
        return self._get_json(f"{self.mips_base_url}/Encounter", "Failed to retrieve encounters",
                              params=params)

    def update_mips_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
        """
        Update patient data in the MIPS API using a PUT request.

        Args:
            patient_id (str): The ID of the patient to update.
            patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.

        Returns:
            bool: True if the patient is updated successfully, False otherwise.
        """
        return self._send_json("PUT", f"{self.mips_base_url}/Patient/{patient_id}", patient_data,
                               f"Failed to update MIPS patient {patient_id}")

    # ------------------------------------------------------------------
    # Resource management
    # ------------------------------------------------------------------

    def close(self):
        """Close the session to free up resources."""
        self.session.close()

    def __enter__(self):
        """Support for context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Support for context manager exit, ensuring session is closed."""
        self.close()


# # Example usage with patient update functionality
# if __name__ == "__main__":
#     # Replace these with your actual credentials, workspace ID and redirect URI
#     CLIENT_ID = "your_client_id"
#     CLIENT_SECRET = "your_client_secret"
#     WORKSPACE_ID = "your_workspace_id"
#     REDIRECT_URI = "your_redirect_uri"  # Required by MeldRxAPI.__init__
#     PATIENT_ID = "example_patient_id"  # Replace with an actual patient ID

#     with MeldRxAPI(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, workspace_id=WORKSPACE_ID,
#                    redirect_uri=REDIRECT_URI) as meldrx:
#         # Authenticate
#         if meldrx.authenticate():
#             print("Authentication successful!")

#             # Retrieve specific patient information from MIPS API
#             patient_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
#             if patient_info is not None:
#                 print(f"Original Patient {PATIENT_ID} Info:", json.dumps(patient_info, indent=2))

#             # Example patient data to update (FHIR Patient resource format)
#             updated_patient_data = {
#                 "resourceType": "Patient",
#                 "id": PATIENT_ID,
#                 "name": [{
#                     "family": "Doe",
#                     "given": ["John", "Updated"]
#                 }],
#                 "gender": "male",
#                 "birthDate": "1980-01-01"
#             }

#             # Update patient in FHIR API
#             if meldrx.update_fhir_patient(PATIENT_ID, updated_patient_data):
#                 print(f"Successfully updated patient {PATIENT_ID} in FHIR API")
#                 updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
#                 if updated_info:
#                     print(f"Updated Patient {PATIENT_ID} Info (FHIR):", json.dumps(updated_info, indent=2))

#             # Update patient in MIPS API
#             if meldrx.update_mips_patient(PATIENT_ID, updated_patient_data):
#                 print(f"Successfully updated patient {PATIENT_ID} in MIPS API")
#                 updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
#                 if updated_info:
#                     print(f"Updated Patient {PATIENT_ID} Info (MIPS):", json.dumps(updated_info, indent=2))

#             # Retrieve encounters for the patient from MIPS API
#             encounters = meldrx.get_mips_encounters(patient_id=PATIENT_ID)
#             if encounters is not None:
#                 print(f"Encounters for Patient {PATIENT_ID}:", json.dumps(encounters, indent=2))