import aiomysql from __init__ import mysql_option class AIOMYSQL: def __init__(self) -> None: self.pool = None async def init_pool(self): try: __pool = await aiomysql.create_pool( host=mysql_option["HOST"], port=mysql_option["PORT"], user=mysql_option["USERNAME"], password=mysql_option["PASSWORD"], db=mysql_option["DATABASE"], charset="utf8", autocommit=False, minsize=5, maxsize=10, cursorclass=aiomysql.DictCursor, ) return __pool except: raise ("aiomysql create_pool error") async def get_cursor(self): conn = await self.pool.acquire() cur = await conn.cursor() return conn, cur async def query(self, sql, param=None): conn, cur = await self.get_cursor() try: await cur.execute(sql, param) except: await cur.rollback() print("aiomysql query error") else: return await cur.fetchall() finally: if cur: await cur.close() # 释放 conn 连接回连接池 await self.pool.release(conn) async def update(self, sql, param=None): conn, cur = await self.get_cursor() try: await cur.execute(sql, param) except: await cur.rollback() print("aiomysql query error") else: await conn.commit() finally: if cur: await cur.close() # 释放 conn 连接回连接池 await self.pool.release(conn) async def get_aiomysql_instance(): instance = AIOMYSQL() instance.pool = await instance.init_pool() return instance