Tonic commited on
Commit
a2a0a3e
·
unverified ·
1 Parent(s): ff4408c

get authorization code fix

Browse files
Files changed (4) hide show
  1. app.py +24 -1
  2. old/extractcode.py +2 -0
  3. utils/callbackmanager.py +16 -0
  4. utils/meldrx.py +331 -331
app.py CHANGED
@@ -11,6 +11,7 @@ from urllib.parse import urlparse, parse_qs # Import URL parsing utilities
11
  from utils.callbackmanager import CallbackManager
12
  from utils.meldrx import MeldRxAPI
13
  from prompts import system_instructions
 
14
  # Set up logging
15
  logging.basicConfig(level=logging.INFO)
16
  logger = logging.getLogger(__name__)
@@ -63,13 +64,35 @@ def display_form(first_name, last_name, middle_initial, dob, age, sex, address,
63
 
64
 
65
 
66
- # Create a simplified interface to avoid complex component interactions
67
  CALLBACK_MANAGER = CallbackManager(
68
  redirect_uri="https://multitransformer-discharge-guard.hf.space/callback",
69
  client_secret=None,
70
  )
71
 
72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  def generate_discharge_paper_one_click():
74
  """One-click function to fetch patient data and generate discharge paper with AI Content."""
75
  patient_data_str = CALLBACK_MANAGER.get_patient_data()
 
11
  from utils.callbackmanager import CallbackManager
12
  from utils.meldrx import MeldRxAPI
13
  from prompts import system_instructions
14
+ from old.extractcode import extract_code_from_url ,
15
  # Set up logging
16
  logging.basicConfig(level=logging.INFO)
17
  logger = logging.getLogger(__name__)
 
64
 
65
 
66
 
 
67
  CALLBACK_MANAGER = CallbackManager(
68
  redirect_uri="https://multitransformer-discharge-guard.hf.space/callback",
69
  client_secret=None,
70
  )
71
 
72
 
73
+ class CallbackManager:
74
+ def __init__(self, redirect_uri: str, client_secret: str = None):
75
+ client_id = os.getenv("APPID")
76
+ if not client_id:
77
+ raise ValueError("APPID environment variable not set.")
78
+ workspace_id = os.getenv("WORKSPACE_URL")
79
+ if not workspace_id:
80
+ raise ValueError("WORKSPACE_URL environment variable not set.")
81
+ self.api = MeldRxAPI(client_id, client_secret, workspace_id, redirect_uri)
82
+ self.auth_code = None
83
+ self.access_token = None
84
+
85
+ def handle_callback(self, callback_url: str) -> str:
86
+ """Handles the callback URL and extracts the code automatically."""
87
+ self.auth_code = extract_code_from_url(callback_url)
88
+ if not self.auth_code:
89
+ return "No authentication code found in URL."
90
+
91
+ if self.api.authenticate_with_code(self.auth_code):
92
+ self.access_token = self.api.access_token
93
+ return f"Authentication successful! Access Token: {self.access_token[:10]}... (truncated)"
94
+ return "Authentication failed. Please check the authorization code."
95
+
96
  def generate_discharge_paper_one_click():
97
  """One-click function to fetch patient data and generate discharge paper with AI Content."""
98
  patient_data_str = CALLBACK_MANAGER.get_patient_data()
old/extractcode.py CHANGED
@@ -1,4 +1,6 @@
1
  import urllib.parse
 
 
2
 
3
  def extract_code_from_url(url: str) -> str:
4
  """Extracts the 'code' parameter from a given URL."""
 
1
  import urllib.parse
2
+ from utils.meldrx import MeldRxAPI
3
+ import os
4
 
5
  def extract_code_from_url(url: str) -> str:
6
  """Extracts the 'code' parameter from a given URL."""
utils/callbackmanager.py CHANGED
@@ -7,7 +7,12 @@ import logging
7
  from huggingface_hub import InferenceClient # Import InferenceClient
8
  from urllib.parse import urlparse, parse_qs # Import URL parsing utilities
9
  from utils.meldrx import MeldRxAPI # Import the MeldRxAPI class
 
 
10
  # ... (CallbackManager, display_form, generate_pdf_from_form, generate_pdf_from_meldrx, generate_discharge_paper_one_click, client initialization remain the same) ...
 
 
 
11
 
12
  class CallbackManager:
13
  def __init__(self, redirect_uri: str, client_secret: str = None):
@@ -21,6 +26,17 @@ class CallbackManager:
21
  self.auth_code = None
22
  self.access_token = None
23
 
 
 
 
 
 
 
 
 
 
 
 
24
  def get_auth_url(self) -> str:
25
  return self.api.get_authorization_url()
26
 
 
7
  from huggingface_hub import InferenceClient # Import InferenceClient
8
  from urllib.parse import urlparse, parse_qs # Import URL parsing utilities
9
  from utils.meldrx import MeldRxAPI # Import the MeldRxAPI class
10
+ import logging
11
+ from old.extractcode import extract_code_from_url
12
  # ... (CallbackManager, display_form, generate_pdf_from_form, generate_pdf_from_meldrx, generate_discharge_paper_one_click, client initialization remain the same) ...
13
+ # Set up logging
14
+ logging.basicConfig(level=logging.INFO)
15
+ logger = logging.getLogger(__name__)
16
 
17
  class CallbackManager:
18
  def __init__(self, redirect_uri: str, client_secret: str = None):
 
26
  self.auth_code = None
27
  self.access_token = None
28
 
29
+ def handle_callback(self, callback_url: str) -> str:
30
+ """Handles the callback URL and extracts the code automatically."""
31
+ self.auth_code = extract_code_from_url(callback_url)
32
+ if not self.auth_code:
33
+ return "No authentication code found in URL."
34
+
35
+ if self.api.authenticate_with_code(self.auth_code):
36
+ self.access_token = self.api.access_token
37
+ return f"Authentication successful! Access Token: {self.access_token[:10]}... (truncated)"
38
+ return "Authentication failed. Please check the authorization code."
39
+
40
  def get_auth_url(self) -> str:
41
  return self.api.get_authorization_url()
42
 
utils/meldrx.py CHANGED
@@ -1,331 +1,331 @@
1
- import requests
2
- import json
3
- import base64
4
- import hashlib
5
- import secrets
6
- from typing import Optional, Dict, Any
7
- from urllib.parse import urlencode
8
-
9
- class MeldRxAPI:
10
- def __init__(self, client_id: str, client_secret: str, workspace_id: str, redirect_uri: str):
11
- self.base_url = "https://app.meldrx.com"
12
- self.api_base_url = f"{self.base_url}/api"
13
- self.fhir_base_url = f"{self.api_base_url}/fhir/{workspace_id}"
14
- self.mips_base_url = f"{self.base_url}/mms-api"
15
- self.token_url = f"{self.base_url}/connect/token"
16
- self.authorize_url = f"{self.base_url}/connect/authorize"
17
- self.client_id = client_id
18
- self.client_secret = client_secret
19
- self.workspace_id = workspace_id
20
- self.redirect_uri = redirect_uri
21
- self.access_token = None
22
- self.code_verifier = None
23
- self.session = requests.Session()
24
-
25
- def _generate_code_verifier(self) -> str:
26
- self.code_verifier = secrets.token_urlsafe(32) # 43 characters
27
- return self.code_verifier
28
-
29
- def _generate_code_challenge(self, code_verifier: str) -> str:
30
- sha256_hash = hashlib.sha256(code_verifier.encode('utf-8')).digest()
31
- code_challenge = base64.urlsafe_b64encode(sha256_hash).decode('utf-8').rstrip('=')
32
- return code_challenge
33
-
34
- def authenticate(self) -> bool:
35
- payload = {
36
- "grant_type": "client_credentials",
37
- "client_id": self.client_id,
38
- "client_secret": self.client_secret
39
- }
40
- headers = {"Content-Type": "application/x-www-form-urlencoded"}
41
- try:
42
- response = self.session.post(self.token_url, data=payload, headers=headers)
43
- response.raise_for_status()
44
- token_data = response.json()
45
- self.access_token = token_data.get("access_token")
46
- if not self.access_token:
47
- raise ValueError("No access token received.")
48
- return True
49
- except requests.RequestException as e:
50
- print(f"Authentication failed: {e}")
51
- return False
52
- except ValueError as e:
53
- print(f"Authentication error: {e}")
54
- return False
55
-
56
- def _get_headers(self) -> Dict[str, str]:
57
- headers = {"Content-Type": "application/json"}
58
- if self.access_token:
59
- headers["Authorization"] = f"Bearer {self.access_token}"
60
- return headers
61
-
62
- def get_patients(self) -> Optional[Dict[str, Any]]:
63
- url = f"{self.fhir_base_url}/Patient"
64
- if not self.access_token and not self.authenticate():
65
- print("Cannot proceed without authentication.")
66
- return None
67
- try:
68
- response = self.session.get(url, headers=self._get_headers())
69
- response.raise_for_status()
70
- return response.json() if response.text else {}
71
- except requests.RequestException as e:
72
- print(f"Failed to retrieve patients: {e}")
73
- return None
74
-
75
- def get_authorization_url(self, scope: str = "patient/*.read openid profile", state: str = "random_state") -> str:
76
- code_verifier = self._generate_code_verifier()
77
- code_challenge = self._generate_code_challenge(code_verifier)
78
- params = {
79
- "response_type": "code",
80
- "client_id": self.client_id,
81
- "redirect_uri": self.redirect_uri,
82
- "scope": scope,
83
- "state": state,
84
- "aud": self.fhir_base_url,
85
- "code_challenge": code_challenge,
86
- "code_challenge_method": "S256"
87
- }
88
- query_string = urlencode(params, safe='/*') # 'safe' preserves / and * in scope
89
- return f"{self.authorize_url}?{query_string}"
90
-
91
- def authenticate_with_code(self, auth_code: str) -> bool:
92
- if not self.code_verifier:
93
- print("Code verifier not set. Generate an authorization URL first.")
94
- return False
95
- payload = {
96
- "grant_type": "authorization_code",
97
- "code": auth_code,
98
- "redirect_uri": self.redirect_uri,
99
- "client_id": self.client_id,
100
- "code_verifier": self.code_verifier
101
- }
102
- if self.client_secret:
103
- payload["client_secret"] = self.client_secret
104
- headers = {"Content-Type": "application/x-www-form-urlencoded"}
105
- try:
106
- response = self.session.post(self.token_url, data=payload, headers=headers)
107
- response.raise_for_status()
108
- token_data = response.json()
109
- self.access_token = token_data.get("access_token")
110
- if not self.access_token:
111
- raise ValueError("No access token received.")
112
- return True
113
- except requests.RequestException as e:
114
- print(f"Authentication failed: {e}")
115
- return False
116
- except ValueError as e:
117
- print(f"Authentication error: {e}")
118
- return False
119
-
120
- def create_virtual_workspace(self, snapshot: str = "patient-prefetch",
121
- patient_id: str = "AutoPopulatedIfNotManuallySet",
122
- hook: str = "patient-view") -> bool:
123
- """
124
- Create a virtual workspace in the specified workspace (FHIR API).
125
-
126
- Args:
127
- snapshot (str): The snapshot type (default: "patient-prefetch").
128
- patient_id (str): The patient ID (default: "AutoPopulatedIfNotManuallySet").
129
- hook (str): The hook type (default: "patient-view").
130
-
131
- Returns:
132
- bool: True if the virtual workspace is created successfully, False otherwise.
133
- """
134
- url = f"{self.fhir_base_url}/$virtual-workspace"
135
-
136
- if not self.access_token and not self.authenticate():
137
- print("Cannot proceed without authentication.")
138
- return False
139
-
140
- payload = {"snapshot": snapshot, "patientId": patient_id, "hook": hook}
141
-
142
- try:
143
- response = self.session.post(url, data=json.dumps(payload), headers=self._get_headers())
144
- response.raise_for_status()
145
- return True
146
- except requests.RequestException as e:
147
- print(f"Failed to create virtual workspace: {e}")
148
- return False
149
-
150
- def get_mips_patients(self) -> Optional[Dict[str, Any]]:
151
- """
152
- Retrieve a list of patients from the MIPS API.
153
-
154
- Returns:
155
- Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
156
- """
157
- url = f"{self.mips_base_url}/Patient"
158
-
159
- if not self.access_token and not self.authenticate():
160
- print("Cannot proceed without authentication.")
161
- return None
162
-
163
- try:
164
- response = self.session.get(url, headers=self._get_headers())
165
- response.raise_for_status()
166
- return response.json() if response.text else {}
167
- except requests.RequestException as e:
168
- print(f"Failed to retrieve MIPS patients: {e}")
169
- return None
170
-
171
- def get_mips_patient_by_id(self, patient_id: str) -> Optional[Dict[str, Any]]:
172
- """
173
- Retrieve patient information by ID from the MIPS API.
174
-
175
- Args:
176
- patient_id (str): The ID of the patient to retrieve.
177
-
178
- Returns:
179
- Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
180
- """
181
- url = f"{self.mips_base_url}/Patient/{patient_id}"
182
-
183
- if not self.access_token and not self.authenticate():
184
- print("Cannot proceed without authentication.")
185
- return None
186
-
187
- try:
188
- response = self.session.get(url, headers=self._get_headers())
189
- response.raise_for_status()
190
- return response.json() if response.text else {}
191
- except requests.RequestException as e:
192
- print(f"Failed to retrieve patient {patient_id}: {e}")
193
- return None
194
-
195
- def get_mips_encounters(self, patient_id: str = None) -> Optional[Dict[str, Any]]:
196
- """
197
- Retrieve encounters from the MIPS API, optionally filtered by patient ID.
198
-
199
- Args:
200
- patient_id (str, optional): The ID of the patient to filter encounters by.
201
-
202
- Returns:
203
- Optional[Dict[str, Any]]: Encounter data as a dictionary if successful, None otherwise.
204
- """
205
- url = f"{self.mips_base_url}/Encounter"
206
- if patient_id:
207
- url += f"?patient={patient_id}"
208
-
209
- if not self.access_token and not self.authenticate():
210
- print("Cannot proceed without authentication.")
211
- return None
212
-
213
- try:
214
- response = self.session.get(url, headers=self._get_headers())
215
- response.raise_for_status()
216
- return response.json() if response.text else {}
217
- except requests.RequestException as e:
218
- print(f"Failed to retrieve encounters: {e}")
219
- return None
220
-
221
- def update_fhir_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
222
- """
223
- Update patient data in the FHIR API using a PUT request.
224
-
225
- Args:
226
- patient_id (str): The ID of the patient to update.
227
- patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.
228
-
229
- Returns:
230
- bool: True if the patient is updated successfully, False otherwise.
231
- """
232
- url = f"{self.fhir_base_url}/Patient/{patient_id}"
233
-
234
- if not self.access_token and not self.authenticate():
235
- print("Cannot proceed without authentication.")
236
- return False
237
-
238
- try:
239
- response = self.session.put(url, data=json.dumps(patient_data), headers=self._get_headers())
240
- response.raise_for_status()
241
- return True
242
- except requests.RequestException as e:
243
- print(f"Failed to update FHIR patient {patient_id}: {e}")
244
- return False
245
-
246
- def update_mips_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
247
- """
248
- Update patient data in the MIPS API using a PUT request.
249
-
250
- Args:
251
- patient_id (str): The ID of the patient to update.
252
- patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.
253
-
254
- Returns:
255
- bool: True if the patient is updated successfully, False otherwise.
256
- """
257
- url = f"{self.mips_base_url}/Patient/{patient_id}"
258
-
259
- if not self.access_token and not self.authenticate():
260
- print("Cannot proceed without authentication.")
261
- return False
262
-
263
- try:
264
- response = self.session.put(url, data=json.dumps(patient_data), headers=self._get_headers())
265
- response.raise_for_status()
266
- return True
267
- except requests.RequestException as e:
268
- print(f"Failed to update MIPS patient {patient_id}: {e}")
269
- return False
270
-
271
- def close(self):
272
- """Close the session to free up resources."""
273
- self.session.close()
274
-
275
- def __enter__(self):
276
- """Support for context manager entry."""
277
- return self
278
-
279
- def __exit__(self, exc_type, exc_val, exc_tb):
280
- """Support for context manager exit, ensuring session is closed."""
281
- self.close()
282
-
283
-
284
- # # Example usage with patient update functionality
285
- # if __name__ == "__main__":
286
- # # Replace these with your actual credentials and workspace ID
287
- # CLIENT_ID = "your_client_id"
288
- # CLIENT_SECRET = "your_client_secret"
289
- # WORKSPACE_ID = "your_workspace_id"
290
- # PATIENT_ID = "example_patient_id" # Replace with an actual patient ID
291
-
292
- # with MeldRxAPI(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, workspace_id=WORKSPACE_ID) as meldrx:
293
- # # Authenticate
294
- # if meldrx.authenticate():
295
- # print("Authentication successful!")
296
-
297
- # # Retrieve specific patient information from MIPS API
298
- # patient_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
299
- # if patient_info is not None:
300
- # print(f"Original Patient {PATIENT_ID} Info:", json.dumps(patient_info, indent=2))
301
-
302
- # # Example patient data to update (FHIR Patient resource format)
303
- # updated_patient_data = {
304
- # "resourceType": "Patient",
305
- # "id": PATIENT_ID,
306
- # "name": [{
307
- # "family": "Doe",
308
- # "given": ["John", "Updated"]
309
- # }],
310
- # "gender": "male",
311
- # "birthDate": "1980-01-01"
312
- # }
313
-
314
- # # Update patient in FHIR API
315
- # if meldrx.update_fhir_patient(PATIENT_ID, updated_patient_data):
316
- # print(f"Successfully updated patient {PATIENT_ID} in FHIR API")
317
- # updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
318
- # if updated_info:
319
- # print(f"Updated Patient {PATIENT_ID} Info (FHIR):", json.dumps(updated_info, indent=2))
320
-
321
- # # Update patient in MIPS API
322
- # if meldrx.update_mips_patient(PATIENT_ID, updated_patient_data):
323
- # print(f"Successfully updated patient {PATIENT_ID} in MIPS API")
324
- # updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
325
- # if updated_info:
326
- # print(f"Updated Patient {PATIENT_ID} Info (MIPS):", json.dumps(updated_info, indent=2))
327
-
328
- # # Retrieve encounters for the patient from MIPS API
329
- # encounters = meldrx.get_mips_encounters(patient_id=PATIENT_ID)
330
- # if encounters is not None:
331
- # print(f"Encounters for Patient {PATIENT_ID}:", json.dumps(encounters, indent=2))
 
1
+ import requests
2
+ import json
3
+ import base64
4
+ import hashlib
5
+ import secrets
6
+ from typing import Optional, Dict, Any
7
+ from urllib.parse import urlencode
8
+
9
+ class MeldRxAPI:
10
+ def __init__(self, client_id: str, client_secret: str, workspace_id: str, redirect_uri: str):
11
+ self.base_url = "https://app.meldrx.com"
12
+ self.api_base_url = f"{self.base_url}/api"
13
+ self.fhir_base_url = f"{self.api_base_url}/fhir/{workspace_id}"
14
+ self.mips_base_url = f"{self.base_url}/mms-api"
15
+ self.token_url = f"{self.base_url}/connect/token"
16
+ self.authorize_url = f"{self.base_url}/connect/authorize"
17
+ self.client_id = client_id
18
+ self.client_secret = client_secret
19
+ self.workspace_id = workspace_id
20
+ self.redirect_uri = redirect_uri
21
+ self.access_token = None
22
+ self.code_verifier = None
23
+ self.session = requests.Session()
24
+
25
+ def _generate_code_verifier(self) -> str:
26
+ self.code_verifier = secrets.token_urlsafe(32) # 43 characters
27
+ return self.code_verifier
28
+
29
+ def _generate_code_challenge(self, code_verifier: str) -> str:
30
+ sha256_hash = hashlib.sha256(code_verifier.encode('utf-8')).digest()
31
+ code_challenge = base64.urlsafe_b64encode(sha256_hash).decode('utf-8').rstrip('=')
32
+ return code_challenge
33
+
34
+ def authenticate(self) -> bool:
35
+ payload = {
36
+ "grant_type": "client_credentials",
37
+ "client_id": self.client_id,
38
+ "client_secret": self.client_secret
39
+ }
40
+ headers = {"Content-Type": "application/x-www-form-urlencoded"}
41
+ try:
42
+ response = self.session.post(self.token_url, data=payload, headers=headers)
43
+ response.raise_for_status()
44
+ token_data = response.json()
45
+ self.access_token = token_data.get("access_token")
46
+ if not self.access_token:
47
+ raise ValueError("No access token received.")
48
+ return True
49
+ except requests.RequestException as e:
50
+ print(f"Authentication failed: {e}")
51
+ return False
52
+ except ValueError as e:
53
+ print(f"Authentication error: {e}")
54
+ return False
55
+
56
+ def _get_headers(self) -> Dict[str, str]:
57
+ headers = {"Content-Type": "application/json"}
58
+ if self.access_token:
59
+ headers["Authorization"] = f"Bearer {self.access_token}"
60
+ return headers
61
+
62
+ def get_patients(self) -> Optional[Dict[str, Any]]:
63
+ url = f"{self.fhir_base_url}/Patient"
64
+ if not self.access_token and not self.authenticate():
65
+ print("Cannot proceed without authentication.")
66
+ return None
67
+ try:
68
+ response = self.session.get(url, headers=self._get_headers())
69
+ response.raise_for_status()
70
+ return response.json() if response.text else {}
71
+ except requests.RequestException as e:
72
+ print(f"Failed to retrieve patients: {e}")
73
+ return None
74
+
75
+ def get_authorization_url(self, scope: str = "patient/*.read openid profile", state: str = "random_state") -> str:
76
+ code_verifier = self._generate_code_verifier()
77
+ code_challenge = self._generate_code_challenge(code_verifier)
78
+ params = {
79
+ "response_type": "code",
80
+ "client_id": self.client_id,
81
+ "redirect_uri": self.redirect_uri,
82
+ "scope": scope,
83
+ "state": state,
84
+ "aud": self.fhir_base_url,
85
+ "code_challenge": code_challenge,
86
+ "code_challenge_method": "S256"
87
+ }
88
+ query_string = urlencode(params, safe='/*') # 'safe' preserves / and * in scope
89
+ return f"{self.authorize_url}?{query_string}"
90
+
91
+ def authenticate_with_code(self, auth_code: str) -> bool:
92
+ if not self.code_verifier:
93
+ print("Code verifier not set. Generate an authorization URL first.")
94
+ return False
95
+ payload = {
96
+ "grant_type": "authorization_code",
97
+ "code": auth_code,
98
+ "redirect_uri": self.redirect_uri,
99
+ "client_id": self.client_id,
100
+ "code_verifier": self.code_verifier
101
+ }
102
+ if self.client_secret:
103
+ payload["client_secret"] = self.client_secret
104
+ headers = {"Content-Type": "application/x-www-form-urlencoded"}
105
+ try:
106
+ response = self.session.post(self.token_url, data=payload, headers=headers)
107
+ response.raise_for_status()
108
+ token_data = response.json()
109
+ self.access_token = token_data.get("access_token")
110
+ if not self.access_token:
111
+ raise ValueError("No access token received.")
112
+ return True
113
+ except requests.RequestException as e:
114
+ print(f"Authentication failed: {e}")
115
+ return False
116
+ except ValueError as e:
117
+ print(f"Authentication error: {e}")
118
+ return False
119
+
120
+ def create_virtual_workspace(self, snapshot: str = "patient-prefetch",
121
+ patient_id: str = "AutoPopulatedIfNotManuallySet",
122
+ hook: str = "patient-view") -> bool:
123
+ """
124
+ Create a virtual workspace in the specified workspace (FHIR API).
125
+
126
+ Args:
127
+ snapshot (str): The snapshot type (default: "patient-prefetch").
128
+ patient_id (str): The patient ID (default: "AutoPopulatedIfNotManuallySet").
129
+ hook (str): The hook type (default: "patient-view").
130
+
131
+ Returns:
132
+ bool: True if the virtual workspace is created successfully, False otherwise.
133
+ """
134
+ url = f"{self.fhir_base_url}/$virtual-workspace"
135
+
136
+ if not self.access_token and not self.authenticate():
137
+ print("Cannot proceed without authentication.")
138
+ return False
139
+
140
+ payload = {"snapshot": snapshot, "patientId": patient_id, "hook": hook}
141
+
142
+ try:
143
+ response = self.session.post(url, data=json.dumps(payload), headers=self._get_headers())
144
+ response.raise_for_status()
145
+ return True
146
+ except requests.RequestException as e:
147
+ print(f"Failed to create virtual workspace: {e}")
148
+ return False
149
+
150
+ def get_mips_patients(self) -> Optional[Dict[str, Any]]:
151
+ """
152
+ Retrieve a list of patients from the MIPS API.
153
+
154
+ Returns:
155
+ Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
156
+ """
157
+ url = f"{self.mips_base_url}/Patient"
158
+
159
+ if not self.access_token and not self.authenticate():
160
+ print("Cannot proceed without authentication.")
161
+ return None
162
+
163
+ try:
164
+ response = self.session.get(url, headers=self._get_headers())
165
+ response.raise_for_status()
166
+ return response.json() if response.text else {}
167
+ except requests.RequestException as e:
168
+ print(f"Failed to retrieve MIPS patients: {e}")
169
+ return None
170
+
171
+ def get_mips_patient_by_id(self, patient_id: str) -> Optional[Dict[str, Any]]:
172
+ """
173
+ Retrieve patient information by ID from the MIPS API.
174
+
175
+ Args:
176
+ patient_id (str): The ID of the patient to retrieve.
177
+
178
+ Returns:
179
+ Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
180
+ """
181
+ url = f"{self.mips_base_url}/Patient/{patient_id}"
182
+
183
+ if not self.access_token and not self.authenticate():
184
+ print("Cannot proceed without authentication.")
185
+ return None
186
+
187
+ try:
188
+ response = self.session.get(url, headers=self._get_headers())
189
+ response.raise_for_status()
190
+ return response.json() if response.text else {}
191
+ except requests.RequestException as e:
192
+ print(f"Failed to retrieve patient {patient_id}: {e}")
193
+ return None
194
+
195
+ def get_mips_encounters(self, patient_id: str = None) -> Optional[Dict[str, Any]]:
196
+ """
197
+ Retrieve encounters from the MIPS API, optionally filtered by patient ID.
198
+
199
+ Args:
200
+ patient_id (str, optional): The ID of the patient to filter encounters by.
201
+
202
+ Returns:
203
+ Optional[Dict[str, Any]]: Encounter data as a dictionary if successful, None otherwise.
204
+ """
205
+ url = f"{self.mips_base_url}/Encounter"
206
+ if patient_id:
207
+ url += f"?patient={patient_id}"
208
+
209
+ if not self.access_token and not self.authenticate():
210
+ print("Cannot proceed without authentication.")
211
+ return None
212
+
213
+ try:
214
+ response = self.session.get(url, headers=self._get_headers())
215
+ response.raise_for_status()
216
+ return response.json() if response.text else {}
217
+ except requests.RequestException as e:
218
+ print(f"Failed to retrieve encounters: {e}")
219
+ return None
220
+
221
+ def update_fhir_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
222
+ """
223
+ Update patient data in the FHIR API using a PUT request.
224
+
225
+ Args:
226
+ patient_id (str): The ID of the patient to update.
227
+ patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.
228
+
229
+ Returns:
230
+ bool: True if the patient is updated successfully, False otherwise.
231
+ """
232
+ url = f"{self.fhir_base_url}/Patient/{patient_id}"
233
+
234
+ if not self.access_token and not self.authenticate():
235
+ print("Cannot proceed without authentication.")
236
+ return False
237
+
238
+ try:
239
+ response = self.session.put(url, data=json.dumps(patient_data), headers=self._get_headers())
240
+ response.raise_for_status()
241
+ return True
242
+ except requests.RequestException as e:
243
+ print(f"Failed to update FHIR patient {patient_id}: {e}")
244
+ return False
245
+
246
+ def update_mips_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
247
+ """
248
+ Update patient data in the MIPS API using a PUT request.
249
+
250
+ Args:
251
+ patient_id (str): The ID of the patient to update.
252
+ patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.
253
+
254
+ Returns:
255
+ bool: True if the patient is updated successfully, False otherwise.
256
+ """
257
+ url = f"{self.mips_base_url}/Patient/{patient_id}"
258
+
259
+ if not self.access_token and not self.authenticate():
260
+ print("Cannot proceed without authentication.")
261
+ return False
262
+
263
+ try:
264
+ response = self.session.put(url, data=json.dumps(patient_data), headers=self._get_headers())
265
+ response.raise_for_status()
266
+ return True
267
+ except requests.RequestException as e:
268
+ print(f"Failed to update MIPS patient {patient_id}: {e}")
269
+ return False
270
+
271
+ def close(self):
272
+ """Close the session to free up resources."""
273
+ self.session.close()
274
+
275
+ def __enter__(self):
276
+ """Support for context manager entry."""
277
+ return self
278
+
279
+ def __exit__(self, exc_type, exc_val, exc_tb):
280
+ """Support for context manager exit, ensuring session is closed."""
281
+ self.close()
282
+
283
+
284
+ # # Example usage with patient update functionality
285
+ # if __name__ == "__main__":
286
+ # # Replace these with your actual credentials and workspace ID
287
+ # CLIENT_ID = "your_client_id"
288
+ # CLIENT_SECRET = "your_client_secret"
289
+ # WORKSPACE_ID = "your_workspace_id"
290
+ # PATIENT_ID = "example_patient_id" # Replace with an actual patient ID
291
+
292
+ # with MeldRxAPI(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, workspace_id=WORKSPACE_ID) as meldrx:
293
+ # # Authenticate
294
+ # if meldrx.authenticate():
295
+ # print("Authentication successful!")
296
+
297
+ # # Retrieve specific patient information from MIPS API
298
+ # patient_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
299
+ # if patient_info is not None:
300
+ # print(f"Original Patient {PATIENT_ID} Info:", json.dumps(patient_info, indent=2))
301
+
302
+ # # Example patient data to update (FHIR Patient resource format)
303
+ # updated_patient_data = {
304
+ # "resourceType": "Patient",
305
+ # "id": PATIENT_ID,
306
+ # "name": [{
307
+ # "family": "Doe",
308
+ # "given": ["John", "Updated"]
309
+ # }],
310
+ # "gender": "male",
311
+ # "birthDate": "1980-01-01"
312
+ # }
313
+
314
+ # # Update patient in FHIR API
315
+ # if meldrx.update_fhir_patient(PATIENT_ID, updated_patient_data):
316
+ # print(f"Successfully updated patient {PATIENT_ID} in FHIR API")
317
+ # updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
318
+ # if updated_info:
319
+ # print(f"Updated Patient {PATIENT_ID} Info (FHIR):", json.dumps(updated_info, indent=2))
320
+
321
+ # # Update patient in MIPS API
322
+ # if meldrx.update_mips_patient(PATIENT_ID, updated_patient_data):
323
+ # print(f"Successfully updated patient {PATIENT_ID} in MIPS API")
324
+ # updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
325
+ # if updated_info:
326
+ # print(f"Updated Patient {PATIENT_ID} Info (MIPS):", json.dumps(updated_info, indent=2))
327
+
328
+ # # Retrieve encounters for the patient from MIPS API
329
+ # encounters = meldrx.get_mips_encounters(patient_id=PATIENT_ID)
330
+ # if encounters is not None:
331
+ # print(f"Encounters for Patient {PATIENT_ID}:", json.dumps(encounters, indent=2))