|
|
|
|
|
import WarClient |
|
import conversationDB |
|
import requests |
|
import re |
|
from bs4 import BeautifulSoup |
|
import urllib.request as urllib |
|
import warnings |
|
import time |
|
import config |
|
|
|
warnings.filterwarnings("ignore") |
|
|
|
|
|
session = requests.Session() |
|
|
|
def fixString(S): |
|
|
|
S = S.replace(",+", ",") |
|
S = S.replace("!.", "!") |
|
S = S.replace(".?", "?") |
|
S = S.replace(",,", ",") |
|
S = S.replace("?.", "?") |
|
S = S.replace("??", "?") |
|
S = S.replace(" ?", "?") |
|
S = S.replace(" .", ".") |
|
S = S.replace(",!", "!") |
|
S = S.replace(",.", ",") |
|
S = S.replace(".]", ".") |
|
S = S.replace(",\)", ")") |
|
S = S.replace("&", "") |
|
S = S.replace("&", "") |
|
S = S.replace("ен,ицхак", "ен-ицхак") |
|
S = S.replace("СШа", "США") |
|
S = S.replace("(,", "(") |
|
S = S.replace("?.", "?") |
|
S = S.replace("#", "") |
|
S = S.replace("()", "") |
|
S = S.strip(',') |
|
S = S.strip() |
|
return S |
|
|
|
def compare_pages(url1, url2): |
|
|
|
return urllib.urlopen(url1).geturl() == urllib.urlopen(url2).geturl() |
|
|
|
def remove_non_english_russian_chars(s): |
|
|
|
pattern = '[^A-Za-zА-Яа-яЁё(),.!?"\s-]' |
|
|
|
return re.sub(pattern, '', s) |
|
|
|
def remove_extra_spaces(s): |
|
|
|
s = re.sub(r"\s+", " ", s) |
|
s = re.sub(r"\s+([.,-])", r"\1", s) |
|
return(s) |
|
|
|
def getLastPage(thread_url=config.thread_url): |
|
|
|
print('looking for the last page of the thread') |
|
page = 1 |
|
lastPage = False |
|
|
|
while not lastPage: |
|
if not compare_pages(thread_url + 'page-' + str(page), thread_url + 'page-' + str(page + 1)): |
|
page += 1 |
|
else: |
|
lastPage = True |
|
print('Last page of this thread is '+str(page)) |
|
return page |
|
|
|
def login(username=config.username, password=config.password, thread_url=config.thread_url): |
|
|
|
|
|
|
|
login_page_response = session.get(config.login_url) |
|
soup = BeautifulSoup(login_page_response.text, 'html.parser') |
|
csrf_token = soup.find('input', {'name': '_xfToken'})['value'] |
|
|
|
|
|
login_data = { |
|
'login': username, |
|
'password': password, |
|
'remember': '1', |
|
'_xfRedirect': thread_url, |
|
'_xfToken': csrf_token |
|
} |
|
response = session.post(config.login_url, data=login_data) |
|
|
|
|
|
if 'Invalid login' in response.text: |
|
print('Login failed!') |
|
exit() |
|
else: |
|
print('Login successful') |
|
|
|
def post(message="", thread_url=config.thread_url, post_url=config.post_url, quoted_by="",quote_text="",quote_source="",img_url=""): |
|
|
|
|
|
quote_source = quote_source.split('-')[-1] |
|
|
|
if quoted_by: |
|
if img_url: |
|
message = f'Примерно вот так: \n[IMG]{img_url}[/IMG]' |
|
message = f'[QUOTE="{quoted_by}, post: {quote_source}"]{quote_text}[/QUOTE]{message}' |
|
|
|
|
|
response = session.get(thread_url) |
|
|
|
|
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
|
|
|
xf_token = soup.find('input', {'name': '_xfToken'}).get('value') |
|
|
|
|
|
message_data = { |
|
'_xfToken': xf_token, |
|
'message': message, |
|
'attachment_hash': '', |
|
'last_date': '', |
|
'_xfRequestUri': post_url, |
|
'_xfWithData': '1', |
|
'_xfResponseType': 'json' |
|
} |
|
|
|
response = session.post(post_url, data=message_data) |
|
|
|
|
|
if not response.ok: |
|
print('Post failed!') |
|
exit() |
|
|
|
print('Post submitted successfully.') |
|
|
|
def getMessages(thread_url=config.thread_url, quotedUser="", startingPage=1): |
|
|
|
allquotes =[] |
|
|
|
page = startingPage |
|
lastPage = False |
|
|
|
|
|
messengerName = "" |
|
messageID = "" |
|
quotedID = "" |
|
|
|
|
|
namePattern = re.compile('data-lb-caption-desc="(.*?) ·') |
|
messageIDPattern = re.compile('data-lb-id="(.*?)"') |
|
quotedIDPattern = re.compile('data-source="(.*?)"') |
|
quotedNamePattern = re.compile('data-quote="(.*?)"') |
|
|
|
while not lastPage: |
|
response = requests.get(thread_url + 'page-' + str(page)) |
|
if response.status_code == 200: |
|
|
|
|
|
html_content = response.content |
|
|
|
|
|
soup = BeautifulSoup(html_content, 'html.parser') |
|
|
|
|
|
messageData = soup.find_all('div', {'class': 'message-userContent lbContainer js-lbContainer'}) |
|
|
|
for data in messageData: |
|
try: |
|
|
|
matchName = namePattern.search(str(data)) |
|
if matchName: |
|
messengerName = matchName.group(1) |
|
|
|
|
|
matchID = quotedIDPattern.search(str(data)) |
|
if matchID: |
|
quotedID = matchID.group(1) |
|
|
|
|
|
matchID = messageIDPattern.search(str(data)) |
|
if matchID: |
|
messageID = matchID.group(1) |
|
|
|
|
|
matchQuotedName = quotedNamePattern.search(str(data)) |
|
if matchQuotedName: |
|
quotedName = matchQuotedName.group(1) |
|
if quotedUser and (quotedUser != quotedName): |
|
continue |
|
|
|
|
|
blockquote = data.find('blockquote') |
|
if blockquote: |
|
|
|
text = data.find('div', {'class': 'bbWrapper'}) |
|
|
|
for bq in text.find_all('blockquote'): |
|
bq.extract() |
|
reply = text.get_text().replace('\n', ' ').strip() |
|
|
|
allquotes.append({'reply': reply, 'messengerName': messengerName, 'messageID': messageID, 'quotedID': quotedID}) |
|
|
|
else: |
|
text = data.find('div', {'class': 'bbWrapper'}) |
|
if text.get_text().startswith('@WarBot'): |
|
reply = text.get_text().replace('@WarBot','').replace('\n', ' ').strip() |
|
allquotes.append({'reply': reply, 'messengerName': messengerName, 'messageID': messageID, 'quotedID': 'post: 0'}) |
|
|
|
except: |
|
continue |
|
|
|
|
|
if not compare_pages(thread_url + 'page-' + str(page), thread_url + 'page-' + str(page + 1)): |
|
page += 1 |
|
else: |
|
lastPage = True |
|
else: |
|
lastPage = True |
|
|
|
return allquotes |
|
|
|
def WarOnlineBot(): |
|
|
|
|
|
try: |
|
login(username=config.username, password=config.password, thread_url=config.thread_url) |
|
|
|
lookUpPages = 5 |
|
startingPage = getLastPage(thread_url=config.thread_url) - lookUpPages |
|
if startingPage < 1: |
|
startingPage = 1 |
|
|
|
|
|
allMessages = getMessages(thread_url=config.thread_url, quotedUser='', startingPage=startingPage) |
|
|
|
|
|
messages_by_bot_IDs = [] |
|
|
|
|
|
direct_messages = [] |
|
|
|
for msg in allMessages: |
|
|
|
if msg['quotedID'].split(': ')[-1] == '0': |
|
direct_messages.append(msg) |
|
|
|
if msg['messengerName'] == config.username: |
|
messages_by_bot_IDs.append(msg['quotedID'].split(': ')[-1]) |
|
|
|
messages_by_bot_IDs = list(set([elem for elem in messages_by_bot_IDs if elem])) |
|
|
|
|
|
messagesForBot = getMessages(thread_url=config.thread_url, quotedUser=config.username, startingPage=startingPage) |
|
|
|
|
|
for msg in direct_messages: |
|
messagesForBot.append(msg) |
|
|
|
|
|
messages_for_bot_IDs = [] |
|
|
|
for msg in messagesForBot: |
|
|
|
messages_for_bot_IDs.append(msg['messageID'].split('-')[-1]) |
|
|
|
messages_for_bot_IDs = [elem for elem in messages_for_bot_IDs if elem] |
|
|
|
|
|
messages_for_bot_IDs = [ID for ID in messages_for_bot_IDs if ID not in messages_by_bot_IDs] |
|
|
|
|
|
|
|
for msg in messagesForBot: |
|
if msg['messageID'].split('-')[-1] in messages_for_bot_IDs: |
|
|
|
originalQuote = msg['reply'] |
|
if originalQuote == "": |
|
continue |
|
else: |
|
quote = remove_non_english_russian_chars(msg['reply']) |
|
quote = remove_extra_spaces(quote) |
|
|
|
message = "" |
|
previous_dialogue = "" |
|
|
|
print('Quote: ', originalQuote) |
|
|
|
|
|
db = conversationDB.DataBase() |
|
|
|
if msg['quotedID'].split(': ')[-1] != '0': |
|
|
|
|
|
dbmessages = db.getmessages(msg['messengerName']) |
|
for dbmessage in dbmessages: |
|
previous_dialogue += dbmessage[0]+' '+dbmessage[1]+' ' |
|
|
|
quote = previous_dialogue + quote |
|
quote = remove_non_english_russian_chars(quote) |
|
quote = remove_extra_spaces(quote) |
|
|
|
quote = " ".join(quote.split()[-config.MaxWords:]) |
|
|
|
|
|
quote = fixString(quote) |
|
|
|
FailureCounter = 0 |
|
while (not message) and (FailureCounter<3): |
|
message = WarClient.getReply(message=quote) |
|
|
|
if '02' in message: |
|
message = "" |
|
FailureCounter+=1 |
|
|
|
if FailureCounter == 3: |
|
continue |
|
|
|
|
|
message = fixString(message) |
|
print('Reply: ', message) |
|
|
|
if message.endswith('.png'): |
|
|
|
login(username=config.username, password=config.password, thread_url=config.thread_url) |
|
time.sleep(1) |
|
post(message="", thread_url=config.thread_url, post_url=config.post_url, quoted_by=msg['messengerName'], |
|
quote_text=originalQuote, quote_source=msg['messageID'], |
|
img_url=message) |
|
|
|
|
|
else: |
|
|
|
|
|
db.setmessages(username=msg['messengerName'], message_text=originalQuote, bot_reply=message) |
|
|
|
db.cleanup(username=msg['messengerName'], remaining_messages=config.remaining_messages) |
|
|
|
db.deleteDuplicates() |
|
|
|
login(username=config.username, password=config.password, thread_url=config.thread_url) |
|
time.sleep(1) |
|
post(message=message, thread_url=config.thread_url, post_url=config.post_url, quoted_by=msg['messengerName'], quote_text=originalQuote, quote_source=msg['messageID']) |
|
|
|
time.sleep(10) |
|
return 0 |
|
except: |
|
print('Bad Connection') |
|
return -1 |
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
|
|
while True: |
|
print('Starting Session') |
|
result = WarOnlineBot() |
|
|
|
|
|
|
|
""" |
|
login(username=config.username, password=config.password, thread_url=config.thread_url) |
|
print("logged in") |
|
post(message="", thread_url=config.thread_url, post_url=config.post_url, quoted_by='Test', |
|
quote_text="posting an image",img_url='https://replicate.delivery/pbxt/knKBiJt8DPZ0B1o25PaLJSZjgv3D5HcwLoBIn0JESbe3nISIA/out-0.png') |
|
""" |
|
if result == 0: |
|
print('Session finished. Timeout...') |
|
|
|
timer = range(60 * config.timeout) |
|
for t in timer: |
|
time.sleep(1) |
|
else: |
|
|
|
time.sleep(10) |
|
|