File size: 22,206 Bytes
b632a36
bbf6f5b
27090a6
421602f
9a9379e
 
6d1368c
 
a6d25e6
b632a36
 
28ad39d
2527227
9a46a7b
efb7248
 
 
 
 
551d569
 
 
6d1368c
 
949a8a8
ad8118d
6d1368c
 
 
 
 
 
 
 
 
 
 
dae6371
414ad6b
6d1368c
551d569
8081474
551d569
27090a6
153b40f
 
771b1f8
f13beaa
 
153b40f
af2000a
6d1368c
 
 
771b1f8
6d1368c
 
 
27090a6
8081474
b632a36
496ca18
3e7e517
8081474
3e7e517
 
8081474
9a9379e
6d1368c
 
9a9379e
a581c9d
6d1368c
 
2527227
 
 
efb7248
 
 
6d1368c
 
 
 
 
 
551d569
6d1368c
 
949a8a8
4725242
b632a36
 
 
3e7e517
b632a36
 
 
 
3e7e517
b632a36
 
53feba3
3e7e517
 
 
 
 
 
53feba3
b632a36
3e7e517
b632a36
 
53feba3
3e7e517
 
 
 
 
 
53feba3
 
0365be0
3e7e517
b632a36
 
 
 
 
 
 
6d1368c
4725242
949a8a8
6d1368c
b632a36
6d1368c
 
 
b632a36
6d1368c
28ad39d
 
6d1368c
b632a36
4725242
b632a36
 
 
3e7e517
b632a36
 
 
771b1f8
53feba3
b632a36
3e7e517
771b1f8
 
 
bbf6f5b
 
 
 
 
 
53feba3
771b1f8
 
 
 
bbf6f5b
771b1f8
bbf6f5b
771b1f8
3e7e517
b632a36
 
 
 
 
3e7e517
b632a36
 
 
4725242
551d569
 
53feba3
3e7e517
551d569
 
28ad39d
 
551d569
 
4725242
 
b632a36
4725242
551d569
28ad39d
 
551d569
949a8a8
53feba3
3e7e517
 
 
28ad39d
551d569
 
 
 
 
 
6d1368c
2527227
 
 
 
 
 
 
771b1f8
 
2527227
 
 
 
 
771b1f8
2527227
 
771b1f8
 
2527227
771b1f8
2527227
771b1f8
2527227
 
 
 
 
8081474
4725242
8081474
3e7e517
551d569
3e7e517
4725242
 
3e7e517
4725242
551d569
4725242
 
 
 
 
 
 
2527227
 
 
 
 
 
 
 
 
 
 
 
 
 
6d1368c
2527227
551d569
53feba3
2527227
 
 
 
8081474
2527227
 
ac65d39
771b1f8
 
 
551d569
 
771b1f8
 
bbf6f5b
 
 
 
 
 
53feba3
771b1f8
 
bbf6f5b
 
771b1f8
551d569
771b1f8
 
421602f
fa3df7f
153b40f
 
53feba3
771b1f8
 
 
 
 
 
9a46a7b
f13beaa
771b1f8
 
 
 
 
414ad6b
fa3df7f
 
 
 
f13beaa
153b40f
9a46a7b
fa3df7f
f13beaa
9a46a7b
771b1f8
153b40f
9a46a7b
153b40f
771b1f8
fa3df7f
771b1f8
f13beaa
153b40f
fa3df7f
 
 
 
 
 
 
153b40f
 
9a46a7b
fa3df7f
9a46a7b
fa3df7f
9a46a7b
efb7248
 
 
8081474
 
3e7e517
b632a36
4725242
b632a36
 
efb7248
3e7e517
 
 
 
 
 
 
 
 
 
53feba3
3e7e517
 
 
 
 
 
 
 
 
 
 
0365be0
3e7e517
 
 
53feba3
3e7e517
 
 
53feba3
3e7e517
 
9426abb
3e7e517
 
414ad6b
3e7e517
 
 
771b1f8
 
3e7e517
fa3df7f
 
414ad6b
771b1f8
 
 
 
 
efb7248
fa3df7f
771b1f8
 
 
efb7248
 
 
 
 
 
 
 
771b1f8
 
3e7e517
 
 
 
efb7248
3e7e517
0365be0
3e7e517
efb7248
3e7e517
4725242
 
efb7248
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0365be0
efb7248
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8081474
 
 
3e7e517
 
 
 
 
8081474
 
