记一次SQLAlchemy连接MySQL超时的处理

问题描述:

在公司有一个项目使用 MySQL,ORM 使用的 SQLAlchemy,但是一段时间后总后出现MySQL 2013,或者 2006 之类的错误,大致的意思的就是链接失效了,导致不能正确的连接MySQL,因为是线上环境,错误要很长时间才会出错,加上每次修改都需要上线,且测试环境不好复现,没办法,知道今天才把代码悄悄的拷回来研究。

使用 docker 启动 MySQL

1
2
3
docker pull mysql 

docker run -itd --name mysql8 -p 3306:3306 MYSQL_ROOT_PASSWORD=123456 mysql:latest

因为拉取的是最新的 MySQL 镜像,即 MySQL8,所以默认不能使用 Navicat 连接,只需要修改即可。
进入docker 后,修改配置文件 /etc/my.cnf 下 在 [mysqld] 下新增 ( 因为我们要测试MySQL主动断开链接的情况,默认是 8 小时,是不能等8小时的 )

1
2
wait_timeout=20  
interactive_timeout=20

也可以不使用修改配置文件的方式来改变wait_timeout,interactive_timeout等,我们也可以在交互式命令行中使用下面命令

1
2
msyql> set global wait_timeout=28800;
msyql> set global interactive_timeout=28800;

查看设置的过期时间

1
show global variables like '%timeout%';

设置允许远程连接,密码等

1
2
3
4
5
mysql -uroot -p
grant all PRIVILEGES on *.* to root@'%' WITH GRANT OPTION;
ALTER user 'root'@'%' IDENTIFIED BY '123456' PASSWORD EXPIRE NEVER;
ALTER user 'root'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
FLUSH PRIVILEGES;

最后成功的代码如下,大致就是使用请求上下文,不使用pool,每次操作前都去获取一个链接,并且添加监听,使用 MySQL 的 ping 让其保持有效

app 加载时 session 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def checkout_listener(dbapi_con, con_record, con_proxy):
try:
try:
dbapi_con.ping(False)
except TypeError as e:
dbapi_con.ping()
except dbapi_con.OperationalError as exc:
if exc.args[0] in (2006, 2013, 2014, 2045, 2055):
raise DisconnectionError()
else:
raise

with app.app_context():
engine = create_engine(app.config.get('SQLALCHEMY_DATABASE_URI'), pool_pre_ping=True, encoding='utf-8',poolclass=NullPool, convert_unicode=True, echo=True)
event.listen(engine, 'checkout', checkout_listener)

session_factory = sessionmaker(bind=engine)

@contextmanager
def session_scope():
session = session_factory()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
pass

dbUtils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from tool import const
from tool.error import CustomFlaskErr
from fishbase.fish_logger import logger
from fishbase.fish_common import SingleTon
from application import session_scope

class DBUtils(SingleTon):
def __init__(self, db):
pass

def add(self, table_object, *data_list, **data):
"""
新增数据
:param table_object: 表对象
:param data_list: list, 批量插入的数据
:param data: dict, 插入单条数据
:return:
"""
with session_scope() as session:
try:
if data_list: # 批量插入
table_object_list = []
for x in data_list:
table_object_list.append(table_object(**x))
session.add_all(table_object_list)
elif data: # 单条插入
obj = table_object(**data)
session.add(obj)
session.commit()
except Exception as e:
session.rollback()
session.close()
logger.error("数据库插入错误: {0}".format(str(e)))
raise CustomFlaskErr(const.INSERT_DATA_TO_DB_ERROR.format('插入'))

def delete(self, table_object, **filter_data):
"""
删除数据
:param table_object: 表对象
:param filter_data: dict, 删除条件
:return:
"""
with session_scope() as session:
try:
session.query(table_object).filter_by(**filter_data).delete()
session.commit()
except Exception as e:
session.rollback()
session.close()
logger.error("数据库删除错误: {0}".format(str(e)))
raise CustomFlaskErr(const.INSERT_DATA_TO_DB_ERROR.format('删除'))

def modify(self, table_object, filter_dict, update_dict):
"""
修改操作
:param table_object: 表对象
:param filter_dict: dict, 修改条件
:param update_dict: dict, 修改的值
:return:
"""
with session_scope() as session:
try:
logger.info("开始更新数据库数据")
session.query(table_object).filter_by(**filter_dict).update(update_dict, synchronize_session='fetch')
logger.info("update 结束")
session.commit()
logger.info("commit 结束")
except Exception as e:
logger.info("出现错误!")
session.rollback()
logger.error("数据库修改错误: {0}".format(str(e)))
session.close()
raise CustomFlaskErr(const.INSERT_DATA_TO_DB_ERROR.format('修改'))



def search(self, table_object, result_range='all', start=None, end=None, **filter_dict):
"""
简单条件查询, 支持查询所有数据, 第一条, 最后一条和切片操作
:param table_object: 表对象
:param result_range: 结果范围
:param start: 开始截取位置
:param end: 结束位置
:param filter_dict: 过滤条件
:return:
"""
ret = None
with session_scope() as session:
try:
if result_range == 'all':
ret = session.query(table_object).filter_by(**filter_dict).all()
elif result_range == 'first':
ret = session.query(table_object).filter_by(**filter_dict)[:1]
elif result_range == 'last':
ret = session.query(table_object).filter_by(**filter_dict)[-1:]
elif start and end:
ret = session.query(table_object).filter_by(**filter_dict).all()[start:end]
session.commit()
except Exception as e:
session.close()
logger.error("数据库查询错误: {0}".format(str(e)))
return ret

