1.连接的基本语法如下:

先下载pymySQL

pip install PyMySQL

pyimport pymysql

#创建连接
conn = pymysql.Connect(host="127.0.0.1", port=3306, user="root", passwd="123456",db="test")
# 创建游标
cursor = conn.cursor()

#SQL语句增
sql1 = "insert into user(name) values ('大猪')"
#SQL语句查
sql2 = "select * from user"
#SQL语句改
sql3 = "update user set name = '大大猪' where id = 3"
#SQL语句删
sql4 = "delete from user where id = 3"

# 读取数据
number = cursor.execute(sql4)

print(number)

# 提交事务
conn.commit()

# 关闭游标
cursor.close()

# 关闭连接
conn.close()

2.创建连接模块

# -*- coding: UTF-8 -*-
import pymysql, traceback

class Mysql(object):

    # 初始化
    def __init__(self):
        #配置mysql数据库信息
        config = {
            'host': '127.0.0.1',
            'port': 3306,
            'user': 'root',
            'passwd': '123456',
            'db': 'api',
            'charset': 'utf8'
        }

        try:
            self.con = pymysql.connect(**config)
            # 所有的查询,都在连接 con 的一个模块 cursor 上面运行的
            self.cursor = self.con.cursor(cursor=pymysql.cursors.DictCursor)
        except:
            # pass
            print('数据库连接错误,请检查数据库连接配置。')

    # 关闭数据库连接和对象
    def __del__(self):
        # 关闭cursor对象
        self.cursor.close()
        self.con.close()

    # 查询记录数
    def get_count(self, sql='', payload=()):
        count = 0
        try:
            count = self.cursor.execute(sql,payload)
        except pymysql.Error:
           print('查询记录数失败:' + traceback.format_exc())
        return count

    # 查询单行记录
    def get_one(self, sql='', payload=()):
        res = None
        try:
            self.cursor.execute(sql,payload)
            res = self.cursor.fetchone()
        except pymysql.Error:
            print("查询单行记录失败:" + traceback.format_exc())
        return res

    # 查询列表记录
    def get_all(self, sql='', payload=()):
        res = None
        try:
            self.cursor.execute(sql,payload)
            res = self.cursor.fetchall()
        except pymysql.Error:
            print("查询全部记录失败:" + traceback.format_exc())
        return res

    # 新增数据
    def db_insert(self, sql='', payload=()):
        count = 0
        try:
            count = self.cursor.execute(sql,payload)
            self.con.commit()
        except pymysql.Error:
            print("新增/修改/删除记录失败:" + traceback.format_exc())
            self.con.rollback()
        return count

    # 新增数据返回自增主键值
    def db_insert_id(self, sql='', payload=()):
        id = None
        try:
            self.cursor.execute(sql, payload)
            self.con.commit()
            id = self.cursor.lastrowid
        except pymysql.Error:
            print("新增记录失败:" + traceback.format_exc())
            self.con.rollback()
        return id

    # 更新记录
    def db_update(self, sql='', payload=()):
        return self.db_insert(sql, payload)

    # 删除记录
    def db_delete(self, sql='', payload=()):
        return self.db_insert(sql,payload)

3.注意事项

上方的工具类有一个问题,就是在web使用此工具类的时候,因为长时间保持着连接却没有操作,数据库会关闭连接,比如mysql默认在8小时无操作后关闭链接,这时候工具类会因为连接关闭而无法操作数据库,进而抛出异常。

解决的办法就是通过dbutils数据库连接池来解决。

先下载数据库连接池,注意这里使用最新的2.0版本,不同版本语法可能有些许差别:

pip install DBUtils==2.0

改进后的工具类如下:

# -*- coding: UTF-8 -*-
import pymysql, traceback
from dbutils.pooled_db import PooledDB

class Mysql(object):

    # 初始化
    def __init__(self):
        #配置mysql数据库信息
        config = {
            'host': '127.0.0.1',
            'port': 3306,
            'user': 'root',
            'passwd': '123456',
            'db': 'api',
            'charset': 'utf8'
        }
        self.poolDB = PooledDB(
            # 指定数据库连接驱动
            creator=pymysql,
            # 连接池允许的最大连接数,0和None表示没有限制
            maxconnections=3,
            # 初始化时,连接池至少创建的空闲连接,0表示不创建
            mincached=2,
            # 连接池中空闲的最多连接数,0和None表示没有限制
            maxcached=6,
            # 连接池中最多共享的连接数量,0和None表示全部共享(其实没什么卵用)
            maxshared=3,
            # 连接池中如果没有可用共享连接后,是否阻塞等待,True表示等等,
            # False表示不等待然后报错
            blocking=True,
            # 开始会话前执行的命令列表
            setsession=[],
            # ping Mysql服务器检查服务是否可用
            ping=0,
            **config
        )

    # 创建连接
    def create_conn(self):
        # 从数据库连接池中取出一条连接
        conn = self.poolDB.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor

    # 关闭连接
    def close_conn(self, conn, cursor):
        if conn:
            conn.close()
        if cursor:
            cursor.close()

    # 查询记录数
    def get_count(self, sql='', payload=()):
        conn, cursor = self.create_conn()
        count = 0
        try:
            count = cursor.execute(sql,payload)
        except pymysql.Error:
            print('查询记录数失败:' + traceback.format_exc())
        finally:
            self.close_conn(conn, cursor)
        return count

    # 查询单行记录
    def get_one(self, sql='', payload=()):
        conn, cursor = self.create_conn()
        res = None
        try:
            cursor.execute(sql,payload)
            res = cursor.fetchone()
        except pymysql.Error:
            print("查询单行记录失败:" + traceback.format_exc())
        finally:
            self.close_conn(conn, cursor)
        return res

    # 查询列表记录
    def get_all(self, sql='', payload=()):
        conn, cursor = self.create_conn()
        res = None
        try:
            cursor.execute(sql,payload)
            res = cursor.fetchall()
        except pymysql.Error:
            print("查询全部记录失败:" + traceback.format_exc())
        finally:
            self.close_conn(conn, cursor)
        return res

    # 新增数据
    def db_insert(self, sql='', payload=()):
        conn, cursor = self.create_conn()
        count = 0
        try:
            count = cursor.execute(sql,payload)
            conn.commit()
        except pymysql.Error:
            print("新增/修改/删除记录失败:" + traceback.format_exc())
            conn.rollback()
        finally:
            self.close_conn(conn, cursor)
        return count

    # 新增数据返回自增主键值
    def db_insert_id(self, sql='', payload=()):
        conn, cursor = self.create_conn()
        id = None
        try:
            cursor.execute(sql, payload)
            conn.commit()
            id = cursor.lastrowid
        except pymysql.Error:
            print("新增记录失败:" + traceback.format_exc())
            conn.rollback()
        finally:
            self.close_conn(conn, cursor)
        return id

    # 更新记录
    def db_update(self, sql='', payload=()):
        return self.db_insert(sql, payload)

    # 删除记录
    def db_delete(self, sql='', payload=()):
        return self.db_insert(sql,payload)