File size: 4,705 Bytes
959541f
 
 
 
 
 
 
 
 
caa74a1
 
 
 
 
 
 
959541f
 
 
 
 
caa74a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
959541f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
caa74a1
959541f
 
 
 
bc56d67
959541f
 
ae8872d
bc56d67
959541f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3415add
959541f
 
 
 
 
 
 
 
 
caa74a1
 
959541f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bc56d67
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# Copyright (2024) Bytedance Ltd. and/or its affiliates 
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.

import requests
import base64
import json
import io
import os
import hashlib
import hmac
import time
import random
from PIL import Image, ImageFilter
from loguru import logger

# 接口 URL
t2i_url = 'https://magicarena.bytedance.com/api/evaluate/v1/algo/process'


APP_KEY = os.environ['APP_KEY']
SECRET_KEY = os.environ["SECRET_KEY"]

def get_auth(app_key, secret_key):
    # 生成随机数作为 nonce
    nonce = str(random.randint(0, 2**31 - 1))
    # 获取当前时间戳
    timestamp = str(int(time.time()))
    # 调用 get_sign 函数生成签名
    sign = get_sign(nonce, timestamp, secret_key)
    return {
        "AppKey": app_key,
        "Timestamp":timestamp,
        "Nonce":nonce,
        "Sign": sign}

def get_sign(nonce, timestamp, secret_key):
    keys = [nonce, secret_key, timestamp]
    keys.sort()
    key_str = ''.join(keys)
    sha1_hash = hashlib.sha1()
    sha1_hash.update(key_str.encode('utf-8'))
    signature = sha1_hash.hexdigest()
    return signature.lower()

class SeedT2ICaller():
    def __init__(self, cfg, *args, **kwargs):
        self.cfg = cfg

    def generate(self, text, *args, **kwargs):
        try:
            logger.info("Generate images ...")
            req_json = json.dumps({
                "prompt": str(text),
                "use_sr": True,
                "model_version": "general_v2.0_L",
                "req_schedule_conf": "general_v20_9B_pe"
                # "width": 64,
                # "height": 64
            })
            authInfo = get_auth(APP_KEY, SECRET_KEY)
            logger.info(f"{req_json}")
            # 请求发送
            response = requests.post(
                t2i_url,
                data=json.dumps({
                    'AlgoType': 1,
                    'ReqJson': req_json,
                    'AuthInfo': authInfo
                })
            )
            logger.info(f"header: {response.headers}")
            if response.status_code != 200:
                return None, False
            resp = response.json()
            if resp.get('code',{}) != 0:
                logger.info(f"response error {resp}")
                return None, False

            binary_data1 = resp.get('data', {}).get('BinaryData')
            binary_data = binary_data1[0]
            #logger.info(f"binary_data: {binary_data}")
            image = Image.open(io.BytesIO(base64.b64decode(binary_data)))
            image = image.resize((self.cfg['resolution'], self.cfg['resolution']))
            return image, True
        
        except Exception as e:
            logger.exception("An error occurred during image generation.")
            return None, False

class SeedEditCaller():
    def __init__(self, cfg, *args, **kwargs):
        self.cfg = cfg

    def edit(self, image, edit, cfg_scale=0.5, *args, **kwargs):
        try:
            image_bytes = io.BytesIO()
            image.save(image_bytes, format='JPEG')  # 或 format='PNG'
            logger.info("Edit images ...")
            req_json = json.dumps({
                "prompt": str(edit),
                "model_version": "byteedit_v2.0",
                "scale": cfg_scale,
                "use_sr": True
            })
            logger.info(f"{req_json}")
            binary =base64.b64encode(image_bytes.getvalue()).decode('utf-8')
            # 请求发送
            response = requests.post(
                t2i_url,
                data=json.dumps({
                    'AlgoType': 2,
                    'ReqJson': req_json,
                    'BinaryData': [binary],
                    'AuthInfo': get_auth(APP_KEY, SECRET_KEY)
                })
            )

            logger.info(f"header: {response.headers}")
            if response.status_code != 200:
                return None, False
            resp = response.json()
            if resp.get('code',{}) != 0:
                logger.info(f"response error {resp}")
                return None, False

            binary_data = resp.get('data', {}).get('BinaryData')
            image = Image.open(io.BytesIO(base64.b64decode(binary_data[0])))
            return image, True

        except Exception as e:
            logger.exception("An error occurred during image generation.")
            return None, False



if __name__ == "__main__":
    cfg_t2i = {
        "resolution": 611
    }
    model_t2i = SeedT2ICaller(cfg_t2i)
    image, _ = model_t2i.generate("a beautiful girl")
    model_edit = SeedEditCaller(cfg_t2i)
    model_edit.edit(image, edit="please edit to a good man")