# You can find this code for Chainlit python streaming here
# (https://docs.chainlit.io/concepts/streaming/python)

# OpenAI Chat completion
import os
from typing import Iterable

import chainlit as cl  # importing chainlit for our app
import pandas as pd
from chainlit.playground.providers import ChatOpenAI  # importing ChatOpenAI tools
from chainlit.prompt import Prompt, PromptMessage  # importing prompt tools
from dotenv import load_dotenv
from openai import AsyncOpenAI  # importing openai for API usage

# Load environment variables (e.g. the OpenAI API key) from a local .env file.
load_dotenv()

# Reference CSV listing the UDM fields; replace with your actual CSV path.
UDM_FIELDS_CSV = "udm_field_list_v2.csv"

# Column of the reference CSV that holds the UDM field names.
UDM_FIELD_NAME_COLUMN = "Field Name"

# Where the latest mapping is written for further analysis.
# NOTE(review): this is a single fixed path, so every message (from any
# session) overwrites the previous mapping.
MAPPED_FIELDS_CSV = "mapped_log_fields.csv"


def load_udm_fields(csv_path) -> pd.DataFrame:
    """Load the UDM fields reference CSV.

    Args:
        csv_path: Path (or any source accepted by ``pandas.read_csv``) of the
            UDM reference file.

    Returns:
        The CSV contents as a DataFrame.

    Raises:
        FileNotFoundError: If ``csv_path`` does not exist.
    """
    return pd.read_csv(csv_path)


def map_log_fields_to_udm(
    log_fields: Iterable[str], udm_fields: pd.DataFrame
) -> pd.DataFrame:
    """Map log field names to UDM field names.

    A log field matches a UDM field when it occurs, case-insensitively, as a
    literal substring of that UDM field's name; the first matching row wins.
    Log fields without any match are mapped to ``custom_fields.<field>``.

    Args:
        log_fields: Log field names to map.
        udm_fields: Reference table with a ``'Field Name'`` column.

    Returns:
        A DataFrame with columns ``Log_Field`` and ``UDM_Field``, one row per
        entry of ``log_fields`` (in input order). The columns are present
        even when ``log_fields`` is empty.

    Raises:
        KeyError: If ``udm_fields`` has no ``'Field Name'`` column.
    """
    # Hoisted: the column lookup does not depend on the current log field.
    udm_names = udm_fields[UDM_FIELD_NAME_COLUMN]

    mapped_fields = []
    for field in log_fields:
        # regex=False: log tokens are arbitrary user text, not patterns, so
        # characters such as "(", "[" or "." must be matched literally.
        # na=False: rows with a missing field name never match instead of
        # putting NaN into the boolean mask.
        is_match = udm_names.str.contains(
            field, case=False, regex=False, na=False
        )
        udm_match = udm_names[is_match]
        if not udm_match.empty:
            udm_field = udm_match.iloc[0]
        else:
            # If no direct match, add to custom fields.
            udm_field = 'custom_fields.' + field
        mapped_fields.append({'Log_Field': field, 'UDM_Field': udm_field})

    # Explicit columns keep the result shape stable for empty input.
    return pd.DataFrame(mapped_fields, columns=['Log_Field', 'UDM_Field'])


# Chainlit OpenAI templates for multi-shot learning
system_template = """You are a cybersecurity expert specialized in log analysis and data normalization, helping security teams map security log fields to Google Chronicle's Unified Data Model (UDM).

Please follow these steps:
1. Map each product log field to its corresponding UDM field using the reference UDM CSV provided.
2. For fields that don't have a direct match in UDM, place them into custom fields.
3. Ensure each mapped field, including custom fields, is unique and accurate.
4. Organize the mapping into a structured table format.
"""

# Multi-shot learning examples for Fortinet and Palo Alto
user_template = """Here is a sample log:
{input}

Please follow these steps:
1. Use the provided UDM CSV to map the log fields.
2. For fields that don't have a direct match, assign them to custom fields.
3. Organize the mapping into a structured table.

### Example 1: Fortinet Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
devname | intermediary.hostname
devid | intermediary.asset.hardware.serial_number
srcip | principal.ip
dstip | target.ip
dstport | target.port

### Example 2: Palo Alto Fields to UDM Mapping
Log Attribute | UDM Attribute
--------------|---------------
src_ip | principal.ip
dest_ip | target.ip
dest_port | target.port
action | security_result.action_details
severity | security_result.severity_details

Now proceed to map the given sample log:
"""


@cl.on_chat_start  # Marks a function that will be executed at the start of a user session
async def start_chat():
    """Store the OpenAI completion settings in the user session."""
    settings = {
        "model": "gpt-3.5-turbo",
        "temperature": 0,
        "max_tokens": 500,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0,
    }

    cl.user_session.set("settings", settings)


@cl.on_message  # Marks a function that should be run each time the chatbot receives a message from a user
async def main(message: cl.Message):
    """Map the fields of the incoming message to UDM and reply with the table.

    The message text is split on whitespace, each token is mapped with
    ``map_log_fields_to_udm``, the resulting table is sent back to the user
    and also written to ``mapped_log_fields.csv``.

    Args:
        message: The incoming user message; its ``content`` is the sample log.
    """
    settings = cl.user_session.get("settings")

    # Load the UDM fields reference CSV; tell the user instead of crashing
    # the handler when the reference file is not deployed.
    try:
        udm_fields = load_udm_fields(UDM_FIELDS_CSV)
    except FileNotFoundError:
        await cl.Message(
            content=f"UDM reference file '{UDM_FIELDS_CSV}' was not found."
        ).send()
        return

    # Simulate log fields from the user's input (in a real use case, you'd
    # parse the input log). Example: splitting the input into fields for
    # simplicity.
    log_fields = message.content.split()

    # Perform the mapping
    mapped_fields_df = map_log_fields_to_udm(log_fields, udm_fields)

    # Create a response showing the mapping
    mapped_fields_table = mapped_fields_df.to_string(index=False)

    # NOTE(review): this prompt is built but never sent to the OpenAI API;
    # the reply below comes only from the local CSV lookup. Confirm whether
    # a chat-completion call was intended here.
    prompt = Prompt(
        provider=ChatOpenAI.id,
        messages=[
            PromptMessage(
                role="system",
                template=system_template,
                formatted=system_template,
            ),
            PromptMessage(
                role="user",
                template=user_template,
                formatted=user_template.format(input=message.content),
            ),
        ],
        inputs={"input": message.content},
        settings=settings,
    )

    msg = cl.Message(
        content=f"Here is the mapped log fields to UDM:\n\n{mapped_fields_table}"
    )
    await msg.send()

    # Save the mapping to CSV for further analysis
    mapped_fields_df.to_csv(MAPPED_FIELDS_CSV, index=False)