def create_table(self, all_table=False, table_name=None):
"""
创建表
:param all_table: 创建所有表
:param table_name: 创建指定表
:return:
"""
with session_scope() as session:
if all_table:
session.create_all()
elif table_name:
pass

def delete_table(self, all_table=False, table_name=None):
"""
删除表
:param all_table: 删除所有的表
:param table_name: 删除指定表
:return:
"""
with session_scope() as session:
if all_table:
session.drop_all()
elif table_name:
pass

def like(self, table_object, lick_obj, result_range='all', start=None, end=None):
with session_scope() as session:
ret = None
try:
if result_range == 'all':
ret = session.query(table_object).filter(lick_obj).all()
elif result_range == 'first':
ret = session.query(table_object).filter_by(lick_obj)[:1]
elif result_range == 'last':
ret = session.query(table_object).filter_by(lick_obj)[-1:]
elif start and end:
ret = session.query(table_object).filter_by(lick_obj).all()[start:end]
except Exception as e:
logger.error("数据库查询错误: {0}".format(str(e)))
session.close()
return ret

def object_search(self, table_object, filter_object):
with session_scope() as session:
ret = None
try:
ret = session.query(table_object).filter(filter_object)
except Exception as e:
logger.error("数据库查询错误: {0}".format(str(e)))
session.close()
return ret

# TODO 待完善like, and, or, 排序, 分组等

# if __name__ == '__main__':
pass

BaseModel

20191209 更新,上述 dbUtils 在使用中存在缺陷。改进后如下:

application -> model -> baseModel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import datetime

from application import session_scope


class BaseModel:
DB_ERROR = "DB Error: {error}"

"""
********************* 通用查询方法 *********************
"""

@classmethod
def row_to_dict(cls, row):
"""
数据转换为字典
:param row:
:return:
"""
if row is None:
return None
d = {}
for column in row.__table__.columns:
# 时间类型格式化
if column.type.python_type == datetime.datetime:
column_value = getattr(row, column.name)
if column_value:
d[column.name] = column_value.strftime("%Y-%m-%d %H:%M:%S")
else:
d[column.name] = getattr(row, column.name)
return d

@classmethod
def get_condition_data_count(cls, condition):
"""
获取符合条件的数据的条数
:param condition:
:return:
"""
with session_scope() as session:
try:
rows = session.query(cls).filter_by(**condition).count()
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))
return rows

@classmethod
def get_all_data(cls) -> list:
"""
获取所有数据
:return:
"""
with session_scope() as session:
try:
rows = session.query(cls).all()
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))
return [BaseModel.row_to_dict(row) for row in rows]

@classmethod
def get_data_by_id(cls, idd: int) -> list:
"""
根据 ID 查询数据
:param idd:
:return:
"""
with session_scope() as session:
try:
rows = session.query(cls).filter_by(**{"id": idd}).all()
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))
return [BaseModel.row_to_dict(row) for row in rows]

@classmethod
def get_data_by_field(cls, field_name: str, field_value: str or int) -> list:
"""
根据某字段查询数据
:param field_name:
:param field_value:
:return:
"""
with session_scope() as session:
try:
rows = session.query(cls).filter_by(**{field_name: field_value}).all()
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))
return [BaseModel.row_to_dict(row) for row in rows]

@classmethod
def get_data_by_multiple_condition(cls, condition):
"""
自定义多条件查询
:param condition:
:return:
"""
with session_scope() as session:
try:
rows = session.query(cls).filter_by(**condition).all()
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))
return [BaseModel.row_to_dict(row) for row in rows]

"""
********************* 通用新增方法 *********************
"""

@classmethod
def add_data(cls, data: dict) -> int:
"""
新增数据,返回新增 ID
:param data:
:return:
"""
with session_scope() as session:
try:
runtime_object = cls(**data)
session.add(runtime_object)
# flush 后才能获取到本次新增的 id
session.flush()
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))
return runtime_object.__getattribute__("id")

@classmethod
def add_multiple_data(cls, data_list) -> [int]:
"""
新增多组数据
:param data_list:
:return:
"""
with session_scope() as session:
try:
instance_list = []
for data in data_list:
runtime_object = cls(**data)
instance_list.append(runtime_object)

session.add_all(instance_list)
# flush 后才能获取到本次新增的 id
session.flush()
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))
return [x.__getattribute__("id") for x in instance_list]

"""
********************* 通用新增方法 *********************
"""

@classmethod
def update_data(cls, condition: dict, data: dict) -> int:
"""
更新数据,返回影响条数
:param condition:
:param data:
:return:
"""

with session_scope() as session:
try:
return session.query(cls).filter_by(**condition).update(data)
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))

"""
********************* 通用删除方法 *********************
"""

@classmethod
def delete_data_by_id(cls, idd) -> int:
"""
根据 id 删除数据
:param idd:
:return:
"""
with session_scope() as session:
try:
return session.query(cls).filter_by(**{"id": idd}).delete()
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))

@classmethod
def delete_data(cls, condition: dict) -> int:
"""
删除数据
:param condition:
:return:
"""
with session_scope() as session:
try:
return session.query(cls).filter_by(**condition).delete()
except Exception as e:
raise Exception(BaseModel.DB_ERROR.format(error=e))

其余 model 只需要继承 BaseModel 就有了这些公共的方法,如果这些常用的方法不满足需求,那么就可以在自己 model 中进行操作。