|
import json |
|
import lxml.etree as etree |
|
from datetime import datetime |
|
from typing import List, Dict, Optional, Union |
|
import base64 |
|
|
|
class PatientDataExtractor:
    """Extract patient fields from a FHIR Bundle (JSON) or a C-CDA document (XML).

    A JSON bundle may contain several ``Patient`` resources; one of them is
    the "current" patient (index 0 after construction) and every ``get_*``
    method reads from it. An XML document is treated as a single patient.
    Scalar getters return ``""`` when the field is absent; list getters
    return ``[]``.
    """

    # XPath prefixes shared by the C-CDA demographic getters.
    _PATIENT_ROLE = "//hl7:recordTarget/hl7:patientRole"
    _PATIENT = _PATIENT_ROLE + "/hl7:patient"
    # Base URL of the extensions read by get_race() / get_ethnicity().
    _US_CORE = "http://hl7.org/fhir/us/core/StructureDefinition/"

    def __init__(self, patient_data: str, format_type: Optional[str] = None):
        """Parse ``patient_data`` and select the first patient.

        Args:
            patient_data: Serialized document (``str`` or ``bytes``). Any other
                object is stored as-is and is expected to be already parsed
                (a ``dict`` for JSON, an lxml element for XML).
            format_type: ``"xml"`` or ``"json"`` (case-insensitive). When
                omitted the format is sniffed from the data.

        Raises:
            ValueError: If the format is unsupported or cannot be determined,
                or if JSON data is not a FHIR Bundle with an ``entry`` list.
        """
        self.format = format_type.lower() if format_type else self._detect_format(patient_data)
        if self.format == "xml":
            if isinstance(patient_data, str):
                # Parse from bytes: lxml refuses str input that carries an
                # XML encoding declaration.
                self.data = etree.fromstring(patient_data.encode('utf-8'))
            elif isinstance(patient_data, (bytes, bytearray)):
                self.data = etree.fromstring(bytes(patient_data))
            else:
                self.data = patient_data
            self.ns = {'hl7': 'urn:hl7-org:v3'}
        elif self.format == "json":
            if isinstance(patient_data, (str, bytes, bytearray)):
                self.data = json.loads(patient_data)
            else:
                self.data = patient_data
        else:
            raise ValueError("Unsupported format. Use 'xml' or 'json'")

        self.patients = self._extract_patients()
        self.current_patient_idx = 0

    def _detect_format(self, data: str) -> str:
        """Return ``'xml'`` or ``'json'`` based on the first significant character.

        An already-parsed ``dict`` is reported as ``'json'``.

        Raises:
            ValueError: If the format cannot be determined.
        """
        if isinstance(data, dict):
            return 'json'
        if isinstance(data, (bytes, bytearray)):
            # Only the leading character matters here, so undecodable bytes
            # further in must not abort the sniffing.
            data = bytes(data).decode('utf-8', errors='replace')
        if isinstance(data, str):
            text = data.strip()
            if text.startswith('<'):
                return 'xml'
            if text.startswith(('{', '[')):
                return 'json'
        raise ValueError("Cannot determine data format")

    def _extract_patients(self) -> List:
        """Return the patient records contained in the parsed document.

        Raises:
            ValueError: If JSON data is not a Bundle object with an ``entry`` key.
        """
        if self.format == "xml":
            return [self.data]
        # A top-level JSON array passes format detection but is not a Bundle.
        if (not isinstance(self.data, dict)
                or self.data.get("resourceType") != "Bundle"
                or "entry" not in self.data):
            raise ValueError("Invalid FHIR Bundle format")
        return self._resources_of_type("Patient")

    def set_patient_by_index(self, index: int) -> bool:
        """Select the patient at ``index``; return ``False`` if it is out of range."""
        if 0 <= index < len(self.patients):
            self.current_patient_idx = index
            return True
        return False

    def _get_current_patient(self):
        """Return the currently selected patient record.

        Raises:
            IndexError: If the document contains no patient.
        """
        return self.patients[self.current_patient_idx]

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _first(items, default=""):
        """Return ``items[0]``, or ``default`` when ``items`` is empty or ``None``."""
        return items[0] if items else default

    def _xml_first(self, node, expression: str) -> str:
        """Return the first match of an XPath ``expression`` on ``node``, or ``""``."""
        return self._first(node.xpath(expression, namespaces=self.ns))

    def _xml_section_nodes(self, section_code: str, node_path: str) -> List:
        """Return nodes matching ``node_path`` inside the first section with ``section_code``."""
        sections = self.data.xpath(
            "//hl7:section[hl7:code/@code='" + section_code + "']", namespaces=self.ns)
        if not sections:
            return []
        return sections[0].xpath(node_path, namespaces=self.ns)

    def _resources_of_type(self, resource_type: str) -> List[Dict]:
        """Return every bundle resource whose ``resourceType`` equals ``resource_type``.

        Entries without a ``resource`` object are skipped.
        """
        found = []
        for entry in self.data.get("entry") or []:
            resource = entry.get("resource") if isinstance(entry, dict) else None
            if isinstance(resource, dict) and resource.get("resourceType") == resource_type:
                found.append(resource)
        return found

    @staticmethod
    def _referenced_patient_id(resource: Dict) -> str:
        """Return the id at the end of the resource's subject/patient reference, or ``""``."""
        for key in ("subject", "patient"):
            ref = resource.get(key)
            if isinstance(ref, dict) and ref.get("reference"):
                # "Patient/123", "urn:uuid:123" and absolute URLs all end
                # with the id once ":" is treated like "/".
                return str(ref["reference"]).replace(":", "/").rsplit("/", 1)[-1]
        return ""

    def _clinical_resources(self, resource_type: str) -> List[Dict]:
        """Return resources of ``resource_type`` that may belong to the current patient.

        With a single patient every resource is returned. With several
        patients, a resource is dropped only when its reference positively
        names a different patient of this bundle; unattributable resources
        are kept.
        """
        resources = self._resources_of_type(resource_type)
        if len(self.patients) <= 1:
            return resources
        current_id = self._get_current_patient().get("id")
        other_ids = {p.get("id") for p in self.patients if p.get("id")} - {current_id}
        return [r for r in resources if self._referenced_patient_id(r) not in other_ids]

    def _concept_code(self, concept) -> str:
        """Return the ``code`` of the first coding of a CodeableConcept, or ``""``."""
        coding = self._first((concept or {}).get("coding"), {})
        return coding.get("code", "") if isinstance(coding, dict) else ""

    def _json_name_part(self, field: str) -> str:
        """Return ``field`` from the patient's name, preferring ``use == "official"``.

        Falls back to any name entry carrying the field when no official
        entry does. List-valued fields (``given``, ``prefix``) yield their
        first element.
        """
        names = self._get_current_patient().get("name") or []
        official = [n for n in names if n.get("use") == "official"]
        for name in official + names:
            value = name.get(field)
            if value:
                return value[0] if isinstance(value, list) else value
        return ""

    def _json_address_field(self, field: str) -> str:
        """Return ``field`` from the patient's first address, or ``""``."""
        first_address = self._first(self._get_current_patient().get("address"), {})
        value = first_address.get(field)
        if isinstance(value, list):
            return self._first(value)
        return value or ""

    def _json_extension_text(self, url: str) -> str:
        """Return the ``text`` sub-extension's ``valueString`` of the extension at ``url``."""
        for ext in self._get_current_patient().get("extension") or []:
            if ext.get("url") != url:
                continue
            for sub_ext in ext.get("extension") or []:
                if sub_ext.get("url") == "text":
                    return sub_ext.get("valueString", "")
        return ""

    def _map_patients(self, extract) -> List:
        """Call ``extract()`` once per patient and restore the selection afterwards."""
        saved_idx = self.current_patient_idx
        try:
            results = []
            for idx in range(len(self.patients)):
                self.current_patient_idx = idx
                results.append(extract())
            return results
        finally:
            # Restore even if a getter raised, so the instance stays usable.
            self.current_patient_idx = saved_idx

    # ------------------------------------------------------------------
    # Identification
    # ------------------------------------------------------------------

    def get_id(self) -> str:
        """Return the patient identifier."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(patient, self._PATIENT_ROLE + "/hl7:id/@extension")
        return patient.get("id", "")

    def get_resource_type(self) -> str:
        """Return the resource type (always ``"ClinicalDocument"`` for XML)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return "ClinicalDocument"
        return patient.get("resourceType", "")

    def get_meta_last_updated(self) -> str:
        """Return ``meta.lastUpdated`` (JSON) or the first ``effectiveTime`` value (XML)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(patient, "//hl7:effectiveTime/@value")
        return (patient.get("meta") or {}).get("lastUpdated", "")

    # ------------------------------------------------------------------
    # Name
    # ------------------------------------------------------------------

    def get_first_name(self) -> str:
        """Return the first given name."""
        if self.format == "xml":
            return self._xml_first(self._get_current_patient(),
                                   self._PATIENT + "/hl7:name/hl7:given/text()")
        return self._json_name_part("given")

    def get_last_name(self) -> str:
        """Return the family name."""
        if self.format == "xml":
            return self._xml_first(self._get_current_patient(),
                                   self._PATIENT + "/hl7:name/hl7:family/text()")
        return self._json_name_part("family")

    def get_name_prefix(self) -> str:
        """Return the first name prefix (e.g. a title)."""
        if self.format == "xml":
            return self._xml_first(self._get_current_patient(),
                                   self._PATIENT + "/hl7:name/hl7:prefix/text()")
        return self._json_name_part("prefix")

    # ------------------------------------------------------------------
    # Demographics
    # ------------------------------------------------------------------

    def get_dob(self) -> str:
        """Return the date of birth exactly as stored in the document."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(patient, self._PATIENT + "/hl7:birthTime/@value")
        return patient.get("birthDate", "")

    def get_age(self) -> str:
        """Return the age in whole years as a string, or ``""`` if it cannot be computed.

        Accepts both ``YYYYMMDD...`` and ``YYYY-MM-DD`` dates; a partial
        date (year only, or year and month) yields ``""``.
        """
        dob = self.get_dob()
        if not dob:
            return ""
        try:
            # Dropping the dashes reduces both supported layouts to YYYYMMDD.
            birth_date = datetime.strptime(dob.replace("-", "")[:8], "%Y%m%d")
        except ValueError:
            return ""
        today = datetime.now()
        # Subtract one year when this year's birthday has not happened yet.
        had_birthday = (today.month, today.day) >= (birth_date.month, birth_date.day)
        age = today.year - birth_date.year - (0 if had_birthday else 1)
        return str(age)

    def get_gender(self) -> str:
        """Return the gender, e.g. ``"Male"`` / ``"Female"``.

        For XML only the codes ``M`` and ``F`` are mapped; anything else gives ``""``.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            code = self._xml_first(patient, self._PATIENT + "/hl7:administrativeGenderCode/@code")
            return {"M": "Male", "F": "Female"}.get(code, "")
        return (patient.get("gender") or "").capitalize()

    # ------------------------------------------------------------------
    # Address and contact
    # ------------------------------------------------------------------

    def get_address_line(self) -> str:
        """Return the first street line of the first address."""
        if self.format == "xml":
            return self._xml_first(self._get_current_patient(),
                                   self._PATIENT_ROLE + "/hl7:addr/hl7:streetAddressLine/text()")
        return self._json_address_field("line")

    def get_city(self) -> str:
        """Return the city of the first address."""
        if self.format == "xml":
            return self._xml_first(self._get_current_patient(),
                                   self._PATIENT_ROLE + "/hl7:addr/hl7:city/text()")
        return self._json_address_field("city")

    def get_state(self) -> str:
        """Return the state of the first address."""
        if self.format == "xml":
            return self._xml_first(self._get_current_patient(),
                                   self._PATIENT_ROLE + "/hl7:addr/hl7:state/text()")
        return self._json_address_field("state")

    def get_zip_code(self) -> str:
        """Return the postal code of the first address."""
        if self.format == "xml":
            return self._xml_first(self._get_current_patient(),
                                   self._PATIENT_ROLE + "/hl7:addr/hl7:postalCode/text()")
        return self._json_address_field("postalCode")

    def get_phone(self) -> str:
        """Return a phone number.

        XML: the first ``telecom`` value containing ``tel:``, with that marker
        removed. JSON: the first home phone, else the first phone of any use.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            values = patient.xpath(self._PATIENT_ROLE + "/hl7:telecom/@value", namespaces=self.ns)
            # Scan every telecom: the first one may be an e-mail address.
            for value in values:
                if "tel:" in value:
                    return value.replace("tel:", "")
            return ""
        phones = [t for t in patient.get("telecom") or [] if t.get("system") == "phone"]
        home_phones = [t for t in phones if t.get("use") == "home"]
        chosen = self._first(home_phones or phones, {})
        return chosen.get("value", "")

    # ------------------------------------------------------------------
    # Race, ethnicity, language
    # ------------------------------------------------------------------

    def get_race(self) -> str:
        """Return the race display text."""
        if self.format == "xml":
            return self._xml_first(self._get_current_patient(),
                                   self._PATIENT + "/hl7:raceCode/@displayName")
        return self._json_extension_text(self._US_CORE + "us-core-race")

    def get_ethnicity(self) -> str:
        """Return the ethnicity display text."""
        if self.format == "xml":
            return self._xml_first(self._get_current_patient(),
                                   self._PATIENT + "/hl7:ethnicGroupCode/@displayName")
        return self._json_extension_text(self._US_CORE + "us-core-ethnicity")

    def get_language(self) -> str:
        """Return the language of the first communication entry.

        XML yields the language code. JSON yields ``language.text``, falling
        back to the first coding's ``display`` and then its ``code``.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(
                patient,
                self._PATIENT + "/hl7:languageCommunication/hl7:languageCode/@code")
        first_comm = self._first(patient.get("communication"), {})
        language = first_comm.get("language") or {}
        if language.get("text"):
            return language["text"]
        coding = self._first(language.get("coding"), {})
        return coding.get("display") or coding.get("code") or ""

    # ------------------------------------------------------------------
    # Clinical lists
    # ------------------------------------------------------------------

    def get_medications(self) -> List[Dict[str, str]]:
        """Return medications as dicts with ``start``, ``stop``, ``description``, ``code``."""
        if self.format == "xml":
            material = ".//hl7:manufacturedMaterial/hl7:code/"
            return [
                {
                    "start": self._xml_first(med, ".//hl7:effectiveTime/hl7:low/@value"),
                    "stop": self._xml_first(med, ".//hl7:effectiveTime/hl7:high/@value"),
                    "description": self._xml_first(med, material + "@displayName"),
                    "code": self._xml_first(med, material + "@code"),
                }
                for med in self._xml_section_nodes("10160-0", ".//hl7:substanceAdministration")
            ]
        result = []
        for med in self._clinical_resources("MedicationRequest"):
            concept = med.get("medicationCodeableConcept") or {}
            validity = (med.get("dispenseRequest") or {}).get("validityPeriod") or {}
            result.append({
                "start": med.get("authoredOn", ""),
                "stop": validity.get("end", ""),
                "description": concept.get("text", ""),
                "code": self._concept_code(concept),
            })
        return result

    def get_encounters(self) -> List[Dict[str, str]]:
        """Return encounters as dicts with ``start``, ``end``, ``description``, ``code``.

        XML yields at most one entry, built from the first documented service event.
        """
        if self.format == "xml":
            services = self.data.xpath("//hl7:documentationOf/hl7:serviceEvent",
                                       namespaces=self.ns)
            if not services:
                return []
            service = services[0]
            return [{
                "start": self._xml_first(service, ".//hl7:effectiveTime/hl7:low/@value"),
                "end": self._xml_first(service, ".//hl7:effectiveTime/hl7:high/@value"),
                "description": "Patient Care",
                "code": "",
            }]
        result = []
        for enc in self._clinical_resources("Encounter"):
            period = enc.get("period") or {}
            enc_type = self._first(enc.get("type"), {})
            result.append({
                "start": period.get("start", ""),
                "end": period.get("end", ""),
                "description": enc_type.get("text", ""),
                "code": self._concept_code(enc_type),
            })
        return result

    def get_conditions(self) -> List[Dict[str, str]]:
        """Return conditions as dicts with ``onset``, ``description``, ``code``."""
        if self.format == "xml":
            observations = self._xml_section_nodes(
                "11450-4", ".//hl7:entry/hl7:act/hl7:entryRelationship/hl7:observation")
            return [
                {
                    "onset": self._xml_first(obs, ".//hl7:effectiveTime/hl7:low/@value"),
                    "description": self._xml_first(obs, ".//hl7:value/@displayName"),
                    "code": self._xml_first(obs, ".//hl7:value/@code"),
                }
                for obs in observations
            ]
        result = []
        for cond in self._clinical_resources("Condition"):
            concept = cond.get("code") or {}
            result.append({
                "onset": cond.get("onsetDateTime", ""),
                "description": concept.get("text", ""),
                "code": self._concept_code(concept),
            })
        return result

    def get_immunizations(self) -> List[Dict[str, str]]:
        """Return immunizations as dicts with ``date``, ``description``, ``code``."""
        if self.format == "xml":
            material = (".//hl7:consumable/hl7:manufacturedProduct"
                        "/hl7:manufacturedMaterial/hl7:code/")
            return [
                {
                    "date": self._xml_first(imm, ".//hl7:effectiveTime/@value"),
                    "description": self._xml_first(imm, material + "@displayName"),
                    "code": self._xml_first(imm, material + "@code"),
                }
                for imm in self._xml_section_nodes("11369-6", ".//hl7:substanceAdministration")
            ]
        result = []
        for imm in self._clinical_resources("Immunization"):
            vaccine = imm.get("vaccineCode") or {}
            result.append({
                "date": imm.get("occurrenceDateTime", ""),
                "description": vaccine.get("text", ""),
                "code": self._concept_code(vaccine),
            })
        return result

    def get_diagnostic_reports(self) -> List[Dict[str, str]]:
        """Return diagnostic reports as dicts with ``start``, ``description``, ``code``.

        JSON reports additionally carry ``content`` (the decoded first
        ``presentedForm`` payload) when one is present and decodable.
        """
        if self.format == "xml":
            return [
                {
                    "start": self._xml_first(report, ".//hl7:effectiveTime/hl7:low/@value"),
                    "description": self._xml_first(report, ".//hl7:code/@displayName"),
                    "code": self._xml_first(report, ".//hl7:code/@code"),
                }
                for report in self._xml_section_nodes("30954-2", ".//hl7:organizer")
            ]
        result = []
        for report in self._clinical_resources("DiagnosticReport"):
            concept = report.get("code") or {}
            item = {
                "start": report.get("effectiveDateTime", ""),
                "description": concept.get("text", ""),
                "code": self._concept_code(concept),
            }
            encoded = self._first(report.get("presentedForm"), {}).get("data", "")
            if encoded:
                try:
                    item["content"] = base64.b64decode(encoded).decode('utf-8')
                except ValueError:
                    # Invalid base64 and non-UTF-8 payloads both raise
                    # ValueError subclasses. Treat the payload as absent so
                    # one bad attachment does not abort the extraction.
                    pass
            result.append(item)
        return result

    # ------------------------------------------------------------------
    # Aggregates
    # ------------------------------------------------------------------

    def get_all_patient_data(self) -> Dict[str, Union[str, List, Dict]]:
        """Extract all available data for the current patient."""
        return {
            "id": self.get_id(),
            "resource_type": self.get_resource_type(),
            "meta_last_updated": self.get_meta_last_updated(),
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "name_prefix": self.get_name_prefix(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "gender": self.get_gender(),
            "address_line": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "phone": self.get_phone(),
            "race": self.get_race(),
            "ethnicity": self.get_ethnicity(),
            "language": self.get_language(),
            "medications": self.get_medications(),
            "encounters": self.get_encounters(),
            "conditions": self.get_conditions(),
            "immunizations": self.get_immunizations(),
            "diagnostic_reports": self.get_diagnostic_reports(),
        }

    def get_patient_dict(self) -> Dict[str, str]:
        """Return a dictionary of patient data mapped to discharge form fields.

        Admission/discharge dates come from the last listed encounter and the
        diagnosis from the last listed condition. Fields with no source in
        the document are returned as empty strings.
        """
        data = self.get_all_patient_data()
        latest_encounter = data["encounters"][-1] if data["encounters"] else {}
        latest_condition = data["conditions"][-1] if data["conditions"] else {}
        medications_str = "; ".join(m["description"] for m in data["medications"])
        return {
            "first_name": data["first_name"],
            "last_name": data["last_name"],
            "middle_initial": "",
            "dob": data["dob"],
            "age": data["age"],
            "sex": data["gender"],
            "address": data["address_line"],
            "city": data["city"],
            "state": data["state"],
            "zip_code": data["zip_code"],
            "doctor_first_name": "",
            "doctor_last_name": "",
            "doctor_middle_initial": "",
            "hospital_name": "",
            "doctor_address": "",
            "doctor_city": "",
            "doctor_state": "",
            "doctor_zip": "",
            "admission_date": latest_encounter.get("start", ""),
            "referral_source": "",
            "admission_method": "",
            "discharge_date": latest_encounter.get("end", ""),
            "discharge_reason": "",
            "date_of_death": "",
            "diagnosis": latest_condition.get("description", ""),
            "procedures": "",
            "medications": medications_str,
            "preparer_name": "",
            "preparer_job_title": "",
        }

    def get_all_patients(self) -> List[Dict[str, str]]:
        """Return the discharge-form dictionary of every patient.

        The current patient selection is left unchanged.
        """
        return self._map_patients(self.get_patient_dict)

    def get_patient_ids(self) -> List[str]:
        """Return the id of every patient, in document order.

        The current patient selection is left unchanged.
        """
        return self._map_patients(self.get_id)
|
|
|
|