Tonic commited on
Commit
7f2013e
·
unverified ·
1 Parent(s): f087af0

add callback manager , model app

Browse files
Files changed (2) hide show
  1. callbackmanager.py +5 -0
  2. meldrx.py +3 -92
callbackmanager.py CHANGED
@@ -4,6 +4,11 @@ from meldrx import MeldRxAPI
4
  import json
5
  import os
6
 
 
 
 
 
 
7
  class CallbackManager:
8
  def __init__(self, redirect_uri: str, client_secret: str = None):
9
  client_id = os.getenv("APPID")
 
4
  import json
5
  import os
6
 
7
+ import gradio as gr
8
+ from meldrx import MeldRxAPI
9
+ import json
10
+ import os
11
+
12
  class CallbackManager:
13
  def __init__(self, redirect_uri: str, client_secret: str = None):
14
  client_id = os.getenv("APPID")
meldrx.py CHANGED
@@ -22,7 +22,7 @@ class MeldRxAPI:
22
  self.session = requests.Session()
23
 
24
  def _generate_code_verifier(self) -> str:
25
- self.code_verifier = secrets.token_urlsafe(32) # 43 characters, per PKCE spec
26
  return self.code_verifier
27
 
28
  def _generate_code_challenge(self, code_verifier: str) -> str:
@@ -43,7 +43,7 @@ class MeldRxAPI:
43
  token_data = response.json()
44
  self.access_token = token_data.get("access_token")
45
  if not self.access_token:
46
- raise ValueError("No access token received from the server.")
47
  return True
48
  except requests.RequestException as e:
49
  print(f"Authentication failed: {e}")
@@ -107,7 +107,7 @@ class MeldRxAPI:
107
  token_data = response.json()
108
  self.access_token = token_data.get("access_token")
109
  if not self.access_token:
110
- raise ValueError("No access token received from the server.")
111
  return True
112
  except requests.RequestException as e:
113
  print(f"Authentication failed: {e}")
@@ -115,96 +115,7 @@ class MeldRxAPI:
115
  except ValueError as e:
116
  print(f"Authentication error: {e}")
117
  return False
118
-
119
- def _get_headers(self) -> Dict[str, str]:
120
- """
121
- Generate headers with the Bearer token for authenticated requests.
122
-
123
- Returns:
124
- Dict[str, str]: Headers dictionary with Authorization token if available.
125
- """
126
- headers = {"Content-Type": "application/json"}
127
- if self.access_token:
128
- headers["Authorization"] = f"Bearer {self.access_token}"
129
- return headers
130
-
131
- def get_patients(self) -> Optional[Dict[str, Any]]:
132
- """
133
- Retrieve a list of patients from the specified workspace (FHIR API).
134
-
135
- Returns:
136
- Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
137
- """
138
- url = f"{self.fhir_base_url}/Patient"
139
-
140
- if not self.access_token and not self.authenticate():
141
- print("Cannot proceed without authentication.")
142
- return None
143
-
144
- try:
145
- response = self.session.get(url, headers=self._get_headers())
146
- response.raise_for_status()
147
- return response.json() if response.text else {}
148
- except requests.RequestException as e:
149
- print(f"Failed to retrieve patients: {e}")
150
- return None
151
 
152
- def get_authorization_url(self, scope: str = "patient/*.read openid profile", state: str = "random_state") -> str:
153
- """
154
- Generate the SMART on FHIR authorization URL for patient login.
155
-
156
- Args:
157
- scope (str): The requested scopes (default: patient/*.read openid profile).
158
- state (str): A random state string for security (default: random_state).
159
-
160
- Returns:
161
- str: The authorization URL to open in a browser.
162
- """
163
- params = {
164
- "response_type": "code",
165
- "client_id": self.client_id,
166
- "redirect_uri": self.redirect_uri,
167
- "scope": scope,
168
- "state": state,
169
- "aud": self.fhir_base_url
170
- }
171
- query_string = "&".join(f"{k}={v}" for k, v in params.items())
172
- return f"{self.authorize_url}?{query_string}"
173
-
174
- def authenticate_with_code(self, auth_code: str) -> bool:
175
- """
176
- Exchange an authorization code for an access token.
177
-
178
- Args:
179
- auth_code (str): The authorization code from the redirect URI.
180
-
181
- Returns:
182
- bool: True if authentication is successful, False otherwise.
183
- """
184
- payload = {
185
- "grant_type": "authorization_code",
186
- "code": auth_code,
187
- "redirect_uri": self.redirect_uri,
188
- "client_id": self.client_id
189
- }
190
- if self.client_secret:
191
- payload["client_secret"] = self.client_secret
192
- headers = {"Content-Type": "application/x-www-form-urlencoded"}
193
-
194
- try:
195
- response = self.session.post(self.token_url, data=payload, headers=headers)
196
- response.raise_for_status()
197
- token_data = response.json()
198
- self.access_token = token_data.get("access_token")
199
- if not self.access_token:
200
- raise ValueError("No access token received from the server.")
201
- return True
202
- except requests.RequestException as e:
203
- print(f"Authentication failed: {e}")
204
- return False
205
- except ValueError as e:
206
- print(f"Authentication error: {e}")
207
- return False
208
  def create_virtual_workspace(self, snapshot: str = "patient-prefetch",
209
  patient_id: str = "AutoPopulatedIfNotManuallySet",
210
  hook: str = "patient-view") -> bool:
 
22
  self.session = requests.Session()
23
 
24
  def _generate_code_verifier(self) -> str:
25
+ self.code_verifier = secrets.token_urlsafe(32) # 43 characters
26
  return self.code_verifier
27
 
28
  def _generate_code_challenge(self, code_verifier: str) -> str:
 
43
  token_data = response.json()
44
  self.access_token = token_data.get("access_token")
45
  if not self.access_token:
46
+ raise ValueError("No access token received.")
47
  return True
48
  except requests.RequestException as e:
49
  print(f"Authentication failed: {e}")
 
107
  token_data = response.json()
108
  self.access_token = token_data.get("access_token")
109
  if not self.access_token:
110
+ raise ValueError("No access token received.")
111
  return True
112
  except requests.RequestException as e:
113
  print(f"Authentication failed: {e}")
 
115
  except ValueError as e:
116
  print(f"Authentication error: {e}")
117
  return False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
118
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
  def create_virtual_workspace(self, snapshot: str = "patient-prefetch",
120
  patient_id: str = "AutoPopulatedIfNotManuallySet",
121
  hook: str = "patient-view") -> bool: