import numpy as np
from numpy.linalg import norm
class DirectedCentralityRnak(object):
    """Ranks candidate phrases with a directed, boundary-aware centrality score."""

    def __init__(self,
                 document_feats,
                 extract_num=20,
                 beta=0.2,
                 lambda1=1,
                 lambda2=0.8,
                 alpha=1,
                 processors=8):
        self.extract_num = extract_num
        self.processors = processors
        self.beta = beta
        self.lambda1 = lambda1
        self.lambda2 = lambda2
        self.alpha = alpha
        self.candidate_phrases = [x['candidate_phrases'] for x in document_feats]
        self.doc_embeddings = [x['sentence_embeddings'] for x in document_feats]
        self.tokens_embeddings = [x['candidate_phrases_embeddings'] for x in document_feats]

    def flat_list(self, l):
        # Flatten a list of lists into a single list.
        return [x for ll in l for x in ll]
    def extract_summary(self):
        # Score every candidate phrase, then re-weight by position and sort.
        paired_scores = self.rank()
        rank_list_phrases = []
        for candidate, paired_score in zip(self.candidate_phrases, paired_scores):
            candidates = []
            for i in range(len(candidate)):
                phrase = candidate[i]
                candidates.append([phrase, paired_score[i][0], paired_score[i][1]])
            rank_list_phrases.append(candidates)
        predicted_candidates = []
        for i in range(len(rank_list_phrases)):
            final_score = []
            # Softmax over reciprocal positions: earlier candidates get larger weights.
            position_weight = 1 / (np.array(list(range(1, len(rank_list_phrases[i]) + 1))))
            position_weight = np.exp(position_weight) / np.sum(np.exp(position_weight))
            cnt = 0
            for candidate, index, score in rank_list_phrases[i]:
                final_score.append([candidate, score * position_weight[cnt]])
                cnt += 1
            final_score.sort(key=lambda x: x[1], reverse=True)
            candidates = [x[0].strip() for x in final_score]
            predicted_candidates.append(candidates)
        return predicted_candidates
    def pairdown(self, scores, pair_indice, length):
        # Scatter the flat pairwise scores back into a symmetric matrix.
        out_matrix = np.ones((length, length))
        for pair in pair_indice:
            out_matrix[pair[0][0]][pair[0][1]] = scores[pair[1]]
            out_matrix[pair[0][1]][pair[0][0]] = scores[pair[1]]
        return out_matrix
    def get_similarity_matrix(self, sentence_embeddings):
        # Dot-product similarity for every pair of candidate embeddings.
        pairs = []
        scores = []
        cnt = 0
        for i in range(len(sentence_embeddings) - 1):
            for j in range(i, len(sentence_embeddings)):
                if isinstance(sentence_embeddings[i], float) or isinstance(sentence_embeddings[j], float):
                    # A bare float stands in for a missing embedding; score it as 0.
                    scores.append(0)
                else:
                    scores.append(np.dot(sentence_embeddings[i], sentence_embeddings[j]))
                pairs.append(([i, j], cnt))
                cnt += 1
        return self.pairdown(scores, pairs, len(sentence_embeddings))
    def compute_scores(self, similarity_matrix, edge_threshold=0):
        # Accumulate directed forward/backward centrality scores for each node.
        forward_scores = [1e-10 for i in range(len(similarity_matrix))]
        backward_scores = [1e-10 for i in range(len(similarity_matrix))]
        edges = []
        n = len(similarity_matrix)
        alpha = self.alpha
        for i in range(len(similarity_matrix)):
            for j in range(i + 1, len(similarity_matrix[i])):
                edge_score = similarity_matrix[i][j]
                # Boundary position function: distance to the nearer document boundary.
                db_i = min(i, alpha * (n - i))
                db_j = min(j, alpha * (n - j))
                if edge_score > edge_threshold:
                    # Direct the edge from the node closer to a boundary toward the farther one.
                    if db_i < db_j:
                        forward_scores[i] += edge_score
                        backward_scores[j] += edge_score
                        edges.append((i, j, edge_score))
                    else:
                        forward_scores[j] += edge_score
                        backward_scores[i] += edge_score
                        edges.append((j, i, edge_score))
        return np.asarray(forward_scores), np.asarray(backward_scores), edges
    def _rank_part(self, similarity_matrix, doc_vector, candidate_phrases_embeddings):
        # Shift similarities by a beta-interpolated threshold so weak edges fall below zero.
        min_score = np.min(similarity_matrix)
        max_score = np.max(similarity_matrix)
        threshold = min_score + self.beta * (max_score - min_score)
        new_matrix = similarity_matrix - threshold
        # Inverse L1 distance between each candidate and the document embedding.
        dist = []
        for emb in candidate_phrases_embeddings:
            if isinstance(doc_vector, float) or isinstance(emb, float):
                dist.append(0)
            else:
                dist.append(1 / np.sum(np.abs(emb - doc_vector)))
        forward_score, backward_score, _ = self.compute_scores(new_matrix)
        # Combine directed centrality with document relevance into the final score per node.
        paired_scores = []
        for node in range(len(forward_score)):
            paired_scores.append([node, (self.lambda1 * forward_score[node] + self.lambda2 * backward_score[node]) * (dist[node])])
        return paired_scores
    def rank(self):
        # Build one similarity matrix per document, then score its candidate phrases.
        similarity_matrix = []
        extracted_list = []
        for embedded in self.tokens_embeddings:
            similarity_matrix.append(self.get_similarity_matrix(embedded))
        for matrix, doc_vector, candidate_phrases_embeddings in zip(similarity_matrix, self.doc_embeddings, self.tokens_embeddings):
            extracted_list.append(self._rank_part(matrix, doc_vector, candidate_phrases_embeddings))
        return extracted_list
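

# Hypothetical usage sketch: the structure of `document_feats` is inferred from
# __init__ above (keys 'candidate_phrases', 'sentence_embeddings',
# 'candidate_phrases_embeddings'); the embeddings below are random placeholders
# standing in for real sentence/phrase vectors, and the phrases are made up.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    document_feats = [{
        'candidate_phrases': ['deep learning', 'keyphrase extraction', 'neural network'],
        'sentence_embeddings': rng.normal(size=768),
        'candidate_phrases_embeddings': [rng.normal(size=768) for _ in range(3)],
    }]
    ranker = DirectedCentralityRnak(document_feats, extract_num=5)
    # Prints one ranked list of candidate phrases per input document.
    print(ranker.extract_summary())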