In [None]:
"""
This script implements the experiments conducted for
writing the paper "Optimizing AI Reasoning: A Hamiltonian Dynamics Approach to
Multi-Hop Question Answering".

Author: Javier Marín
Email: javier@jmarin.info
Version: 1.0.0
Date: October 65, 2024

License: MIT License

Copyright (c) 2024 Javier Marín

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

Dependencies:
- Python 3.8+
- NumPy
- Pandas
- PyTorch
- Transformers
- Scikit-learn
- SciPy
- Statsmodels
- Matplotlib
- Seaborn

For a full list of dependencies and their versions, see requirements.txt
"""

## Imports

In [None]:
# Standard library imports
import os
import re
import time

# Third-party imports
import numpy as np
import pandas as pd
import torch
import seaborn as sns
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

from transformers import AutoTokenizer, AutoModel
from statsmodels.multivariate.manova import MANOVA
from scipy import stats
from scipy.optimize import curve_fit
from scipy.integrate import odeint
from sklearn import (
 metrics,
 model_selection,
 cluster,
 decomposition,
 feature_extraction,
 linear_model
)

# Visualization settings: seaborn "whitegrid" theme at "paper" scale, with a
# serif font stack that tries Times New Roman before the existing serif fonts.
sns.set_theme(context="paper", style="whitegrid")
plt.rcParams.update({
    'font.family': 'serif',
    'font.serif': ['Times New Roman'] + plt.rcParams['font.serif'],
})

## Load BERT pretrained model

In [None]:
# Load pre-trained model and tokenizer
# Both are created from the "bert-base-uncased" checkpoint and kept as
# notebook-level globals; get_bert_embedding() reads them directly.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModel.from_pretrained("bert-base-uncased")

## Load data

In [None]:
# Read the OBQA reasoning-chain table (semicolon-separated CSV).
df = pd.read_csv("obqa_chains.csv", sep=";")

# Fail fast if any column used by the analysis below is absent.
required_columns = ['QID', 'Chain#', 'Question', 'Answer', 'Fact1', 'Fact2', 'Turk']
missing_columns = [name for name in required_columns if name not in df.columns]
if len(missing_columns) > 0:
    raise ValueError(f"Missing required columns: {missing_columns}")

# Preprocessing:
#  - 'Question' is overwritten with "<question> <answer>".
#  - 'is_valid' is True when the 'Turk' text contains "yes" (case-insensitive);
#    missing 'Turk' values count as not valid.
question_with_answer = df['Question'] + " " + df['Answer']
df['Question'] = question_with_answer
df['is_valid'] = df['Turk'].str.contains('yes', case=False, na=False)

## Model embeddings

In [None]:
def get_bert_embedding(text):
    """Return a mean-pooled BERT embedding of ``text`` as a NumPy array.

    The text is tokenized (truncated to at most 512 tokens), passed through
    the notebook-level ``model`` with gradient tracking disabled, and the last
    hidden states are averaged over the token axis.
    """
    encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    with torch.no_grad():
        hidden_states = model(**encoded).last_hidden_state
    pooled = hidden_states.mean(dim=1)
    return pooled.squeeze().numpy()

def refined_hamiltonian_energy(chain):
    """Compute the "Hamiltonian" energy H = T - V of one reasoning chain.

    ``chain`` must provide the texts 'Fact1', 'Fact2' and 'Question'.

    * T (kinetic term): Euclidean distance between the two fact embeddings.
    * V (potential term): mean of the dot products of each fact embedding
      with the question embedding.

    Returns:
        tuple: (H, T, V).

    Note: a later cell in this notebook defines another function with the
    same name; once that cell has run, this definition is shadowed.
    """
    fact1_vec, fact2_vec, question_vec = (
        get_bert_embedding(chain[key]) for key in ('Fact1', 'Fact2', 'Question')
    )

    kinetic = np.linalg.norm(fact2_vec - fact1_vec)
    potential = (np.dot(fact1_vec, question_vec) + np.dot(fact2_vec, question_vec)) / 2

    return kinetic - potential, kinetic, potential


# Analyze energy conservation
def energy_conservation_score(chain):
    """Score how balanced the kinetic and potential terms of ``chain`` are.

    Returns 1 / (1 + |T - V|): exactly 1 when T equals V, approaching 0 as
    the two terms diverge.
    """
    _, kinetic, potential = refined_hamiltonian_energy(chain)
    imbalance = abs(kinetic - potential)
    return 1 / (1 + imbalance)



# Calculate refined energies and scores.
# Each chain is embedded exactly once here; H, T and V land in three columns.
df['H_energy'], df['T_energy'], df['V_energy'] = zip(*df.apply(refined_hamiltonian_energy, axis=1))

# The conservation score 1 / (1 + |T - V|) is the same formula as
# energy_conservation_score(), but derived from the columns just stored.
# Calling energy_conservation_score() per row would re-run the three BERT
# forward passes for every chain only to recompute the same T and V.
df['energy_conservation'] = 1 / (1 + (df['T_energy'] - df['V_energy']).abs())

## Hamiltonian systems

In [None]:
def get_trajectory(row):
    """Embed the two facts of ``row`` and stack them into one array.

    The result has one row per fact, in the order Fact1, Fact2.
    """
    # Cast to str so non-string cell values still reach the tokenizer as text.
    fact_texts = (str(row['Fact1']), str(row['Fact2']))
    return np.array([get_bert_embedding(fact) for fact in fact_texts])

def _fact_pair_energy(emb1, emb2):
    """Return (H, T, V) for two fact embeddings.

    T is the Euclidean distance between the embeddings, V is the mean of
    their norms, and H = T - V.
    """
    T = np.linalg.norm(emb2 - emb1)
    V = (np.linalg.norm(emb1) + np.linalg.norm(emb2)) / 2
    return T - V, T, V


def refined_hamiltonian_energy(chain):
    """Fact-only "Hamiltonian" energy of a chain given as text.

    ``chain`` must provide the texts 'Fact1' and 'Fact2'; both are embedded
    with get_bert_embedding(). Returns the tuple (H, T, V).

    NOTE: this redefines (shadows) the function of the same name from the
    "Model embeddings" cell, whose potential term used the question
    embedding. After this cell has run, re-executing the earlier energy
    computation would silently use this fact-only version.
    """
    emb1 = get_bert_embedding(chain['Fact1'])
    emb2 = get_bert_embedding(chain['Fact2'])
    return _fact_pair_energy(emb1, emb2)


def compute_trajectory_energy(trajectory):
    """Return H for a trajectory whose first two rows are fact embeddings.

    The trajectory already holds the embeddings, so the energy is computed
    from them directly. Converting the arrays with str() and embedding that
    text again would measure the printed representation of the numbers, not
    the facts.
    """
    return _fact_pair_energy(trajectory[0], trajectory[1])[0]


# Build one trajectory (array of the two fact embeddings) per chain.
trajectories = df.apply(get_trajectory, axis='columns')

# Energy of each trajectory, aligned with df's index.
trajectory_energies = trajectories.apply(compute_trajectory_energy)


In [None]:
# Use PCA to reduce dimensionality for visualization.
# PCA is reached through the imported `sklearn.decomposition` module; the bare
# name `PCA` is not imported anywhere in this notebook.
pca = decomposition.PCA(n_components=3)
all_points = np.vstack(trajectories.values)
pca_result = pca.fit_transform(all_points)

# Project every trajectory with the PCA fitted on all points together.
trajectories_3d = trajectories.apply(lambda t: pca.transform(t))


# Analyze trajectory properties
def trajectory_length(traj):
    """Total Euclidean path length: the sum of distances between consecutive points."""
    step_vectors = np.diff(traj, axis=0)
    step_lengths = np.sqrt((step_vectors ** 2).sum(axis=1))
    return step_lengths.sum()

def trajectory_smoothness(traj):
    """Average, over the first two points, of |second coordinate - first coordinate|.

    NOTE(review): the difference is taken between coordinates *within* each
    point (traj[k][1] - traj[k][0]), not between consecutive points. Confirm
    that this is the intended smoothness measure.
    """
    first_gap = abs(traj[0][1] - traj[0][0])
    second_gap = abs(traj[1][1] - traj[1][0])
    return (first_gap + second_gap) / 2

# Per-chain properties of the PCA-projected trajectories, stored next to the
# chain's validity flag so the two groups can be compared later.
traj_properties = pd.DataFrame({
 'length': trajectories_3d.apply(trajectory_length),
 'smoothness': trajectories_3d.apply(trajectory_smoothness),
 'is_valid': df['is_valid']
})


In [None]:
# 2x2 summary figure built from the energy columns stored on `df`.
fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(15, 12))
fig.suptitle("Refined Hamiltonian-Inspired Energy Analysis of Reasoning Chains", fontsize=16)

# Top-left: distribution of H over all chains.
sns.histplot(data=df, x='H_energy', ax=axs[0, 0], kde=True, color='blue', bins=50)
axs[0, 0].set(
    title="Distribution of Refined Hamiltonian Energy",
    xlabel="Hamiltonian Energy",
    ylabel="Count",
)

# Top-right: T against V, coloured by H.
scatter = axs[0, 1].scatter(df['T_energy'], df['V_energy'], c=df['H_energy'], cmap='viridis', s=5, alpha=0.6)
axs[0, 1].set(
    title="Refined Kinetic vs Potential Energy",
    xlabel="Kinetic Energy (T)",
    ylabel="Potential Energy (V)",
)
plt.colorbar(scatter, ax=axs[0, 1], label="Hamiltonian Energy")

