phpbar.de logo

Mailinglisten-Archive

Re: Server Synchronisieren ?
Archiv Mailingliste mysql-de

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Server Synchronisieren ?



Hier sind zwei brauchbare Skripte zur Server -Synchronisation mit MySQL ....

Vielleicht müsst Ihr die noch mit recode latin1:ibmpc datei ... oder
UNIX2DOS Datei in DOSE Format konvertieren ...

Ich hab' nur UNIX....

Gru/3, Guido

-----Ursprüngliche Nachricht-----
Von: Alexander Meis <simmail_(at)_pixelhouse.de>
An: <mysql-de_(at)_lists.4t2.com>
Gesendet: Donnerstag, 9. September 1999 18:16
Betreff: Server Synchronisieren ?


> Hi...
>
> Ich würde gerne 2 Mysql Server synchronisieren und zwar in der Art das
sich
> Server 2 an Server 1 Updatet einmal in der Stunde oder so.
> Weis jemand von euch ne Möglichkeit wie man das machen kann ?
>
> Gruss Alex
>
> ---
> *** Abmelden von dieser Mailingliste funktioniert per E-Mail
> *** an mysql-de-request_(at)_lists.4t2.com mit Betreff/Subject: unsubscribe

mysqlsync
a.k.a "the mysql-syncer-upper-script-thingy"
mysqlsync (a.k.a the mysql-syncer-upper-script-thingy)

Current version: 	1.0-alpha
Current status:  	experimental
Author/Maintainer:	Mark Jeftovic 
Latest version:		http://www.shmooze.net/~markjr/mysqlsync/

This is mysqlsync, a script that endevours to keep remote copies of a
mysql database in sync with a master copy. Run "mysqlsync -u" for usage.
</code>
0.0 CONTENTS:
	1.0 REQUIREMENTS
	2.0 INSTALLATION & USAGE
	3.0 CAVEATS, BUGS & TODO

1.0 REQUIREMENTS:

1) perl 5
2) the Mysql perl module
2) mysql server and client libs

2.0 INSTALLATION & USAGE:

1) You need to turn on logging on your mysqld. You can do this by changing
the
following lines in your safe_mysqld script:

from:
<code>
if test "$#" -eq 0
  then
    nohup $ledir/mysqld --basedir=$MY_BASEDIR_VERSION --datadir=$DATADIR \
    --skip-locking >> $err 2>&1
else
    nohup $ledir/mysqld --basedir=$MY_BASEDIR_VERSION --datadir=$DATADIR \
    --skip-locking "$_(at)_" >> $err 2>&1
fi              

to:

if test "$#" -eq 0
  then
    nohup $ledir/mysqld --basedir=$MY_BASEDIR_VERSION --datadir=$DATADIR \
    --log --skip-locking >> $err 2>&1
else
    nohup $ledir/mysqld --basedir=$MY_BASEDIR_VERSION --datadir=$DATADIR \
    --log --skip-locking "$_(at)_" >> $err 2>&1
fi              
</code>
(NOTE: depending on your platform the --skip-locking option may or may not
be present, search on "nohup" and edit those lines.)

2) edit the global variables $MYSQL_DIR, $DEFAULT_LOG et al to reflect
   your system. This was developed on a box where mysqld is in a 
   nonstandard place so $MYSQL_DIR *WILL LIKELY BE WRONG* for your
   box.

3) the mysql grant tables must be set up to accept connections from the
   master host. mysqlsync will connect as whatever user connects to
   the master.   

4) when mysqlsync catches a SIGTERM or a SIGINT it writes it's current
position
   in the mysql log to mysqlsync.offset. Whenever mysqlsync starts it looks
   for this file and seeks to that position in the log file and begins 
   processing from there. If no offset file is present, and no -s option
   is specified, mysqlsync defaults to 80 bytes before EOF.

3.0) CAVEATS, BUGS & TODO:

1) multiple databases are not yet supported. If you want to sync more than
   one database, run multiple instances of mysqlsync

2) depending on your traffic, mysql logs can get very large, very fast.
   Be aware of this and rotate accordingly. We rotate our mysql log
   nightly. mysqlsync will detect if it's current postion is larger
   than the file size, assume the log has been rotated, and reset it's
   file pointer back to the beginning of the log.

3) different passwords for different users are not supported. Set $PASSWD
   in the script, and if you do, you should remember to chmod 700 or 750
   depending on your situation.
   
4) you should use FQDN's when specifying your host list. In order to 
   prevent loops, mysqlsync checks if it's connection is coming from a 
   slave and ignores any queries that do. If you want to allow updates 
   from the remote hosts, run mysqlsync on the remote hosts.

