import streamlit as st
from Bio import pairwise2
from Bio.Seq import Seq
import re
from collections import defaultdict
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
# -------------------------------------------------
# 1. Define important gene regions and their associated resistance patterns
# -------------------------------------------------
'rpoB': {
'start': 759807,
'end': 763325,
'description': 'RNA polymerase β subunit',
'drug': 'Rifampicin',
'mutations': {
# Example: codon 531: from S -> L
'531': {'from': 'S', 'to': ['L'], 'freq': 'High', 'confidence': 'High'},
'526': {'from': 'H', 'to': ['Y', 'D', 'R'], 'freq': 'High', 'confidence': 'High'},
'516': {'from': 'D', 'to': ['V', 'G'], 'freq': 'Moderate', 'confidence': 'High'},
'511': {'from': 'L', 'to': ['P'], 'freq': 'Low', 'confidence': 'Moderate'}
'katG': {
'start': 2153889,
'end': 2156111,
'description': 'Catalase-peroxidase',
'drug': 'Isoniazid',
'mutations': {
'315': {'from': 'S', 'to': ['T', 'N'], 'freq': 'High', 'confidence': 'High'},
'463': {'from': 'R', 'to': ['L'], 'freq': 'Moderate', 'confidence': 'Moderate'}
'inhA': {
'start': 1674202,
'end': 1675011,
'description': 'Enoyl-ACP reductase',
'drug': 'Isoniazid/Ethionamide',
'mutations': {
# Negative positions typically refer to promoter/regulatory sites. Compare nucleotides directly.
'-15': {'from': 'C', 'to': ['T'], 'freq': 'High', 'confidence': 'High'},
'94': {'from': 'S', 'to': ['A'], 'freq': 'Moderate', 'confidence': 'High'}
'gyrA': {
'start': 7302,
'end': 9818,
'description': 'DNA gyrase subunit A',
'drug': 'Fluoroquinolones',
'mutations': {
'90': {'from': 'A', 'to': ['V'], 'freq': 'High', 'confidence': 'High'},
'94': {'from': 'D', 'to': ['G', 'A', 'N'], 'freq': 'High', 'confidence': 'High'}
# -------------------------------------------------
# 2. File reading functions
# -------------------------------------------------
def read_fasta_file(file_path):
"""Read a FASTA file from disk"""
with open(file_path, 'r') as handle:
content = handle.read().strip()
parts = content.split('\n', 1)
sequence = ''.join(parts[1].split('\n')).replace(' ', '')
return sequence.upper()
except Exception as e:
st.error(f"Error reading file {file_path}: {str(e)}")
return None
def read_fasta_from_upload(uploaded_file):
"""Read a FASTA file from Streamlit upload"""
content = uploaded_file.getvalue().decode('utf-8').strip()
parts = content.split('\n', 1)
sequence = ''.join(parts[1].split('\n')).replace(' ', '')
return sequence.upper()
except Exception as e:
st.error(f"Error reading uploaded file: {str(e)}")
return None
# -------------------------------------------------
# 3. Region extraction function
# -------------------------------------------------
def extract_gene_region(genome_seq, gene_start, gene_end):
"""Extract a gene region with additional 200bp on each side for alignment context."""
flank = 200
start = max(0, gene_start - flank)
end = min(len(genome_seq), gene_end + flank)
extracted_seq = genome_seq[start:end]
st.write(f"Extracted sequence length: {len(extracted_seq)}bp (for region {gene_start}-{gene_end})")
return extracted_seq, start
except Exception as e:
st.error(f"Error extracting gene region: {str(e)}")
return None, None
# -------------------------------------------------
# 4. Codon-level extraction from aligned sequences
# -------------------------------------------------
def extract_codon_alignment(ref_aligned, query_aligned, gene_start, gene_end, offset):
Convert the nucleotide alignment into a list of codon diffs (ref_aa, query_aa, codon_number).
We skip codons that have a gap in the reference, because we can’t reliably translate them.
codon_list = []
real_pos = 0 # tracks how many non-gap reference bases we've seen
ref_codon = []
query_codon = []
for i in range(len(ref_aligned)):
ref_base = ref_aligned[i]
query_base = query_aligned[i]
# Only increment real_pos if the reference base is not a gap
if ref_base != '-':
real_pos += 1
query_codon.append(query_base if query_base != '-' else 'N') # 'N' for missing
# Once we have 3 bases for the reference, translate
if len(ref_codon) == 3:
# Example: If real_pos is 3, that means we just completed codon #1 for this region, etc.
codon_start_pos = offset + (real_pos - 3) # The first base of this codon in genome coords
# Check if at least part of this codon is in the gene boundaries
# Typically we want the entire codon to be within gene_start..gene_end
if (codon_start_pos >= gene_start) and (codon_start_pos + 2 <= gene_end):
ref_aa = str(Seq(''.join(ref_codon)).translate())
query_aa = str(Seq(''.join(query_codon)).translate())
# codon_number in the gene
gene_nt_pos = codon_start_pos - gene_start + 1 # nucleotide offset into the gene
# e.g., if gene_nt_pos is 1..3 => codon_number = 1, if 4..6 => codon_number = 2, etc.
codon_number = (gene_nt_pos - 1) // 3 + 1
if ref_aa != query_aa:
'codon_number': codon_number,
'ref_aa': ref_aa,
'query_aa': query_aa
# Reset for the next codon
ref_codon = []
query_codon = []
return codon_list
# -------------------------------------------------
# 5. Find both codon-level and promoter-level mutations
# -------------------------------------------------
def find_mutations_with_context(ref_seq, query_seq, gene_start, gene_end, offset=0):
1) Align the nucleotide sequences for the gene region.
2) Extract codon-level amino-acid differences for coding changes.
3) Identify direct nucleotide changes for promoter or negative positions (like -15).
# Align the two nucleotide sequences
alignments = pairwise2.align.globalms(ref_seq, query_seq, match=2, mismatch=-3, open=-10, extend=-0.5)
if not alignments:
st.warning("No alignments found")
return {'codon_diffs': [], 'nt_diffs': []}
# Take the best-scoring alignment
alignment = alignments[0]
ref_aligned, query_aligned = alignment[0], alignment[1]
# 1) Extract codon-level diffs
codon_diffs = extract_codon_alignment(ref_aligned, query_aligned, gene_start, gene_end, offset)
# 2) Identify direct nucleotide differences for negative or regulatory positions
# We only care about positions that are outside the coding region or specifically listed as negative
nt_diffs = []
ref_pos = 0 # tracks real position in reference
for i in range(len(ref_aligned)):
ref_base = ref_aligned[i]
query_base = query_aligned[i]
# only increment ref_pos if ref_base isn't a gap
if ref_base != '-':
ref_pos += 1
actual_genome_pos = offset + ref_pos # actual coordinate in entire genome
# Check if there's a mismatch
if ref_base != query_base and (query_base != '-'):
# If the position is < gene_start, it might be negative or promoter region
# Or if the position is > gene_end, it might be some flanking region
# We'll store it, and 'analyze_resistance' can figure out if it's relevant
if actual_genome_pos < gene_start or actual_genome_pos > gene_end:
# It's outside the coding region
'genome_pos': actual_genome_pos,
'ref_base': ref_base,
'query_base': query_base
# Even if it's inside the gene, it might be an in-frame insertion or something
# not forming a complete codon in the reference. We'll store it anyway.
'genome_pos': actual_genome_pos,
'ref_base': ref_base,
'query_base': query_base
return {
'codon_diffs': codon_diffs,
'nt_diffs': nt_diffs
except Exception as e:
st.error(f"Error in mutation analysis: {str(e)}")
return {'codon_diffs': [], 'nt_diffs': []}
# -------------------------------------------------
# 6. Analyze the found mutations for known resistance patterns
# -------------------------------------------------
def analyze_resistance(mutation_data, gene_info):
"""Analyze codon-level amino-acid diffs and any direct nucleotide diffs for known patterns."""
codon_diffs = mutation_data['codon_diffs'] # list of {codon_number, ref_aa, query_aa}
nt_diffs = mutation_data['nt_diffs'] # list of {genome_pos, ref_base, query_base}
resistance_found = []
# We need to parse the dictionary keys in gene_info['mutations'] (they can be negative or numeric)
for key_str, pattern in gene_info['mutations'].items():
key_val = int(key_str)
except ValueError:
# Should never happen if the dictionary is consistent, but just in case
# If key_val > 0 => it's a codon-based mutation (like 531 for rpoB).
# If key_val <= 0 => it's a nucleotide-based mutation in promoter or upstream region (like -15).
if key_val > 0:
# Codon-based
for diff in codon_diffs:
if diff['codon_number'] == key_val:
# e.g. pattern['from'] = 'S', pattern['to'] = ['L']
if diff['ref_aa'] == pattern['from'] and diff['query_aa'] in pattern['to']:
'position': key_str,
'change': f"{pattern['from']}{key_str}{diff['query_aa']}",
'frequency': pattern['freq'],
'confidence': pattern['confidence']
# Nucleotide-based (promoter or upstream).
# We need to find an nt_diff at that offset from the gene_start.
# e.g. -15 => actual genome position = gene_start + (-15)
promoter_genome_pos = gene_info['start'] + key_val
for diff in nt_diffs:
if diff['genome_pos'] == promoter_genome_pos:
# Check if ref_base = pattern['from'], query_base in pattern['to']
if diff['ref_base'] == pattern['from'] and diff['query_base'] in pattern['to']:
'position': key_str,
'change': f"{pattern['from']}{key_str}{diff['query_base']}",
'frequency': pattern['freq'],
'confidence': pattern['confidence']
return resistance_found
# -------------------------------------------------
# 7. Main Streamlit App
# -------------------------------------------------
def main():
st.title("M. tuberculosis Drug Resistance Analysis - FIXED VERSION")
### Automated Drug Resistance Analysis Tool
Upload your query genome (clinical isolate) in FASTA format for comparison with H37Rv reference.
**Note**: This version correctly checks *codon-based* amino-acid mutations (e.g., rpoB S531L)
and *nucleotide-based* promoter mutations (e.g., inhA -15C>T).
# Debug mode toggle
debug_mode = st.checkbox("Enable debug mode")
# Load reference genome
ref_genome = read_fasta_file("NC_000962.3.fasta")
if ref_genome:
st.success(f"Reference genome loaded successfully (length: {len(ref_genome)}bp)")
st.error("Failed to load reference genome")
query_file = st.file_uploader("Upload Query Genome (FASTA)", type=['fasta', 'fa'])
if query_file and st.button("Analyze Drug Resistance"):
query_genome = read_fasta_from_upload(query_file)
if query_genome:
st.success(f"Query genome loaded successfully (length: {len(query_genome)}bp)")
# Analysis progress tracking
progress_bar = st.progress(0)
status_text = st.empty()
# Store all results
all_results = {}
# Analyze each gene
for i, (gene, info) in enumerate(RESISTANCE_GENES.items()):
status_text.text(f"Analyzing {gene} ({info['drug']})...")
progress_bar.progress((i + 1) / len(RESISTANCE_GENES))
if debug_mode:
st.subheader(f"Analyzing {gene}")
st.write(f"Gene region: {info['start']}-{info['end']}")
# Extract regions
ref_region, ref_start = extract_gene_region(ref_genome, info['start'], info['end'])
query_region, _ = extract_gene_region(query_genome, info['start'], info['end'])
if ref_region and query_region:
# Find mutations (codon-level + any promoter-level)
mutation_data = find_mutations_with_context(
ref_region, query_region,
info['start'], info['end'],
# Analyze resistance
resistance = analyze_resistance(mutation_data, info)
all_results[gene] = {
'mutation_data': mutation_data,
'resistance': resistance
if debug_mode:
st.write(f"Codon-level differences: {len(mutation_data['codon_diffs'])}")
st.write(f"Nucleotide-level differences: {len(mutation_data['nt_diffs'])}")
st.write(f"Identified {len(resistance)} resistance patterns")
st.error(f"Failed to analyze {gene}")
# Clear progress indicators
# Display Results
st.header("Analysis Results")
# Show results for each gene
for gene, results in all_results.items():
st.subheader(f"{gene} Analysis")
st.write(f"Drug: {info['drug']}")
num_codon_diffs = len(results['mutation_data']['codon_diffs'])
num_nt_diffs = len(results['mutation_data']['nt_diffs'])
st.write(f"Total codon-level differences found: {num_codon_diffs}")
st.write(f"Total nucleotide-level differences found: {num_nt_diffs}")
if results['resistance']:
st.warning(f"Potential resistance mutations found in {gene}")
resistance_df = pd.DataFrame(results['resistance'])
st.info(f"No known resistance mutations found in {gene}")
# Download complete results
if st.button("Download Complete Analysis"):
# Create detailed report DataFrame
report_data = []
for gene, results in all_results.items():
# Store codon diffs
for diff in results['mutation_data']['codon_diffs']:
'Gene': gene,
'Drug': RESISTANCE_GENES[gene]['drug'],
'Type': 'Codon_diff',
# Store nt diffs
for diff in results['mutation_data']['nt_diffs']:
'Gene': gene,
'Drug': RESISTANCE_GENES[gene]['drug'],
'Type': 'Nucleotide_diff',
# Store recognized resistance mutations
for res in results['resistance']:
'Gene': gene,
'Drug': RESISTANCE_GENES[gene]['drug'],
'Type': 'Resistance',
report_df = pd.DataFrame(report_data)
csv = report_df.to_csv(index=False)
"Download Full Report (CSV)",
# Entry point
if __name__ == "__main__":