|
import streamlit as st |
|
from Bio import pairwise2 |
|
from Bio.Seq import Seq |
|
import re |
|
from collections import defaultdict |
|
import pandas as pd |
|
import plotly.express as px |
|
import plotly.graph_objects as go |
|
|
|
|
|
|
|
|
|
RESISTANCE_GENES = { |
|
'rpoB': { |
|
'start': 759807, |
|
'end': 763325, |
|
'description': 'RNA polymerase β subunit', |
|
'drug': 'Rifampicin', |
|
'mutations': { |
|
|
|
'531': {'from': 'S', 'to': ['L'], 'freq': 'High', 'confidence': 'High'}, |
|
'526': {'from': 'H', 'to': ['Y', 'D', 'R'], 'freq': 'High', 'confidence': 'High'}, |
|
'516': {'from': 'D', 'to': ['V', 'G'], 'freq': 'Moderate', 'confidence': 'High'}, |
|
'511': {'from': 'L', 'to': ['P'], 'freq': 'Low', 'confidence': 'Moderate'} |
|
} |
|
}, |
|
'katG': { |
|
'start': 2153889, |
|
'end': 2156111, |
|
'description': 'Catalase-peroxidase', |
|
'drug': 'Isoniazid', |
|
'mutations': { |
|
'315': {'from': 'S', 'to': ['T', 'N'], 'freq': 'High', 'confidence': 'High'}, |
|
'463': {'from': 'R', 'to': ['L'], 'freq': 'Moderate', 'confidence': 'Moderate'} |
|
} |
|
}, |
|
'inhA': { |
|
'start': 1674202, |
|
'end': 1675011, |
|
'description': 'Enoyl-ACP reductase', |
|
'drug': 'Isoniazid/Ethionamide', |
|
'mutations': { |
|
|
|
'-15': {'from': 'C', 'to': ['T'], 'freq': 'High', 'confidence': 'High'}, |
|
'94': {'from': 'S', 'to': ['A'], 'freq': 'Moderate', 'confidence': 'High'} |
|
} |
|
}, |
|
'gyrA': { |
|
'start': 7302, |
|
'end': 9818, |
|
'description': 'DNA gyrase subunit A', |
|
'drug': 'Fluoroquinolones', |
|
'mutations': { |
|
'90': {'from': 'A', 'to': ['V'], 'freq': 'High', 'confidence': 'High'}, |
|
'94': {'from': 'D', 'to': ['G', 'A', 'N'], 'freq': 'High', 'confidence': 'High'} |
|
} |
|
} |
|
} |
|
|
|
|
|
|
|
|
|
def read_fasta_file(file_path): |
|
"""Read a FASTA file from disk""" |
|
try: |
|
with open(file_path, 'r') as handle: |
|
content = handle.read().strip() |
|
parts = content.split('\n', 1) |
|
sequence = ''.join(parts[1].split('\n')).replace(' ', '') |
|
return sequence.upper() |
|
except Exception as e: |
|
st.error(f"Error reading file {file_path}: {str(e)}") |
|
return None |
|
|
|
def read_fasta_from_upload(uploaded_file): |
|
"""Read a FASTA file from Streamlit upload""" |
|
try: |
|
content = uploaded_file.getvalue().decode('utf-8').strip() |
|
parts = content.split('\n', 1) |
|
sequence = ''.join(parts[1].split('\n')).replace(' ', '') |
|
return sequence.upper() |
|
except Exception as e: |
|
st.error(f"Error reading uploaded file: {str(e)}") |
|
return None |
|
|
|
|
|
|
|
|
|
def extract_gene_region(genome_seq, gene_start, gene_end): |
|
"""Extract a gene region with additional 200bp on each side for alignment context.""" |
|
try: |
|
flank = 200 |
|
start = max(0, gene_start - flank) |
|
end = min(len(genome_seq), gene_end + flank) |
|
extracted_seq = genome_seq[start:end] |
|
st.write(f"Extracted sequence length: {len(extracted_seq)}bp (for region {gene_start}-{gene_end})") |
|
return extracted_seq, start |
|
except Exception as e: |
|
st.error(f"Error extracting gene region: {str(e)}") |
|
return None, None |
|
|
|
|
|
|
|
|
|
def extract_codon_alignment(ref_aligned, query_aligned, gene_start, gene_end, offset): |
|
""" |
|
Convert the nucleotide alignment into a list of codon diffs (ref_aa, query_aa, codon_number). |
|
We skip codons that have a gap in the reference, because we can’t reliably translate them. |
|
""" |
|
codon_list = [] |
|
real_pos = 0 |
|
|
|
ref_codon = [] |
|
query_codon = [] |
|
|
|
for i in range(len(ref_aligned)): |
|
ref_base = ref_aligned[i] |
|
query_base = query_aligned[i] |
|
|
|
|
|
if ref_base != '-': |
|
real_pos += 1 |
|
ref_codon.append(ref_base) |
|
query_codon.append(query_base if query_base != '-' else 'N') |
|
|
|
|
|
if len(ref_codon) == 3: |
|
|
|
codon_start_pos = offset + (real_pos - 3) |
|
|
|
|
|
|
|
if (codon_start_pos >= gene_start) and (codon_start_pos + 2 <= gene_end): |
|
ref_aa = str(Seq(''.join(ref_codon)).translate()) |
|
query_aa = str(Seq(''.join(query_codon)).translate()) |
|
|
|
|
|
gene_nt_pos = codon_start_pos - gene_start + 1 |
|
|
|
codon_number = (gene_nt_pos - 1) // 3 + 1 |
|
|
|
if ref_aa != query_aa: |
|
codon_list.append({ |
|
'codon_number': codon_number, |
|
'ref_aa': ref_aa, |
|
'query_aa': query_aa |
|
}) |
|
|
|
|
|
ref_codon = [] |
|
query_codon = [] |
|
|
|
return codon_list |
|
|
|
|
|
|
|
|
|
def find_mutations_with_context(ref_seq, query_seq, gene_start, gene_end, offset=0): |
|
""" |
|
1) Align the nucleotide sequences for the gene region. |
|
2) Extract codon-level amino-acid differences for coding changes. |
|
3) Identify direct nucleotide changes for promoter or negative positions (like -15). |
|
""" |
|
try: |
|
|
|
alignments = pairwise2.align.globalms(ref_seq, query_seq, match=2, mismatch=-3, open=-10, extend=-0.5) |
|
|
|
if not alignments: |
|
st.warning("No alignments found") |
|
return {'codon_diffs': [], 'nt_diffs': []} |
|
|
|
|
|
alignment = alignments[0] |
|
ref_aligned, query_aligned = alignment[0], alignment[1] |
|
|
|
|
|
codon_diffs = extract_codon_alignment(ref_aligned, query_aligned, gene_start, gene_end, offset) |
|
|
|
|
|
|
|
nt_diffs = [] |
|
ref_pos = 0 |
|
for i in range(len(ref_aligned)): |
|
ref_base = ref_aligned[i] |
|
query_base = query_aligned[i] |
|
|
|
|
|
if ref_base != '-': |
|
ref_pos += 1 |
|
actual_genome_pos = offset + ref_pos |
|
|
|
|
|
if ref_base != query_base and (query_base != '-'): |
|
|
|
|
|
|
|
if actual_genome_pos < gene_start or actual_genome_pos > gene_end: |
|
|
|
nt_diffs.append({ |
|
'genome_pos': actual_genome_pos, |
|
'ref_base': ref_base, |
|
'query_base': query_base |
|
}) |
|
else: |
|
|
|
|
|
nt_diffs.append({ |
|
'genome_pos': actual_genome_pos, |
|
'ref_base': ref_base, |
|
'query_base': query_base |
|
}) |
|
|
|
return { |
|
'codon_diffs': codon_diffs, |
|
'nt_diffs': nt_diffs |
|
} |
|
except Exception as e: |
|
st.error(f"Error in mutation analysis: {str(e)}") |
|
return {'codon_diffs': [], 'nt_diffs': []} |
|
|
|
|
|
|
|
|
|
def analyze_resistance(mutation_data, gene_info): |
|
"""Analyze codon-level amino-acid diffs and any direct nucleotide diffs for known patterns.""" |
|
codon_diffs = mutation_data['codon_diffs'] |
|
nt_diffs = mutation_data['nt_diffs'] |
|
|
|
resistance_found = [] |
|
|
|
|
|
for key_str, pattern in gene_info['mutations'].items(): |
|
try: |
|
key_val = int(key_str) |
|
except ValueError: |
|
|
|
continue |
|
|
|
|
|
|
|
if key_val > 0: |
|
|
|
for diff in codon_diffs: |
|
if diff['codon_number'] == key_val: |
|
|
|
if diff['ref_aa'] == pattern['from'] and diff['query_aa'] in pattern['to']: |
|
resistance_found.append({ |
|
'position': key_str, |
|
'change': f"{pattern['from']}{key_str}{diff['query_aa']}", |
|
'frequency': pattern['freq'], |
|
'confidence': pattern['confidence'] |
|
}) |
|
else: |
|
|
|
|
|
|
|
promoter_genome_pos = gene_info['start'] + key_val |
|
for diff in nt_diffs: |
|
if diff['genome_pos'] == promoter_genome_pos: |
|
|
|
if diff['ref_base'] == pattern['from'] and diff['query_base'] in pattern['to']: |
|
resistance_found.append({ |
|
'position': key_str, |
|
'change': f"{pattern['from']}{key_str}{diff['query_base']}", |
|
'frequency': pattern['freq'], |
|
'confidence': pattern['confidence'] |
|
}) |
|
|
|
return resistance_found |
|
|
|
|
|
|
|
|
|
def main(): |
|
st.title("M. tuberculosis Drug Resistance Analysis - FIXED VERSION") |
|
|
|
st.markdown(""" |
|
### Automated Drug Resistance Analysis Tool |
|
Upload your query genome (clinical isolate) in FASTA format for comparison with H37Rv reference. |
|
|
|
**Note**: This version correctly checks *codon-based* amino-acid mutations (e.g., rpoB S531L) |
|
and *nucleotide-based* promoter mutations (e.g., inhA -15C>T). |
|
""") |
|
|
|
|
|
debug_mode = st.checkbox("Enable debug mode") |
|
|
|
|
|
ref_genome = read_fasta_file("NC_000962.3.fasta") |
|
if ref_genome: |
|
st.success(f"Reference genome loaded successfully (length: {len(ref_genome)}bp)") |
|
else: |
|
st.error("Failed to load reference genome") |
|
return |
|
|
|
query_file = st.file_uploader("Upload Query Genome (FASTA)", type=['fasta', 'fa']) |
|
|
|
if query_file and st.button("Analyze Drug Resistance"): |
|
query_genome = read_fasta_from_upload(query_file) |
|
if query_genome: |
|
st.success(f"Query genome loaded successfully (length: {len(query_genome)}bp)") |
|
|
|
|
|
progress_bar = st.progress(0) |
|
status_text = st.empty() |
|
|
|
|
|
all_results = {} |
|
|
|
|
|
for i, (gene, info) in enumerate(RESISTANCE_GENES.items()): |
|
status_text.text(f"Analyzing {gene} ({info['drug']})...") |
|
progress_bar.progress((i + 1) / len(RESISTANCE_GENES)) |
|
|
|
if debug_mode: |
|
st.subheader(f"Analyzing {gene}") |
|
st.write(f"Gene region: {info['start']}-{info['end']}") |
|
|
|
|
|
ref_region, ref_start = extract_gene_region(ref_genome, info['start'], info['end']) |
|
query_region, _ = extract_gene_region(query_genome, info['start'], info['end']) |
|
|
|
if ref_region and query_region: |
|
|
|
mutation_data = find_mutations_with_context( |
|
ref_region, query_region, |
|
info['start'], info['end'], |
|
ref_start |
|
) |
|
|
|
|
|
resistance = analyze_resistance(mutation_data, info) |
|
|
|
all_results[gene] = { |
|
'mutation_data': mutation_data, |
|
'resistance': resistance |
|
} |
|
|
|
if debug_mode: |
|
st.write(f"Codon-level differences: {len(mutation_data['codon_diffs'])}") |
|
st.write(mutation_data['codon_diffs']) |
|
st.write(f"Nucleotide-level differences: {len(mutation_data['nt_diffs'])}") |
|
st.write(mutation_data['nt_diffs']) |
|
|
|
st.write(f"Identified {len(resistance)} resistance patterns") |
|
else: |
|
st.error(f"Failed to analyze {gene}") |
|
|
|
|
|
progress_bar.empty() |
|
status_text.empty() |
|
|
|
|
|
st.header("Analysis Results") |
|
|
|
|
|
for gene, results in all_results.items(): |
|
st.subheader(f"{gene} Analysis") |
|
info = RESISTANCE_GENES[gene] |
|
|
|
st.write(f"Drug: {info['drug']}") |
|
|
|
num_codon_diffs = len(results['mutation_data']['codon_diffs']) |
|
num_nt_diffs = len(results['mutation_data']['nt_diffs']) |
|
st.write(f"Total codon-level differences found: {num_codon_diffs}") |
|
st.write(f"Total nucleotide-level differences found: {num_nt_diffs}") |
|
|
|
if results['resistance']: |
|
st.warning(f"Potential resistance mutations found in {gene}") |
|
resistance_df = pd.DataFrame(results['resistance']) |
|
st.dataframe(resistance_df) |
|
else: |
|
st.info(f"No known resistance mutations found in {gene}") |
|
|
|
|
|
if st.button("Download Complete Analysis"): |
|
|
|
report_data = [] |
|
for gene, results in all_results.items(): |
|
|
|
for diff in results['mutation_data']['codon_diffs']: |
|
report_data.append({ |
|
'Gene': gene, |
|
'Drug': RESISTANCE_GENES[gene]['drug'], |
|
'Type': 'Codon_diff', |
|
**diff |
|
}) |
|
|
|
for diff in results['mutation_data']['nt_diffs']: |
|
report_data.append({ |
|
'Gene': gene, |
|
'Drug': RESISTANCE_GENES[gene]['drug'], |
|
'Type': 'Nucleotide_diff', |
|
**diff |
|
}) |
|
|
|
for res in results['resistance']: |
|
report_data.append({ |
|
'Gene': gene, |
|
'Drug': RESISTANCE_GENES[gene]['drug'], |
|
'Type': 'Resistance', |
|
**res |
|
}) |
|
|
|
report_df = pd.DataFrame(report_data) |
|
csv = report_df.to_csv(index=False) |
|
st.download_button( |
|
"Download Full Report (CSV)", |
|
csv, |
|
"mtb_analysis_report_fixed.csv", |
|
"text/csv" |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |
|
|