# NOTE: "Spaces: Paused" -- page-header residue captured along with this file; not part of the program.
import pandas as pd | |
import openai | |
import chainlit as cl | |
from dotenv import load_dotenv | |
import re | |
from openai import OpenAI | |
# Read settings (such as API credentials) from a local .env file into the
# process environment before any client is constructed.
load_dotenv()

# Module-wide OpenAI client used for the chat-completion request further down.
# NOTE(review): constructed without arguments, so it presumably picks up its
# API key from the environment populated above -- confirm deployment config.
client = OpenAI()
# Read the UDM field catalogue from a CSV file
def load_udm_fields(csv_path):
    """Load the UDM field list from a CSV file.

    Args:
        csv_path: Location of the CSV; passed straight to ``pandas.read_csv``.

    Returns:
        A ``pandas.DataFrame`` with the parsed CSV contents.
    """
    udm_table = pd.read_csv(csv_path)
    return udm_table
# Function to retrieve relevant UDM fields for log mapping
def retrieve_udm_field(log_field, udm_fields):
    """Return the first UDM field whose name contains ``log_field``.

    The lookup is a case-insensitive, literal substring search over the
    ``UDM_Field`` column of ``udm_fields``.

    Args:
        log_field: Log field name to search for.
        udm_fields: DataFrame with a ``UDM_Field`` column of UDM field names.

    Returns:
        The first matching ``UDM_Field`` value, or ``None`` when nothing
        matches or ``log_field`` is empty.
    """
    # An empty needle is a substring of every value and would "match" the
    # first row of the catalogue, so treat it as no match.
    if not log_field:
        return None

    # regex=False: the log field is data, not a pattern -- otherwise "a.b"
    #   would match "axb" and a stray "(" would raise re.error.
    # na=False: missing cells would otherwise put NaN in the mask, and
    #   boolean indexing with a NaN-containing mask raises.
    is_match = udm_fields['UDM_Field'].str.contains(
        log_field, case=False, regex=False, na=False
    )
    udm_match = udm_fields[is_match]
    if udm_match.empty:
        return None
    return udm_match.iloc[0]['UDM_Field']
# Function to extract log fields from sample log or description
def extract_log_fields(user_input):
    """Extract candidate field names from a log sample or description.

    Every run of word characters (letters, digits, underscore) in
    ``user_input`` is treated as a candidate field name.

    Args:
        user_input: Free text that may contain log samples or field names.

    Returns:
        The unique tokens as a list, in order of first appearance. An empty
        or ``None`` input yields an empty list.
    """
    if not user_input:
        return []
    tokens = re.findall(r'\b\w+\b', user_input)
    # dict.fromkeys de-duplicates while keeping first-seen order. A set would
    # return the tokens in an order that changes between interpreter runs
    # (string hash randomisation), making the generated prompt unstable.
    return list(dict.fromkeys(tokens))
# Function to map log fields to UDM fields
def map_log_fields_to_udm(log_fields, udm_fields):
    """Map each log field to a UDM field, falling back to a custom field.

    Args:
        log_fields: Iterable of log field names.
        udm_fields: DataFrame of known UDM fields, handed to
            ``retrieve_udm_field`` for each lookup.

    Returns:
        A DataFrame with columns ``Log_Field`` and ``UDM_Field``, one row per
        entry of ``log_fields``. The columns are present even when
        ``log_fields`` is empty.
    """
    mapped_fields = []
    custom_fields = set()
    for field in log_fields:
        udm_field = retrieve_udm_field(field, udm_fields)
        if not udm_field:
            # No catalogue match: place the field under the custom_fields
            # namespace, appending "_1" until the name is not already taken.
            udm_field = f"custom_fields.{field}"
            while udm_field in custom_fields:
                udm_field = f"{udm_field}_1"
            custom_fields.add(udm_field)
        mapped_fields.append({'Log_Field': field, 'UDM_Field': udm_field})
    # Explicit columns keep the schema stable for empty input; without them
    # an empty list produces a DataFrame with no columns at all.
    return pd.DataFrame(mapped_fields, columns=['Log_Field', 'UDM_Field'])
