Welcome 微信登录

首页 / 数据库 / MySQL / Python访问MySQL数据库并实现其增删改查功能

概述:
对于访问MySQL数据库的操作,我想大家也都有一些了解。不过,因为最近在学习Python,以下就用Python来实现它。其中包括创建数据库和数据表、插入记录、删除记录、修改记录数据、查询数据、删除数据表、删除数据库。还有一点就是我们最好使用一个新定义的类来处理这件事。因为这会使在以后的使用过程中更加的方便(只需要导入即可,避免了重复制造轮子)。实现功能介绍:1.封装一个DB类
2.数据库操作:创建数据库和数据表
3.数据库操作:插入记录
4.数据库操作:一次插入多条记录
5.数据库操作:删除记录
6.数据库操作:修改记录数据
7.数据库操作:一次修改多条记录数据
8.数据库操作:查询数据
9.数据库操作:删除数据表
10.数据库操作:删除数据库数据库类的定义:
heroDB.py#!/usr/bin/env pythonimport MySQLdbDATABASE_NAME = "hero"class HeroDB:
    # init class and create a database
    def __init__(self, name, conn, cur):
        self.name = name
        self.conn = conn
        self.cur = cur
        try:
            cur.execute("create database if not exists " + name)
            conn.select_db(name)
            conn.commit()
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
                       
    # create a table
    def createTable(self, name):
        try:
            ex = self.cur.execute
            if ex("show tables") == 0:
                ex("create table " + name + "(id int, name varchar(20), sex int, age int, info varchar(50))")
                self.conn.commit()
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # insert single record
    def insert(self, name, value):
        try:
            self.cur.execute("insert into " + name + " values(%s,%s,%s,%s,%s)", value)
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # insert more records
    def insertMore(self, name, values):
        try:
            self.cur.executemany("insert into " + name + " values(%s,%s,%s,%s,%s)", values)
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # update single record from table
    # name: table name
    # values: waiting to update data
    def updateSingle(self, name, value):
        try:
            # self.cur.execute("update " + name + " set name=" + str(values[1]) + ", sex=" + str(values[2]) + ", age=" + str(values[3]) + ", info=" + str(values[4]) + " where id=" + str(values[0]) + ";")
            self.cur.execute("update " + name + " set name=%s, sex=%s, age=%s, info=%s where id=%s;", value)
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # update some record from table
    def update(self, name, values):
        try:
            self.cur.executemany("update " + name + " set name=%s, sex=%s, age=%s, info=%s where id=%s;", values)
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # get record count from db table
    def getCount(self, name):
        try:
            count = self.cur.execute("select * from " + name)
            return count
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # select first record from database
    def selectFirst(self, name):
        try:
            self.cur.execute("select * from " + name + ";")
            result = self.cur.fetchone()
            return result
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # select last record from database
    def selectLast(self, name):
        try:
            self.cur.execute("SELECT * FROM " + name + " ORDER BY id DESC;")
            result = self.cur.fetchone()
            return result
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # select next n records from database
    def selectNRecord(self, name, n):
        try:
            self.cur.execute("select * from " + name + ";")
            results = self.cur.fetchmany(n)
            return results
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # select all records
    def selectAll(self, name):
        try:
            self.cur.execute("select * from " + name + ";")
            self.cur.scroll(0, mode="absolute") # reset cursor location (mode = absolute | relative)
            results = self.cur.fetchall()
            return results
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # delete a record
    def deleteByID(self, name, id):
        try:
            self.cur.execute("delete from " + name + " where id=%s;", id)
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
   
    # delete some record
    def deleteSome(self, name):
        pass
   
    # drop the table
    def dropTable(self, name):
        try:
            self.cur.execute("drop table " + name + ";")
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    # drop the database
    def dropDB(self, name):
        try:
            self.cur.execute("drop database " + name + ";")
        except MySQLdb.Error, e:
            print "Mysql Error %d: %s" % (e.args[0], e.args[1])
           
    def __del__(self):
        if self.cur != None:
            self.cur.close()
        if self.conn != None:
            self.conn.close()使用范例:
testHeroDB.py#!/usr/bin/env pythonimport MySQLdb
from heroDB import HeroDBdef main():
    conn = MySQLdb.connect(host="localhost", user="root", passwd="260606", db="hero", port=3306, charset="utf8")
    cur = conn.cursor()
   
    # ------------------------------------------- create -----------------------------------------------------
    hero = HeroDB("hero", conn, cur)
    hero.createTable("heros")
   
    # ------------------------------------------- insert -----------------------------------------------------
    hero.insert("heros", [3, "Prophet", 0, 2000, "The hero who in fairy tale."])    # ------------------------------------------- select -----------------------------------------------------
    print "-" * 60
    print "first record"
    result = hero.selectFirst("heros")
    print result
   
    print "-" * 60
    print "last record"
    result = hero.selectLast("heros")
    print result
   
    print "-" * 60
    print "more record"
    results = hero.selectNRecord("heros", 3)
    for item in results:
        print item
   
    print "-" * 60
    print "all record"
    results = hero.selectAll("heros")
    for item in results:
        print item
       
    # ------------------------------------------- update -----------------------------------------------------
    hero.updateSingle("heros", ["Zeus", 1, 22000, "The god.", 2])
   
    values = []
    values.append(["SunWukong", 1, 1300, "The hero who in fairy tale.", 1])
    values.append(["Zeus", 1, 50000, "The king who in The Quartet myth.", 2])
    values.append(["Prophet", 1, 20000, "The hero who in fairy tale.3", 3])
    hero.update("heros", values)
   
    # ------------------------------------------- delete -----------------------------------------------------
    hero.deleteByID("heros", 1)
   
    hero.dropTable("heros")
   
    hero.dropDB("hero")
   
if __name__ == "__main__":
    main()注:请不要不假思索地使用他们。如果你想实现某一个功能点,请最好将其他的功能点注释掉,这样才符合单元测试的规范。源码下载:------------------------------------------分割线------------------------------------------免费下载地址在 http://linux.linuxidc.com/用户名与密码都是www.linuxidc.com具体下载目录在 /2015年资料/4月/21日/Python访问MySQL数据库并实现其增删改查功能/下载方法见 http://www.linuxidc.com/Linux/2013-07/87684.htm------------------------------------------分割线------------------------------------------本文永久更新链接地址