# Bottom-left: H split by validity. The two subsets are kept as notebook
# variables because later cells read them.
valid_chains = df[df['is_valid']]
invalid_chains = df[~df['is_valid']]
sns.histplot(data=valid_chains, x='H_energy', ax=axs[1, 0], kde=True, color='green', label='Valid Chains', bins=50, alpha=0.6)
sns.histplot(data=invalid_chains, x='H_energy', ax=axs[1, 0], kde=True, color='red', label='Invalid Chains', bins=50, alpha=0.6)
axs[1, 0].set(
    title="Refined Hamiltonian Energy: Valid vs Invalid Chains",
    xlabel="Hamiltonian Energy",
    ylabel="Count",
)
axs[1, 0].legend()

# Bottom-right: distribution of the energy conservation score.
sns.histplot(data=df, x='energy_conservation', ax=axs[1, 1], kde=True, color='orange', bins=50)
axs[1, 1].set(
    title="Distribution of Refined Energy Conservation Scores",
    xlabel="Energy Conservation Score",
    ylabel="Count",
)

# Lay out the panels, leave room for the figure title, then save and show.
plt.tight_layout()
plt.subplots_adjust(top=0.93)
plt.savefig('refined_hamiltonian_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Calculate direction vectors
def calculate_direction(trajectory):
    """Return the displacement vector from the first to the second trajectory point."""
    start_point = trajectory[0]
    next_point = trajectory[1]
    return next_point - start_point

direction_vectors = np.array([calculate_direction(traj) for traj in trajectories_3d])

# Magnitude of each displacement, and its angle within the plane of the first
# two PCA components.
magnitudes = np.linalg.norm(direction_vectors, axis=1)
angles = np.arctan2(direction_vectors[:, 1], direction_vectors[:, 0])

# Add these to the dataframe
df['trajectory_magnitude'] = magnitudes
df['trajectory_angle'] = angles

# Visualize magnitude distribution
plt.figure(figsize=(12, 6))
sns.histplot(data=df, x='trajectory_magnitude', hue='is_valid', element='step', stat='density', common_norm=False)
plt.title('Distribution of Trajectory Magnitudes')
plt.xlabel('Magnitude')
plt.ylabel('Density')

# seaborn has already drawn the hue legend. Calling plt.legend() without
# handles would replace it with an empty legend, because the histogram artists
# carry no labels. Retitle the existing legend instead.
hue_legend = plt.gca().get_legend()
if hue_legend is not None:
    hue_legend.set_title('Is Valid')

plt.tight_layout()
# File name kept exactly as before so existing references to it still resolve.
plt.savefig('trajectories_magntude_plot.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
plt.figure(figsize=(12, 6))

# Define colors explicitly
colors = {'Valid': 'blue', 'Invalid': 'red'}

# Create a new DataFrame with the data for plotting
plot_data = pd.DataFrame({
    'Hamiltonian Energy': df['H_energy'],
    'Validity': df['is_valid'].map({True: 'Valid', False: 'Invalid'})
})

# Create the histogram plot with explicit colors
sns.histplot(data=plot_data, x='Hamiltonian Energy', hue='Validity',
             element='step', stat='density', common_norm=False,
             palette=colors)

plt.title('Distribution of Refined Hamiltonian Energy', fontsize=16)
plt.xlabel('Hamiltonian Energy', fontsize=14)
plt.ylabel('Density', fontsize=14)

# Group means are computed from the plotted data, so the marker lines always
# match the histogram instead of relying on values copied from an earlier run.
energy_by_validity = plot_data.groupby('Validity')['Hamiltonian Energy'].mean()
mean_valid = energy_by_validity.get('Valid', np.nan)
mean_invalid = energy_by_validity.get('Invalid', np.nan)

# Add vertical lines for mean energies
plt.axvline(x=mean_valid, color='blue', linestyle='--', label='Mean Valid')
plt.axvline(x=mean_invalid, color='red', linestyle='--', label='Mean Invalid')

# Add text annotations for mean energies
plt.text(mean_valid, plt.gca().get_ylim()[1], 'Mean Valid',
         rotation=90, va='top', ha='right', color='blue')
plt.text(mean_invalid, plt.gca().get_ylim()[1], 'Mean Invalid',
         rotation=90, va='top', ha='left', color='red')

# Build the legend from explicit handles. A bare plt.legend() would discard
# seaborn's hue entries, because the histogram artists have no labels.
handles = [plt.Rectangle((0, 0), 1, 1, color=color) for color in colors.values()]
handles += [plt.Line2D([0], [0], color=color, linestyle='--') for color in colors.values()]
labels = list(colors.keys()) + ['Mean Valid', 'Mean Invalid']
plt.legend(handles, labels, title='Chain Validity', title_fontsize='13', fontsize='12')

plt.tight_layout()
plt.savefig('refined_hamiltonian_energy_distribution.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Perform PCA to reduce to 2 dimensions.
# PCA and KMeans are reached through the imported sklearn submodules
# (`decomposition`, `cluster`); the bare class names are not imported.
pca = decomposition.PCA(n_components=2)
trajectories_2d = pca.fit_transform(np.vstack(trajectories))

# Reshape the data back into trajectories: (n_chains, points_per_chain, 2)
trajectories_2d = trajectories_2d.reshape(len(trajectories), -1, 2)

# Create the plot
plt.figure(figsize=(12, 10))
# seaborn's own API is used for the grid style: the Matplotlib style-sheet
# name 'seaborn-whitegrid' is not available in newer Matplotlib releases.
sns.set_style("whitegrid")
sns.set_context("paper")
plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = ['Times New Roman'] + plt.rcParams['font.serif']

# Plot trajectories (circle = first point, square = last point)
valid_trajectories = []
invalid_trajectories = []
for i, traj in enumerate(trajectories_2d[:100]):  # Limit to 100 for clarity
    if df.iloc[i]['is_valid']:
        valid_trajectories.append(traj)
        color = 'green'
    else:
        invalid_trajectories.append(traj)
        color = 'red'
    plt.plot(traj[:, 0], traj[:, 1], color=color, alpha=0.5)
    plt.scatter(traj[0, 0], traj[0, 1], color=color, s=20, marker='o')
    plt.scatter(traj[-1, 0], traj[-1, 1], color=color, s=20, marker='s')

# Calculate the vector field based on the average direction of trajectories
grid_size = 20
x = np.linspace(trajectories_2d[:, :, 0].min(), trajectories_2d[:, :, 0].max(), grid_size)
y = np.linspace(trajectories_2d[:, :, 1].min(), trajectories_2d[:, :, 1].max(), grid_size)
X, Y = np.meshgrid(x, y)

U = np.zeros_like(X)
V = np.zeros_like(Y)

# X and Y come from the default 'xy' meshgrid, so U/V are indexed [row=j, col=i].
for i in range(grid_size):
    for j in range(grid_size):
        nearby_trajectories = [traj for traj in trajectories_2d if
                               (x[i]-0.5 < traj[:, 0]).any() and (traj[:, 0] < x[i]+0.5).any() and
                               (y[j]-0.5 < traj[:, 1]).any() and (traj[:, 1] < y[j]+0.5).any()]
        if nearby_trajectories:
            directions = np.diff(nearby_trajectories, axis=1)
            avg_direction = np.mean(directions, axis=(0, 1))
            U[j, i], V[j, i] = avg_direction

# Normalize the vector field (cells with no direction stay at zero)
magnitude = np.sqrt(U**2 + V**2)
U = U / np.where(magnitude > 0, magnitude, 1)
V = V / np.where(magnitude > 0, magnitude, 1)

plt.streamplot(X, Y, U, V, density=1, color='gray', linewidth=0.5, arrowsize=0.5)

# Find key points using KMeans clustering.
# random_state and n_init are fixed so the key points are the same on every
# run and do not depend on the installed scikit-learn's default n_init.
n_clusters = 5  # Adjust this number based on how many key points you want
kmeans = cluster.KMeans(n_clusters=n_clusters, n_init=10, random_state=42)
flattened_trajectories = trajectories_2d.reshape(-1, 2)
kmeans.fit(flattened_trajectories)
key_points = kmeans.cluster_centers_

# Plot key points
plt.scatter(key_points[:, 0], key_points[:, 1], color='blue', s=100, marker='*', zorder=5)

# Add labels to key points
for i, point in enumerate(key_points):
    plt.annotate(f'Key Point {i+1}', (point[0], point[1]), xytext=(5, 5),
                 textcoords='offset points', fontsize=8, color='blue')

# Add labels and title
plt.xlabel('PCA 1')
plt.ylabel('PCA 2')
plt.title('2D Reasoning Trajectories with Phase Space Features and Key Points')

# Add a legend built from proxy artists
valid_line = plt.Line2D([], [], color='green', label='Valid Chains')
invalid_line = plt.Line2D([], [], color='red', label='Invalid Chains')
vector_field_line = plt.Line2D([], [], color='gray', label='Vector Field')
key_point_marker = plt.Line2D([], [], color='blue', marker='*', linestyle='None',
                              markersize=10, label='Key Points')
plt.legend(handles=[valid_line, invalid_line, vector_field_line, key_point_marker])

# Show the plot
plt.tight_layout()
plt.savefig('2d_reasoning_trajectories_with_key_points.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')

# Draw the first 100 trajectories only, for clarity: green = valid chain,
# red = invalid chain; the default marker is the first point, the square the last.
for i, trajectory in enumerate(trajectories_3d[:100]):
    color = 'green' if df.iloc[i]['is_valid'] else 'red'
    ax.plot(*trajectory.T, color=color, alpha=0.5)
    ax.scatter(*trajectory[0], color=color, s=20)
    ax.scatter(*trajectory[-1], color=color, s=20, marker='s')

ax.set(
    xlabel='PCA 1',
    ylabel='PCA 2',
    zlabel='PCA 3',
    title='Reasoning Trajectories in 3D Embedding Space',
)
plt.tight_layout()
plt.show()

In [None]:
def compute_vector_field(trajectories, grid_size=10):
    """Bin trajectory step directions onto a regular 3D grid.

    Args:
        trajectories: iterable of (n_points, 3) arrays.
        grid_size: number of grid nodes along each axis.

    Returns:
        tuple: (X, Y, Z, U, V, W), each of shape
        (grid_size, grid_size, grid_size). X/Y/Z are node coordinates and
        U/V/W the unit-normalized summed step direction per node (zero where
        no step started). All six arrays share the index order
        [x_index, y_index, z_index].
    """
    trajectories = list(trajectories)

    # Determine the bounds of the space
    all_points = np.vstack(trajectories)
    mins = np.min(all_points, axis=0)
    maxs = np.max(all_points, axis=0)

    # Create a grid. indexing='ij' makes X[i, j, k] == x[i], matching how
    # U/V/W are filled below; the default 'xy' order would swap the first two
    # axes and place every arrow at the wrong node.
    x = np.linspace(mins[0], maxs[0], grid_size)
    y = np.linspace(mins[1], maxs[1], grid_size)
    z = np.linspace(mins[2], maxs[2], grid_size)
    X, Y, Z = np.meshgrid(x, y, z, indexing='ij')

    U = np.zeros((grid_size, grid_size, grid_size))
    V = np.zeros((grid_size, grid_size, grid_size))
    W = np.zeros((grid_size, grid_size, grid_size))

    # An axis on which all points coincide has zero extent; use 1 there so the
    # bin index is 0 instead of the result of 0/0.
    span = np.where(maxs > mins, maxs - mins, 1.0)

    # Accumulate each step's direction at the node of the step's start point
    for trajectory in trajectories:
        directions = np.diff(trajectory, axis=0)
        for direction, point in zip(directions, trajectory[:-1]):
            i, j, k = np.floor((point - mins) / span * (grid_size - 1)).astype(int)
            U[i, j, k] += direction[0]
            V[i, j, k] += direction[1]
            W[i, j, k] += direction[2]

    # Normalize to unit length; empty nodes are divided by 1 and stay zero
    magnitude = np.sqrt(U**2 + V**2 + W**2)
    safe_magnitude = np.where(magnitude > 0, magnitude, 1)
    U /= safe_magnitude
    V /= safe_magnitude
    W /= safe_magnitude

    return X, Y, Z, U, V, W

# 3D figure combining the trajectories with their binned direction field.
fig = plt.figure(figsize=(12, 10))
ax = fig.add_subplot(111, projection='3d')

# Trajectories: first 100 only, green = valid, red = invalid;
# the square marker is the last point of each trajectory.
for i, trajectory in enumerate(trajectories_3d[:100]):
    color = 'green' if df.iloc[i]['is_valid'] else 'red'
    ax.plot(*trajectory.T, color=color, alpha=0.5)
    ax.scatter(*trajectory[0], color=color, s=20)
    ax.scatter(*trajectory[-1], color=color, s=20, marker='s')

# Direction field computed from the same 100 trajectories.
X, Y, Z, U, V, W = compute_vector_field(trajectories_3d[:100])
ax.quiver(X, Y, Z, U, V, W, length=0.5, normalize=True, color='blue', alpha=0.3)

ax.set(
    xlabel='PCA 1',
    ylabel='PCA 2',
    zlabel='PCA 3',
    title='Reasoning Trajectories and Phase Space in 3D Embedding Space',
)

plt.tight_layout()
plt.savefig('3d_phase_space_plot.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
plt.figure(figsize=(10, 6))

# Histogram (with KDE) of the energy conservation score over all chains.
sns.histplot(data=df, x='energy_conservation', kde=True, bins=50, color='green')

# Title and axis labels are set on the current axes.
current_axes = plt.gca()
current_axes.set_title("Distribution of Energy Conservation Scores", fontsize=16)
current_axes.set_xlabel("Energy Conservation Score", fontsize=12)
current_axes.set_ylabel("Frequency", fontsize=12)

# Lay out, save and display.
plt.tight_layout()
plt.savefig('energy_conservation_distribution.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Side-by-side density histograms of trajectory magnitude and angle, split by
# validity. Each group is normalized on its own (common_norm=False).
# The same cell appears again, verbatim, later in the notebook and writes the
# same output file.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(15, 6))

sns.histplot(data=df, x='trajectory_magnitude', hue='is_valid', element='step', stat='density', common_norm=False, ax=ax1)
ax1.set(title='Distribution of Trajectory Magnitudes', xlabel='Magnitude', ylabel='Density')

sns.histplot(data=df, x='trajectory_angle', hue='is_valid', element='step', stat='density', common_norm=False, ax=ax2)
ax2.set(title='Distribution of Trajectory Angles', xlabel='Angle (radians)', ylabel='Density')

# Saved to disk only; the figure is closed without being displayed.
plt.tight_layout()
plt.savefig('magnitude_angle_distribution.png', dpi=300, bbox_inches='tight')
plt.close()

In [None]:
# Additional analysis
# Summary statistics of the energy columns. valid_chains / invalid_chains are
# the subsets of df split on 'is_valid' in the earlier 2x2 summary-figure cell,
# so that cell must have been run first.
print(f"Average Energy Conservation Score: {df['energy_conservation'].mean():.4f}")
print(f"Correlation between Energy Conservation and Validity: {df['energy_conservation'].corr(df['is_valid']):.4f}")
print(f"Average Hamiltonian Energy for Valid Chains: {valid_chains['H_energy'].mean():.4f}")
print(f"Average Hamiltonian Energy for Invalid Chains: {invalid_chains['H_energy'].mean():.4f}")

# T-test for difference in Hamiltonian Energy
# Independent two-sample t-test of H between valid and invalid chains, using
# scipy's default settings.
t_stat, p_value = stats.ttest_ind(valid_chains['H_energy'], invalid_chains['H_energy'])
print(f"\nT-test for difference in Hamiltonian Energy:")
print(f"t-statistic: {t_stat:.4f}")
print(f"p-value: {p_value:.4f}")

## Geometric analysis

In [None]:
# 1. 3D trajectories (first 100 only), written to disk without display.
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')

for i, trajectory in enumerate(trajectories_3d[:100]):
    color = 'green' if df.iloc[i]['is_valid'] else 'red'
    ax.plot(*trajectory.T, color=color, alpha=0.5)
    ax.scatter(*trajectory[0], color=color, s=20)
    ax.scatter(*trajectory[-1], color=color, s=20, marker='s')

ax.set(
    xlabel='PCA 1',
    ylabel='PCA 2',
    zlabel='PCA 3',
    title='Reasoning Trajectories in 3D Embedding Space',
)
plt.tight_layout()
plt.savefig('3d_trajectories.png', dpi=300, bbox_inches='tight')
plt.close()

# 2. Trajectory Energy by Chain Index
plt.figure(figsize=(10, 6))
sns.scatterplot(x=df.index, y=trajectory_energies, hue=df['is_valid'], palette={True: 'green', False: 'red'})
plt.gca().set(title='Trajectory Energy by Chain Index', xlabel='Chain Index', ylabel='Energy')
plt.legend(title='Is Valid')
plt.tight_layout()
plt.savefig('trajectory_energy.png', dpi=300, bbox_inches='tight')
plt.close()

In [None]:
# Energy plot: trajectory energy against chain index, coloured by validity.
# Same plot as the saved 'trajectory_energy.png', but wider and shown inline.
plt.figure(figsize=(12, 6))
sns.scatterplot(x=df.index, y=trajectory_energies, hue=df['is_valid'], palette={True: 'green', False: 'red'})
plt.gca().set(title='Trajectory Energy by Chain Index', xlabel='Chain Index', ylabel='Energy')
plt.legend(title='Is Valid')
plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(12, 6))

# Colour per validity label; reused for the histogram and the legend.
colors = {'Valid': 'green', 'Invalid': 'red'}

# Density histogram of trajectory energies, each validity group normalized
# on its own.
sns.histplot(
    data=pd.DataFrame({
        'Energy': trajectory_energies,
        'Is Valid': df['is_valid'].map({True: 'Valid', False: 'Invalid'}),
    }),
    x='Energy', hue='Is Valid', element='step', stat='density', common_norm=False,
    palette=colors,
)

current_axes = plt.gca()
current_axes.set_title('Distribution of Trajectory Energies', fontsize=16)
current_axes.set_xlabel('Energy', fontsize=14)
current_axes.set_ylabel('Density', fontsize=14)

# Custom legend: one coloured rectangle per validity label.
handles = [plt.Rectangle((0,0),1,1, color=colors[label]) for label in colors]
labels = list(colors)
plt.legend(handles, labels, title='Trajectory Validity', title_fontsize='13', fontsize='12')

plt.tight_layout()
plt.savefig('energy_distribution_plot.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Distribution of Trajectory Magnitudes and Angles
# This repeats an earlier cell of the notebook verbatim and overwrites the
# same 'magnitude_angle_distribution.png' file.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(15, 6))

sns.histplot(data=df, x='trajectory_magnitude', hue='is_valid', element='step', stat='density', common_norm=False, ax=ax1)
ax1.set(title='Distribution of Trajectory Magnitudes', xlabel='Magnitude', ylabel='Density')

sns.histplot(data=df, x='trajectory_angle', hue='is_valid', element='step', stat='density', common_norm=False, ax=ax2)
ax2.set(title='Distribution of Trajectory Angles', xlabel='Angle (radians)', ylabel='Density')

# Saved to disk only; the figure is closed without being displayed.
plt.tight_layout()
plt.savefig('magnitude_angle_distribution.png', dpi=300, bbox_inches='tight')
plt.close()

In [None]:
# Trajectory Magnitude vs Angle (saved to disk, not displayed)
plt.figure(figsize=(10, 8))
sns.scatterplot(data=df, x='trajectory_angle', y='trajectory_magnitude', hue='is_valid', alpha=0.6)
plt.gca().set(title='Trajectory Magnitude vs Angle', xlabel='Angle (radians)', ylabel='Magnitude')
plt.legend(title='Is Valid')
plt.tight_layout()
plt.savefig('magnitude_vs_angle.png', dpi=300, bbox_inches='tight')
plt.close()

# 6. Trajectory Properties Comparison
# Box plots of length and smoothness from `traj_properties`, grouped by validity.
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(15, 6))

sns.boxplot(data=traj_properties, x='is_valid', y='length', ax=ax1)
ax1.set(title='Trajectory Length', xlabel='Is Valid', ylabel='Length')

sns.boxplot(data=traj_properties, x='is_valid', y='smoothness', ax=ax2)
ax2.set(title='Trajectory Smoothness', xlabel='Is Valid', ylabel='Smoothness')

plt.tight_layout()
plt.savefig('trajectory_properties.png', dpi=300, bbox_inches='tight')
plt.close()

In [None]:
plt.figure(figsize=(12, 8))

# Colour per validity label; reused for the points and the legend.
colors = {'Valid': 'blue', 'Invalid': 'red'}

# Copy of df with a readable 'Validity' label column for the hue.
plot_data = df.assign(Validity=df['is_valid'].map({True: 'Valid', False: 'Invalid'}))

# Scatter of magnitude against angle, coloured by validity.
sns.scatterplot(
    data=plot_data, x='trajectory_angle', y='trajectory_magnitude', hue='Validity',
    palette=colors, alpha=0.6,
)

current_axes = plt.gca()
current_axes.set_title('Trajectory Magnitude vs Angle', fontsize=16)
current_axes.set_xlabel('Angle (radians)', fontsize=14)
current_axes.set_ylabel('Magnitude', fontsize=14)

# Custom legend: one round marker per validity label, in the dict's order.
handles = [
    plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=colors[label], markersize=10, alpha=0.6)
    for label in colors
]
labels = list(colors)
plt.legend(handles, labels, title='Chain Validity', title_fontsize='13', fontsize='12')

plt.tight_layout()
plt.savefig('refined_magnitude_vs_angle_plot.png', dpi=300, bbox_inches='tight')
plt.show()

# Calculate and print statistical information
# Split df once on 'is_valid'; every statistic below compares these two subsets.
valid_data = df[df['is_valid']]
invalid_data = df[~df['is_valid']]

# Correlation between angle and magnitude: overall, then per validity group.
print("Statistical Information:")
print(f"Correlation between Angle and Magnitude (overall): {df['trajectory_angle'].corr(df['trajectory_magnitude']):.3f}")
print(f"Correlation for Valid Chains: {valid_data['trajectory_angle'].corr(valid_data['trajectory_magnitude']):.3f}")
print(f"Correlation for Invalid Chains: {invalid_data['trajectory_angle'].corr(invalid_data['trajectory_magnitude']):.3f}")

# Perform t-tests
# Independent two-sample t-tests (scipy defaults) of valid vs invalid chains.
t_stat_angle, p_value_angle = stats.ttest_ind(valid_data['trajectory_angle'], invalid_data['trajectory_angle'])
t_stat_mag, p_value_mag = stats.ttest_ind(valid_data['trajectory_magnitude'], invalid_data['trajectory_magnitude'])

print("\nT-test for difference in Trajectory Angle:")
print(f"t-statistic: {t_stat_angle:.4f}")
print(f"p-value: {p_value_angle:.4f}")

print("\nT-test for difference in Trajectory Magnitude:")
print(f"t-statistic: {t_stat_mag:.4f}")
print(f"p-value: {p_value_mag:.4f}")

# Calculate and print mean values
print("\nMean Values:")
print(f"Mean Angle for Valid Chains: {valid_data['trajectory_angle'].mean():.3f}")
print(f"Mean Angle for Invalid Chains: {invalid_data['trajectory_angle'].mean():.3f}")
print(f"Mean Magnitude for Valid Chains: {valid_data['trajectory_magnitude'].mean():.3f}")
print(f"Mean Magnitude for Invalid Chains: {invalid_data['trajectory_magnitude'].mean():.3f}")

In [None]:
# Statistical tests
# ttest_ind is called through the imported `scipy.stats` module; the bare name
# `ttest_ind` is not imported anywhere in this notebook.
valid_mag = df[df['is_valid']]['trajectory_magnitude']
invalid_mag = df[~df['is_valid']]['trajectory_magnitude']
mag_ttest = stats.ttest_ind(valid_mag, invalid_mag)

valid_ang = df[df['is_valid']]['trajectory_angle']
invalid_ang = df[~df['is_valid']]['trajectory_angle']
ang_ttest = stats.ttest_ind(valid_ang, invalid_ang)

print("T-test for trajectory magnitude:", mag_ttest)
print("T-test for trajectory angle:", ang_ttest)

# Correlation with energy
mag_energy_corr = df['trajectory_magnitude'].corr(df['H_energy'])
ang_energy_corr = df['trajectory_angle'].corr(df['H_energy'])

print("Correlation between magnitude and H energy:", mag_energy_corr)
print("Correlation between angle and H energy:", ang_energy_corr)

In [None]:
def calculate_curvature(trajectory):
    """Menger curvature of the first three points of ``trajectory``.

    The curvature is 4 * area / (a * b * c), where a, b, c are the side
    lengths of the triangle formed by the three points (the reciprocal of
    its circumradius).

    Returns 0.0 when the trajectory has fewer than three points (a single
    straight segment) or when the three points are coincident or collinear.

    The side lengths are taken between *points* (trajectory[k]). Indexing
    trajectory[0][k] would compare coordinates of the first point with each
    other, which always describes a degenerate triangle.
    """
    points = np.asarray(trajectory, dtype=float)
    if len(points) < 3:
        return 0.0

    a = np.linalg.norm(points[1] - points[0])
    b = np.linalg.norm(points[2] - points[1])
    c = np.linalg.norm(points[2] - points[0])
    if a * b * c == 0:
        return 0.0

    # Heron's formula; the product is clamped at 0 because rounding can make
    # it slightly negative for (nearly) collinear points.
    s = (a + b + c) / 2
    area = np.sqrt(max(s * (s - a) * (s - b) * (s - c), 0.0))

    return 4 * area / (a * b * c)

def calculate_rate_of_change(trajectory):
    """Mean Euclidean distance between consecutive points of ``trajectory``."""
    step_vectors = np.diff(trajectory, axis=0)
    step_sizes = np.linalg.norm(step_vectors, axis=1)
    return step_sizes.mean()

# Calculate curvature and rate of change for every 3D trajectory
curvatures = [calculate_curvature(traj) for traj in trajectories_3d]
rates_of_change = [calculate_rate_of_change(traj) for traj in trajectories_3d]

# Add these to the dataframe
df['curvature'] = curvatures
df['rate_of_change'] = rates_of_change


plt.figure(figsize=(12, 6))

# Define colors explicitly
colors = {'Valid': 'blue', 'Invalid': 'red'}

# Prepare the data
plot_data = pd.DataFrame({
    'Curvature': df['curvature'],
    'Validity': df['is_valid'].map({True: 'Valid', False: 'Invalid'})
})

# Create the histogram plot with explicit colors
sns.histplot(data=plot_data, x='Curvature', hue='Validity',
             element='step', stat='density', common_norm=False,
             palette=colors)

plt.title('Distribution of Trajectory Curvatures', fontsize=16)
plt.xlabel('Curvature', fontsize=14)
plt.ylabel('Density', fontsize=14)

# Calculate mean curvatures for valid and invalid chains
mean_valid = df[df['is_valid']]['curvature'].mean()
mean_invalid = df[~df['is_valid']]['curvature'].mean()

# Add vertical lines for mean curvatures
plt.axvline(x=mean_valid, color='blue', linestyle='--', label='Mean Valid')
plt.axvline(x=mean_invalid, color='red', linestyle='--', label='Mean Invalid')

# Add text annotations for mean curvatures
plt.text(mean_valid, plt.gca().get_ylim()[1], f'Mean Valid: {mean_valid:.3f}',
         rotation=90, va='top', ha='right', color='blue')
plt.text(mean_invalid, plt.gca().get_ylim()[1], f'Mean Invalid: {mean_invalid:.3f}',
         rotation=90, va='top', ha='left', color='red')

# Build the legend from explicit handles. A bare plt.legend() would discard
# seaborn's hue entries, because the histogram artists have no labels.
handles = [plt.Rectangle((0, 0), 1, 1, color=color) for color in colors.values()]
handles += [plt.Line2D([0], [0], color=color, linestyle='--') for color in colors.values()]
labels = list(colors.keys()) + ['Mean Valid', 'Mean Invalid']
plt.legend(handles, labels, title='Chain Validity', title_fontsize='13', fontsize='12')

plt.tight_layout()
plt.savefig('refined_curvature_distribution.png', dpi=300, bbox_inches='tight')
plt.show()

# Calculate and print statistical information
valid_curv = df[df['is_valid']]['curvature']
invalid_curv = df[~df['is_valid']]['curvature']
t_stat, p_value = stats.ttest_ind(valid_curv, invalid_curv)

# The test result used to be computed and then dropped; report it.
print("Distribution of Trajectory Curvatures")
print(f"Average Curvature for Valid Chains: {mean_valid:.3f}")
print(f"Average Curvature for Invalid Chains: {mean_invalid:.3f}")
print("\nT-test for difference in Curvature:")
print(f"t-statistic: {t_stat:.4f}")
print(f"p-value: {p_value:.4f}")

In [None]:
plt.figure(figsize=(12, 6))

# Colour per validity label; reused for the histogram and the legend.
colors = {'Valid': 'blue', 'Invalid': 'red'}

# Two-column frame: the rate of change plus a readable validity label.
plot_data = pd.DataFrame({
    'Rate of Change': df['rate_of_change'],
    'Validity': df['is_valid'].map({True: 'Valid', False: 'Invalid'}),
})

# Density histogram, each validity group normalized on its own.
sns.histplot(
    data=plot_data, x='Rate of Change', hue='Validity',
    element='step', stat='density', common_norm=False,
    palette=colors,
)

current_axes = plt.gca()
current_axes.set_title('Distribution of Trajectory Rates of Change', fontsize=16)
current_axes.set_xlabel('Rate of Change', fontsize=14)
current_axes.set_ylabel('Density', fontsize=14)

# Custom legend: one coloured rectangle per validity label.
handles = [plt.Rectangle((0,0),1,1, color=color) for color in colors.values()]
labels = list(colors)
plt.legend(handles, labels, title='Chain Validity', title_fontsize='13', fontsize='12')

plt.tight_layout()
plt.savefig('simplified_rate_of_change_distribution.png', dpi=300, bbox_inches='tight')
plt.show()

# Calculate and print statistical information
# Compare the 'rate_of_change' column between valid and invalid chains with an
# independent two-sample t-test (scipy defaults).
valid_roc = df[df['is_valid']]['rate_of_change']
invalid_roc = df[~df['is_valid']]['rate_of_change']
t_stat, p_value = stats.ttest_ind(valid_roc, invalid_roc)

# mean_valid / mean_invalid are notebook-level names that other cells also
# assign; here they hold the rate-of-change means.
mean_valid = valid_roc.mean()
mean_invalid = invalid_roc.mean()

print("Distribution of Trajectory Rates of Change")
print(f"Average Rate of Change for Valid Chains: {mean_valid:.3f}")
print(f"Average Rate of Change for Invalid Chains: {mean_invalid:.3f}")
print(f"Correlation between Rate of Change and Validity: {df['rate_of_change'].corr(df['is_valid']):.3f}")
print("\nT-test for difference in Rate of Change:")
print(f"t-statistic: {t_stat:.4f}")
print(f"p-value: {p_value:.4f}")

In [None]:
# Statistical tests
# Missing curvature values are treated as 0 before testing.
df['curvature'] = df['curvature'].fillna(0)
df['rate_of_change'] = df['rate_of_change'].astype(float)

# ttest_ind is called through the imported `scipy.stats` module; the bare name
# `ttest_ind` is not imported anywhere in this notebook.
valid_curv = df[df['is_valid']]['curvature']
invalid_curv = df[~df['is_valid']]['curvature']
curv_ttest = stats.ttest_ind(valid_curv, invalid_curv)

valid_roc = df[df['is_valid']]['rate_of_change']
invalid_roc = df[~df['is_valid']]['rate_of_change']
roc_ttest = stats.ttest_ind(valid_roc, invalid_roc)

print("T-test for trajectory curvature:", curv_ttest)
print("T-test for trajectory rate of change:", roc_ttest)

# Correlation with energy
curv_energy_corr = df['curvature'].corr(df['H_energy'])
roc_energy_corr = df['rate_of_change'].corr(df['H_energy'])

print("Correlation between curvature and energy:", curv_energy_corr)
print("Correlation between rate of change and energy:", roc_energy_corr)

In [None]:
# Frenet's framework
def reduce_dimensionality(trajectories, n_components=3):
    """Reduce dimensionality of trajectories using PCA.

    All points of all trajectories are stacked, one PCA is fitted on them,
    and the projected points are reshaped back to one block per trajectory.
    The reshape requires every trajectory to have the same number of points.

    Returns:
        tuple: (array of shape (n_trajectories, n_points, n_components),
        the fitted PCA object).
    """
    trajectories = list(trajectories)
    flattened = np.vstack(trajectories)
    # PCA is reached through the imported `sklearn.decomposition` module; the
    # bare name `PCA` is not imported anywhere in this notebook.
    pca = decomposition.PCA(n_components=n_components)
    reduced = pca.fit_transform(flattened)
    return reduced.reshape(len(trajectories), -1, n_components), pca

def frenet_serret_frame(trajectory):
    """Compute a discrete Frenet-Serret frame for a trajectory.

    Args:
        trajectory: array of shape (n_points, 3).

    Returns:
        tuple: (T, N, B), each of shape (n_points - 2, 3):
        unit tangents, unit normals (normalized differences of consecutive
        tangents) and binormals (T x N). Vectors of zero length are returned
        as zero vectors.
    """
    # Compute tangent vectors.
    # `out=` is required together with `where=`: without it, entries where the
    # norm is zero are left uninitialized instead of being set to zero.
    T = np.diff(trajectory, axis=0)
    T_norm = np.linalg.norm(T, axis=1, keepdims=True)
    T = np.divide(T, T_norm, out=np.zeros_like(T, dtype=float), where=T_norm != 0)

    # Compute normal vectors
    N = np.diff(T, axis=0)
    N_norm = np.linalg.norm(N, axis=1, keepdims=True)
    N = np.divide(N, N_norm, out=np.zeros_like(N, dtype=float), where=N_norm != 0)

    # Compute binormal vectors (the last tangent has no matching normal)
    B = np.cross(T[:-1], N)

    return T[:-1], N, B

def compute_curvature_torsion(T, N, B):
    """Return (mean curvature, mean torsion) estimated from a Frenet-Serret frame.

    Curvature samples are the norms of the differences between consecutive
    tangents; torsion samples are the dot products of consecutive binormal
    differences with the normals N[1:]. With fewer than two frame vectors
    there are no samples and both means are NaN.

    NOTE(review): the Frenet-Serret relation is dB/ds = -tau * N; the value
    here has no minus sign and is not divided by arc length. Confirm the
    intended convention.
    """
    tangent_steps = np.diff(T, axis=0)
    binormal_steps = np.diff(B, axis=0)

    curvature_samples = np.linalg.norm(tangent_steps, axis=1)
    torsion_samples = (binormal_steps * N[1:]).sum(axis=1)

    return curvature_samples.mean(), torsion_samples.mean()

# Reduce dimensionality of trajectories
reduced_trajectories, pca = reduce_dimensionality(trajectories)

# Compute Frenet-Serret frames and curvature/torsion.
# A trajectory that cannot be processed is reported and recorded as NaN.
curvatures = []
torsions = []
for i, traj in enumerate(reduced_trajectories):
    try:
        T, N, B = frenet_serret_frame(traj)
        curvature, torsion = compute_curvature_torsion(T, N, B)
    except Exception as e:
        print(f"Error processing trajectory {i}: {str(e)}")
        print(f"Trajectory shape: {traj.shape}")
        curvature, torsion = np.nan, np.nan
    curvatures.append(curvature)
    torsions.append(torsion)

# Note: this overwrites any 'curvature' column written by an earlier cell.
df['curvature'] = curvatures
df['torsion'] = torsions

# Remove rows with NaN values, but never empty the frame silently.
# Trajectories that are too short to yield any curvature/torsion sample (for
# example two-point trajectories) give NaN for every row; dropping them all
# would leave later cells with an empty df.
# Once rows are dropped, df no longer lines up positionally with
# `trajectories` / `trajectories_3d`.
frenet_available = df[['curvature', 'torsion']].notna().all(axis=1)
if frenet_available.any():
    df = df.dropna(subset=['curvature', 'torsion'])
else:
    print("Warning: curvature/torsion are NaN for every trajectory "
          "(trajectories too short for a Frenet-Serret frame); no rows were dropped.")


In [None]:
# Explained variance of the PCA currently bound to `pca`.
explained_variance_ratio = pca.explained_variance_ratio_
cumulative_variance_ratio = explained_variance_ratio.cumsum()

# Cumulative explained variance against the number of components kept.
plt.figure(figsize=(10, 6))
plt.plot(range(1, len(explained_variance_ratio) + 1), cumulative_variance_ratio, 'bo-')
current_axes = plt.gca()
current_axes.set_xlabel('Number of Components', fontsize=14)
current_axes.set_ylabel('Cumulative Explained Variance Ratio', fontsize=14)
current_axes.set_title('Explained Variance Ratio by Principal Components', fontsize=16)
plt.savefig('pca_explained_variance.png', dpi=300, bbox_inches='tight')
plt.show()

# Index 2 is the cumulative ratio after three components, so `pca` must have
# been fitted with at least three.
print(f"Explained variance ratio of first 3 components: {explained_variance_ratio[:3]}")
print(f"Cumulative explained variance ratio of first 3 components: {cumulative_variance_ratio[2]:.4f}")

In [None]:
# Compute and visualize Hamiltonian along trajectories

def hamiltonian(q, p, q_goal):
    """Total energy of a reasoning state: kinetic part from ``p`` plus potential part from ``q``.

    Args:
        q: State (position) vector.
        p: Momentum vector.
        q_goal: Goal state passed through to ``sophisticated_potential``.

    Returns:
        ``0.5 * (p . p) + sophisticated_potential(q, q_goal)``.
    """
    kinetic_energy = 0.5 * np.dot(p, p)
    potential_energy = sophisticated_potential(q, q_goal)
    return kinetic_energy + potential_energy

def sophisticated_potential(q, q_goal):
    """Potential energy balancing relevance to the goal against state complexity.

    The potential is ``-cosine_similarity(q, q_goal) + 0.1 * ||q||``: states
    aligned with the goal have lower energy, states with a larger norm
    (treated here as "more complex") have higher energy.

    Args:
        q: State vector.
        q_goal: Goal state vector.

    Returns:
        The scalar potential. When either vector has zero norm the cosine
        similarity is undefined; it is taken as 0 instead of producing NaN
        (and a divide-by-zero warning) as the unguarded division would.
    """
    q_norm = np.linalg.norm(q)
    goal_norm = np.linalg.norm(q_goal)
    norm_product = q_norm * goal_norm

    # Guard the degenerate case so a zero state or goal cannot poison
    # downstream sums and plots with NaN.
    similarity = np.dot(q, q_goal) / norm_product if norm_product > 0 else 0.0

    complexity = q_norm  # Assume more complex states have higher norm
    return -similarity + 0.1 * complexity  # Balance between relevance and complexity

# Compute and visualize Hamiltonian along trajectories
hamiltonians = []
# Assuming the goal is the average final state
q_goal = np.mean([traj[-1] for traj in trajectories], axis=0)

for traj in trajectories:
    H = []
    previous_state = None
    for q in traj:
        # Momentum is estimated as the step from the previous state; the first
        # state has no predecessor, so its momentum is zero.
        p = np.zeros_like(q) if previous_state is None else q - previous_state
        H.append(hamiltonian(q, p, q_goal))
        previous_state = q
    hamiltonians.append(H)

plt.figure(figsize=(12, 6))
for i, H in enumerate(hamiltonians[:20]):  # Plot first 20 for clarity
    plt.plot(H, label=f'Trajectory {i+1}')
plt.title('Hamiltonian Evolution Along Reasoning Trajectories', fontsize=16)
plt.xlabel('Time Step', fontsize=16)
plt.ylabel('Hamiltonian',fontsize=16)
plt.legend()
plt.savefig('hamiltonian_evolution_plot.png', dpi=300, bbox_inches='tight')
plt.show()

# Statistical analysis: does curvature differ between valid and invalid chains?
is_valid_mask = df['is_valid']
valid_curvature = df.loc[is_valid_mask, 'curvature']
invalid_curvature = df.loc[~is_valid_mask, 'curvature']
t_stat, p_value = stats.ttest_ind(valid_curvature, invalid_curvature)

print(f"T-test for curvature: t-statistic = {t_stat}, p-value = {p_value}")

# Correlation analysis
correlation = df['curvature'].corr(df['torsion'])
print(f"Correlation between curvature and torsion: {correlation}")



In [None]:
# 3D plot of trajectories, coloured by chain validity
fig = plt.figure(figsize=(12,12))
ax = fig.add_subplot(111, projection='3d')

# Plot first 20 for clarity. Validity is looked up by position, so `df` rows
# are presumed to be in the same order as `trajectories_3d` - TODO confirm.
for i, traj in enumerate(trajectories_3d[:20]):
    if df.iloc[i]['is_valid']:
        color = 'green'
    else:
        color = 'red'
    xs, ys, zs = traj[:, 0], traj[:, 1], traj[:, 2]
    ax.plot(xs, ys, zs, color=color, alpha=0.6)

ax.set_title('Reasoning Trajectories in PCA Space', fontsize=16)
for set_axis_label, axis_text in (
    (ax.set_xlabel, 'PCA 1'),
    (ax.set_ylabel, 'PCA 2'),
    (ax.set_zlabel, 'PCA 3'),
):
    set_axis_label(axis_text, fontsize=14)

# Add legend
# NOTE(review): `valid_handle` / `invalid_handle` are not created in this cell;
# they must already exist in the kernel from an earlier cell.
ax.legend([valid_handle, invalid_handle], ['Valid', 'Invalid'], loc='upper right')
plt.savefig('pca_trajectories_plot.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Statistical Analysis

# One row per trajectory: its mean position along each of the three components.
pca_means = np.array([traj.mean(axis=0) for traj in trajectories_3d])
X = pd.DataFrame(pca_means, columns=['PCA1', 'PCA2', 'PCA3'])

# Validity labels re-indexed from 0 (via .values) and forced to boolean so they
# can be used directly as masks on X.
y = pd.Series(df['is_valid'].values, name='is_valid').astype(bool)

# Combine X and y into a single DataFrame
data = pd.concat([X, y], axis=1)

# 1. MANOVA test
manova = MANOVA.from_formula('PCA1 + PCA2 + PCA3 ~ is_valid', data=data)
print("MANOVA test results:")
print(manova.mv_test())

# 2. T-tests for each PCA dimension
for i in range(3):
    component_scores = X[f'PCA{i+1}']
    t_stat, p_value = stats.ttest_ind(component_scores[y], component_scores[~y])
    print(f"T-test for PCA{i+1}: t-statistic = {t_stat:.4f}, p-value = {p_value:.4f}")

# 3. Logistic Regression (fit and scored on the same data: in-sample accuracy)
# NOTE(review): LogisticRegression / accuracy_score are not imported in this
# cell; they are presumably imported elsewhere in the notebook - confirm.
log_reg = LogisticRegression()
log_reg.fit(X, y)
y_pred = log_reg.predict(X)
accuracy = accuracy_score(y, y_pred)
print(f"Logistic Regression Accuracy: {accuracy:.4f}")

# 4. Effect sizes (Cohen's d) for each PCA dimension
for i in range(3):
    valid_scores = X[f'PCA{i+1}'][y]
    invalid_scores = X[f'PCA{i+1}'][~y]
    # Denominator: root of the average of the two group variances.
    pooled_std = np.sqrt((valid_scores.var() + invalid_scores.var()) / 2)
    cohens_d = (valid_scores.mean() - invalid_scores.mean()) / pooled_std
    print(f"Cohen's d for PCA{i+1}: {cohens_d:.4f}")


# 5. Trajectory length comparison
def _path_length(traj):
    """Sum of Euclidean distances between consecutive points of ``traj``."""
    steps = np.diff(traj, axis=0)
    return np.sum(np.sqrt(np.sum(steps**2, axis=1)))


trajectory_lengths = np.array([_path_length(traj) for traj in trajectories_pca])
t_stat, p_value = stats.ttest_ind(trajectory_lengths[y], trajectory_lengths[~y])
print(f"T-test for trajectory lengths: t-statistic = {t_stat:.4f}, p-value = {p_value:.4f}")

In [None]:
# Correlation between trajectory complexity and validity
# Analyze trajectory complexity
def trajectory_complexity(traj):
    """Return the total path length of ``traj``.

    The complexity is the sum of the Euclidean norms of the differences
    between consecutive rows of ``traj``.
    """
    step_vectors = np.diff(traj, axis=0)
    step_lengths = np.linalg.norm(step_vectors, axis=1)
    return np.sum(step_lengths)

# Path length of every reduced trajectory, stored next to its validity label.
complexities = list(map(trajectory_complexity, reduced_trajectories))
df['complexity'] = complexities

# Point-biserial correlation: binary validity vs. continuous complexity.
complexity_correlation = stats.pointbiserialr(df['is_valid'], df['complexity'])
print(f"Correlation between trajectory complexity and validity: r = {complexity_correlation.correlation:.4f}, p = {complexity_correlation.pvalue:.4f}")

## Canonical transformations

In [None]:
def hamiltonian(state, t, k):
    """Energy of a simple harmonic oscillator, ``p**2 / 2 + k * q**2 / 2``.

    Note: this rebinds the name ``hamiltonian`` that an earlier cell defines
    with the signature ``(q, p, q_goal)``.

    Args:
        state: Pair ``(q, p)`` of position and momentum.
        t: Time; accepted but not used by the computation.
        k: Spring constant.
    """
    q, p = state
    kinetic = p**2 / 2
    potential = k * q**2 / 2
    return kinetic + potential

def hamilton_equations(state, t, k):
    """Right-hand side of Hamilton's equations for a simple harmonic oscillator.

    Args:
        state: Pair ``(q, p)`` of position and momentum.
        t: Time; accepted but not used by the computation.
        k: Spring constant.

    Returns:
        ``[dq/dt, dp/dt]`` with ``dq/dt = p`` and ``dp/dt = -k * q``.
    """
    position, momentum = state
    return [momentum, -k * position]

def canonical_transform_to_action_angle(q, p, k):
    """Transform from (q,p) to action-angle variables (I, theta).

    For the unit-mass oscillator ``H = p**2 / 2 + k * q**2 / 2`` with angular
    frequency ``omega = sqrt(k)``, the action is ``I = H / omega`` and the
    angle is ``theta = arctan2(omega * q, p)``.

    The action is divided by ``omega`` (not ``k``) so that the map is a
    canonical transformation and ``inverse_canonical_transform`` undoes it
    exactly for every ``k > 0``. For ``k == 1`` the result is identical to
    dividing by ``k``.

    Args:
        q: Position (scalar or array).
        p: Momentum (scalar or array, broadcastable with ``q``).
        k: Spring constant; must be positive.

    Returns:
        Tuple ``(I, theta)``.
    """
    omega = np.sqrt(k)
    I = (p**2 + k * q**2) / (2 * omega)
    theta = np.arctan2(omega * q, p)
    return I, theta


def inverse_canonical_transform(I, theta, k):
    """Transform from action-angle variables (I, theta) back to (q,p).

    Exact inverse of ``canonical_transform_to_action_angle``:
    ``q = sqrt(2 * I / omega) * sin(theta)`` and
    ``p = sqrt(2 * I * omega) * cos(theta)`` with ``omega = sqrt(k)``.

    Args:
        I: Action (scalar or array, non-negative).
        theta: Angle in radians.
        k: Spring constant; must be positive.

    Returns:
        Tuple ``(q, p)``.
    """
    omega = np.sqrt(k)
    q = np.sqrt(2 * I / omega) * np.sin(theta)
    p = np.sqrt(2 * I * omega) * np.cos(theta)
    return q, p

# Parameters
k = 1.0  # Spring constant
t = np.linspace(0, 10, 100)

# Apply canonical transformation to our trajectories.
# Assuming first two PCs represent position (column 0) and momentum (column 1);
# each result is a two-column array holding (I, theta) per time step.
action_angle_trajectories = [
    np.column_stack(
        canonical_transform_to_action_angle(traj[:, 0], traj[:, 1], k)
    )
    for traj in trajectories_pca
]


# Analysis: per-trajectory mean action and angle range, split by validity
validity_flags = df['is_valid'].tolist()

action_means_valid, action_means_nonvalid = [], []
angle_ranges_valid, angle_ranges_nonvalid = [], []
for traj, valid in zip(action_angle_trajectories, validity_flags):
    mean_action = np.mean(traj[:, 0])   # column 0 holds the action I
    angle_span = np.ptp(traj[:, 1])     # column 1 holds the angle theta
    if valid:
        action_means_valid.append(mean_action)
        angle_ranges_valid.append(angle_span)
    else:
        action_means_nonvalid.append(mean_action)
        angle_ranges_nonvalid.append(angle_span)

print(f"Mean action for valid chains: {np.mean(action_means_valid):.4f}")
print(f"Mean action for non-valid chains: {np.mean(action_means_nonvalid):.4f}")
print(f"Mean angle range for valid chains: {np.mean(angle_ranges_valid):.4f}")
print(f"Mean angle range for non-valid chains: {np.mean(angle_ranges_nonvalid):.4f}")

# Statistical tests (the notebook's import cell already provides `stats`;
# this repeats that import)
from scipy import stats

t_stat, p_value = stats.ttest_ind(action_means_valid, action_means_nonvalid)
print(f"T-test for action means: t-statistic = {t_stat:.4f}, p-value = {p_value:.4f}")

t_stat, p_value = stats.ttest_ind(angle_ranges_valid, angle_ranges_nonvalid)
print(f"T-test for angle ranges: t-statistic = {t_stat:.4f}, p-value = {p_value:.4f}")

# Classify trajectories based on action and angle properties
def classify_trajectory(action, angle_range, valid):
    """Label a trajectory by how its action and angle range compare with its own group.

    Thresholds are mean +/- one standard deviation of the module-level lists
    for the trajectory's validity group (``action_means_valid`` /
    ``angle_ranges_valid`` when ``valid`` is truthy, the ``*_nonvalid`` lists
    otherwise).

    Args:
        action: Mean action of the trajectory.
        angle_range: Angle range of the trajectory.
        valid: Selects which group's statistics supply the thresholds.

    Returns:
        One of the five descriptive strings; "Moderate reasoning" when the
        action lies within one standard deviation of the group mean.
    """
    group_actions = action_means_valid if valid else action_means_nonvalid
    group_angle_ranges = angle_ranges_valid if valid else angle_ranges_nonvalid

    action_mean, action_std = np.mean(group_actions), np.std(group_actions)
    high_action = action_mean + action_std
    low_action = action_mean - action_std
    high_angle_range = np.mean(group_angle_ranges) + np.std(group_angle_ranges)

    wide_angle = angle_range > high_angle_range
    narrow_angle = angle_range <= high_angle_range

    if action > high_action:
        if wide_angle:
            return "High energy, complex reasoning"
        if narrow_angle:
            return "High energy, focused reasoning"
    elif action < low_action:
        if wide_angle:
            return "Low energy, exploratory reasoning"
        if narrow_angle:
            return "Low energy, simple reasoning"
    return "Moderate reasoning"

In [None]:
# Plotting: three views of the first 10 trajectories, coloured by validity
fig = plt.figure(figsize=(15, 5))

# Validity flags for the plotted subset (first 10 for clarity)
plotted_validity = df['is_valid'].tolist()[:10]
legend_options = {'loc': 'upper right', 'fontsize': 12}
# NOTE(review): `valid_handle` / `invalid_handle` are not created in this cell;
# they must already exist in the kernel from an earlier cell.

# Original space
ax1 = fig.add_subplot(131)
for traj, valid in zip(trajectories_pca[:10], plotted_validity):
    color = 'green' if valid else 'red'
    ax1.plot(traj[:, 0], traj[:, 1], color=color, alpha=0.7)
ax1.set_title('Original Phase Space', fontsize=14)
ax1.set_xlabel('PC1 (q)', fontsize=12)
ax1.set_ylabel('PC2 (p)', fontsize=12)
ax1.legend([valid_handle, invalid_handle], ['Valid', 'Invalid'], **legend_options)

# Action-Angle space
ax2 = fig.add_subplot(132)
for traj, valid in zip(action_angle_trajectories[:10], plotted_validity):
    color = 'green' if valid else 'red'
    ax2.plot(traj[:, 0], traj[:, 1], color=color, alpha=0.7)
ax2.set_title('Action-Angle Space', fontsize=14)
ax2.set_xlabel('Action (I)', fontsize=12)
ax2.set_ylabel('Angle (theta)', fontsize=12)
ax2.legend([valid_handle, invalid_handle], ['Valid', 'Invalid'], **legend_options)

# 3D visualization: the angle is drawn as (cos, sin) so it wraps without a jump
ax3 = fig.add_subplot(133, projection='3d')
for traj, valid in zip(action_angle_trajectories[:10], plotted_validity):
    color = 'green' if valid else 'red'
    angles = traj[:, 1]
    ax3.plot(traj[:, 0], np.cos(angles), np.sin(angles), color=color, alpha=0.7)
ax3.set_title('3D Action-Angle Space', fontsize=14)
ax3.set_xlabel('Action (I)', fontsize=12)
ax3.set_ylabel('cos(theta)', fontsize=12)
ax3.set_zlabel('sin(theta)', fontsize=12)
ax3.legend([valid_handle, invalid_handle], ['Valid', 'Invalid'], **legend_options)

plt.tight_layout()
plt.savefig('canonical_transformation_analysis_with_validity.png', dpi=300, bbox_inches='tight')
plt.show()

## Conservation laws

In [None]:
def calculate_hamiltonian(q, p):
    """Return ``0.5 * (q**2 + p**2)`` for position ``q`` and momentum ``p``.

    Works element-wise when ``q`` and ``p`` are arrays.
    """
    squared_radius = q**2 + p**2
    return 0.5 * squared_radius

def calculate_angular_momentum(q, p):
    """Return the angular-momentum-like product ``q * p`` (element-wise for arrays)."""
    product = q * p
    return product

def calculate_energy_like_quantity(q, p):
    """Return the energy-like quantity ``q**2 - p**2`` (element-wise for arrays)."""
    position_term = q**2
    momentum_term = p**2
    return position_term - momentum_term

def analyze_conservation(trajectories, quantity_func, quantity_name):
    """Measure how much a quantity changes between the start and end of each trajectory.

    Args:
        trajectories: Iterable of 2-D array-likes whose column 0 is the
            position ``q`` and column 1 the momentum ``p``; each must have at
            least one row.
        quantity_func: Callable ``f(q, p)`` returning the quantity to track.
        quantity_name: Human-readable name of the quantity. Accepted for
            callers' convenience; not used in the computation.

    Returns:
        List with one entry per trajectory: the absolute difference between
        the quantity at the last point and at the first point.
    """
    conserved_scores = []
    for traj in trajectories:
        traj = np.asarray(traj)
        # Index the first and last rows explicitly. Unpacking a whole column
        # into (start, end) only works for trajectories of exactly two points
        # and raises ValueError for anything longer.
        q_start, p_start = traj[0, 0], traj[0, 1]
        q_end, p_end = traj[-1, 0], traj[-1, 1]
        quantity_start = quantity_func(q_start, p_start)
        quantity_end = quantity_func(q_end, p_end)
        conserved_scores.append(abs(quantity_end - quantity_start))
    return conserved_scores

# Analyze conservation for different quantities: each list holds, per
# trajectory, the absolute start-to-end change of that quantity.
hamiltonian_scores = analyze_conservation(trajectories_2d, calculate_hamiltonian, "Hamiltonian")
angular_momentum_scores = analyze_conservation(trajectories_2d, calculate_angular_momentum, "Angular Momentum")
energy_scores = analyze_conservation(trajectories_2d, calculate_energy_like_quantity, "Energy-like Quantity")

# Print some statistics
score_summaries = (
    ("Hamiltonian changes - Mean: {:.4f}, Std: {:.4f}", hamiltonian_scores),
    ("Angular Momentum changes - Mean: {:.4f}, Std: {:.4f}", angular_momentum_scores),
    ("Energy-like Quantity changes - Mean: {:.4f}, Std: {:.4f}", energy_scores),
)
for summary_template, change_scores in score_summaries:
    print(summary_template.format(np.mean(change_scores), np.std(change_scores)))

In [None]:
# Visualize conservation of quantities: one histogram per tracked quantity
plt.figure(figsize=(15, 5))

# (subplot position, scores, bar colour, panel title)
conservation_panels = (
    (131, hamiltonian_scores, 'blue', "Conservation of Hamiltonian"),
    (132, angular_momentum_scores, 'green', "Conservation of Angular Momentum"),
    (133, energy_scores, 'red', "Conservation of Energy-like Quantity"),
)
for panel_position, panel_scores, panel_color, panel_title in conservation_panels:
    plt.subplot(panel_position)
    plt.hist(panel_scores, bins=20, color=panel_color, alpha=0.7)
    plt.title(panel_title, fontsize=16)
    # NOTE(review): the scores are absolute start-to-end changes, so the
    # "Standard Error" label may not describe the x-axis - confirm wording.
    plt.xlabel("Standard Error", fontsize=14)
    plt.ylabel("Frequency", fontsize=14)

plt.tight_layout()
plt.savefig('conservation_laws_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Calculate the overall range for x-axis so all three panels share one scale
all_scores = np.concatenate([hamiltonian_scores, angular_momentum_scores, energy_scores])
min_score = all_scores.min()
max_score = all_scores.max()

# Create bins that cover the entire range
bins = np.linspace(min_score, max_score, 21)  # 20 bins

# Compute histograms (counts only; the shared edges are `bins`)
h_hist = np.histogram(hamiltonian_scores, bins=bins)[0]
a_hist = np.histogram(angular_momentum_scores, bins=bins)[0]
e_hist = np.histogram(energy_scores, bins=bins)[0]

# Find the maximum frequency across all histograms; used as the common y-limit
max_freq = max(counts.max() for counts in (h_hist, a_hist, e_hist))

plt.figure(figsize=(15, 5))

# (subplot position, scores, bar colour, panel title)
shared_scale_panels = (
    (131, hamiltonian_scores, 'blue', "Conservation of Hamiltonian"),
    (132, angular_momentum_scores, 'green', "Conservation of Angular Momentum"),
    (133, energy_scores, 'red', "Conservation of Energy-like Quantity"),
)
for panel_position, panel_scores, panel_color, panel_title in shared_scale_panels:
    plt.subplot(panel_position)
    plt.hist(panel_scores, bins=bins, color=panel_color, alpha=0.7)
    plt.title(panel_title, fontsize=16)
    plt.xlabel("Standard Error", fontsize=14)
    plt.ylabel("Frequency", fontsize=14)
    plt.xlim(min_score, max_score)
    plt.ylim(0, max_freq)

plt.tight_layout()
plt.savefig('conservation_laws_analysis_same_scales.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
def calculate_trajectory_entropy(trajectory):
    """Calculate the entropy of a trajectory.

    The values of ``trajectory`` (flattened by ``np.histogram``) are binned
    into a 20-bin density histogram, and that histogram is passed to
    ``entropy``.

    NOTE(review): ``entropy`` is not defined in this cell; presumably it is
    ``scipy.stats.entropy`` imported elsewhere in the notebook - confirm.
    """
    # Discretize the trajectory into bins; the bin edges are not needed.
    bin_densities = np.histogram(trajectory, bins=20, density=True)[0]
    return entropy(bin_densities)

def calculate_free_energy(trajectory, temperature=1.0):
    """Calculate a free energy analog, ``mean(E) - temperature * S``, for a trajectory.

    Args:
        trajectory: 2-D array; each row is one point of the trajectory.
        temperature: Weight of the entropy term (default 1.0).

    Returns:
        Mean per-point energy minus ``temperature`` times the entropy of the
        per-point energies (from ``calculate_trajectory_entropy``).
    """
    # Assume energy is proportional to the squared distance from the origin
    point_energies = (trajectory**2).sum(axis=1)
    energy_entropy = calculate_trajectory_entropy(point_energies)
    mean_energy = np.mean(point_energies)
    return mean_energy - temperature * energy_entropy

# Apply to all trajectories (free energy uses its default temperature)
trajectory_entropies = list(map(calculate_trajectory_entropy, trajectories_2d))
free_energies = list(map(calculate_free_energy, trajectories_2d))

# Analyze the results
print("Mean trajectory entropy:", np.mean(trajectory_entropies))
print("Mean free energy:", np.mean(free_energies))

# Visualize the results: side-by-side histograms of the two measures
plt.figure(figsize=(12, 5))
distribution_panels = (
    (121, trajectory_entropies, "Distribution of Trajectory Entropies", "Entropy"),
    (122, free_energies, "Distribution of Free Energies", "Free Energy"),
)
for panel_position, panel_values, panel_title, panel_xlabel in distribution_panels:
    plt.subplot(panel_position)
    plt.hist(panel_values, bins=20)
    plt.title(panel_title, fontsize=16)
    plt.xlabel(panel_xlabel, fontsize=14)
    plt.ylabel("Frequency", fontsize=14)
plt.tight_layout()
plt.show()

In [None]:
def measure_computation_time(trajectories, num_samples):
    """Measure computation time for different numbers of trajectories.

    ``analyze_trajectory`` is run over the first ``size`` trajectories for
    ``size = 100, 200, ...`` up to, but not including, ``num_samples``.

    Args:
        trajectories: Sliceable sequence of trajectories.
        num_samples: Exclusive upper bound on the sample sizes; values of 100
            or less yield no measurements.

    Returns:
        Tuple ``(sample_sizes, times)``: the ``range`` of sizes measured and
        a list with the elapsed seconds for each size.
    """
    times = []
    sample_sizes = range(100, num_samples, 100)

    for size in sample_sizes:
        # perf_counter is the high-resolution clock intended for timing short
        # intervals; wall-clock time.time() can be coarse and can jump when
        # the system clock is adjusted.
        start_time = time.perf_counter()
        _ = [analyze_trajectory(traj) for traj in trajectories[:size]]
        end_time = time.perf_counter()
        times.append(end_time - start_time)

    return sample_sizes, times

def analyze_trajectory(trajectory):
    """Placeholder for your trajectory analysis function.

    Currently evaluates ``calculate_hamiltonian`` on column 0 (position) and
    column 1 (momentum) of ``trajectory`` and returns the result.
    """
    # Replace this with your actual analysis
    positions = trajectory[:, 0]
    momenta = trajectory[:, 1]
    return calculate_hamiltonian(positions, momenta)

# Measure computation time over growing prefixes of the 2-D trajectories;
# the total number of trajectories is the (exclusive) upper bound on the sizes.
total_trajectories = len(trajectories_2d)
sample_sizes, computation_times = measure_computation_time(trajectories_2d, total_trajectories)


In [None]:
# Plot the results: elapsed time against the number of trajectories analysed
plt.figure(figsize=(10, 6))
plt.plot(sample_sizes, computation_times, 'b-')
plt.grid(True)
plt.title("Computational Complexity", fontsize=16)
plt.xlabel("Number of Trajectories", fontsize=14)
plt.ylabel("Computation Time (seconds)", fontsize=14)
plt.show()

In [None]:
# Estimate complexity
def complexity_function(x, a, b):
    """Power-law model ``a * x**b`` used to fit computation time against input size."""
    power_term = x**b
    return a * power_term

# Fit the power law to the measured timings; only the fitted parameters
# (a, b) are used, the covariance estimate is discarded.
fit_result = curve_fit(complexity_function, sample_sizes, computation_times)
popt, _ = fit_result

# popt[1] is the fitted exponent b.
print(f"Estimated complexity: O(n^{popt[1]:.2f})")

In [None]:
def classify_trajectory(trajectory):
    """Classify a trajectory as valid or invalid based on Hamiltonian conservation.

    Note: this rebinds the name ``classify_trajectory`` that an earlier cell
    defines with the signature ``(action, angle_range, valid)``.

    Args:
        trajectory: 2-D array whose column 0 is position and column 1 is
            momentum.

    Returns:
        True when the absolute difference between the Hamiltonian at the first
        point and at the last point is below 0.5, otherwise False.
    """
    first_point, last_point = trajectory[0], trajectory[-1]
    hamiltonian_at_start = calculate_hamiltonian(first_point[0], first_point[1])
    hamiltonian_at_end = calculate_hamiltonian(last_point[0], last_point[1])
    hamiltonian_change = np.abs(hamiltonian_at_start - hamiltonian_at_end)
    return hamiltonian_change < 0.5  # Threshold for classification

# Split the data
# NOTE(review): train_test_split, confusion_matrix and classification_report
# are not imported in this cell; presumably imported elsewhere - confirm.
X_train, X_test, y_train, y_test = train_test_split(trajectories_2d, df['is_valid'], test_size=0.2, random_state=42)

# Classify test set (the rule-based classifier needs no fitting, so only the
# held-out part of the split is used)
y_pred = [classify_trajectory(traj) for traj in X_test]

# Analyze errors
conf_matrix = confusion_matrix(y_test, y_pred)
class_report = classification_report(y_test, y_pred)

print("Confusion Matrix:")
print(conf_matrix)
print("\nClassification Report:")
print(class_report)

# Analyze misclassified trajectories.
# The mask is built from plain arrays and applied with a comprehension so the
# selection works whether X_test is a Python list (which cannot be indexed
# with a boolean mask) or a NumPy array.
mismatch_mask = np.asarray(y_test) != np.asarray(y_pred)
misclassified = [traj for traj, is_wrong in zip(X_test, mismatch_mask) if is_wrong]
misclassified_labels = y_test[mismatch_mask]

print("\nAnalysis of Misclassified Trajectories:")
for i, (traj, true_label) in enumerate(zip(misclassified, misclassified_labels)):
    hamiltonian_change = np.abs(calculate_hamiltonian(traj[0, 0], traj[0, 1]) -
                                calculate_hamiltonian(traj[-1, 0], traj[-1, 1]))
    print(f"Trajectory {i}:")
    print(f" True label: {'Valid' if true_label else 'Invalid'}")
    print(f" Predicted: {'Valid' if classify_trajectory(traj) else 'Invalid'}")
    print(f" Hamiltonian change: {hamiltonian_change:.4f}")
    print(f" Start point: {traj[0]}")
    print(f" End point: {traj[-1]}")
    print()

# Visualize some misclassified trajectories (up to three; fewer when the
# classifier made fewer mistakes, instead of failing with an IndexError)
plt.figure(figsize=(15, 5))
for i in range(min(3, len(misclassified))):
    plt.subplot(1, 3, i+1)
    plt.plot(misclassified[i][:, 0], misclassified[i][:, 1], 'r-')
    plt.scatter(misclassified[i][0, 0], misclassified[i][0, 1], c='g', label='Start')
    plt.scatter(misclassified[i][-1, 0], misclassified[i][-1, 1], c='b', label='End')
    plt.title(f"Misclassified Trajectory {i+1}", fontsize=16)
    plt.xlabel("PC1", fontsize=14)
    plt.ylabel("PC2", fontsize=14)
    plt.legend()
plt.tight_layout()
plt.show()