import string
import functools
import itertools
import json

import numpy
from underthesea import pos_tag

# Stopword list: multi-word entries are joined with '_' to match tokenized text.
with open('resources/stopwords_small.txt', encoding='utf-8') as f:
    stopwords = set([w.replace(' ', '_') for w in f.read().split('\n')])

# Punctuation symbols to ignore when building n-grams.
punct_set = set(string.punctuation) | {'“', '”', '...', '–', '…', '..', '•'}

# Map underthesea POS tags onto the coarse classes used by the synonym dictionary.
map_pos = {'M': 'noun', 'Y': 'noun', 'Nb': 'noun', 'Nc': 'noun', 'Ni': 'noun',
           'Np': 'noun', 'N': 'noun', 'X': 'adj', 'Nu': 'noun', 'Ny': 'noun',
           'V': 'verb', 'Vb': 'verb', 'Vy': 'verb', 'A': 'adj', 'Ab': 'adj', 'R': 'adj'}

with open('resources/synonym.json', encoding='utf-8') as f:
    map_synonym = json.load(f)

# Unigram/bigram frequency counts from the corpus, one "term<TAB>count" entry per line.
markov_score = {}
with open('resources/bigram.txt', encoding='utf-8') as f:
    data = f.read().split('\n')
data = data[:-1]
for line in data:
    word, score = line.split('\t')
    markov_score[word] = int(score)
del data


def makovCal(a, b):
    """Estimate a smoothed P(b | a) from the corpus counts."""
    termBigram = a + "_" + b
    freBigram = markov_score.get(termBigram, 1)
    freUnigram = markov_score.get(a, 1)
    if freUnigram < 5:
        freUnigram = 5000  # 2000
    else:
        freUnigram += 5000  # 2000
    return float(freBigram) / freUnigram


def generateCombinations(tokens, thresh_hold):
    """For each token, keep the word plus any synonyms that look plausible in context.

    A synonym is kept when its Markov score with either the previous or the next
    word exceeds thresh_hold. Stopwords and unmapped POS tags keep only the word itself.
    """
    combinations = []
    for i in range(len(tokens)):
        word = tokens[i][0].lower()
        if word in stopwords:
            combinations.append([word])
            continue
        pos = tokens[i][1]
        if pos in map_pos:
            pos = map_pos[pos]
            if word in map_synonym[pos]:
                synonyms = map_synonym[pos][word]
                possible_synonym = []
                for syn in synonyms:
                    pre_word = 'NONE' if i == 0 else tokens[i - 1][0].lower()
                    next_word = 'NONE' if i == len(tokens) - 1 else tokens[i + 1][0].lower()
                    if makovCal(pre_word, syn) > thresh_hold or makovCal(syn, next_word) > thresh_hold:
                        possible_synonym.append(syn)
                combinations.append([word] + possible_synonym)
            else:
                combinations.append([word])
        else:
            combinations.append([word])
    return combinations


def generateVariants(untokenize_text):
    """Generate paraphrased variants of a question by substituting context-checked synonyms."""
    words = pos_tag(untokenize_text)
    for i in range(len(words)):
        words[i] = (words[i][0].replace(' ', '_'), words[i][1])
    tokens = words
    base_line = 0.001
    combinations = generateCombinations(tokens, base_line)
    num_variants = functools.reduce(lambda x, y: x * y, [len(c) for c in combinations], 1)
    # Tighten the threshold until the number of variants is manageable.
    while num_variants > 10000:
        base_line = base_line * 2
        combinations = generateCombinations(tokens, base_line)
        num_variants = functools.reduce(lambda x, y: x * y, [len(c) for c in combinations], 1)
    combinations = list(itertools.product(*combinations))
    combinations = [' '.join(e) for e in combinations]
    return combinations


def generateNgram(paper, ngram=2, deli='_', rmSet=frozenset()):
    """Return the n-grams of `paper`, skipping any n-gram containing a word in rmSet."""
    words = paper.split()
    if len(words) == 1:
        return []
    ngrams = []
    for i in range(len(words) - ngram + 1):
        block = words[i:i + ngram]
        if not any(w in rmSet for w in block):
            ngrams.append(deli.join(block))
    return ngrams


def generatePassages(document, n):
    """Split a document into passages.

    Paragraphs of up to 8 sentences are kept whole; longer paragraphs are cut into
    sliding windows of n sentences, dropping sentences that contain a question mark.
    """
    passages = []
    paragraphs = document.split('\n\n')
    for para in paragraphs:
        sentences = para.split(' . ')
        if len(sentences) <= 8:
            passages.append(' '.join(sentences))
        else:
            for i in range(len(sentences) - n + 1):
                passages.append(' '.join([sentences[i + j] for j in range(n)
                                          if '?' not in sentences[i + j]]))
    return passages


def passage_score(q_ngrams, passage):
    """Score a passage by weighted n-gram overlap with the question n-grams."""
    try:
        passage = passage.lower()
        p_unigram = set(generateNgram(passage, 1, '_', punct_set | stopwords))
        p_bigram = set(generateNgram(passage, 2, '_', punct_set | stopwords))
        p_trigram = set(generateNgram(passage, 3, '_', punct_set | stopwords))
        p_fourgram = set(generateNgram(passage, 4, '_', punct_set))
        uni_score = len(p_unigram & q_ngrams['unigram'])
        bi_score = len(p_bigram & q_ngrams['bigram'])
        tri_score = len(p_trigram & q_ngrams['trigram'])
        four_score = len(p_fourgram & q_ngrams['fourgram'])
        emd_sim = 0  # placeholder for an embedding-similarity term, currently unused
        return uni_score + bi_score * 2 + tri_score * 3 + four_score * 4 + emd_sim * 3
    except Exception:
        return 0


def passage_score_wrap(args):
    """Unpack (q_ngrams, passage) so passage_score can be used with map-style APIs."""
    return passage_score(args[0], args[1])


def keyword_extraction(question):
    """Extract non-stopword keywords from the question, each wrapped in its own list."""
    keywords = []
    question = question.replace('_', ' ')
    words = pos_tag(question)
    for i in range(len(words)):
        words[i] = (words[i][0].replace(' ', '_'), words[i][1])
    for token in words:
        word = token[0]
        if word not in stopwords:
            keywords += word.lower().split('_')
    keywords = list(set(keywords))
    keywords = [[w] for w in keywords]
    return keywords


def isRelevant(text, keywords):
    """Return True if the text contains at least one non-stopword keyword."""
    text = text.lower().replace('_', ' ')
    words = list(set([w for keyword in keywords for w in keyword]))
    for word in words:
        if word in text and word not in stopwords:
            return True
    return False


def removeDuplicate(documents):
    """Drop passages whose unigram set overlaps an already-kept passage by 80% or more."""
    mapUnigram = {}
    for doc in documents:
        mapUnigram[doc] = generateNgram(doc.lower(), 1, '_', punct_set | stopwords)
    uniqueDocs = []
    for i in range(len(documents)):
        check = True
        for j in range(len(uniqueDocs)):
            check_doc = mapUnigram[documents[i]]
            exists_doc = mapUnigram[uniqueDocs[j]]
            overlap_score = len(set(check_doc) & set(exists_doc))
            if overlap_score >= 0.8 * len(set(check_doc)) or overlap_score >= 0.8 * len(set(exists_doc)):
                check = False
        if check:
            uniqueDocs.append(documents[i])
    return uniqueDocs


def rel_ranking(question, documents):
    """Return a ranked, de-duplicated list of passages from `documents` for `question`."""
    q_variants = generateVariants(question)
    q_keywords = keyword_extraction(question)
    # Unigrams come from the question itself; higher-order n-grams from all variants.
    q_ngrams = {'unigram': set(generateNgram(question.lower(), 1, '_', punct_set | stopwords)),
                'bigram': set(),
                'trigram': set(),
                'fourgram': set()}
    for q in q_variants:
        q = q.lower()
        q_ngrams['bigram'] |= set(generateNgram(q, 2, '_', punct_set | stopwords))
        q_ngrams['trigram'] |= set(generateNgram(q, 3, '_', punct_set | stopwords))
        q_ngrams['fourgram'] |= set(generateNgram(q, 4, '_', punct_set))
    documents = [d for d in documents if isRelevant(d, q_keywords)]
    passages = [generatePassages(d, 8) for d in documents]
    passages = [p for doc_passages in passages for p in doc_passages]
    passages = [' '.join([w.strip() for w in p.split()]) for p in passages]
    passages = list(set(passages))
    passages = [p for p in passages if isRelevant(p, q_keywords)]
    p_scores = [passage_score_wrap((q_ngrams, p)) for p in passages]
    # Sort passage indices by descending score, then map back to the passages.
    p_res = numpy.argsort([-s for s in p_scores])
    relevantDocs = [passages[idx] for idx in p_res]
    relevantDocs = removeDuplicate(relevantDocs)
    return relevantDocs
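

# Illustrative usage sketch (not part of the original module): the question and
# documents below are made-up examples. rel_ranking expects a raw Vietnamese
# question string and a list of document strings whose sentences are separated
# by ' . ' and whose paragraphs are separated by blank lines, as assumed by
# generatePassages above.
if __name__ == '__main__':
    sample_question = 'Thủ đô của Việt Nam là gì ?'  # hypothetical question
    sample_documents = [                              # hypothetical documents
        'Hà Nội là thủ đô của Việt Nam . Thành phố nằm bên bờ sông Hồng .',
        'Thành phố Hồ Chí Minh là thành phố đông dân nhất Việt Nam .',
    ]
    for passage in rel_ranking(sample_question, sample_documents):
        print(passage)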