Welcome 微信登录

首页 / 数据库 / MySQL / MySQL 5.5 DBA工具 多进程dump 多进程load 多进程备份还原 Python 脚本

需要安装 python MySQL-python gzip : yum install python MySQL-python gzip -ydump 脚本
  1. """"" 
  2. Created on 2012-8-20 
  3. mysql dump to load 
  4. @author: tudou@b2c.xiaomi.com 
  5. """  
  6. import os,time,MySQLdb,multiprocessing  
  7.   
  8. mysql_bak="/tmp/mysqlbak"  
  9. mysql_base="/opt/soft/mysql_5.5.25"  
  10. unix_socket="/tmp/mysql.sock"  
  11. dump_user="root"  
  12. dump_pwd="123456"  
  13. dump_database=["test","mysql"]  
  14.   
  15. def start_process():  
  16.     print ("MySQLdump starting", multiprocessing.current_process().name)  
  17.   
  18. class mysqldump(object):  
  19.     def __init__(self,conf):  
  20.         self.conf=conf  
  21.         self.dumpdir=mysql_bak  
  22.           
  23.     def dump(self):  
  24.         #create dir   
  25.         self.dumpdir += "/"+str(time.strftime("%Y-%m-%d-%H-%M-%S",time.localtime(time.time())))  
  26.         for dir in dump_database:  
  27.             os.system("mkdir -p "+self.dumpdir+"/"+dir)  
  28.             os.system("mkdir -p "+self.dumpdir+"/"+dir+"/schema")  
  29.         os.system("chmod 777 -R "+self.dumpdir)  
  30.         #get create table   
  31.         for dir in dump_database:  
  32.             self.getschemainfo(dir)  
  33.         #dump per table   
  34.         self.getdbinfo()  
  35.       
  36.     def getschemainfo(self,dbconf):  
  37.         os.system(mysql_base+"/bin/mysqldump -d --add-drop-table -u"+dump_user+" -p"+dump_pwd+" -S"+unix_socket+" "+dbconf+" > "+self.dumpdir+"/"+dbconf+"/schema/schemainfo" )  
  38.         os.system("gzip "+self.dumpdir+"/"+dbconf+"/schema/schemainfo")  
  39.         os.system(mysql_base+"/bin/mysqldump -tdRE -u"+dump_user+" -p"+dump_pwd+" -S"+unix_socket+" "+dbconf+" > "+self.dumpdir+"/"+dbconf+"/schema/objectinfo" )  
  40.         os.system("gzip "+self.dumpdir+"/"+dbconf+"/schema/objectinfo")  
  41.       
  42.     def getdbinfo(self):  
  43.         con=db(self.conf)  
  44.         sql="SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA IN ("""","".join(dump_database) +"")"  
  45.         re = list(con.execute(sql))  
  46.         #inputs=list()   
  47.         pool_size = multiprocessing.cpu_count()  
  48.         pool = multiprocessing.Pool(processes=pool_size,initializer=start_process,)  
  49.         for tb in re:  
  50.             #inputs.append({"dbname":tb[0],"tablename":tb[1]})   
  51.             pool.apply_async(self.dumplay({"dbname":tb[0],"tablename":tb[1]}))  
  52.             #self.dumplay({"dbname":tb[0],"tablename":tb[1]})   
  53.         #print inputs   
  54.         pool.close() # no more tasks   
  55.         pool.join()  # wrap up current tasks   
  56.           
  57.     def dumplay(self,dbconf):  
  58.         loadname=self.dumpdir+"/"+dbconf["dbname"]+"/"+dbconf["tablename"]+".sql"  
  59.         con=db(self.conf)  
  60.         sql="SELECT * FROM `"+dbconf["dbname"]+"`.`"+dbconf["tablename"]+"` INTO OUTFILE ""+loadname+"""  
  61.         #print sql   
  62.         con.executeNoQuery(sql)  
  63.         self.dogzip(loadname)  
  64.           
  65.     def dogzip(self,fileconf):  
  66.         os.system("gzip "+fileconf)  
  67.           
  68. """"" 
  69.  
  70. """  
  71. class MySQLHelper(object):  
  72.     @staticmethod  
  73.     def getConn(conf):  
  74.         pot = 3306  
  75.         if(conf.has_key("port")):  
  76.             pot=conf["port"]  
  77.         dbname="test"  
  78.         if(conf.has_key("db")):  
  79.             dbname=conf["db"]  
  80.           
  81.         if(conf.has_key("socket")):  
  82.             return MySQLdb.connect(host=conf["host"],unix_socket=conf["socket"],user=conf["user"],passwd=conf["pwd"],db=dbname)  
  83.         else:  
  84.             return MySQLdb.connect(host=conf["host"],port=pot,user=conf["user"],passwd=conf["pwd"],db=dbname)  
  85. """"" 
  86.  
  87. """  
  88. class db (object):  
  89.     def __init__(self,conf):  
  90.         self.conn=None  
  91.         self.conn=MySQLHelper.getConn(conf)  
  92.       
  93.     def execute(self,sql,mod=""):  
  94.         if(mod=="dict"):  
  95.             cursor=self.conn.cursor(MySQLdb.cursors.DictCursor)  
  96.         else:  
  97.             cursor=self.conn.cursor()  
  98.         cursor.execute(sql)  
  99.         set=cursor.fetchall()  
  100.         return set  
  101.       
  102.     def executeNoQuery(self,sql,param={}):  
  103.         cursor=self.conn.cursor()  
  104.         try:  
  105.             if(param=={}):  
  106.                 rownum=cursor.execute(sql)  
  107.             else:  
  108.                 rownum=cursor.executemany(sql,param)  
  109.             self.conn.commit()  
  110.             return rownum  
  111.         finally:  
  112.             cursor.close()  
  113.       
  114.     def __del__(self):  
  115.         if (self.conn!=None):  
  116.             self.conn.close()  
  117.               
  118. if __name__ == "__main__":  
  119.     conf={"host":"localhost","socket":unix_socket,"user":dump_user,"pwd":dump_pwd,"db":"information_schema"}  
  120.     dump=mysqldump(conf);  
  121.     dump.dump();  
  122.     print "dump success"  
