# LLMopsDK / app_udm.py
# Galatea007's picture
# Rename app.py to app_udm.py
# d404a15 verified
# raw
# history blame
# 4.74 kB
# You can find this code for Chainlit python streaming here (https://docs.chainlit.io/concepts/streaming/python)
# OpenAI Chat completion
import os
from openai import AsyncOpenAI # importing openai for API usage
import chainlit as cl # importing chainlit for our app
from chainlit.prompt import Prompt, PromptMessage # importing prompt tools
from chainlit.playground.providers import ChatOpenAI # importing ChatOpenAI tools
from dotenv import load_dotenv
load_dotenv()
import pandas as pd
import os
from openai import AsyncOpenAI
import chainlit as cl
from chainlit.prompt import Prompt, PromptMessage
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Load UDM fields CSV file
def load_udm_fields(csv_path):
    """Read the UDM field reference CSV into a DataFrame.

    ``csv_path`` is handed directly to ``pandas.read_csv``; the parsed frame
    is returned unchanged.
    """
    udm_reference = pd.read_csv(csv_path)
    return udm_reference
# Map log fields to UDM fields
def map_log_fields_to_udm(log_fields, udm_fields):
    """Map each log field name to a UDM field name.

    A log field matches a UDM row when its text occurs, case-insensitively,
    as a literal substring of that row's ``'Field Name'`` value; the first
    matching row wins. A field with no match is mapped to
    ``'custom_fields.<field>'``.

    Args:
        log_fields: Iterable of log field names (strings).
        udm_fields: DataFrame that has a ``'Field Name'`` column.

    Returns:
        DataFrame with the columns ``'Log_Field'`` and ``'UDM_Field'``, one
        row per input field in input order. Both columns are present even
        when ``log_fields`` is empty.
    """
    # Loop-invariant column lookup, hoisted out of the loop.
    udm_names = udm_fields['Field Name']
    mapped_fields = []
    for field in log_fields:
        # regex=False: log tokens are plain text, so characters such as
        # "(", "[" or "." must be matched literally instead of being parsed
        # as a regular expression (which could raise or over-match).
        # na=False: missing names count as "no match" so the mask stays
        # purely boolean and is safe to index with.
        hits = udm_names[
            udm_names.str.contains(field, case=False, regex=False, na=False)
        ]
        udm_field = hits.iloc[0] if not hits.empty else 'custom_fields.' + field
        mapped_fields.append({'Log_Field': field, 'UDM_Field': udm_field})
    # Explicit columns keep the result schema stable for empty input.
    return pd.DataFrame(mapped_fields, columns=['Log_Field', 'UDM_Field'])
# System-role prompt template. main() passes this text unchanged as both the
# `template` and the `formatted` value of the "system" PromptMessage; it
# contains no format placeholders.
system_template = """You are a cybersecurity expert specialized in log analysis and data normalization,
helping security teams map security log fields to Google Chronicle's Unified Data Model (UDM).
Please follow these steps:
1. Map each product log field to its corresponding UDM field using the reference UDM CSV provided.
2. For fields that don't have a direct match in UDM, place them into custom fields.
3. Ensure each mapped field, including custom fields, is unique and accurate.
4. Organize the mapping into a structured table format.
"""
# User-role prompt template with two worked mapping examples (Fortinet and
# Palo Alto). main() fills the single `{input}` placeholder with the raw
# message text via str.format, so any literal brace added to this text must
# be doubled ("{{" / "}}").
user_template = """Here is a sample log:
{input}
Please follow these steps:
1. Use the provided UDM CSV to map the log fields.
2. For fields that don't have a direct match, assign them to custom fields.
3. Organize the mapping into a structured table.
### Example 1: Fortinet Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
devname | intermediary.hostname
devid | intermediary.asset.hardware.serial_number
srcip | principal.ip
dstip | target.ip
dstport | target.port
### Example 2: Palo Alto Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
src_ip | principal.ip
dest_ip | target.ip
dest_port | target.port
action | security_result.action_details
severity | security_result.severity_details
Now proceed to map the given sample log:
"""
@cl.on_chat_start  # Registers the coroutine to be executed at the start of a user session
async def start_chat():
    """Store the default chat-completion settings in the user session.

    The settings are saved under the "settings" key, which the message
    handler reads back with ``cl.user_session.get``.
    """
    cl.user_session.set(
        "settings",
        dict(
            model="gpt-3.5-turbo",
            temperature=0,
            max_tokens=500,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
        ),
    )
@cl.on_message  # Marks a function that should be run each time the chatbot receives a message from a user
async def main(message: cl.Message):
    """Map the whitespace-separated tokens of a user message to UDM fields.

    Steps:
      1. Split the message text on whitespace; every token is treated as a
         log field name (a stand-in for real log parsing).
      2. Load the UDM reference CSV and map each token with
         ``map_log_fields_to_udm``.
      3. Send the mapping back to the user as a plain-text table.
      4. Write the mapping to ``mapped_log_fields.csv``.

    A message with no tokens (empty or whitespace only) is answered with a
    short hint; nothing is mapped or written in that case.

    Args:
        message: Incoming Chainlit message; only ``message.content`` is read.
    """
    settings = cl.user_session.get("settings")

    # Simulate log fields from the user's input (in a real use case the input
    # log would be parsed properly).
    log_fields = message.content.split()
    if not log_fields:
        # Without this guard an empty message produced pandas' "Empty
        # DataFrame" dump in the chat and overwrote the saved CSV with an
        # empty mapping.
        await cl.Message(content="Please provide at least one log field to map.").send()
        return

    # Load the UDM fields reference CSV
    udm_fields_csv = "udm_field_list_v2.csv"  # Replace with your actual CSV path
    udm_fields = load_udm_fields(udm_fields_csv)

    # Perform the mapping and render it as a table for the reply
    mapped_fields_df = map_log_fields_to_udm(log_fields, udm_fields)
    mapped_fields_table = mapped_fields_df.to_string(index=False)

    # NOTE(review): this prompt is built but never sent to a model or attached
    # to the outgoing message; kept as-is pending a decision on LLM usage.
    prompt = Prompt(
        provider=ChatOpenAI.id,
        messages=[
            PromptMessage(role="system", template=system_template, formatted=system_template),
            PromptMessage(role="user", template=user_template, formatted=user_template.format(input=message.content)),
        ],
        inputs={"input": message.content},
        settings=settings,
    )

    msg = cl.Message(content=f"Here is the mapped log fields to UDM:\n\n{mapped_fields_table}")
    await msg.send()

    # Save the mapping to CSV for further analysis
    mapped_fields_df.to_csv('mapped_log_fields.csv', index=False)