# GPT-4-backed generation step, primed with few-shot examples
def generate_udm_mapping_response(log_fields, udm_fields_csv):
    """Ask GPT-4 to produce a UDM mapping for the given log fields.

    The UDM catalogue is loaded from ``udm_fields_csv``, a preliminary
    mapping is computed locally with ``map_log_fields_to_udm``, and that
    table is sent to the model together with two worked vendor examples.

    NOTE(review): the system prompt refers to the full UDM field list in the
    CSV, but only the preliminary mapping table is included in the request.

    Args:
        log_fields: Log field names to map.
        udm_fields_csv: Path of the CSV listing the UDM fields.

    Returns:
        The message content of the first completion choice.
    """
    # Preliminary, locally computed mapping rendered as plain text
    preliminary_mapping = map_log_fields_to_udm(
        log_fields, load_udm_fields(udm_fields_csv)
    )
    mapping_table = preliminary_mapping.to_string(index=False)

    # Worked examples shown to the model (few-shot prompting)
    few_shot_examples = """
### Example 1: Fortinet Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
devname | intermediary.hostname
devid | intermediary.asset.hardware.serial_number
srcip | principal.ip
dstip | target.ip
dstport | target.port
### Example 2: Palo Alto Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
src_ip | principal.ip
dest_ip | target.ip
dest_port | target.port
action | security_result.action_details
severity | security_result.severity_details
"""

    # Role and step-by-step instructions for the model
    system_prompt = """You are a cybersecurity expert specialized in log analysis and data normalization, helping security teams to map security log fields to Google Chronicle's Unified Data Model (UDM).
Please follow these steps: 1. Identify the vendor name from the input, and understand the specific log field conventions of that vendor.
2. Extract log fields while ignoring general words that are not part of the field names.
3. Use web search or previous examples to consult the latest log documentation for the product provided and Google Chronicle UDM schema documentation.
4. Map each product log field to its corresponding UDM field based on known mappings or documentation. For the mapping to UDM fields,
use the full list of UDM fields in the csv file provided udm_fields_csv.
5. Only attempt to map the user input that you deem as log fields for this product.
6.For fields that don't have a direct match in UDM, place them into custom fields, ensuring that each custom field is unique and logically consistent.
7. Organize the mapping into a structured table format and provide the user the option to download your mapping into a csv file.
Think through your response step by step, and always aim for accurate, and professional responses with a focus on precision.
"""

    user_prompt = f"Here are the log fields: {log_fields}. Please map them to UDM:\n\n{mapping_table}\n\n{few_shot_examples}"
    chat_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # Single chat-completion request; the first choice carries the answer
    completion = client.chat.completions.create(
        model="gpt-4",
        messages=chat_messages,
    )
    return completion.choices[0].message.content
# Chainlit app functionality
# The decorator registers this coroutine as the chat-start hook; no
# registration is visible where the function is defined, so without it
# Chainlit would not call the handler.
@cl.on_chat_start
async def start_chat():
    """Greet the user and store default model settings in the user session."""
    await cl.Message(content="Welcome! Please provide the product name and the log fields or sample log you want to map to UDM.").send()
    # NOTE(review): these settings are saved to the session here, but no
    # visible code reads them back -- the completion request names its own
    # model. Confirm whether they are meant to be wired in.
    settings = {
        "model": "gpt-3.5-turbo",
        "temperature": 0,
        "max_tokens": 500,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
    }
    cl.user_session.set("settings", settings)
# The decorator registers this coroutine as the incoming-message hook; no
# registration is visible where the function is defined.
@cl.on_message
async def main(message: cl.Message):
    """Map the log fields found in a user message to UDM and reply.

    Args:
        message: Incoming Chainlit message; its ``content`` is scanned for
            candidate log field names.
    """
    user_input = message.content
    log_fields = extract_log_fields(user_input)
    udm_fields_csv = 'udm_field_list_v2.csv'
    # generate_udm_mapping_response is synchronous (CSV read plus an OpenAI
    # request). Calling it directly would block the event loop for the whole
    # request, so run it in a worker thread via cl.make_async.
    response = await cl.make_async(generate_udm_mapping_response)(
        log_fields, udm_fields_csv
    )
    # Send the response back to Chainlit
    await cl.Message(content=f"Here is the mapped log fields to UDM:\n\n{response}").send()