Linux基于row模式的binlog,生成DML(insert/update/delete)的rollback语句
通过mysqlbinlog -v 解析binlog生成可读的sql文件
提取需要处理的有效sql
"### "开头的行.如果输入的start-position位于某个event group中间,则会导致"无法识别event"错误
将INSERT/UPDATE/DELETE 的sql反转,并且1个完整sql只能占1行
INSERT: INSERT INTO => DELETE FROM, SET => WHERE
UPDATE: WHERE => SET, SET => WHERE
DELETE: DELETE FROM => INSERT INTO, WHERE => SET
用列名替换位置@{1,2,3}
通过desc table获得列顺序及对应的列名
特殊列类型value做特别处理
逆序
注意:
表结构与现在的表结构必须相同[谨记]
由于row模式是幂等的,并且恢复是一次性,所以只提取sql,不提取BEGIN/COMMIT
只能对INSERT/UPDATE/DELETE进行处理
mysql> select * from yoon;
+----------+------------+-----------+---------------------+
| actor_id | first_name | last_name | last_update |
+----------+------------+-----------+---------------------+
| 1 | HANK | YOON | 2006-02-15 04:34:33 |
| 2 | HANK | YOON | 2006-02-15 04:34:33 |
| 3 | HANK | YOON | 2006-02-15 04:34:33 |
| 4 | HANK | YOON | 2006-02-15 04:34:33 |
| 5 | HANK | YOON | 2006-02-15 04:34:33 |
| 6 | HANK | YOON | 2006-02-15 04:34:33 |
| 7 | HANK | YOON | 2006-02-15 04:34:33 |
| 8 | HANK | YOON | 2006-02-15 04:34:33 |
| 9 | HANK | YOON | 2006-02-15 04:34:33 |
| 10 | HANK | YOON | 2006-02-15 04:34:33 |
| 11 | HANK | YOON | 2006-02-15 04:34:33 |
+----------+------------+-----------+---------------------+
11 rows in set (0.00 sec)
mysql> delete from yoon;
Query OK, 11 rows affected (1.03 sec)
mysql> select * from yoon;
Empty set (0.00 sec)
命令之间的空格一定要注意,否则就会无法提取SQL语句:
[root@hank-yoon data]# perl binlog-rollback.pl -f "mysql-bin.000001" -o "/export/data/mysql/data/yoon.sql" -u "root" -p "yoon"
Warning: Using a password on the command line interface can be insecure.
[root@hank-yoon data]# ls
auto.cnf hank ibdata2 ib_logfile1 modify.pl mysql-bin.000001 performance_schema test yoon.sql
binlog-rollback.pl ibdata1 ib_logfile0 ib_logfile2 mysql mysql-bin.index sakila yoon
[root@hank-yoon data]# cat yoon.sql
INSERT INTO `yoon`.`yoon` SET `actor_id`=11, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=10, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=9, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=8, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=7, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=6, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=5, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=4, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=3, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=2, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
INSERT INTO `yoon`.`yoon` SET `actor_id`=1, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
mysql> INSERT INTO `yoon`.`yoon` SET `actor_id`=11, `first_name`="HANK", `last_name`="YOON", `last_update`=from_unixtime(1139949273);
Query OK, 1 row affected (0.01 sec)
mysql> select * from yoon;
+----------+------------+-----------+---------------------+
| actor_id | first_name | last_name | last_update |
+----------+------------+-----------+---------------------+
| 11 | HANK | YOON | 2006-02-15 04:34:33 |
+----------+------------+-----------+---------------------+
- #!/usr/lib/perl -w
- use strict;
- use warnings;
- use Class::Struct;
- use Getopt::Long qw(:config no_ignore_case); # GetOption
- # register handler system signals
- use sigtrap "handler", &sig_int, "normal-signals";
- # catch signal
- sub sig_int(){
- my ($signals) = @_;
- print STDERR "# Caught SIG$signals.
";
- exit 1;
- }
- my %opt;
- my $srcfile;
- my $host = "127.0.0.1";
- my $port = 3306;
- my ($user,$pwd);
- my ($MYSQL, $MYSQLBINLOG, $ROLLBACK_DML);
- my $outfile = "/dev/null";
- my (%do_dbs,%do_tbs);
- # tbname=>tbcol, tbcol: @n=>colname,type
- my %tbcol_pos;
- my $SPLITER_COL = ",";
- my $SQLTYPE_IST = "INSERT";
- my $SQLTYPE_UPD = "UPDATE";
- my $SQLTYPE_DEL = "DELETE";
- my $SQLAREA_WHERE = "WHERE";
- my $SQLAREA_SET = "SET";
- my $PRE_FUNCT = "========================== ";
- # =========================================================
- # 基于row模式的binlog,生成DML(insert/update/delete)的rollback语句
- # 通过mysqlbinlog -v 解析binlog生成可读的sql文件
- # 提取需要处理的有效sql
- # "### "开头的行.如果输入的start-position位于某个event group中间,则会导致"无法识别event"错误
- #
- # 将INSERT/UPDATE/DELETE 的sql反转,并且1个完整sql只能占1行
- # INSERT: INSERT INTO => DELETE FROM, SET => WHERE
- # UPDATE: WHERE => SET, SET => WHERE
- # DELETE: DELETE FROM => INSERT INTO, WHERE => SET
- # 用列名替换位置@{1,2,3}
- # 通过desc table获得列顺序及对应的列名
- # 特殊列类型value做特别处理
- # 逆序
- #
- # 注意:
- # 表结构与现在的表结构必须相同[谨记]
- # 由于row模式是幂等的,并且恢复是一次性,所以只提取sql,不提取BEGIN/COMMIT
- # 只能对INSERT/UPDATE/DELETE进行处理
- # ========================================================
- sub main{
- # get input option
- &get_options();
- #
- &init_tbcol();
- #
- &do_binlog_rollback();
- }
- &main();
- # ----------------------------------------------------------------------------------------
- # Func : get options and set option flag
- # ----------------------------------------------------------------------------------------
- sub get_options{
- #Get options info
- GetOptions(\%opt,
- "help", # OUT : print help info
- "f|srcfile=s", # IN : binlog file
- "o|outfile=s", # out : output sql file
- "h|host=s", # IN : host
- "u|user=s", # IN : user
- "p|password=s", # IN : password
- "P|port=i", # IN : port
- "start-datetime=s", # IN : start datetime
- "stop-datetime=s", # IN : stop datetime
- "start-position=i", # IN : start position
- "stop-position=i", # IN : stop position
- "d|database=s", # IN : database, split comma
- "T|table=s", # IN : table, split comma
- "i|ignore", # IN : ignore binlog check ddl and so on
- "debug", # IN : print debug information
- ) or print_usage();
- if (!scalar(%opt)) {
- &print_usage();
- }
- # Handle for options
- if ($opt{"f"}){
- $srcfile = $opt{"f"};
- }else{
- &merror("please input binlog file");
- }
- $opt{"h"} and $host = $opt{"h"};
- $opt{"u"} and $user = $opt{"u"};
- $opt{"p"} and $pwd = $opt{"p"};
- $opt{"P"} and $port = $opt{"P"};
- if ($opt{"o"}) {
- $outfile = $opt{"o"};
- # 清空 outfile
- `echo "" > $outfile`;
- }
- #
- $MYSQL = qq{mysql -h$host -u$user -p"$pwd" -P$port};
- &mdebug("get_options::MYSQL
$MYSQL");
- # 提取binlog,不需要显示列定义信息,用-v,而不用-vv
- $MYSQLBINLOG = qq{mysqlbinlog -v};
- $MYSQLBINLOG .= " --start-position=".$opt{"start-position"} if $opt{"start-position"};
- $MYSQLBINLOG .= " --stop-position=".$opt{"stop-position"} if $opt{"stop-postion"};
- $MYSQLBINLOG .= " --start-datetime="".$opt{"start-datetime"}.""" if $opt{"start-datetime"};
- $MYSQLBINLOG .= " --stop-datetime="$opt{"stop-datetime"}"" if $opt{"stop-datetime"};
- $MYSQLBINLOG .= " $srcfile";
- &mdebug("get_options::MYSQLBINLOG
$MYSQLBINLOG");
- # 检查binlog中是否含有 ddl sql: CREATE|ALTER|DROP|RENAME
- &check_binlog() unless ($opt{"i"});
- # 不使用mysqlbinlog过滤,USE dbname;方式可能会漏掉某些sql,所以不在mysqlbinlog过滤
- # 指定数据库
- if ($opt{"d"}){
- my @dbs = split(/,/,$opt{"d"});
- foreach my $db (@dbs){
- $do_dbs{$db}=1;
- }
- }
- # 指定表
- if ($opt{"T"}){
- my @tbs = split(/,/,$opt{"T"});
- foreach my $tb (@tbs){
- $do_tbs{$tb}=1;
- }
- }
- # 提取有效DML SQL
- $ROLLBACK_DML = $MYSQLBINLOG." | grep "^### "";
- # 去掉注释: "### " -> ""
- # 删除首尾空格
- $ROLLBACK_DML .= " | sed "s/###\s*//g;s/\s*$//g"";
- &mdebug("rollback dml
$ROLLBACK_DML");
-
- # 检查内容是否为空
- my $cmd = "$ROLLBACK_DML | wc -l";
- &mdebug("check contain dml sql
$cmd");
- my $size = `$cmd`;
- chomp($size);
- unless ($size >0){
- &merror("binlog DML is empty:$ROLLBACK_DML");
- };
- }
- # ----------------------------------------------------------------------------------------
- # Func : check binlog contain DDL
- # ----------------------------------------------------------------------------------------
- sub check_binlog{
- &mdebug("$PRE_FUNCT check_binlog");
- my $cmd = "$MYSQLBINLOG ";
- $cmd .= " | grep -E -i "^(CREATE|ALTER|DROP|RENAME)" ";
- &mdebug("check binlog has DDL cmd
$cmd");
- my $ddlcnt = `$cmd`;
- chomp($ddlcnt);
- my $ddlnum = `$cmd | wc -l`;
- chomp($ddlnum);
- my $res = 0;
- if ($ddlnum>0){
- # 在ddl sql前面加上前缀<DDL>
- $ddlcnt = `echo "$ddlcnt" | sed "s/^//g"`;
- &merror("binlog contain $ddlnum DDL:$MYSQLBINLOG. ddl sql:
$ddlcnt");
- }
- return $res;
- }
- # ----------------------------------------------------------------------------------------
- # Func : init all table column order
- # if input --database --table params, only get set table column order
- # ----------------------------------------------------------------------------------------
- sub init_tbcol{
- &mdebug("$PRE_FUNCT init_tbcol");
- # 提取DML语句
- my $cmd .= "$ROLLBACK_DML | grep -E "^(INSERT|UPDATE|DELETE)"";
- # 提取表名,并去重
- #$cmd .= " | awk "{if ($1 ~ "^UPDATE") {print $2}else {print $3}}" | uniq ";
- $cmd .= " | awk "{if ($1 ~ "^UPDATE") {print $2}else {print $3}}" | sort | uniq ";
- &mdebug("get table name cmd
$cmd");
- open ALLTABLE, "$cmd | " or die "can"t open file:$cmd
";
- while (my $tbname = <ALLTABLE>){
- chomp($tbname);
- #if (exists $tbcol_pos{$tbname}){
- # next;
- #}
- &init_one_tbcol($tbname) unless (&ignore_tb($tbname));
-
- }
- close ALLTABLE or die "can"t close file:$cmd
";
- # init tb col
- foreach my $tb (keys %tbcol_pos){
- &mdebug("tbname->$tb");
- my %colpos = %{$tbcol_pos{$tb}};
- foreach my $pos (keys %colpos){
- my $col = $colpos{$pos};
- my ($cname,$ctype) = split(/$SPLITER_COL/, $col);
- &mdebug(" pos->$pos,cname->$cname,ctype->$ctype");
- }
- }
- };
- # ----------------------------------------------------------------------------------------
- # Func : init one table column order
- # ----------------------------------------------------------------------------------------
- sub init_one_tbcol{
- my $tbname = shift;
- &mdebug("$PRE_FUNCT init_one_tbcol");
- # 获取表结构及列顺序
- my $cmd = $MYSQL." --skip-column-names --silent -e "desc $tbname"";
- # 提取列名,并拼接
- $cmd .= " | awk -F"\t" "{print NR"$SPLITER_COL`"$1"`$SPLITER_COL"$2}"";
- &mdebug("get table column infor cmd
$cmd");
- open TBCOL,"$cmd | " or die "can"t open desc $tbname;";
- my %colpos;
- while (my $line = <TBCOL>){
- chomp($line);
- my ($pos,$col,$coltype) = split(/$SPLITER_COL/,$line);
- &mdebug("linesss=$line
pos=$pos
col=$col
type=$coltype");
- $colpos{$pos} = $col.$SPLITER_COL.$coltype;
- }
- close TBCOL or die "can"t colse desc $tbname";
- $tbcol_pos{$tbname} = \%colpos;
- }
- # ----------------------------------------------------------------------------------------
- # Func : rollback sql: INSERT/UPDATE/DELETE
- # ----------------------------------------------------------------------------------------
- sub do_binlog_rollback{
- my $binlogfile = "$ROLLBACK_DML ";
- &mdebug("$PRE_FUNCT do_binlog_rollback");
- # INSERT|UPDATE|DELETE
- my $sqltype;
- # WHERE|SET
- my $sqlarea;
-
- my ($tbname, $sqlstr) = ("", "");
- my ($notignore, $isareabegin) = (0,0);
- # output sql file
- open SQLFILE, ">> $outfile" or die "Can"t open sql file:$outfile";
- # binlog file
- open BINLOG, "$binlogfile |" or die "Can"t open file: $binlogfile";
- while (my $line = <BINLOG>){
- chomp($line);
- if ($line =~ /^(INSERT|UPDATE|DELETE)/){
- # export sql
- if ($sqlstr ne ""){
- $sqlstr .= ";
";
- print SQLFILE $sqlstr;
- &mdebug("export sql
".$sqlstr);
- $sqlstr = "";
- }
- if ($line =~ /^INSERT/){
- $sqltype = $SQLTYPE_IST;
- $tbname = `echo "$line" | awk "{print $3}"`;
- chomp($tbname);
- $sqlstr = qq{DELETE FROM $tbname};
- }elsif ($line =~ /^UPDATE/){
- $sqltype = $SQLTYPE_UPD;
- $tbname = `echo "$line" | awk "{print $2}"`;
- chomp($tbname);
- $sqlstr = qq{UPDATE $tbname};
- }elsif ($line =~ /^DELETE/){
- $sqltype = $SQLTYPE_DEL;
- $tbname = `echo "$line" | awk "{print $3}"`;
- chomp($tbname);
- $sqlstr = qq{INSERT INTO $tbname};
- }
- # check ignore table
- if(&ignore_tb($tbname)){
- $notignore = 0;
- &mdebug("#IGNORE#:line:".$line);
- $sqlstr = "";
- }else{
- $notignore = 1;
- &mdebug("#DO#:line:".$line);
- }
- }else {
- if($notignore){
- &merror("can"t get tbname") unless (defined($tbname));
- if ($line =~ /^WHERE/){
- $sqlarea = $SQLAREA_WHERE;
- $sqlstr .= qq{ SET};
- $isareabegin = 1;
- }elsif ($line =~ /^SET/){
- $sqlarea = $SQLAREA_SET;
- $sqlstr .= qq{ WHERE};
- $isareabegin = 1;
- }elsif ($line =~ /^@/){
- $sqlstr .= &deal_col_value($tbname, $sqltype, $sqlarea, $isareabegin, $line);
- $isareabegin = 0;
- }else{
- &mdebug("::unknown sql:".$line);
- }
- }
- }
- }
- # export last sql
- if ($sqlstr ne ""){
- $sqlstr .= ";
";
- print SQLFILE $sqlstr;
- &mdebug("export sql
".$sqlstr);
- }
-
- close BINLOG or die "Can"t close binlog file: $binlogfile";
- close SQLFILE or die "Can"t close out sql file: $outfile";
- # 逆序
- # 1!G: 只有第一行不执行G, 将hold space中的内容append回到pattern space
- # h: 将pattern space 拷贝到hold space
- # $!d: 除最后一行都删除
- my $invert = "sed -i "1!G;h;$!d" $outfile";
- my $res = `$invert`;
- &mdebug("inverter order sqlfile :$invert");
- }
- # ----------------------------------------------------------------------------------------
- # Func : transfer column pos to name
- # deal column value
- #
- # &deal_col_value($tbname, $sqltype, $sqlarea, $isareabegin, $line);
- # ----------------------------------------------------------------------------------------
- sub deal_col_value($$$$$){
- my ($tbname, $sqltype, $sqlarea, $isareabegin, $line) = @_;
- &mdebug("$PRE_FUNCT deal_col_value");
- &mdebug("input:tbname->$tbname,type->$sqltype,area->$sqlarea,areabegin->$isareabegin,line->$line");
- my @vals = split(/=/, $line);
- my $pos = substr($vals[0],1);
- my $valstartpos = length($pos)+2;
- my $val = substr($line,$valstartpos);
- my %tbcol = %{$tbcol_pos{$tbname}};
- my ($cname,$ctype) = split(/$SPLITER_COL/,$tbcol{$pos});
- &merror("can"t get $tbname column $cname type") unless (defined($cname) || defined($ctype));
- &mdebug("column infor:cname->$cname,type->$ctype");
- # join str
- my $joinstr;
- if ($isareabegin){
- $joinstr = " ";
- }else{
- # WHERE 被替换为 SET, 使用 , 连接
- if ($sqlarea eq $SQLAREA_WHERE){
- $joinstr = ", ";
- # SET 被替换为 WHERE 使用 AND 连接
- }elsif ($sqlarea eq $SQLAREA_SET){
- $joinstr = " AND ";
- }else{
- &merror("!!!!!!The scripts error");
- }
- }
-
- #
- my $newline = $joinstr;
- # NULL value
- if (($val eq "NULL") && ($sqlarea eq $SQLAREA_SET)){
- $newline .= qq{ $cname IS NULL};
- }else{
- # timestamp: record seconds
- if ($ctype eq "timestamp"){
- $newline .= qq{$cname=from_unixtime($val)};
- # datetime: @n=yyyy-mm-dd hh::ii::ss
- }elsif ($ctype eq "datetime"){
- $newline .= qq{$cname="$val"};
- }else{
- $newline .= qq{$cname=$val};
- }
- }
- &mdebug(" old>$line
new>$newline");
-
- return $newline;
- }
- # ----------------------------------------------------------------------------------------
- # Func : check is ignore table
- # params: IN table full name # format:`dbname`.`tbname`
- # RETURN:
- # 0 not ignore
- # 1 ignore
- # ----------------------------------------------------------------------------------------
- sub ignore_tb($){
- my $fullname = shift;
- # 删除`
- $fullname =~ s/`//g;
- my ($dbname,$tbname) = split(/./,$fullname);
- my $res = 0;
-
- # 指定了数据库
- if ($opt{"d"}){
- # 与指定库相同
- if ($do_dbs{$dbname}){
- # 指定表
- if ($opt{"T"}){
- # 与指定表不同
- unless ($do_tbs{$tbname}){
- $res = 1;
- }
- }
- # 与指定库不同
- }else{
- $res = 1;
- }
- }
- #&mdebug("Table check ignore:$fullname->$res");
- return $res;
- }
- # ----------------------------------------------------------------------------------------
- # Func : print debug msg
- # ----------------------------------------------------------------------------------------
- sub mdebug{
- my (@msg) = @_;
- print "@msg
" if ($opt{"debug"});
- }
- # ----------------------------------------------------------------------------------------
- # Func : print error msg and exit
- # ----------------------------------------------------------------------------------------
- sub merror{
- my (@msg) = @_;
- print ":@msg
";
- &print_usage();
- exit(1);
- }
- # ----------------------------------------------------------------------------------------
- # Func : print usage
- # ----------------------------------------------------------------------------------------
- sub print_usage{
- print <<EOF;
- ==========================================================================================
- Command line options :
- --help # OUT : print help info
- -f, --srcfile # IN : binlog file. [required]
- -o, --outfile # OUT : output sql file. [required]
- -h, --host # IN : host. default "127.0.0.1"
- -u, --user # IN : user. [required]
- -p, --password # IN : password. [required]
- -P, --port # IN : port. default "3306"
- --start-datetime # IN : start datetime
- --stop-datetime # IN : stop datetime
- --start-position # IN : start position
- --stop-position # IN : stop position
- -d, --database # IN : database, split comma
- -T, --table # IN : table, split comma. [required] set -d
- -i, --ignore # IN : ignore binlog check contain DDL(CREATE|ALTER|DROP|RENAME)
- --debug # IN : print debug information
- Sample :
- shell> perl binlog-rollback.pl -f "mysql-bin.000001" -o "/tmp/t.sql" -u "user" -p "pwd"
- shell> perl binlog-rollback.pl -f "mysql-bin.000001" -o "/tmp/t.sql" -u "user" -p "pwd" -i
- shell> perl binlog-rollback.pl -f "mysql-bin.000001" -o "/tmp/t.sql" -u "user" -p "pwd" --debug
- shell> perl binlog-rollback.pl -f "mysql-bin.000001" -o "/tmp/t.sql" -h "192.168.1.2" -u "user" -p "pwd" -P 3307
- shell> perl binlog-rollback.pl -f "mysql-bin.000001" -o "/tmp/t.sql" -u "user" -p "pwd" --start-position=107
- shell> perl binlog-rollback.pl -f "mysql-bin.000001" -o "/tmp/t.sql" -u "user" -p "pwd" --start-position=107 --stop-position=10000
- shell> perl binlog-rollback.pl -f "mysql-bin.000001" -o "/tmp/t.sql" -u "user" -p "pwd" -d "db1,db2"
- shell> perl binlog-rollback.pl -f "mysql-bin.0000*" -o "/tmp/t.sql" -u "user" -p "pwd" -d "db1,db2" -T "tb1,tb2"
- ==========================================================================================
- EOF
- exit;
- }
- 1;
关于使用MySQL binlog对数据进行恢复的实战 http://www.linuxidc.com/Linux/2016-01/127808.htmMySQL 5.6.12切换binlog二进制日志路径 http://www.linuxidc.com/Linux/2016-01/126979.htmmysqlbinlog解析binlog乱码问题解密 http://www.linuxidc.com/Linux/2016-01/126978.htmMySQL通过binlog来恢复数据 http://www.linuxidc.com/Linux/2015-12/126897.htmMySQL binlog 组提交与 XA(两阶段提交) http://www.linuxidc.com/Linux/2015-11/124942.htmMySQL使用备份和binlog进行数据恢复 http://www.linuxidc.com/Linux/2014-12/110875.htm
本文永久更新链接地址