File size: 14,630 Bytes
1ef9436
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@File    :   streamer_room_db.py
@Time    :   2024/08/31
@Project :   https://github.com/PeterH0323/Streamer-Sales
@Author  :   HinGwenWong
@Version :   1.0
@Desc    :   直播间信息数据库操作
"""


from datetime import datetime
from typing import List

from loguru import logger
from sqlmodel import Session, and_, not_, select

from ...web_configs import API_CONFIG
from ..models.streamer_room_model import ChatMessageInfo, OnAirRoomStatusItem, SalesDocAndVideoInfo, StreamRoomInfo
from .init_db import DB_ENGINE


async def get_db_streaming_room_info(user_id: int, room_id: int | None = None) -> List[StreamRoomInfo] | None:
    """查询数据库中的商品信息

    Args:
        user_id (int): 用户 ID
        streamer_id (int | None, optional): 主播 ID,用户获取特定主播信息. Defaults to None.

    Returns:
        List[StreamRoomInfo] | None: 直播间信息
    """

    # 查询条件
    query_condiction = and_(StreamRoomInfo.user_id == user_id, StreamRoomInfo.delete == False)

    # 获取总数
    with Session(DB_ENGINE) as session:
        if room_id is not None:
            # 查询条件更改为查找特定 ID
            query_condiction = and_(
                StreamRoomInfo.user_id == user_id, StreamRoomInfo.delete == False, StreamRoomInfo.room_id == room_id
            )

        # 查询获取直播间信息
        stream_room_list = session.exec(select(StreamRoomInfo).where(query_condiction).order_by(StreamRoomInfo.room_id)).all()

    if stream_room_list is None:
        logger.warning("nothing to find in db...")
        stream_room_list = []

    # 将路径换成服务器路径
    for stream_room in stream_room_list:
        # 主播信息
        stream_room.streamer_info.avatar = API_CONFIG.REQUEST_FILES_URL + stream_room.streamer_info.avatar
        stream_room.streamer_info.tts_reference_audio = (
            API_CONFIG.REQUEST_FILES_URL + stream_room.streamer_info.tts_reference_audio
        )
        stream_room.streamer_info.poster_image = API_CONFIG.REQUEST_FILES_URL + stream_room.streamer_info.poster_image
        stream_room.streamer_info.base_mp4_path = API_CONFIG.REQUEST_FILES_URL + stream_room.streamer_info.base_mp4_path

        # 商品信息
        for idx, product in enumerate(stream_room.product_list):
            stream_room.product_list[idx].product_info.image_path = API_CONFIG.REQUEST_FILES_URL + product.product_info.image_path
            stream_room.product_list[idx].product_info.instruction = (
                API_CONFIG.REQUEST_FILES_URL + product.product_info.instruction
            )

    logger.info(stream_room_list)
    logger.info(f"len {len(stream_room_list)}")

    return stream_room_list


async def delete_room_id(room_id: int, user_id: int) -> bool:
    """删除特定的主播间 ID

    Args:
        room_id (int): 直播间 ID
        user_id (int): 用户 ID,用于防止其他用户恶意删除

    Returns:
        bool: 是否删除成功
    """

    delete_success = True

    try:
        # 获取总数
        with Session(DB_ENGINE) as session:
            # 查找特定 ID
            room_info = session.exec(
                select(StreamRoomInfo).where(and_(StreamRoomInfo.room_id == room_id, StreamRoomInfo.user_id == user_id))
            ).one()

            if room_info is None:
                logger.error("Delete by other ID !!!")
                return False

            room_info.delete = True  # 设置为删除
            session.add(room_info)
            session.commit()  # 提交
    except Exception:
        delete_success = False

    return delete_success


def create_or_update_db_room_by_id(room_id: int, new_info: StreamRoomInfo, user_id: int):
    """新增 or 编辑直播间信息

    Args:
        room_id (int): 直播间 ID
        new_info (StreamRoomInfo): 新的信息
        user_id (int): 用户 ID,用于防止其他用户恶意修改
    """

    with Session(DB_ENGINE) as session:

        # 更新 status 内容
        if new_info.status_id is not None:
            status_info = session.exec(
                select(OnAirRoomStatusItem).where(OnAirRoomStatusItem.status_id == new_info.status_id)
            ).one()
        else:
            status_info = OnAirRoomStatusItem()

        status_info.streaming_video_path = new_info.status.streaming_video_path.replace(API_CONFIG.REQUEST_FILES_URL, "")
        status_info.live_status = new_info.status.live_status
        session.add(status_info)
        session.commit()
        session.refresh(status_info)

        if room_id > 0:

            # 更新主播间其他信息
            room_info = session.exec(
                select(StreamRoomInfo).where(and_(StreamRoomInfo.room_id == room_id, StreamRoomInfo.user_id == user_id))
            ).one()

            if room_info is None:
                logger.error("Edit by other ID !!!")
                return

        else:
            room_info = StreamRoomInfo(status_id=status_info.status_id, user_id=user_id)

        # 更新直播间基础信息
        room_info.name = new_info.name
        room_info.prohibited_words_id = new_info.prohibited_words_id
        room_info.room_poster = new_info.room_poster.replace(API_CONFIG.REQUEST_FILES_URL, "")
        room_info.background_image = new_info.background_image.replace(API_CONFIG.REQUEST_FILES_URL, "")
        room_info.streamer_id = new_info.streamer_id

        session.add(room_info)
        session.commit()  # 提交
        session.refresh(room_info)

        # 更新商品信息
        if len(new_info.product_list) > 0:
            selected_id_list = [product.product_id for product in new_info.product_list]
            for product in new_info.product_list:
                if product.sales_info_id is not None:
                    # 更新
                    sales_info = session.exec(
                        select(SalesDocAndVideoInfo).where(
                            and_(
                                SalesDocAndVideoInfo.room_id == room_info.room_id,
                                SalesDocAndVideoInfo.product_id == product.product_id,
                                SalesDocAndVideoInfo.sales_info_id == product.sales_info_id,
                            )
                        )
                    ).one()
                else:
                    # 新建
                    sales_info = SalesDocAndVideoInfo()

                sales_info.product_id = product.product_id
                sales_info.sales_doc = product.sales_doc
                sales_info.start_time = product.start_time
                sales_info.start_video = product.start_video.replace(API_CONFIG.REQUEST_FILES_URL, "")
                sales_info.selected = True
                sales_info.room_id = room_info.room_id
                session.add(sales_info)
                session.commit()

            # 删除没选上的
            if len(selected_id_list) > 0:
                cancel_select_sales_info = session.exec(
                    select(SalesDocAndVideoInfo).where(
                        and_(
                            SalesDocAndVideoInfo.room_id == room_info.room_id,
                            not_(SalesDocAndVideoInfo.product_id.in_(selected_id_list)),
                        )
                    )
                ).all()

                if cancel_select_sales_info is not None:
                    for cancel_select in cancel_select_sales_info:
                        session.delete(cancel_select)
                        session.commit()

        return room_info.room_id


def init_conversation(db_session, sales_info_id: int, streamer_id: int, sales_doc: str):
    """新建直播间对话,一般触发于点击 开始直播 or 下一个商品

    Args:
        db_session (it): 数据库句柄
        sales_info_id (int): 销售 ID
        streamer_id (int): 主播 ID
        sales_doc (str): 主播文案
    """
    message_info = ChatMessageInfo(
        sales_info_id=sales_info_id, streamer_id=streamer_id, role="streamer", message=sales_doc, send_time=datetime.now()
    )
    db_session.add(message_info)


def update_message_info(sales_info_id: int, role_id: int, role: str, message: str):
    """新增对话记录

    Args:
        sales_info_id (int): 销售 ID
        role_id (int): 角色 ID
        role (str): 角色类型:"streamer", "user"
        message (str): 插入的消息
    """

    assert role in ["streamer", "user"]

    with Session(DB_ENGINE) as session:

        role_key = "streamer_id" if role == "streamer" else "user_id"
        role_id_info = {role_key: role_id}

        message_info = ChatMessageInfo(
            **role_id_info, sales_info_id=sales_info_id, role=role, message=message, send_time=datetime.now()
        )
        session.add(message_info)
        session.commit()


def update_db_room_status(room_id: int, user_id: int, process_type: str):
    """编辑直播间状态信息

    Args:
        room_id (int): 直播间 ID
        new_status_info (OnAirRoomStatusItem): 新的信息
        user_id (int): 用户 ID,用于防止其他用户恶意修改
    """

    with Session(DB_ENGINE) as session:

        # 更新主播间其他信息
        room_info = session.exec(
            select(StreamRoomInfo).where(and_(StreamRoomInfo.room_id == room_id, StreamRoomInfo.user_id == user_id))
        ).one()

        if room_info is None:
            logger.error("Edit by other ID !!!")
            return

        # 更新 status 内容
        if room_info.status_id is not None:
            status_info = session.exec(
                select(OnAirRoomStatusItem).where(OnAirRoomStatusItem.status_id == room_info.status_id)
            ).one()

        if status_info is None:
            logger.error("status_info is None !!!")
            return

        if process_type in ["online", "next-product"]:

            if process_type == "online":
                status_info.live_status = 1
                status_info.start_time = datetime.now()
                status_info.end_time = None
                status_info.current_product_index = 0

            elif process_type == "next-product":
                status_info.current_product_index += 1

            current_idx = status_info.current_product_index

            status_info.streaming_video_path = room_info.product_list[current_idx].start_video
            status_info.sales_info_id = room_info.product_list[current_idx].sales_info_id

            sales_info = session.exec(
                select(SalesDocAndVideoInfo).where(
                    SalesDocAndVideoInfo.sales_info_id == room_info.product_list[current_idx].sales_info_id
                )
            ).one()

            sales_info.start_time = datetime.now()
            session.add(sales_info)

            # 新建对话
            init_conversation(
                session, status_info.sales_info_id, room_info.streamer_id, room_info.product_list[current_idx].sales_doc
            )

        elif process_type == "offline":
            status_info.streaming_video_path = ""
            status_info.live_status = 2
            status_info.end_time = datetime.now()

        else:
            raise NotImplemented("process type error !!")

        session.add(status_info)
        session.commit()


def get_message_list(sales_info_id: int) -> List[ChatMessageInfo]:
    """根据销售 ID 获取全部对话

    Args:
        sales_info_id (int): 销售 ID

    Returns:
        List[ChatMessageInfo]: 对话列表
    """
    with Session(DB_ENGINE) as session:

        message_info = session.exec(
            select(ChatMessageInfo)
            .where(and_(ChatMessageInfo.sales_info_id == sales_info_id))
            .order_by(ChatMessageInfo.message_id)
        ).all()

        if message_info is None:
            return []

    formate_message_list = []
    for message_ in message_info:
        chat_item = {
            "role": message_.role,
            "avatar": message_.user_info.avatar if message_.role == "user" else message_.streamer_info.avatar,
            "userName": message_.user_info.username if message_.role == "user" else message_.streamer_info.name,
            "message": message_.message,
            "datetime": message_.send_time,
        }

        chat_item["avatar"] = API_CONFIG.REQUEST_FILES_URL + chat_item["avatar"]
        formate_message_list.append(chat_item)

    return formate_message_list


def update_room_video_path(status_id: int, news_video_server_path: str):
    """数据库更新 status 主播视频

    Args:
        status_id (int): 主播间 status ID
        news_video_server_path (str): 需要更新的主播视频 服务器地址

    """
    with Session(DB_ENGINE) as session:
        # 更新 status 内容
        status_info = session.exec(select(OnAirRoomStatusItem).where(OnAirRoomStatusItem.status_id == status_id)).one()

        status_info.streaming_video_path = news_video_server_path.replace(API_CONFIG.REQUEST_FILES_URL, "")
        session.add(status_info)
        session.commit()


async def get_live_room_info(user_id: int, room_id: int):
    """获取直播间的开播实时信息

    Args:
        user_id (int): 用户 ID
        room_id (int): 直播间 ID

    Returns:
        dict: 直播间实时信息
    """

    # 根据直播间 ID 获取信息
    streaming_room_info = await get_db_streaming_room_info(user_id, room_id)
    streaming_room_info = streaming_room_info[0]

    # 主播信息
    streamer_info = streaming_room_info.streamer_info

    # 商品索引
    prodcut_index = streaming_room_info.status.current_product_index

    # 是否为最后的商品
    final_procut = True if len(streaming_room_info.product_list) - 1 == prodcut_index else False

    # 对话信息
    conversation_list = get_message_list(streaming_room_info.status.sales_info_id)

    # 视频转换为服务器地址
    video_path = API_CONFIG.REQUEST_FILES_URL + streaming_room_info.status.streaming_video_path

    # 返回报文
    res_data = {
        "streamerInfo": streamer_info,
        "conversation": conversation_list,
        "currentProductInfo": streaming_room_info.product_list[prodcut_index].product_info,
        "currentStreamerVideo": video_path,
        "currentProductIndex": streaming_room_info.status.current_product_index,
        "startTime": streaming_room_info.status.start_time,
        "currentPoductStartTime": streaming_room_info.product_list[prodcut_index].start_time,
        "finalProduct": final_procut,
    }

    logger.info(res_data)

    return res_data