|
import streamlit as st |
|
import pandas as pd |
|
import plotly.express as px |
|
import plotly.graph_objects as go |
|
from Bio import pairwise2 |
|
from collections import defaultdict |
|
import re |
|
|
|
|
|
# Drug-resistance genes to highlight, keyed by gene name.  Each entry holds an
# inclusive (start, end) genome range and a human-readable description.
# NOTE(review): the ranges look like 1-based H37Rv (NC_000962.3) annotation
# coordinates — confirm against the reference actually uploaded.
IMPORTANT_GENES = {
    gene: {'range': (start, end), 'description': description}
    for gene, start, end, description in (
        ('rpoB', 759807, 763325,
         'RNA polymerase β subunit (Rifampicin resistance)'),
        ('katG', 2153889, 2156111,
         'Catalase-peroxidase (Isoniazid resistance)'),
        ('inhA', 1674202, 1675011,
         'Enoyl-ACP reductase (Isoniazid resistance)'),
        ('gyrA', 7302, 9818,
         'DNA gyrase subunit A (Fluoroquinolone resistance)'),
    )
}
|
|
|
def read_fasta_from_upload(uploaded_file):
    """Read the nucleotide sequence from an uploaded FASTA file.

    Args:
        uploaded_file: Object whose ``getvalue()`` returns the raw file bytes
            (this is how the Streamlit upload is consumed here).

    Returns:
        The sequence as a single upper-case string with all whitespace
        removed.  When the file contains several records, their header lines
        are skipped and the sequences are concatenated in file order.

    Raises:
        ValueError: If the file is empty, does not begin with a ``>`` header
            line, or has no sequence data after the header.
        UnicodeDecodeError: If the bytes are not valid UTF-8.
    """
    # 'utf-8-sig' transparently drops a leading byte-order mark if present.
    text = uploaded_file.getvalue().decode('utf-8-sig').strip()
    if not text:
        raise ValueError("FASTA file is empty")
    if not text.startswith('>'):
        raise ValueError("Not a FASTA file: expected a '>' header line")

    # splitlines() copes with \n, \r\n and \r endings alike; str.split()
    # without arguments removes any whitespace left inside a line.
    sequence = ''.join(
        ''.join(line.split())
        for line in text.splitlines()[1:]
        if not line.lstrip().startswith('>')
    )
    if not sequence:
        raise ValueError("FASTA file contains no sequence data")
    return sequence.upper()
