import difflib
import itertools

import numpy as np
import pandas as pd
import streamlit.components.v1 as components
from numpy import dot
from numpy.linalg import norm
from pyvis.network import Network
from spacy.tokens import Doc


class HealthseaSearch:
    """Look-up helper over pre-computed health-aspect data.

    The constructor stores four mappings, as read by the methods below:

    * ``health_aspects``: aspect key -> dict with ``"products"`` and
      ``"substance"`` scoring lists (entries are indexed as
      ``entry[0]`` = score, ``entry[1]`` = id) and an ``"alias"`` list of
      other aspect keys.
    * ``products``: product id -> dict with ``"name"``, ``"rating"`` and
      ``"review_count"``.
    * ``conditions`` / ``benefits``: aspect key -> dict with ``"frequency"``.
    """

    def __init__(self, _health_aspects, _products, _conditions, _benefits):
        self.health_aspects = _health_aspects
        self.products = _products
        self.conditions = _conditions
        self.benefits = _benefits

    def __call__(self, query):
        """Return *query* unchanged."""
        return query

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _closest_key(query, mapping, kind):
        """Return the key of *mapping* that is the closest fuzzy match.

        Raises:
            IndexError: if ``difflib`` finds no sufficiently close key. This
                is the exception type the previous ``matches[0]`` lookup
                produced, now with an explanatory message.
        """
        matches = difflib.get_close_matches(query, list(mapping))
        if not matches:
            raise IndexError(f"No {kind} found matching {query!r}")
        return matches[0]

    def _resolve_aspect_key(self, _aspect):
        """Normalise *_aspect* and map it onto a key of ``health_aspects``."""
        key = _aspect.replace(" ", "_")
        if key in self.health_aspects:
            return key
        # The query itself is matched (previously the literal string
        # "_aspect" was passed, so the fallback never saw the user input).
        return self._closest_key(key, self.health_aspects, "health aspect")

    def _collect_scored(self, _aspect, n, field, exclude=frozenset()):
        """Gather ``(score, item_id, source_key)`` for an aspect and aliases.

        The requested aspect is visited first, then each alias in order.
        When ``n != 0`` only the first ``n`` entries of every scoring list
        are considered. Ids in *exclude* are skipped and each id is reported
        once, attributed to the first source that lists it.
        """
        key = self._resolve_aspect_key(_aspect)
        aspect = self.health_aspects[key]

        collected = []
        seen = set()
        for source in [key, *aspect["alias"]]:
            scoring = self.health_aspects[source][field]
            if n != 0:
                scoring = scoring[:n]
            for entry in scoring:
                item_id = entry[1]
                if item_id in exclude or item_id in seen:
                    continue
                seen.add(item_id)
                collected.append((entry[0], item_id, source))
        return collected

    @staticmethod
    def _rank(entries, n):
        """Cut *entries* to ``n`` items (``n == 0``: no cut), then sort.

        NOTE(review): the cut happens before the sort, exactly as in the
        original implementation, so entries gathered first (the requested
        aspect, then its aliases in order) win over later, possibly
        higher-scored ones. Confirm this priority is intended.
        """
        if n != 0 and len(entries) > n:
            entries = entries[:n]
        return sorted(entries, key=lambda entry: entry[0], reverse=True)

    def _frequency_ranking(self, mapping):
        """Return ``(frequency, key, alias_count)`` tuples, most frequent first.

        ``alias_count`` is 0 for keys that have no ``health_aspects`` entry.
        """
        ranking = []
        for key, meta in mapping.items():
            if key in self.health_aspects:
                alias_count = len(self.health_aspects[key]["alias"])
            else:
                alias_count = 0
            ranking.append((meta["frequency"], key, alias_count))
        return sorted(ranking, key=lambda row: row[0], reverse=True)

    @staticmethod
    def _frequency_df(rows, label):
        """Build the ``label`` / ``Frequency`` / ``Alias`` DataFrame for *rows*."""
        data = {
            label: [row[1] for row in rows],
            "Frequency": [row[0] for row in rows],
            "Alias": [row[2] for row in rows],
        }
        datatypes = {"Frequency": int, label: str, "Alias": int}
        return pd.DataFrame(data=data).astype(datatypes)

    # ------------------------------------------------------------------
    # Products
    # ------------------------------------------------------------------

    # Load product meta
    def get_products(self, _aspect, n):
        """Return scored products for a health aspect and its aliases.

        Args:
            _aspect: Aspect name; spaces are replaced by underscores and an
                unknown name falls back to the closest known aspect key.
            n: Maximum number of results and per-aspect look-ahead;
                ``0`` means no limit.

        Returns:
            List of ``(score, product_meta, aspect_key)`` tuples sorted by
            score, highest first.

        Raises:
            IndexError: if the aspect is unknown and has no close match.
        """
        entries = [
            (score, self.products[product_id], source)
            for score, product_id, source in self._collect_scored(
                _aspect, n, "products"
            )
        ]
        return self._rank(entries, n)

    # Load product meta and return as DataFrame
    def get_products_df(self, _aspect, n):
        """Return :meth:`get_products` results as a typed DataFrame.

        Columns: ``product``, ``score``, ``health_aspect``, ``rating``,
        ``reviews``.
        """
        product_list = self.get_products(_aspect, n)
        product_data = {
            "product": [entry[1]["name"] for entry in product_list],
            "score": [entry[0] for entry in product_list],
            "health_aspect": [entry[2] for entry in product_list],
            "rating": [entry[1]["rating"] for entry in product_list],
            "reviews": [entry[1]["review_count"] for entry in product_list],
        }
        datatypes = {
            "product": str,
            "score": int,
            "health_aspect": str,
            "rating": str,
            "reviews": int,
        }
        return pd.DataFrame(data=product_data).astype(datatypes)

    # ------------------------------------------------------------------
    # Health aspects
    # ------------------------------------------------------------------

    # Get health aspect
    def get_aspect(self, _aspect):
        """Return the ``health_aspects`` entry for *_aspect*.

        Falls back to the closest known key when there is no exact match.

        Raises:
            IndexError: if there is no exact and no close match.
        """
        return self.health_aspects[self._resolve_aspect_key(_aspect)]

    # Get health aspect meta
    def get_aspect_meta(self, _aspect):
        """Return the condition or benefit metadata for *_aspect*.

        Exact matches are looked up in ``conditions`` first, then in
        ``benefits``. Without an exact match the closest *condition* key is
        used (benefits are not part of the fuzzy fallback).

        Raises:
            IndexError: if there is no exact and no close match.
        """
        key = _aspect.replace(" ", "_")
        if key in self.conditions:
            return self.conditions[key]
        if key in self.benefits:
            return self.benefits[key]
        return self.conditions[self._closest_key(key, self.conditions, "condition")]

    # ------------------------------------------------------------------
    # Visualisation
    # ------------------------------------------------------------------

    # Plotting vectors (2D/3D)
    def tsne_plot(self, dataset):
        """Create a t-SNE model of *dataset* and return a 3D scatter figure.

        Args:
            dataset: Sequence of items where ``item[0]`` is the label and
                ``item[1]`` the vector.

        Raises:
            ValueError: if *dataset* has two or fewer items; previously this
                path reached the figure construction with ``trace`` unbound.

        NOTE(review): ``TSNE`` and ``go`` are not imported anywhere in this
        module, so this method raises ``NameError`` until they are brought
        into scope (presumably ``sklearn.manifold.TSNE`` and
        ``plotly.graph_objects`` - confirm before adding the imports).
        """
        if len(dataset) <= 2:
            raise ValueError("tsne_plot needs more than two labelled vectors")

        labels = [item[0] for item in dataset]
        tokens = [np.array(item[1]) for item in dataset]

        tsne_model = TSNE(
            perplexity=40, n_components=3, init="pca", n_iter=2500, random_state=23
        )
        new_values = tsne_model.fit_transform(tokens)

        trace = go.Scatter3d(
            x=[value[0] for value in new_values],
            y=[value[1] for value in new_values],
            z=[value[2] for value in new_values],
            text=labels,
            textposition="top right",
            mode="lines+markers+text",
            marker={
                "size": 10,
                "opacity": 1,
                "colorscale": "Viridis",
            },
        )

        # Configure the layout.
        layout = go.Layout(
            margin={"l": 0, "r": 0, "b": 0, "t": 0},
            font={"color": "#DF55E2", "size": 20},
        )
        return go.Figure(data=[trace], layout=layout)

    def pyvis(self, vectors):
        """Render a similarity graph of *vectors* into the Streamlit page.

        Args:
            vectors: Non-empty sequence of items where ``item[0]`` is the
                node label and ``item[1]`` the vector. The first item is
                drawn as the highlighted node.

        Side effects:
            Writes ``viz.html`` to the working directory and embeds its
            content through ``components.html``.
        """
        net = Network(
            height="500px", width="700px", bgcolor="#0E1117", font_color="#ffffff"
        )
        net.barnes_hut(central_gravity=0.8, spring_length=100)

        # The first vector is highlighted; all others share one style.
        net.add_node(vectors[0][0], label=vectors[0][0], color="#4EA0DB", value=100)
        for vector in vectors[1:]:
            net.add_node(vector[0], label=vector[0], color="#FE51B9", value=70)

        # One edge per unordered pair, weighted by cosine similarity.
        for first, second in itertools.combinations(vectors, 2):
            sim = self.calculate_cosine_sim(first[1], second[1])
            net.add_edge(first[0], second[0], weight=sim, value=sim * 0.1, title=sim)

        net.save_graph("viz.html")
        # Context manager so the handle is closed once the HTML is read.
        with open("viz.html", "r", encoding="utf-8") as html_file:
            source_code = html_file.read()
        components.html(source_code, height=500, width=700)

    def calculate_cosine_sim(self, a, b):
        """Return the cosine similarity ``dot(a, b) / (norm(a) * norm(b))``."""
        return dot(a, b) / (norm(a) * norm(b))

    # ------------------------------------------------------------------
    # Substances
    # ------------------------------------------------------------------

    # Load substance meta
    def get_substances(self, _aspect, n):
        """Return scored substances for a health aspect and its aliases.

        ``sodium``, ``sugar`` and ``sugar_alcohol`` are always left out. The
        exclusion is applied after the per-aspect ``n`` cut, so fewer than
        ``n`` entries may be returned.

        Args:
            _aspect: Aspect name; spaces are replaced by underscores and an
                unknown name falls back to the closest known aspect key.
            n: Maximum number of results and per-aspect look-ahead;
                ``0`` means no limit.

        Returns:
            List of ``(score, substance, aspect_key)`` tuples sorted by
            score, highest first.

        Raises:
            IndexError: if the aspect is unknown and has no close match.
        """
        exclude = frozenset(("sodium", "sugar", "sugar_alcohol"))
        entries = self._collect_scored(_aspect, n, "substance", exclude)
        return self._rank(entries, n)

    # Load substance meta and return as DataFrame
    def get_substances_df(self, _aspect, n):
        """Return :meth:`get_substances` results as a typed DataFrame.

        Columns: ``substance``, ``score``, ``health_aspect``.
        """
        substance_list = self.get_substances(_aspect, n)
        substance_data = {
            "substance": [entry[1] for entry in substance_list],
            "score": [entry[0] for entry in substance_list],
            "health_aspect": [entry[2] for entry in substance_list],
        }
        datatypes = {"substance": str, "score": int, "health_aspect": str}
        return pd.DataFrame(data=substance_data).astype(datatypes)

    # ------------------------------------------------------------------
    # Overviews
    # ------------------------------------------------------------------

    # Get all health aspect indices
    def get_all_conditions(self):
        """Return ``(frequency, condition_key, alias_count)``, most frequent first."""
        return self._frequency_ranking(self.conditions)

    def get_all_conditions_df(self):
        """Return the 100 most frequent conditions as a DataFrame."""
        return self._frequency_df(self.get_all_conditions()[:100], "Condition")

    def get_all_benefits(self):
        """Return ``(frequency, benefit_key, alias_count)``, most frequent first."""
        return self._frequency_ranking(self.benefits)

    def get_all_benefits_df(self):
        """Return the 100 most frequent benefits as a DataFrame."""
        return self._frequency_df(self.get_all_benefits()[:100], "Benefit")


