import gradio as gr from response_parser import * def initialization(state_dict: Dict) -> None: if not os.path.exists('cache'): os.mkdir('cache') if state_dict["bot_backend"] is None: state_dict["bot_backend"] = BotBackend() if 'OPENAI_API_KEY' in os.environ: del os.environ['OPENAI_API_KEY'] def get_bot_backend(state_dict: Dict) -> BotBackend: return state_dict["bot_backend"] def switch_to_gpt4(state_dict: Dict, whether_switch: bool) -> None: bot_backend = get_bot_backend(state_dict) if whether_switch: bot_backend.update_gpt_model_choice("GPT-4") else: bot_backend.update_gpt_model_choice("GPT-3.5") def add_text(state_dict: Dict, history: List, text: str) -> Tuple[List, Dict]: bot_backend = get_bot_backend(state_dict) bot_backend.add_text_message(user_text=text) history = history + [(text, None)] return history, gr.update(value="", interactive=False) def add_file(state_dict: Dict, history: List, files) -> List: bot_backend = get_bot_backend(state_dict) for file in files: path = file.name filename = os.path.basename(path) bot_msg = [f'📁[{filename}]', None] history.append(bot_msg) bot_backend.add_file_message(path=path, bot_msg=bot_msg) _, suffix = os.path.splitext(filename) if suffix in {'.jpg', '.jpeg', '.png', '.bmp', '.webp'}: copied_file_path = f'{bot_backend.jupyter_work_dir}/{filename}' width, height = get_image_size(copied_file_path) bot_msg[0] += \ f'\n ' return history def undo_upload_file(state_dict: Dict, history: List) -> Tuple[List, Dict]: bot_backend = get_bot_backend(state_dict) bot_msg = bot_backend.revoke_file() if bot_msg is None: return history, gr.Button.update(interactive=False) else: assert history[-1] == bot_msg del history[-1] if bot_backend.revocable_files: return history, gr.Button.update(interactive=True) else: return history, gr.Button.update(interactive=False) def refresh_file_display(state_dict: Dict) -> List[str]: bot_backend = get_bot_backend(state_dict) work_dir = bot_backend.jupyter_work_dir filenames = os.listdir(work_dir) paths = [] for filename in filenames: path = os.path.join(work_dir, filename) if not os.path.isdir(path): paths.append(path) return paths def refresh_token_count(state_dict: Dict): bot_backend = get_bot_backend(state_dict) model_choice = bot_backend.gpt_model_choice sliced = bot_backend.sliced token_count = bot_backend.context_window_tokens token_limit = config['model_context_window'][config['model'][model_choice]['model_name']] display_text = f"**Context token:** {token_count}/{token_limit}" if sliced: display_text += '\\\nToken limit exceeded, conversion has been sliced.' return gr.Markdown.update(value=display_text) def restart_ui(history: List) -> Tuple[List, Dict, Dict, Dict, Dict, Dict, Dict]: history.clear() return ( history, gr.Textbox.update(value="", interactive=False), gr.Button.update(interactive=False), gr.Button.update(interactive=False), gr.Button.update(interactive=False), gr.Button.update(interactive=False), gr.Button.update(visible=False) ) def restart_bot_backend(state_dict: Dict) -> None: bot_backend = get_bot_backend(state_dict) bot_backend.restart() def stop_generating(state_dict: Dict) -> None: bot_backend = get_bot_backend(state_dict) if bot_backend.code_executing: bot_backend.send_interrupt_signal() else: bot_backend.update_stop_generating_state(stop_generating=True) def bot(state_dict: Dict, history: List) -> List: bot_backend = get_bot_backend(state_dict) while bot_backend.finish_reason in ('new_input', 'function_call'): if history[-1][1]: history.append([None, ""]) else: history[-1][1] = "" try: response = chat_completion(bot_backend=bot_backend) for chunk in response: if chunk['choices'] and chunk['choices'][0]['finish_reason'] == 'function_call': if bot_backend.function_name in bot_backend.jupyter_kernel.available_functions: yield history, gr.Button.update(value='⏹️ Interrupt execution'), gr.Button.update(visible=False) else: yield history, gr.Button.update(interactive=False), gr.Button.update(visible=False) if bot_backend.stop_generating: response.close() if bot_backend.content: bot_backend.add_gpt_response_content_message() if bot_backend.display_code_block: bot_backend.update_display_code_block( display_code_block="\n⚫Stopped:\n```python\n{}\n```".format(bot_backend.code_str) ) history = copy.deepcopy(bot_backend.bot_history) history[-1][1] += bot_backend.display_code_block bot_backend.add_function_call_response_message(function_response=None) bot_backend.reset_gpt_response_log_values() break history, weather_exit = parse_response( chunk=chunk, history=history, bot_backend=bot_backend ) yield ( history, gr.Button.update( interactive=False if bot_backend.stop_generating else True, value='⏹️ Stop generating' ), gr.Button.update(visible=False) ) if weather_exit: exit(-1) except openai.OpenAIError as openai_error: bot_backend.reset_gpt_response_log_values(exclude=['finish_reason']) yield history, gr.Button.update(interactive=False), gr.Button.update(visible=True) raise openai_error yield history, gr.Button.update(interactive=False, value='⏹️ Stop generating'), gr.Button.update(visible=False) if __name__ == '__main__': config = get_config() with gr.Blocks(theme=gr.themes.Base()) as block: """ Reference: https://www.gradio.app/guides/creating-a-chatbot-fast """ # UI components state = gr.State(value={"bot_backend": None}) with gr.Tab("Chat"): chatbot = gr.Chatbot([], elem_id="chatbot", label="Local Code Interpreter", height=750) with gr.Row(): with gr.Column(scale=0.85): text_box = gr.Textbox( show_label=False, placeholder="Enter text and press enter, or upload a file", container=False ) with gr.Column(scale=0.15, min_width=0): file_upload_button = gr.UploadButton("📁", file_count='multiple', file_types=['file']) with gr.Row(equal_height=True): with gr.Column(scale=0.08, min_width=0): check_box = gr.Checkbox(label="Use GPT-4", interactive=config['model']['GPT-4']['available']) with gr.Column(scale=0.314, min_width=0): model_token_limit = config['model_context_window'][config['model']['GPT-3.5']['model_name']] token_count_display_text = f"**Context token:** 0/{model_token_limit}" token_monitor = gr.Markdown(value=token_count_display_text) with gr.Column(scale=0.15, min_width=0): retry_button = gr.Button(value='🔂OpenAI Error, click here to retry', visible=False) with gr.Column(scale=0.15, min_width=0): stop_generation_button = gr.Button(value='⏹️ Stop generating', interactive=False) with gr.Column(scale=0.15, min_width=0): restart_button = gr.Button(value='🔄 Restart') with gr.Column(scale=0.15, min_width=0): undo_file_button = gr.Button(value="↩️Undo upload file", interactive=False) with gr.Tab("Files"): file_output = gr.Files() # Components function binding txt_msg = text_box.submit(add_text, [state, chatbot, text_box], [chatbot, text_box], queue=False).then( lambda: gr.Button.update(interactive=False), None, [undo_file_button], queue=False ).then( bot, [state, chatbot], [chatbot, stop_generation_button, retry_button] ) txt_msg.then(fn=refresh_file_display, inputs=[state], outputs=[file_output]) txt_msg.then(lambda: gr.update(interactive=True), None, [text_box], queue=False) txt_msg.then(fn=refresh_token_count, inputs=[state], outputs=[token_monitor]) retry_button.click(lambda: gr.Button.update(visible=False), None, [retry_button], queue=False).then( bot, [state, chatbot], [chatbot, stop_generation_button, retry_button] ).then( fn=refresh_file_display, inputs=[state], outputs=[file_output] ).then( lambda: gr.update(interactive=True), None, [text_box], queue=False ).then( fn=refresh_token_count, inputs=[state], outputs=[token_monitor] ) check_box.change(fn=switch_to_gpt4, inputs=[state, check_box]).then( fn=refresh_token_count, inputs=[state], outputs=[token_monitor] ) file_msg = file_upload_button.upload( add_file, [state, chatbot, file_upload_button], [chatbot], queue=False ) file_msg.then(lambda: gr.Button.update(interactive=True), None, [undo_file_button], queue=False) file_msg.then(fn=refresh_file_display, inputs=[state], outputs=[file_output]) undo_file_button.click( fn=undo_upload_file, inputs=[state, chatbot], outputs=[chatbot, undo_file_button] ).then( fn=refresh_file_display, inputs=[state], outputs=[file_output] ) stop_generation_button.click(fn=stop_generating, inputs=[state], queue=False).then( fn=lambda: gr.Button.update(interactive=False), inputs=None, outputs=[stop_generation_button], queue=False ) restart_button.click( fn=restart_ui, inputs=[chatbot], outputs=[ chatbot, text_box, restart_button, file_upload_button, undo_file_button, stop_generation_button, retry_button ] ).then( fn=restart_bot_backend, inputs=[state], queue=False ).then( fn=refresh_file_display, inputs=[state], outputs=[file_output] ).then( fn=lambda: (gr.Textbox.update(interactive=True), gr.Button.update(interactive=True), gr.Button.update(interactive=True)), inputs=None, outputs=[text_box, restart_button, file_upload_button], queue=False ).then( fn=refresh_token_count, inputs=[state], outputs=[token_monitor] ) block.load(fn=initialization, inputs=[state]) block.queue() block.launch(inbrowser=True)