Spaces:
Sleeping
Sleeping
import numpy as np | |
from urllib.parse import urlparse, parse_qs | |
from pyalex import Works | |
import pandas as pd | |
def openalex_url_to_pyalex_query(url):
    """
    Convert an OpenAlex search URL to a pyalex query.

    The ``filter`` parameter is split on commas into ``key:value`` pairs;
    ``default.search`` becomes a ``search()`` call and every other pair a
    ``filter()`` call. The ``sort`` parameter is split on commas, and each
    entry may be written as ``field:direction``, ``-field`` (descending) or
    ``field`` (ascending).

    Args:
        url (str): The OpenAlex search URL.

    Returns:
        tuple: (Works object, dict of parameters). The dict holds the raw
            string values of ``page``, ``per-page``, ``sample`` and ``seed``
            when they are present in the URL.
    """
    query_params = parse_qs(urlparse(url).query)

    # Initialize the Works object
    query = Works()

    # Handle filters
    if 'filter' in query_params:
        for f in query_params['filter'][0].split(','):
            if ':' not in f:
                continue
            # Split only on the first colon: values may contain colons too.
            key, value = f.split(':', 1)
            if key == 'default.search':
                query = query.search(value)
            else:
                query = query.filter(**{key: value})

    # Handle sort
    if 'sort' in query_params:
        for s in query_params['sort'][0].split(','):
            field, has_direction, direction = s.partition(':')
            if has_direction:
                # "field:direction" form, e.g. "cited_by_count:desc".
                direction = direction or 'asc'
            elif field.startswith('-'):
                # "-field" shorthand for descending order.
                field, direction = field[1:], 'desc'
            else:
                direction = 'asc'
            if not field:
                # Ignore empty entries such as those left by a stray comma.
                continue
            query = query.sort(**{field: direction})

    # Handle other parameters
    params = {
        key: query_params[key][0]
        for key in ('page', 'per-page', 'sample', 'seed')
        if key in query_params
    }
    return query, params
def invert_abstract(inv_index):
    """Rebuild the abstract text from a word -> positions inverted index.

    Args:
        inv_index (dict | None): Mapping of each word to the list of
            positions at which it occurs, or None.

    Returns:
        str: The words joined by single spaces in position order, or a
            single space when ``inv_index`` is None.
    """
    if inv_index is None:
        return ' '

    placed_words = []
    for word, positions in inv_index.items():
        for position in positions:
            placed_words.append((word, position))

    # Stable sort on position only, so ties keep their insertion order.
    placed_words.sort(key=lambda pair: pair[1])
    return " ".join(word for word, _ in placed_words)
def get_pub(x):
    """Extract publication name from record.

    Args:
        x: A mapping expected to hold ``x['source']['display_name']``.
            Anything else (None, NaN, a mapping without those keys) is
            treated as "no publication".

    Returns:
        str: The source display name, or a single space when the name is
            missing, None, or one of the placeholder names
            'parsed_publication' / 'Deleted Journal'.
    """
    try:
        source = x['source']['display_name']
    except (KeyError, TypeError):
        # KeyError: key absent; TypeError: x or x['source'] is not
        # subscriptable (e.g. None or NaN).
        return ' '
    if source is None or source in ('parsed_publication', 'Deleted Journal'):
        return ' '
    return source
def get_field(x):
    """Extract academic field from record.

    Args:
        x: A mapping expected to hold
            ``x['primary_topic']['subfield']['display_name']``.

    Returns:
        The subfield display name, or ``np.nan`` when any level of the
        lookup is missing or the name is None.
    """
    try:
        field = x['primary_topic']['subfield']['display_name']
    except (KeyError, TypeError):
        # KeyError: key absent; TypeError: an intermediate value is not
        # subscriptable (e.g. None or NaN).
        return np.nan
    return field if field is not None else np.nan
def process_records_to_df(records):
    """
    Build a pandas DataFrame from OpenAlex records and add derived columns.

    Adds an ``abstract`` column (from ``abstract_inverted_index`` via
    ``invert_abstract``) and a ``parsed_publication`` column (from
    ``primary_location`` via ``get_pub``), then replaces missing values in
    ``parsed_publication``, ``abstract`` and ``title`` with a single space.

    Args:
        records (list): List of OpenAlex record dictionaries

    Returns:
        pandas.DataFrame: The records plus the derived columns described above
    """
    frame = pd.DataFrame(records)

    # Derived columns, created in this order so the column layout is stable.
    frame['abstract'] = list(map(invert_abstract, frame['abstract_inverted_index']))
    frame['parsed_publication'] = list(map(get_pub, frame['primary_location']))

    # Text columns use ' ' rather than NaN/None as their missing-value marker.
    for column in ('parsed_publication', 'abstract', 'title'):
        frame[column] = frame[column].fillna(' ')

    return frame
def openalex_url_to_filename(url):
    """
    Convert an OpenAlex URL to a filename-safe string with timestamp.

    Filter entries become ``<key>_<value>`` parts (``default.search`` becomes
    ``search_<value>``) and sort entries become ``sort_<field>_<direction>``
    parts. Parts are joined with ``__`` and a ``YYYYMMDD_HHMMSS`` timestamp of
    the current local time is appended. If no parts are found, the stem is
    ``openalex_query``.

    Sort entries may be written as ``field:direction``, ``-field``
    (descending) or ``field`` (ascending).

    Args:
        url (str): The OpenAlex search URL

    Returns:
        str: A filename-safe string with timestamp (without extension).
            Names longer than 255 characters are shortened to 251 characters
            by trimming the descriptive part, so the timestamp is kept.
    """
    from datetime import datetime
    import re

    query_params = parse_qs(urlparse(url).query)

    # Create parts of the filename
    parts = []

    # Handle filters
    if 'filter' in query_params:
        for f in query_params['filter'][0].split(','):
            if ':' not in f:
                continue
            key, value = f.split(':', 1)
            # Replace dots with underscores in the key
            key = key.replace('.', '_')
            # Drop everything except word characters, whitespace and hyphens,
            # then collapse whitespace runs into single underscores.
            clean_value = re.sub(r'[^\w\s-]', '', value)
            clean_value = '_'.join(clean_value.split())
            if key == 'default_search':
                parts.append(f"search_{clean_value}")
            else:
                parts.append(f"{key}_{clean_value}")

    # Handle sort parameters
    if 'sort' in query_params:
        for s in query_params['sort'][0].split(','):
            field, has_direction, direction = s.partition(':')
            if has_direction:
                # "field:direction" form; strip anything not filename-safe
                # (e.g. further colons) from the direction.
                direction = re.sub(r'[^\w-]', '', direction) or 'asc'
            elif field.startswith('-'):
                # "-field" shorthand for descending order.
                field, direction = field[1:], 'desc'
            else:
                direction = 'asc'
            parts.append(f"sort_{field.replace('.', '_')}_{direction}")

    # Add timestamp
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    suffix = f"__{timestamp}"

    # Combine all parts
    stem = '__'.join(parts) if parts else 'openalex_query'
    filename = f"{stem}{suffix}"

    # Ensure filename is not too long (max 255 chars is common filesystem limit)
    if len(filename) > 255:
        # Trim the stem, not the whole name, so the timestamp survives;
        # 251 leaves room for a potential extension.
        filename = stem[:251 - len(suffix)] + suffix
    return filename