# dnaseq / app.py
# lyimo's picture
# Update app.py
# edf285e verified
import streamlit as st
from Bio import pairwise2
from Bio.Seq import Seq
import re
from collections import defaultdict
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
# -------------------------------------------------
# 1. Define important gene regions and their associated resistance patterns
# -------------------------------------------------
# Catalogue of resistance-associated genes, keyed by gene name. Each entry holds:
#   start / end  - genome coordinates of the gene; they are used to slice the
#                  genome (extract_gene_region) and to place alignment columns
#                  relative to the gene (codon and nucleotide scanners).
#   description  - human-readable gene product name.
#   drug         - drug name shown next to the gene in the UI and the report.
#   mutations    - known patterns keyed by position *as a string*.
#                  analyze_resistance() parses the key with int():
#                    > 0  -> codon number; 'from'/'to' are amino-acid letters
#                    <= 0 -> nucleotide offset added to 'start'; 'from'/'to'
#                            are bases
#                  'freq' and 'confidence' are copied verbatim into the results.
# NOTE(review): the coordinates look like 1-based positions on the H37Rv
# reference (main() loads NC_000962.3.fasta) - confirm against the annotation.
RESISTANCE_GENES = {
    # NOTE(review): codons 511-531 look like the E. coli rpoB numbering, whereas
    # codon numbers in this file are counted from 'start' - confirm which
    # numbering is intended.
    'rpoB': {
        'start': 759807,
        'end': 763325,
        'description': 'RNA polymerase β subunit',
        'drug': 'Rifampicin',
        'mutations': {
            # Example: codon 531: from S -> L
            '531': {'from': 'S', 'to': ['L'], 'freq': 'High', 'confidence': 'High'},
            '526': {'from': 'H', 'to': ['Y', 'D', 'R'], 'freq': 'High', 'confidence': 'High'},
            '516': {'from': 'D', 'to': ['V', 'G'], 'freq': 'Moderate', 'confidence': 'High'},
            '511': {'from': 'L', 'to': ['P'], 'freq': 'Low', 'confidence': 'Moderate'}
        }
    },
    # NOTE(review): katG is, to my knowledge, annotated on the complement strand
    # of H37Rv; no strand is recorded here and the codon extraction in this file
    # translates the forward strand only - confirm strand handling.
    'katG': {
        'start': 2153889,
        'end': 2156111,
        'description': 'Catalase-peroxidase',
        'drug': 'Isoniazid',
        'mutations': {
            '315': {'from': 'S', 'to': ['T', 'N'], 'freq': 'High', 'confidence': 'High'},
            '463': {'from': 'R', 'to': ['L'], 'freq': 'Moderate', 'confidence': 'Moderate'}
        }
    },
    'inhA': {
        'start': 1674202,
        'end': 1675011,
        'description': 'Enoyl-ACP reductase',
        'drug': 'Isoniazid/Ethionamide',
        'mutations': {
            # Negative positions typically refer to promoter/regulatory sites. Compare nucleotides directly.
            # NOTE(review): the -15 promoter mutation is usually described relative
            # to the upstream fabG1/mabA promoter rather than to the inhA start -
            # confirm that start + (-15) is the intended genome coordinate.
            '-15': {'from': 'C', 'to': ['T'], 'freq': 'High', 'confidence': 'High'},
            '94': {'from': 'S', 'to': ['A'], 'freq': 'Moderate', 'confidence': 'High'}
        }
    },
    'gyrA': {
        'start': 7302,
        'end': 9818,
        'description': 'DNA gyrase subunit A',
        'drug': 'Fluoroquinolones',
        'mutations': {
            '90': {'from': 'A', 'to': ['V'], 'freq': 'High', 'confidence': 'High'},
            '94': {'from': 'D', 'to': ['G', 'A', 'N'], 'freq': 'High', 'confidence': 'High'}
        }
    }
}
# -------------------------------------------------
# 2. File reading functions
# -------------------------------------------------
def read_fasta_file(file_path):
    """Read the first record of a FASTA file from disk.

    The first line is treated as the header and discarded. The following
    lines are joined, with all whitespace removed, until a line starting
    with '>' (the header of a further record) or the end of the file.

    Args:
        file_path: Path of the FASTA file to read.

    Returns:
        The upper-cased sequence, or None when the file cannot be read or
        holds no sequence data (the problem is reported through st.error).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            lines = handle.read().strip().splitlines()
        chunks = []
        for line in lines[1:]:
            if line.lstrip().startswith('>'):
                # A further record starts here; neither its header text nor
                # its bases belong to the first sequence.
                break
            # str.split() with no argument drops spaces, tabs and stray '\r'.
            chunks.append(''.join(line.split()))
        sequence = ''.join(chunks)
        if not sequence:
            raise ValueError("no sequence data found after the header line")
        return sequence.upper()
    except Exception as e:
        # UI boundary: report the problem to the user instead of crashing.
        st.error(f"Error reading file {file_path}: {str(e)}")
        return None
def read_fasta_from_upload(uploaded_file):
    """Read the first record of a FASTA file uploaded through Streamlit.

    The upload is decoded as UTF-8. The first line is treated as the header
    and discarded; the following lines are joined, with all whitespace
    removed (including the '\\r' of Windows line endings), until a line
    starting with '>' or the end of the data.

    Args:
        uploaded_file: Object exposing getvalue() that returns the raw bytes
            of the upload.

    Returns:
        The upper-cased sequence, or None when the upload cannot be decoded
        or holds no sequence data (the problem is reported through st.error).
    """
    try:
        lines = uploaded_file.getvalue().decode('utf-8').strip().splitlines()
        chunks = []
        for line in lines[1:]:
            if line.lstrip().startswith('>'):
                # A further record starts here; neither its header text nor
                # its bases belong to the first sequence.
                break
            # str.split() with no argument drops spaces, tabs and stray '\r',
            # which would otherwise shift every downstream coordinate.
            chunks.append(''.join(line.split()))
        sequence = ''.join(chunks)
        if not sequence:
            raise ValueError("no sequence data found after the header line")
        return sequence.upper()
    except Exception as e:
        # UI boundary: report the problem to the user instead of crashing.
        st.error(f"Error reading uploaded file: {str(e)}")
        return None
# -------------------------------------------------
# 3. Region extraction function
# -------------------------------------------------
def extract_gene_region(genome_seq, gene_start, gene_end):
    """Slice a gene plus flanking context out of a genome string.

    The slice is genome_seq[gene_start - 200 : gene_end + 200], with both
    bounds clamped to the sequence. The extracted length is echoed to the
    Streamlit page.

    Returns:
        (region, slice_start), where slice_start is the index in genome_seq
        at which the region begins; (None, None) after reporting an error.
    """
    try:
        flank = 200
        left = gene_start - flank
        if left < 0:
            left = 0
        genome_len = len(genome_seq)
        right = gene_end + flank
        if right > genome_len:
            right = genome_len
        region = genome_seq[left:right]
        st.write(f"Extracted sequence length: {len(region)}bp (for region {gene_start}-{gene_end})")
    except Exception as e:
        st.error(f"Error extracting gene region: {str(e)}")
        return None, None
    return region, left
# -------------------------------------------------
# 4. Codon-level extraction from aligned sequences
# -------------------------------------------------
def extract_codon_alignment(ref_aligned, query_aligned, gene_start, gene_end, offset):
    """List the codons whose translation differs between reference and query.

    Alignment columns are mapped to genome coordinates as
    ``offset + <number of non-gap reference bases seen so far>``, the same
    convention find_mutations_with_context() uses for nucleotide diffs.
    Codons are assembled only from reference bases inside
    gene_start..gene_end, and a codon may only begin where
    ``(position - gene_start) % 3 == 0``. This anchors the reading frame at
    the gene start; chunking triplets from the start of the aligned region
    would put the gene out of frame whenever the upstream flank is not a
    multiple of three.

    Columns with a gap in the reference (insertions in the query) are
    skipped, so inserted bases never enter a codon. A gap in the query is
    translated as 'N'.

    Args:
        ref_aligned: Aligned reference sequence ('-' marks gaps).
        query_aligned: Aligned query sequence, same length as ref_aligned.
        gene_start: Genome coordinate of the first base of the gene.
        gene_end: Genome coordinate of the last base of the gene.
        offset: Number of genome bases that precede the aligned region.

    Returns:
        A list of dicts with keys 'codon_number' (1-based within the gene),
        'ref_aa' and 'query_aa', one per codon whose amino acids differ.
    """
    codon_list = []
    ref_codon = []
    query_codon = []
    codon_first_pos = None
    genome_pos = offset  # coordinate of the last reference base consumed

    for ref_base, query_base in zip(ref_aligned, query_aligned):
        if ref_base == '-':
            # Insertion in the query: no reference coordinate to attach it to.
            continue
        genome_pos += 1
        if genome_pos < gene_start:
            continue
        if genome_pos > gene_end:
            break

        if not ref_codon:
            # Only open a codon on a frame-0 position of the gene; this also
            # keeps the frame right if the region starts inside the gene.
            if (genome_pos - gene_start) % 3 != 0:
                continue
            codon_first_pos = genome_pos

        ref_codon.append(ref_base)
        query_codon.append('N' if query_base == '-' else query_base)
        if len(ref_codon) < 3:
            continue

        ref_aa = str(Seq(''.join(ref_codon)).translate())
        query_aa = str(Seq(''.join(query_codon)).translate())
        if ref_aa != query_aa:
            codon_list.append({
                'codon_number': (codon_first_pos - gene_start) // 3 + 1,
                'ref_aa': ref_aa,
                'query_aa': query_aa
            })
        ref_codon = []
        query_codon = []

    return codon_list
# -------------------------------------------------
# 5. Find both codon-level and promoter-level mutations
# -------------------------------------------------
def find_mutations_with_context(ref_seq, query_seq, gene_start, gene_end, offset=0):
    """Align two gene regions and collect codon- and nucleotide-level diffs.

    Steps:
      1) Globally align the two nucleotide sequences.
      2) Extract amino-acid differences per codon via
         extract_codon_alignment().
      3) Record every aligned column where the query carries a base that
         differs from the reference, inside or outside the gene, so that
         analyze_resistance() can look up promoter positions such as -15.

    Columns where the query has a gap (deletions) are not recorded. Columns
    where the reference has a gap (insertions) are recorded with
    ref_base '-' at the coordinate of the preceding reference base.

    Args:
        ref_seq: Reference region.
        query_seq: Query region.
        gene_start: Genome coordinate of the first base of the gene.
        gene_end: Genome coordinate of the last base of the gene.
        offset: Number of genome bases that precede ref_seq; added to the
            running count of reference bases to obtain genome coordinates.

    Returns:
        {'codon_diffs': [...], 'nt_diffs': [...]}; both lists are empty when
        no alignment is produced or an error occurs (reported through
        st.warning / st.error).
    """
    try:
        # Only the first alignment is used, so do not let pairwise2 enumerate
        # every co-optimal alignment of two multi-kilobase sequences.
        alignments = pairwise2.align.globalms(
            ref_seq, query_seq,
            match=2, mismatch=-3, open=-10, extend=-0.5,
            one_alignment_only=True
        )
        if not alignments:
            st.warning("No alignments found")
            return {'codon_diffs': [], 'nt_diffs': []}

        best = alignments[0]
        ref_aligned, query_aligned = best[0], best[1]

        codon_diffs = extract_codon_alignment(ref_aligned, query_aligned, gene_start, gene_end, offset)

        nt_diffs = []
        ref_pos = 0  # non-gap reference bases consumed so far
        for ref_base, query_base in zip(ref_aligned, query_aligned):
            if ref_base != '-':
                ref_pos += 1
            if query_base == '-' or ref_base == query_base:
                continue
            nt_diffs.append({
                'genome_pos': offset + ref_pos,
                'ref_base': ref_base,
                'query_base': query_base
            })

        return {
            'codon_diffs': codon_diffs,
            'nt_diffs': nt_diffs
        }
    except Exception as e:
        # UI boundary: report the problem and return an empty result so the
        # remaining genes can still be analysed.
        st.error(f"Error in mutation analysis: {str(e)}")
        return {'codon_diffs': [], 'nt_diffs': []}
# -------------------------------------------------
# 6. Analyze the found mutations for known resistance patterns
# -------------------------------------------------
def analyze_resistance(mutation_data, gene_info):
    """Match observed differences against a gene's known resistance patterns.

    Each key of gene_info['mutations'] is parsed as an integer (keys that do
    not parse are skipped). A positive key is a codon number and is compared
    with mutation_data['codon_diffs']; a zero or negative key is a
    nucleotide offset added to gene_info['start'] and is compared with
    mutation_data['nt_diffs']. A hit needs the reference symbol to equal the
    pattern's 'from' and the query symbol to be listed in its 'to'.

    Returns:
        A list of dicts with keys 'position', 'change', 'frequency' and
        'confidence', ordered by pattern and then by observed difference.
    """
    codon_diffs = mutation_data['codon_diffs']
    nt_diffs = mutation_data['nt_diffs']
    hits = []

    for label, rule in gene_info['mutations'].items():
        try:
            position = int(label)
        except ValueError:
            # Malformed key in the pattern table; nothing to compare against.
            continue

        if position > 0:
            # Codon-based pattern: compare amino acids.
            from_key, to_key = 'ref_aa', 'query_aa'
            candidates = (d for d in codon_diffs if d['codon_number'] == position)
        else:
            # Promoter/upstream pattern: compare bases at start + offset.
            target = gene_info['start'] + position
            from_key, to_key = 'ref_base', 'query_base'
            candidates = (d for d in nt_diffs if d['genome_pos'] == target)

        for observed in candidates:
            if observed[from_key] != rule['from'] or observed[to_key] not in rule['to']:
                continue
            hits.append({
                'position': label,
                'change': f"{rule['from']}{label}{observed[to_key]}",
                'frequency': rule['freq'],
                'confidence': rule['confidence']
            })

    return hits
# -------------------------------------------------
# 7. Main Streamlit App
# -------------------------------------------------
def _analyze_all_genes(ref_genome, query_genome, debug_mode):
    """Run extraction, alignment and pattern lookup for every gene in RESISTANCE_GENES."""
    progress_bar = st.progress(0)
    status_text = st.empty()
    all_results = {}
    total = len(RESISTANCE_GENES)

    for step, (gene, info) in enumerate(RESISTANCE_GENES.items(), start=1):
        status_text.text(f"Analyzing {gene} ({info['drug']})...")
        progress_bar.progress(step / total)
        if debug_mode:
            st.subheader(f"Analyzing {gene}")
            st.write(f"Gene region: {info['start']}-{info['end']}")

        # The same coordinates are applied to both genomes.
        ref_region, ref_start = extract_gene_region(ref_genome, info['start'], info['end'])
        query_region, _ = extract_gene_region(query_genome, info['start'], info['end'])
        if not (ref_region and query_region):
            st.error(f"Failed to analyze {gene}")
            continue

        mutation_data = find_mutations_with_context(
            ref_region, query_region,
            info['start'], info['end'],
            ref_start
        )
        resistance = analyze_resistance(mutation_data, info)
        all_results[gene] = {
            'mutation_data': mutation_data,
            'resistance': resistance
        }
        if debug_mode:
            st.write(f"Codon-level differences: {len(mutation_data['codon_diffs'])}")
            st.write(mutation_data['codon_diffs'])
            st.write(f"Nucleotide-level differences: {len(mutation_data['nt_diffs'])}")
            st.write(mutation_data['nt_diffs'])
            st.write(f"Identified {len(resistance)} resistance patterns")

    progress_bar.empty()
    status_text.empty()
    return all_results


def _build_report_csv(all_results):
    """Flatten all codon diffs, nucleotide diffs and resistance hits into CSV text."""
    report_data = []
    for gene, results in all_results.items():
        drug = RESISTANCE_GENES[gene]['drug']
        sections = (
            ('Codon_diff', results['mutation_data']['codon_diffs']),
            ('Nucleotide_diff', results['mutation_data']['nt_diffs']),
            ('Resistance', results['resistance']),
        )
        for row_type, rows in sections:
            for row in rows:
                report_data.append({'Gene': gene, 'Drug': drug, 'Type': row_type, **row})
    return pd.DataFrame(report_data).to_csv(index=False)


def _show_results(all_results):
    """Render the per-gene summary and the CSV download button."""
    st.header("Analysis Results")
    for gene, results in all_results.items():
        st.subheader(f"{gene} Analysis")
        info = RESISTANCE_GENES[gene]
        st.write(f"Drug: {info['drug']}")
        num_codon_diffs = len(results['mutation_data']['codon_diffs'])
        num_nt_diffs = len(results['mutation_data']['nt_diffs'])
        st.write(f"Total codon-level differences found: {num_codon_diffs}")
        st.write(f"Total nucleotide-level differences found: {num_nt_diffs}")
        if results['resistance']:
            st.warning(f"Potential resistance mutations found in {gene}")
            st.dataframe(pd.DataFrame(results['resistance']))
        else:
            st.info(f"No known resistance mutations found in {gene}")

    # Rendered directly rather than behind another st.button: a nested button
    # is only True on the rerun it triggers, when the outer "Analyze" button
    # is already False again, so its branch would never execute.
    st.download_button(
        "Download Full Report (CSV)",
        _build_report_csv(all_results),
        "mtb_analysis_report_fixed.csv",
        "text/csv"
    )


def main():
    """Streamlit entry point: load genomes, run the analysis, show the results.

    The reference genome is read from NC_000962.3.fasta in the working
    directory and the query genome comes from the file uploader. Results of
    the latest analysis are kept in st.session_state, tagged with the name
    of the uploaded file, so they stay on screen across the script reruns
    that any widget interaction (including the download button) triggers.
    """
    results_key = "mtb_analysis"

    st.title("M. tuberculosis Drug Resistance Analysis - FIXED VERSION")
    st.markdown("""
