WarBot / WarOnline_Chat.py
kertser's picture
Upload 4 files
92b9c00
raw
history blame
14.8 kB
# This is an quote and post library for a specific thread in the WarOnline forum.
import WarClient
import conversationDB
import requests
import re
from bs4 import BeautifulSoup
import urllib.request as urllib
import warnings
import time
import config # Here the constants are stored
warnings.filterwarnings("ignore")
# Start a session to persist the login cookie across requests
session = requests.Session()
def fixString(S):
# This is a helper function to overcome the bugs of tokenizer
S = S.replace(",+", ",")
S = S.replace("!.", "!")
S = S.replace(".?", "?")
S = S.replace(",,", ",")
S = S.replace("?.", "?")
S = S.replace("??", "?")
S = S.replace(" ?", "?")
S = S.replace(" .", ".")
S = S.replace(",!", "!")
S = S.replace(",.", ",")
S = S.replace(".]", ".")
S = S.replace(",\)", ")")
S = S.replace("&", "")
S = S.replace("&", "")
S = S.replace("ен,ицхак", "ен-ицхак")
S = S.replace("СШа", "США")
S = S.replace("(,", "(")
S = S.replace("?.", "?")
S = S.replace("#", "")
S = S.replace("()", "")
S = S.strip(',')
S = S.strip()
return S
def compare_pages(url1, url2):
#Compares 2 pages and returns True if they are the same
return urllib.urlopen(url1).geturl() == urllib.urlopen(url2).geturl()
def remove_non_english_russian_chars(s):
# Regular expression to match all characters that are not in English or Russian
pattern = '[^A-Za-zА-Яа-яЁё(),.!?"\s-]'
# Replace all matched characters with an empty string
return re.sub(pattern, '', s)
def remove_extra_spaces(s):
# Removes extra whitespaces and unwanted characters
s = re.sub(r"\s+", " ", s) # replace all sequences of whitespace with a single space
s = re.sub(r"\s+([.,-])", r"\1", s) # remove spaces before period, dash or comma
return(s)
def getLastPage(thread_url=config.thread_url):
# Returns the number of the last page
print('looking for the last page of the thread')
page = 1 # Starting page
lastPage = False
while not lastPage:
if not compare_pages(thread_url + 'page-' + str(page), thread_url + 'page-' + str(page + 1)):
page += 1
else:
lastPage = True
print('Last page of this thread is '+str(page))
return page
def login(username=config.username, password=config.password, thread_url=config.thread_url):
# Log-In to the forum and redirect to thread
# Retrieve the login page HTML to get the CSRF token
login_page_response = session.get(config.login_url)
soup = BeautifulSoup(login_page_response.text, 'html.parser')
csrf_token = soup.find('input', {'name': '_xfToken'})['value']
# Login to the website
login_data = {
'login': username,
'password': password,
'remember': '1',
'_xfRedirect': thread_url,
'_xfToken': csrf_token
}
response = session.post(config.login_url, data=login_data)
# Check if the login was successful
if 'Invalid login' in response.text:
print('Login failed!')
exit()
else:
print('Login successful')
def post(message="", thread_url=config.thread_url, post_url=config.post_url, quoted_by="",quote_text="",quote_source="",img_url=""):
#Post a message to the forum (with or without the quote
#quote_source is in format 'post-3920992'
quote_source = quote_source.split('-')[-1] # Take the numbers only
if quoted_by:
if img_url: # It is an image
message = f'Примерно вот так: \n[IMG]{img_url}[/IMG]' # Set the image block
message = f'[QUOTE="{quoted_by}, post: {quote_source}"]{quote_text}[/QUOTE]{message}'
# Retrieve the thread page HTML
response = session.get(thread_url)
# Parse the HTML with BeautifulSoup
soup = BeautifulSoup(response.text, 'html.parser')
# Extract the _xfToken value from the hidden form field
xf_token = soup.find('input', {'name': '_xfToken'}).get('value')
# Construct the message data for the POST request
message_data = {
'_xfToken': xf_token,
'message': message,
'attachment_hash': '',
'last_date': '',
'_xfRequestUri': post_url,
'_xfWithData': '1',
'_xfResponseType': 'json'
}
response = session.post(post_url, data=message_data)
# Check if the post was successful
if not response.ok:
print('Post failed!')
exit()
print('Post submitted successfully.')
def getMessages(thread_url=config.thread_url, quotedUser="", startingPage=1):
# Returns all the quotes for #username in the specific multi-page thread url
allquotes =[]
page = startingPage # Counter
lastPage = False
# Initial values for messangerName and the message ID
messengerName = ""
messageID = ""
quotedID = ""
# Patterns to search in the last quote.
namePattern = re.compile('data-lb-caption-desc="(.*?) ·')
messageIDPattern = re.compile('data-lb-id="(.*?)"')
quotedIDPattern = re.compile('data-source="(.*?)"')
quotedNamePattern = re.compile('data-quote="(.*?)"')
while not lastPage:
response = requests.get(thread_url + 'page-' + str(page))
if response.status_code == 200:
# Core of the function
html_content = response.content
# Parse the HTML content using BeautifulSoup
soup = BeautifulSoup(html_content, 'html.parser')
# Find all the message in the thread page
messageData = soup.find_all('div', {'class': 'message-userContent lbContainer js-lbContainer'})
for data in messageData:
try:
# Get the messager username
matchName = namePattern.search(str(data))
if matchName:
messengerName = matchName.group(1)
# Get the quoted ID
matchID = quotedIDPattern.search(str(data))
if matchID:
quotedID = matchID.group(1)
# Get the message ID
matchID = messageIDPattern.search(str(data))
if matchID:
messageID = matchID.group(1)
# Match the QuotedName
matchQuotedName = quotedNamePattern.search(str(data))
if matchQuotedName:
quotedName = matchQuotedName.group(1)
if quotedUser and (quotedUser != quotedName):
continue
# Make sure that the messages have a quote inside
blockquote = data.find('blockquote')
if blockquote:
# Extract the text
text = data.find('div', {'class': 'bbWrapper'})
for bq in text.find_all('blockquote'):
bq.extract()
reply = text.get_text().replace('\n', ' ').strip()
allquotes.append({'reply': reply, 'messengerName': messengerName, 'messageID': messageID, 'quotedID': quotedID})
else: # Looking for a direct message "@WarBot"
text = data.find('div', {'class': 'bbWrapper'})
if text.get_text().startswith('@WarBot'):
reply = text.get_text().replace('@WarBot','').replace('\n', ' ').strip()
allquotes.append({'reply': reply, 'messengerName': messengerName, 'messageID': messageID, 'quotedID': 'post: 0'})
except:
continue # There was no text in this quote, move to the next
#check if that is not a last page
if not compare_pages(thread_url + 'page-' + str(page), thread_url + 'page-' + str(page + 1)):
page += 1
else:
lastPage = True
else:
lastPage = True
return allquotes
def WarOnlineBot():
# Core Engine of the Client
try: # Try logging in
login(username=config.username, password=config.password, thread_url=config.thread_url)
lookUpPages = 5 # How many pages back to look in the thread
startingPage = getLastPage(thread_url=config.thread_url) - lookUpPages
if startingPage < 1:
startingPage = 1 # Starting page cannot be less than 1
# All messages (with quotes) by ALL users:
allMessages = getMessages(thread_url=config.thread_url, quotedUser='', startingPage=startingPage)
# IDs of the quoted messages, replied by the bot:
messages_by_bot_IDs = []
# Initiate the direct messages
direct_messages = []
for msg in allMessages:
# Direct message to the bot
if msg['quotedID'].split(': ')[-1] == '0': #debug
direct_messages.append(msg)
# Set a list of replied messages IDs
if msg['messengerName'] == config.username: #message posted by the WarBot
messages_by_bot_IDs.append(msg['quotedID'].split(': ')[-1])
# remove empty and repeated elements
messages_by_bot_IDs = list(set([elem for elem in messages_by_bot_IDs if elem]))
# All messages (with quotes) sent _FOR_ the Bot:
messagesForBot = getMessages(thread_url=config.thread_url, quotedUser=config.username, startingPage=startingPage)
# Append the direct messages to the messagesForBot:
for msg in direct_messages:
messagesForBot.append(msg)
# IDs of the messages, quoting the bot:
messages_for_bot_IDs = []
for msg in messagesForBot:
# Set a list of posted message IDs
messages_for_bot_IDs.append(msg['messageID'].split('-')[-1])
# remove empty elements
messages_for_bot_IDs = [elem for elem in messages_for_bot_IDs if elem]
# Filter to leave just the unanswered messages IDs:
messages_for_bot_IDs = [ID for ID in messages_for_bot_IDs if ID not in messages_by_bot_IDs]
# Reply the unanswered messages:
for msg in messagesForBot:
if msg['messageID'].split('-')[-1] in messages_for_bot_IDs:
originalQuote = msg['reply']
if originalQuote == "": # Just images, no text
continue
else:
quote = remove_non_english_russian_chars(msg['reply'])
quote = remove_extra_spaces(quote)
message = "" #Initiating the reply message by Bot
previous_dialogue = "" #Initiating the previous dialogue
print('Quote: ', originalQuote)
# Init Connection
db = conversationDB.DataBase()
if msg['quotedID'].split(': ')[-1] != '0': # It is dialogue. Look-up for the previous quotes
# Get the previous dialogue from the database
dbmessages = db.getmessages(msg['messengerName'])
for dbmessage in dbmessages:
previous_dialogue += dbmessage[0]+' '+dbmessage[1]+' '
# Update the string and preprocess it
quote = previous_dialogue + quote
quote = remove_non_english_russian_chars(quote)
quote = remove_extra_spaces(quote)
# Truncate the quote to return only the last MaxWords of words:
quote = " ".join(quote.split()[-config.MaxWords:])
# Fix the quote string, to eliminate errors:
quote = fixString(quote)
FailureCounter = 0 # In case there is a bug in the model
while (not message) and (FailureCounter<3):
message = WarClient.getReply(message=quote)
# Strange error in message if there is '02' in the message text.
if '02' in message:
message = ""
FailureCounter+=1
if FailureCounter == 3:
continue # Skip that answer
# Post-processing fixes:
message = fixString(message)
print('Reply: ', message)
if message.endswith('.png'): # It is an image reply:
# Post an image reply:
login(username=config.username, password=config.password, thread_url=config.thread_url)
time.sleep(1)
post(message="", thread_url=config.thread_url, post_url=config.post_url, quoted_by=msg['messengerName'],
quote_text=originalQuote, quote_source=msg['messageID'],
img_url=message)
# will not be added to the database, if image is a reply
else:
# Add the new conversation pair to the database
db.setmessages(username=msg['messengerName'], message_text=originalQuote, bot_reply=message)
# Clean up the excessive records, leaving only the remaining messages
db.cleanup(username=msg['messengerName'], remaining_messages=config.remaining_messages)
# Delete the duplicate records
db.deleteDuplicates()
login(username=config.username, password=config.password, thread_url=config.thread_url)
time.sleep(1)
post(message=message, thread_url=config.thread_url, post_url=config.post_url, quoted_by=msg['messengerName'], quote_text=originalQuote, quote_source=msg['messageID'])
time.sleep(10) # Standby time for server load release
return 0 # All is good
except:
print('Bad Connection')
return -1 # Error
if __name__ == '__main__':
# Start the scheduler
while True:
print('Starting Session')
result = WarOnlineBot()
# Debug Only:
#imgWord = 'как выглядит'
"""
login(username=config.username, password=config.password, thread_url=config.thread_url)
print("logged in")
post(message="", thread_url=config.thread_url, post_url=config.post_url, quoted_by='Test',
quote_text="posting an image",img_url='https://replicate.delivery/pbxt/knKBiJt8DPZ0B1o25PaLJSZjgv3D5HcwLoBIn0JESbe3nISIA/out-0.png')
"""
if result == 0: # Good result
print('Session finished. Timeout...')
timer = range(60 * config.timeout)
for t in timer:
time.sleep(1)
else:
# Rerunning Session
time.sleep(10) # Standby time for server load release