import re import logging import json from langchain.schema import ( HumanMessage, SystemMessage, ) def save_logs(scheduler, JSON_DATASET_PATH, logs) -> None: """ Every interaction with app saves the log of question and answer, this is to get the usage statistics of app and evaluate model performances """ try: # We get the timestamp here now because we are simply recording time of logging current_time = datetime.now().timestamp() logs["time"] = str(current_time) # Save feedback (if any) if feedback: logs["feedback"] = feedback logs["record_id"] = str(uuid4()) # Do some reordering to keep things clean (time up front) field_order = [ "record_id", "session_id", "time", # current log time "session_duration_seconds", "client_location", "platform", "system_prompt", "sources", "reports", "subtype", "year", "question", "retriever", "endpoint_type", "reader", "docs", "answer", "feedback" ] ordered_logs = {k: logs.get(k) for k in field_order if k in logs} with scheduler.lock: with open(JSON_DATASET_PATH, 'a') as f: json.dump(ordered_logs, f) f.write("\n") logging.info("logging done") except Exception as e: raise def get_message_template(type, SYSTEM_PROMPT, USER_PROMPT): if type == 'NVIDIA': messages = [{"role": "system", "content": SYSTEM_PROMPT}, {"role":"user","content":USER_PROMPT}] elif type == 'DEDICATED': messages = [ SystemMessage(content=SYSTEM_PROMPT), HumanMessage(content=USER_PROMPT),] else: messages = None return messages def make_html_source(source,i): """ takes the text and converts it into html format for display in "source" side tab """ meta = source.metadata content = source.page_content.strip() name = meta['subtype'] card = f"""

Doc {i} - {meta['subtype']} - Page {int(meta['page'])}

{content}

""" return card def parse_output_llm_with_sources(output): # Split the content into a list of text and "[Doc X]" references content_parts = re.split(r'\[(Doc\s?\d+(?:,\s?Doc\s?\d+)*)\]', output) parts = [] for part in content_parts: if part.startswith("Doc"): subparts = part.split(",") subparts = [subpart.lower().replace("doc","").strip() for subpart in subparts] subparts = [f"""{subpart}""" for subpart in subparts] parts.append("".join(subparts)) else: parts.append(part) content_parts = "".join(parts) return content_parts def get_client_ip(request=None): """Get the client IP address from the request context""" try: if request: # Try different headers that might contain the real IP ip = request.client.host # Check for proxy headers forwarded_for = request.headers.get('X-Forwarded-For') if forwarded_for: # X-Forwarded-For can contain multiple IPs - first one is the client ip = forwarded_for.split(',')[0].strip() logging.debug(f"Client IP detected: {ip}") return ip except Exception as e: logging.error(f"Error getting client IP: {e}") return "127.0.0.1" def get_client_location(ip_address) -> dict | None: """Get geolocation info using ipapi.co""" # Add headers so we don't get blocked... headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } try: response = requests.get( f'https://ipapi.co/{ip_address}/json/', headers=headers, timeout=5 ) if response.status_code == 200: data = response.json() # Add random noise between -0.01 and 0.01 degrees (roughly ±1km) lat = data.get('latitude') lon = data.get('longitude') if lat is not None and lon is not None: lat += random.uniform(-0.01, 0.01) lon += random.uniform(-0.01, 0.01) return { 'city': data.get('city'), 'region': data.get('region'), 'country': data.get('country_name'), 'latitude': lat, 'longitude': lon } elif response.status_code == 429: logging.warning(f"Rate limit exceeded. Response: {response.text}") return None else: logging.error(f"Error: Status code {response.status_code}. Response: {response.text}") return None except requests.exceptions.RequestException as e: logging.error(f"Request failed: {str(e)}") return None def get_platform_info(user_agent: str) -> str: """Get platform info""" # Make a best guess at the device type if any(mobile_keyword in user_agent.lower() for mobile_keyword in ['mobile', 'android', 'iphone', 'ipad', 'ipod']): platform_info = 'mobile' else: platform_info = 'desktop' return platform_info