import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import nltk
from nltk.tokenize import word_tokenize
from collections import Counter

# Download the tokenizer models required by word_tokenize
nltk.download('punkt')


# Read the corpus: one sentence per line
def load_text_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = file.readlines()
    return [line.strip() for line in data]


file_path = 'data.txt'
sentences = load_text_data(file_path)


# Lowercase and split a sentence into word tokens
def tokenize(text):
    return word_tokenize(text.lower())


# Map every word in the corpus to an integer index, plus <unk> and <pad> tokens
def build_vocab(sentences):
    tokens = [token for sentence in sentences for token in tokenize(sentence)]
    vocab = {word: i for i, (word, _) in enumerate(Counter(tokens).items())}
    vocab['<unk>'] = len(vocab)
    vocab['<pad>'] = len(vocab)
    return vocab


vocab = build_vocab(sentences)
vocab_size = len(vocab)
print(f"Vocabulary size: {vocab_size}")


class TextDataset(Dataset):
    """Builds (prefix, next word) training pairs from each sentence."""

    def __init__(self, sentences, vocab):
        self.data = []
        self.vocab = vocab
        for sentence in sentences:
            tokens = tokenize(sentence)
            indices = [vocab.get(token, vocab['<unk>']) for token in tokens]
            # Every prefix of the sentence predicts the token that follows it
            for i in range(len(indices) - 1):
                self.data.append((indices[:i+1], indices[i+1]))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        inputs, target = self.data[idx]
        return torch.tensor(inputs, dtype=torch.long), torch.tensor(target, dtype=torch.long)


dataset = TextDataset(sentences, vocab)
# pad_sequence defaults to padding with 0, which is a real word in this vocabulary,
# so pad explicitly with the <pad> index
dataloader = DataLoader(dataset, batch_size=2, shuffle=True,
                        collate_fn=lambda batch: (pad_sequence([item[0] for item in batch], batch_first=True,
                                                               padding_value=vocab['<pad>']),
                                                  torch.stack([item[1] for item in batch])))
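
# Optional sanity check (not in the original script): pull one batch and confirm
# the padded shapes -- inputs is (batch_size, max_prefix_len), targets is (batch_size,)
sample_inputs, sample_targets = next(iter(dataloader))
print(f"Sample batch shapes: {sample_inputs.shape}, {sample_targets.shape}")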


class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        x = self.embedding(x)
        lstm_out, _ = self.lstm(x)
        # Score the next word from the hidden state at the final timestep.
        # Note: in a padded batch this timestep can be a <pad> position.
        out = self.fc(lstm_out[:, -1, :])
        return out
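

# Optional variant (an assumption, not part of the original script): because shorter
# sequences in a batch are padded at the end, lstm_out[:, -1, :] above can fall on a
# <pad> step. Packing the sequences lets the LSTM use each sequence's true last token;
# it requires the collate function to also return the unpadded lengths.
from torch.nn.utils.rnn import pack_padded_sequence

class PackedLSTMModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, pad_idx):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=pad_idx)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, lengths):
        embedded = self.embedding(x)
        packed = pack_padded_sequence(embedded, lengths, batch_first=True,
                                      enforce_sorted=False)
        _, (hidden, _) = self.lstm(packed)
        # hidden[-1] holds the state at each sequence's last real token
        return self.fc(hidden[-1])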


embed_size = 64
hidden_size = 256
model = LSTMModel(vocab_size, embed_size, hidden_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)


num_epochs = 20
model.train()
for epoch in range(num_epochs):
    for inputs, targets in dataloader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
    # Report the loss of the last batch in each epoch
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
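
# Optionally persist the trained weights; the file name here is only an example
torch.save(model.state_dict(), 'lstm_next_word.pt')
# model.load_state_dict(torch.load('lstm_next_word.pt'))  # to reload later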


def predict_next_word(model, tokenizer, vocab, text):
    model.eval()
    tokens = tokenizer(text)
    indices = [vocab.get(token, vocab['<unk>']) for token in tokens]
    inputs = torch.tensor(indices, dtype=torch.long).unsqueeze(0)
    with torch.no_grad():
        outputs = model(inputs)
    predicted_index = torch.argmax(outputs, dim=1).item()
    # Reverse lookup: find the word whose index matches the prediction
    predicted_word = [word for word, index in vocab.items() if index == predicted_index][0]
    return predicted_word
text = "Quantum mechanics is" |
|
next_word = predict_next_word(model, tokenize, vocab, text) |
|
print(f"Predicted next word: {next_word}") |
|
|
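

# Optional extension (an assumption, not in the original script): greedily generate a
# short continuation by feeding each predicted word back into the model.
# generate_text and num_words are hypothetical names introduced here for illustration.
def generate_text(model, vocab, text, num_words=5):
    model.eval()
    index_to_word = {index: word for word, index in vocab.items()}
    tokens = tokenize(text)
    indices = [vocab.get(token, vocab['<unk>']) for token in tokens]
    for _ in range(num_words):
        inputs = torch.tensor(indices, dtype=torch.long).unsqueeze(0)
        with torch.no_grad():
            outputs = model(inputs)
        next_index = torch.argmax(outputs, dim=1).item()
        indices.append(next_index)
        tokens.append(index_to_word[next_index])
    return ' '.join(tokens)


print(f"Generated continuation: {generate_text(model, vocab, text)}")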