import re
from collections import defaultdict

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from Bio import pairwise2
from Bio.Seq import Seq

# -------------------------------------------------
# 1. Define important gene regions and their associated resistance patterns
# -------------------------------------------------
# Per-gene schema:
#   'start' / 'end' : genome coordinates of the gene on the reference.
#   'description'   : human-readable gene product.
#   'drug'          : drug(s) whose resistance is associated with the gene.
#   'mutations'     : maps a position key (string) to a pattern with
#                     'from' (reference residue/base), 'to' (list of resistant
#                     residues/bases), 'freq' and 'confidence' labels.
# Position keys are parsed as integers by the analysis code in this file:
#   * a positive key is a codon number counted from the gene's own 'start';
#   * a non-positive key is a nucleotide offset that is added to 'start'
#     (e.g. '-15' means the base 15 positions upstream of 'start').
RESISTANCE_GENES = {
    'rpoB': {
        'start': 759807,
        'end': 763325,
        'description': 'RNA polymerase β subunit',
        'drug': 'Rifampicin',
        # Keys use the M. tuberculosis (H37Rv) rpoB codon numbering, because
        # codon numbers are derived from this gene's own start coordinate.
        # The historical E. coli-based numbers (H37Rv number + 81) are given
        # in the trailing comments; with those as keys the patterns could
        # never match a codon counted from 'start'.
        'mutations': {
            '450': {'from': 'S', 'to': ['L'], 'freq': 'High', 'confidence': 'High'},  # E. coli 531
            '445': {'from': 'H', 'to': ['Y', 'D', 'R'], 'freq': 'High', 'confidence': 'High'},  # E. coli 526
            '435': {'from': 'D', 'to': ['V', 'G'], 'freq': 'Moderate', 'confidence': 'High'},  # E. coli 516
            '430': {'from': 'L', 'to': ['P'], 'freq': 'Low', 'confidence': 'Moderate'},  # E. coli 511
        },
    },
    'katG': {
        'start': 2153889,
        'end': 2156111,
        'description': 'Catalase-peroxidase',
        'drug': 'Isoniazid',
        # NOTE(review): katG appears to be annotated on the complementary
        # strand in NC_000962.3 - confirm against the GenBank record. The
        # codon extraction in this file translates the forward strand only,
        # so these patterns may not match until strand handling is added.
        'mutations': {
            '315': {'from': 'S', 'to': ['T', 'N'], 'freq': 'High', 'confidence': 'High'},
            '463': {'from': 'R', 'to': ['L'], 'freq': 'Moderate', 'confidence': 'Moderate'},
        },
    },
    'inhA': {
        'start': 1674202,
        'end': 1675011,
        'description': 'Enoyl-ACP reductase',
        'drug': 'Isoniazid/Ethionamide',
        'mutations': {
            # Negative positions typically refer to promoter/regulatory sites.
            # Compare nucleotides directly.
            # NOTE(review): the clinically reported -15 C>T change is usually
            # numbered relative to the upstream fabG1 (mabA) start rather than
            # the inhA start used here - confirm the intended coordinate.
            '-15': {'from': 'C', 'to': ['T'], 'freq': 'High', 'confidence': 'High'},
            '94': {'from': 'S', 'to': ['A'], 'freq': 'Moderate', 'confidence': 'High'},
        },
    },
    'gyrA': {
        'start': 7302,
        'end': 9818,
        'description': 'DNA gyrase subunit A',
        'drug': 'Fluoroquinolones',
        'mutations': {
            '90': {'from': 'A', 'to': ['V'], 'freq': 'High', 'confidence': 'High'},
            '94': {'from': 'D', 'to': ['G', 'A', 'N'], 'freq': 'High', 'confidence': 'High'},
        },
    },
}


