# NOTE: scraped page header, not part of the program: "Spaces: Paused"
# Based on the Chainlit Python streaming example:
# https://docs.chainlit.io/concepts/streaming/python
# OpenAI chat completion
import os

import chainlit as cl  # importing chainlit for our app
import pandas as pd
from chainlit.playground.providers import ChatOpenAI  # importing ChatOpenAI tools
from chainlit.prompt import Prompt, PromptMessage  # importing prompt tools
from dotenv import load_dotenv
from openai import AsyncOpenAI  # importing openai for API usage

# Load environment variables (once; the original imported everything and
# called load_dotenv() twice).
load_dotenv()
# Load UDM fields CSV file
def load_udm_fields(csv_path):
    """Read the UDM field reference CSV into a DataFrame.

    Args:
        csv_path: Location of the CSV file; passed straight through to
            ``pandas.read_csv``.

    Returns:
        pandas.DataFrame: The parsed contents of the CSV.
    """
    return pd.read_csv(csv_path)
# Map log fields to UDM fields
def map_log_fields_to_udm(log_fields, udm_fields):
    """Map each log field name to a UDM field name.

    A log field is matched against the ``'Field Name'`` column of
    ``udm_fields`` with a case-insensitive *literal* substring test; the
    first matching row wins. Fields with no match are placed under
    ``custom_fields.<field>``.

    Args:
        log_fields: Iterable of log field names (strings).
        udm_fields: DataFrame with a ``'Field Name'`` column.

    Returns:
        pandas.DataFrame: One row per entry of ``log_fields`` with the
        columns ``Log_Field`` and ``UDM_Field``. The columns are present
        even when ``log_fields`` is empty.
    """
    udm_names = udm_fields['Field Name']
    mapped_fields = []
    for field in log_fields:
        # regex=False: log tokens are arbitrary text, so characters such as
        # "(" or "." must be matched literally instead of being compiled as
        # a regular expression. na=False: missing names in the reference
        # column count as "no match" rather than poisoning the boolean mask.
        is_match = udm_names.str.contains(field, case=False, regex=False, na=False)
        matches = udm_names[is_match]
        if not matches.empty:
            udm_field = matches.iloc[0]
        else:
            # If no direct match, add to custom fields
            udm_field = 'custom_fields.' + field
        mapped_fields.append({'Log_Field': field, 'UDM_Field': udm_field})
    return pd.DataFrame(mapped_fields, columns=['Log_Field', 'UDM_Field'])
# Chainlit OpenAI Templates for multi-shot learning
# System prompt: sets the assistant's role and the mapping procedure.
system_template = """You are a cybersecurity expert specialized in log analysis and data normalization,
helping security teams map security log fields to Google Chronicle's Unified Data Model (UDM).
Please follow these steps:
1. Map each product log field to its corresponding UDM field using the reference UDM CSV provided.
2. For fields that don't have a direct match in UDM, place them into custom fields.
3. Ensure each mapped field, including custom fields, is unique and accurate.
4. Organize the mapping into a structured table format.
"""
# Multi-shot learning examples for Fortinet and Palo Alto
# User prompt: "{input}" is the only placeholder and is filled with the raw
# user message via str.format.
user_template = """Here is a sample log:
{input}
Please follow these steps:
1. Use the provided UDM CSV to map the log fields.
2. For fields that don't have a direct match, assign them to custom fields.
3. Organize the mapping into a structured table.
### Example 1: Fortinet Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
devname | intermediary.hostname
devid | intermediary.asset.hardware.serial_number
srcip | principal.ip
dstip | target.ip
dstport | target.port
### Example 2: Palo Alto Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
src_ip | principal.ip
dest_ip | target.ip
dest_port | target.port
action | security_result.action_details
severity | security_result.severity_details
Now proceed to map the given sample log:
"""
# Marks a function that will be executed at the start of a user session
@cl.on_chat_start  # without this hook Chainlit never calls start_chat()
async def start_chat():
    """Store the default model settings in the user session.

    The settings dict is saved under the ``"settings"`` key of
    ``cl.user_session`` so that the message handler can read it back.
    """
    settings = {
        "model": "gpt-3.5-turbo",
        "temperature": 0,
        "max_tokens": 500,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
    }
    cl.user_session.set("settings", settings)
# Marks a function that should be run each time the chatbot receives a message from a user
@cl.on_message  # without this hook Chainlit never routes messages to main()
async def main(message: cl.Message):
    """Map the whitespace-separated tokens of a user message to UDM fields.

    Steps performed for every incoming message:
      1. Load the UDM reference CSV.
      2. Split the message text on whitespace and treat each token as a
         log field name.
      3. Map the tokens to UDM fields with ``map_log_fields_to_udm``.
      4. Send the mapping back as a plain-text table.
      5. Write the mapping to ``mapped_log_fields.csv``.

    Args:
        message: The incoming Chainlit message; only ``message.content``
            is read.
    """
    settings = cl.user_session.get("settings")

    # Load the UDM fields reference CSV
    udm_fields_csv = "udm_field_list_v2.csv"  # Replace with your actual CSV path
    udm_fields = load_udm_fields(udm_fields_csv)

    # Simulate log fields from the user's input (in real use case, you'd parse the input log)
    log_fields = message.content.split()  # Example: Splitting input log into fields for simplicity

    # Perform the mapping
    mapped_fields_df = map_log_fields_to_udm(log_fields, udm_fields)

    # Create a response showing the mapping
    mapped_fields_table = mapped_fields_df.to_string(index=False)

    # NOTE(review): this prompt is constructed but never sent to a model or
    # attached to the reply below; the response comes solely from the local
    # CSV-based mapping. Confirm whether an OpenAI call was intended here.
    prompt = Prompt(
        provider=ChatOpenAI.id,
        messages=[
            PromptMessage(role="system", template=system_template, formatted=system_template),
            PromptMessage(role="user", template=user_template, formatted=user_template.format(input=message.content)),
        ],
        inputs={"input": message.content},
        settings=settings,
    )

    msg = cl.Message(content=f"Here is the mapped log fields to UDM:\n\n{mapped_fields_table}")
    await msg.send()

    # Save the mapping to CSV for further analysis
    mapped_fields_df.to_csv('mapped_log_fields.csv', index=False)