|
import requests |
|
import random |
|
from PIL import Image, ImageOps, ImageTk |
|
from datetime import datetime |
|
import time |
|
from pathlib import Path |
|
import io, re |
|
import zipfile |
|
|
|
# Root URL of the NovelAI HTTP API; endpoint paths are appended to it.
BASE_URL = "https://api.novelai.net"
|
|
|
def make_turbo_prompt(gen_request):
    """Expand one request into the four staged prompts of a "turbo" sequence.

    The prompt is split on ``', '`` into tags, scene-specific tags are pulled
    out, and four progressively modified tag lists are built from the rest.

    Args:
        gen_request: dict with at least a ``'prompt'`` string. It is not
            modified; every returned request is a shallow copy of it.

    Returns:
        A tuple ``(positive0, positive1, positive2, positive3)`` of request
        dicts whose ``'prompt'`` is replaced by the stage prompt and whose
        ``'type'`` is ``"turbo"``. The first three prompts are padded with
        blank placeholder tags for every final-stage tag they lack.

        ``None`` when the prompt does not contain ``1girl``, a male-subject
        tag (``1boy`` / ``boys``) and a ``, sex`` tag.
        NOTE(review): the stored source had lost its indentation, so it is
        unclear whether the original returned ``None`` or failed on unbound
        names in this case - confirm against the callers.
    """
    prompt = gen_request['prompt']

    def insert_spaces(source_list, reference_list):
        """Return a copy of *source_list* padded with blank placeholder tags.

        For each tag of *reference_list* that is missing from *source_list*, a
        run of spaces as long as that tag is inserted at the tag's index in
        *reference_list* (presumably to keep the stage prompts laid out alike).
        """
        padded = source_list.copy()
        for index, keyword in enumerate(reference_list):
            if keyword not in source_list:
                padded.insert(index, ' ' * len(keyword))
        return padded

    # Substring checks on the raw prompt decide whether expansion applies.
    has_male = '1boy' in prompt or 'boys' in prompt
    if not (has_male and '1girl' in prompt and ', sex' in prompt):
        return None

    sex_pos_keywords = ['stomach bulge', 'insertion', 'fucked silly', 'x-ray', 'orgasm', 'cross-section',
                        'uterus', 'overflow', 'rape', 'vaginal', 'anal']
    facial_keywords = ['tongue', 'ahegao']
    scene_markers = ('sex', 'cum', 'ejaculation', 'vaginal', 'penetration')

    keywords = prompt.split(', ')
    explicit_check = []
    for mouth_tag in ('open mouth', 'closed mouth'):
        if mouth_tag in keywords:
            keywords.remove(mouth_tag)
    if 'after rape' in keywords:
        keywords.remove('after rape')
        explicit_check.append('after rape')

    # Sort every tag into exactly one bucket; tags matching only 'cum'
    # (exactly) or 'ejaculation' fall through all branches and are dropped.
    filtered_keywords = []
    removed_indices = []
    cum_events = []
    temp_sex_pos = []
    temp_facial = []
    for keyword in keywords:
        sex_pos_hits = [word for word in sex_pos_keywords if word in keyword]
        facial_hits = [word for word in facial_keywords if word in keyword]
        if not (any(marker in keyword for marker in scene_markers) or sex_pos_hits or facial_hits):
            filtered_keywords.append(keyword)
        elif 'sex' in keyword or 'penetration' in keyword:
            removed_indices.append(keyword)
        elif 'cum' in keyword and keyword != 'cum':
            cum_events.append(keyword)
        elif sex_pos_hits:
            temp_sex_pos.extend(sex_pos_hits)
        elif facial_hits:
            # Fixed: the original tested ``facial not in keyword`` here, which
            # discarded a tag containing every facial marker.
            temp_facial.extend(facial_hits)

    # ---- stage 0: neutral tags plus one fixed lead-in entry ----
    filtered_keywords_positive0 = filtered_keywords.copy()
    filtered_keywords_positive0.insert(int(len(filtered_keywords) / 2 - 1),
                                       ' no penetration, imminent penetration')

    # ---- stage 1 ----
    for i, keyword in enumerate(filtered_keywords):
        if 'pantyhose' in keyword:
            filtered_keywords[i] = 'torn ' + keyword

    # Anchor position for the inserted tags: list middle unless an anatomy tag
    # is present (the later check wins when both are).
    key_index = int(len(filtered_keywords) / 2 - 1)
    if 'pussy' in filtered_keywords:
        key_index = filtered_keywords.index('pussy')
    if 'penis' in filtered_keywords:
        key_index = filtered_keywords.index('penis')
    filtered_keywords[key_index:key_index] = ['motion lines', 'surprised']
    for keyword in removed_indices:
        if 'cum' not in keyword and 'ejaculation' not in keyword:
            filtered_keywords.insert(key_index, keyword)
    if temp_sex_pos:
        filtered_keywords[key_index:key_index] = temp_sex_pos
    if 'clothed sex' in filtered_keywords and 'bottomless' not in filtered_keywords:
        filtered_keywords.insert(filtered_keywords.index('clothed sex') + 1, 'bottomless')

    filtered_keywords_positive1 = filtered_keywords.copy()
    # NOTE(review): this loop removes from the list it iterates, exactly as
    # the original did; the element following a removal is skipped.
    for i, keyword in enumerate(filtered_keywords_positive1):
        if 'closed eyes' in keyword:
            rand_num = random.randint(0, 2)
            if rand_num == 0:
                filtered_keywords_positive1[i] = 'half-' + filtered_keywords_positive1[i]
            elif rand_num == 1 and 'closed eyes' in filtered_keywords_positive1:
                filtered_keywords_positive1.remove('closed eyes')
                filtered_keywords[i] = 'half-closed eyes'

    # ---- stage 2: swap the 'surprised' marker for the climax tags ----
    key_index = filtered_keywords.index('surprised')
    filtered_keywords.pop(key_index)
    filtered_keywords[key_index:key_index] = ["ejaculation", "cum"]
    for keyword in removed_indices:
        if 'cum' in keyword:
            filtered_keywords.insert(key_index, keyword)
    if temp_facial:
        filtered_keywords[key_index:key_index] = temp_facial
    filtered_keywords_positive2 = filtered_keywords.copy()

    # ---- stage 3: aftermath ----
    for i, keyword in enumerate(filtered_keywords):
        if 'closed eyes' in keyword:
            rand_num = random.randint(0, 2)
            if rand_num == 0 and filtered_keywords[i] != 'half-closed eyes':
                filtered_keywords[i] = 'half-' + filtered_keywords[i]
            elif rand_num == 1:
                filtered_keywords[i] = 'empty eyes'
            else:
                filtered_keywords[i] = 'empty eyes, half-closed eyes'

    # key_index keeps its stage-2 value when neither scene tag is present.
    if 'sex' in filtered_keywords:
        key_index = filtered_keywords.index('sex')
    elif 'group sex' in filtered_keywords:
        key_index = filtered_keywords.index('group sex')
    filtered_keywords.remove('ejaculation')
    filtered_keywords[key_index:key_index] = ['cum drip', 'erection'] + cum_events
    if explicit_check:
        filtered_keywords[key_index:key_index] = explicit_check

    if 'sex' in filtered_keywords and 'group sex' not in filtered_keywords:
        sex_at = filtered_keywords.index('sex')
        if 'pussy' in filtered_keywords and 'anal' not in filtered_keywords:
            filtered_keywords.insert(sex_at + 1, 'after vaginal, spread pussy')
        elif 'anal' in filtered_keywords:
            filtered_keywords.insert(sex_at + 1, 'after anus, cum in ass')
        # Same result as the original insert-before-then-remove pair.
        filtered_keywords[sex_at] = 'after sex'
    elif 'group sex' in filtered_keywords:
        group_at = filtered_keywords.index('group sex')
        if 'vaginal' in filtered_keywords and 'anal' not in filtered_keywords:
            follow_up = 'after vaginal, spread pussy'
        elif 'anal' in filtered_keywords:
            follow_up = 'after anus, cum in ass'
        else:
            follow_up = None
        if follow_up is None:
            filtered_keywords.insert(group_at + 1, 'cum on body, {bukkake}')
        else:
            filtered_keywords.insert(group_at + 1, follow_up)
            if 'multiple penises' in filtered_keywords:
                filtered_keywords.insert(group_at + 3, 'cum on body, bukkake')

    # Position tags (except two that are kept) are dropped from the last stage.
    for keyword in sex_pos_keywords:
        if keyword not in ('orgasm', 'overflow') and keyword in filtered_keywords:
            filtered_keywords.remove(keyword)

    stage_prompts = [
        ', '.join(insert_spaces(stage, filtered_keywords)).strip()
        for stage in (filtered_keywords_positive0, filtered_keywords_positive1, filtered_keywords_positive2)
    ]
    stage_prompts.append(', '.join(filtered_keywords).strip())

    staged_requests = []
    for stage_prompt in stage_prompts:
        request = gen_request.copy()
        request['prompt'] = stage_prompt
        request['type'] = "turbo"
        staged_requests.append(request)
    positive0, positive1, positive2, positive3 = staged_requests
    return positive0, positive1, positive2, positive3
