LLMopsDK / app_transformation.py
Galatea007's picture
Update app_transformation.py
f5abec5
raw
history blame
3.51 kB
import chainlit as cl
import pandas as pd
from chainlit.playground.providers import ChatOpenAI
from chainlit.prompt import Prompt, PromptMessage
from dotenv import load_dotenv
from openai import AsyncOpenAI
# Load environment variables from a local .env file, if one is present.
# NOTE(review): presumably this is where the OpenAI API key comes from, since
# the client in main() is constructed without explicit credentials - confirm.
load_dotenv()
# ChatOpenAI Templates
# System prompt. It contains no placeholders and is sent unchanged as the
# "system" message of every request built in main().
# NOTE(review): steps 1 and 3 refer to web search and to a UDM fields CSV, but
# the request built in main() sends only these two messages plus the sampling
# settings - no tool or CSV content is attached. Confirm whether that is
# intended.
system_template = """You are a cybersecurity expert specialized in log analysis and data normalization,
helping security teams to map security log fields to Google Chronicle's Unified Data Model (UDM).
Please follow these steps:
1. Use web search to consult the latest log documentation for the product provided and Google Chronicle UDM schema documentation.
2. Map each product log field to its corresponding UDM field.
3. Carefully consult the UDM fields csv file available to you as part of this code. Make sure you attempt to map to already defined fields in the UDM data model.
4. For fields that don't have a direct match in UDM, place them into custom fields.
5. Ensure each mapped field, including custom fields, is unique and accurate.
6. Organize the mapping into a structured table format.
You always provide detailed, accurate, and structured responses in a professional tone, focusing on precision.
"""
# User prompt. The single "{input}" placeholder is filled with the text of the
# incoming chat message via str.format() in main(); the template must not gain
# other literal braces unless they are escaped as "{{" / "}}".
user_template = """Here is a sample log:
{input}
Please follow these steps:
1. Use web search to consult the latest documentation for this product/log source and Google Chronicle UDM schema documentation.
2. Map each log field to its corresponding UDM field.
3. For fields that don't have a direct match in UDM, place them into custom fields.
4. Ensure each mapped field, including custom fields, is unique and accurate.
5. Organize the mapping into a structured table format.
"""
# Function to read UDM fields from the existing CSV file
def read_udm_fields(csv_file_path):
    """Load the UDM field definitions from a CSV file.

    Args:
        csv_file_path: Path (or any other source accepted by
            ``pandas.read_csv``) of the CSV holding the UDM fields.

    Returns:
        A ``pandas.DataFrame`` with the parsed CSV contents, using the
        default ``read_csv`` options (first row is the header).
    """
    return pd.read_csv(csv_file_path)
@cl.on_chat_start  # Marks function to be executed at the start of a user session
async def start_chat():
    """Store the chat-completion settings for this user session.

    The dictionary is saved under the ``"settings"`` key of the Chainlit
    user session; ``main`` reads it back and forwards it as keyword
    arguments to the completion request.
    """
    cl.user_session.set(
        "settings",
        {
            "model": "gpt-3.5-turbo",
            "temperature": 0,
            "max_tokens": 500,
            "top_p": 1,
            "frequency_penalty": 0,
            "presence_penalty": 0,
        },
    )
cl.user_session.set("settings", settings)
@cl.on_message  # Marks function to run each time chatbot receives a message from a user
async def main(message: cl.Message):
    """Answer one user message with a streamed log-to-UDM field mapping.

    The text of ``message`` is treated as a sample log. It is inserted into
    ``user_template``, sent together with ``system_template`` to the chat
    completions endpoint using the settings stored in the user session, and
    the reply is streamed token by token into a new Chainlit message.

    Args:
        message: The incoming Chainlit message; only ``message.content`` is
            read.

    Raises:
        Any exception raised by ``read_udm_fields`` (for example when
        ``udm_fields.csv`` is missing) or by the OpenAI client propagates
        to the caller unchanged.
    """
    settings = cl.user_session.get("settings")

    client = AsyncOpenAI()

    print(message.content)

    # Read UDM fields from the existing CSV file
    csv_file_path = 'udm_fields.csv'  # Ensure this file exists in the environment
    # NOTE(review): the loaded fields are never passed to the model, although
    # the system prompt tells it to consult them. The read is kept so a missing
    # file still fails loudly; decide whether to inject the fields into the
    # prompt (mind the context window) or drop this step.
    udm_fields_df = read_udm_fields(csv_file_path)

    # Process and map log fields to UDM fields based on the CSV
    prompt = Prompt(
        provider=ChatOpenAI.id,
        messages=[
            PromptMessage(
                role="system",
                template=system_template,
                formatted=system_template,
            ),
            PromptMessage(
                role="user",
                template=user_template,
                formatted=user_template.format(input=message.content),
            ),
        ],
        inputs={"input": message.content},
        settings=settings,
    )

    # Serialize once; the same payload is logged and sent to the API.
    openai_messages = [m.to_openai() for m in prompt.messages]
    print(openai_messages)

    msg = cl.Message(content="")

    stream = await client.chat.completions.create(
        messages=openai_messages, stream=True, **settings
    )
    async for stream_resp in stream:
        # Some stream chunks carry no choices at all; indexing [0] on them
        # would raise IndexError and abort the reply mid-stream.
        if not stream_resp.choices:
            continue
        # delta.content is None on role-only / final chunks.
        token = stream_resp.choices[0].delta.content or ""
        await msg.stream_token(token)

    # Update prompt object with the completion
    prompt.completion = msg.content
    msg.prompt = prompt

    # Send and close the message stream
    await msg.send()