1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| from tool import const from tool.error import CustomFlaskErr from fishbase.fish_logger import logger from fishbase.fish_common import SingleTon from application import session_scope
class DBUtils(SingleTon): def __init__(self, db): pass
def add(self, table_object, *data_list, **data): """ 新增数据 :param table_object: 表对象 :param data_list: list, 批量插入的数据 :param data: dict, 插入单条数据 :return: """ with session_scope() as session: try: if data_list: table_object_list = [] for x in data_list: table_object_list.append(table_object(**x)) session.add_all(table_object_list) elif data: obj = table_object(**data) session.add(obj) session.commit() except Exception as e: session.rollback() session.close() logger.error("数据库插入错误: {0}".format(str(e))) raise CustomFlaskErr(const.INSERT_DATA_TO_DB_ERROR.format('插入'))
def delete(self, table_object, **filter_data): """ 删除数据 :param table_object: 表对象 :param filter_data: dict, 删除条件 :return: """ with session_scope() as session: try: session.query(table_object).filter_by(**filter_data).delete() session.commit() except Exception as e: session.rollback() session.close() logger.error("数据库删除错误: {0}".format(str(e))) raise CustomFlaskErr(const.INSERT_DATA_TO_DB_ERROR.format('删除'))
def modify(self, table_object, filter_dict, update_dict): """ 修改操作 :param table_object: 表对象 :param filter_dict: dict, 修改条件 :param update_dict: dict, 修改的值 :return: """ with session_scope() as session: try: logger.info("开始更新数据库数据") session.query(table_object).filter_by(**filter_dict).update(update_dict, synchronize_session='fetch') logger.info("update 结束") session.commit() logger.info("commit 结束") except Exception as e: logger.info("出现错误!") session.rollback() logger.error("数据库修改错误: {0}".format(str(e))) session.close() raise CustomFlaskErr(const.INSERT_DATA_TO_DB_ERROR.format('修改'))
def search(self, table_object, result_range='all', start=None, end=None, **filter_dict): """ 简单条件查询, 支持查询所有数据, 第一条, 最后一条和切片操作 :param table_object: 表对象 :param result_range: 结果范围 :param start: 开始截取位置 :param end: 结束位置 :param filter_dict: 过滤条件 :return: """ ret = None with session_scope() as session: try: if result_range == 'all': ret = session.query(table_object).filter_by(**filter_dict).all() elif result_range == 'first': ret = session.query(table_object).filter_by(**filter_dict)[:1] elif result_range == 'last': ret = session.query(table_object).filter_by(**filter_dict)[-1:] elif start and end: ret = session.query(table_object).filter_by(**filter_dict).all()[start:end] session.commit() except Exception as e: session.close() logger.error("数据库查询错误: {0}".format(str(e))) return ret
def create_table(self, all_table=False, table_name=None): """ 创建表 :param all_table: 创建所有表 :param table_name: 创建指定表 :return: """ with session_scope() as session: if all_table: session.create_all() elif table_name: pass
def delete_table(self, all_table=False, table_name=None): """ 删除表 :param all_table: 删除所有的表 :param table_name: 删除指定表 :return: """ with session_scope() as session: if all_table: session.drop_all() elif table_name: pass
def like(self, table_object, lick_obj, result_range='all', start=None, end=None): with session_scope() as session: ret = None try: if result_range == 'all': ret = session.query(table_object).filter(lick_obj).all() elif result_range == 'first': ret = session.query(table_object).filter_by(lick_obj)[:1] elif result_range == 'last': ret = session.query(table_object).filter_by(lick_obj)[-1:] elif start and end: ret = session.query(table_object).filter_by(lick_obj).all()[start:end] except Exception as e: logger.error("数据库查询错误: {0}".format(str(e))) session.close() return ret
def object_search(self, table_object, filter_object): with session_scope() as session: ret = None try: ret = session.query(table_object).filter(filter_object) except Exception as e: logger.error("数据库查询错误: {0}".format(str(e))) session.close() return ret
pass
|