try to fix the identifier again
Browse files- utils/oneclick.py +17 -16
- utils/responseparser.py +30 -191
utils/oneclick.py
CHANGED
@@ -3,7 +3,7 @@ from typing import Tuple, Optional, Dict
|
|
3 |
from .meldrx import MeldRxAPI
|
4 |
from .responseparser import PatientDataExtractor
|
5 |
from .pdfutils import PDFGenerator
|
6 |
-
from .verifier import DischargeVerifier
|
7 |
import logging
|
8 |
import json
|
9 |
from huggingface_hub import InferenceClient
|
@@ -16,7 +16,7 @@ if not HF_TOKEN:
|
|
16 |
raise ValueError("HF_TOKEN environment variable not set.")
|
17 |
client = InferenceClient(api_key=HF_TOKEN)
|
18 |
MODEL_NAME = "meta-llama/Llama-3.3-70B-Instruct"
|
19 |
-
verifier = DischargeVerifier()
|
20 |
|
21 |
def generate_ai_discharge_summary(patient_dict: Dict[str, str], client) -> Tuple[Optional[str], Optional[str]]:
|
22 |
"""Generate a discharge summary using AI and verify it for hallucinations."""
|
@@ -113,14 +113,16 @@ def generate_discharge_paper_one_click(
|
|
113 |
logger.debug(f"Patient {i}: ID={patient_id_from_data}, Name={first_name_from_data} {last_name_from_data}")
|
114 |
logger.debug(f"Comparing - Input: ID={patient_id_input}, First={first_name_input}, Last={last_name_input}")
|
115 |
|
116 |
-
|
117 |
-
|
118 |
-
|
119 |
-
|
120 |
-
|
121 |
-
|
122 |
-
|
123 |
-
|
|
|
|
|
124 |
if matches:
|
125 |
matching_patients.append(patient_data)
|
126 |
logger.info(f"Found matching patient: ID={patient_id_from_data}, "
|
@@ -134,8 +136,8 @@ def generate_discharge_paper_one_click(
|
|
134 |
return None, (f"No patients found matching criteria: {search_criteria}\n"
|
135 |
f"Available IDs: {', '.join(all_patient_ids)}\n"
|
136 |
f"Available Names: {', '.join(all_patient_names)}"), None, None, None
|
137 |
-
|
138 |
-
patient_data = matching_patients[0]
|
139 |
logger.info(f"Selected patient data: {patient_data}")
|
140 |
|
141 |
basic_summary = format_discharge_summary(patient_data)
|
@@ -158,13 +160,12 @@ def generate_discharge_paper_one_click(
|
|
158 |
|
159 |
def format_discharge_summary(patient_data: dict) -> str:
|
160 |
"""Format patient data into a discharge summary text."""
|
161 |
-
patient_data.setdefault('
|
162 |
patient_data.setdefault('first_name', '')
|
163 |
patient_data.setdefault('last_name', '')
|
164 |
patient_data.setdefault('dob', 'Unknown')
|
165 |
patient_data.setdefault('age', 'Unknown')
|
166 |
patient_data.setdefault('sex', 'Unknown')
|
167 |
-
patient_data.setdefault('id', 'Unknown')
|
168 |
patient_data.setdefault('address', 'Unknown')
|
169 |
patient_data.setdefault('city', 'Unknown')
|
170 |
patient_data.setdefault('state', 'Unknown')
|
@@ -186,11 +187,11 @@ def format_discharge_summary(patient_data: dict) -> str:
|
|
186 |
"DISCHARGE SUMMARY",
|
187 |
"",
|
188 |
"PATIENT INFORMATION",
|
189 |
-
f"Name: {patient_data['
|
|
|
190 |
f"Date of Birth: {patient_data['dob']}",
|
191 |
f"Age: {patient_data['age']}",
|
192 |
f"Gender: {patient_data['sex']}",
|
193 |
-
f"Patient ID: {patient_data['id']}",
|
194 |
"",
|
195 |
"CONTACT INFORMATION",
|
196 |
f"Address: {patient_data['address']}",
|
|
|
3 |
from .meldrx import MeldRxAPI
|
4 |
from .responseparser import PatientDataExtractor
|
5 |
from .pdfutils import PDFGenerator
|
6 |
+
from .verifier import DischargeVerifier
|
7 |
import logging
|
8 |
import json
|
9 |
from huggingface_hub import InferenceClient
|
|
|
16 |
raise ValueError("HF_TOKEN environment variable not set.")
|
17 |
client = InferenceClient(api_key=HF_TOKEN)
|
18 |
MODEL_NAME = "meta-llama/Llama-3.3-70B-Instruct"
|
19 |
+
verifier = DischargeVerifier()
|
20 |
|
21 |
def generate_ai_discharge_summary(patient_dict: Dict[str, str], client) -> Tuple[Optional[str], Optional[str]]:
|
22 |
"""Generate a discharge summary using AI and verify it for hallucinations."""
|
|
|
113 |
logger.debug(f"Patient {i}: ID={patient_id_from_data}, Name={first_name_from_data} {last_name_from_data}")
|
114 |
logger.debug(f"Comparing - Input: ID={patient_id_input}, First={first_name_input}, Last={last_name_input}")
|
115 |
|
116 |
+
# Match logic: ID takes precedence, then first/last name
|
117 |
+
matches = False
|
118 |
+
if patient_id_input and patient_id_from_data == patient_id_input:
|
119 |
+
matches = True
|
120 |
+
elif not patient_id_input and first_name_input and last_name_input:
|
121 |
+
if first_name_input == first_name_from_data and last_name_input == last_name_from_data:
|
122 |
+
matches = True
|
123 |
+
elif not patient_id_input and not first_name_input and not last_name_input:
|
124 |
+
continue # Skip if no criteria provided
|
125 |
+
|
126 |
if matches:
|
127 |
matching_patients.append(patient_data)
|
128 |
logger.info(f"Found matching patient: ID={patient_id_from_data}, "
|
|
|
136 |
return None, (f"No patients found matching criteria: {search_criteria}\n"
|
137 |
f"Available IDs: {', '.join(all_patient_ids)}\n"
|
138 |
f"Available Names: {', '.join(all_patient_names)}"), None, None, None
|
139 |
+
|
140 |
+
patient_data = matching_patients[0] # Take the first match
|
141 |
logger.info(f"Selected patient data: {patient_data}")
|
142 |
|
143 |
basic_summary = format_discharge_summary(patient_data)
|
|
|
160 |
|
161 |
def format_discharge_summary(patient_data: dict) -> str:
|
162 |
"""Format patient data into a discharge summary text."""
|
163 |
+
patient_data.setdefault('id', 'Unknown')
|
164 |
patient_data.setdefault('first_name', '')
|
165 |
patient_data.setdefault('last_name', '')
|
166 |
patient_data.setdefault('dob', 'Unknown')
|
167 |
patient_data.setdefault('age', 'Unknown')
|
168 |
patient_data.setdefault('sex', 'Unknown')
|
|
|
169 |
patient_data.setdefault('address', 'Unknown')
|
170 |
patient_data.setdefault('city', 'Unknown')
|
171 |
patient_data.setdefault('state', 'Unknown')
|
|
|
187 |
"DISCHARGE SUMMARY",
|
188 |
"",
|
189 |
"PATIENT INFORMATION",
|
190 |
+
f"Name: {patient_data['first_name']} {patient_data['last_name']}".strip(),
|
191 |
+
f"Patient ID: {patient_data['id']}",
|
192 |
f"Date of Birth: {patient_data['dob']}",
|
193 |
f"Age: {patient_data['age']}",
|
194 |
f"Gender: {patient_data['sex']}",
|
|
|
195 |
"",
|
196 |
"CONTACT INFORMATION",
|
197 |
f"Address: {patient_data['address']}",
|
utils/responseparser.py
CHANGED
@@ -1,3 +1,4 @@
|
|
|
|
1 |
import json
|
2 |
import lxml.etree as etree
|
3 |
from datetime import datetime
|
@@ -5,7 +6,6 @@ from typing import List, Dict, Optional, Union
|
|
5 |
import base64
|
6 |
import logging
|
7 |
|
8 |
-
# Set up logging
|
9 |
logging.basicConfig(
|
10 |
level=logging.INFO,
|
11 |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
@@ -42,7 +42,7 @@ class PatientDataExtractor:
|
|
42 |
def _extract_patients(self) -> List:
|
43 |
"""Extract all patient entries based on format."""
|
44 |
if self.format == "xml":
|
45 |
-
return [self.data]
|
46 |
elif self.format == "json":
|
47 |
if self.data.get("resourceType") != "Bundle" or "entry" not in self.data:
|
48 |
raise ValueError("Invalid FHIR Bundle format")
|
@@ -65,33 +65,15 @@ class PatientDataExtractor:
|
|
65 |
id_list = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:id/@extension", namespaces=self.ns)
|
66 |
return id_list[0] if id_list else ""
|
67 |
elif self.format == "json":
|
68 |
-
# Check top-level 'id' first
|
69 |
patient_id = patient.get("id", "")
|
70 |
if patient_id:
|
71 |
return patient_id
|
72 |
-
# Fallback to 'identifier' field
|
73 |
identifiers = patient.get("identifier", [])
|
74 |
for identifier in identifiers:
|
75 |
-
if identifier.get("value"):
|
76 |
return identifier["value"]
|
77 |
-
return ""
|
78 |
-
|
79 |
-
def get_resource_type(self) -> str:
|
80 |
-
patient = self._get_current_patient()
|
81 |
-
if self.format == "xml":
|
82 |
-
return "ClinicalDocument"
|
83 |
-
elif self.format == "json":
|
84 |
-
return patient.get("resourceType", "")
|
85 |
-
|
86 |
-
def get_meta_last_updated(self) -> str:
|
87 |
-
patient = self._get_current_patient()
|
88 |
-
if self.format == "xml":
|
89 |
-
time_list = patient.xpath("//hl7:effectiveTime/@value", namespaces=self.ns)
|
90 |
-
return time_list[0] if time_list else ""
|
91 |
-
elif self.format == "json":
|
92 |
-
return patient.get("meta", {}).get("lastUpdated", "")
|
93 |
|
94 |
-
# Name Fields
|
95 |
def get_first_name(self) -> str:
|
96 |
patient = self._get_current_patient()
|
97 |
if self.format == "xml":
|
@@ -114,18 +96,6 @@ class PatientDataExtractor:
|
|
114 |
return name["family"]
|
115 |
return ""
|
116 |
|
117 |
-
def get_name_prefix(self) -> str:
|
118 |
-
patient = self._get_current_patient()
|
119 |
-
if self.format == "xml":
|
120 |
-
prefix = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:name/hl7:prefix/text()", namespaces=self.ns)
|
121 |
-
return prefix[0] if prefix else ""
|
122 |
-
elif self.format == "json":
|
123 |
-
for name in patient.get("name", []):
|
124 |
-
if name.get("use") == "official" and "prefix" in name:
|
125 |
-
return name["prefix"][0]
|
126 |
-
return ""
|
127 |
-
|
128 |
-
# Demographic Fields
|
129 |
def get_dob(self) -> str:
|
130 |
patient = self._get_current_patient()
|
131 |
if self.format == "xml":
|
@@ -139,7 +109,7 @@ class PatientDataExtractor:
|
|
139 |
if not dob:
|
140 |
return ""
|
141 |
try:
|
142 |
-
birth_date = datetime.strptime(dob[:8], "%Y%m%d")
|
143 |
today = datetime.now()
|
144 |
age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))
|
145 |
return str(age)
|
@@ -154,7 +124,6 @@ class PatientDataExtractor:
|
|
154 |
elif self.format == "json":
|
155 |
return patient.get("gender", "").capitalize()
|
156 |
|
157 |
-
# Address Fields
|
158 |
def get_address_line(self) -> str:
|
159 |
patient = self._get_current_patient()
|
160 |
if self.format == "xml":
|
@@ -191,7 +160,6 @@ class PatientDataExtractor:
|
|
191 |
addresses = patient.get("address", [])
|
192 |
return addresses[0]["postalCode"] if addresses and "postalCode" in addresses[0] else ""
|
193 |
|
194 |
-
# Contact Fields
|
195 |
def get_phone(self) -> str:
|
196 |
patient = self._get_current_patient()
|
197 |
if self.format == "xml":
|
@@ -203,50 +171,6 @@ class PatientDataExtractor:
|
|
203 |
return telecom.get("value", "")
|
204 |
return ""
|
205 |
|
206 |
-
# Extensions and Additional Fields
|
207 |
-
def get_race(self) -> str:
|
208 |
-
patient = self._get_current_patient()
|
209 |
-
if self.format == "xml":
|
210 |
-
race = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:raceCode/@displayName", namespaces=self.ns)
|
211 |
-
return race[0] if race else ""
|
212 |
-
elif self.format == "json":
|
213 |
-
for ext in patient.get("extension", []):
|
214 |
-
if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-race":
|
215 |
-
for sub_ext in ext.get("extension", []):
|
216 |
-
if sub_ext.get("url") == "text":
|
217 |
-
return sub_ext.get("valueString", "")
|
218 |
-
return ""
|
219 |
-
|
220 |
-
def get_ethnicity(self) -> str:
|
221 |
-
patient = self._get_current_patient()
|
222 |
-
if self.format == "xml":
|
223 |
-
ethnicity = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:ethnicGroupCode/@displayName", namespaces=self.ns)
|
224 |
-
return ethnicity[0] if ethnicity else ""
|
225 |
-
elif self.format == "json":
|
226 |
-
for ext in patient.get("extension", []):
|
227 |
-
if ext.get("url") == "http://hl7.org/fhir/us/core/StructureDefinition/us-core-ethnicity":
|
228 |
-
for sub_ext in ext.get("extension", []):
|
229 |
-
if sub_ext.get("url") == "text":
|
230 |
-
return sub_ext.get("valueString", "")
|
231 |
-
return ""
|
232 |
-
|
233 |
-
def get_language(self) -> str:
|
234 |
-
patient = self._get_current_patient()
|
235 |
-
if self.format == "xml":
|
236 |
-
lang = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:languageCommunication/hl7:languageCode/@code", namespaces=self.ns)
|
237 |
-
return lang[0] if lang else ""
|
238 |
-
elif self.format == "json":
|
239 |
-
comms = patient.get("communication", [])
|
240 |
-
if comms and "language" in comms[0]:
|
241 |
-
lang = comms[0]["language"]
|
242 |
-
# Try 'text' first, then fall back to 'coding' if available
|
243 |
-
if "text" in lang:
|
244 |
-
return lang["text"]
|
245 |
-
elif "coding" in lang and lang["coding"]:
|
246 |
-
return lang["coding"][0].get("display", lang["coding"][0].get("code", ""))
|
247 |
-
return ""
|
248 |
-
|
249 |
-
# Medications
|
250 |
def get_medications(self) -> List[Dict[str, str]]:
|
251 |
if self.format == "xml":
|
252 |
section = self.data.xpath("//hl7:section[hl7:code/@code='10160-0']", namespaces=self.ns)
|
@@ -278,7 +202,6 @@ class PatientDataExtractor:
|
|
278 |
result.append({"start": start, "stop": stop, "description": desc, "code": code})
|
279 |
return result
|
280 |
|
281 |
-
# Encounters
|
282 |
def get_encounters(self) -> List[Dict[str, str]]:
|
283 |
if self.format == "xml":
|
284 |
service = self.data.xpath("//hl7:documentationOf/hl7:serviceEvent", namespaces=self.ns)
|
@@ -302,7 +225,6 @@ class PatientDataExtractor:
|
|
302 |
result.append({"start": start, "end": end, "description": desc, "code": code})
|
303 |
return result
|
304 |
|
305 |
-
# Conditions/Diagnoses
|
306 |
def get_conditions(self) -> List[Dict[str, str]]:
|
307 |
if self.format == "xml":
|
308 |
section = self.data.xpath("//hl7:section[hl7:code/@code='11450-4']", namespaces=self.ns)
|
@@ -331,107 +253,16 @@ class PatientDataExtractor:
|
|
331 |
result.append({"onset": onset, "description": desc, "code": code})
|
332 |
return result
|
333 |
|
334 |
-
# Immunizations
|
335 |
-
def get_immunizations(self) -> List[Dict[str, str]]:
|
336 |
-
if self.format == "xml":
|
337 |
-
section = self.data.xpath("//hl7:section[hl7:code/@code='11369-6']", namespaces=self.ns)
|
338 |
-
if not section:
|
339 |
-
return []
|
340 |
-
immunizations = section[0].xpath(".//hl7:substanceAdministration", namespaces=self.ns)
|
341 |
-
result = []
|
342 |
-
for imm in immunizations:
|
343 |
-
date_list = imm.xpath(".//hl7:effectiveTime/@value", namespaces=self.ns)
|
344 |
-
date = date_list[0] if date_list else ""
|
345 |
-
desc_list = imm.xpath(".//hl7:consumable/hl7:manufacturedProduct/hl7:manufacturedMaterial/hl7:code/@displayName", namespaces=self.ns)
|
346 |
-
desc = desc_list[0] if desc_list else ""
|
347 |
-
code_list = imm.xpath(".//hl7:consumable/hl7:manufacturedProduct/hl7:manufacturedMaterial/hl7:code/@code", namespaces=self.ns)
|
348 |
-
code = code_list[0] if code_list else ""
|
349 |
-
result.append({"date": date, "description": desc, "code": code})
|
350 |
-
return result
|
351 |
-
elif self.format == "json":
|
352 |
-
entries = self.data.get("entry", [])
|
353 |
-
result = []
|
354 |
-
for entry in entries:
|
355 |
-
if entry["resource"]["resourceType"] == "Immunization":
|
356 |
-
imm = entry["resource"]
|
357 |
-
date = imm.get("occurrenceDateTime", "")
|
358 |
-
desc = imm.get("vaccineCode", {}).get("text", "")
|
359 |
-
code = imm.get("vaccineCode", {}).get("coding", [{}])[0].get("code", "")
|
360 |
-
result.append({"date": date, "description": desc, "code": code})
|
361 |
-
return result
|
362 |
-
|
363 |
-
# Diagnostic Reports
|
364 |
-
def get_diagnostic_reports(self) -> List[Dict[str, str]]:
|
365 |
-
if self.format == "xml":
|
366 |
-
section = self.data.xpath("//hl7:section[hl7:code/@code='30954-2']", namespaces=self.ns)
|
367 |
-
if not section:
|
368 |
-
return []
|
369 |
-
reports = section[0].xpath(".//hl7:organizer", namespaces=self.ns)
|
370 |
-
result = []
|
371 |
-
for report in reports:
|
372 |
-
start_list = report.xpath(".//hl7:effectiveTime/hl7:low/@value", namespaces=self.ns)
|
373 |
-
start = start_list[0] if start_list else ""
|
374 |
-
desc_list = report.xpath(".//hl7:code/@displayName", namespaces=self.ns)
|
375 |
-
desc = desc_list[0] if desc_list else ""
|
376 |
-
code_list = report.xpath(".//hl7:code/@code", namespaces=self.ns)
|
377 |
-
code = code_list[0] if code_list else ""
|
378 |
-
result.append({"start": start, "description": desc, "code": code})
|
379 |
-
return result
|
380 |
-
elif self.format == "json":
|
381 |
-
entries = self.data.get("entry", [])
|
382 |
-
result = []
|
383 |
-
for entry in entries:
|
384 |
-
if entry["resource"]["resourceType"] == "DiagnosticReport":
|
385 |
-
report = entry["resource"]
|
386 |
-
start = report.get("effectiveDateTime", "")
|
387 |
-
desc = report.get("code", {}).get("text", "")
|
388 |
-
code = report.get("code", {}).get("coding", [{}])[0].get("code", "")
|
389 |
-
data = report.get("presentedForm", [{}])[0].get("data", "")
|
390 |
-
if data:
|
391 |
-
decoded = base64.b64decode(data).decode('utf-8')
|
392 |
-
result.append({"start": start, "description": desc, "code": code, "content": decoded})
|
393 |
-
else:
|
394 |
-
result.append({"start": start, "description": desc, "code": code})
|
395 |
-
return result
|
396 |
-
|
397 |
-
# Comprehensive Extraction
|
398 |
-
def get_all_patient_data(self) -> Dict[str, Union[str, List, Dict]]:
|
399 |
-
"""Extract all available data for the current patient."""
|
400 |
-
return {
|
401 |
-
"id": self.get_id(),
|
402 |
-
"resource_type": self.get_resource_type(),
|
403 |
-
"meta_last_updated": self.get_meta_last_updated(),
|
404 |
-
"first_name": self.get_first_name(),
|
405 |
-
"last_name": self.get_last_name(),
|
406 |
-
"name_prefix": self.get_name_prefix(),
|
407 |
-
"dob": self.get_dob(),
|
408 |
-
"age": self.get_age(),
|
409 |
-
"gender": self.get_gender(),
|
410 |
-
"address_line": self.get_address_line(),
|
411 |
-
"city": self.get_city(),
|
412 |
-
"state": self.get_state(),
|
413 |
-
"zip_code": self.get_zip_code(),
|
414 |
-
"phone": self.get_phone(),
|
415 |
-
"race": self.get_race(),
|
416 |
-
"ethnicity": self.get_ethnicity(),
|
417 |
-
"language": self.get_language(),
|
418 |
-
"medications": self.get_medications(),
|
419 |
-
"encounters": self.get_encounters(),
|
420 |
-
"conditions": self.get_conditions(),
|
421 |
-
"immunizations": self.get_immunizations(),
|
422 |
-
"diagnostic_reports": self.get_diagnostic_reports()
|
423 |
-
}
|
424 |
-
|
425 |
def get_patient_dict(self) -> Dict[str, str]:
|
426 |
"""Return a dictionary of patient data mapped to discharge form fields."""
|
427 |
data = self.get_all_patient_data()
|
428 |
latest_encounter = data["encounters"][-1] if data["encounters"] else {}
|
429 |
latest_condition = data["conditions"][-1] if data["conditions"] else {}
|
430 |
-
medications_str = "; ".join([m["description"] for m in data["medications"]])
|
431 |
return {
|
|
|
432 |
"first_name": data["first_name"],
|
433 |
"last_name": data["last_name"],
|
434 |
-
"middle_initial": "",
|
435 |
"dob": data["dob"],
|
436 |
"age": data["age"],
|
437 |
"sex": data["gender"],
|
@@ -439,25 +270,37 @@ class PatientDataExtractor:
|
|
439 |
"city": data["city"],
|
440 |
"state": data["state"],
|
441 |
"zip_code": data["zip_code"],
|
442 |
-
"
|
|
|
443 |
"doctor_last_name": "",
|
444 |
-
"doctor_middle_initial": "",
|
445 |
"hospital_name": "",
|
446 |
"doctor_address": "",
|
447 |
"doctor_city": "",
|
448 |
"doctor_state": "",
|
449 |
"doctor_zip": "",
|
450 |
"admission_date": latest_encounter.get("start", ""),
|
451 |
-
"referral_source": "",
|
452 |
-
"admission_method": "",
|
453 |
"discharge_date": latest_encounter.get("end", ""),
|
454 |
-
"discharge_reason": "",
|
455 |
-
"date_of_death": "",
|
456 |
"diagnosis": latest_condition.get("description", ""),
|
457 |
-
"
|
458 |
-
|
459 |
-
|
460 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
461 |
}
|
462 |
|
463 |
def get_all_patients(self) -> List[Dict[str, str]]:
|
@@ -468,8 +311,4 @@ class PatientDataExtractor:
|
|
468 |
self.set_patient_by_index(i)
|
469 |
all_patients.append(self.get_patient_dict())
|
470 |
self.set_patient_by_index(original_idx)
|
471 |
-
return all_patients
|
472 |
-
|
473 |
-
def get_patient_ids(self) -> List[str]:
|
474 |
-
"""Return a list of all patient IDs."""
|
475 |
-
return [self.get_id() for _ in self.patients]
|
|
|
1 |
+
# utils/responseparser.py
|
2 |
import json
|
3 |
import lxml.etree as etree
|
4 |
from datetime import datetime
|
|
|
6 |
import base64
|
7 |
import logging
|
8 |
|
|
|
9 |
logging.basicConfig(
|
10 |
level=logging.INFO,
|
11 |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
|
42 |
def _extract_patients(self) -> List:
|
43 |
"""Extract all patient entries based on format."""
|
44 |
if self.format == "xml":
|
45 |
+
return [self.data]
|
46 |
elif self.format == "json":
|
47 |
if self.data.get("resourceType") != "Bundle" or "entry" not in self.data:
|
48 |
raise ValueError("Invalid FHIR Bundle format")
|
|
|
65 |
id_list = patient.xpath("//hl7:recordTarget/hl7:patientRole/hl7:id/@extension", namespaces=self.ns)
|
66 |
return id_list[0] if id_list else ""
|
67 |
elif self.format == "json":
|
|
|
68 |
patient_id = patient.get("id", "")
|
69 |
if patient_id:
|
70 |
return patient_id
|
|
|
71 |
identifiers = patient.get("identifier", [])
|
72 |
for identifier in identifiers:
|
73 |
+
if identifier.get("value"):
|
74 |
return identifier["value"]
|
75 |
+
return ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
76 |
|
|
|
77 |
def get_first_name(self) -> str:
|
78 |
patient = self._get_current_patient()
|
79 |
if self.format == "xml":
|
|
|
96 |
return name["family"]
|
97 |
return ""
|
98 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
99 |
def get_dob(self) -> str:
|
100 |
patient = self._get_current_patient()
|
101 |
if self.format == "xml":
|
|
|
109 |
if not dob:
|
110 |
return ""
|
111 |
try:
|
112 |
+
birth_date = datetime.strptime(dob[:8], "%Y%m%d") if len(dob) >= 8 else datetime.strptime(dob, "%Y-%m-%d")
|
113 |
today = datetime.now()
|
114 |
age = today.year - birth_date.year - ((today.month, today.day) < (birth_date.month, birth_date.day))
|
115 |
return str(age)
|
|
|
124 |
elif self.format == "json":
|
125 |
return patient.get("gender", "").capitalize()
|
126 |
|
|
|
127 |
def get_address_line(self) -> str:
|
128 |
patient = self._get_current_patient()
|
129 |
if self.format == "xml":
|
|
|
160 |
addresses = patient.get("address", [])
|
161 |
return addresses[0]["postalCode"] if addresses and "postalCode" in addresses[0] else ""
|
162 |
|
|
|
163 |
def get_phone(self) -> str:
|
164 |
patient = self._get_current_patient()
|
165 |
if self.format == "xml":
|
|
|
171 |
return telecom.get("value", "")
|
172 |
return ""
|
173 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
174 |
def get_medications(self) -> List[Dict[str, str]]:
|
175 |
if self.format == "xml":
|
176 |
section = self.data.xpath("//hl7:section[hl7:code/@code='10160-0']", namespaces=self.ns)
|
|
|
202 |
result.append({"start": start, "stop": stop, "description": desc, "code": code})
|
203 |
return result
|
204 |
|
|
|
205 |
def get_encounters(self) -> List[Dict[str, str]]:
|
206 |
if self.format == "xml":
|
207 |
service = self.data.xpath("//hl7:documentationOf/hl7:serviceEvent", namespaces=self.ns)
|
|
|
225 |
result.append({"start": start, "end": end, "description": desc, "code": code})
|
226 |
return result
|
227 |
|
|
|
228 |
def get_conditions(self) -> List[Dict[str, str]]:
|
229 |
if self.format == "xml":
|
230 |
section = self.data.xpath("//hl7:section[hl7:code/@code='11450-4']", namespaces=self.ns)
|
|
|
253 |
result.append({"onset": onset, "description": desc, "code": code})
|
254 |
return result
|
255 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
256 |
def get_patient_dict(self) -> Dict[str, str]:
|
257 |
"""Return a dictionary of patient data mapped to discharge form fields."""
|
258 |
data = self.get_all_patient_data()
|
259 |
latest_encounter = data["encounters"][-1] if data["encounters"] else {}
|
260 |
latest_condition = data["conditions"][-1] if data["conditions"] else {}
|
261 |
+
medications_str = "; ".join([m["description"] for m in data["medications"] if m["description"]])
|
262 |
return {
|
263 |
+
"id": data["id"],
|
264 |
"first_name": data["first_name"],
|
265 |
"last_name": data["last_name"],
|
|
|
266 |
"dob": data["dob"],
|
267 |
"age": data["age"],
|
268 |
"sex": data["gender"],
|
|
|
270 |
"city": data["city"],
|
271 |
"state": data["state"],
|
272 |
"zip_code": data["zip_code"],
|
273 |
+
"phone": data["phone"],
|
274 |
+
"doctor_first_name": "", # Could be extracted from Practitioner resource if available
|
275 |
"doctor_last_name": "",
|
|
|
276 |
"hospital_name": "",
|
277 |
"doctor_address": "",
|
278 |
"doctor_city": "",
|
279 |
"doctor_state": "",
|
280 |
"doctor_zip": "",
|
281 |
"admission_date": latest_encounter.get("start", ""),
|
|
|
|
|
282 |
"discharge_date": latest_encounter.get("end", ""),
|
|
|
|
|
283 |
"diagnosis": latest_condition.get("description", ""),
|
284 |
+
"medications": medications_str if medications_str else "None specified",
|
285 |
+
}
|
286 |
+
|
287 |
+
def get_all_patient_data(self) -> Dict[str, Union[str, List, Dict]]:
|
288 |
+
"""Extract all available data for the current patient."""
|
289 |
+
return {
|
290 |
+
"id": self.get_id(),
|
291 |
+
"first_name": self.get_first_name(),
|
292 |
+
"last_name": self.get_last_name(),
|
293 |
+
"dob": self.get_dob(),
|
294 |
+
"age": self.get_age(),
|
295 |
+
"gender": self.get_gender(),
|
296 |
+
"address_line": self.get_address_line(),
|
297 |
+
"city": self.get_city(),
|
298 |
+
"state": self.get_state(),
|
299 |
+
"zip_code": self.get_zip_code(),
|
300 |
+
"phone": self.get_phone(),
|
301 |
+
"medications": self.get_medications(),
|
302 |
+
"encounters": self.get_encounters(),
|
303 |
+
"conditions": self.get_conditions(),
|
304 |
}
|
305 |
|
306 |
def get_all_patients(self) -> List[Dict[str, str]]:
|
|
|
311 |
self.set_patient_by_index(i)
|
312 |
all_patients.append(self.get_patient_dict())
|
313 |
self.set_patient_by_index(original_idx)
|
314 |
+
return all_patients
|
|
|
|
|
|
|
|