efb7248
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
import gradio as gr
from sentence_transformers import SentenceTransformer
import os
import time
import threading
import queue
import psycopg2
import zlib
import numpy as np
from urllib.parse import urlparse
import logging
from sklearn.preprocessing import normalize
from concurrent.futures import ThreadPoolExecutor
import requests
from fastapi import FastAPI, HTTPException, Query
from typing import List
import uvicorn
from starlette.requests import Request
from starlette.responses import HTMLResponse

# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Настройки базы данных PostgreSQL
DATABASE_URL = os.environ.get("DATABASE_URL")
if DATABASE_URL is None:
    raise ValueError("DATABASE_URL environment variable not set.")

parsed_url = urlparse(DATABASE_URL)
db_params = {
    "host": parsed_url.hostname,
    "port": parsed_url.port,
    "database": parsed_url.path.lstrip("/"),
    "user": parsed_url.username,
    "password": parsed_url.password,
    "sslmode": "require"
}

# Загружаем модель эмбеддингов
model_name = "BAAI/bge-m3"
logging.info(f"Загрузка модели {model_name}...")
model = SentenceTransformer(model_name)
logging.info("Модель загружена успешно.")

# Jina AI Reranker API
JINA_API_URL = 'https://api.jina.ai/v1/rerank'
JINA_API_KEY = os.environ.get("JINA_API_KEY")
if JINA_API_KEY is None:
    raise ValueError("JINA_API_KEY environment variable not set.")
JINA_RERANKER_MODEL = "jina-reranker-v2-base-multilingual"

# Имена таблиц
embeddings_table = "movie_embeddings"
query_cache_table = "query_cache"
movies_table = "Movies"  # Имя таблицы с фильмами

# Максимальный размер таблицы кэша запросов в байтах (50MB)
MAX_CACHE_SIZE = 50 * 1024 * 1024

# Очередь для необработанных фильмов
movies_queue = queue.Queue()

# Флаг, указывающий, что обработка фильмов завершена
processing_complete = False

# Флаг, указывающий, что выполняется поиск
search_in_progress = False

# Блокировка для доступа к базе данных
db_lock = threading.Lock()

# Размер пакета для обработки эмбеддингов
batch_size = 32

# Количество потоков для параллельной обработки
num_threads = 5

# FastAPI приложение
app = FastAPI()

def get_db_connection():
    """Устанавливает соединение с базой данных."""
    try:
        conn = psycopg2.connect(**db_params)
        return conn
    except Exception as e:
        logging.error(f"Ошибка подключения к базе данных: {e}")
        return None

def setup_database():
    """Настраивает базу данных: создает расширение, таблицы и индексы."""
    conn = get_db_connection()
    if conn is None:
        return

    try:
        with conn.cursor() as cur:
            # Создаем расширение pgvector если его нет
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")

            # Создаем таблицу для хранения эмбеддингов фильмов
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS "{embeddings_table}" (
                    movie_id INTEGER PRIMARY KEY,
                    embedding_crc32 BIGINT,
                    string_crc32 BIGINT,
                    model_name TEXT,
                    embedding vector(1024)
                );
                CREATE INDEX IF NOT EXISTS idx_string_crc32 ON "{embeddings_table}" (string_crc32);
            """)

            # Создаем таблицу для кэширования запросов
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS "{query_cache_table}" (
                    query_crc32 BIGINT PRIMARY KEY,
                    query TEXT,
                    model_name TEXT,
                    embedding vector(1024),
                    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
                );
                CREATE INDEX IF NOT EXISTS idx_query_crc32 ON "{query_cache_table}" (query_crc32);
                CREATE INDEX IF NOT EXISTS idx_created_at ON "{query_cache_table}" (created_at);
            """)

        conn.commit()
        logging.info("База данных успешно настроена.")
    except Exception as e:
        logging.error(f"Ошибка при настройке базы данных: {e}")
        conn.rollback()
    finally:
        conn.close()

# Настраиваем базу данных при запуске
setup_database()

def calculate_crc32(text):
    """Вычисляет CRC32 для строки."""
    return zlib.crc32(text.encode('utf-8')) & 0xFFFFFFFF

def encode_string(text):
    """Кодирует строку в эмбеддинг."""
    embedding = model.encode(text, convert_to_tensor=True, normalize_embeddings=True)
    return embedding.cpu().numpy()

