Mailinglisten-Archive |
Hier sind zwei brauchbare Skripte zur Server -Synchronisation mit MySQL .... Vielleicht müsst Ihr die noch mit recode latin1:ibmpc datei ... oder UNIX2DOS Datei in DOSE Format konvertieren ... Ich hab' nur UNIX.... Gru/3, Guido -----Ursprüngliche Nachricht----- Von: Alexander Meis <simmail_(at)_pixelhouse.de> An: <mysql-de_(at)_lists.4t2.com> Gesendet: Donnerstag, 9. September 1999 18:16 Betreff: Server Synchronisieren ? > Hi... > > Ich würde gerne 2 Mysql Server synchronisieren und zwar in der Art das sich > Server 2 an Server 1 Updatet einmal in der Stunde oder so. > Weis jemand von euch ne Möglichkeit wie man das machen kann ? > > Gruss Alex > > --- > *** Abmelden von dieser Mailingliste funktioniert per E-Mail > *** an mysql-de-request_(at)_lists.4t2.com mit Betreff/Subject: unsubscribe
mysqlsync a.k.a "the mysql-syncer-upper-script-thingy" mysqlsync (a.k.a the mysql-syncer-upper-script-thingy) Current version: 1.0-alpha Current status: experimental Author/Maintainer: Mark Jeftovic Latest version: http://www.shmooze.net/~markjr/mysqlsync/ This is mysqlsync, a script that endevours to keep remote copies of a mysql database in sync with a master copy. Run "mysqlsync -u" for usage. </code> 0.0 CONTENTS: 1.0 REQUIREMENTS 2.0 INSTALLATION & USAGE 3.0 CAVEATS, BUGS & TODO 1.0 REQUIREMENTS: 1) perl 5 2) the Mysql perl module 2) mysql server and client libs 2.0 INSTALLATION & USAGE: 1) You need to turn on logging on your mysqld. You can do this by changing the following lines in your safe_mysqld script: from: <code> if test "$#" -eq 0 then nohup $ledir/mysqld --basedir=$MY_BASEDIR_VERSION --datadir=$DATADIR \ --skip-locking >> $err 2>&1 else nohup $ledir/mysqld --basedir=$MY_BASEDIR_VERSION --datadir=$DATADIR \ --skip-locking "$_(at)_" >> $err 2>&1 fi to: if test "$#" -eq 0 then nohup $ledir/mysqld --basedir=$MY_BASEDIR_VERSION --datadir=$DATADIR \ --log --skip-locking >> $err 2>&1 else nohup $ledir/mysqld --basedir=$MY_BASEDIR_VERSION --datadir=$DATADIR \ --log --skip-locking "$_(at)_" >> $err 2>&1 fi </code> (NOTE: depending on your platform the --skip-locking option may or may not be present, search on "nohup" and edit those lines.) 2) edit the global variables $MYSQL_DIR, $DEFAULT_LOG et al to reflect your system. This was developed on a box where mysqld is in a nonstandard place so $MYSQL_DIR *WILL LIKELY BE WRONG* for your box. 3) the mysql grant tables must be set up to accept connections from the master host. mysqlsync will connect as whatever user connects to the master. 4) when mysqlsync catches a SIGTERM or a SIGINT it writes it's current position in the mysql log to mysqlsync.offset. Whenever mysqlsync starts it looks for this file and seeks to that position in the log file and begins processing from there. If no offset file is present, and no -s option is specified, mysqlsync defaults to 80 bytes before EOF. 3.0) CAVEATS, BUGS & TODO: 1) multiple databases are not yet supported. If you want to sync more than one database, run multiple instances of mysqlsync 2) depending on your traffic, mysql logs can get very large, very fast. Be aware of this and rotate accordingly. We rotate our mysql log nightly. mysqlsync will detect if it's current postion is larger than the file size, assume the log has been rotated, and reset it's file pointer back to the beginning of the log. 3) different passwords for different users are not supported. Set $PASSWD in the script, and if you do, you should remember to chmod 700 or 750 depending on your situation. 4) you should use FQDN's when specifying your host list. In order to prevent loops, mysqlsync checks if it's connection is coming from a slave and ignores any queries that do. If you want to allow updates from the remote hosts, run mysqlsync on the remote hosts. 5) mysqlync should support DBD::Mysqld, but it doesn't at the moment. <code> -------------------------------------------------------------------------------- #!/usr/bin/perl # # mysqlsync v1.0-alpha (a.k.a the mysql-syncer-upper-script-thingy) # keep remote copies of a mysql database in sync with a master # by mark jeftovic <markjr_(at)_easyDNS.com> # copyright feb/1999 easyDNS Technologies Inc. # # All rights reserved. # # This code provided "As Is" with no warrantees express or implied. # The author and contributors are not liable for anything good or # bad that results from your use of this code. # # You are free to distribute this for free provided this notice is # included. Please forward fixes/enhancements to the author for # inclusion in the next revision. # $VERSION = "v1.0-alpha"; $MAINTAINER = 'markjr_(at)_easydns.com'; use Mysql; use Getopt::Std; use Sys::Hostname; $thishost = hostname; use File::Basename; _(at)_SAVE_ARGV=_(at)_ARGV; $script = basename($0); getopts("d:t:h:l:o:vs:bu"); if($opt_u) { print<<"EOF"; $script $VERSION (a.k.a the mysql-syncer-upper-script-thingy) email: $MAINTAINER Usage: $script -d <dbase> -h <hosts> -t <tables> \\ [ -o <operations> -l <mysql_log> -s <offset,whence> -b -v -u ] Where: -d database to keep in sync -h comma seperated list of remote hosts -t comma seperated list of tables Optional switches: -o comma seperated list of operations to sync (defaults to: update,insert,delete) -l alternate mysql_log -s start from offset relative to whence (defaults to value in \$OFFSET_LOG,0 if present, otherwise 80,2) -b fork a daemon -v verbose mode (logs operations to \$SYNC_LOG) -u this message EOF exit; } $MYSQL_DIR = "/export/mysql"; $DEFAULT_LOG = sprintf("%s/var/%s.log", $MYSQL_DIR, $thishost); $OFFSET_LOG = $MYSQL_DIR."/mysqlsync.offset"; $PID_FILE = $MYSQL_DIR."/mysqlsync.pid"; $SYNC_LOG = $MYSQL_DIR."/mysqlsync.log"; $DEFAULT_OFFSET = 80; $DEBUG_LEVEL = 0; $SLEEP = 1; # sleep value between log polls $PASSWD = ''; # chmod this script appropriately if you set this $MYSQL_LOG = $opt_l ? $opt_l : $DEFAULT_LOG; _(at)_OP = $opt_o ? split(/\,/,$opt_o) : qw(insert update delete); $opt_h ? _(at)_HOST = split(/\,/,$opt_h) : die "Need host list use \"$script -u\" for usage"; $opt_t ? _(at)_TABLE = split(/\,/,$opt_t) : die "Need table list use \"$script -u\" for usage"; $opt_d ? _(at)_DBASE = split(/\,/,$opt_d) : die "Need dbase use \"$script -u\" for usage"; if(_(at)_DBASE>1) { print "$script $VERSION does not yet support multiple databases\n"; print "run another process for each additional database.\n"; exit(-1); } if(-f $SYNC_LOG){ open(SYNC_LOG, ">>$SYNC_LOG") or die $!; } else { open(SYNC_LOG, ">$SYNC_LOG") or die $!; } select SYNC_LOG; $|=1; &logit("mysqlsync $VERSION <$MAINTAINER> starting."); &write_pid($PID_FILE); ($offset,$pos)=&get_offset(); BEGIN: open(LOG, $MYSQL_LOG) or die $!; seek(LOG, $offset, $pos); &logit("Seeking to $offset, relative to $pos in $MYSQL_LOG") if($opt_v); &init_signals; &daemon if($opt_b); for(;;) { while($line=<LOG>){ print STDERR $line if($DEBUG_LEVEL>=3); # if($line=~/\d+\s+\d\d:\d\d:\d\d\s+(\d+)\sConnect\s+(.+)_(at)_(.+)\s+on/) { if($line=~/\s+(\d+)\sConnect\s+(.+)_(at)_(.+)\s+on/) { if(!grep(/^$3$/,_(at)_HOST)){ ${$1}{host}=$3; ${$1}{user}=$2; print STDERR "host: ${$1}{host} / ${$1}{user}\n" if($DEBUG_LEVEL); } } elsif($line=~/\d+\s+\d\d:\d\d:\d\d\s(\d+)\sQuit/) { if(defined(%{$1})) { undef(%{$1}); } } elsif($line=~/\s+(\d+)\sInit\sDB\s+\W(.+)\W/) { if(defined(%{$1})){ if(grep(/^$2$/,_(at)_DBASE)) { ${$1}{dbase}=$2; print STDERR "dbase: ${$1}{dbase}\n" if($DEBUG_LEVEL); } # else { undef(%{$1}); } } } elsif($line=~/\s+(\d+)\s+Query\s+(\w+)\s+(.+)$/) { if(defined(%{$1})){ if(grep(/^$2$/i,_(at)_OP)) { ${$1}{op}=$2; print STDERR "op: ${$1}{op} ($3)\n" if($DEBUG_LEVEL); ${$1}{table}=get_tbl(${$1}{op}, $3); print STDERR "table: ${$1}{table}\n" if($DEBUG_LEVEL); if(grep(/^${$1}{table}$/,_(at)_TABLE)) { ${$1}{query}=sprintf("%s %s", ${$1}{op}, $3); foreach $h (_(at)_HOST) { print STDERR "$h/${$1}{dbase}/${$1}{user}: ${$1}{query}\n" if($DEBUG_LEVEL); &logit("On $h: ".${$1}{query}) if($opt_v); $dbh = Mysql->Connect($h, ${$1}{dbase}, ${$1}{user},$PASSWD); eval '$sth = $dbh->Query("${$1}{query}") || &logit("ERROR: ".$dbh->errmsg)'; &logit("ERROR: ".$_(at)_) if($_(at)_); undef $dbh; } } } } } seek(LOG,0,1); $cur_log_pos = tell LOG; print STDERR "Seeking to $cur_log_pos\n" if($DEBUG_LEVEL>=2); } $cur_log_size = (stat $MYSQL_LOG)[7]; if($cur_log_size<$cur_log_pos) { &logit("\$cur_log_size < \$cur_log_pos ($cur_log_size/$cur_log_pos), log rotated?") if($opt_v); &logit("Resetting log pointer"); close(LOG); $offset=$pos=$cur_log_pos=0; goto BEGIN; } sleep($SLEEP); } sub write_pid { my($pid_file)=_(at)__; open(PID, ">$pid_file") || warn $!; print PID $$; close(PID); } sub get_tbl { my($op, $query)=_(at)__; if($op =~ /^update$/i) { $query =~ /^\s*(\w+)\s+set\s+.+/; return($1); } elsif ($op =~ /^delete$/i) { $query =~ /^from\s+(\w+)\s+.*/; return($1); } elsif ($op =~ /^insert$/i) { $query =~ /^into\s+(\w+)\s+.*/; return($1); } elsif ($op =~ /^drop$/i) { $query =~ /^table\s+(\w+)\s+.*/; return($1); } # select serves no real purpose in sync-ing other than debugging elsif ($op =~ /^select$/i) { $query =~ /^.+\s+from\s+(\w+)\s+.*/; return($1); } return(undef); } sub get_offset { if($opt_s) { ($offset,$pos)=split(/\,/,$opt_s); } elsif(-f $OFFSET_LOG) { $pos = 0; open(OFFSET, $OFFSET_LOG) || die $!; chomp($offset=<OFFSET>); close(OFFSET); } else { print STDERR "warning: no $OFFSET_LOG and no -s option, using $DEFAULT_OFFSET\n"; $pos = 2; $offset = $DEFAULT_OFFSET; } return($offset,$pos); } sub init_signals { $SIG{'INT'} = \&do_kill; $SIG{'QUIT'} = \&do_kill; $SIG{'TERM'} = \&do_kill; } sub logit { my($msg)=_(at)__; chomp($msg); chomp($now=scalar localtime(time)); print SYNC_LOG "[$now] $msg\n"; } sub do_kill { &logit("Caught SIGTERM -exiting.") if($opt_v); open(OFFSET, ">$OFFSET_LOG") || die $!; print OFFSET tell(LOG); close(OFFSET); exit; } # this routine was pilfered from lbnamed -mark sub daemon { local(*TTY,*NULL); exit(0) if (fork); write_pid($PID_FILE); if (open(NULL,"/dev/null")) { open(STDIN,">&NULL") || close(STDIN); open(STDOUT,">&NULL") || close(STDOUT); open(STDERR,">&NULL") || close(STDERR); } else { close(STDIN); close(STDOUT); close(STDERR); } eval 'require "sys/ioctl.ph";'; return if !defined(&TIOCNOTTY); open(TTY,"+>/dev/tty") || return; ioctl(TTY,&TIOCNOTTY,0); close(TTY); }
#!/usr/local/bin/perl =pod =head1 NAME mysql_replicate - realtime replication a mysql database server =head1 SYNOPSIS mysql_replicate.pl --path I<path> --remote-server I<name> [--resync-with-remote-server] [--force-resync I<database>] [--exclude-database I<database>] [--maintain-state-database] =head1 PREREQUISITES requires that DBI with the MySQL drivers be installed =head1 DESCRIPTION Replicates all changes to databases on a mysql server to a remote mysql server. =head1 OPTIONS =over 4 =item --path Specifies the path to the update logs. logfiles should be named like "update.*". mysql_replicate will create a status directory called replica_logs in this directory, so the user who is running mysql_replicate will need the appropriate permissions to create that directory. =item --remote-server The hostname of the remote server to replicate to. =item --resync-with-remote-server If a database doesn't exist on the remote server, recreate it. =item --force-resync I<database> Force a resync/recreation of a given database on the remote server =item --exclude-databases I<database> Exclude a database from being mirrored on the remote server =item --maintain-state-database Create and update a database on the local server called "replica". This database has information that reflects the status of the remote replica. If this item is specified, one can trigger force-resync's without having to restart the process by updating the 'force_resync' field with a comma seperated list of databases to resync in the row for the given remote server. =back =head1 USAGE =over 4 =item Create a "replica" user on all servers involved. This user should have "root" access to all databases on the server(s). edit the script, and change the variables $replicate_user and $replicate_password as appropriate. =item Set up the server for logging updates. The mysql server should be running with the --log-update option configured as follows: --log-update=<path>/update this <path> is what you should specify for the path option to the script at runtime. The logs will be rotated every ten minutes. mysql_replicate will create a status directory in <path>, so the user who is running mysql_replicate will need write access to that directory. =item NOTE: logfiles that are no longer required will be I<deleted>! if multiple mysql_replicates are running to multiple servers, the lowest needed update log will still be tracked amongst the running mysql_replicates. =back if the remote server should disappear, the script will keep retrying the connection/replication transaction until it is successful. =head1 SEE ALSO =head1 NOTES =over 4 =item Should daemonize, and run as the mysql-user. This implies parsing the my.cnf file. =item Should write a utility script to help users set force-resyncs if running with the --maintain-state-database option. =back =head1 AUTHOR =over 4 =item Andrew Elble elble_(at)_icculus.nsg.nwu.edu =back =head1 BUGS =over 4 =item probably some more lurking in there somewhere. =back =cut use strict; use DBI; use FileHandle; use Getopt::Long; # # global variables # my %hold_commit; my _(at)_exclude_databases; my _(at)_force_resync; my ($pathname, $remote_server, $resync_with_remote_server, $maintain_state_database, $database, $default, $filename, $next_flush_time, $synced, $prefix, $extension, $curpos, $dbh, $localdbh); # # Get our options # GetOptions("path=s" => \$pathname, "remote-server=s" => \$remote_server, "resync-with-remote-server" => \$resync_with_remote_server, "exclude-databases=s" => \_(at)_exclude_databases, "force-resync=s" => \_(at)_force_resync, "maintain-state-database" => \$maintain_state_database); if (($pathname eq "") || ($remote_server eq "")) { print "you must specify a path and a remote server!\n"; exit(0); } my $replicate_username = ""; my $replicate_password = ""; my $logflush_interval = (30*60); my $fileprefix = "update"; my $synclogdir = "$pathname/replica_logs"; # # ROUTINE # reload_database # # DESCRIPTION # replicates the entire database on the remote system # sub reload_database { if (($resync_with_remote_server) || (grep(/\Q$database\E/,_(at)_force_resync))) { print "dumping $database to $remote_server\n"; if ($maintain_state_database) { replica_table_check(); $localdbh->do("use replica"); $localdbh->do("update replica_status set resyncing = '".$database."' where hostname = '".$remote_server."'"); } $default = undef; commit("drop database $database"); commit("create database $database"); commit("use $database"); $localdbh->do("SET SQL_LOG_UPDATE=0"); $localdbh->do("use $database"); # # get a write lock on all tables so that the data doesn't change. # my $query = $localdbh->prepare("show tables"); $query->execute; my $table = $query->fetchall_arrayref; my $i; my $dostring = "lock tables"; my $first = 1; foreach $i (0 .. $#{ $table }) { if ($first == 1) { $dostring .= " ".$table->[$i][0]." write"; $first = 0; } else { $dostring .= ",".$table->[$i][0]." write"; } } # print "LOCK: $dostring\n"; $localdbh->do($dostring); # # force a logflush. that way, any potential changes to this database # are only valid in the latest logfile. # $next_flush_time = 0; flush_logs(); # # find the highest numbered logfile, and temporarily block commits to the database # until we reach the highest numbered logfile. # opendir(LOGDIR,$pathname); my _(at)_logs = sort {$b cmp $a} (grep { /^\Q$fileprefix\E/ && -f "$pathname/$_" } readdir(LOGDIR)); closedir(LOGDIR); my $tmpfilename = $pathname."/".$logs[0]; if ( -f $tmpfilename) { print "updates for $database will recommence with: $tmpfilename\n"; my ($tmpprefix, $tmpextension) = ($tmpfilename =~ /(\w+)\.(\d+)/); $hold_commit{$tmpprefix.$tmpextension} = $database; } else { die "unable to keep track of logfile!"; } # # Create the tables and dump the data! # my $dostring = ""; foreach $i (0 .. $#{ $table }) { $dostring = "CREATE TABLE ".$table->[$i][0]." (\n"; my $subquery = $localdbh->prepare("show fields from ".$table->[$i][0]); $subquery->execute; my $subtable = $subquery->fetchall_arrayref; my %prikey; my %unikey; my %mulkey; my $subi; foreach $subi (0 .. $#{ $subtable }) { $dostring .= $subtable->[$subi][0]." ".$subtable->[$subi][1]; if ($subtable->[$subi][2] eq "") { $dostring .= " NOT NULL"; } if ($subtable->[$subi][3] eq "PRI") { $prikey{$subtable->[$subi][0]} = 1; } elsif ($subtable->[$subi][3] eq "UNI") { $unikey{$subtable->[$subi][0]} = 1; } elsif ($subtable->[$subi][3] eq "MUL") { $mulkey{$subtable->[$subi][0]} = 1; } if ($subtable->[$subi][4] eq "NULL") { $dostring .= " DEFAULT ''"; } elsif ($subtable->[$subi][4] ne "") { $dostring .= " DEFAULT '$subtable->[$subi][4]'"; } if ($subtable->[$subi][5] ne "") { $dostring .= " ".$subtable->[$subi][5]; } if ($subi < $#{ $subtable}) { $dostring .= ",\n"; } } my $key; foreach $key (keys %prikey) { $dostring .= ",\nPRIMARY KEY($key)"; } my $key; foreach $key (keys %unikey) { $dostring .= ",\nUNIQUE KEY($key)"; } my $key; foreach $key (keys %mulkey) { $dostring .= ",\nKEY($key)"; } $dostring .= ")\n\n"; $subquery->finish; $subtable = undef; # print "doing: $dostring"; $dbh->do($dostring); # # dump data (should lock tables as well!) # my $subquery = $localdbh->prepare("select * from ".$table->[$i][0]); $subquery->execute; my $subtable = $subquery->fetchall_arrayref; my $subi; $dbh->do("lock tables ".$table->[$i][0]." write"); foreach $subi (0 .. $#{ $subtable }) { my $subdostring = "INSERT INTO ".$table->[$i][0]." VALUES ("; my $j; foreach $j (0 .. $#{ $subtable->[$subi] }) { if ($subtable->[$subi][$j] ne "") { my $quoted = $localdbh->quote($subtable->[$subi][$j]); $subdostring .= $quoted; } else { $subdostring .= "''"; } if ($j < $#{ $subtable->[$subi] }) { $subdostring .= ","; } } $subdostring .= ")\n"; # print "insert: $subdostring"; $dbh->do($subdostring); } $dbh->do("unlock tables"); $subquery->finish; $subquery = undef; $subtable = undef; } $query->finish; $query = undef; $table = undef; # # exclude the database from updates 'till the selected log. # push(_(at)_exclude_databases, $database); print "excluding $database from updates until log: $tmpfilename\n"; $localdbh->do("unlock tables"); if ($maintain_state_database) { replica_table_check(); $localdbh->do("use replica"); $localdbh->do("update replica_status set resyncing = '' where hostname = '".$remote_server."'"); } } else { print "database $database doesn't exist on remote server!\n"; if (!(grep(/\Q$database\E/,_(at)_exclude_databases))) { print "adding $database to exclude list!\n"; push(_(at)_exclude_databases, $database); } } } # # ROUTINE # check_for_resyncs # # DESCRIPTION # checks to see if there are resync requests pending. # sub check_for_resyncs { if ($maintain_state_database) { $localdbh->do("use replica"); my $query = $localdbh->prepare("select force_resync, resyncing from replica_status where hostname = '".$remote_server."'"); $query->execute; my $table = $query->fetchall_arrayref; my $i; foreach $i (0 .. $#{ $table }) { if ($table->[$i][0] ne "") { my _(at)_tmppush = split(/\,/,$table->[$i][0]); my $tpsh; foreach $tpsh (_(at)_tmppush) { if (!(grep(/\Q$tpsh\E/,_(at)_force_resync))) { print "acknowledging resync request for: ".$tpsh."\n"; push(_(at)_force_resync,$tpsh); } } } if ($table->[$i][1] ne "") { my _(at)_tmppush = split(/\,/,$table->[$i][1]); my $tpsh; foreach $tpsh (_(at)_tmppush) { if (!(grep(/\Q$tpsh\E/,_(at)_force_resync))) { print "redoing interrupted resync for : ".$tpsh."\n"; push(_(at)_force_resync,$tpsh); } } } } $query->finish; $query = undef; $table = undef; $localdbh->do("update replica_status set force_resync='' where hostname = '".$remote_server."'"); } } # # ROUTINE # check_for_next_log_in_sequence # # DESCRIPTION # check to see if the next logfile is available # # RETURNS # the name of the next logfile (if next logfile is present) # undef otherwise # sub check_for_next_log_in_sequence { my $tmpprefix = $prefix; my $tmpextension = $extension; $tmpextension++; my $tempfilename = $pathname."/".$tmpprefix.".".$tmpextension; if ( -f $tempfilename) { return($tempfilename); } else { return(undef); } } # # ROUTINE # flush_old_update_files # # DESCRIPTION # discover what update logs are no longer needed, and remove them # sub flush_old_update_files { if (!stat($synclogdir)) { if (!mkdir($synclogdir,0755)) { die("can't create status directory\n"); } } opendir(SYNC,$synclogdir); my _(at)_logs = sort {$a cmp $b} (grep { -f "$synclogdir/$_" } readdir(SYNC)); closedir(SYNC); my $lowestlognumber = $extension; my $origlognumber = $lowestlognumber; my $file; foreach $file (_(at)_logs) { open(LOG, "$synclogdir/$file"); while (<LOG>) { chomp; my ($num,$pos) = ($_ =~ /(\d+)\:(\d+)/); if (($num < $lowestlognumber) && ($num ne "")) { $lowestlognumber = $num; } } close(LOG); } print "lowest needed file# is $lowestlognumber\n"; while ($lowestlognumber > 0) { $lowestlognumber--; my $rmfile = sprintf("$pathname/$fileprefix.%3.3d", $lowestlognumber); if (-f $rmfile) { print "removing $rmfile\n"; unlink($rmfile); } } } # # ROUTINE # replica_table_check # # DESCRIPTION # check to see if the replica_status table exists. if not, create it. # sub replica_table_check { $localdbh->do("SET SQL_LOG_UPDATE=0"); if ($maintain_state_database) { $localdbh->do("use replica"); if ($localdbh->errstr =~ /Unknown database/) { print "creating replica database\n"; $localdbh->do("create database replica"); } $localdbh->do("select * from replica_status limit 1"); my $err = $localdbh->err; if ($err == 0) { $localdbh->do("insert into replica_status (hostname, in_sync) values ('".$remote_server."','".$synced."')"); $localdbh->do("update replica_status set in_sync = '".$synced."' where hostname = '".$remote_server."'"); return 1; } print "creating replica tables\n"; $localdbh->do("CREATE table replica_status ( hostname char(255) NOT NULL, in_sync tinyint(1) default 0, resyncing char(255), force_resync char(255), PRIMARY KEY(hostname))"); $localdbh->do("insert into replica_status (hostname, in_sync) values ('".$remote_server."','".$synced."')"); return 0; } } sub lost_sync { $synced = 0; replica_table_check(); print "lost sync\n"; } sub attained_sync { $synced = 1; replica_table_check(); print "attained sync\n"; } sub flush_logs { if ($next_flush_time < time()) { $localdbh->do("SET SQL_LOG_UPDATE=0"); $localdbh->do("flush logs"); $next_flush_time = time() + $logflush_interval; } return; } # # ROUTINE # mfile # # DESCRIPTION # 1.) finds the next logfile to work from. # 2.) determines the correct position within the file to work from # 3.) calls readfile() to open the file and process the updates. # sub mfile { my ($num, $pos); if ($filename eq "") { flush_logs(); lost_sync(); if ( -f "$synclogdir/$remote_server") { open(LOG, "$synclogdir/$remote_server"); while (<LOG>) { chomp; my ($tnum,$tpos) = ($_ =~ /(\d+)\:(\d+)/); if ($tnum ne "") { $num = $tnum; } if ($tpos ne "") { $pos = $tpos; } } close(LOG); $filename = "$pathname/$fileprefix.$num"; if ( -f $filename) { print "restarting with: $filename\n"; ($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)/); $curpos = $pos; readfile(); } else { unlink("$synclogdir/$remote_server"); $filename = ""; } } if ($filename eq "") { opendir(LOGDIR,$pathname); my _(at)_logs = sort {$a cmp $b} (grep { /^\Q$fileprefix\E/ && -f "$pathname/$_" } readdir(LOGDIR)); closedir(LOGDIR); $filename = $pathname."/".$logs[0]; if ( -f $filename) { print "starting with: $filename\n"; ($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)/); $curpos = 0; readfile(); } else { die "i can't find a log to start syncing with!"; } } } else { if (defined($dbh)) { local $SIG{ALRM} = sub { lost_sync(); }; alarm(2); $dbh->ping; alarm(0); local $SIG{ALRM} = undef; } if (my $tempfilename = check_for_next_log_in_sequence()) { close(IN); lost_sync(); print "switching to log: $tempfilename\n"; $filename = $tempfilename; ($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)/); $curpos = 0; readfile(); } else { return; } } } sub write_log { alarm(0); my $reset_alarm = shift; if (($extension ne "") && ($curpos ne "")) { truncate(POSLOG,0); print POSLOG "$extension:$curpos\n"; } if ($reset_alarm == 1) { alarm(2); } } # # ROUTINE # commit # # DESCRIPTION # calls $dbh->do in a safe manner, and only when the correct conditions are met! # sub commit { my $line = shift; if ($line eq "") { return; } # print "commit: got $line\n"; if ($line =~ /^use (\w+)/i) { ($database) = ($line =~ /^use (\w+)/i); if (!(grep(/\Q$database\E/,_(at)_exclude_databases))) { print "switching to database $database\n"; } else { print "database $database has been excluded\n"; } while (!defined($dbh)) { sleep(1); $dbh = DBI->connect("DBI:mysql:$database:$remote_server", $replicate_username, $replicate_password, { PrintError => 0 }); $dbh->do("SET SQL_LOG_UPDATE=0"); } } if (grep(/\Q$database\E/,_(at)_force_resync)) { print "forced resync of $database\n"; my _(at)_tmpforce_resync = grep(!/\Q$database\E/,_(at)_force_resync); _(at)_force_resync = _(at)_tmpforce_resync; lost_sync(); reload_database(); } my $result; my $first = 1; my $dbherr; my $dbherrstr; # print "$default / ".$dbh->errstr."\n"; while (($result eq "") && ($dbherr eq "") && (!(grep(/\Q$database\E/,_(at)_exclude_databases))) && (($default ne "set") || ($line =~ /^use (\w+)/i))) { if ($first == 0) { if ($synced == 1) { lost_sync(); } sleep(1); } # print "doing: $database: $default: $line\n"; $result = $dbh->do($line); $dbherr = $dbh->err; $dbherrstr = $dbh->errstr; if (($dbherr eq "") && ($line =~ /^use (\w+)/i)) { $default = ""; } if ($dbherrstr =~ /Unknown database/) { reload_database(); $default = ""; } $first = 0; } } # # ROUTINE # readfile # # DESCRIPTION # 1.) opens the update log, and seeks to the correct position. # 2.) sends the updates from the log to the remote server. # sub readfile { # # remove holddown for database that has been recently loaded to the remote server. # if (defined($hold_commit{$prefix.$extension})) { print "removing database $hold_commit{$prefix.$extension} from exclude list\n"; my _(at)_tmpexclude_databases = grep(!/\Q$hold_commit{$prefix.$extension}\E/,_(at)_exclude_databases); _(at)_exclude_databases = _(at)_tmpexclude_databases; } open(POSLOG,"> $synclogdir/$remote_server"); POSLOG->autoflush(); write_log(0); while(!open(IN,"$filename")) { sleep(3); } if (!defined($dbh)) { print "using default database\n"; sleep(1); $database = "mysql"; $default = "set"; $dbh = DBI->connect("DBI:mysql:mysql:$remote_server", $replicate_username, $replicate_password, { PrintError => 0 }); commit("SET SQL_LOG_UPDATE=0"); } check_for_resyncs(); if ($curpos != 0) { my $line; while(<IN>) { chomp; if ($_ !~ /^\#/) { if ($_ =~ /\;$/) { $line .= $_; if ($line =~ /^use (\w+)/i) { commit($line); } } $line = undef; } else { $line .= $_; } } seek(IN, $curpos, 0); } flush_old_update_files(); my $line; for (;;) { flush_logs(); local $SIG{ALRM} = sub { write_log(1); }; alarm(2); for ($curpos = tell(IN); $_ = <IN>; $curpos = tell(IN)) { # print "syncing: $curpos\n"; chomp; if ($_ !~ /^\#/) { if ($_ =~ /\;$/) { $line .= $_; commit("SET SQL_LOG_UPDATE=0"); commit($line); $line = undef; } else { $line .= $_; } } } alarm(0); local $SIG{ALRM} = undef; write_log(0); sleep(1); check_for_resyncs(); my $cur_log_size = (stat $filename)[7]; if ($cur_log_size == $curpos) { mfile(); } if ($synced == 0) { # print "status: ($curpos / $cur_log_size)\n"; if ((($cur_log_size - $curpos) < 8192) && (!defined(check_for_next_log_in_sequence()))) { attained_sync(); } else { lost_sync(); } } seek(IN, $curpos, 0); } close(POSLOG); } sub closelogs { write_log(0); close(POSLOG); close(IN); lost_sync(); print "clean exit\n"; exit(0); } $SIG{TERM} = \&closelogs; $SIG{INT} = \&closelogs; $localdbh = DBI->connect("DBI:mysql:mysql", $replicate_username, $replicate_password, { PrintError => 0 }); $localdbh->do("SET SQL_LOG_UPDATE=0"); flush_logs(); mfile();
php::bar PHP Wiki - Listenarchive