import requests import random from PIL import Image, ImageOps, ImageTk from datetime import datetime import time from pathlib import Path import io, re import zipfile BASE_URL="https://api.novelai.net" def make_turbo_prompt(gen_request): lines = gen_request['prompt'] result = { "boys": False, "girls": False, "1girl": False, "1boy": False, "1other": False, "others": False } state = { "nude,": False, "pov,": False, "cum,": False, "after ": False, "pussy juice": False, "barefoot": False, "breasts": False, "ejaculation": False, } def insert_spaces(source_list, reference_list): modified_list = source_list.copy() for index, keyword in enumerate(reference_list): if keyword not in source_list: space_count = len(keyword) # 키워드 길이만큼의 공백 문자 modified_list.insert(index, ' ' * space_count) return modified_list keywords = gen_request['prompt'].split(', ') filtered_keywords = [] removed_indices = [] positive0, positive1, positive2, positive3 = gen_request.copy(),gen_request.copy(),gen_request.copy(),gen_request.copy() for word in result.keys(): if word in lines: result[word] = True for word in state.keys(): if word in gen_request['prompt']: state[word] = True key_index = int((len(keywords)/2)-1) if(result["1boy"]) or (result["boys"]): if(result["1girl"]): if(', sex' in gen_request['prompt']): sex_pos_keywords = ['stomach bulge','insertion', 'fucked silly', 'x-ray', 'orgasm', 'cross-section', 'uterus', 'overflow', 'rape', 'vaginal', 'anal'] facial_keywords = ['tongue','ahegao'] temp_sex_pos = [] temp_facial = [] cum_events = [] explicit_check = [] if 'open mouth' in keywords: keywords.remove('open mouth') if 'closed mouth' in keywords: keywords.remove('closed mouth') if 'after rape' in keywords: keywords.remove('after rape') explicit_check.append('after rape') for keyword in keywords: if ('sex' not in keyword and 'cum' not in keyword and 'ejaculation' not in keyword and 'vaginal' not in keyword and 'penetration' not in keyword) and all(sex_pos not in keyword for sex_pos in sex_pos_keywords) and all(facial not in keyword for facial in facial_keywords): filtered_keywords.append(keyword) elif 'sex' in keyword: removed_indices.append(keyword) elif 'penetration' in keyword: removed_indices.append(keyword) elif 'cum' in keyword and keyword != 'cum': cum_events.append(keyword) elif any(sex_pos in keyword for sex_pos in sex_pos_keywords): for sex_pos in sex_pos_keywords: if sex_pos in keyword: temp_sex_pos.append(sex_pos) elif any(facial not in keyword for facial in facial_keywords): for facial in facial_keywords: if facial in keyword: temp_facial.append(facial) filtered_keywords.insert(int((len(filtered_keywords)/2)-1), ' no penetration, imminent penetration') filtered_keywords_positive0 = filtered_keywords.copy() filtered_keywords.remove(' no penetration, imminent penetration') #0 imminent penetration, imminent sex for i, keyword in enumerate(filtered_keywords): if 'pantyhose' in keyword: filtered_keywords[i] = 'torn ' + filtered_keywords[i] #1 default key_index = int((len(filtered_keywords)/2)-1) if 'pussy' in filtered_keywords: key_index = filtered_keywords.index('pussy') if 'penis' in filtered_keywords: key_index = filtered_keywords.index('penis') filtered_keywords[key_index:key_index] = ['motion lines', 'surprised'] for keyword in removed_indices: if 'cum' not in keyword and 'ejaculation' not in keyword: filtered_keywords.insert(key_index,keyword) if(temp_sex_pos): filtered_keywords[key_index:key_index] = temp_sex_pos if('clothed sex' in filtered_keywords and not 'bottomless' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('clothed sex')+1, 'bottomless') pos1_copied_keywords = filtered_keywords.copy() for i, keyword in enumerate(pos1_copied_keywords): if 'closed eyes' in keyword: rand_num = random.randint(0,2) if(rand_num == 0): pos1_copied_keywords[i] = 'half-' + pos1_copied_keywords[i] elif(rand_num == 1 and 'closed eyes' in pos1_copied_keywords): pos1_copied_keywords.remove('closed eyes') filtered_keywords[i] = 'half-closed eyes' filtered_keywords_positive1 = pos1_copied_keywords.copy() #2 ejaculation,cum in pussy key_index = filtered_keywords.index('surprised') filtered_keywords.remove('surprised') filtered_keywords[key_index:key_index] = ["ejaculation","cum"] for keyword in removed_indices: if 'cum' in keyword: filtered_keywords.insert(key_index,keyword) if(temp_facial): filtered_keywords[key_index:key_index] =temp_facial filtered_keywords_positive2 = filtered_keywords.copy() #3 after sex, after ejaculation for i, keyword in enumerate(filtered_keywords): if 'closed eyes' in keyword: rand_num = random.randint(0,2) if(rand_num == 0 and filtered_keywords[i] != 'half-closed eyes'): filtered_keywords[i] = 'half-' + filtered_keywords[i] elif(rand_num == 1): filtered_keywords[i] = 'empty eyes' else: filtered_keywords[i] = 'empty eyes, half-closed eyes' if 'sex' in filtered_keywords: key_index = filtered_keywords.index('sex') elif 'group sex' in filtered_keywords: key_index = filtered_keywords.index('group sex') filtered_keywords.remove('ejaculation') filtered_keywords[key_index:key_index] = ['cum drip', 'erection'] + cum_events if(explicit_check): filtered_keywords[key_index:key_index] = explicit_check if 'sex' in filtered_keywords and 'group sex' not in filtered_keywords: if('pussy' in filtered_keywords and not 'anal' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after vaginal, spread pussy') elif('anal' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('sex')+1, 'after anus, cum in ass') filtered_keywords.insert(filtered_keywords.index('sex'), 'after sex') filtered_keywords.remove('sex') elif 'group sex' in filtered_keywords: if('vaginal' in filtered_keywords and not 'anal' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after vaginal, spread pussy') if 'multiple penises' in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'cum on body, bukkake') elif('anal' in filtered_keywords): filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'after anus, cum in ass') if 'multiple penises' in filtered_keywords: filtered_keywords.insert(filtered_keywords.index('group sex')+3, 'cum on body, bukkake') else: filtered_keywords.insert(filtered_keywords.index('group sex')+1, 'cum on body, {bukkake}') temp_post_keyword = [] for keyword in sex_pos_keywords: if not (keyword == 'orgasm' or keyword == 'overflow'): if keyword in filtered_keywords: temp_post_keyword.append(keyword) for keyword in temp_post_keyword: filtered_keywords.remove(keyword) positive0['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive0, filtered_keywords)).strip() positive1['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive1, filtered_keywords)).strip() positive2['prompt'] = ', '.join(insert_spaces(filtered_keywords_positive2, filtered_keywords)).strip() positive3['prompt'] = ', '.join(filtered_keywords).strip() positive0["type"] = "turbo" positive1["type"] = "turbo" positive2["type"] = "turbo" positive3["type"] = "turbo" return positive0, positive1, positive2, positive3 def generate_image(access_token, prompt, model, action, parameters): data = { "input": prompt, "model": model, "action": action, "parameters": parameters, } response = requests.post(f"{BASE_URL}/ai/generate-image", json=data, headers={ "Authorization": f"Bearer {access_token}" }) # catch any errors return response.content def generate(gen_request): def parse_and_execute_commands(_prompt, negative, user_input, rating): negative = negative.split(',') negative = [neg.strip() for neg in negative] prompt = _prompt.split(',') prompt = [key.strip() for key in prompt] commands = [cmd.strip() for cmd in user_input.split(',')] for command in commands: condition, cmd = parse_conditional_command(command) if check_condition(prompt, condition, rating): negative = execute_command(negative, cmd) return ', '.join(negative) def parse_conditional_command(command): match = re.match(r"\((.*?)\)\:(.*)", command) if match: return match.groups() return '', command def check_condition(prompt, condition, rating): if not condition: return True sub_conditions = re.split(r'\)\s*&\s*\(', condition) sub_conditions = [re.sub(r'^\(|\)$', '', cond) for cond in sub_conditions] results = [] for sub_cond in sub_conditions: if '&' in sub_cond: results.append(all(check_condition(prompt, cond, rating) for cond in sub_cond.split('&'))) elif '|' in sub_cond: results.append(any(check_condition(prompt, cond, rating) for cond in sub_cond.split('|'))) else: if sub_cond in ['e', 'q', 's', 'g']: results.append(sub_cond == rating) elif sub_cond in ['~e', '~q', '~s', '~g']: results.append(sub_cond != rating) # PM elif sub_cond.startswith('*'): results.append(sub_cond[1:] in prompt) # NOT IN elif sub_cond.startswith('~!'): results.append(sub_cond[2:] not in prompt) elif sub_cond.startswith('~'): results.append(any(sub_cond[1:] not in element for element in prompt)) # CONTAIN else: results.append(any(sub_cond in element for element in prompt)) return all(results) def execute_command(negative, command): if '+=' in command: keyword, addition = command.split('+=', 1) addition = addition.replace('^', ', ') return insert_text_after_keyword(negative, keyword, addition) elif command.startswith('add '): keyword = command[4:] keyword = keyword.replace('^', ', ') keys = keyword.split(',') keys = [key.strip() for key in keys] for key in keys: if key not in negative: negative.append(key) return negative elif command.startswith('rem '): keyword = command[4:] keyword = keyword.replace('^', ', ') keys = keyword.split(',') keys = [key.strip() for key in keys] for key in keys: if key in negative: negative.remove(key) return negative elif '=' in command: keyword, replacement = command.split('=', 1) if keyword in negative: replacement = replacement.replace('^', ', ') index = negative.index(keyword) negative[index] = replacement return negative def insert_text_after_keyword(negative, user_keyword, user_additional_keyword): if user_keyword in negative: index = negative.index(user_keyword) + 1 negative.insert(index, user_additional_keyword) return negative params = { "legacy": False, "quality_toggle": True if gen_request["quality_toggle"] == 1 else False, "width": gen_request["width"], "height": gen_request["height"], "n_samples": 1, "seed": gen_request["seed"], "extra_noise_seed": random.randint(0,9999999999), "sampler": gen_request["sampler"], "steps": 28 if gen_request["type"]!="upper" else gen_request["steps"], "scale": gen_request["scale"], "uncond_scale": gen_request["uncond_scale"], "negative_prompt": gen_request["negative"], "sm" : gen_request["sema"], "sm_dyn" : gen_request["sema_dyn"], "decrisper": False, "controlnet_strength": 1.0, "add_original_image": False, "cfg_rescale": gen_request["cfg_rescale"], "noise_schedule": "native" } if gen_request["type"]=="inpaint": if "mask" in gen_request: params["mask"] = gen_request["mask"] params['add_original_image'] = gen_request['add_original_image'] positive = gen_request["prompt"] keywords = [key.strip() for key in positive.split(',')] if "cond_negative" in gen_request and gen_request["cond_negative"]: user_input = gen_request["cond_negative"] rating = gen_request["rating"] params["negative_prompt"] = parse_and_execute_commands(positive, params["negative_prompt"], user_input, rating) if "repeat" in gen_request: max = gen_request["repeat_max"] for i, key in enumerate(keywords): if "->" in key: instant_keyword = [k for k in key.split('->')] if len(instant_keyword) > gen_request["repeat"]: current_key = instant_keyword[gen_request["repeat"]] else: current_key = instant_keyword[gen_request["repeat"] % len(instant_keyword)] keywords[i] = current_key filename_rule = gen_request["png_rule"] save_folder = gen_request["save_folder"] access_token = gen_request["access_token"] additional_folder = "" request_type = "generate" if "image" in gen_request: params["image"] = gen_request["image"] if "strength" in gen_request: params["strength"] = gen_request["strength"] params["noise"] = gen_request["noise"] params["sm"] = False params["sm_dyn"] = False request_type = "img2img" if "mask" not in gen_request else "infill" temp_del = [] for key in keywords: if key.startswith('*'): temp_del.append(key) for key in temp_del: if key in keywords: keywords.remove(key) positive = ', '.join(keywords) def resize_and_fill(image, max_size=None): if max_size is None: max_size = gen_request["user_screen_size"] original_width, original_height = image.size if original_width > max_size or original_height > max_size: # 비율을 유지하면서 크기 조정 image.thumbnail((max_size, max_size)) # 새 이미지 크기 계산 width, height = image.size new_image = Image.new("RGB", (max_size, max_size), "black") new_image.paste(image, ((max_size - width) // 2, (max_size - height) // 2)) return new_image else: return image def log_error(e, output_file_path="output_file_path"): # 현재 시간을 얻습니다 current_time = datetime.now().strftime("%m/%d %H:%M:%S") # 에러 로그 메시지 error_message = f"#### Error occured at {current_time} ####\nError: {e}\n############################################\n" # 지정된 출력 폴더의 error_log.txt 파일에 쓰기 with open(f"error_log.txt", "a") as file: file.write(error_message) try: zipped_bytes = generate_image(access_token, positive, "nai-diffusion-3" if "mask" not in params else "nai-diffusion-3-inpainting", request_type, params) if gen_request["png_rule"] == "count": additional_folder = "/" + gen_request["start_time"] if gen_request["type"] == "turbo": additional_folder += "/turbo" d = Path(save_folder + additional_folder) d.mkdir(parents=True, exist_ok=True) zipped = zipfile.ZipFile(io.BytesIO(zipped_bytes)) image_bytes = zipped.read(zipped.infolist()[0]) if gen_request["png_rule"] == "count": _count = gen_request["count"] filename = (d / f"{_count:05}.png" ) else: filename = (d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" ) filename.write_bytes(image_bytes) i = Image.open(io.BytesIO(image_bytes)) i = ImageOps.exif_transpose(i).convert("RGB") i_resized = resize_and_fill(i) #tk_image = ImageTk.PhotoImage(i_resized) return i_resized, positive, params['seed'], i.info, str(filename) except Exception as e: try: if zipped_bytes is None: raise ValueError("Connection broken (Protocol Error)") error_message = zipped_bytes.decode('utf-8')[2:-2] except Exception as inner_exception: error_message = str(inner_exception) log_error(error_message, "path_to_output_folder") return None, error_message, params['seed'], None, None