|
import aiomysql |
|
|
|
from __init__ import mysql_option |
|
|
|
|
|
class AIOMYSQL:
    """Thin asynchronous helper around an ``aiomysql`` connection pool.

    ``pool`` starts out as ``None``. It has to be assigned (for example with
    the value returned by :meth:`init_pool`) before :meth:`get_cursor`,
    :meth:`query` or :meth:`update` can be used.
    """

    def __init__(self) -> None:
        # Connection pool; stays None until a caller assigns one.
        self.pool = None

    async def init_pool(self):
        """Create and return an ``aiomysql`` pool configured from ``mysql_option``.

        The pool is returned to the caller; this method does not store it on
        ``self``.

        Returns:
            The pool object produced by ``aiomysql.create_pool``.

        Raises:
            RuntimeError: if the pool cannot be created. The original error is
                attached as ``__cause__``.
        """
        try:
            return await aiomysql.create_pool(
                host=mysql_option["HOST"],
                port=mysql_option["PORT"],
                user=mysql_option["USERNAME"],
                password=mysql_option["PASSWORD"],
                db=mysql_option["DATABASE"],
                charset="utf8",
                autocommit=False,
                minsize=5,
                maxsize=10,
                cursorclass=aiomysql.DictCursor,
            )
        except Exception as err:
            # Raising a bare string is itself a TypeError; raise a real
            # exception and keep the underlying cause for debugging.
            raise RuntimeError("aiomysql create_pool error") from err

    async def get_cursor(self):
        """Acquire a connection from the pool and open a cursor on it.

        Returns:
            A ``(conn, cur)`` tuple. The caller is responsible for closing the
            cursor and releasing the connection back to the pool.
        """
        conn = await self.pool.acquire()
        try:
            cur = await conn.cursor()
        except BaseException:
            # Do not leak the connection when no cursor could be created.
            await self.pool.release(conn)
            raise
        return conn, cur

    async def _release(self, conn, cur):
        """Close ``cur`` and always hand ``conn`` back to the pool."""
        try:
            await cur.close()
        finally:
            await self.pool.release(conn)

    async def query(self, sql, param=None):
        """Execute a read statement and return all fetched rows.

        Args:
            sql: SQL text, optionally containing placeholders.
            param: Values bound to the placeholders, or ``None``.

        Returns:
            The result of ``cur.fetchall()`` on success. If executing the
            statement fails, the transaction is rolled back, a message is
            printed and ``None`` is returned.
        """
        conn, cur = await self.get_cursor()
        try:
            await cur.execute(sql, param)
        except Exception:
            # Rollback is a connection-level operation, not a cursor one.
            await conn.rollback()
            print("aiomysql query error")
            return None
        else:
            # NOTE(review): the pool in init_pool uses autocommit=False, so a
            # read may leave the connection inside a transaction when it is
            # released - confirm whether a commit/rollback is wanted here.
            return await cur.fetchall()
        finally:
            await self._release(conn, cur)

    async def update(self, sql, param=None):
        """Execute a write statement and commit it.

        Args:
            sql: SQL text, optionally containing placeholders.
            param: Values bound to the placeholders, or ``None``.

        Returns:
            ``None``. If executing the statement fails, the transaction is
            rolled back and a message is printed instead of committing.
        """
        conn, cur = await self.get_cursor()
        try:
            await cur.execute(sql, param)
        except Exception:
            # Rollback is a connection-level operation, not a cursor one.
            await conn.rollback()
            print("aiomysql query error")
        else:
            await conn.commit()
        finally:
            await self._release(conn, cur)
|
|
|
|
|
async def get_aiomysql_instance():
    """Build an ``AIOMYSQL`` helper whose ``pool`` attribute is populated.

    A fresh ``AIOMYSQL`` object is created, its ``init_pool`` coroutine is
    awaited, and the returned pool is stored on the object before it is handed
    back to the caller.
    """
    client = AIOMYSQL()
    created_pool = await client.init_pool()
    client.pool = created_pool
    return client
|
|