|
|
|
def generate_image(access_token, prompt, model, action, parameters, timeout=None):
    """POST an image-generation request and return the raw response body.

    Args:
        access_token: bearer token sent in the ``Authorization`` header.
        prompt: positive prompt, sent as ``"input"``.
        model: model identifier, sent as ``"model"``.
        action: request action, sent as ``"action"``.
        parameters: dict of generation parameters, sent as ``"parameters"``.
        timeout: forwarded to ``requests.post``. The default ``None`` keeps
            the previous behaviour of waiting without a limit.

    Returns:
        ``response.content`` (bytes). The HTTP status is deliberately not
        checked here: ``generate`` in this file decodes the body to obtain
        the error text when it is not a valid archive.
    """
    data = {
        "input": prompt,
        "model": model,
        "action": action,
        "parameters": parameters,
    }

    response = requests.post(
        f"{BASE_URL}/ai/generate-image",
        json=data,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=timeout,
    )

    return response.content
|
|
|
def generate(gen_request):
    """Run one image generation described by *gen_request* and save the PNG.

    Keys read from *gen_request*:
        always: ``prompt``, ``negative``, ``type``, ``quality_toggle``,
        ``width``, ``height``, ``seed``, ``sampler``, ``scale``,
        ``uncond_scale``, ``sema``, ``sema_dyn``, ``cfg_rescale``,
        ``png_rule``, ``save_folder``, ``access_token``,
        ``user_screen_size``;
        conditionally: ``steps`` (type ``"upper"``), ``mask`` and
        ``add_original_image`` (type ``"inpaint"``), ``cond_negative`` and
        ``rating``, ``repeat``, ``image``, ``strength`` and ``noise``,
        ``start_time`` and ``count`` (``png_rule == "count"``).

    Returns:
        On success ``(image, prompt, seed, info, filename)``: a PIL image
        (letter-boxed to ``user_screen_size`` if larger), the prompt actually
        sent, the seed, the image's ``info`` dict and the saved path as
        ``str``.
        On failure ``(None, error_message, seed, None, None)``; the message
        is also appended to ``error_log.txt``.
    """

    def parse_and_execute_commands(_prompt, negative, user_input, rating):
        """Apply the comma-separated commands in *user_input* to *negative*.

        Each command may be prefixed with ``(condition):``; it only runs when
        the condition holds for the prompt tags / rating. Returns the edited
        negative prompt joined with ``', '``.
        """
        negative = [neg.strip() for neg in negative.split(',')]
        prompt = [key.strip() for key in _prompt.split(',')]
        commands = [cmd.strip() for cmd in user_input.split(',')]
        for command in commands:
            condition, cmd = parse_conditional_command(command)
            if check_condition(prompt, condition, rating):
                negative = execute_command(negative, cmd)
        return ', '.join(negative)

    def parse_conditional_command(command):
        """Split ``"(cond):cmd"`` into ``(cond, cmd)``; no prefix gives ``('', command)``."""
        match = re.match(r"\((.*?)\)\:(.*)", command)
        if match:
            return match.groups()
        return '', command

    def check_condition(prompt, condition, rating):
        """Evaluate *condition* against the prompt tag list and the rating.

        Atoms: ``e/q/s/g`` rating equals; ``~e`` ... rating differs;
        ``*tag`` exact tag present; ``~!tag`` exact tag absent; ``~text`` no
        tag contains text; ``text`` some tag contains text. Atoms combine
        with ``&`` and ``|``; an empty condition is true.
        """
        if not condition:
            return True
        sub_conditions = re.split(r'\)\s*&\s*\(', condition)
        sub_conditions = [re.sub(r'^\(|\)$', '', cond) for cond in sub_conditions]

        results = []
        for sub_cond in sub_conditions:
            if '&' in sub_cond:
                results.append(all(check_condition(prompt, cond, rating) for cond in sub_cond.split('&')))
            elif '|' in sub_cond:
                results.append(any(check_condition(prompt, cond, rating) for cond in sub_cond.split('|')))
            elif sub_cond in ('e', 'q', 's', 'g'):
                results.append(sub_cond == rating)
            elif sub_cond in ('~e', '~q', '~s', '~g'):
                # Fixed: compare the rating letter, not the whole '~x' token
                # (which can never equal a rating, so this was always true).
                results.append(sub_cond[1:] != rating)
            elif sub_cond.startswith('*'):
                results.append(sub_cond[1:] in prompt)
            elif sub_cond.startswith('~!'):
                results.append(sub_cond[2:] not in prompt)
            elif sub_cond.startswith('~'):
                # Fixed: negation of the substring test below. The original
                # ``any(... not in ...)`` held as soon as one tag lacked it.
                results.append(not any(sub_cond[1:] in element for element in prompt))
            else:
                results.append(any(sub_cond in element for element in prompt))
        return all(results)

    def execute_command(negative, command):
        """Apply one edit command to the *negative* tag list and return it.

        Forms: ``key+=text`` insert after key; ``add a^b`` append missing
        tags; ``rem a^b`` remove tags; ``key=text`` replace key. ``^`` stands
        for a comma. Unrecognised commands leave the list unchanged (the
        original returned ``None`` here, which crashed the caller).
        """
        if '+=' in command:
            keyword, addition = command.split('+=', 1)
            addition = addition.replace('^', ', ')
            return insert_text_after_keyword(negative, keyword, addition)
        if command.startswith('add '):
            keys = [key.strip() for key in command[4:].replace('^', ', ').split(',')]
            for key in keys:
                if key not in negative:
                    negative.append(key)
        elif command.startswith('rem '):
            keys = [key.strip() for key in command[4:].replace('^', ', ').split(',')]
            for key in keys:
                if key in negative:
                    negative.remove(key)
        elif '=' in command:
            keyword, replacement = command.split('=', 1)
            if keyword in negative:
                negative[negative.index(keyword)] = replacement.replace('^', ', ')
        return negative

    def insert_text_after_keyword(negative, user_keyword, user_additional_keyword):
        """Insert the extra text right after *user_keyword* when it is present."""
        if user_keyword in negative:
            negative.insert(negative.index(user_keyword) + 1, user_additional_keyword)
        return negative

    def resize_and_fill(image, max_size=None):
        """Shrink an oversized image in place and centre it on a black square.

        Images that already fit within *max_size* (default: the request's
        ``user_screen_size``) are returned unchanged.
        """
        if max_size is None:
            max_size = gen_request["user_screen_size"]
        original_width, original_height = image.size
        if original_width <= max_size and original_height <= max_size:
            return image
        image.thumbnail((max_size, max_size))
        width, height = image.size
        new_image = Image.new("RGB", (max_size, max_size), "black")
        new_image.paste(image, ((max_size - width) // 2, (max_size - height) // 2))
        return new_image

    def log_error(e):
        """Append a timestamped entry for *e* to ``error_log.txt``."""
        current_time = datetime.now().strftime("%m/%d %H:%M:%S")
        error_message = f"#### Error occured at {current_time} ####\nError: {e}\n############################################\n"
        # Explicit encoding so server messages with non-ASCII text cannot
        # raise while the failure is being reported.
        with open("error_log.txt", "a", encoding="utf-8") as file:
            file.write(error_message)

    params = {
        "legacy": False,
        "quality_toggle": gen_request["quality_toggle"] == 1,
        "width": gen_request["width"],
        "height": gen_request["height"],
        "n_samples": 1,
        "seed": gen_request["seed"],
        "extra_noise_seed": random.randint(0, 9999999999),
        "sampler": gen_request["sampler"],
        "steps": 28 if gen_request["type"] != "upper" else gen_request["steps"],
        "scale": gen_request["scale"],
        "uncond_scale": gen_request["uncond_scale"],
        "negative_prompt": gen_request["negative"],
        "sm": gen_request["sema"],
        "sm_dyn": gen_request["sema_dyn"],
        "decrisper": False,
        "controlnet_strength": 1.0,
        "add_original_image": False,
        "cfg_rescale": gen_request["cfg_rescale"],
        "noise_schedule": "native",
    }

    if gen_request["type"] == "inpaint" and "mask" in gen_request:
        params["mask"] = gen_request["mask"]
        params['add_original_image'] = gen_request['add_original_image']

    positive = gen_request["prompt"]
    keywords = [key.strip() for key in positive.split(',')]

    if "cond_negative" in gen_request and gen_request["cond_negative"]:
        user_input = gen_request["cond_negative"]
        rating = gen_request["rating"]
        params["negative_prompt"] = parse_and_execute_commands(positive, params["negative_prompt"], user_input, rating)

    if "repeat" in gen_request:
        # "a->b->c" tags cycle through their alternatives by repeat index.
        # (The unused ``max = gen_request["repeat_max"]`` read was dropped; it
        # shadowed the builtin.)
        repeat = gen_request["repeat"]
        for index, key in enumerate(keywords):
            if "->" in key:
                alternatives = key.split('->')
                keywords[index] = alternatives[repeat % len(alternatives)]

    filename_rule = gen_request["png_rule"]
    save_folder = gen_request["save_folder"]
    access_token = gen_request["access_token"]
    additional_folder = ""

    request_type = "generate"
    if "image" in gen_request:
        # NOTE(review): nesting reconstructed from a source without
        # indentation - confirm that the sm flags are cleared for every
        # image request and not only when "strength" is given.
        params["image"] = gen_request["image"]
        if "strength" in gen_request:
            params["strength"] = gen_request["strength"]
            params["noise"] = gen_request["noise"]
        params["sm"] = False
        params["sm_dyn"] = False
        request_type = "img2img" if "mask" not in gen_request else "infill"

    # Tags starting with '*' are not sent to the API.
    keywords = [key for key in keywords if not key.startswith('*')]
    positive = ', '.join(keywords)

    # Bound before the try so the handler can tell "request failed" apart
    # from "response received but unusable".
    zipped_bytes = None
    try:
        model = "nai-diffusion-3" if "mask" not in params else "nai-diffusion-3-inpainting"
        zipped_bytes = generate_image(access_token, positive, model, request_type, params)
        if filename_rule == "count":
            additional_folder = "/" + gen_request["start_time"]
        if gen_request["type"] == "turbo":
            additional_folder += "/turbo"
        d = Path(save_folder + additional_folder)
        d.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(io.BytesIO(zipped_bytes)) as zipped:
            image_bytes = zipped.read(zipped.infolist()[0])
        if filename_rule == "count":
            _count = gen_request["count"]
            filename = d / f"{_count:05}.png"
        else:
            filename = d / f"{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        filename.write_bytes(image_bytes)
        image = Image.open(io.BytesIO(image_bytes))
        image = ImageOps.exif_transpose(image).convert("RGB")
        i_resized = resize_and_fill(image)

        return i_resized, positive, params['seed'], image.info, str(filename)
    except Exception:
        if zipped_bytes is None:
            error_message = "Connection broken (Protocol Error)"
        else:
            try:
                # The body is expected to be the server's error text; the
                # first and last two characters are trimmed as before.
                error_message = zipped_bytes.decode('utf-8')[2:-2]
            except (UnicodeDecodeError, AttributeError) as inner_exception:
                error_message = str(inner_exception)
        log_error(error_message)
        return None, error_message, params['seed'], None, None