# LLMopsDK / app_udm.py
# Galatea007's picture
# Update app_udm.py
# 8afee73 verified
import pandas as pd
import openai
import chainlit as cl
from dotenv import load_dotenv
import re
from openai import OpenAI
# Load environment variables from a local .env file (if one exists) before the
# client is constructed, so credentials kept there are visible to it.
# NOTE(review): presumably the OpenAI API key is supplied this way -- confirm.
load_dotenv()

# Initialize OpenAI client
client = OpenAI()
# Load UDM fields CSV file
def load_udm_fields(csv_path):
    """Read the UDM field list from ``csv_path`` and return it as a DataFrame.

    ``csv_path`` is handed straight to ``pandas.read_csv``, so anything that
    function accepts (path, URL, file-like object) works here too.
    """
    udm_table = pd.read_csv(csv_path)
    return udm_table
# Function to retrieve relevant UDM fields for log mapping
def retrieve_udm_field(log_field: str, udm_fields: pd.DataFrame):
    """Return the first UDM field whose name contains ``log_field``.

    The comparison is a case-insensitive *literal* substring test against the
    ``UDM_Field`` column of ``udm_fields``.

    Args:
        log_field: Log field name to look up.
        udm_fields: DataFrame with a ``UDM_Field`` column.

    Returns:
        The ``UDM_Field`` value of the first matching row, or ``None`` when
        nothing matches or ``log_field`` is empty.
    """
    # An empty needle is a substring of every value and would always "match"
    # the first row, which is never a meaningful mapping.
    if not log_field:
        return None

    # regex=False: the log field is data, not a pattern -- "a.b" must not match
    # "aXb" and "user(" must not raise.  na=False: rows with a missing
    # UDM_Field count as non-matching instead of poisoning the boolean mask.
    mask = udm_fields['UDM_Field'].str.contains(
        log_field, case=False, regex=False, na=False
    )
    udm_match = udm_fields[mask]
    if not udm_match.empty:
        return udm_match.iloc[0]['UDM_Field']
    return None
# Function to extract log fields from sample log or description
def extract_log_fields(user_input: str) -> list:
    """Extract candidate field names from a log sample or description.

    Every run of word characters (``\\w+``) in ``user_input`` is treated as a
    candidate field.  Duplicates are dropped and the remaining tokens keep the
    order of their first appearance, so the same input always yields the same
    list (a plain ``set`` would order strings differently between runs).

    Args:
        user_input: Free-form text typed by the user.

    Returns:
        List of unique tokens in first-seen order; empty if there are none.
    """
    fields = re.findall(r'\b\w+\b', user_input)
    # dict preserves insertion order, giving an order-stable de-duplication.
    return list(dict.fromkeys(fields))
# Function to map log fields to UDM fields
def map_log_fields_to_udm(log_fields, udm_fields: pd.DataFrame) -> pd.DataFrame:
    """Map each log field to a UDM field, falling back to a custom field.

    For every entry of ``log_fields`` the UDM list is searched with
    ``retrieve_udm_field``.  A hit is recorded as-is; a miss is recorded as
    ``custom_fields.<field>``, with ``_1`` appended until the name has not
    been handed out before in this call.

    Args:
        log_fields: Iterable of log field names.
        udm_fields: DataFrame with a ``UDM_Field`` column.

    Returns:
        DataFrame with columns ``Log_Field`` and ``UDM_Field``, one row per
        input field.  The columns are present even when ``log_fields`` is
        empty.
    """
    mapped_fields = []
    custom_fields = set()  # custom names already assigned in this call
    for field in log_fields:
        udm_field = retrieve_udm_field(field, udm_fields)
        if udm_field:
            mapped_fields.append({'Log_Field': field, 'UDM_Field': udm_field})
            continue

        # No UDM match: allocate a custom field name that is still unused.
        custom_field = f"custom_fields.{field}"
        while custom_field in custom_fields:
            custom_field = f"{custom_field}_1"
        # Record the name so that later duplicates actually get a suffix.
        custom_fields.add(custom_field)
        mapped_fields.append({'Log_Field': field, 'UDM_Field': custom_field})

    # Explicit columns keep the frame's shape stable for empty input.
    return pd.DataFrame(mapped_fields, columns=['Log_Field', 'UDM_Field'])
# GPT-4-based generation function with few-shot learning

