SQLAlchemy 连接池中的连接失效
大约 2 分钟
SQLAlchemy 连接池中的连接失效
1. 问题复现
在使用 SQLAlchemy 连接 MySQL 时,如果 MySQL 服务端主动断开连接,那么 SQLAlchemy 会报错。
查看 MySQL 连接超时时间 wait_timeout
,如未进行配置,默认值为 28800,即 8 小时。
show global variables like '%timeout%';
设置 MySQL 超时时间为 2 秒:
set global wait_timeout=2;
下面是在使用 asyncio-mqtt
时出现的问题,如果取消注释 await asyncio.sleep(3)
,连接将被闲置 3 秒,此时 MySQL 已经断开连接,再次发送请求时,就会报错。
import asyncio
import sys
import asyncio_mqtt as aiomqtt
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
import config
from src.services.save_data import select_strategy
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
else:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
engine = create_async_engine(config.DB_URL, echo=False)
session = async_sessionmaker(
engine,
expire_on_commit=False
)
async def main():
reconnect_interval = 1.0
while True:
try:
async with aiomqtt.Client(config.MQTT_URL) as client:
await client.subscribe('Home/#')
async with client.messages() as messages:
async for message in messages:
try:
msg = message.payload.decode()
print(message.topic.value, '\t', msg)
# 这是业务函数入口
await select_strategy(session, message.topic.value, msg)
except UnicodeDecodeError:
print('[ERROR] decode error',
message.topic.value, message.payload)
# await asyncio.sleep(3)
except aiomqtt.MqttError as error:
print(
f'Error `{error}`. Reconnecting in {reconnect_interval} seconds.')
await asyncio.sleep(reconnect_interval)
except Exception as e:
print('[ERROR] ', e)
print(message.topic.value, message.payload)
if __name__ == '__main__':
asyncio.run(main())
SQLAlchemy 会报错:
[ERROR] (pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query')
(Background on this error at: https://sqlalche.me/e/20/e3q8)
2. 解决方案
我们先学习一下 SQLAlchemy 的连接池参数。
pool_size
:设置连接池中,保持的连接数。初始化时并不产生连接。max_overflow
:当连接池里的连接数已达到pool_size
时,且都被使用时。又要求从连接池里获取连接时,max_overflow
就是允许再新建的连接数。pool_timeout
:从连接池里获取连接,如果此时无空闲的连接。且连接数已经到达了pool_size+max_overflow
。此时获取连接的进程会等待pool_timeout
秒。如果超过这个时间,还没有获得将会抛出异常,默认为 30 秒。pool_recycle
:数据库连接的生存时间。一个连接当连接空闲pool_recycle
秒后,会被重置。默认为 -1,即永久可用。
当 pool_recycle
设置为 -1 时,也就是连接池不会主动丢弃这个连接。但是有可能数据库设置了连接超时时间。例如 MySQL,设置的有 wait_timeout
默认为 28800。当连接空闲 8 小时时会自动断开。
2.1 不使用连接池
在创建引擎时,设置 poolclass=NullPool
,不使用连接池。
缺点是降低连接效率。
engine = create_async_engine(config.DB_URL, echo=False, poolclass=NullPool)
2.2 设置连接超时时间
在创建引擎时,设置 pool_recycle=3600
,每 3600 秒(1 小时)回收连接,建议和 pool_pre_ping=True
一起使用。
engine = create_async_engine(config.DB_URL, echo=False, pool_recycle=3600)
2.3 自动重连
设置 pool_pre_ping=True
,每次从连接池中取出连接时,都会先尝试 ping(相当于 SELECT 1
),如果连接已经断开,那么 SQLAlchemy 会自动重连,并放弃所有旧的连接。
如果出现错误将最多重试 3 次,否则此错误正常抛出。
engine = create_async_engine(config.DB_URL, echo=False, pool_pre_ping=True)