From: John David Duncan Date: October 12 2012 5:02am Subject: bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:4023) List-Archive: http://lists.mysql.com/commits/145015 Message-Id: <20121012050206.77266.44643.4023@dhcp-whq-twvpn-1-vpnpool-10-159-223-50.vpn.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4023 John David Duncan 2012-10-11 [merge] merge modified: storage/ndb/src/ndbclient_exports.cpp storage/ndb/src/ndbjtie/ndbjtie_lib.cpp storage/ndb/tools/delete_all.cpp storage/ndb/tools/desc.cpp storage/ndb/tools/drop_index.cpp storage/ndb/tools/drop_tab.cpp storage/ndb/tools/listTables.cpp storage/ndb/tools/ndb_config.cpp storage/ndb/tools/ndbinfo_select_all.cpp storage/ndb/tools/restore/restore_main.cpp storage/ndb/tools/select_all.cpp storage/ndb/tools/select_count.cpp storage/ndb/tools/waiter.cpp support-files/mysql.spec.sh === modified file 'mysql-test/lib/My/Memcache.pm' --- a/mysql-test/lib/My/Memcache.pm 2012-04-24 01:41:51 +0000 +++ b/mysql-test/lib/My/Memcache.pm 2012-10-12 04:54:41 +0000 @@ -57,20 +57,36 @@ use lib 'lib'; use IO::Socket::INET; use IO::File; use Carp; +use Time::HiRes; use mtr_report; # for main::mtr_verbose() require "mtr_process.pl"; # for mtr_ping_port() -require "mtr_misc.pl"; # for mtr_milli_sleep() package My::Memcache; sub new { my $pkg = shift; + # min/max wait refer to msec. wait during temporary errors. Both powers of 2. bless { "created" => 1 , "error" => "" , "cf_gen" => 0, - "exptime" => 0 , "flags" => 0 + "exptime" => 0 , "flags" => 0 , + "minWait" => 4, "maxWait" => 8192, + "temp_errors" => 0 , "total_wait" => 0 }, $pkg; } +sub fail { + my $self = shift; + my $msg = + "error: " . $self->{error} . "\n" . + "bin_req_id: " . $self->{bin_req_id} . "\n" . + "temp_errors: " . $self->{temp_errors} . "\n". + "total_wait: " . $self->{total_wait} . "\n"; + while(my $extra = shift) { + $msg .= $extra . "\n"; + } + Carp::confess($msg); +} + sub connect { my $self = shift; my $host = shift; @@ -80,7 +96,7 @@ sub connect { my $retries = 100; while($retries && (::mtr_ping_port($port) == 0)) { - ::mtr_milli_sleep(100); + Time::HiRes::usleep(100 * 1000); $retries--; } @@ -170,7 +186,7 @@ sub wait_for_config_generation { my $retries = 100; # 100 retries x 100 ms = 10s while($retries && ! $ready) { - ::mtr_milli_sleep(100); + Time::HiRes::usleep(100 * 1000); my %stats = $self->stats("reconf"); if($stats{"Running"} >= $cf_gen) { $ready = $stats{"Running"}; @@ -182,16 +198,39 @@ sub wait_for_config_generation { return $ready; } +sub ascii_command { + my $self = shift; + my $packet = shift; + my $sock = $self->{connection}; + my $waitTime = $self->{minWait}; + my $maxWait = $self->{maxWait}; + + do { + $sock->print($packet) || Carp::confess("send error: ". $packet); + $self->{error} = $sock->getline(); + $self->normalize_error(); + if($self->{error} eq "SERVER_TEMPORARY_ERROR") { + if($waitTime < $maxWait) { + $self->{temp_errors} += 1; + $self->{total_wait} += ( Time::HiRes::usleep($waitTime * 1000) / 1000); + $waitTime *= 2; + } + else { + $self->fail("Too Many Temporary Errors", $waitTime); + } + } + } while($self->{error} eq "SERVER_TEMPORARY_ERROR" && $waitTime <= $maxWait); + + return $self->{error}; +} + sub delete { my $self = shift; my $key = shift; - my $sock = $self->{connection}; - - $sock->print("delete $key\r\n") || Carp::confess "send error"; - $self->{error} = $sock->getline(); - return $self->{error} =~ "^DELETED" ? 1 : $self->normalize_error(); + $self->ascii_command("delete $key\r\n"); + return ($self->{error} =~ "^DELETED"); } @@ -200,62 +239,49 @@ sub _txt_store { my $cmd = shift; my $key = shift; my $value = shift; - my $sock = $self->{connection}; - - $sock->printf("%s %s %d %d %d\r\n%s\r\n",$cmd, $key, - $self->{flags}, $self->{exptime}, length($value), $value); - return $sock->getline(); + my $packet = sprintf("%s %s %d %d %d\r\n%s\r\n",$cmd, $key, $self->{flags}, + $self->{exptime}, length($value), $value); + $self->ascii_command($packet); + return ($self->{error} eq "OK"); } sub set { - my ($self, $key, $value) = @_; - - $self->{error} = $self->_txt_store("set", $key, $value); - return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error(); + my ($self, $key, $value) = @_; + return $self->_txt_store("set", $key, $value); } sub add { - my ($self, $key, $value) = @_; - - $self->{error} = $self->_txt_store("add", $key, $value); - return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error(); + my ($self, $key, $value) = @_; + return $self->_txt_store("add", $key, $value); } sub append { - my ($self, $key, $value) = @_; - - $self->{error} = $self->_txt_store("append", $key, $value); - return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error(); + my ($self, $key, $value) = @_; + return $self->_txt_store("append", $key, $value); } sub prepend { - my ($self, $key, $value) = @_; - - $self->{error} = $self->_txt_store("prepend", $key, $value); - return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error(); + my ($self, $key, $value) = @_; + return $self->_txt_store("prepend", $key, $value); } sub replace { - my ($self, $key, $value) = @_; - - $self->{error} = $self->_txt_store("replace", $key, $value); - return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error(); + my ($self, $key, $value) = @_; + return $self->_txt_store("replace", $key, $value); } sub get { my $self = shift; - my $key = shift; - my $sock = $self->{connection}; - - $sock->print("get $key\r\n") || Carp::confess "send error"; - my $response = $sock->getline(); + my $key = shift; my $val; + my $sock = $self->{connection}; + my $response = $self->ascii_command("get $key\r\n"); if ($response =~ /^END/) { @@ -278,24 +304,16 @@ sub get { sub _txt_math { my ($self, $cmd, $key, $delta) = @_; - my $sock = $self->{connection}; + my $response = $self->ascii_command("$cmd $key $delta \r\n"); - $sock->print("$cmd $key $delta \r\n") || Carp::confess "send error"; - my $response = $sock->getline(); - my $val; - - if ($response =~ "^NOT_FOUND") - { + if ($response =~ "^NOT_FOUND") { $self->{error} = "NOT_FOUND"; return undef; } - elsif ($response =~ "ERROR") - { - $self->{error} = $response; - $self->normalize_error(); + elsif ($response =~ "ERROR") { return undef; } - + $response =~ /(\d+)/; return $1; } @@ -336,12 +354,8 @@ sub stats { sub flush { my $self = shift; my $key = shift; - my $sock = $self->{connection}; - - $sock->print("flush_all\r\n") || Carp::confess "send error"; - - $self->{error} = $sock->getline(); - return $self->{error} =~ "^OK" ? 1 : $self->normalize_error(); + my $result = $self->ascii_command("flush_all\r\n"); + return ($self->{error} =~ "^OK"); } @@ -357,7 +371,8 @@ sub normalize_error { "SERVER_ERROR not my vbucket\r\n" => "NOT_MY_VBUCKET", "SERVER_ERROR out of memory\r\n" => "SERVER_OUT_OF_MEMORY", "SERVER_ERROR not supported\r\n" => "NOT_SUPPORTED", - "SERVER_ERROR internal\r\n" => "INTERNAL_ERROR" + "SERVER_ERROR internal\r\n" => "INTERNAL_ERROR", + "SERVER_ERROR temporary failure\r\n" => "SERVER_TEMPORARY_ERROR" ); my $norm_error = $error_message{$self->{error}}; $self->{error} = $norm_error if(defined($norm_error)); @@ -462,6 +477,32 @@ sub get_binary_response { } +sub binary_command { + my $self = shift; + my ($cmd, $key, $value, $extra_header) = @_; + my $waitTime = $self->{minWait}; + my $maxWait = $self->{maxWait}; + my $status; + + do { + $self->send_binary_request($cmd, $key, $value, $extra_header); + ($status) = $self->get_binary_response(); + if($status == 0x86) { + if($waitTime < $maxWait) { + $self->{temp_errors} += 1; + $self->{total_wait} += ( Time::HiRes::usleep($waitTime * 1000) / 1000); + $waitTime *= 2; + } + else { + $self->fail("Too Many Temporary Errors", $waitTime); + } + } + } while($status == 0x86 && $waitTime <= $maxWait); + + return ($status == 0) ? 1 : undef; +} + + sub bin_math { my $self = shift; my ($cmd, $key, $delta, $initial) = @_; @@ -491,13 +532,10 @@ sub bin_store { my $self = shift; my $cmd = shift; my $key = shift; - my $value = shift; - + my $value = shift; my $extra_header = pack "NN", $self->{flags}, $self->{exptime}; - $self->send_binary_request($cmd, $key, $value, $extra_header); - my ($status) = $self->get_binary_response(); - return ($status == 0) ? 1 : 0; + return $self->binary_command($cmd, $key, $value, $extra_header); } @@ -510,6 +548,7 @@ sub get { return ($status == 0) ? $value : undef; } + sub stats { my $self = shift; my $key = shift; @@ -550,23 +589,17 @@ sub replace { sub append { my ($self, $key, $value) = @_; - $self->send_binary_request(BIN_CMD_APPEND, $key, $value, ''); - my ($status) = $self->get_binary_response(); - return ($status == 0) ? 1 : 0; + return $self->binary_command(BIN_CMD_APPEND, $key, $value, ''); } sub prepend { my ($self, $key, $value) = @_; - $self->send_binary_request(BIN_CMD_PREPEND, $key, $value, ''); - my ($status) = $self->get_binary_response(); - return ($status == 0) ? 1 : 0; + return $self->binary_command(BIN_CMD_PREPEND, $key, $value, ''); } sub delete { my ($self, $key) = @_; - $self->send_binary_request(BIN_CMD_DELETE, $key, '', ''); - my ($status, $value) = $self->get_binary_response(); - return ($status == 0) ? 1 : 0; + return $self->binary_command(BIN_CMD_DELETE, $key, '', ''); } sub incr { === modified file 'storage/ndb/memcache/src/ndb_worker.cc' --- a/storage/ndb/memcache/src/ndb_worker.cc 2012-07-18 06:54:14 +0000 +++ b/storage/ndb/memcache/src/ndb_worker.cc 2012-10-12 04:54:41 +0000 @@ -169,7 +169,10 @@ status_block status_block_too_big = { ENGINE_E2BIG, "Value too large" }; status_block status_block_no_mem = - { ENGINE_ENOMEM, "NDB out of data memory" }; + { ENGINE_ENOMEM, "NDB out of data memory" }; + +status_block status_block_temp_failure = + { ENGINE_TMPFAIL, "NDB Temporary Error" }; void worker_set_cas(ndb_pipeline *p, uint64_t *cas) { /* Be careful here -- ndbmc_atomic32_t might be a signed type. @@ -730,6 +733,11 @@ void callback_main(int, NdbTransaction * if(wqitem->cas) * wqitem->cas = 0ULL; wqitem->status = & status_block_bad_add; } + /* Overload Error, e.g. 410 "REDO log files overloaded" */ + else if(tx->getNdbError().classification == NdbError::OverloadError) { + log_ndb_error(tx->getNdbError()); + wqitem->status = & status_block_temp_failure; + } /* Attempt to insert via unique index access */ else if(tx->getNdbError().code == 897) { wqitem->status = & status_block_idx_insert; No bundle (reason: useless for push emails).