import numpy, string, functools, itertools, json
from underthesea import pos_tag, ner

# Vietnamese stopwords; multi-word entries are joined with '_' to match tokenized text.
with open('resources/stopwords_small.txt', encoding='utf-8') as f:
    stopwords = set(w.replace(' ', '_') for w in f.read().split('\n') if w)
# Punctuation ignored when building n-grams (ASCII plus common typographic marks).
punct_set = set(string.punctuation) | {'“', '”', '...', '–', '…', '..', '•', '‘', '’'}
# Map underthesea POS tags to the coarse classes used by the synonym dictionary.
map_pos = {'M': 'noun', 'Y': 'noun', 'Nb': 'noun', 'Nc': 'noun', 'Ni': 'noun', 'Np': 'noun', 'N': 'noun', 'X': 'adj',
           'Nu': 'noun', 'Ny': 'noun', 'V': 'verb', 'Vb': 'verb', 'Vy': 'verb', 'A': 'adj', 'Ab': 'adj', 'R': 'adj'}
# Synonym dictionary keyed by coarse POS class ('noun'/'verb'/'adj'), then by word.
with open('resources/synonym.json', encoding='utf-8') as f:
    map_synonym = json.load(f)
# Unigram/bigram frequencies from the corpus, one "term<TAB>count" pair per line.
markov_score = {}
with open('resources/bigram.txt', encoding='utf-8') as f:
    for line in f.read().split('\n')[:-1]:  # drop the trailing empty line
        word, score = line.split('\t')
        markov_score[word] = int(score)
def makovCal(a, b):
    """Smoothed bigram score freq(a_b) / (freq(a) + 5000), used to judge how natural
    the word pair (a, b) is in the corpus."""
    termBigram = a + "_" + b
    freBigram = markov_score.get(termBigram, 1)
    freUnigram = markov_score.get(a, 1)
    # Additive smoothing so rare unigrams do not produce inflated scores.
    if freUnigram < 5:
        freUnigram = 5000
    else:
        freUnigram += 5000
    return float(freBigram) / freUnigram
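# Illustration (hypothetical counts, not values from resources/bigram.txt): if markov_score
# contained {'hôm': 12000, 'hôm_nay': 3000}, then makovCal('hôm', 'nay') = 3000 / (12000 + 5000) ≈ 0.176,
# while an unseen pair such as makovCal('hôm', 'xyz') falls back to 1 / 17000 ≈ 0.00006.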
def generateCombinations(tokens, thresh_hold):
    """For each (word, POS) token, return a list of candidate words: the word itself plus any
    synonyms whose bigram score with a neighbouring word exceeds thresh_hold."""
    combinations = []
    for i in range(0, len(tokens)):
        word = tokens[i][0].lower()
        if word in stopwords:
            combinations.append([word])
            continue
        pos = tokens[i][1]
        if pos in map_pos:
            pos = map_pos[pos]
            if word in map_synonym[pos]:
                synonyms = map_synonym[pos][word]
                possible_synonym = []
                # Neighbouring words used to check that a synonym fits the local context.
                pre_word = 'NONE' if i == 0 else tokens[i - 1][0].lower()
                next_word = 'NONE' if i == len(tokens) - 1 else tokens[i + 1][0].lower()
                for syn in synonyms:
                    if makovCal(pre_word, syn) > thresh_hold or makovCal(syn, next_word) > thresh_hold:
                        possible_synonym.append(syn)
                combinations.append([word] + possible_synonym)
            else:
                combinations.append([word])
        else:
            combinations.append([word])
    return combinations
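# Illustration (actual synonym candidates depend on resources/synonym.json): the return value is
# one candidate list per token, e.g. [['thủ_đô'], ['của'], ['việt_nam', 'vn']], which
# itertools.product later expands into full sentence variants.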
def generateVariants(untokenize_text):
    """Generate surface variants of the question by substituting context-checked synonyms."""
    words = pos_tag(untokenize_text)
    # Join multi-syllable tokens with '_' so each token is a single whitespace-free word.
    for i in range(0, len(words)):
        words[i] = (words[i][0].replace(' ', '_'), words[i][1])
    tokens = words
    combinations = generateCombinations(tokens, 0.001)
    num_variants = functools.reduce(lambda x, y: x * y, [len(c) for c in combinations], 1)
    base_line = 0.001
    # Tighten the synonym threshold until the number of variants is manageable.
    while num_variants > 10000:
        base_line = base_line * 2
        combinations = generateCombinations(tokens, base_line)
        num_variants = functools.reduce(lambda x, y: x * y, [len(c) for c in combinations], 1)
    combinations = list(itertools.product(*combinations))
    return [' '.join(e) for e in combinations]
def generateNgram(paper, ngram=2, deli='_', rmSet=frozenset()):
    """Return the list of n-grams of `paper` (tokens joined by `deli`), skipping any
    n-gram that contains a token from `rmSet`."""
    words = paper.split()
    if len(words) == 1:
        return []
    ngrams = []
    for i in range(0, len(words) - ngram + 1):
        block = words[i:i + ngram]
        if not any(w in rmSet for w in block):
            ngrams.append(deli.join(block))
    return ngrams
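# Example (no external resources needed):
#   generateNgram('hôm nay trời đẹp', 2) -> ['hôm_nay', 'nay_trời', 'trời_đẹp']
#   generateNgram('hôm nay trời đẹp', 2, '_', {'trời'}) -> ['hôm_nay']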
def generatePassages(document, n):
    """Split a document into passages: a whole paragraph when it is short, otherwise sliding
    windows of n sentences (sentences containing '?' are dropped from a window)."""
    passages = []
    paragraphs = document.split('\n\n')
    for para in paragraphs:
        sentences = para.split(' . ')
        if len(sentences) <= n:
            passages.append(' '.join(sentences))
        else:
            for i in range(0, len(sentences) - n + 1):
                passages.append(' '.join([sentences[i + j] for j in range(0, n) if '?' not in sentences[i + j]]))
    return passages
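# Example: with n = 8 and a paragraph of ten sentences 's1 . s2 . ... . s10',
# generatePassages yields the sliding windows 's1 ... s8', 's2 ... s9', 's3 ... s10';
# a short paragraph (n sentences or fewer) is kept as a single passage.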
def passage_score(q_ngrams, passage):
    """Score a passage by weighted n-gram overlap with the question n-grams."""
    try:
        passage = passage.lower()
        p_unigram = set(generateNgram(passage, 1, '_', punct_set | stopwords))
        uni_score = len(p_unigram & q_ngrams['unigram'])
        p_bigram = set(generateNgram(passage, 2, '_', punct_set | stopwords))
        p_trigram = set(generateNgram(passage, 3, '_', punct_set | stopwords))
        p_fourgram = set(generateNgram(passage, 4, '_', punct_set))
        bi_score = len(p_bigram & q_ngrams['bigram'])
        tri_score = len(p_trigram & q_ngrams['trigram'])
        four_score = len(p_fourgram & q_ngrams['fourgram'])
        emd_sim = 0  # placeholder for an embedding-similarity term, currently unused
        return uni_score + bi_score * 2 + tri_score * 3 + four_score * 4 + emd_sim * 3
    except Exception:
        return 0

def passage_score_wrap(args):
    # Unpack a (q_ngrams, passage) tuple so the scorer can be mapped over a list of arguments.
    return passage_score(args[0], args[1])
def keyword_extraction(question):
    """Extract non-stopword keywords from the question as single-syllable lowercase terms."""
    keywords = []
    question = question.replace('_', ' ')
    words = pos_tag(question)
    for i in range(0, len(words)):
        words[i] = (words[i][0].replace(' ', '_'), words[i][1])
    for token in words:
        word = token[0]
        if word not in stopwords:
            keywords += word.lower().split('_')
    keywords = list(set(keywords))
    # Wrap each keyword in a list to match the format expected by isRelevant.
    return [[w] for w in keywords]
def isRelevant(text, keywords):
    """True if the text contains at least one non-stopword keyword."""
    text = text.lower().replace('_', ' ')
    words = list(set([_ for word in keywords for _ in word]))
    for word in words:
        if word in text and word not in stopwords:
            return True
    return False
def removeDuplicate(documents):
    """Drop documents whose unigram set overlaps an already-kept document by 80% or more."""
    mapUnigram = {}
    for doc in documents:
        mapUnigram[doc] = generateNgram(doc.lower(), 1, '_', punct_set | stopwords)
    uniqueDocs = []
    for i in range(0, len(documents)):
        check = True
        for j in range(0, len(uniqueDocs)):
            check_doc = mapUnigram[documents[i]]
            exists_doc = mapUnigram[uniqueDocs[j]]
            overlap_score = len(set(check_doc) & set(exists_doc))
            if overlap_score >= 0.8 * len(set(check_doc)) or overlap_score >= 0.8 * len(set(exists_doc)):
                check = False
                break
        if check:
            uniqueDocs.append(documents[i])
    return uniqueDocs
def rel_ranking(question, documents):
    """Return passages from `documents` ranked by n-gram overlap with the question and its variants."""
    q_variants = generateVariants(question)
    q_keywords = keyword_extraction(question)
    # Question n-grams: unigrams from the question itself, higher-order n-grams from all variants.
    q_ngrams = {'unigram': set(generateNgram(question.lower(), 1, '_', punct_set | stopwords)),
                'bigram': set([]), 'trigram': set([]), 'fourgram': set([])}
    for q in q_variants:
        q = q.lower()
        q_ngrams['bigram'] = q_ngrams['bigram'] | set(generateNgram(q, 2, '_', punct_set | stopwords))
        q_ngrams['trigram'] = q_ngrams['trigram'] | set(generateNgram(q, 3, '_', punct_set | stopwords))
        q_ngrams['fourgram'] = q_ngrams['fourgram'] | set(generateNgram(q, 4, '_', punct_set))
    # Keep only documents and passages that contain at least one question keyword.
    documents = [d for d in documents if isRelevant(d, q_keywords)]
    passages = [generatePassages(d, 8) for d in documents]
    passages = [j for i in passages for j in i]
    passages = [' '.join([_.strip() for _ in p.split()]) for p in passages]
    passages = list(set(passages))
    passages = [p for p in passages if isRelevant(p, q_keywords)]
    # Score every passage and sort in descending order of score.
    p_scores = [passage_score_wrap((q_ngrams, p)) for p in passages]
    p_res = numpy.argsort([-s for s in p_scores])
    relevantDocs = [passages[idx] for idx in p_res]
    return removeDuplicate(relevantDocs)
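# Minimal usage sketch. The question and documents below are made-up examples; running this
# requires the resources/ files loaded above and the underthesea models to be installed.
if __name__ == '__main__':
    question = 'Hà Nội là thủ đô của nước nào ?'
    documents = [
        'Hà Nội là thủ đô của Việt Nam . Thành phố nằm bên sông Hồng .',
        'Bóng đá là môn thể thao phổ biến .',
    ]
    for passage in rel_ranking(question, documents):
        print(passage)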