List:Commits« Previous MessageNext Message »
From:John David Duncan Date:October 12 2012 5:02am
Subject:bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:4023)
View as plain text  
 4023 John David Duncan	2012-10-11 [merge]
      merge

    modified:
      storage/ndb/src/ndbclient_exports.cpp
      storage/ndb/src/ndbjtie/ndbjtie_lib.cpp
      storage/ndb/tools/delete_all.cpp
      storage/ndb/tools/desc.cpp
      storage/ndb/tools/drop_index.cpp
      storage/ndb/tools/drop_tab.cpp
      storage/ndb/tools/listTables.cpp
      storage/ndb/tools/ndb_config.cpp
      storage/ndb/tools/ndbinfo_select_all.cpp
      storage/ndb/tools/restore/restore_main.cpp
      storage/ndb/tools/select_all.cpp
      storage/ndb/tools/select_count.cpp
      storage/ndb/tools/waiter.cpp
      support-files/mysql.spec.sh
=== modified file 'mysql-test/lib/My/Memcache.pm'
--- a/mysql-test/lib/My/Memcache.pm	2012-04-24 01:41:51 +0000
+++ b/mysql-test/lib/My/Memcache.pm	2012-10-12 04:54:41 +0000
@@ -57,20 +57,36 @@ use lib 'lib';
 use IO::Socket::INET;
 use IO::File;
 use Carp;
+use Time::HiRes;
 use mtr_report;            # for main::mtr_verbose()
 
 require "mtr_process.pl";  # for mtr_ping_port() 
-require "mtr_misc.pl";     # for mtr_milli_sleep()
 
 package My::Memcache;
 
 sub new {
   my $pkg = shift;
+  # min/max wait refer to msec. wait during temporary errors.  Both powers of 2.
   bless { "created" => 1 , "error" => "" , "cf_gen" => 0,
-          "exptime" => 0 , "flags" => 0
+          "exptime" => 0 , "flags" => 0  , 
+          "minWait" => 4,  "maxWait" => 8192, 
+          "temp_errors" => 0 , "total_wait" => 0
         }, $pkg;
 }
 
