File size: 6,161 Bytes
6755a2d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from __future__ import annotations
from copy import deepcopy

from typing import Iterable, List, Tuple, Dict, Hashable, Any, Union

import numpy as np

from ...utils.util import convert_class_attr_to_dict


from ..general.items import Items, Item
from .clipid import MatchedClipIds


import logging

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


__all__ = ["Clip", "ClipSeq"]


class Clip(Item):
    """媒体片段, 指转场点与转场点之间的部分"""

    def __init__(
        self,
        time_start: float,
        duration: float,
        clipid: int = None,
        media_type: str = None,
        mediaid: str = None,
        timepoint_type: str = None,
        text: str = None,
        stage: str = None,
        path: str = None,
        duration_num: int = None,
        similar_clipseq: MatchedClipIds = None,
        dynamic: float = None,
        **kwargs,
    ):
        """
        Args:
            time_start (float): 开始时间,秒为单位,对应该媒体文件的, 和media_map.json上的序号一一对应
            duration (_type_): 片段持续时间
            clipid (int, or [int]): 由media_map提供的片段序号, 和media_map.json上的序号一一对应
            media_type (str, optional): music, video,text, Defaults to None.
            mediaid (int): 多媒体id, 当clipid是列表时,表示该片段是个融合片段
            timepoint_type(int, ): 开始点的转场类型. Defaults to None.
            text(str, optional): 该片段的文本描述,音乐可以是歌词,视频可以是台词,甚至可以是弹幕. Defaults to None.
            stage(str, optional): 该片段在整个媒体文件中的结构位置,如音乐的intro、chrous、vesa,视频的片头、片尾、开始、高潮、转场等. Defaults to None.
            path (str, optional): 该媒体文件的路径,用于后续媒体读取、处理. Defaults to None.
            duration_num (_type_, optional): 片段持续帧数, Defaults to None.
            similar_clipseq ([Clip]], optional): 与该片段相似的片段,具体结构待定义. Defaults to None.
        """
        self.media_type = media_type
        self.mediaid = mediaid
        self.time_start = time_start
        self.duration = duration
        self.clipid = clipid
        self.path = path
        self.timepoint_type = timepoint_type
        self.text = text
        self.stage = stage
        self.duration_num = duration_num
        self.similar_clipseq = similar_clipseq
        self.dynamic = dynamic
        self.__dict__.update(**kwargs)

    def preprocess(self):
        pass

    def spread_parameters(self):
        pass

    @property
    def time_end(
        self,
    ) -> float:
        return self.time_start + self.duration

    def get_emb(self, key: str, idx: int) -> np.float:
        return self.emb.get_value(key, idx)


class ClipSeq(Items):
    """媒体片段序列"""

    def __init__(self, items: List[Clip] = None):
        super().__init__(items)
        self.clipseq = self.data

    def preprocess(self):
        pass

    def set_clip_value(self, k: Hashable, v: Any) -> None:
        """给序列中的每一个clip 赋值"""
        for i in range(len(self.clipseq)):
            self.clipseq[i].__setattr__(k, v)

    def __len__(
        self,
    ) -> int:
        return len(self.clipseq)

    @property
    def duration(
        self,
    ) -> float:
        """Clip.duration的和

        Returns:
            float: 序列总时长
        """
        if len(self.clipseq) == 0:
            return 0
        else:
            return sum([c.duration for c in self.clipseq])

    def __getitem__(self, i: Union[int, Iterable]) -> Union[Clip, ClipSeq]:
        """支持索引和切片操作,如果输入是整数则返回Clip,如果是切片,则返回ClipSeq

        Args:
            i (int or slice): 索引

        Raises:
            ValueError: 需要按照给的输入类型索引

        Returns:
            Clip or ClipSeq:
        """
        if "int" in str(type(i)):
            i = int(i)
        if isinstance(i, int):
            clip = self.clipseq[i]
            return clip
        elif isinstance(i, Iterable):
            clipseq = [self.__getitem__(x) for x in i]
            clipseq = ClipSeq(clipseq)
            return clipseq
        elif isinstance(i, slice):
            if i.step is None:
                step = 1
            else:
                step = i.step
            clipseq = [self.__getitem__(x) for x in range(i.start, i.stop, step)]
            clipseq = ClipSeq(clipseq)
            return clipseq
        else:
            raise ValueError(
                "unsupported input, should be int or slice, but given {}, type={}".format(
                    i, type(i)
                )
            )

    @property
    def mvp_clip(self):
        """读取实际的片段数据为moviepy格式

        Raises:
            NotImplementedError: _description_
        """
        raise NotImplementedError

    @property
    def duration_seq_emb(
        self,
    ) -> np.array:
        emb = np.array([c.duration for c in self.clipseq])
        return emb

    @property
    def timestamp_seq_emb(self) -> np.array:
        emb = np.array([c.time_start for c in self.clipseq])
        return emb

    @property
    def rela_timestamp_seq_emb(self) -> np.array:
        duration_seq = [c.duration for c in self.clipseq]
        emb = np.cumsum(duration_seq) / self.duration
        return emb

    def get_emb(self, key: str, idx: int) -> np.float:
        clip_start_idx = self.clipseq[0].clipid
        clip_end_idx = self.clipseq[-1].clipid
        # TODO: 待修改为更通用的形式
        if idx is None:
            idx = range(clip_start_idx, clip_end_idx + 1)
        elif isinstance(idx, int):
            idx += clip_start_idx
        elif isinstance(idx, Iterable):
            idx = [x + clip_start_idx for x in idx]
        else:
            raise ValueError(
                f"idx only support None, int, Iterable, but given {idx},type is {type(idx)}"
            )
        return self.emb.get_value(key, idx=idx)