import os
import logging
from rdflib import Graph
from pydantic import BaseModel
from fastapi import FastAPI, HTTPException
from huggingface_hub import InferenceClient
# Configurazione logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Configurazione API Hugging Face
API_KEY = os.getenv("HF_API_KEY")
client = InferenceClient(api_key=API_KEY)
# File RDF
RDF_FILE = "Ontologia.rdf"
####################################
# Caricamento RDF (riassunto)
####################################
def load_rdf_summary():
"""
Carica un riassunto dell'ontologia dal file RDF (se necessario).
Qui puoi usare parse e scansionare classi e proprietà reali.
"""
if not os.path.exists(RDF_FILE):
return "Nessun file RDF trovato."
try:
g = Graph()
g.parse(RDF_FILE, format="xml")
# Esempio di estrazione semplificata di classi e proprietà
classes = set()
properties = set()
for s, p, o in g.triples((None, None, None)):
if "Class" in str(o):
classes.add(s)
if "Property" in str(o):
properties.add(s)
class_summary = "\n".join([f"- Classe: {cls}" for cls in classes])
prop_summary = "\n".join([f"- Proprietà: {prop}" for prop in properties])
return f"Classi:\n{class_summary}\n\nProprietà:\n{prop_summary}"
except Exception as e:
logger.error(f"Errore durante il parsing del file RDF: {e}")
return "Errore nel caricamento del file RDF."
rdf_context = load_rdf_summary()
logger.info("RDF Summary: %s", rdf_context)
####################################
# Validazione SPARQL
####################################
def validate_sparql_query(query: str, rdf_file_path: str) -> bool:
"""
Esegue il parsing e l'esecuzione di test della query su RDF,
per verificare che sia sintatticamente e semanticamente corretta.
"""
g = Graph()
try:
g.parse(rdf_file_path, format="xml")
g.query(query) # Se c'è errore di sintassi o referenza, solleva eccezione
return True
except Exception as e:
logger.error(f"Errore durante la validazione della query SPARQL: {e}")
return False
####################################
# Prompt di Sistema molto stringente
####################################
def create_system_message(rdf_context: str) -> str:
"""
Prompt di sistema estremo:
- impone l'uso di un SOLO prefisso
- vieta righe multiple
- vieta di inventare prefissi
- obbliga a iniziare con `PREFIX base: ... SELECT` o `ASK`
"""
return f"""
Sei un assistente esperto nella generazione di query SPARQL basate su un'ontologia RDF.
Ecco un riassunto dell'ontologia su cui devi lavorare:
{rdf_context}
DI SEGUITO LE REGOLE TASSATIVE:
1. DEVI usare ESCLUSIVAMENTE questo prefisso di base (e NON modificarlo in nessun modo):
PREFIX base:
2. La query deve stare in UNA SOLA RIGA, senza andare a capo.
3. La query deve INIZIARE con:
PREFIX base: SELECT
oppure
PREFIX base: ASK
4. Se devi indicare una classe, usa: ?qualcosa a base:NomeClasse .
5. Se devi indicare una proprietà, usa: ?s base:NomeProprieta ?o .
6. NON generare alcun altro prefisso.
7. NON utilizzare URI lunghe senza < > e NON inventare prefissi o risorse inesistenti.
8. Se non puoi rispondere con una query SPARQL valida secondo questi criteri, scrivi:
"Non posso generare una query SPARQL per questa richiesta."
Esempio di query corretta (fittizia) in una sola riga:
PREFIX base: SELECT ?stanza WHERE {{ ?stanza a base:Stanza . }} LIMIT 10
RISPONDI ESCLUSIVAMENTE CON LA QUERY O IL MESSAGGIO DI IMPOSSIBILITA'.
"""
####################################
# Prompt di "correzione"
####################################
def create_correction_message(rdf_context: str, errore: str) -> str:
"""
Questo prompt serve per la seconda iterazione se la query non è valida.
Invita a correggere la query e a rispettare le regole.
"""
return f"""
La query che hai fornito è risultata NON valida per il seguente motivo:
{errore}
RICORDA LE REGOLE TASSATIVE, in particolare l'uso ESATTO del prefisso:
PREFIX base:
Riscrivi la query in UNA SOLA RIGA, rispettando la sintassi SPARQL e usando solo classi e proprietà presenti nell'ontologia.
Se non riesci, dì: "Non posso generare una query SPARQL per questa richiesta."
"""
####################################
# Funzione per chiamare il modello
####################################
async def call_model(messages, temperature=0.7, max_tokens=2048):
try:
response = client.chat.completions.create(
model="Qwen/Qwen2.5-72B-Instruct",
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
top_p=0.7,
stream=False
)
raw_text = response["choices"][0]["message"]["content"]
# Rimuoviamo eventuali newline per forzare la singola riga
return raw_text.replace("\n", " ").strip()
except Exception as e:
logger.error(f"Errore nel modello: {e}")
raise HTTPException(status_code=500, detail=str(e))
####################################
# FastAPI
####################################
app = FastAPI()
class QueryRequest(BaseModel):
message: str
max_tokens: int = 2048
temperature: float = 0.7
@app.post("/generate-query/")
async def generate_query(request: QueryRequest):
# 1) Prima iterazione
system_msg = create_system_message(rdf_context)
user_msg = request.message
messages_first = [
{"role": "system", "content": system_msg},
{"role": "user", "content": user_msg}
]
response1 = await call_model(messages_first, request.temperature, request.max_tokens)
logger.info(f"[Prima iterazione] Risposta generata dal modello: {response1}")
# Controllo se comincia con il prefisso esatto
mandated_prefix = "PREFIX base: "
if not (response1.startswith(mandated_prefix + " SELECT") or response1.startswith(mandated_prefix + " ASK")):
return {
"query": None,
"explanation": "Il modello non ha usato il prefisso obbligatorio o non ha usato SELECT/ASK."
}
# Verifichiamo se la query è valida
if validate_sparql_query(response1, RDF_FILE):
return {"query": response1, "explanation": "Query valida alla prima iterazione."}
else:
# 2) Seconda iterazione (correzione)
correction_msg = create_correction_message(rdf_context, "Query non valida alla prima iterazione.")
# Comunichiamo al modello la query precedente come contesto e chiediamo la correzione
messages_second = [
{"role": "system", "content": system_msg}, # Prompt di sistema invariato
{"role": "assistant", "content": response1}, # La risposta 'errata'
{"role": "system", "content": correction_msg} # Istruzione di correzione
]
response2 = await call_model(messages_second, request.temperature, request.max_tokens)
logger.info(f"[Seconda iterazione] Risposta generata dal modello: {response2}")
# Ricontrollo se comincia con il prefisso esatto
if not (response2.startswith(mandated_prefix + " SELECT") and not response2.startswith(mandated_prefix + " ASK")):
# O se manca la SELECT e l'ASK
# Con un piccolo fix: potresti voler controllare sia SELECT che ASK qui
if not (response2.startswith(mandated_prefix + " SELECT") or response2.startswith(mandated_prefix + " ASK")):
return {
"query": None,
"explanation": "Anche la seconda iterazione non ha usato il prefisso e SELECT/ASK corretti."
}
# Validazione della seconda risposta
if validate_sparql_query(response2, RDF_FILE):
return {"query": response2, "explanation": "Query valida alla seconda iterazione (corretta)."}
else:
return {
"query": None,
"explanation": "Anche la seconda iterazione ha prodotto una query non valida. Interrompo."
}
@app.get("/")
async def root():
return {"message": "Server attivo e pronto a generare query SPARQL!"}