# Question_Answering/relevance_ranking.py
import functools
import itertools
import json
import string

import numpy
from underthesea import pos_tag

with open('resources/stopwords_small.txt', encoding='utf-8') as f:
    stopwords = set(w.replace(' ', '_') for w in f.read().split('\n'))
punct_set = set(string.punctuation) | {'“', '”', '...', '–', '…', '..', '•'}
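# Collapse underthesea's fine-grained POS tags into the coarse classes
# ('noun', 'verb', 'adj') used as top-level keys in synonym.json.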
map_pos = {'M': 'noun', 'Y': 'noun', 'Nb': 'noun', 'Nc': 'noun', 'Ni': 'noun',
           'Np': 'noun', 'N': 'noun', 'Nu': 'noun', 'Ny': 'noun',
           'V': 'verb', 'Vb': 'verb', 'Vy': 'verb',
           'A': 'adj', 'Ab': 'adj', 'X': 'adj', 'R': 'adj'}
with open('resources/synonym.json', encoding='utf-8') as f:
    map_synonym = json.load(f)
# Unigram and bigram frequency counts collected from a corpus; bigram keys are
# the two tokens joined by '_'.
markov_score = {}
with open('resources/bigram.txt', encoding='utf-8') as f:
    for line in f:
        line = line.rstrip('\n')
        if not line:
            continue
        word, score = line.split('\t')
        markov_score[word] = int(score)
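# makovCal estimates P(b | a) as freq(a_b) / freq(a), with add-constant
# smoothing of the unigram count so rare or unseen words do not inflate the ratio.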
def makovCal(a, b):
    termBigram = a + '_' + b
    freBigram = markov_score.get(termBigram, 1)
    freUnigram = markov_score.get(a, 1)
    if freUnigram < 5:
        freUnigram = 5000
    else:
        freUnigram += 5000
    return float(freBigram) / freUnigram
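# For every token, build the list of acceptable surface forms: the token itself
# plus any synonym whose bigram score with a neighbouring word clears the
# threshold. Stopwords and tokens with unmapped POS tags are never substituted.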
def generateCombinations(tokens, threshold):
    combinations = []
    for i in range(len(tokens)):
        word = tokens[i][0].lower()
        if word in stopwords:
            combinations.append([word])
            continue
        pos = tokens[i][1]
        if pos not in map_pos:
            combinations.append([word])
            continue
        pos = map_pos[pos]
        if word not in map_synonym[pos]:
            combinations.append([word])
            continue
        # The neighbouring words depend only on i, so compute them once,
        # not once per candidate synonym.
        pre_word = tokens[i - 1][0].lower() if i > 0 else 'NONE'
        next_word = tokens[i + 1][0].lower() if i < len(tokens) - 1 else 'NONE'
        possible_synonym = []
        for syn in map_synonym[pos][word]:
            if makovCal(pre_word, syn) > threshold or makovCal(syn, next_word) > threshold:
                possible_synonym.append(syn)
        combinations.append([word] + possible_synonym)
    return combinations
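# Expand a question into paraphrased variants by taking the cartesian product
# of the per-token alternatives. The threshold is doubled until the number of
# variants drops to a tractable size (<= 10000).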
def generateVariants(untokenize_text):
    tokens = [(w.replace(' ', '_'), p) for w, p in pos_tag(untokenize_text)]
    base_line = 0.001
    combinations = generateCombinations(tokens, base_line)
    num_variants = functools.reduce(lambda x, y: x * y, [len(c) for c in combinations], 1)
    while num_variants > 10000:
        base_line *= 2
        combinations = generateCombinations(tokens, base_line)
        num_variants = functools.reduce(lambda x, y: x * y, [len(c) for c in combinations], 1)
    return [' '.join(e) for e in itertools.product(*combinations)]
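# Build n-grams joined by `deli`, skipping any window that touches a token in
# `rmSet` (punctuation and/or stopwords).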
def generateNgram(paper, ngram=2, deli='_', rmSet=frozenset()):
    words = paper.split()
    # Return [] (not '') so callers can always wrap the result in set().
    if len(words) == 1:
        return []
    ngrams = []
    for i in range(len(words) - ngram + 1):
        block = words[i:i + ngram]
        if not any(w in rmSet for w in block):
            ngrams.append(deli.join(block))
    return ngrams
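# Slide a window of n sentences over each paragraph to form candidate passages;
# paragraphs of at most n sentences are kept whole. Question sentences
# (containing '?') are dropped from the windows.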
def generatePassages(document, n):
    passages = []
    for para in document.split('\n\n'):
        sentences = para.split(' . ')
        if len(sentences) <= n:
            passages.append(' '.join(sentences))
        else:
            for i in range(len(sentences) - n + 1):
                passages.append(' '.join(sentences[i + j] for j in range(n) if '?' not in sentences[i + j]))
    return passages
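# Score a passage by the weighted overlap between its n-grams and the
# question's n-grams; longer matches count more. emd_sim is a placeholder for
# an embedding-based similarity term that is currently disabled.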
def passage_score(q_ngrams, passage):
    try:
        passage = passage.lower()
        p_unigram = set(generateNgram(passage, 1, '_', punct_set | stopwords))
        p_bigram = set(generateNgram(passage, 2, '_', punct_set | stopwords))
        p_trigram = set(generateNgram(passage, 3, '_', punct_set | stopwords))
        p_fourgram = set(generateNgram(passage, 4, '_', punct_set))
        uni_score = len(p_unigram & q_ngrams['unigram'])
        bi_score = len(p_bigram & q_ngrams['bigram'])
        tri_score = len(p_trigram & q_ngrams['trigram'])
        four_score = len(p_fourgram & q_ngrams['fourgram'])
        emd_sim = 0  # embedding similarity, disabled
        return uni_score + bi_score * 2 + tri_score * 3 + four_score * 4 + emd_sim * 3
    except Exception:
        return 0
def passage_score_wrap(args):
    # Tuple-unpacking wrapper (handy for map-style callers).
    return passage_score(args[0], args[1])
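# Pull the non-stopword content words out of the question; these act as a
# cheap relevance filter before the more expensive n-gram scoring.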
def keyword_extraction(question):
    question = question.replace('_', ' ')
    keywords = []
    for word, pos in pos_tag(question):
        word = word.replace(' ', '_')
        if word not in stopwords:
            keywords += word.lower().split('_')
    # One singleton list per unique keyword, the shape isRelevant expects.
    return [[w] for w in set(keywords)]
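# A document or passage counts as relevant if it contains at least one
# non-stopword keyword; note this is substring containment, not token matching.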
def isRelevant(text, keywords):
    text = text.lower().replace('_', ' ')
    words = set(w for group in keywords for w in group)
    for word in words:
        if word in text and word not in stopwords:
            return True
    return False
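# Drop passages whose unigram set overlaps an already-kept passage by 80% or
# more in either direction.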
def removeDuplicate(documents):
    mapUnigram = {}
    for doc in documents:
        mapUnigram[doc] = set(generateNgram(doc.lower(), 1, '_', punct_set | stopwords))
    uniqueDocs = []
    for doc in documents:
        check_doc = mapUnigram[doc]
        duplicate = False
        for kept in uniqueDocs:
            exists_doc = mapUnigram[kept]
            overlap_score = len(check_doc & exists_doc)
            if overlap_score >= 0.8 * len(check_doc) or overlap_score >= 0.8 * len(exists_doc):
                duplicate = True
                break
        if not duplicate:
            uniqueDocs.append(doc)
    return uniqueDocs
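# Main entry point: expand the question into variants, filter documents by
# keywords, split them into passages, score each passage by n-gram overlap,
# and return the passages sorted by score with near-duplicates removed.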
def rel_ranking(question, documents):
    # Return a ranked list of passages drawn from `documents`.
    q_variants = generateVariants(question)
    q_keywords = keyword_extraction(question)
    q_ngrams = {'unigram': set(generateNgram(question.lower(), 1, '_', punct_set | stopwords)),
                'bigram': set(), 'trigram': set(), 'fourgram': set()}
    for q in q_variants:
        q = q.lower()
        q_ngrams['bigram'] |= set(generateNgram(q, 2, '_', punct_set | stopwords))
        q_ngrams['trigram'] |= set(generateNgram(q, 3, '_', punct_set | stopwords))
        q_ngrams['fourgram'] |= set(generateNgram(q, 4, '_', punct_set))
    documents = [d for d in documents if isRelevant(d, q_keywords)]
    passages = [p for d in documents for p in generatePassages(d, 8)]
    passages = [' '.join(p.split()) for p in passages]  # normalise whitespace
    passages = list(set(passages))
    passages = [p for p in passages if isRelevant(p, q_keywords)]
    p_scores = [passage_score_wrap((q_ngrams, p)) for p in passages]
    order = numpy.argsort([-s for s in p_scores])
    relevantDocs = [passages[i] for i in order]
    return removeDuplicate(relevantDocs)
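# A minimal usage sketch, assuming the resource files above are in place and
# underthesea's tagger is installed; the question and document strings below
# are purely illustrative.
if __name__ == '__main__':
    question = 'Hà Nội là thủ đô của nước nào ?'
    documents = ['Hà Nội là thủ đô của Việt Nam . Thành phố nằm bên bờ sông Hồng .']
    for passage in rel_ranking(question, documents):
        print(passage)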