load 脚本
  1. """"" 
  2. Created on 2012-8-20 
  3. mysql dump to load 
  4. @author: tudou@b2c.xiaomi.com 
  5. """  
  6. import os,MySQLdb,multiprocessing  
  7.   
  8. mysql_bak="/tmp/mysqlbak/2012-09-18-21-44-34"  
  9. mysql_base="/opt/soft/mysql_5.5.25"  
  10. unix_socket="/tmp/mysql.sock"  
  11. dump_user="root"  
  12. dump_pwd="123456"  
  13. dump_database={"test":"test"}#dump_database={"test":"test","mysql":"mysql"}   
  14.   
  15. def start_process():  
  16.     print ("MySQLinput starting", multiprocessing.current_process().name)  
  17. #input shcema   
  18. #load data   
  19. #input object   
  20. class mysqlinput(object):  
  21.     def __init__(self,conf):  
  22.         self.conf=conf  
  23.           
  24.     def input(self):  
  25.         os.system("chmod 777 -R "+mysql_bak)  
  26.         dirnames=os.listdir(mysql_bak)  
  27.         for dirname in dirnames:  
  28.             #print dirname   
  29.             if dump_database.has_key(dirname):  
  30.                 self.inputschema(dirname)  
  31.                           
  32.         for dirname in dirnames:  
  33.             if dump_database.has_key(dirname):  
  34.                 self.loadata(dirname)  
  35.   
  36.         for dirname in dirnames:  
  37.             if dump_database.has_key(dirname):  
  38.                 self.inputobject(dirname)  
  39.           
  40.     def inputschema(self,dbconf):  
  41.         print "gunzip -cd "+mysql_bak+"/"+dbconf+"/schema/schemainfo.gz | "+mysql_base+"/bin/mysql -u"+dump_user+" -p****** -S"+unix_socket  
  42.         os.system("gunzip -cd "+mysql_bak+"/"+dbconf+"/schema/schemainfo.gz | "+mysql_base+"/bin/mysql -u"+dump_user+" -p"+dump_pwd+" -S"+unix_socket+" "+dbconf)  
  43.           
  44.     def loadata(self,dbconf):  
  45.         pool_size = multiprocessing.cpu_count()  
  46.         pool = multiprocessing.Pool(processes=pool_size,initializer=start_process,)  
  47.         filenames=os.listdir(mysql_bak+"/"+dbconf)  
  48.         for filename in filenames:  
  49.             filepath=mysql_bak+"/"+dbconf+"/"+filename  
  50.             if os.path.isfile(filepath):  
  51.                 fileconf={"dbname":dbconf,"gzfile":filepath,"filename":filepath,"tablename":filename[0:len(filename)-4]}  
  52.                 c=fileconf["gzfile"]  
  53.                 if c[len(c)-3:len(c)]==".gz":  
  54.                     fileconf={"dbname":dbconf,"gzfile":filepath,"filename":filepath[0:len(filepath)-3],"tablename":filename[0:len(filename)-7]}  
  55.                 pool.apply_async(self.mygunzip(fileconf))  
  56.                   
  57.         pool.close() # no more tasks   
  58.         pool.join()  # wrap up current tasks   
  59.           
  60.     def mygunzip(self,fileconf):  
  61.         c=fileconf["gzfile"]  
  62.         if c[len(c)-3:len(c)]==".gz":  
  63.             os.system("gunzip "+fileconf["gzfile"])  
  64.               
  65.         self.loadpertable(fileconf)  
  66.           
  67.     def loadpertable(self,fileconf):  
  68.         sql="TRUNCATE `"+fileconf["dbname"]+"`.`"+fileconf["tablename"]+"`;LOAD DATA INFILE ""+fileconf["filename"]+"" INTO TABLE `"+fileconf["dbname"]+"`.`"+fileconf["tablename"]+"`;"  
  69.         print sql  
  70.         os.system(mysql_base+"/bin/mysql -u"+dump_user+" -p"+dump_pwd+" -S"+unix_socket+" -e""+sql+""")  
  71.       
  72.     def inputobject(self,dbconf):  
  73.         print "gunzip -cd "+mysql_bak+"/"+dbconf+"/schema/objectinfo.sql.gz | "+mysql_base+"/bin/mysql -u"+dump_user+" -p****** -S"+unix_socket  
  74.         os.system("gunzip -cd "+mysql_bak+"/"+dbconf+"/schema/objectinfo.gz | "+mysql_base+"/bin/mysql -u"+dump_user+" -p"+dump_pwd+" -S"+unix_socket+" "+dbconf)  
  75.               
  76. if __name__ == "__main__":  
  77.     conf={"host":"localhost","socket":unix_socket,"user":dump_user,"pwd":dump_pwd,"db":"information_schema"}  
  78.     input=mysqlinput(conf);  
  79.     input.input();  
  80.     print "load success"  
Oracle with子句MySQL保存中文乱码的原因和解决办法相关资讯      MySQL基础知识 
  • MySQL增加普通用户后无法登陆问题  (12/15/2012 19:35:24)
  • MySQL连接查询精解  (12/11/2012 09:19:27)
  • MySQL备份与恢复的三种方法总结  (12/05/2012 12:15:32)
  • MySQL 多表 update sql语句总结  (12/11/2012 09:52:37)
  • Ubuntu下更改MySQL数据库文件的目  (12/08/2012 21:29:31)
  • MySQL游标循环示例  (12/05/2012 07:05:56)
本文评论 查看全部评论 (0)
表情: 姓名: 字数