+sub fail {
+  my $self = shift;
+  my $msg = 
+      "error:         " . $self->{error} . "\n" .
+      "bin_req_id:    " . $self->{bin_req_id} . "\n" .
+      "temp_errors:   " . $self->{temp_errors} . "\n".
+      "total_wait:    " . $self->{total_wait} . "\n";
+  while(my $extra = shift) { 
+    $msg .= $extra . "\n"; 
+  }
+  Carp::confess($msg);
+}
+
 sub connect {
   my $self = shift;
   my $host = shift;
@@ -80,7 +96,7 @@ sub connect {
   my $retries = 100;
   while($retries && (::mtr_ping_port($port) == 0))
   {
-     ::mtr_milli_sleep(100);
+     Time::HiRes::usleep(100 * 1000);
      $retries--;
   }
 
@@ -170,7 +186,7 @@ sub wait_for_config_generation {
   my $retries = 100;   # 100 retries x 100 ms = 10s
   
   while($retries && ! $ready) {
-    ::mtr_milli_sleep(100);     
+    Time::HiRes::usleep(100 * 1000);
     my %stats = $self->stats("reconf");
     if($stats{"Running"} >= $cf_gen) {
       $ready = $stats{"Running"};
@@ -182,16 +198,39 @@ sub wait_for_config_generation {
   return $ready;
 }
 
+sub ascii_command {
+  my $self = shift;
+  my $packet = shift;
+  my $sock = $self->{connection};
+  my $waitTime = $self->{minWait};
+  my $maxWait = $self->{maxWait};
+  
+  do {
+    $sock->print($packet) || Carp::confess("send error: ". $packet);
+    $self->{error} = $sock->getline();
+    $self->normalize_error();
+    if($self->{error} eq "SERVER_TEMPORARY_ERROR") {
+      if($waitTime < $maxWait) {
+        $self->{temp_errors} += 1;
+        $self->{total_wait} += ( Time::HiRes::usleep($waitTime * 1000) / 1000);
+        $waitTime *= 2;
+      }
+      else {
+        $self->fail("Too Many Temporary Errors", $waitTime);
+      }
+    }
+  } while($self->{error} eq "SERVER_TEMPORARY_ERROR" && $waitTime <= $maxWait);
+    
+  return $self->{error};
+}
+
   
 sub delete {
   my $self = shift;
   my $key = shift;
-  my $sock = $self->{connection}; 
-  
-  $sock->print("delete $key\r\n") || Carp::confess "send error";
   
-  $self->{error} = $sock->getline();
-  return $self->{error} =~ "^DELETED" ? 1 : $self->normalize_error();
+  $self->ascii_command("delete $key\r\n");  
+  return ($self->{error} =~ "^DELETED");
 }
 
 
@@ -200,62 +239,49 @@ sub _txt_store {
   my $cmd = shift;
   my $key = shift;
   my $value = shift;
-  my $sock = $self->{connection};
-  
-  $sock->printf("%s %s %d %d %d\r\n%s\r\n",$cmd, $key, 
-                $self->{flags}, $self->{exptime}, length($value), $value);
-  return $sock->getline();
+  my $packet = sprintf("%s %s %d %d %d\r\n%s\r\n",$cmd, $key, $self->{flags}, 
+                       $self->{exptime}, length($value), $value);
+  $self->ascii_command($packet);
+  return ($self->{error} eq "OK");
 }
 
 
 sub set {
-  my ($self, $key, $value) = @_;
-  
-  $self->{error} = $self->_txt_store("set", $key, $value);
-  return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error();
+  my ($self, $key, $value) = @_;  
+  return $self->_txt_store("set", $key, $value);
 }
 
 
 sub add {
-  my ($self, $key, $value) = @_;
-  
-  $self->{error} = $self->_txt_store("add", $key, $value);
-  return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error();
+  my ($self, $key, $value) = @_;  
+  return $self->_txt_store("add", $key, $value);
 }
 
 
 sub append {
-  my ($self, $key, $value) = @_;
-  
-  $self->{error} = $self->_txt_store("append", $key, $value);
-  return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error();
+  my ($self, $key, $value) = @_;  
+  return $self->_txt_store("append", $key, $value);
 }
 
 
 sub prepend {    
-  my ($self, $key, $value) = @_;
-  
-  $self->{error} = $self->_txt_store("prepend", $key, $value);
-  return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error();
+  my ($self, $key, $value) = @_;  
+  return $self->_txt_store("prepend", $key, $value);
 }
 
 
 sub replace {
-  my ($self, $key, $value) = @_;
-  
-  $self->{error} = $self->_txt_store("replace", $key, $value);
-  return $self->{error} =~ "^STORED" ? 1 : $self->normalize_error();
+  my ($self, $key, $value) = @_;  
+  return $self->_txt_store("replace", $key, $value);
 }
 
 
 sub get {
   my $self = shift;
-  my $key = shift;
-  my $sock = $self->{connection};
-  
-  $sock->print("get $key\r\n") || Carp::confess "send error";
-  my $response = $sock->getline();
+  my $key = shift;  
   my $val;
+  my $sock = $self->{connection};
+  my $response =  $self->ascii_command("get $key\r\n");
   
   if ($response =~ /^END/) 
   {
@@ -278,24 +304,16 @@ sub get {
 
 sub _txt_math {
   my ($self, $cmd, $key, $delta) = @_;
-  my $sock = $self->{connection};
+  my $response = $self->ascii_command("$cmd $key $delta \r\n");
   
-  $sock->print("$cmd $key $delta \r\n") || Carp::confess "send error";
-  my $response = $sock->getline();
-  my $val;
-  
-  if ($response =~ "^NOT_FOUND")
-  {
+  if ($response =~ "^NOT_FOUND") {
     $self->{error} = "NOT_FOUND";
     return undef;
   }
-  elsif ($response =~ "ERROR")
-  {
-    $self->{error} = $response;
-    $self->normalize_error();
+  elsif ($response =~ "ERROR") {
     return undef;
   }
-  
+
   $response =~ /(\d+)/;
   return $1;
 }
@@ -336,12 +354,8 @@ sub stats {
 sub flush {
   my $self = shift;
   my $key = shift;
-  my $sock = $self->{connection}; 
-  
-  $sock->print("flush_all\r\n") || Carp::confess "send error";
-  
-  $self->{error} = $sock->getline();
-  return $self->{error} =~ "^OK" ? 1 : $self->normalize_error();
+  my $result = $self->ascii_command("flush_all\r\n");  
+  return ($self->{error} =~ "^OK");
 }
 
 
@@ -357,7 +371,8 @@ sub normalize_error {
   "SERVER_ERROR not my vbucket\r\n"    => "NOT_MY_VBUCKET",
   "SERVER_ERROR out of memory\r\n"     => "SERVER_OUT_OF_MEMORY",
   "SERVER_ERROR not supported\r\n"     => "NOT_SUPPORTED",
-  "SERVER_ERROR internal\r\n"          => "INTERNAL_ERROR"
+  "SERVER_ERROR internal\r\n"          => "INTERNAL_ERROR",
+  "SERVER_ERROR temporary failure\r\n" => "SERVER_TEMPORARY_ERROR"
   );  
   my $norm_error = $error_message{$self->{error}};
   $self->{error} = $norm_error if(defined($norm_error));
@@ -462,6 +477,32 @@ sub get_binary_response {
 }  
 
 
+sub binary_command {
+  my $self = shift;
+  my ($cmd, $key, $value, $extra_header) = @_;
+  my $waitTime = $self->{minWait};
+  my $maxWait = $self->{maxWait};
+  my $status;
+  
+  do {
+    $self->send_binary_request($cmd, $key, $value, $extra_header);  
+    ($status) = $self->get_binary_response();
+    if($status == 0x86) {
+      if($waitTime < $maxWait) {
+        $self->{temp_errors} += 1;
+        $self->{total_wait} += ( Time::HiRes::usleep($waitTime * 1000) / 1000);
+        $waitTime *= 2;
+      }
+      else {
+        $self->fail("Too Many Temporary Errors", $waitTime);
+      }
+    }
+  } while($status == 0x86 && $waitTime <= $maxWait);
+
+  return ($status == 0) ? 1 : undef;
+}
+
+
 sub bin_math {
   my $self = shift;
   my ($cmd, $key, $delta, $initial) = @_;
@@ -491,13 +532,10 @@ sub bin_store {
   my $self = shift;
   my $cmd = shift;
   my $key = shift;
-  my $value = shift;
-  
+  my $value = shift;  
   my $extra_header = pack "NN", $self->{flags}, $self->{exptime};
-  $self->send_binary_request($cmd, $key, $value, $extra_header);
   
-  my ($status) = $self->get_binary_response();
-  return ($status == 0) ? 1 : 0;
+  return $self->binary_command($cmd, $key, $value, $extra_header);
 }
 
 
@@ -510,6 +548,7 @@ sub get {
   return ($status == 0) ? $value : undef;
 }
 
+
 sub stats {
   my $self = shift;
   my $key = shift;
@@ -550,23 +589,17 @@ sub replace {
 
 sub append {
   my ($self, $key, $value) = @_;
-  $self->send_binary_request(BIN_CMD_APPEND, $key, $value, '');
-  my ($status) = $self->get_binary_response();
-  return ($status == 0) ? 1 : 0;
+  return $self->binary_command(BIN_CMD_APPEND, $key, $value, '');
 }
 
 sub prepend {
   my ($self, $key, $value) = @_;
-  $self->send_binary_request(BIN_CMD_PREPEND, $key, $value, '');
-  my ($status) = $self->get_binary_response();
-  return ($status == 0) ? 1 : 0;
+  return $self->binary_command(BIN_CMD_PREPEND, $key, $value, '');
 }
 
 sub delete { 
   my ($self, $key) = @_;
-  $self->send_binary_request(BIN_CMD_DELETE, $key, '', '');
-  my ($status, $value) = $self->get_binary_response();
-  return ($status == 0) ? 1 : 0;
+  return $self->binary_command(BIN_CMD_DELETE, $key, '', '');
 }
   
 sub incr {

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2012-07-18 06:54:14 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2012-10-12 04:54:41 +0000
@@ -169,7 +169,10 @@ status_block status_block_too_big = 
   { ENGINE_E2BIG, "Value too large"                   };
 
 status_block status_block_no_mem =
-  { ENGINE_ENOMEM, "NDB out of data memory"           }; 
+  { ENGINE_ENOMEM, "NDB out of data memory"           };
+
+status_block status_block_temp_failure = 
+  { ENGINE_TMPFAIL, "NDB Temporary Error"             };
 
 void worker_set_cas(ndb_pipeline *p, uint64_t *cas) {  
   /* Be careful here --  ndbmc_atomic32_t might be a signed type.
@@ -730,6 +733,11 @@ void callback_main(int, NdbTransaction *
     if(wqitem->cas) * wqitem->cas = 0ULL;
     wqitem->status = & status_block_bad_add;    
   }
+  /* Overload Error, e.g. 410 "REDO log files overloaded" */
+  else if(tx->getNdbError().classification == NdbError::OverloadError) {
+    log_ndb_error(tx->getNdbError());
+    wqitem->status = & status_block_temp_failure;
+  }
   /* Attempt to insert via unique index access */
   else if(tx->getNdbError().code == 897) {
     wqitem->status = & status_block_idx_insert;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:4023) John David Duncan12 Oct