# [BEGIN OF pluto_happy]
## required lib, required "pip install"
import torch
import cryptography
import cryptography.fernet
from flopth import flopth
import huggingface_hub
import huggingface_hub.hf_api
## standard libs or pre-installed on Colab, no need to install
import json
import requests
import time
import os
import random
import re
import sys
import psutil
import threading
import socket
import PIL
import pandas
import matplotlib
import numpy
import importlib.metadata
import types
import cpuinfo
import pynvml
import pathlib
import subprocess
# define class Pluto_Happy
class Pluto_Happy(object):
    """
    The Pluto project started as fun AI hacking and became part of my
    first book, "Data Augmentation with Python," with Packt Publishing.
    In particular, Pluto_Happy is a clean and lite kernel of a simple class.
    It uses the @add_method decorator to add specific methods and produce a new class,
    such as Pluto_HFace, with many more functions for HuggingFace, LLMs, and Transformers.
    Args:
        name (str): the display name, e.g. "Hanna the seeker"
    Returns:
        (object): the class instance.
    """
    # initialize the object
    def __init__(self, name="Pluto", *args, **kwargs):
        super(Pluto_Happy, self).__init__(*args, **kwargs)
        self.author = "Duc Haba"
        self.name = name
        self._ph()
        self._pp("Hello from class", str(self.__class__) + " Class: " + str(self.__class__.__name__))
        self._pp("Code name", self.name)
        self._pp("Author is", self.author)
        self._ph()
        #
        # define class vars for Stable Diffusion
        #
        self.fname_id = 0
        self.dname_img = "img_colab/"
        self.flops_per_sec_gcolab_cpu = 4887694725  # 925,554,209 | 9,276,182,810 | 1,722,089,747 | 5,287,694,725
        self.flops_per_sec_gcolab_gpu = 6365360673  # 1,021,721,764 | 9,748,048,188 | 2,245,406,502 | 6,965,360,673
        self.fname_requirements = './pluto_happy/requirements.txt'
        #
        self.color_primary = '#2780e3'    # blue
        self.color_secondary = '#373a3c'  # dark gray
        self.color_success = '#3fb618'    # green
        self.color_info = '#9954bb'       # purple
        self.color_warning = '#ff7518'    # orange
        self.color_danger = '#ff0039'     # red
        self.color_mid_gray = '#495057'
        self._xkeyfile = '.xoxo'
        return
    #
    # pretty print output name-value line
    def _pp(self, a, b, is_print=True):
        """
        Pretty print output name-value line
        Args:
            a (str): the name or label of the line.
            b (str): the value printed after the colon.
            is_print (bool): whether to print the line to console (True) or return it as a str (False).
        Returns:
            y: None or the formatted line as (str)
        """
        # print("%34s : %s" % (str(a), str(b)))
        x = f'{"%34s" % str(a)} : {str(b)}'
        y = None
        if (is_print):
            print(x)
        else:
            y = x
        return y
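    # Example output sketch: the label is right-aligned in a 34-character
    # column, then " : " and the value, e.g.:
    #   self._pp("Code name", "Monty")   # prints "...spaces...Code name : Monty"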
    #
    # pretty print the header or footer lines
    def _ph(self, is_print=True):
        """
        Pretty prints the header or footer lines.
        Args:
            is_print (bool): whether to print the header or footer lines to console or return a str.
        Returns:
            y: None or output as (str)
        """
        x = f'{"-"*34} : {"-"*34}'
        y = None
        if (is_print):
            print(x)
        else:
            y = x
        return y
    #
    # fetch huggingface files
    def fetch_hface_files(self,
                          hf_names,
                          hf_space="duchaba/monty",
                          local_dir="/content/"):
        """
        Given a list of huggingface file names, download them from the provided huggingface space.
        Args:
            hf_names: (list) list of huggingface file names to download
            hf_space: (str) huggingface space to download from.
            local_dir: (str) local directory to store the files.
        Returns:
            status: (bool) True if download was successful, False otherwise.
        """
        status = True
        f = str(hf_names) + " is not iterable, type: " + str(type(hf_names))
        try:
            for f in hf_names:
                lo = local_dir + f
                huggingface_hub.hf_hub_download(repo_id=hf_space,
                                                filename=f,
                                                use_auth_token=True,
                                                repo_type=huggingface_hub.REPO_TYPE_SPACE,
                                                force_filename=lo)
        except Exception as e:
            self._pp("*Error", f"{f}: {e}")
            status = False
        return status
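    # Usage sketch (hypothetical file names; assumes a HuggingFace token with
    # read access to the target Space is already configured):
    #   pluto = Pluto_Happy("Hanna")
    #   ok = pluto.fetch_hface_files(["app.py", "requirements.txt"],
    #       hf_space="duchaba/monty", local_dir="/content/")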
    #
    # push files to huggingface
    def push_hface_files(self,
                         hf_names,
                         hf_space="duchaba/skin_cancer_diagnose",
                         local_dir="/content/"):
        # push files to huggingface space
        """
        Pushes files to huggingface space.
        The function takes a list of file names as a
        parameter and pushes them to the provided huggingface space.
        Args:
            hf_names: list(of strings), list of file names to be pushed.
            hf_space: (str), the huggingface space to push to.
            local_dir: (str), the local directory where the files
                are stored.
        Returns:
            status: (bool) True if successfully pushed else False.
        """
        status = True
        try:
            for f in hf_names:
                lo = local_dir + f
                huggingface_hub.upload_file(
                    path_or_fileobj=lo,
                    path_in_repo=f,
                    repo_id=hf_space,
                    repo_type=huggingface_hub.REPO_TYPE_SPACE)
        except Exception as e:
            self._pp("*Error", e)
            status = False
        return status
    #
    # push a folder to huggingface space
    def push_hface_folder(self,
                          hf_folder,
                          hf_space_id,
                          hf_dest_folder=None):
        """
        This function pushes a folder to huggingface space.
        Args:
            hf_folder: (str). The path to the folder to push.
            hf_space_id: (str). The space id to push the folder to.
            hf_dest_folder: (str). The destination folder in the space. If not specified,
                the folder name will be used as the destination folder.
        Returns:
            status: (bool) True if the folder is pushed successfully, otherwise False.
        """
        status = True
        try:
            api = huggingface_hub.HfApi()
            api.upload_folder(folder_path=hf_folder,
                              repo_id=hf_space_id,
                              path_in_repo=hf_dest_folder,
                              repo_type="space")
        except Exception as e:
            self._pp("*Error: ", e)
            status = False
        return status
    #
    # automatically restart huggingface space
    def restart_hface_periodically(self):
        """
        This function restarts the huggingface space automatically at
        random intervals.
        Args:
            None
        Returns:
            None
        """
        while True:
            random_time = random.randint(15800, 21600)
            time.sleep(random_time)
            os.execl(sys.executable, sys.executable, *sys.argv)
        return
    #
    # log into huggingface
    def login_hface(self, key=None):
        """
        Log into HuggingFace.
        Args:
            key: (str, optional) If key is set, this key will be used to log in,
                otherwise the key will be decrypted from the key file.
        Returns:
            None
        """
        if (key is None):
            x = self.decrypt_it(self._huggingface_crkey)
        else:
            x = key
        huggingface_hub.login(x, add_to_git_credential=True)  # non-blocking login
        self._ph()
        return
    #
    # Define a function to display available CPU and RAM
    def fetch_info_system(self):
        """
        Fetches system information, such as CPU usage and memory usage.
        Args:
            None.
        Returns:
            s: (str) A string containing the system information.
        """
        s = ''
        # Get CPU usage as a percentage
        cpu_usage = psutil.cpu_percent()
        # Get available memory in bytes
        mem = psutil.virtual_memory()
        # Convert bytes to gigabytes
        mem_total_gb = mem.total / (1024 ** 3)
        mem_available_gb = mem.available / (1024 ** 3)
        mem_used_gb = mem.used / (1024 ** 3)
        # save the results
        s += f"Total memory: {mem_total_gb:.2f} GB\n"
        s += f"Available memory: {mem_available_gb:.2f} GB\n"
        # print(f"Used memory: {mem_used_gb:.2f} GB")
        s += f"Memory usage: {mem_used_gb/mem_total_gb*100:.2f}%\n"
        try:
            cpu_info = cpuinfo.get_cpu_info()
            s += f'CPU type: {cpu_info["brand_raw"]}, arch: {cpu_info["arch"]}\n'
            s += f'Number of CPU cores: {cpu_info["count"]}\n'
            s += f"CPU usage: {cpu_usage}%\n"
            s += f'Python version: {cpu_info["python_version"]}'
        except Exception as e:
            s += f'CPU type: Not accessible, Error: {e}'
        return s
    #
    # fetch GPU RAM info
    def fetch_info_gpu(self):
        """
        Function to fetch GPU RAM info
        Args:
            None.
        Returns:
            s: (str) GPU RAM info in human readable format.
        """
        s = ''
        mtotal = 0
        mfree = 0
        try:
            pynvml.nvmlInit()
            devices = pynvml.nvmlDeviceGetCount()
            for i in range(devices):
                device = pynvml.nvmlDeviceGetHandleByIndex(i)
                memory_info = pynvml.nvmlDeviceGetMemoryInfo(device)
                mtotal += memory_info.total
                mfree += memory_info.free
            mtotal = mtotal / 1024**3
            mfree = mfree / 1024**3
            # print(f"GPU {i}: Total Memory: {memory_info.total/1024**3} GB, Free Memory: {memory_info.free/1024**3} GB")
            s += f'GPU type: {torch.cuda.get_device_name(0)}\n'
            s += f'GPU ready status: {torch.cuda.is_available()}\n'
            s += f'Number of GPUs: {devices}\n'
            s += f'Total Memory: {mtotal:.2f} GB\n'
            s += f'Free Memory: {mfree:.2f} GB\n'
            s += f'GPU allocated RAM: {round(torch.cuda.memory_allocated(0)/1024**3, 2)} GB\n'
            s += f'GPU reserved RAM: {round(torch.cuda.memory_reserved(0)/1024**3, 2)} GB\n'
        except Exception as e:
            s += f'**Warning, No GPU: {e}'
        return s
    #
    # fetch info about host ip
    def fetch_info_host_ip(self):
        """
        Function to fetch the current host name and IP address
        Args:
            None.
        Returns:
            s: (str) host name and IP info in human readable format.
        """
        s = ''
        try:
            hostname = socket.gethostname()
            ip_address = socket.gethostbyname(hostname)
            s += f"Hostname: {hostname}\n"
            s += f"IP Address: {ip_address}\n"
        except Exception as e:
            s += f"**Warning, No hostname: {e}"
        return s
    #
    # fetch file names
    def fetch_file_names(self, directory, file_extension=None):
        """
        This function gets all the filenames with a given extension.
        Args:
            directory (str):
                directory path to scan for files in.
            file_extension (list):
                file extensions to look for, or "None" (default) to get all files.
        Returns:
            filenames (list):
                list of strings containing the filenames with the given extension.
        """
        filenames = []
        for (root, subFolders, files) in os.walk(directory):
            for fname in files:
                if (file_extension is None):
                    filenames.append(os.path.join(root, fname))
                else:
                    for ext in file_extension:
                        if fname.endswith(ext):
                            filenames.append(os.path.join(root, fname))
        return filenames
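    # Usage sketch (hypothetical paths; assumes pluto is a Pluto_Happy instance):
    # collect all image files under a folder.
    #   imgs = pluto.fetch_file_names("img_colab/", file_extension=[".jpg", ".png"])
    #   print(len(imgs), "image files found")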
    #
    # fetch the crypto key
    def _fetch_crypt(self, has_new_key=False):
        """
        This function fetches the crypto key from the file or from the
        variable created previously in the class.
        Args:
            has_new_key (bool):
                flag indicating whether to use the in-memory key as-is (False)
                or re-read the key from the key file (True).
        Returns:
            s (str):
                string value containing the crypto key.
        """
        if self._fkey == 'your_key_goes_here':
            raise Exception('Crypto key is not correct!')
        #
        s = self._fkey[::-1]
        if (has_new_key):
            s = open(self._xkeyfile, "rb").read()
            self._fkey = s[::-1]
        return s
    #
    # generate new crypto key
    def gen_key(self):
        """
        This function generates a new crypto key and saves it to a file
        Args:
            None
        Returns:
            (str) crypto key
        """
        key = cryptography.fernet.Fernet.generate_key()
        with open(self._xkeyfile, "wb") as key_file:
            key_file.write(key[::-1])  # write in reversed
        return key
    #
    # decrypt message
    def decrypt_it(self, x):
        """
        Decrypts the encrypted string using the stored crypto key.
        Args:
            x: (str) to be decrypted.
        Returns:
            x: (str) decrypted version of x.
        """
        y = self._fetch_crypt()
        f = cryptography.fernet.Fernet(y)
        m = f.decrypt(x)
        return m.decode()
    #
    # encrypt message
    def encrypt_it(self, x):
        """
        encrypt message
        Args:
            x (str): message to encrypt
        Returns:
            str: encrypted message
        """
        key = self._fetch_crypt()
        p = x.encode()
        f = cryptography.fernet.Fernet(key)
        y = f.encrypt(p)
        return y
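    # Round-trip sketch (assumes self._fkey holds a valid Fernet key stored
    # reversed, as the app section below does with monty._fkey):
    #   token = pluto.encrypt_it("hello")        # returns an encrypted bytes token
    #   assert pluto.decrypt_it(token) == "hello"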
    #
    # fetch imported libraries
    def _fetch_lib_import(self):
        """
        This function fetches all the imported libraries that are installed.
        Args:
            None
        Returns:
            x (list):
                list of strings containing the names of the imported libraries.
        """
        x = []
        for name, val in globals().items():
            if isinstance(val, types.ModuleType):
                x.append(val.__name__)
        x.sort()
        return x
    #
    # fetch lib versions
    def _fetch_lib_version(self, lib_name):
        """
        This function fetches the versions of the imported libraries.
        Args:
            lib_name (list):
                list of strings containing the names of the imported libraries.
        Returns:
            val (list):
                list of strings containing the versions of the imported libraries.
        """
        val = []
        for x in lib_name:
            try:
                y = importlib.metadata.version(x)
                val.append(f'{x}=={y}')
            except Exception as e:
                val.append(f'|{x}==unknown_*or_system')
        val.sort()
        return val
    #
    # fetch the lib names and versions
    def fetch_info_lib_import(self):
        """
        This function fetches all the imported libraries' names and versions that are installed.
        Args:
            None
        Returns:
            x (list):
                list of strings containing the names and versions of the imported libraries.
        """
        x = self._fetch_lib_version(self._fetch_lib_import())
        return x
    #
    # write a file to local or cloud diskspace
    def write_file(self, fname, in_data):
        """
        Write a file to local or cloud diskspace, or append to it if it already exists.
        This is a utility function that writes a file to disk.
        The file name and text to write are passed in as arguments.
        The file is created (or opened for append), the text is written to it, and then the file is closed.
        Args:
            fname (str): The name of the file to write.
            in_data (list): The text to write to the file.
        Returns:
            None
        """
        if os.path.isfile(fname):
            f = open(fname, "a")
        else:
            f = open(fname, "w")
        f.writelines("\n".join(in_data))
        f.close()
        return
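    # Usage sketch (hypothetical file name): append two lines to a notes file.
    #   pluto.write_file("notes.txt", ["first line", "second line"])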
    #
    # fetch flops info
    def fetch_info_flops(self, model, input_shape=(1, 3, 224, 224), device="cpu", max_epoch=1):
        """
        Calculates the number of floating point operations (FLOPs).
        Args:
            model (torch.nn.Module): neural network model.
            input_shape (tuple): input tensor size.
            device (str): device to perform computation on.
            max_epoch (int): number of epochs (runs) to average over.
        Returns:
            (float): number of FLOPs, averaged over the epochs, default is 1 epoch.
            (float): elapsed seconds
            (list): of strings for a friendly human readable output
        """
        ttm_input = torch.rand(input_shape, dtype=torch.float32, device=device)
        # ttm_input = torch.rand((1, 3, 224, 224), dtype=torch.float32, device=device)
        tstart = time.time()
        for i in range(max_epoch):
            flops, params = flopth(model, inputs=(ttm_input,), bare_number=True)
        tend = time.time()
        etime = (tend - tstart) / max_epoch
        # kilo = 10^3, mega = 10^6, giga = 10^9, tera = 10^12, peta = 10^15, exa = 10^18, zetta = 10^21
        valstr = []
        valstr.append(f'Tensors device: {device}')
        valstr.append(f'flops: {flops:,}')
        valstr.append(f'params: {params:,}')
        valstr.append(f'epoch: {max_epoch}')
        valstr.append(f'sec: {etime}')
        # valstr += f'Tensors device: {device}, flops: {flops}, params: {params}, epoch: {max_epoch}, sec: {etime}\n'
        x = flops / etime
        y = (x / 10**15) * 86400
        valstr.append(f'Flops/s: {x:,}')
        valstr.append(f'PetaFlops/s: {x / 10**15}')
        valstr.append(f'PetaFlops/day: {y}')
        valstr.append(f'1 PetaFlopsDay (on this system will take): {round(1/y, 2):,.2f} days')
        return flops, etime, valstr
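    # Usage sketch: measure the toy model defined below (TTM) on CPU,
    # averaging over three runs (assumes pluto is a Pluto_Happy instance).
    #   flops, secs, report = pluto.fetch_info_flops(TTM(), device="cpu", max_epoch=3)
    #   for line in report:
    #       print(line)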
    #
    def print_petaflops(self):
        """
        Prints the flops and peta-flops-day calculation.
        **WARNING**: This method will break/interfere with Stable Diffusion's use of LoRA.
        I can't debug why yet.
        Args:
            None
        Returns:
            None
        """
        self._pp('Model', 'TTM, Tiny Torch Model on: CPU')
        mtoy = TTM()
        # my_model = MyModel()
        a, b, c = self.fetch_info_flops(mtoy)
        y = round((a / b) / self.flops_per_sec_gcolab_cpu * 100, 2)
        self._pp('Flops', f'{a:,} flops')
        self._pp('Total elapsed time', f'{b:,} seconds')
        self._pp('Flops compared', f'{y:,}% of Google Colab Pro')
        for i, val in enumerate(c):
            self._pp(f'Info {i}', val)
        self._ph()
        try:
            self._pp('Model', 'TTM, Tiny Torch Model on: GPU')
            dev = torch.device("cuda:0")
            a2, b2, c2 = self.fetch_info_flops(mtoy, device=dev)
            y2 = round((a2 / b2) / self.flops_per_sec_gcolab_gpu * 100, 2)
            self._pp('Flops', f'{a2:,} flops')
            self._pp('Total elapsed time', f'{b2:,} seconds')
            self._pp('Flops compared', f'{y2:,}% of Google Colab Pro')
            d2 = round(((a2 / b2) / (a / b)) * 100, 2)
            self._pp('Flops GPU compared', f'{d2:,}% of CPU (or {round(d2 - 100, 2):,}% faster)')
            for i, val in enumerate(c2):
                self._pp(f'Info {i}', val)
        except Exception as e:
            self._pp('Error', e)
        self._ph()
        return
    #
    #
    def fetch_installed_libraries(self):
        """
        Retrieves the names and versions of the Python libraries installed by the user,
        excluding the standard libraries.
        Args:
        -----
            None
        Returns:
        --------
            dictionary: (dict)
                A dictionary where keys are the names of the libraries and values are their respective versions.
        Examples:
        ---------
            libraries = fetch_installed_libraries()
            for name, version in libraries.items():
                print(f"{name}: {version}")
        """
        # Run the pip freeze command to get the list of installed packages with their versions
        result = subprocess.run(['pip', 'freeze'], stdout=subprocess.PIPE)
        # Decode the result and split it by lines
        packages = result.stdout.decode('utf-8').splitlines()
        # Split each line by '==' to separate package names and versions
        installed_libraries = {}
        for package in packages:
            try:
                name, version = package.split('==')
                installed_libraries[name] = version
            except Exception as e:
                # print(f'{package}: Error: {e}')
                pass
        return installed_libraries
    #
    #
    def fetch_match_file_dict(self, file_path, reference_dict):
        """
        Reads a file from the disk, creates an array with each line as an item,
        and checks if each line exists as a key in the provided dictionary. If it exists,
        the associated value from the dictionary is also returned.
        Parameters:
        -----------
            file_path: str
                Path to the file to be read.
            reference_dict: dict
                Dictionary against which the file content (each line) will be checked.
        Returns:
        --------
            dict:
                A dictionary where keys are the lines from the file and values are either
                the associated values from the reference dictionary or None if the key
                doesn't exist in the dictionary.
        Raises:
        -------
            FileNotFoundError:
                If the provided file path does not exist.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"The file at {file_path} does not exist.")
        with open(file_path, 'r') as file:
            lines = file.readlines()
        # Check if each line (stripped of whitespace and newline characters) exists in the reference dictionary.
        # If it exists, fetch its value. Otherwise, set the value to None.
        results = {line.strip(): reference_dict.get(line.strip().replace('_', '-'), None) for line in lines}
        return results
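    # Usage sketch: report which pinned requirements are actually installed
    # (assumes pluto is a Pluto_Happy instance and the requirements file exists).
    #   installed = pluto.fetch_installed_libraries()
    #   matches = pluto.fetch_match_file_dict('./pluto_happy/requirements.txt', installed)
    #   # e.g. {'torch': '2.1.0', 'some_missing_lib': None}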
    # print fetch_info about myself
    def print_info_self(self):
        """
        Prints information about the model/myself.
        Args:
            None
        Returns:
            None
        """
        self._ph()
        self._pp("Hello, I am", self.name)
        self._pp("I will display", "Python, Jupyter, and system info.")
        self._pp("For complete doc type", "help(pluto) ...or help(your_object_name)")
        self._pp('.', '.')
        self._pp("...", "¯\\_(ツ)_/¯")
        self._ph()
        # system
        self._pp('System', 'Info')
        x = self.fetch_info_system()
        print(x)
        self._ph()
        # gpu
        self._pp('GPU', 'Info')
        x = self.fetch_info_gpu()
        print(x)
        self._ph()
        # libs used
        self._pp('Installed lib from', self.fname_requirements)
        self._ph()
        x = self.fetch_match_file_dict(self.fname_requirements, self.fetch_installed_libraries())
        for item, value in x.items():
            self._pp(f'{item} version', value)
        self._ph()
        self._pp('Standard lib from', 'System')
        self._ph()
        self._pp('matplotlib version', matplotlib.__version__)
        self._pp('numpy version', numpy.__version__)
        self._pp('pandas version', pandas.__version__)
        self._pp('PIL version', PIL.__version__)
        self._pp('torch version', torch.__version__)
        self._ph()
        # host ip
        self._pp('Host', 'Info')
        x = self.fetch_info_host_ip()
        print(x)
        self._ph()
        #
        return
#
#
# define TTM for use in calculating flops
class TTM(torch.nn.Module):
    """
    Tiny Torch Model (TTM)
    This is a toy model consisting of four convolutional layers.
    Args:
        input_shape (tuple): input tensor size.
    Returns:
        (tensor): output of the model.
    """
    def __init__(self, input_shape=(1, 3, 224, 224)):
        super(TTM, self).__init__()
        self.conv1 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1)
        self.conv2 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1)
        self.conv3 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1)
        self.conv4 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1)
    def forward(self, x1):
        x1 = self.conv1(x1)
        x1 = self.conv2(x1)
        x1 = self.conv3(x1)
        x1 = self.conv4(x1)
        return x1
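# Quick sanity-check sketch: each Conv2d(3, 3, kernel_size=3, padding=1)
# preserves the 3-channel, 224x224 shape, so input and output shapes match.
#   out = TTM()(torch.rand(1, 3, 224, 224))
#   print(out.shape)   # torch.Size([1, 3, 224, 224])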
#
# (end of class TTM)
# add module/method
#
import functools
def add_method(cls):
    def decorator(func):
        @functools.wraps(func)  # preserve the name/docstring of the attached method
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        setattr(cls, func.__name__, wrapper)
        return func  # returning func means func can still be used normally
    return decorator
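# Usage sketch: attach a new method to Pluto_Happy after the class is defined
# (hypothetical method name, for illustration only):
#   @add_method(Pluto_Happy)
#   def say_name(self):
#       return self.name
#   # any Pluto_Happy instance now has .say_name()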
#
# [END OF pluto_happy]
## %%writefile app.py
import openai
import gradio
# %%writefile -a app.py
# wake up monty
monty = Pluto_Happy('Monty, shares or steal')
# %%writefile -a app.py
# check out my environments
# monty.fname_requirements = 'pluto_happy/requirements.txt'
# monty.print_info_self()
# %%writefile -a app.py
monty._huggingface_key = b'gAAAAABld_3fKLl7aPBJzfAq-th37t95pMu2bVbH9QccOSecaUnm33XrpKpCXP4GL6Wr23g3vtrKWli5JK1ZPh18ilnDb_Su6GoVvU92Vzba64k3gBQwKF_g5DoH2vWq2XM8vx_5mKJh'
monty._kaggle_key = b'gAAAAABld_4_B6rrRhFYyfl77dacu1RhR4ktaLU6heYhQBSIj4ELBm7y4DzU1R8-H4yPKd0w08s11wkFJ9AR7XyESxM1SsrMBzqQEeW9JKNbl6jAaonFGmqbhFblkQqH4XjsapZru0qX'
monty._fkey = "fes_f8Im569hYnI1Tn6FqP-6hS4rdmNOJ6DWcRPOsvc="
monty._fkey = monty._fkey[::-1]
monty._ok = b'gAAAAABld_-y70otUll4Jwq3jEBXiw1tooSFo_gStRbkCyuu9_Dmdehc4M8lI_hFbum9CwyZuj9ZnXgxFIROebcPSF5qoA197VRvzUDQOMxY5zmHnImVROrsXVdZqXyIeYH_Q6cvXvFTX3rLBIKKWgvJmnpYGRaV6Q=='
# %%writefile -a app.py
# client.moderations.create()
ai_client = openai.OpenAI(api_key=monty.decrypt_it(monty._ok))
# %%writefile -a app.py
fname = 'toxic_data.csv'
monty.df_toxic_data = pandas.read_csv(fname)
# %%writefile -a app.py
#
# # for openai version 1.3.8
#
# attach the following functions to Pluto_Happy with the add_method decorator
# defined above; without it, the monty.censor_me call in the Gradio block
# below would not resolve
@add_method(Pluto_Happy)
def _fetch_moderate_engine(self):
    self.ai_client = openai.OpenAI(api_key=self.decrypt_it(self._ok))
    self.text_model = "text-moderation-latest"
    return
#
#
@add_method(Pluto_Happy)
def _censor_me(self, p, safer=0.0005):
    self._fetch_moderate_engine()
    resp_orig = self.ai_client.moderations.create(input=p, model=self.text_model)
    resp_dict = resp_orig.model_dump()
    #
    v1 = resp_dict["results"][0]["category_scores"]
    max_key = max(v1, key=v1.get)
    max_value = v1[max_key]
    sum_value = sum(v1.values())
    #
    v1["is_safer_flagged"] = False
    if (max_value >= safer):
        v1["is_safer_flagged"] = True
    v1["is_flagged"] = resp_dict["results"][0]["flagged"]
    v1['max_key'] = max_key
    v1['max_value'] = max_value
    v1['sum_value'] = sum_value
    v1['safer_value'] = safer
    v1['message'] = p
    return v1
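# The dict returned by _censor_me merges the 14 OpenAI category scores with
# the bookkeeping keys added above: is_flagged, is_safer_flagged, max_key,
# max_value, sum_value, safer_value, and message.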
#
@add_method(Pluto_Happy)
def _draw_censor(self, data):
    self._color_mid_gray = '#6c757d'
    exp = (0.01, 0.01)
    x = [data['max_value'], (1 - data['max_value'])]
    title = f"\nUnsafe: {data['max_key']}: {(data['max_value']*100):.2f}% Confidence\n"
    lab = [data['max_key'], 'Other 13 categories']
    if (data['is_flagged']):
        col = [self.color_danger, self.color_mid_gray]
    elif (data['is_safer_flagged']):
        col = [self.color_warning, self.color_mid_gray]
        lab = ['Relative Score:\n' + data['max_key'], 'Other 13 categories']
        title = f"\nPersonal Unsafe: {data['max_key']}: {(data['max_value']*100):.2f}% Confidence\n"
    else:
        col = [self.color_mid_gray, self.color_success]
        lab = ['False Negative:\n' + data['max_key'], 'Other 13 categories']
        title = '\nSafe Message\n'
    canvas = self._draw_donut(x, lab, col, exp, title)
    return canvas
#
@add_method(Pluto_Happy)
def _draw_donut(self, data, labels, col, exp, title):
    # col = [self.color_danger, self._color_secondary]
    # exp = (0.01, 0.01)
    # Create a pie chart
    canvas, pic = matplotlib.pyplot.subplots()
    pic.pie(data, explode=exp,
            labels=labels,
            colors=col,
            autopct='%1.1f%%',
            startangle=90,
            textprops={'color': '#0a0a0a'})
    # Draw a circle at the center of the pie to make it look like a donut
    # centre_circle = matplotlib.pyplot.Circle((0,0), 0.45, fc='white')
    centre_circle = matplotlib.pyplot.Circle((0, 0), 0.45, fc=col[0], linewidth=2, ec='white')
    canvas = matplotlib.pyplot.gcf()
    canvas.gca().add_artist(centre_circle)
    # Equal aspect ratio ensures that the pie is drawn as a circle.
    pic.axis('equal')
    pic.set_title(title)
    canvas.tight_layout()
    # canvas.show()
    return canvas
#
# def censor_me(self, msg, safer=0.02, ibutton_1=0):
@add_method(Pluto_Happy)
def censor_me(self, msg, safer):
    # safer = 0.2
    yjson = self._censor_me(msg, safer)
    _canvas = self._draw_censor(yjson)
    _yjson = json.dumps(yjson, indent=4)
    # return (_canvas, _yjson)
    return (_canvas)
# %%writefile -a app.py
# the result of a lot of AI prompting and old-fashioned trial and error
# print(gradio.__version__)
import random
def say_hello(val):
    return f"Hello: {val}"
def say_toxic():
    return f"I am toxic"
def fetch_toxic_tweets(maxi=2):
    sample_df = monty.df_toxic_data.sample(maxi)
    is_true = random.choice([True, False])
    c1 = "more_toxic"
    if is_true:
        c1 = "less_toxic"
    toxic1 = sample_df[c1].iloc[0]
    # toxic1 = "cat eats my homework."
    return sample_df.to_html(index=False), toxic1
#
# define all gradio widgets/components outside the Blocks so the block structure is easy to visualize
#
in1 = gradio.Textbox(lines=3, label="Enter Text:")
in2 = gradio.Slider(0.005, .1, value=0.02, step=.005, label="Personalize Safer Value: (larger value is less safe)")
out1 = gradio.Plot(label="Output:")
out2 = gradio.HTML(label="Real-world Toxic Posts/Tweets: *WARNING")
out3 = gradio.Textbox(lines=5, label="Output JSON:")
but1 = gradio.Button("Measure 14 Toxicity", variant="primary", size="sm")
but2 = gradio.Button("Fetch Toxic Text", variant="stop", size="sm")
#
txt1 = """
# Welcome to Friendly Text Moderation
### Identify 14 categories of text toxicity.
>The purpose of this NLP (Natural Language Processing) AI demonstration is to prevent profanity, vulgarity, hate speech, violence, sexism, and any other offensive language.
>It is **not an act of censorship**, as the final UI (User Interface) will give the reader, but not a young reader, the option to click on a label to read the toxic message.
>The goal is to create a safer and more respectful environment for you, your colleagues, and your family.
---
### Helpful Instructions:
1. Enter your [harmful] message in the input box.
2. Click the "Measure 14 Toxicity" button.
3. View the result on the donut plot.
4. (**Optional**) Click the "Fetch Toxic Text" button below for real-world toxic messages.
5. Please find below the explanation of additional options available.
"""
txt2 = """ | |
## π» Author and Developer Notes: | |
--- | |
- The demo uses the cutting-edge (2024) AI Natural Language Processing (NLP) model from OpenAI. | |
- It is not a Generative (GenAI) model, such as Google Gemini or GPT-4. | |
- The NLP understands the message context, nuance, innuendo, and not just swear words. | |
- We **challenge you** to trick it, i.e., write a toxic tweet or post, but our AI thinks it is safe. If you win, please send us your message. | |
- The 14 toxicity categories are as follows: | |
1. harassment | |
2. harassment threatening | |
3. harassment instructions | |
4. hate | |
5. hate threatening | |
6. hate instructions | |
7. self harm | |
8. self harm instructions | |
9. self harm intent | |
10. self harm minor | |
11. sexual | |
12. sexual minors | |
13. violence | |
14. violence graphic | |
- If the NLP model classifies the message as "safe," you can still limit the level of toxicity by using the "Personal Safe" slider. | |
- The smaller the personal-safe value, the stricter the limitation. It means that if you're a young or sensitive adult, you should choose a lower personal-safe value, less than 0.02, to ensure you're not exposed to harmful content. | |
- The color of the donut plot is as follows: | |
- Red is an "unsafe" message by the NLP model | |
- Green is a "safe" message | |
- Yellow is an "unsafe" message by your toxicity level | |
- The real-world dataset is from the Jigsaw Rate Severity of Toxic Comments on Kaggle. It has 30,108 records. | |
- The intent is to share with Duc's friends and colleagues, but for those with nefarious intent, this Text Moderation model is governed by the GNU 3.0 License: https://www.gnu.org/licenses/gpl-3.0.en.html | |
- Author: **Duc Haba, 2024** | |
""" | |
txt3 = """ | |
## π₯ WARNING: WARNING: | |
--- | |
- The following button will retrieve **real-world** offensive posts from Twitter and customer reviews from consumer companies. | |
- The button will display four toxic messages at a time. **Click again** for four more random messages. | |
- They contain **profanity, vulgarity, hate, violence, sexism, and other offensive language.** | |
- After you fetch the toxic messages, Click on the **"Measure 14 Toxicity" button**. | |
""" | |
# reverse_button.click(process_text, inputs=text_input, outputs=reversed_text)
#
with gradio.Blocks() as gradio_app:
    # title
    gradio.Markdown(txt1)  # any html or simple markup
    #
    # first row has two columns, 1/3 size and 2/3 size
    with gradio.Row():  # items inside a row are columns
        # left column
        with gradio.Column(scale=1):  # items under a column are rows; scale 1 is the 1/3 size
            # left column has three rows: text entry, slider, and button
            in1.render()
            in2.render()
            but1.render()
            but1.click(monty.censor_me, inputs=[in1, in2], outputs=out1)
        with gradio.Column(scale=2):
            out1.render()
    #
    # second row is the warning text
    with gradio.Row():
        gradio.Markdown(txt3)
    # third row is for fetching toxic data
    with gradio.Row():
        with gradio.Column(scale=1):
            but2.render()
            but2.click(fetch_toxic_tweets, inputs=None, outputs=[out2, in1])
        with gradio.Column(scale=2):
            out2.render()
    # fourth row is the note text
    with gradio.Row():
        gradio.Markdown(txt2)
# %%writefile -a app.py
# open/launch it
gradio_app.launch()