def get_movies_without_embeddings():
    """Получает список фильмов, для которых нужно создать эмбеддинги."""
    conn = get_db_connection()
    if conn is None:
        return []

    movies_to_process = []
    try:
        with conn.cursor() as cur:
            # Получаем список ID фильмов, которые уже есть в таблице эмбеддингов
            cur.execute(f"SELECT movie_id FROM \"{embeddings_table}\"")
            existing_ids = {row[0] for row in cur.fetchall()}

            # Получаем список всех фильмов из таблицы Movies с подготовленной строкой
            cur.execute(f"""
                SELECT id, data,
                    jsonb_build_object(
                        'Название', data->>'name',
                        'Год', data->>'year',
                        'Жанры', (SELECT string_agg(genre->>'name', ', ') FROM jsonb_array_elements(data->'genres') AS genre),
                        'Описание', COALESCE(data->>'description', '')
                    ) AS prepared_json
                FROM "{movies_table}"
            """)
            all_movies = cur.fetchall()

            # Фильтруем только те фильмы, которых нет в таблице эмбеддингов
            for movie_id, movie_data, prepared_json in all_movies:
                if movie_id not in existing_ids:
                    prepared_string = f"Название: {prepared_json['Название']}\nГод: {prepared_json['Год']}\nЖанры: {prepared_json['Жанры']}\nОписание: {prepared_json['Описание']}"
                    movies_to_process.append((movie_id, movie_data, prepared_string))

        logging.info(f"Найдено {len(movies_to_process)} фильмов для обработки.")
    except Exception as e:
        logging.error(f"Ошибка при получении списка фильмов для обработки: {e}")
    finally:
        conn.close()

    return movies_to_process

def get_embedding_from_db(conn, table_name, crc32_column, crc32_value, model_name):
    """Получает эмбеддинг из базы данных."""
    try:
        with conn.cursor() as cur:
            cur.execute(f"SELECT embedding FROM \"{table_name}\" WHERE \"{crc32_column}\" = %s AND model_name = %s",
                       (crc32_value, model_name))
            result = cur.fetchone()
            if result and result[0]:
                # Нормализуем эмбеддинг после извлечения из БД
                return normalize(np.array(result[0]).reshape(1, -1))[0]
    except Exception as e:
        logging.error(f"Ошибка при получении эмбеддинга из БД: {e}")
    return None

def insert_embedding(conn, table_name, movie_id, embedding_crc32, string_crc32, embedding):
    """Вставляет эмбеддинг в базу данных."""
    try:
        # Нормализуем эмбеддинг перед сохранением
        normalized_embedding = normalize(embedding.reshape(1, -1))[0]
        with conn.cursor() as cur:
            cur.execute(f"""
                INSERT INTO "{table_name}" 
                (movie_id, embedding_crc32, string_crc32, model_name, embedding)
                VALUES (%s, %s, %s, %s, %s)
                ON CONFLICT (movie_id) DO NOTHING
            """, (movie_id, embedding_crc32, string_crc32, model_name, normalized_embedding.tolist()))
        conn.commit()
        return True
    except Exception as e:
        logging.error(f"Ошибка при вставке эмбеддинга: {e}")
        conn.rollback()
        return False

def process_batch(batch):
    """Обрабатывает пакет фильмов, создавая для них эмбеддинги."""
    conn = get_db_connection()
    if conn is None:
        return

    try:
        for movie_id, movie_data, prepared_string in batch:
            string_crc32 = calculate_crc32(prepared_string)

            # Проверяем существующий эмбеддинг
            existing_embedding = get_embedding_from_db(conn, embeddings_table, "string_crc32", string_crc32, model_name)

            if existing_embedding is None:
                embedding = encode_string(prepared_string)
                embedding_crc32 = calculate_crc32(str(embedding.tolist()))

                if insert_embedding(conn, embeddings_table, movie_id, embedding_crc32, string_crc32, embedding):
                    logging.info(f"Сохранен эмбеддинг для '{movie_data['name']}' (ID: {movie_id})")
                else:
                    logging.error(f"Ошибка сохранения эмбеддинга для '{movie_data['name']}' (ID: {movie_id})")
            else:
                logging.info(f"Эмбеддинг для '{movie_data['name']}' (ID: {movie_id}) уже существует")
    except Exception as e:
        logging.error(f"Ошибка при обработке пакета фильмов: {e}")
    finally:
        conn.close()