# Few-shot examples appended to the user message to show the expected
# "Log Attribute | UDM Attribute" table layout.
_FEW_SHOT_EXAMPLES = """
### Example 1: Fortinet Fields to UDM Mapping
Log Attribute | UDM Attribute
devname | intermediary.hostname
devid | intermediary.asset.hardware.serial_number
srcip | principal.ip
dstip | target.ip
dstport | target.port
### Example 2: Palo Alto Fields to UDM Mapping
Log Attribute | UDM Attribute
src_ip | principal.ip
dest_ip | target.ip
dest_port | target.port
action | security_result.action_details
severity | security_result.severity_details
"""

# System prompt describing the mapping procedure the model should follow.
_SYSTEM_TEMPLATE = """You are a cybersecurity expert specialized in log analysis and data normalization, helping security teams to map security log fields to Google Chronicle's Unified Data Model (UDM).
Please follow these steps: 1. Identify the vendor name from the input, and understand the specific log field conventions of that vendor.
2. Extract log fields while ignoring general words that are not part of the field names.
3. Use web search or previous examples to consult the latest log documentation for the product provided and Google Chronicle UDM schema documentation.
4. Map each product log field to its corresponding UDM field based on known mappings or documentation. For the mapping to UDM fields,
use the full list of UDM fields in the csv file provided udm_fields_csv.
5. Only attempt to map the user input that you deem as log fields for this product.
6.For fields that don't have a direct match in UDM, place them into custom fields, ensuring that each custom field is unique and logically consistent.
7. Organize the mapping into a structured table format and provide the user the option to download your mapping into a csv file.
Think through your response step by step, and always aim for accurate, and professional responses with a focus on precision."""

# NOTE(review): the surrounding comments name GPT-4, so that model is used
# here; adjust if a different deployment is intended.
_UDM_MAPPING_MODEL = "gpt-4"


def generate_udm_mapping_response(log_fields, udm_fields_csv):
    """Ask the chat model to map ``log_fields`` to UDM fields.

    The UDM list is loaded from ``udm_fields_csv``, a first-pass mapping is
    computed locally with ``map_log_fields_to_udm``, and that table is sent to
    the model together with the raw field list and the few-shot examples.

    Args:
        log_fields: Log field names extracted from the user's input.
        udm_fields_csv: Path to the CSV holding the UDM field list.

    Returns:
        The text content of the first completion choice.
    """
    # Load UDM Fields
    udm_fields = load_udm_fields(udm_fields_csv)

    # Map the log fields to UDM
    mapped_fields_df = map_log_fields_to_udm(log_fields, udm_fields)

    # Prepare the mapping as context
    mapped_fields_text = mapped_fields_df.to_string(index=False)

    # Call the model for the final output with additional explanation
    response = client.chat.completions.create(
        model=_UDM_MAPPING_MODEL,
        messages=[
            {"role": "system", "content": _SYSTEM_TEMPLATE},
            {"role": "user", "content": f"Here are the log fields: {log_fields}. Please map them to UDM:\n\n{mapped_fields_text}\n\n{_FEW_SHOT_EXAMPLES}"},
        ],
    )
    return response.choices[0].message.content
# Chainlit app functionality
@cl.on_chat_start
async def start_chat():
    """Greet the user and store default model settings in the user session.

    Registered with Chainlit as the chat-start hook so that it runs when a new
    chat session begins.
    """
    await cl.Message(content="Welcome! Please provide the product name and the log fields or sample log you want to map to UDM.").send()

    # NOTE(review): these settings are stored under the "settings" key, but no
    # code visible in this file reads them back -- confirm whether they are
    # still needed.
    settings = {
        "model": "gpt-3.5-turbo",
        "temperature": 0,
        "max_tokens": 500,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
    }
    cl.user_session.set("settings", settings)
@cl.on_message
async def main(message: cl.Message):
    """Handle an incoming chat message: extract fields, map them, reply.

    Registered with Chainlit as the message hook.  The mapping call performs a
    blocking CSV read and a blocking OpenAI request, so it is run through
    ``cl.make_async`` to keep the event loop free for other sessions.
    """
    user_input = message.content
    log_fields = extract_log_fields(user_input)

    # Nothing that looks like a field name: ask again instead of spending an
    # API call on an empty mapping.
    if not log_fields:
        await cl.Message(content="No log fields were found in your message. Please provide the product name and the log fields or a sample log.").send()
        return

    udm_fields_csv = 'udm_field_list_v2.csv'

    # Generate the UDM mapping response (off the event loop)
    response = await cl.make_async(generate_udm_mapping_response)(log_fields, udm_fields_csv)

    # Send the response back to Chainlit
    await cl.Message(content=f"Here is the mapped log fields to UDM:\n\n{response}").send()