# -------------------------------------------------
# 2.
# File reading functions
# -------------------------------------------------
def _parse_fasta_text(content):
    """Return the upper-cased sequence contained in FASTA-formatted text.

    The first line is treated as the record header and discarded. Every
    later line has all whitespace removed (spaces, tabs and the carriage
    returns left by CRLF files) and is concatenated. Additional ``>`` header
    lines are skipped so that their text is never spliced into the sequence.

    Raises:
        ValueError: if nothing follows the header line.
    """
    lines = content.strip().splitlines()
    if len(lines) < 2:
        raise ValueError("FASTA content has no sequence lines after the header")
    return ''.join(
        ''.join(line.split())
        for line in lines[1:]
        if not line.lstrip().startswith('>')
    ).upper()


def read_fasta_file(file_path):
    """Read a FASTA file from disk.

    Returns the upper-cased sequence, or None after reporting the problem
    through ``st.error`` when the file cannot be read or parsed.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            return _parse_fasta_text(handle.read())
    except Exception as e:
        st.error(f"Error reading file {file_path}: {str(e)}")
        return None


def read_fasta_from_upload(uploaded_file):
    """Read a FASTA file from a Streamlit upload.

    ``uploaded_file`` must provide ``getvalue()`` returning the raw bytes.
    Returns the upper-cased sequence, or None after reporting the problem
    through ``st.error``.
    """
    try:
        return _parse_fasta_text(uploaded_file.getvalue().decode('utf-8'))
    except Exception as e:
        st.error(f"Error reading uploaded file: {str(e)}")
        return None


# -------------------------------------------------
# 3. Region extraction function
# -------------------------------------------------
def extract_gene_region(genome_seq, gene_start, gene_end):
    """Extract a gene region with additional 200bp on each side for alignment context.

    Returns ``(extracted_seq, start)`` where ``start`` is the 0-based slice
    index of the first extracted base, so the 1-based genome position of the
    n-th extracted base (n counted from 1) is ``start + n``. Returns
    ``(None, None)`` after reporting through ``st.error`` on failure.
    """
    try:
        flank = 200
        # Clamp to the sequence so genes near either end still extract.
        start = max(0, gene_start - flank)
        end = min(len(genome_seq), gene_end + flank)
        extracted_seq = genome_seq[start:end]
        st.write(f"Extracted sequence length: {len(extracted_seq)}bp (for region {gene_start}-{gene_end})")
        return extracted_seq, start
    except Exception as e:
        st.error(f"Error extracting gene region: {str(e)}")
        return None, None


# -------------------------------------------------
# 4. Codon-level extraction from aligned sequences
# -------------------------------------------------
def extract_codon_alignment(ref_aligned, query_aligned, gene_start, gene_end,
                            offset, translate=None):
    """Convert a nucleotide alignment into a list of amino-acid differences.

    Codons are framed from ``gene_start`` (not from the start of the aligned
    region), so flanking sequence of any length cannot shift the reading
    frame. Only codons lying completely inside ``gene_start..gene_end`` are
    considered.

    Args:
        ref_aligned: aligned reference sequence ('-' marks gaps).
        query_aligned: aligned query sequence, same length as ``ref_aligned``.
        gene_start: 1-based genome position of the first base of the gene.
        gene_end: 1-based genome position of the last base of the gene.
        offset: number of genome bases preceding the first reference base of
            the alignment (the slice start returned by
            ``extract_gene_region``), so reference base n is at genome
            position ``offset + n``.
        translate: optional callable mapping a 3-letter codon string to an
            amino-acid string. Defaults to Biopython's ``Seq.translate``.

    Returns:
        A list of ``{'codon_number', 'ref_aa', 'query_aa'}`` dicts, one per
        codon whose translation differs, in gene order. ``codon_number`` is
        1-based from ``gene_start``.
    """
    if translate is None:
        def translate(codon):
            return str(Seq(codon).translate())

    codon_list = []
    ref_codon = []
    query_codon = []
    real_pos = 0  # how many non-gap reference bases have been consumed

    for ref_base, query_base in zip(ref_aligned, query_aligned):
        if ref_base == '-':
            # Insertion in the query: there is no reference coordinate, so it
            # cannot be assigned to a reference codon.
            continue
        real_pos += 1
        genome_pos = offset + real_pos  # 1-based, same convention as gene_start

        if genome_pos < gene_start:
            continue
        if genome_pos > gene_end:
            break

        gene_index = genome_pos - gene_start  # 0-based index inside the gene
        if not ref_codon and gene_index % 3 != 0:
            # The aligned region began in the middle of a codon; wait for the
            # next codon boundary instead of translating out of frame.
            continue

        ref_codon.append(ref_base)
        query_codon.append('N' if query_base == '-' else query_base)  # 'N' for missing
        if len(ref_codon) < 3:
            continue

        # Identical codons translate identically, so only differing codons
        # need to be translated.
        if ref_codon != query_codon:
            ref_aa = translate(''.join(ref_codon))
            query_aa = translate(''.join(query_codon))
            if ref_aa != query_aa:
                codon_list.append({
                    'codon_number': gene_index // 3 + 1,
                    'ref_aa': ref_aa,
                    'query_aa': query_aa
                })

        # Reset for the next codon
        ref_codon = []
        query_codon = []

    return codon_list


# -------------------------------------------------
# 5. Find both codon-level and promoter-level mutations
# -------------------------------------------------
def _collect_nt_diffs(ref_aligned, query_aligned, offset):
    """List nucleotide differences from an alignment with 1-based genome positions.

    Deleted bases (gap in the query) are not reported. Bases inserted in the
    query (gap in the reference) are reported with ``ref_base`` '-' and the
    position of the preceding reference base.
    """
    nt_diffs = []
    ref_pos = 0  # how many non-gap reference bases have been consumed
    for ref_base, query_base in zip(ref_aligned, query_aligned):
        if ref_base != '-':
            ref_pos += 1
        if ref_base != query_base and query_base != '-':
            nt_diffs.append({
                'genome_pos': offset + ref_pos,
                'ref_base': ref_base,
                'query_base': query_base
            })
    return nt_diffs


def find_mutations_with_context(ref_seq, query_seq, gene_start, gene_end, offset=0):
    """Align two nucleotide regions and report their differences.

    1) Align the nucleotide sequences for the gene region.
    2) Extract codon-level amino-acid differences for coding changes.
    3) Collect direct nucleotide changes, inside or outside the gene, so that
       promoter positions (like -15) can be checked by the caller.

    Args:
        ref_seq: reference region (gene plus flanks).
        query_seq: query region covering the same coordinates.
        gene_start: 1-based genome position of the first base of the gene.
        gene_end: 1-based genome position of the last base of the gene.
        offset: number of genome bases preceding ``ref_seq`` (the slice start
            returned by ``extract_gene_region``).

    Returns:
        ``{'codon_diffs': [...], 'nt_diffs': [...]}``. Both lists are empty
        when no alignment is produced or when an error occurs (the error is
        reported through ``st.error``).
    """
    try:
        # Only the first alignment is used, so ask for a single traceback
        # instead of enumerating every co-optimal alignment.
        alignments = pairwise2.align.globalms(
            ref_seq, query_seq,
            match=2, mismatch=-3, open=-10, extend=-0.5,
            one_alignment_only=True
        )
        if not alignments:
            st.warning("No alignments found")
            return {'codon_diffs': [], 'nt_diffs': []}

        alignment = alignments[0]
        ref_aligned, query_aligned = alignment[0], alignment[1]

        return {
            'codon_diffs': extract_codon_alignment(
                ref_aligned, query_aligned, gene_start, gene_end, offset
            ),
            'nt_diffs': _collect_nt_diffs(ref_aligned, query_aligned, offset)
        }
    except Exception as e:
        st.error(f"Error in mutation analysis: {str(e)}")
        return {'codon_diffs': [], 'nt_diffs': []}


# -------------------------------------------------
# 6.
# Analyze the found mutations for known resistance patterns
# -------------------------------------------------
def analyze_resistance(mutation_data, gene_info):
    """Match observed differences against a gene's known resistance patterns.

    ``mutation_data`` supplies 'codon_diffs' (dicts with codon_number, ref_aa,
    query_aa) and 'nt_diffs' (dicts with genome_pos, ref_base, query_base).
    Each key of ``gene_info['mutations']`` is read as an integer: positive
    keys are codon numbers checked against the codon diffs; zero or negative
    keys are nucleotide offsets from ``gene_info['start']`` checked against
    the nucleotide diffs. Keys that are not integers are ignored.

    Returns a list of dicts with 'position', 'change', 'frequency' and
    'confidence', ordered by pattern and then by observed difference.
    """
    codon_changes = mutation_data['codon_diffs']
    base_changes = mutation_data['nt_diffs']
    hits = []

    for label, rule in gene_info['mutations'].items():
        try:
            site = int(label)
        except ValueError:
            # Not a numeric position key; nothing to look up.
            continue

        if site > 0:
            # Coding change: compare amino acids at this codon number.
            observed = (
                (entry['ref_aa'], entry['query_aa'])
                for entry in codon_changes
                if entry['codon_number'] == site
            )
        else:
            # Upstream/promoter change: compare bases at start + offset.
            target = gene_info['start'] + site
            observed = (
                (entry['ref_base'], entry['query_base'])
                for entry in base_changes
                if entry['genome_pos'] == target
            )

        for original, variant in observed:
            if original == rule['from'] and variant in rule['to']:
                hits.append({
                    'position': label,
                    'change': f"{rule['from']}{label}{variant}",
                    'frequency': rule['freq'],
                    'confidence': rule['confidence']
                })

    return hits


# -------------------------------------------------
# 7.
# Main Streamlit App
# -------------------------------------------------
# Session-state key under which the latest analysis is kept between reruns.
_RESULTS_STATE_KEY = "mtb_analysis_results"


def _analyze_all_genes(ref_genome, query_genome, debug_mode):
    """Run the per-gene analysis and return ``{gene: {'mutation_data', 'resistance'}}``."""
    progress_bar = st.progress(0)
    status_text = st.empty()
    all_results = {}

    for i, (gene, info) in enumerate(RESISTANCE_GENES.items()):
        status_text.text(f"Analyzing {gene} ({info['drug']})...")
        progress_bar.progress((i + 1) / len(RESISTANCE_GENES))

        if debug_mode:
            st.subheader(f"Analyzing {gene}")
            st.write(f"Gene region: {info['start']}-{info['end']}")

        # Both genomes are sliced with the reference coordinates; the flanks
        # added by extract_gene_region give the aligner some context.
        ref_region, ref_start = extract_gene_region(ref_genome, info['start'], info['end'])
        query_region, _ = extract_gene_region(query_genome, info['start'], info['end'])

        if not (ref_region and query_region):
            st.error(f"Failed to analyze {gene}")
            continue

        # Find mutations (codon-level + any promoter-level)
        mutation_data = find_mutations_with_context(
            ref_region, query_region, info['start'], info['end'], ref_start
        )
        resistance = analyze_resistance(mutation_data, info)
        all_results[gene] = {
            'mutation_data': mutation_data,
            'resistance': resistance
        }

        if debug_mode:
            st.write(f"Codon-level differences: {len(mutation_data['codon_diffs'])}")
            st.write(mutation_data['codon_diffs'])
            st.write(f"Nucleotide-level differences: {len(mutation_data['nt_diffs'])}")
            st.write(mutation_data['nt_diffs'])
            st.write(f"Identified {len(resistance)} resistance patterns")

    # Clear progress indicators
    progress_bar.empty()
    status_text.empty()
    return all_results


def _build_report_csv(all_results):
    """Flatten all codon diffs, nucleotide diffs and resistance hits into CSV text."""
    sections = (
        ('Codon_diff', lambda results: results['mutation_data']['codon_diffs']),
        ('Nucleotide_diff', lambda results: results['mutation_data']['nt_diffs']),
        ('Resistance', lambda results: results['resistance']),
    )
    report_data = [
        {'Gene': gene, 'Drug': RESISTANCE_GENES[gene]['drug'], 'Type': row_type, **row}
        for gene, results in all_results.items()
        for row_type, select in sections
        for row in select(results)
    ]
    return pd.DataFrame(report_data).to_csv(index=False)


def _render_results(all_results):
    """Show the per-gene summary and offer the full report as a CSV download."""
    st.header("Analysis Results")

    for gene, results in all_results.items():
        st.subheader(f"{gene} Analysis")
        info = RESISTANCE_GENES[gene]
        st.write(f"Drug: {info['drug']}")

        num_codon_diffs = len(results['mutation_data']['codon_diffs'])
        num_nt_diffs = len(results['mutation_data']['nt_diffs'])
        st.write(f"Total codon-level differences found: {num_codon_diffs}")
        st.write(f"Total nucleotide-level differences found: {num_nt_diffs}")

        if results['resistance']:
            st.warning(f"Potential resistance mutations found in {gene}")
            st.dataframe(pd.DataFrame(results['resistance']))
        else:
            st.info(f"No known resistance mutations found in {gene}")

    # The download button is rendered directly. Wrapping it in a second
    # st.button nested under the "Analyze" button can never work: clicking the
    # inner button reruns the script, the outer button is then False, and the
    # whole branch (including the download) is skipped.
    st.download_button(
        "Download Full Report (CSV)",
        _build_report_csv(all_results),
        "mtb_analysis_report_fixed.csv",
        "text/csv"
    )


def main():
    """Streamlit entry point: load the reference, analyze an uploaded genome, show results.

    The reference genome is read from "NC_000962.3.fasta" in the working
    directory. Analysis results are kept in ``st.session_state`` together
    with the identity (name, size) of the uploaded file, so they stay visible
    across the reruns that Streamlit performs on every widget interaction,
    and are not shown for a different upload.
    """
    st.title("M. tuberculosis Drug Resistance Analysis - FIXED VERSION")
    st.markdown("""
    ### Automated Drug Resistance Analysis Tool
    Upload your query genome (clinical isolate) in FASTA format for comparison with H37Rv reference.

    **Note**: This version correctly checks *codon-based* amino-acid mutations (e.g., rpoB S531L)
    and *nucleotide-based* promoter mutations (e.g., inhA -15C>T).
    """)

    # Debug mode toggle
    debug_mode = st.checkbox("Enable debug mode")

    # Load reference genome
    ref_genome = read_fasta_file("NC_000962.3.fasta")
    if ref_genome:
        st.success(f"Reference genome loaded successfully (length: {len(ref_genome)}bp)")
    else:
        st.error("Failed to load reference genome")
        return

    query_file = st.file_uploader("Upload Query Genome (FASTA)", type=['fasta', 'fa'])
    if not query_file:
        return
    upload_id = (query_file.name, query_file.size)

    if st.button("Analyze Drug Resistance"):
        query_genome = read_fasta_from_upload(query_file)
        if query_genome:
            st.success(f"Query genome loaded successfully (length: {len(query_genome)}bp)")
            st.session_state[_RESULTS_STATE_KEY] = {
                'source': upload_id,
                'results': _analyze_all_genes(ref_genome, query_genome, debug_mode),
            }

    stored = st.session_state.get(_RESULTS_STATE_KEY)
    if stored and stored['source'] == upload_id:
        _render_results(stored['results'])


# Entry point
if __name__ == "__main__":
    main()