sig / option_mysql.py
Evilmass
change pymysql to aiomysql
8786c0f
import aiomysql
from __init__ import mysql_option
class AIOMYSQL:
    """Small async helper around an aiomysql connection pool.

    ``pool`` starts as ``None``; the caller is expected to assign it the
    value returned by :meth:`init_pool` before calling :meth:`query` or
    :meth:`update`.
    """

    def __init__(self) -> None:
        # Not created here because pool creation is a coroutine.
        self.pool = None

    async def init_pool(self):
        """Create and return a new aiomysql pool built from ``mysql_option``.

        The pool is returned, not stored on ``self``.

        Raises:
            RuntimeError: if the pool cannot be created. The underlying
                error is chained as ``__cause__``.
        """
        try:
            return await aiomysql.create_pool(
                host=mysql_option["HOST"],
                port=mysql_option["PORT"],
                user=mysql_option["USERNAME"],
                password=mysql_option["PASSWORD"],
                db=mysql_option["DATABASE"],
                charset="utf8",
                autocommit=False,
                minsize=5,
                maxsize=10,
                cursorclass=aiomysql.DictCursor,
            )
        except Exception as err:
            # Raising a plain string is itself a TypeError and hides the
            # real failure; raise a proper exception and keep the cause.
            # ``Exception`` (not a bare except) lets task cancellation and
            # KeyboardInterrupt propagate untouched.
            raise RuntimeError("aiomysql create_pool error") from err

    async def get_cursor(self):
        """Acquire a pooled connection and open a cursor on it.

        Returns:
            A ``(conn, cur)`` tuple. The caller must close ``cur`` and
            release ``conn`` back to ``self.pool``.
        """
        conn = await self.pool.acquire()
        try:
            cur = await conn.cursor()
        except BaseException:
            # Without this the acquired connection would never go back to
            # the pool when opening the cursor fails.
            await self.pool.release(conn)
            raise
        return conn, cur

    async def _release(self, conn, cur):
        """Close *cur* and return *conn* to the pool, even if close fails."""
        try:
            await cur.close()
        finally:
            # Release the connection back to the pool.
            await self.pool.release(conn)

    async def query(self, sql, param=None):
        """Execute *sql* with *param* and return all fetched rows.

        Best-effort: if ``execute`` fails, the transaction is rolled back,
        a message is printed and ``None`` is returned. Errors raised while
        fetching the rows are not swallowed.
        """
        conn, cur = await self.get_cursor()
        try:
            try:
                await cur.execute(sql, param)
            except Exception:
                # Roll back on the connection: it owns the transaction
                # (commit() is likewise called on it), not the cursor.
                await conn.rollback()
                print("aiomysql query error")
                return None
            return await cur.fetchall()
        finally:
            await self._release(conn, cur)

    async def update(self, sql, param=None):
        """Execute a data-modifying *sql* with *param* and commit it.

        Best-effort: if ``execute`` fails, the transaction is rolled back
        and a message is printed. Always returns ``None``. Errors raised by
        the commit itself are not swallowed.
        """
        conn, cur = await self.get_cursor()
        try:
            try:
                await cur.execute(sql, param)
            except Exception:
                # Roll back on the connection, which owns the transaction.
                await conn.rollback()
                print("aiomysql query error")
                return None
            await conn.commit()
            return None
        finally:
            await self._release(conn, cur)
async def get_aiomysql_instance():
    """Return an ``AIOMYSQL`` object with its connection pool attached.

    The wrapper is constructed first, then the pool produced by its
    ``init_pool()`` coroutine is stored on its ``pool`` attribute.
    """
    db = AIOMYSQL()
    created_pool = await db.init_pool()
    db.pool = created_pool
    return db