class HealthseaPipe:
    """Helpers for reading the output of the clause pipeline on a ``Doc``."""

    # Get Clauses and their predictions
    def get_clauses(self, doc):
        """Rebuild every clause of *doc* as its own ``Doc``.

        Reads ``doc._.clauses``; each clause supplies ``"split_indices"``
        (token slice of the clause) and ``"has_ent"``. When ``has_ent`` is
        truthy, the tokens in ``"ent_indices"`` are replaced by the single
        ``"blinder"`` text with ``<`` and ``>`` removed.

        Returns:
            List of ``Doc`` objects, one per clause, sharing ``doc.vocab``.
        """
        clauses = []
        for clause in doc._.clauses:
            split = clause["split_indices"]
            clause_slice = doc[split[0] : split[1]]

            words = []
            spaces = []
            if clause["has_ent"]:
                ent = clause["ent_indices"]
                for token in clause_slice:
                    if token.i == ent[0]:
                        # The first entity token becomes the placeholder.
                        words.append(
                            clause["blinder"].replace(">", "").replace("<", "")
                        )
                        spaces.append(True)
                    elif token.i not in range(ent[0], ent[1]):
                        words.append(token.text)
                        spaces.append(bool(token.whitespace_))
                    # Remaining entity tokens are dropped.
            else:
                for token in clause_slice:
                    words.append(token.text)
                    # ``Doc`` takes booleans; ``whitespace_`` is "" or " ".
                    spaces.append(bool(token.whitespace_))

            clauses.append(Doc(doc.vocab, words=words, spaces=spaces))
        return clauses