5) mysqlync should support DBD::Mysqld, but it doesn't at the moment.

<code>
--------------------------------------------------------------------------------

#!/usr/bin/perl
#
# mysqlsync v1.0-alpha (a.k.a the mysql-syncer-upper-script-thingy)
# keep remote copies of a mysql database in sync with a master
# by mark jeftovic <markjr_(at)_easyDNS.com>
# copyright feb/1999 easyDNS Technologies Inc.
#
# All rights reserved.
#
# This code provided "As Is" with no warrantees express or implied.
# The author and contributors are not liable for anything good or
# bad that results from your use of this code.
#
# You are free to distribute this for free provided this notice is
# included. Please forward fixes/enhancements to the author for
# inclusion in the next revision.
#                                                                 

$VERSION = "v1.0-alpha";
$MAINTAINER = 'markjr_(at)_easydns.com';

use Mysql;
use Getopt::Std;
use Sys::Hostname; $thishost = hostname;   
use File::Basename;

_(at)_SAVE_ARGV=_(at)_ARGV;
$script = basename($0);

getopts("d:t:h:l:o:vs:bu");

if($opt_u) {
print<<"EOF";
$script $VERSION (a.k.a the mysql-syncer-upper-script-thingy) 
email: $MAINTAINER

Usage: $script -d <dbase> -h <hosts> -t <tables> \\
	[ -o <operations> -l <mysql_log> -s <offset,whence> -b -v -u ]

Where:
	-d database to keep in sync 
	-h comma seperated list of remote hosts
	-t comma seperated list of tables

Optional switches:
	-o comma seperated list of operations to sync 
	   (defaults to: update,insert,delete)
	-l alternate mysql_log
	-s start from offset relative to whence
	   (defaults to value in \$OFFSET_LOG,0 if present, otherwise 80,2)
	-b fork a daemon
	-v verbose mode (logs operations to \$SYNC_LOG)
	-u this message
EOF
exit;
}

$MYSQL_DIR	= "/export/mysql";
$DEFAULT_LOG 	= sprintf("%s/var/%s.log", $MYSQL_DIR, $thishost);
$OFFSET_LOG	= $MYSQL_DIR."/mysqlsync.offset";
$PID_FILE	= $MYSQL_DIR."/mysqlsync.pid";
$SYNC_LOG	= $MYSQL_DIR."/mysqlsync.log";
$DEFAULT_OFFSET	= 80;
$DEBUG_LEVEL	= 0;
$SLEEP		= 1;	# sleep value between log polls
$PASSWD		= '';	# chmod this script appropriately if you set this

$MYSQL_LOG 	= $opt_l ? $opt_l : $DEFAULT_LOG; 
_(at)_OP		= $opt_o ? split(/\,/,$opt_o) : qw(insert update delete);

$opt_h ? _(at)_HOST = split(/\,/,$opt_h) : die "Need host list use \"$script -u\"
for usage";
$opt_t ? _(at)_TABLE = split(/\,/,$opt_t) : die "Need table list use \"$script
-u\" for usage";
$opt_d ? _(at)_DBASE = split(/\,/,$opt_d) : die "Need dbase use \"$script -u\"
for usage";

if(_(at)_DBASE>1) {
	print "$script $VERSION does not yet support multiple databases\n";
	print "run another process for each additional database.\n";
	exit(-1);
	}

if(-f $SYNC_LOG){ open(SYNC_LOG, ">>$SYNC_LOG") or die $!; }
else { open(SYNC_LOG, ">$SYNC_LOG") or die $!; }
select SYNC_LOG; $|=1;

&logit("mysqlsync $VERSION <$MAINTAINER> starting.");
&write_pid($PID_FILE);

($offset,$pos)=&get_offset();

BEGIN:
open(LOG, $MYSQL_LOG) or die $!;
seek(LOG, $offset, $pos);

&logit("Seeking to $offset, relative to $pos in $MYSQL_LOG") if($opt_v);
&init_signals; 
&daemon if($opt_b);

