import streamlit as st |
import google.generativeai as genai |
import requests |
import subprocess |
import os |
import pylint |
import pandas as pd |
from sklearn.model_selection import train_test_split |
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier |
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score |
import git |
import spacy |
from spacy.lang.en import English |
import boto3 |
import unittest |
import docker |
import sympy as sp |
from scipy.optimize import minimize, differential_evolution |
import numpy as np |
import matplotlib.pyplot as plt |
import seaborn as sns |
from IPython.display import display |
from tenacity import retry, stop_after_attempt, wait_fixed |
import torch |
import torch.nn as nn |
import torch.optim as optim |
from transformers import AutoTokenizer, AutoModel |
import networkx as nx |
from sklearn.cluster import KMeans |
from scipy.stats import ttest_ind |
from statsmodels.tsa.arima.model import ARIMA |
import nltk |
from nltk.sentiment import SentimentIntensityAnalyzer |
import cv2 |
from PIL import Image |
import tensorflow as tf |
from tensorflow.keras.applications import ResNet50 |
from tensorflow.keras.preprocessing import image |
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions |
genai.configure(api_key=st.secrets["GOOGLE_API_KEY"]) |
generation_config = { |
"temperature": 0.4, |
"top_p": 0.8, |
"top_k": 50, |
"max_output_tokens": 4096, |
} |
model = genai.GenerativeModel( |
model_name="gemini-1.5-pro", |
generation_config=generation_config, |
system_instruction=""" |
You are Ath, an ultra-advanced AI code assistant with expertise across multiple domains including machine learning, data science, web development, cloud computing, and more. Your responses should showcase cutting-edge techniques, best practices, and innovative solutions. |
""" |
) |
chat_session = model.start_chat(history=[]) |
@retry(stop=stop_after_attempt(5), wait=wait_fixed(2)) |
def generate_response(user_input): |
try: |
response = chat_session.send_message(user_input) |
return response.text |
except Exception as e: |
return f"Error: {e}" |
def optimize_code(code): |
with open("temp_code.py", "w") as file: |
file.write(code) |
result = subprocess.run(["pylint", "temp_code.py"], capture_output=True, text=True) |
os.remove("temp_code.py") |
return code |
def fetch_from_github(query): |
pass |
def interact_with_api(api_url): |
response = requests.get(api_url) |
return response.json() |
def train_advanced_ml_model(X, y): |
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2) |
models = { |
'Random Forest': RandomForestClassifier(n_estimators=100, random_state=42), |
'Gradient Boosting': GradientBoostingClassifier(n_estimators=100, random_state=42) |
} |
results = {} |
for name, model in models.items(): |
model.fit(X_train, y_train) |
y_pred = model.predict(X_test) |
results[name] = { |
'accuracy': accuracy_score(y_test, y_pred), |
'precision': precision_score(y_test, y_pred, average='weighted'), |
'recall': recall_score(y_test, y_pred, average='weighted'), |
'f1': f1_score(y_test, y_pred, average='weighted') |
} |
return results |
def handle_error(error): |
st.error(f"An error occurred: {error}") |
def initialize_git_repo(repo_path): |
if not os.path.exists(repo_path): |
os.makedirs(repo_path) |
if not os.path.exists(os.path.join(repo_path, '.git')): |
repo = git.Repo.init(repo_path) |
else: |
repo = git.Repo(repo_path) |
return repo |
def integrate_with_git(repo_path, code): |
repo = initialize_git_repo(repo_path) |
with open(os.path.join(repo_path, "generated_code.py"), "w") as file: |
file.write(code) |
repo.index.add(["generated_code.py"]) |
repo.index.commit("Added generated code") |
def process_user_input(user_input): |
nlp = spacy.load("en_core_web_sm") |
doc = nlp(user_input) |
return doc |
def interact_with_cloud_services(service_name, action, params): |
client = boto3.client(service_name) |
response = getattr(client, action)(**params) |
return response |
def run_tests(): |
tests_dir = os.path.join(os.getcwd(), 'tests') |
if not os.path.exists(tests_dir): |
os.makedirs(tests_dir) |
init_file = os.path.join(tests_dir, '__init__.py') |
if not os.path.exists(init_file): |
with open(init_file, 'w') as f: |
f.write('') |
test_suite = unittest.TestLoader().discover(tests_dir) |
test_runner = unittest.TextTestRunner() |
test_result = test_runner.run(test_suite) |
return test_result |
def execute_code_in_docker(code): |
client = docker.from_env() |
try: |
container = client.containers.run( |
image="python:3.9", |
command=f"python -c '{code}'", |
detach=True, |
remove=True |
) |
result = container.wait() |
logs = container.logs().decode('utf-8') |
return logs, result['StatusCode'] |
except Exception as e: |
return f"Error: {e}", 1 |
def solve_complex_equation(equation): |
x, y, z = sp.symbols('x y z') |
eq = sp.Eq(eval(equation)) |
solution = sp.solve(eq) |
return solution |
def advanced_optimization(function, bounds): |
result = differential_evolution(lambda x: eval(function), bounds) |
return result.x, result.fun |
def visualize_complex_data(data): |
df = pd.DataFrame(data) |
fig, axs = plt.subplots(2, 2, figsize=(16, 12)) |
sns.heatmap(df.corr(), annot=True, cmap='coolwarm', ax=axs[0, 0]) |
axs[0, 0].set_title('Correlation Heatmap') |
sns.pairplot(df, diag_kind='kde', ax=axs[0, 1]) |
axs[0, 1].set_title('Pairplot') |
df.plot(kind='box', ax=axs[1, 0]) |
axs[1, 0].set_title('Box Plot') |
sns.violinplot(data=df, ax=axs[1, 1]) |
axs[1, 1].set_title('Violin Plot') |
plt.tight_layout() |
return fig |
def analyze_complex_data(data): |
df = pd.DataFrame(data) |
summary = df.describe() |
correlation = df.corr() |
skewness = df.skew() |
kurtosis = df.kurtosis() |
return { |
'summary': summary, |
'correlation': correlation, |
'skewness': skewness, |
'kurtosis': kurtosis |
} |
def train_deep_learning_model(X, y): |
class DeepNN(nn.Module): |
def __init__(self, input_size): |
super(DeepNN, self).__init__() |
self.fc1 = nn.Linear(input_size, 64) |
self.fc2 = nn.Linear(64, 32) |
self.fc3 = nn.Linear(32, 1) |
def forward(self, x): |
x = torch.relu(self.fc1(x)) |
x = torch.relu(self.fc2(x)) |
x = torch.sigmoid(self.fc3(x)) |
return x |
X_tensor = torch.FloatTensor(X.values) |
y_tensor = torch.FloatTensor(y.values) |
model = DeepNN(X.shape[1]) |
criterion = nn.BCELoss() |
optimizer = optim.Adam(model.parameters()) |
epochs = 100 |
for epoch in range(epochs): |
optimizer.zero_grad() |
outputs = model(X_tensor) |
loss = criterion(outputs, y_tensor.unsqueeze(1)) |
loss.backward() |
optimizer.step() |
return model |
def perform_nlp_analysis(text): |
nlp = spacy.load("en_core_web_sm") |
doc = nlp(text) |
entities = [(ent.text, ent.label_) for ent in doc.ents] |
tokens = [token.text for token in doc] |
pos_tags = [(token.text, token.pos_) for token in doc] |
sia = SentimentIntensityAnalyzer() |
sentiment = sia.polarity_scores(text) |
return { |
'entities': entities, |
'tokens': tokens, |
'pos_tags': pos_tags, |
'sentiment': sentiment |
} |
def perform_image_analysis(image_path): |
img = cv2.imread(image_path) |
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
model = ResNet50(weights='imagenet') |
img_resized = cv2.resize(img_rgb, (224, 224)) |
img_array = image.img_to_array(img_resized) |
img_array = np.expand_dims(img_array, axis=0) |
img_array = preprocess_input(img_array) |
predictions = model.predict(img_array) |
decoded_predictions = decode_predictions(predictions, top=3)[0] |
edges = cv2.Canny(img, 100, 200) |
return { |
'predictions': decoded_predictions, |
'edges': edges |
} |
def perform_time_series_analysis(data): |
df = pd.DataFrame(data) |
model = ARIMA(df, order=(1, 1, 1)) |
results = model.fit() |
forecast = results.forecast(steps=5) |
return { |
'model_summary': results.summary(), |
'forecast': forecast |
} |
def perform_graph_analysis(nodes, edges): |
G = nx.Graph() |
G.add_nodes_from(nodes) |
G.add_edges_from(edges) |
centrality = nx.degree_centrality(G) |
clustering = nx.clustering(G) |
shortest_paths = dict(nx.all_pairs_shortest_path_length(G)) |
return { |
'centrality': centrality, |
'clustering': clustering, |
'shortest_paths': shortest_paths |
} |
st.set_page_config(page_title="Ultra AI Code Assistant", page_icon="🚀", layout="wide") |
st.markdown('<div class="main-container">', unsafe_allow_html=True) |
st.title("🚀 Ultra AI Code Assistant") |
st.markdown('<p class="subtitle">Powered by Advanced AI and Domain Expertise</p>', unsafe_allow_html=True) |
task_type = st.selectbox("Select Task Type", [ |
"Code Generation", |
"Machine Learning", |
"Data Analysis", |
"Natural Language Processing", |
"Image Analysis", |
"Time Series Analysis", |
"Graph Analysis" |
]) |
prompt = st.text_area("Enter your task description or code:", height=120) |
if st.button("Execute Task"): |
if prompt.strip() == "": |
st.error("Please enter a valid prompt.") |
else: |
with st.spinner("Processing your request..."): |
try: |
if task_type == "Code Generation": |
processed_input = process_user_input(prompt) |
completed_text = generate_response(processed_input.text) |
if "Error" in completed_text: |
handle_error(completed_text) |
else: |
optimized_code = optimize_code(completed_text) |
st.success("Code generated and optimized successfully!") |
st.markdown('<div class="output-container">', unsafe_allow_html=True) |
st.markdown('<div class="code-block">', unsafe_allow_html=True) |
st.code(optimized_code) |
st.markdown('</div>', unsafe_allow_html=True) |
st.markdown('</div>', unsafe_allow_html=True) |
repo_path = "./repo" |
integrate_with_git(repo_path, optimized_code) |
test_result = run_tests() |
if test_result.wasSuccessful(): |
st.success("All tests passed successfully!") |
else: |
st.error("Some tests failed. Please check the code.") |
execution_result, status_code = execute_code_in_docker(optimized_code) |
if status_code == 0: |
st.success("Code executed successfully in Docker!") |
st.text(execution_result) |
else: |
st.error(f"Code execution failed: {execution_result}") |
elif task_type == "Machine Learning": |
from sklearn.datasets import make_classification |
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42) |
results = train_advanced_ml_model(X, y) |
st.write("Machine Learning Model Performance:") |
st.json(results) |
st.write("Deep Learning Model:") |
deep_model = train_deep_learning_model(pd.DataFrame(X), pd.Series(y)) |
st.write(deep_model) |
elif task_type == "Data Analysis": |
data = pd.DataFrame(np.random.randn(100, 5), columns=['A', 'B', 'C', 'D', 'E']) |
analysis_results = analyze_complex_data(data) |
st.write("Data Analysis Results:") |
st.write(analysis_results['summary']) |
st.write("Correlation Matrix:") |
st.write(analysis_results['correlation']) |
fig = visualize_complex_data(data) |
st.pyplot(fig) |
elif task_type == "Natural Language Processing": |
nlp_results = perform_nlp_analysis(prompt) |
st.write("NLP Analysis Results:") |
st.json(nlp_results) |
elif task_type == "Image Analysis": |
uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "png", "jpeg"]) |
if uploaded_file is not None: |
image = Image.open(uploaded_file) |
st.image(image, caption='Uploaded Image', use_column_width=True) |
with open("temp_image.jpg", "wb") as f: |
f.write(uploaded_file.getbuffer()) |
analysis_results = perform_image_analysis("temp_image.jpg") |
st.write("Image Analysis Results:") |
st.write("Top 3 predictions:") |
for i, (imagenet_id, label, score) in enumerate(analysis_results['predictions']): |
st.write(f"{i + 1}: {label} ({score:.2f})") |
st.write("Edge Detection:") |
st.image(analysis_results['edges'], caption='Edge Detection', use_column_width=True) |
os.remove("temp_image.jpg") |
elif task_type == "Time Series Analysis": |
dates = pd.date_range(start='1/1/2020', end='1/1/2021', freq='D') |
values = np.random.randn(len(dates)).cumsum() |
ts_data = pd.Series(values, index=dates) |
st.line_chart(ts_data) |
analysis_results = perform_time_series_analysis(ts_data) |
st.write("Time Series Analysis Results:") |
st.write(analysis_results['model_summary']) |
st.write("Forecast for the next 5 periods:") |
st.write(analysis_results['forecast']) |
elif task_type == "Graph Analysis": |
nodes = range(1, 11) |
edges = [(1, 2), (1, 3), (2, 4), (2, 5), (3, 6), (3, 7), (4, 8), (5, 9), (6, 10)] |
analysis_results = perform_graph_analysis(nodes, edges) |
st.write("Graph Analysis Results:") |
st.write("Centrality:") |
st.json(analysis_results['centrality']) |
st.write("Clustering Coefficient:") |
st.json(analysis_results['clustering']) |
G = nx.Graph() |
G.add_nodes_from(nodes) |
G.add_edges_from(edges) |
fig, ax = plt.subplots(figsize=(10, 8)) |
nx.draw(G, with_labels=True, node_color='lightblue', node_size=500, font_size=16, font_weight='bold', ax=ax) |
st.pyplot(fig) |
except Exception as e: |
handle_error(e) |
st.markdown(""" |
<div style='text-align: center; margin-top: 2rem; color: #4a5568;'> |
Created with ❤️ by Your Ultra AI Code Assistant |
</div> |
""", unsafe_allow_html=True) |
st.markdown('</div>', unsafe_allow_html=True) |
def explain_code(code): |
"""Generate an explanation for the given code using NLP techniques.""" |
explanation = generate_response(f"Explain the following code:\n\n{code}") |
return explanation |
def generate_unit_tests(code): |
"""Generate unit tests for the given code.""" |
unit_tests = generate_response(f"Generate unit tests for the following code:\n\n{code}") |
return unit_tests |
def suggest_optimizations(code): |
"""Suggest optimizations for the given code.""" |
optimizations = generate_response(f"Suggest optimizations for the following code:\n\n{code}") |
return optimizations |
def generate_documentation(code): |
"""Generate documentation for the given code.""" |
documentation = generate_response(f"Generate documentation for the following code:\n\n{code}") |
return documentation |
if task_type == "Code Generation": |
st.sidebar.header("Code Analysis Tools") |
if st.sidebar.button("Explain Code"): |
explanation = explain_code(optimized_code) |
st.sidebar.subheader("Code Explanation") |
st.sidebar.write(explanation) |
if st.sidebar.button("Generate Unit Tests"): |
unit_tests = generate_unit_tests(optimized_code) |
st.sidebar.subheader("Generated Unit Tests") |
st.sidebar.code(unit_tests) |
if st.sidebar.button("Suggest Optimizations"): |
optimizations = suggest_optimizations(optimized_code) |
st.sidebar.subheader("Suggested Optimizations") |
st.sidebar.write(optimizations) |
if st.sidebar.button("Generate Documentation"): |
documentation = generate_documentation(optimized_code) |
st.sidebar.subheader("Generated Documentation") |
st.sidebar.write(documentation) |
def perform_security_analysis(code): |
"""Perform a basic security analysis on the given code.""" |
security_analysis = generate_response(f"Perform a security analysis on the following code and suggest improvements:\n\n{code}") |
return security_analysis |
def generate_api_documentation(code): |
"""Generate API documentation for the given code.""" |
api_docs = generate_response(f"Generate API documentation for the following code:\n\n{code}") |
return api_docs |
def suggest_design_patterns(code): |
"""Suggest appropriate design patterns for the given code.""" |
design_patterns = generate_response(f"Suggest appropriate design patterns for the following code:\n\n{code}") |
return design_patterns |
if task_type == "Code Generation": |
st.sidebar.header("Advanced Code Analysis") |
if st.sidebar.button("Security Analysis"): |
security_analysis = perform_security_analysis(optimized_code) |
st.sidebar.subheader("Security Analysis") |
st.sidebar.write(security_analysis) |
if st.sidebar.button("Generate API Documentation"): |
api_docs = generate_api_documentation(optimized_code) |
st.sidebar.subheader("API Documentation") |
st.sidebar.write(api_docs) |
if st.sidebar.button("Suggest Design Patterns"): |
design_patterns = suggest_design_patterns(optimized_code) |
st.sidebar.subheader("Suggested Design Patterns") |
st.sidebar.write(design_patterns) |
def generate_project_structure(project_description): |
"""Generate a complete project structure based on the given description.""" |
project_structure = generate_response(f"Generate a complete project structure for the following project description:\n\n{project_description}") |
return project_structure |
if st.sidebar.button("Generate Project Structure"): |
project_description = st.sidebar.text_area("Enter project description:") |
if project_description: |
project_structure = generate_project_structure(project_description) |
st.sidebar.subheader("Generated Project Structure") |
st.sidebar.code(project_structure) |
def suggest_libraries(code): |
"""Suggest relevant libraries and frameworks for the given code.""" |
suggestions = generate_response(f"Suggest relevant libraries and frameworks for the following code:\n\n{code}") |
return suggestions |
if task_type == "Code Generation": |
if st.sidebar.button("Suggest Libraries"): |
library_suggestions = suggest_libraries(optimized_code) |
st.sidebar.subheader("Suggested Libraries and Frameworks") |
st.sidebar.write(library_suggestions) |
def translate_code(code, target_language): |
"""Translate the given code to the specified target language.""" |
translated_code = generate_response(f"Translate the following code to {target_language}:\n\n{code}") |
return translated_code |
if task_type == "Code Generation": |
target_language = st.sidebar.selectbox("Select target language for translation", ["Python", "JavaScript", "Java", "C++", "Go"]) |
if st.sidebar.button("Translate Code"): |
translated_code = translate_code(optimized_code, target_language) |
st.sidebar.subheader(f"Translated Code ({target_language})") |
st.sidebar.code(translated_code) |
def generate_readme(project_description, code): |
"""Generate a README file for the project based on the description and code.""" |
readme_content = generate_response(f"Generate a README.md file for the following project:\n\nDescription: {project_description}\n\nCode:\n{code}") |
return readme_content |
if task_type == "Code Generation": |
if st.sidebar.button("Generate README"): |
project_description = st.sidebar.text_area("Enter project description:") |
if project_description: |
readme_content = generate_readme(project_description, optimized_code) |
st.sidebar.subheader("Generated README.md") |
st.sidebar.markdown(readme_content) |
def suggest_refactoring(code): |
"""Suggest code refactoring improvements for the given code.""" |
refactoring_suggestions = generate_response(f"Suggest code refactoring improvements for the following code:\n\n{code}") |
return refactoring_suggestions |
if task_type == "Code Generation": |
if st.sidebar.button("Suggest Refactoring"): |
refactoring_suggestions = suggest_refactoring(optimized_code) |
st.sidebar.subheader("Refactoring Suggestions") |
st.sidebar.write(refactoring_suggestions) |
def generate_test_data(code): |
"""Generate sample test data for the given code.""" |
test_data = generate_response(f"Generate sample test data for the following code:\n\n{code}") |
return test_data |
if task_type == "Code Generation": |
if st.sidebar.button("Generate Test Data"): |
test_data = generate_test_data(optimized_code) |
st.sidebar.subheader("Generated Test Data") |
st.sidebar.code(test_data) |
if __name__ == "__main__": |
st.sidebar.header("About") |
st.sidebar.info("This Ultra AI Code Assistant is powered by advanced AI models and incorporates expertise across multiple domains including software development, machine learning, data analysis, and more.") |
st.sidebar.header("Feedback") |
feedback = st.sidebar.text_area("Please provide any feedback or suggestions:") |
if st.sidebar.button("Submit Feedback"): |
st.sidebar.success("Thank you for your feedback!") |