def process_movies():
    """Обрабатывает фильмы, создавая для них эмбеддинги."""
    global processing_complete

    logging.info("Начало обработки фильмов.")

    # Получаем список фильмов, которые нужно обработать
    movies_to_process = get_movies_without_embeddings()

    if not movies_to_process:
        logging.info("Все фильмы уже обработаны.")
        processing_complete = True
        return

    # Добавляем фильмы в очередь
    for movie in movies_to_process:
        movies_queue.put(movie)

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        try:
            while not movies_queue.empty():
                if search_in_progress:
                    time.sleep(1)
                    continue

                batch = []
                while not movies_queue.empty() and len(batch) < batch_size:
                    try:
                        movie = movies_queue.get_nowait()
                        batch.append(movie)
                    except queue.Empty:
                        break

                if not batch:
                    break

                executor.submit(process_batch, batch)
                logging.info(f"Отправлен на обработку пакет из {len(batch)} фильмов.")
        except Exception as e:
            logging.error(f"Ошибка при обработке фильмов: {e}")

    processing_complete = True
    logging.info("Обработка фильмов завершена")

def get_movie_data_from_db(conn, movie_ids):
    """Получает данные фильмов из таблицы Movies по списку ID."""
    movie_data_dict = {}
    try:
        with conn.cursor() as cur:
            cur.execute(f"""
                SELECT id, data,
                    jsonb_build_object(
                        'Название', data->>'name',
                        'Год', data->>'year',
                        'Жанры', (SELECT string_agg(genre->>'name', ', ') FROM jsonb_array_elements(data->'genres') AS genre),
                        'Описание', COALESCE(data->>'description', '')
                    ) AS prepared_json
                FROM "{movies_table}"
                WHERE id IN %s
            """, (tuple(movie_ids),))
            for movie_id, movie_data, prepared_json in cur.fetchall():
                prepared_string = f"Название: {prepared_json['Название']}\nГод: {prepared_json['Год']}\nЖанры: {prepared_json['Жанры']}\nОписание: {prepared_json['Описание']}"
                movie_data_dict[movie_id] = (movie_data, prepared_string)
    except Exception as e:
        logging.error(f"Ошибка при получении данных фильмов из БД: {e}")
    return movie_data_dict

def rerank_with_api(query, results, top_k):
    """Переранжирует результаты с помощью Jina AI Reranker API."""
    logging.info(f"Начало переранжирования для запроса: '{query}'")

    # Получаем данные фильмов из БД
    conn = get_db_connection()
    movie_ids = [movie_id for movie_id, _ in results]
    movie_data_dict = get_movie_data_from_db(conn, movie_ids)
    conn.close()

    documents = []
    for movie_id, _ in results:
        movie_data, prepared_string = movie_data_dict.get(movie_id, (None, None))
        if movie_data:
            documents.append(prepared_string)
        else:
            logging.warning(f"Данные для фильма с ID {movie_id} не найдены в БД.")

    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {JINA_API_KEY}'
    }
    data = {
        "model": JINA_RERANKER_MODEL,
        "query": query,
        "top_n": top_k,
        "documents": documents
    }
    logging.info(f"Отправка данных на реранжировку (documents count): {len(data['documents'])}")

    try:
        response = requests.post(JINA_API_URL, headers=headers, json=data)
        response.raise_for_status()
        result = response.json()
        logging.info(f"Ответ от API реранжировщика получен.")

        reranked_results = []
        if 'results' in result:
            for item in result['results']:
                index = item['index']
                movie_id = results[index][0]
                reranked_results.append((movie_id, item['relevance_score']))
        else:
            logging.warning("Ответ от API не содержит ключа 'results'.")

        logging.info("Переранжирование завершено.")
        return reranked_results

    except requests.exceptions.RequestException as e:
        logging.error(f"Ошибка при запросе к API реранжировщика: {e}")
        return []
    