for(;;) {
	while($line=<LOG>){
		print STDERR $line if($DEBUG_LEVEL>=3);
		#
if($line=~/\d+\s+\d\d:\d\d:\d\d\s+(\d+)\sConnect\s+(.+)_(at)_(.+)\s+on/) {
		if($line=~/\s+(\d+)\sConnect\s+(.+)_(at)_(.+)\s+on/) {
			if(!grep(/^$3$/,_(at)_HOST)){
				${$1}{host}=$3;
				${$1}{user}=$2;
				print STDERR "host: ${$1}{host} /
${$1}{user}\n" if($DEBUG_LEVEL);
				}
			}
		elsif($line=~/\d+\s+\d\d:\d\d:\d\d\s(\d+)\sQuit/) {
			if(defined(%{$1})) { undef(%{$1}); }
			}
		elsif($line=~/\s+(\d+)\sInit\sDB\s+\W(.+)\W/) {
			if(defined(%{$1})){
				if(grep(/^$2$/,_(at)_DBASE)) {
					${$1}{dbase}=$2;
					print STDERR "dbase: ${$1}{dbase}\n"
if($DEBUG_LEVEL);
					}
				 # else	{ undef(%{$1}); }
				}
			}
		elsif($line=~/\s+(\d+)\s+Query\s+(\w+)\s+(.+)$/) {
			if(defined(%{$1})){ 
				if(grep(/^$2$/i,_(at)_OP)) {  
					${$1}{op}=$2;
					print STDERR "op: ${$1}{op} ($3)\n"
if($DEBUG_LEVEL);
					${$1}{table}=get_tbl(${$1}{op}, $3);
					print STDERR "table: ${$1}{table}\n"
if($DEBUG_LEVEL);
					if(grep(/^${$1}{table}$/,_(at)_TABLE)) {
						${$1}{query}=sprintf("%s
%s", ${$1}{op}, $3);
						foreach $h (_(at)_HOST)  { 
							print STDERR
"$h/${$1}{dbase}/${$1}{user}: ${$1}{query}\n" if($DEBUG_LEVEL);
							&logit("On $h:
".${$1}{query}) if($opt_v); 
							$dbh =
Mysql->Connect($h, ${$1}{dbase}, ${$1}{user},$PASSWD);
							eval '$sth =
$dbh->Query("${$1}{query}") || &logit("ERROR: ".$dbh->errmsg)';
							&logit("ERROR: ".$_(at)_)
if($_(at)_);
							undef $dbh;
							}
						}
					}
				}
			}
		seek(LOG,0,1);
		$cur_log_pos = tell LOG;
		print STDERR "Seeking to $cur_log_pos\n"
if($DEBUG_LEVEL>=2);
		}
	$cur_log_size = (stat $MYSQL_LOG)[7];
	if($cur_log_size<$cur_log_pos) {
		&logit("\$cur_log_size < \$cur_log_pos
($cur_log_size/$cur_log_pos), log rotated?") if($opt_v);
		&logit("Resetting log pointer");
		close(LOG);
		$offset=$pos=$cur_log_pos=0;
		goto BEGIN;
		}
	sleep($SLEEP);
	}
	
sub write_pid {
my($pid_file)=_(at)__;
open(PID, ">$pid_file") || warn $!;
print PID $$;
close(PID);        
}

sub get_tbl {
my($op, $query)=_(at)__;
if($op =~ /^update$/i) { $query =~ /^\s*(\w+)\s+set\s+.+/; return($1); }
elsif ($op =~ /^delete$/i) { $query =~ /^from\s+(\w+)\s+.*/; return($1); }
elsif ($op =~ /^insert$/i) { $query =~ /^into\s+(\w+)\s+.*/; return($1); }
elsif ($op =~ /^drop$/i) { $query =~ /^table\s+(\w+)\s+.*/; return($1); }

# select serves no real purpose in sync-ing other than debugging 
elsif ($op =~ /^select$/i) { $query =~ /^.+\s+from\s+(\w+)\s+.*/;
return($1); }
return(undef);
}

sub get_offset {
if($opt_s) { ($offset,$pos)=split(/\,/,$opt_s); }
elsif(-f $OFFSET_LOG) {
        $pos = 0;
        open(OFFSET, $OFFSET_LOG) || die $!;
        chomp($offset=<OFFSET>);
        close(OFFSET);
        }
else    {
        print STDERR "warning: no $OFFSET_LOG and no -s option, using
$DEFAULT_OFFSET\n";
        $pos = 2;
        $offset = $DEFAULT_OFFSET;
        }                                      
return($offset,$pos);
}

sub init_signals {
    $SIG{'INT'} = \&do_kill;
    $SIG{'QUIT'} = \&do_kill;
    $SIG{'TERM'} = \&do_kill;
}

sub logit {
my($msg)=_(at)__;
chomp($msg);
chomp($now=scalar localtime(time));
print SYNC_LOG "[$now] $msg\n";
}

sub do_kill {
&logit("Caught SIGTERM -exiting.") if($opt_v);
open(OFFSET, ">$OFFSET_LOG") || die $!;
print OFFSET tell(LOG);
close(OFFSET);
exit;
}

# this routine was pilfered from lbnamed -mark
sub daemon {
    local(*TTY,*NULL);

    exit(0) if (fork);
    write_pid($PID_FILE);
    if (open(NULL,"/dev/null")) {
        open(STDIN,">&NULL") || close(STDIN);
        open(STDOUT,">&NULL") || close(STDOUT);
        open(STDERR,">&NULL") || close(STDERR);
    } else {
        close(STDIN);
        close(STDOUT);
        close(STDERR);
    }
    eval 'require "sys/ioctl.ph";';
    return if !defined(&TIOCNOTTY);
    open(TTY,"+>/dev/tty") || return;
    ioctl(TTY,&TIOCNOTTY,0);
    close(TTY);
}                

#!/usr/local/bin/perl

=pod

=head1 NAME

mysql_replicate - realtime replication a mysql database server

=head1 SYNOPSIS

mysql_replicate.pl --path I<path> --remote-server I<name>
                   [--resync-with-remote-server]
                   [--force-resync I<database>]
                   [--exclude-database I<database>]
                   [--maintain-state-database]

=head1 PREREQUISITES

requires that DBI with the MySQL drivers be installed

=head1 DESCRIPTION

Replicates all changes to databases on a mysql server to a remote mysql
server.

=head1 OPTIONS

=over 4

=item --path

Specifies the path to the update logs. logfiles should be named like
"update.*". mysql_replicate will create a status directory called
replica_logs in this directory, so the user who is running mysql_replicate
will need the appropriate permissions to create that directory.

=item --remote-server

The hostname of the remote server to replicate to.

=item --resync-with-remote-server

If a database doesn't exist on the remote server, recreate it.

=item --force-resync I<database>

Force a resync/recreation of a given database on the remote server

=item --exclude-databases I<database>

Exclude a database from being mirrored on the remote server

=item --maintain-state-database

Create and update a database on the local server called "replica". 
This database has information that reflects the status of the
remote replica. If this item is specified, one can trigger 
force-resync's without having to restart the process by updating
the 'force_resync' field with a comma seperated list of databases
to resync in the row for the given remote server.

=back

=head1 USAGE

=over 4

=item  Create a "replica" user on all servers involved. 

This user should have "root" access to all databases on the server(s). 
edit the script, and change the variables $replicate_user and
$replicate_password 
as appropriate. 


=item Set up the server for logging updates.

The mysql server should be running with the --log-update option configured
as follows: 

--log-update=<path>/update

this <path> is what you should specify for the path option to the script
at runtime. The logs will be rotated every ten minutes. mysql_replicate
will create a status directory in <path>, so the user who is running
mysql_replicate will need write access to that directory.

=item NOTE: logfiles that are no longer required will be I<deleted>!

if multiple mysql_replicates are running to multiple servers, the lowest
needed update log will still be tracked amongst the running
mysql_replicates.

=back

if the remote server should disappear, the script will keep retrying the
connection/replication transaction until it is successful.

=head1 SEE ALSO

=head1 NOTES

=over 4

=item 

Should daemonize, and run as the mysql-user. This implies parsing the my.cnf
file.

=item

Should write a utility script to help users set force-resyncs if running
with the --maintain-state-database option.

=back

=head1 AUTHOR

=over 4

=item Andrew Elble

elble_(at)_icculus.nsg.nwu.edu

=back

=head1 BUGS

=over 4

=item 

probably some more lurking in there somewhere.

=back

=cut

use strict;
use DBI;
use FileHandle;
use Getopt::Long;

#
# global variables
#

my %hold_commit;
my _(at)_exclude_databases;
my _(at)_force_resync;

my ($pathname,
    $remote_server,
    $resync_with_remote_server,
    $maintain_state_database,
    $database, 
    $default,
    $filename, 
    $next_flush_time,
    $synced,
    $prefix,
    $extension,
    $curpos,
    $dbh,
    $localdbh);

#
# Get our options
#

GetOptions("path=s" => \$pathname,
           "remote-server=s" => \$remote_server,
           "resync-with-remote-server" => \$resync_with_remote_server,
           "exclude-databases=s" => \_(at)_exclude_databases,
	   "force-resync=s" => \_(at)_force_resync,
           "maintain-state-database" => \$maintain_state_database);


if (($pathname eq "") ||
    ($remote_server eq "")) {
  print "you must specify a path and a remote server!\n";
  exit(0);
}

my $replicate_username = "";
my $replicate_password = "";
my $logflush_interval = (30*60);

my $fileprefix = "update";
my $synclogdir = "$pathname/replica_logs";


#
# ROUTINE
#   reload_database
#
# DESCRIPTION
#   replicates the entire database on the remote system
#
sub reload_database {
  if (($resync_with_remote_server) ||
      (grep(/\Q$database\E/,_(at)_force_resync))) {
    print "dumping $database to $remote_server\n";

    if ($maintain_state_database) {
      replica_table_check();
      $localdbh->do("use replica");
      $localdbh->do("update replica_status set resyncing = '".$database."'
where hostname = '".$remote_server."'");
    }

    $default = undef;
    commit("drop database $database");
    commit("create database $database");
    commit("use $database");
    
    $localdbh->do("SET SQL_LOG_UPDATE=0");
    $localdbh->do("use $database");
#
# get a write lock on all tables so that the data doesn't change.
#
    my $query = $localdbh->prepare("show tables");
    $query->execute;
    my $table = $query->fetchall_arrayref;
    my $i;
    my $dostring = "lock tables";
    my $first = 1;
    foreach $i (0 .. $#{ $table }) {
      if ($first == 1) {
	$dostring .= " ".$table->[$i][0]." write";
	$first = 0;
      } else {
	$dostring .= ",".$table->[$i][0]." write";
      }
    }
#    print "LOCK: $dostring\n";
    $localdbh->do($dostring);

#
# force a logflush. that way, any potential changes to this database
# are only valid in the latest logfile.
#
    $next_flush_time = 0;
    flush_logs();
#
# find the highest numbered logfile, and temporarily block commits to the
database
# until we reach the highest numbered logfile.
#
    opendir(LOGDIR,$pathname);
    my _(at)_logs = sort {$b cmp $a} (grep { /^\Q$fileprefix\E/ && -f
"$pathname/$_" } readdir(LOGDIR));
    closedir(LOGDIR);
    my $tmpfilename = $pathname."/".$logs[0];
    if ( -f $tmpfilename) {
      print "updates for $database will recommence with: $tmpfilename\n";
      my ($tmpprefix, $tmpextension) = ($tmpfilename =~ /(\w+)\.(\d+)/);
      $hold_commit{$tmpprefix.$tmpextension} = $database;
    } else {
      die "unable to keep track of logfile!";
    }


#
# Create the tables and dump the data!
#
    my $dostring = "";
    foreach $i (0 .. $#{ $table }) {
      $dostring = "CREATE TABLE ".$table->[$i][0]." (\n";
      my $subquery = $localdbh->prepare("show fields from
".$table->[$i][0]);
      $subquery->execute;
      my $subtable = $subquery->fetchall_arrayref;
      my %prikey;
      my %unikey;
      my %mulkey;
      my $subi;
      foreach $subi (0 .. $#{ $subtable }) {
	$dostring .= $subtable->[$subi][0]." ".$subtable->[$subi][1];
	if ($subtable->[$subi][2] eq "") {
	  $dostring .= " NOT NULL";
	}
	if ($subtable->[$subi][3] eq "PRI") {
	  $prikey{$subtable->[$subi][0]} = 1;
	} elsif ($subtable->[$subi][3] eq "UNI") {
	  $unikey{$subtable->[$subi][0]} = 1;
	} elsif ($subtable->[$subi][3] eq "MUL") {
	  $mulkey{$subtable->[$subi][0]} = 1;
	}
	
	if ($subtable->[$subi][4] eq "NULL") {
	  $dostring .= " DEFAULT ''";
	} elsif ($subtable->[$subi][4] ne "") {
	  $dostring .= " DEFAULT '$subtable->[$subi][4]'";
	}
	if ($subtable->[$subi][5] ne "") {
	  $dostring .= " ".$subtable->[$subi][5];
	}
	if ($subi < $#{ $subtable}) {
	  $dostring .= ",\n";
	}
      }
      my $key;
      foreach $key (keys %prikey) {
	$dostring .= ",\nPRIMARY KEY($key)";
      }
      my $key;
      foreach $key (keys %unikey) {
	$dostring .= ",\nUNIQUE KEY($key)";
      }
      my $key;
      foreach $key (keys %mulkey) {
	$dostring .= ",\nKEY($key)";
      }
      $dostring .= ")\n\n";
      $subquery->finish;
      $subtable = undef;
#      print "doing: $dostring";
      $dbh->do($dostring);
      #
      # dump data (should lock tables as well!)
      #
      my $subquery = $localdbh->prepare("select * from ".$table->[$i][0]);
      $subquery->execute;
      my $subtable = $subquery->fetchall_arrayref;
      my $subi;
      $dbh->do("lock tables ".$table->[$i][0]." write");
      foreach $subi (0 .. $#{ $subtable }) {
	my $subdostring = "INSERT INTO ".$table->[$i][0]." VALUES (";
	my $j;
	foreach $j (0 .. $#{ $subtable->[$subi] }) {
	  if ($subtable->[$subi][$j] ne "") {
	    my $quoted = $localdbh->quote($subtable->[$subi][$j]);
	    $subdostring .= $quoted;
	  } else {
	    $subdostring .= "''";
	  }
	  if ($j < $#{ $subtable->[$subi] }) {
	    $subdostring .= ",";
	  }
	}
	$subdostring .= ")\n";
#	print "insert: $subdostring";
	$dbh->do($subdostring);
      }
      $dbh->do("unlock tables");
      $subquery->finish;
      $subquery = undef;
      $subtable = undef;
    }
    $query->finish;
    $query = undef;
    $table = undef;

#
# exclude the database from updates 'till the selected log.
#

    push(_(at)_exclude_databases, $database);
    print "excluding $database from updates until log: $tmpfilename\n";
    $localdbh->do("unlock tables");
    if ($maintain_state_database) {
      replica_table_check();
      $localdbh->do("use replica");
      $localdbh->do("update replica_status set resyncing = '' where hostname
= '".$remote_server."'");
    }
  } else {
    print "database $database doesn't exist on remote server!\n";
    if (!(grep(/\Q$database\E/,_(at)_exclude_databases))) {
      print "adding $database to exclude list!\n";
      push(_(at)_exclude_databases, $database);
    }
  }
}

#
# ROUTINE
#   check_for_resyncs
#
# DESCRIPTION
#   checks to see if there are resync requests pending.
#
sub check_for_resyncs {
  
  if ($maintain_state_database) {
    $localdbh->do("use replica");
    my $query = $localdbh->prepare("select force_resync, resyncing from
replica_status where hostname = '".$remote_server."'");
    $query->execute;
    my $table = $query->fetchall_arrayref;
    my $i;
    foreach $i (0 .. $#{ $table }) {
      if ($table->[$i][0] ne "") {
	my _(at)_tmppush = split(/\,/,$table->[$i][0]);
	my $tpsh;
	foreach $tpsh (_(at)_tmppush) {
	  if (!(grep(/\Q$tpsh\E/,_(at)_force_resync))) {
	    print "acknowledging resync request for: ".$tpsh."\n";
	    push(_(at)_force_resync,$tpsh);
	  }
	}
      }
      if ($table->[$i][1] ne "") {
	my _(at)_tmppush = split(/\,/,$table->[$i][1]);
	my $tpsh;
	foreach $tpsh (_(at)_tmppush) {
	  if (!(grep(/\Q$tpsh\E/,_(at)_force_resync))) {
	    print "redoing interrupted resync for : ".$tpsh."\n";
	    push(_(at)_force_resync,$tpsh);
	  }
	}
      }
    }
    $query->finish;
    $query = undef;
    $table = undef;
    $localdbh->do("update replica_status set force_resync='' where hostname
= '".$remote_server."'");
  }
}


#
# ROUTINE
#   check_for_next_log_in_sequence
#
# DESCRIPTION
#   check to see if the next logfile is available
#
# RETURNS
#   the name of the next logfile (if next logfile is present)
#   undef otherwise
#

sub check_for_next_log_in_sequence {
  
  my $tmpprefix = $prefix;
  my $tmpextension = $extension;
  $tmpextension++;
  my $tempfilename = $pathname."/".$tmpprefix.".".$tmpextension;
  if ( -f $tempfilename) {
    return($tempfilename);
  } else {
    return(undef);
  }
}

#
# ROUTINE
#   flush_old_update_files
#
# DESCRIPTION
#   discover what update logs are no longer needed, and remove them
#


sub flush_old_update_files {
  if (!stat($synclogdir)) {
    if (!mkdir($synclogdir,0755)) {
      die("can't create status directory\n");
    }
  }
  opendir(SYNC,$synclogdir);
  my _(at)_logs = sort {$a cmp $b} (grep { -f "$synclogdir/$_" } readdir(SYNC));
  closedir(SYNC);
  my $lowestlognumber = $extension;
  my $origlognumber = $lowestlognumber;
  my $file;
  foreach $file (_(at)_logs) {
    open(LOG, "$synclogdir/$file");
    while (<LOG>) {
      chomp;
      my ($num,$pos) = ($_ =~ /(\d+)\:(\d+)/);
      if (($num < $lowestlognumber) &&
	  ($num ne "")) {
	$lowestlognumber = $num;
      }
    }
    close(LOG);
  }

  print "lowest needed file# is $lowestlognumber\n";
  while ($lowestlognumber > 0) {
    $lowestlognumber--;
    my $rmfile = sprintf("$pathname/$fileprefix.%3.3d", $lowestlognumber);
    if (-f $rmfile) {
      print "removing $rmfile\n";
      unlink($rmfile);
    }
  }
}

#
# ROUTINE
#   replica_table_check
#
# DESCRIPTION
#   check to see if the replica_status table exists. if not, create it.
#


sub replica_table_check {
  $localdbh->do("SET SQL_LOG_UPDATE=0");
  if ($maintain_state_database) {
    $localdbh->do("use replica");
    if ($localdbh->errstr =~ /Unknown database/) {
      print "creating replica database\n";
      $localdbh->do("create database replica");
    }
    $localdbh->do("select * from replica_status limit 1");
    my $err = $localdbh->err;
    if ($err == 0) {
      $localdbh->do("insert into replica_status (hostname, in_sync) values
('".$remote_server."','".$synced."')");
      $localdbh->do("update replica_status set in_sync = '".$synced."' where
hostname = '".$remote_server."'");
      return 1;
    }
    print "creating replica tables\n";
    $localdbh->do("CREATE table replica_status (
                 hostname char(255) NOT NULL,
                 in_sync tinyint(1) default 0,
                 resyncing char(255),
                 force_resync char(255),
                 PRIMARY KEY(hostname))");
    $localdbh->do("insert into replica_status (hostname, in_sync) values
('".$remote_server."','".$synced."')");
    return 0;
  }
}

sub lost_sync {
  $synced = 0;
  replica_table_check();
  print "lost sync\n";
}

sub attained_sync {
  $synced = 1;
  replica_table_check();
  print "attained sync\n";
}

sub flush_logs {
  if ($next_flush_time < time()) {
    $localdbh->do("SET SQL_LOG_UPDATE=0");
    $localdbh->do("flush logs");
    $next_flush_time = time() + $logflush_interval;
  }
  return;
}

#
# ROUTINE
#   mfile
#
# DESCRIPTION
#   1.) finds the next logfile to work from.
#   2.) determines the correct position within the file to work from
#   3.) calls readfile() to open the file and process the updates.
#

sub mfile {
  my ($num, $pos);
  if ($filename eq "") {
    flush_logs();
    lost_sync();
    if ( -f "$synclogdir/$remote_server") {
      open(LOG, "$synclogdir/$remote_server");
      while (<LOG>) {
	chomp;
	my ($tnum,$tpos) = ($_ =~ /(\d+)\:(\d+)/);
	if ($tnum ne "") {
	  $num = $tnum;
	}
	if ($tpos ne "") {
	  $pos = $tpos;
	}
      }
      close(LOG);
      $filename = "$pathname/$fileprefix.$num";
      if ( -f $filename) {
	print "restarting with: $filename\n";
	($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)/);
	$curpos = $pos;
	readfile();
      } else {
	unlink("$synclogdir/$remote_server");
	$filename = "";
      }
    }
    if ($filename eq "") {
      opendir(LOGDIR,$pathname);
      my _(at)_logs = sort {$a cmp $b} (grep { /^\Q$fileprefix\E/ && -f
"$pathname/$_" } readdir(LOGDIR));
      closedir(LOGDIR);
      $filename = $pathname."/".$logs[0];
      if ( -f $filename) {
	print "starting with: $filename\n";
	($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)/);
	$curpos = 0;
	readfile();
      } else {
	die "i can't find a log to start syncing with!";
      }
    }
  } else {
    if (defined($dbh)) {
      local $SIG{ALRM} = sub { lost_sync(); };
      alarm(2);
      $dbh->ping;
      alarm(0);
      local $SIG{ALRM} = undef;
    }
    if (my $tempfilename = check_for_next_log_in_sequence()) {
      close(IN);
      lost_sync();
      print "switching to log: $tempfilename\n";
      $filename = $tempfilename;
      ($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)/);
      $curpos = 0;
      readfile();
    } else {
      return;
    }
  }
}

sub write_log {
  alarm(0);
  my $reset_alarm = shift;
  if (($extension ne "") &&
      ($curpos ne "")) {
    truncate(POSLOG,0);
    print POSLOG "$extension:$curpos\n";
  }
  if ($reset_alarm == 1) {
    alarm(2);
  }
}

#
# ROUTINE
#  commit
#
# DESCRIPTION
#   calls $dbh->do in a safe manner, and only when the correct conditions
are met!
#

sub commit {
  my $line = shift;
  
  if ($line eq "") {
    return;
  }
#  print "commit: got $line\n";
  if ($line =~ /^use (\w+)/i) {
    ($database) = ($line =~ /^use (\w+)/i);
    if (!(grep(/\Q$database\E/,_(at)_exclude_databases))) {
      print "switching to database $database\n";
    } else {
      print "database $database has been excluded\n";
    }
    while (!defined($dbh)) {
      sleep(1);
      $dbh = DBI->connect("DBI:mysql:$database:$remote_server",
$replicate_username, $replicate_password, { PrintError => 0 });
      $dbh->do("SET SQL_LOG_UPDATE=0");
    }
  }

  if (grep(/\Q$database\E/,_(at)_force_resync)) {
    print "forced resync of $database\n";
    my _(at)_tmpforce_resync = grep(!/\Q$database\E/,_(at)_force_resync);
    _(at)_force_resync = _(at)_tmpforce_resync;
    lost_sync();
    reload_database();
  }
  
  my $result;
  my $first = 1;
  my $dbherr;
  my $dbherrstr;
#  print "$default / ".$dbh->errstr."\n";
  while (($result eq "") && 
	 ($dbherr eq "") &&
	 (!(grep(/\Q$database\E/,_(at)_exclude_databases))) &&
	 (($default ne "set") || ($line =~ /^use (\w+)/i))) {
    if ($first == 0) {
      if ($synced == 1) {
	lost_sync();
      }
      sleep(1);
    }
#    print "doing: $database: $default: $line\n";
    $result = $dbh->do($line);
    $dbherr = $dbh->err;
    $dbherrstr = $dbh->errstr;
    if (($dbherr eq "") &&
	($line =~ /^use (\w+)/i)) {
      $default = "";
    }
    if ($dbherrstr =~ /Unknown database/) {
      reload_database();
      $default = "";
    }
    $first = 0;
  }
}

#
# ROUTINE
#   readfile
#
# DESCRIPTION
#  1.) opens the update log, and seeks to the correct position.
#  2.) sends the updates from the log to the remote server.
#

sub readfile {

#
# remove holddown for database that has been recently loaded to the remote
server.
#

  if (defined($hold_commit{$prefix.$extension})) {
    print "removing database $hold_commit{$prefix.$extension} from exclude
list\n";
    my _(at)_tmpexclude_databases =
grep(!/\Q$hold_commit{$prefix.$extension}\E/,_(at)_exclude_databases);
    _(at)_exclude_databases = _(at)_tmpexclude_databases;
  }
  open(POSLOG,"> $synclogdir/$remote_server");
  POSLOG->autoflush();
  write_log(0);
  while(!open(IN,"$filename")) {
    sleep(3);
  }

  if (!defined($dbh)) {
    print "using default database\n";
    sleep(1);
    $database = "mysql";
    $default = "set";
    $dbh = DBI->connect("DBI:mysql:mysql:$remote_server",
$replicate_username, $replicate_password, { PrintError => 0 });
    commit("SET SQL_LOG_UPDATE=0");
  }

  check_for_resyncs();

  if ($curpos != 0) {
    my $line;
    while(<IN>) {
      chomp;
      if ($_ !~ /^\#/) {
	if ($_ =~ /\;$/) {
	  $line .= $_;
	  if ($line =~ /^use (\w+)/i) {
	    commit($line);
	  }
	}
	$line = undef;
      } else {
	$line .= $_;
      }
    }
    seek(IN, $curpos, 0);
  }
  
  flush_old_update_files();

  my $line;
  for (;;) {
    flush_logs();
    local $SIG{ALRM} = sub { write_log(1); };
    alarm(2);
    for ($curpos = tell(IN); $_ = <IN>; $curpos = tell(IN)) {
#      print "syncing: $curpos\n";
      chomp;
      if ($_ !~ /^\#/) {
	if ($_ =~ /\;$/) {
	  $line .= $_;
	  commit("SET SQL_LOG_UPDATE=0");
	  commit($line);
	  $line = undef;
	} else {
	  $line .= $_;
	}
      }
    }
    alarm(0);
    local $SIG{ALRM} = undef;
    write_log(0);
    sleep(1);
    check_for_resyncs();
    my $cur_log_size = (stat $filename)[7];
    if ($cur_log_size == $curpos) {
      mfile();
    }
    if ($synced == 0) {
#      print "status: ($curpos / $cur_log_size)\n";
      if ((($cur_log_size - $curpos) < 8192) &&
	  (!defined(check_for_next_log_in_sequence()))) {
	  attained_sync();
      } else {
	lost_sync();
      }	
    }
    seek(IN, $curpos, 0);
  }
  close(POSLOG);
}

sub closelogs {
  write_log(0);
  close(POSLOG);
  close(IN);
  lost_sync();
  print "clean exit\n";
  exit(0);
}

$SIG{TERM} = \&closelogs;
$SIG{INT} = \&closelogs;
$localdbh = DBI->connect("DBI:mysql:mysql", $replicate_username,
$replicate_password, { PrintError => 0 });
$localdbh->do("SET SQL_LOG_UPDATE=0");
flush_logs();
mfile();

Home | Main Index | Thread Index

php::bar PHP Wiki   -   Listenarchive