"""Reddit sentiment analysis.

Searches Reddit for a query via PRAW, scores every comment on the matching
submissions with a RoBERTa model fine-tuned on IMDb reviews, renders a word
cloud of the comment text, and caches the results to disk with pickle.
"""

import os
import pickle
import re

import nltk
import praw
import spacy
import torch
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer
from transformers import RobertaForSequenceClassification, RobertaTokenizer
from wordcloud import WordCloud


def save_data(data, filename):
    """Pickle `data` to `filename`, creating parent directories as needed."""
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with open(filename, 'wb') as file:
        pickle.dump(data, file)


def load_data(filename):
    """Unpickle and return the contents of `filename`, or None if it is missing."""
    if os.path.exists(filename):
        with open(filename, 'rb') as file:
            return pickle.load(file)
    return None


REDDIT_CLIENT_ID = os.environ['client_id']
REDDIT_CLIENT_SECRET = os.environ['secret_key']
REDDIT_USERNAME = os.environ['username']

reddit = praw.Reddit(
    client_id=REDDIT_CLIENT_ID,
    client_secret=REDDIT_CLIENT_SECRET,
    user_agent=f"script:sentiment-analysis:v0.0.1 (by {REDDIT_USERNAME})",
)

stemmer = PorterStemmer()
nlp = spacy.load("en_core_web_sm")
nltk.download('punkt')
nltk.download('stopwords')


# add_prefix_space=True lets the RoBERTa tokenizer handle the pre-tokenized
# input produced by tokenize() below.
tokenizer = RobertaTokenizer.from_pretrained(
    'aychang/roberta-base-imdb', add_prefix_space=True)
# The checkpoint already ships a fine-tuned two-label classification head,
# so it is used as-is rather than being overwritten with a freshly
# initialised linear layer.
model = RobertaForSequenceClassification.from_pretrained(
    'aychang/roberta-base-imdb', num_labels=2)
model.eval()


def get_sentiment(query):
    """Return (positive %, negative %, word-cloud path) for `query`.

    Results are cached on disk, so repeated queries skip the Reddit and
    model calls. Returns a plain error-message string when the search
    finds nothing.
    """
    filename = f"data/sentiment_analysis/{query}_results.pkl"
    saved_data = load_data(filename)

    if saved_data is not None:
        positive, negative, wordcloud = saved_data
        return positive, negative, wordcloud

    results = get_reddit_results(query)
    if not results:
        return "No results found for query"

    positive, negative, wordcloud = analyze_comments(results, query=query)
    save_data((positive, negative, wordcloud), filename)
    return positive, negative, wordcloud


def get_reddit_results(query):
    """Search the target subreddits for `query` and return matching submissions."""
    try:
        sub = reddit.subreddit('noveltranslations+progressionfantasy')
        results_list = list(sub.search(query, limit=1))
        if not results_list:
            print("No results found for query.")
        return results_list
    except Exception as e:
        print(f"Error occurred: {e}")
        return []


def transform_text(text):
    """Lowercase `text`, strip URLs and stopwords, and stem each word.

    Example: "Check https://example.com GREAT chapter!!" -> "check great chapter".
    """
    # Drop anything that looks like a URL before tokenizing.
    text = re.sub(r'https?://\S+', '', text)

    text = text.lower()
    tokens = nltk.word_tokenize(text)

    # isalnum() also drops pure-punctuation tokens, so no separate
    # punctuation filter is needed.
    tokens = [t for t in tokens if t.isalnum()]

    stopwords_set = set(stopwords.words('english'))
    tokens = [t for t in tokens if t not in stopwords_set]

    tokens = [stemmer.stem(t) for t in tokens]
    return ' '.join(tokens)


def tokenize(text):
    """Split `text` into word tokens with spaCy."""
    doc = nlp(text)
    return [token.text for token in doc]


def analyze_comments(results, query):
    """Score every comment under the submissions in `results`.

    Returns the rounded average positive and negative percentages and the
    path of the generated word-cloud image (None if there were no comments).
    """
    total_positive = 0
    total_negative = 0
    total_comments = 0
    comments_for_cloud = []

    for submission in results:
        submission.comments.replace_more(limit=None)
        for comment in submission.comments.list():
            comment_body = comment.body
            text = transform_text(comment_body)
            comments_for_cloud.append(comment_body)

            if not text:
                continue

            # Feed the whole comment to the model as one pre-tokenized
            # sequence rather than as a batch of single-word inputs.
            tokens = tokenize(text)
            tokenized_input = tokenizer(
                tokens, is_split_into_words=True, return_tensors='pt',
                truncation=True, padding=True)

            with torch.no_grad():
                outputs = model(**tokenized_input)

            # One sequence in the batch; index 0 is negative and index 1 is
            # positive, matching this checkpoint's label order.
            probabilities = torch.softmax(outputs.logits, dim=-1)[0]
            total_positive += probabilities[1].item() * 100
            total_negative += probabilities[0].item() * 100
            total_comments += 1

    if total_comments == 0:
        return 0, 0, None

    avg_positive = total_positive / total_comments
    avg_negative = total_negative / total_comments

    all_comments_string = ' '.join(comments_for_cloud)
    wordcloud = WordCloud(width=400, height=400,
                          background_color='white',
                          max_words=30,
                          stopwords=set(stopwords.words('english')),
                          min_font_size=10).generate(all_comments_string)

    image_path = f'static/images/wordcloud/{query}_cloud.png'
    os.makedirs(os.path.dirname(image_path), exist_ok=True)
    wordcloud.to_file(image_path)

    print(f'positive:{avg_positive}')
    return round(avg_positive), round(avg_negative), image_path
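

# A minimal usage sketch, assuming the client_id/secret_key/username
# environment variables are set and the working directory is writable.
# "cradle" is a placeholder query used for illustration, not a value
# taken from the script itself.
if __name__ == "__main__":
    result = get_sentiment("cradle")
    if isinstance(result, str):
        # get_sentiment returns a plain error message when the search
        # comes back empty.
        print(result)
    else:
        positive, negative, cloud_path = result
        print(f"{positive}% positive, {negative}% negative "
              f"(word cloud: {cloud_path})")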