def search_movies_internal(query: str, top_k: int = 25):
    """Внутренняя функция для поиска фильмов по запросу (используется и в Gradio, и в API)."""
    global search_in_progress
    search_in_progress = True
    start_time = time.time()

    try:
        conn = get_db_connection()
        if conn is None:
            raise Exception("Ошибка подключения к базе данных")

        query_crc32 = calculate_crc32(query)
        query_embedding = get_embedding_from_db(conn, query_cache_table, "query_crc32", query_crc32, model_name)

        if query_embedding is None:
            query_embedding = encode_string(query)

            try:
                with conn.cursor() as cur:
                    cur.execute(f"""
                        INSERT INTO "{query_cache_table}" (query_crc32, query, model_name, embedding)
                        VALUES (%s, %s, %s, %s)
                        ON CONFLICT (query_crc32) DO NOTHING
                    """, (query_crc32, query, model_name, query_embedding.tolist()))
                conn.commit()
                logging.info(f"Сохранен новый эмбеддинг запроса: {query}")
            except Exception as e:
                logging.error(f"Ошибка при сохранении эмбеддинга запроса: {e}")
                conn.rollback()

        # Используем косинусное расстояние для поиска
        try:
            with conn.cursor() as cur:
                cur.execute(f"""
                    WITH query_embedding AS (
                        SELECT embedding
                        FROM "{query_cache_table}"
                        WHERE query_crc32 = %s
                    )
                    SELECT m.movie_id, 1 - (m.embedding <=> (SELECT embedding FROM query_embedding)) as similarity
                    FROM "{embeddings_table}" m, query_embedding
                    ORDER BY similarity DESC
                    LIMIT %s
                """, (query_crc32, int(top_k * 2)))

                results = cur.fetchall()
            logging.info(f"Найдено {len(results)} предварительных результатов поиска.")
        except Exception as e:
            logging.error(f"Ошибка при выполнении поискового запроса: {e}")
            results = []
        finally:
            conn.close()

        # Переранжируем результаты с помощью API
        reranked_results = rerank_with_api(query, results, top_k)

        conn = get_db_connection()
        movie_ids = [movie_id for movie_id, _ in reranked_results]
        movie_data_dict = get_movie_data_from_db(conn, movie_ids)
        conn.close()

        formatted_results = []
        for movie_id, score in reranked_results:
            # Находим данные фильма
            movie_data, _ = movie_data_dict.get(movie_id, (None, None))
            if movie_data:
                formatted_results.append({
                    "movie_id": movie_id,
                    "name": movie_data['name'],
                    "year": movie_data['year'],
                    "genres": [genre['name'] for genre in movie_data['genres']],
                    "description": movie_data.get('description', ''),
                    "relevance_score": score
                })
            else:
                logging.warning(f"Данные для фильма с ID {movie_id} не найдены в БД.")

        search_time = time.time() - start_time
        logging.info(f"Поиск выполнен за {search_time:.2f} секунд.")

        return formatted_results, search_time

    except Exception as e:
        logging.error(f"Ошибка при выполнении поиска: {e}")
        raise

    finally:
        search_in_progress = False
        
def search_movies(query, top_k=25):
    """Функция поиска фильмов для Gradio интерфейса."""
    try:
        results, search_time = search_movies_internal(query, top_k)
        output = f"<p>Время поиска: {search_time:.2f} сек</p>"
        for result in results:
            output += f"<h3>{result['name']} ({result['year']})</h3>\n"
            output += f"<p><strong>Жанры:</strong> {', '.join(result['genres'])}</p>\n"
            output += f"<p><strong>Описание:</strong> {result['description']}</p>\n"
            output += f"<p><strong>Релевантность (reranker score):</strong> {result['relevance_score']:.4f}</p>\n"
            output += "<hr>\n"
        return output
    except Exception as e:
        return f"<p>Произошла ошибка при выполнении поиска: {e}</p>"

@app.get("/search/", response_model=List[dict])
async def api_search_movies(query: str = Query(..., description="Поисковый запрос"), top_k: int = Query(25, description="Количество возвращаемых результатов")):
    """API endpoint для поиска фильмов."""
    try:
        results, _ = search_movies_internal(query, top_k)
        return results
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Запускаем обработку фильмов в отдельном потоке (если ещё не запущена)
if not 'processing_thread' in globals():
    processing_thread = threading.Thread(target=process_movies)
    processing_thread.start()
elif not processing_thread.is_alive():
    processing_thread = threading.Thread(target=process_movies)
    processing_thread.start()

# Создаем интерфейс Gradio
iface = gr.Interface(
    fn=search_movies,
    inputs=gr.Textbox(lines=2, placeholder="Введите запрос для поиска фильмов..."),
    outputs=gr.HTML(label="Результаты поиска"),
    title="Семантический поиск фильмов",
    description="Введите описание фильма, который вы ищете, и система найдет наиболее похожие фильмы."
)

# Встраиваем Gradio в FastAPI
app = gr.mount_gradio_app(app, iface, path="/")

# Рут-эндпоинт для демонстрации, что FastAPI работает
@app.get("/api")
async def root():
    return {"message": "FastAPI is running. Access the API documentation at /docs"}

# Запускаем FastAPI
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)