### Automated Drug Resistance Analysis Tool
Upload your query genome (clinical isolate) in FASTA format for comparison with H37Rv reference.
**Note**: This version correctly checks *codon-based* amino-acid mutations (e.g., rpoB S531L)
and *nucleotide-based* promoter mutations (e.g., inhA -15C>T).
""")

    debug_mode = st.checkbox("Enable debug mode")

    ref_genome = read_fasta_file("NC_000962.3.fasta")
    if not ref_genome:
        st.error("Failed to load reference genome")
        return
    st.success(f"Reference genome loaded successfully (length: {len(ref_genome)}bp)")

    query_file = st.file_uploader("Upload Query Genome (FASTA)", type=['fasta', 'fa'])
    if not query_file:
        # No upload: drop results that belonged to a previous file.
        if results_key in st.session_state:
            del st.session_state[results_key]
        return

    if st.button("Analyze Drug Resistance"):
        # Forget the previous run first so a failed load shows nothing stale.
        if results_key in st.session_state:
            del st.session_state[results_key]
        query_genome = read_fasta_from_upload(query_file)
        if query_genome:
            st.success(f"Query genome loaded successfully (length: {len(query_genome)}bp)")
            st.session_state[results_key] = {
                'file_name': query_file.name,
                'results': _analyze_all_genes(ref_genome, query_genome, debug_mode)
            }

    stored = st.session_state.get(results_key)
    if stored and stored['file_name'] == query_file.name:
        _show_results(stored['results'])
# Entry point
if __name__ == "__main__":
    # Build the page only when the module's __name__ is "__main__", i.e. not
    # when it is imported by another module.
    main()