|
|
|
def split_genome_into_chunks(sequence, chunk_size=10000, overlap=100):
    """Split a sequence into fixed-size, overlapping chunks.

    Consecutive chunks start ``chunk_size - overlap`` characters apart, so
    each chunk shares its first ``overlap`` characters with the end of the
    previous one.  The final chunk(s) may be shorter than ``chunk_size``.

    Args:
        sequence: The sequence to split (any sliceable with a length).
        chunk_size: Maximum length of each chunk; must be positive.
        overlap: Number of characters shared by neighbouring chunks; must
            satisfy ``0 <= overlap < chunk_size``.

    Returns:
        A ``(chunks, positions)`` tuple of two equally long lists, where
        ``positions[k]`` is the 0-based start index of ``chunks[k]`` in
        ``sequence``.  Both lists are empty for an empty sequence.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range.  Without
            this check a too-large overlap would silently yield no chunks and
            a negative one would leave uncovered gaps between chunks.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must be in [0, chunk_size), got {overlap} "
            f"with chunk_size {chunk_size}"
        )

    step = chunk_size - overlap
    positions = list(range(0, len(sequence), step))
    chunks = [sequence[start:start + chunk_size] for start in positions]
    return chunks, positions
|
|
|
def _gene_annotation(genome_coord):
    """Return gene fields for a 1-based genome coordinate, or {} if none."""
    for gene, info in IMPORTANT_GENES.items():
        start, end = info['range']
        if start <= genome_coord <= end:
            return {
                'gene': gene,
                'gene_position': genome_coord - start + 1,
                'gene_description': info['description'],
            }
    return {}


def find_mutations_in_chunk(ref_chunk, query_chunk, chunk_start):
    """Globally align two chunks and list every position where they differ.

    Args:
        ref_chunk: Reference sequence chunk.
        query_chunk: Query sequence chunk to compare against the reference.
        chunk_start: 0-based index of ``ref_chunk`` within the full reference.

    Returns:
        A list with one dict per differing alignment column, holding:

        * ``position``: 0-based reference index of the column.  For a column
          where the reference has a gap, this is the index of the closest
          preceding reference base (``chunk_start - 1`` if there is none).
        * ``ref_base``: aligned reference character (``'-'`` for a gap).
        * ``query_base``: aligned query character, or the string ``'None'``
          when the query has a gap.
        * ``type``: ``'SNP'`` when neither side is a gap, else ``'INDEL'``.
        * ``context``: ``{'ref': ..., 'query': ...}`` strings showing up to
          five aligned characters either side of the bracketed column.
        * ``gene``, ``gene_position``, ``gene_description``: present only
          when the position falls inside an ``IMPORTANT_GENES`` range;
          ``gene_position`` is 1-based within that range.

        The list is empty when the aligner returns no alignment.
    """
    # Only the first alignment is consumed, so ask the aligner to stop after
    # recovering one instead of enumerating every co-optimal alignment.
    alignments = pairwise2.align.globalms(
        ref_chunk,
        query_chunk,
        match=2,
        mismatch=-3,
        open=-10,
        extend=-0.5,
        one_alignment_only=True,
    )
    if not alignments:
        return []

    ref_aligned, query_aligned = alignments[0][0], alignments[0][1]

    mutations = []
    ref_bases_seen = 0  # reference bases consumed so far, gaps excluded
    for i, (ref_base, query_base) in enumerate(zip(ref_aligned, query_aligned)):
        if ref_base != '-':
            ref_bases_seen += 1
        if ref_base == query_base:
            continue

        abs_pos = chunk_start + ref_bases_seen - 1
        is_snp = ref_base != '-' and query_base != '-'
        mut = {
            'position': abs_pos,
            'ref_base': ref_base,
            'query_base': query_base if query_base != '-' else 'None',
            'type': 'SNP' if is_snp else 'INDEL',
            'context': {
                'ref': ref_aligned[max(0, i - 5):i] + '[' + ref_base + ']'
                       + ref_aligned[i + 1:i + 6],
                'query': query_aligned[max(0, i - 5):i] + '[' + query_base + ']'
                         + query_aligned[i + 1:i + 6],
            },
        }

        # abs_pos is a 0-based string index, whereas the IMPORTANT_GENES
        # ranges appear to be 1-based inclusive annotation coordinates
        # (NOTE(review): confirm), so convert before the range test.
        # Comparing the raw index would miss the first base of each gene
        # and wrongly include the base just past its end.
        mut.update(_gene_annotation(abs_pos + 1))

        mutations.append(mut)

    return mutations
|
|
|
def visualize_mutations(mutations, genome_length):
    """Build a Plotly figure of gene regions and mutation positions.

    Args:
        mutations: Iterable of mutation dicts; the ``position``, ``type``,
            ``ref_base`` and ``query_base`` keys are read.
        genome_length: Accepted for interface compatibility; not used here.

    Returns:
        A ``go.Figure`` with one thick line trace per ``IMPORTANT_GENES``
        entry and, when there are mutations, one marker trace placed just
        above the gene track (red for SNPs, blue for everything else).
    """
    fig = go.Figure()

    # Gene track: every gene is drawn as a horizontal bar at the same height.
    gene_track_y = 1
    for gene, info in IMPORTANT_GENES.items():
        start, end = info['range']
        fig.add_trace(go.Scatter(
            x=[start, end],
            y=[gene_track_y, gene_track_y],
            mode='lines',
            name=gene,
            line={'width': 10},
            hoverinfo='text',
            hovertext=f"{gene}: {start}-{end}",
        ))

    # Mutation track: markers slightly above the gene bars.
    mutation_data = pd.DataFrame(mutations)
    if not mutation_data.empty:
        marker_colors = [
            'red' if kind == 'SNP' else 'blue'
            for kind in mutation_data['type']
        ]
        hover_labels = mutation_data.apply(
            lambda row: f"Position: {row['position']}<br>"
                        f"Type: {row['type']}<br>"
                        f"Change: {row['ref_base']}->{row['query_base']}",
            axis=1,
        )
        fig.add_trace(go.Scatter(
            x=mutation_data['position'],
            y=[1.1] * len(mutation_data),
            mode='markers',
            name='Mutations',
            marker={'color': marker_colors, 'size': 8},
            hoverinfo='text',
            hovertext=hover_labels,
        ))

    fig.update_layout(
        title="Genome-wide Mutation Distribution",
        xaxis_title="Genome Position",
        yaxis_visible=False,
        showlegend=True,
        height=400,
    )

    return fig
|
|
|
def analyze_mutations(mutations):
    """Summarise a list of mutation dicts.

    Args:
        mutations: Sequence of dicts, each with a ``type`` key and optionally
            a ``gene`` key.

    Returns:
        A dict with ``total_mutations``, ``snps`` and ``indels`` counts,
        ``by_gene`` (a ``defaultdict(int)`` mapping gene name to the number
        of mutations carrying that gene), and ``important_mutations`` (the
        mutations that have a ``gene`` key, in input order).
    """
    snp_count = sum(1 for mut in mutations if mut['type'] == 'SNP')
    indel_count = sum(1 for mut in mutations if mut['type'] == 'INDEL')

    # Only mutations that were annotated with a gene are considered important.
    in_gene = [mut for mut in mutations if 'gene' in mut]
    per_gene = defaultdict(int)
    for mut in in_gene:
        per_gene[mut['gene']] += 1

    return {
        'total_mutations': len(mutations),
        'snps': snp_count,
        'indels': indel_count,
        'by_gene': per_gene,
        'important_mutations': in_gene,
    }
|
|
|
def _collect_mutations(ref_chunks, query_chunks, chunk_positions, progress_bar):
    """Align paired chunks and gather mutations, skipping overlap duplicates."""
    # zip() stops at the shorter list, so base the progress fraction on it.
    total_chunks = min(len(ref_chunks), len(query_chunks))
    all_mutations = []
    # Reference index up to which mutations have already been reported.
    # Starts at -1 so an insertion ahead of the very first base is kept.
    covered_until = -1
    paired = zip(ref_chunks, query_chunks, chunk_positions)
    for i, (ref_chunk, query_chunk, chunk_start) in enumerate(paired):
        mutations = find_mutations_in_chunk(ref_chunk, query_chunk, chunk_start)
        # Neighbouring chunks overlap, so the start of this chunk was already
        # analysed as the end of the previous one; keep only new positions,
        # otherwise every mutation in an overlap would be counted twice.
        all_mutations.extend(
            mut for mut in mutations if mut['position'] >= covered_until
        )
        covered_until = max(covered_until, chunk_start + len(ref_chunk))
        progress_bar.progress((i + 1) / total_chunks)
    return all_mutations


def _render_results(stats, all_mutations, genome_length):
    """Show summary metrics, the plot, per-gene tables and the CSV download."""
    st.success("Analysis complete!")

    st.header("Results Summary")
    total_col, snp_col, indel_col = st.columns(3)
    total_col.metric("Total Mutations", stats['total_mutations'])
    snp_col.metric("SNPs", stats['snps'])
    indel_col.metric("INDELs", stats['indels'])

    st.plotly_chart(visualize_mutations(all_mutations, genome_length))

    st.header("Resistance-Associated Genes")
    gene_mutations = pd.DataFrame([
        {
            "Gene": gene,
            "Mutations": count,
            "Description": IMPORTANT_GENES[gene]['description'],
        }
        for gene, count in stats['by_gene'].items()
    ])
    if not gene_mutations.empty:
        st.dataframe(gene_mutations)

    if stats['important_mutations']:
        st.header("Detailed Mutation Analysis")
        mutations_df = pd.DataFrame(stats['important_mutations'])
        st.dataframe(mutations_df)

        st.download_button(
            "Download Results (CSV)",
            mutations_df.to_csv(index=False),
            "mtb_mutations.csv",
            "text/csv",
            key='download-csv',
        )


def main():
    """Render the Streamlit page and run the genome comparison on demand.

    Shows two FASTA uploaders and the chunking settings.  Once both files are
    uploaded and "Run Analysis" is pressed, both genomes are split into
    chunks, chunks at the same index are aligned against each other, and the
    resulting mutations are summarised and displayed.  Any exception raised
    during the analysis is reported through ``st.error`` instead of
    propagating.
    """
    st.title("M. tuberculosis Full Genome Comparison")

    st.markdown("""
    This tool performs whole-genome comparison of M. tuberculosis strains, identifying mutations
    and analyzing resistance-associated genes.

    **Instructions:**
    1. Upload your reference genome (typically H37Rv)
    2. Upload your query genome (clinical isolate)
    3. Configure analysis parameters if needed
    4. Run the analysis
    """)

    ref_col, query_col = st.columns(2)
    with ref_col:
        reference_file = st.file_uploader("Reference Genome (FASTA)", type=['fasta', 'fa'])
    with query_col:
        query_file = st.file_uploader("Query Genome (FASTA)", type=['fasta', 'fa'])

    with st.expander("Advanced Settings"):
        chunk_size = st.slider("Analysis chunk size (bp)", 5000, 20000, 10000, 1000)
        overlap = st.slider("Chunk overlap (bp)", 50, 200, 100, 10)

    # The run button is only offered once both genomes have been uploaded.
    if not (reference_file and query_file):
        return
    if not st.button("Run Analysis"):
        return

    with st.spinner("Analyzing genomes..."):
        # Top-level UI boundary: surface any failure to the user.
        try:
            ref_genome = read_fasta_from_upload(reference_file)
            query_genome = read_fasta_from_upload(query_file)

            progress_bar = st.progress(0)
            status = st.empty()

            status.text("Splitting genomes into chunks...")
            ref_chunks, chunk_positions = split_genome_into_chunks(ref_genome, chunk_size, overlap)
            query_chunks, _ = split_genome_into_chunks(query_genome, chunk_size, overlap)

            if len(ref_chunks) != len(query_chunks):
                # Chunks are paired by index, so surplus chunks of the longer
                # genome have no partner and are never aligned.
                st.warning(
                    "Genomes split into different numbers of chunks "
                    f"({len(ref_chunks)} reference vs {len(query_chunks)} query); "
                    "the unmatched trailing region was not compared."
                )

            status.text("Analyzing mutations...")
            all_mutations = _collect_mutations(
                ref_chunks, query_chunks, chunk_positions, progress_bar
            )

            progress_bar.empty()
            status.empty()

            stats = analyze_mutations(all_mutations)
            _render_results(stats, all_mutations, len(ref_genome))

        except Exception as e:
            st.error(f"Analysis error: {str(e)}")
|
|
|
# Script entry point: build the page only when this file is executed directly.
if __name__ == "__main__":
    main()