List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:March 12 2012 4:34pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4875 to 4876)
Bug#13428909
View as plain text  
 4876 Frazer Clement	2012-03-12
      Bug#13428909 CONFLICT DETECTION OPERATION EXECUTION ERROR HANDLING INCOMPLETE
      
      - Error handling in some conflict detection/resolution scenarios was incomplete
      - This patch improves error handling and reporting
      - Also, reporting of misconfiguration is improved.
      - Infrastructure for manual testing is added, automated testing not setup yet.

    modified:
      mysql-test/suite/ndb_rpl/t/disabled.def
      sql/ha_ndbcluster.cc
      sql/ndb_conflict.cc
      sql/ndb_conflict_trans.cc
      storage/ndb/test/ndbapi/bench/asyncGenerator.cpp
      storage/ndb/test/ndbapi/bench/dbPopulate.cpp
      storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp
      storage/ndb/test/ndbapi/bench/mainPopulate.cpp
      storage/ndb/test/ndbapi/bench/ndb_error.hpp
      storage/ndb/test/ndbapi/bench/testData.h
      storage/ndb/test/ndbapi/bench/userInterface.h
 4875 Jonas Oreland	2012-03-12 [merge]
      ndb - merge 63 to 70

=== modified file 'mysql-test/suite/ndb_rpl/t/disabled.def'
--- a/mysql-test/suite/ndb_rpl/t/disabled.def	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/disabled.def	2012-03-12 15:16:41 +0000
@@ -11,3 +11,4 @@
 ##############################################################################
 
 ndb_rpl_ctype_ucs2_def : bug #34661 rpl_ndb_ctype_ucs2_def fails in 6.2
+ndb_rpl_conflict_load_epoch_trans : Not designed for determinism
\ No newline at end of file

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2012-03-07 12:19:58 +0000
+++ b/sql/ha_ndbcluster.cc	2012-03-12 15:16:41 +0000
@@ -863,6 +863,8 @@ check_completed_operations_pre_commit(Th
     const bool op_has_conflict_detection = (first->getCustomData() != NULL);
     if (!op_has_conflict_detection)
     {
+      DBUG_ASSERT(err.code != (int) error_op_after_refresh_op);
+
       /* 'Normal path' - ignore key (not) present, others are errors */
       if (err.classification != NdbError::NoError &&
           err.classification != NdbError::ConstraintViolation &&
@@ -909,15 +911,18 @@ check_completed_operations_pre_commit(Th
   {
     const NdbOperation* last_conflict_op = trans->getLastDefinedOperation();
 
+    NdbError nonMaskedError;
+    assert(nonMaskedError.code == 0);
+
     if (trans->execute(NdbTransaction::NoCommit,
                        NdbOperation::AO_IgnoreError,
                        thd_ndb->m_force_send))
     {
-      abort();
-      //err= trans->getNdbError();
+      /* Transaction execute failed, even with IgnoreError... */
+      nonMaskedError = trans->getNdbError();
+      assert(nonMaskedError.code != 0);
     }
-
-    if (trans->getNdbError().code)
+    else if (trans->getNdbError().code)
     {
       /* Check the result codes of the operations we added */
       const NdbOperation* conflict_op = NULL;
@@ -932,26 +937,34 @@ check_completed_operations_pre_commit(Th
         if ((err.code != 0) &&
             (err.code != (int) error_op_after_refresh_op))
         {
-          if (err.status == NdbError::TemporaryError)
-          {
-            /* Slave will roll back and retry entire transaction. */
-            ERR_RETURN(err);
-          }
-          else
-          {
-            char msg[FN_REFLEN];
-            my_snprintf(msg, sizeof(msg), "Executing extra operations for "
-                        "conflict handling hit Ndb error %d '%s'",
-                        err.code, err.message);
-            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                                ER_EXCEPTIONS_WRITE_ERROR,
-                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
-            /* Slave will stop replication. */
-            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
-          }
+          /* Found a real error, break out and handle it */
+          nonMaskedError = err;
+          break;
         }
       } while (conflict_op != last_conflict_op);
     }
+
+    /* Handle errors with extra conflict handling operations */
+    if (nonMaskedError.code != 0)
+    {
+      if (nonMaskedError.status == NdbError::TemporaryError)
+      {
+        /* Slave will roll back and retry entire transaction. */
+        ERR_RETURN(nonMaskedError);
+      }
+      else
+      {
+        char msg[FN_REFLEN];
+        my_snprintf(msg, sizeof(msg), "Executing extra operations for "
+                    "conflict handling hit Ndb error %d '%s'",
+                    nonMaskedError.code, nonMaskedError.message);
+        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_EXCEPTIONS_WRITE_ERROR,
+                            ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+        /* Slave will stop replication. */
+        DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+      }
+    }
   }
 #endif
   DBUG_RETURN(0);
@@ -1016,11 +1029,11 @@ ha_ndbcluster::release_completed_operati
   trans->releaseCompletedOperations();
 }
 
-int execute_no_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+int execute_no_commit(THD* thd, Thd_ndb *thd_ndb, NdbTransaction *trans,
                       bool ignore_no_key,
                       uint *ignore_count= 0);
 inline
-int execute_no_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+int execute_no_commit(THD* thd, Thd_ndb *thd_ndb, NdbTransaction *trans,
                       bool ignore_no_key,
                       uint *ignore_count)
 {
@@ -1031,18 +1044,35 @@ int execute_no_commit(Thd_ndb *thd_ndb,
   thd_ndb->m_execute_count++;
   thd_ndb->m_unsent_bytes= 0;
   DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
-  if (trans->execute(NdbTransaction::NoCommit,
-                     NdbOperation::AO_IgnoreError,
-                     thd_ndb->m_force_send))
-  {
-    DBUG_RETURN(-1);
-  }
-  if (!ignore_no_key || trans->getNdbError().code == 0)
-    DBUG_RETURN(trans->getNdbError().code);
-
-  DBUG_RETURN(check_completed_operations_pre_commit(thd_ndb, trans,
-                                                    first, last,
-                                                    ignore_count));
+  int rc= 0;
+  do
+  {
+    if (trans->execute(NdbTransaction::NoCommit,
+                       NdbOperation::AO_IgnoreError,
+                       thd_ndb->m_force_send))
+    {
+      rc= -1;
+      break;
+    }
+    if (!ignore_no_key || trans->getNdbError().code == 0)
+    {
+      rc= trans->getNdbError().code;
+      break;
+    }
+
+    rc = check_completed_operations_pre_commit(thd_ndb, trans,
+                                               first, last,
+                                               ignore_count);
+  } while (0);
+
+  if (unlikely(thd->slave_thread &&
+               rc != 0))
+  {
+    g_ndb_slave_state.atTransactionAbort();
+  }
+
+  DBUG_PRINT("info", ("execute_no_commit rc is %d", rc));
+  DBUG_RETURN(rc);
 }
 
 int execute_commit(THD* thd, Thd_ndb *thd_ndb, NdbTransaction *trans,
@@ -1067,21 +1097,40 @@ int execute_commit(THD* thd, Thd_ndb *th
   thd_ndb->m_execute_count++;
   thd_ndb->m_unsent_bytes= 0;
   DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
-  if (trans->execute(NdbTransaction::Commit, ao, force_send))
+  int rc= 0;
+  do
   {
-    if (thd->slave_thread)
-      g_ndb_slave_state.atTransactionAbort();
-    DBUG_RETURN(-1);
-  }
-  /* Success of some sort */
+    if (trans->execute(NdbTransaction::Commit, ao, force_send))
+    {
+      rc= -1;
+      break;
+    }
+
+    if (!ignore_error || trans->getNdbError().code == 0)
+    {
+      rc= trans->getNdbError().code;
+      break;
+    }
+
+    rc= check_completed_operations(thd_ndb, trans, first, last,
+                                   ignore_count);
+  } while (0);
+
   if (thd->slave_thread)
   {
-    g_ndb_slave_state.atTransactionCommit();
+    if (likely(rc == 0))
+    {
+      /* Success */
+      g_ndb_slave_state.atTransactionCommit();
+    }
+    else
+    {
+      g_ndb_slave_state.atTransactionAbort();
+    }
   }
-  if (!ignore_error || trans->getNdbError().code == 0)
-    DBUG_RETURN(trans->getNdbError().code);
-  DBUG_RETURN(check_completed_operations(thd_ndb, trans, first, last,
-                                         ignore_count));
+
+  DBUG_PRINT("info", ("execute_commit rc is %d", rc));
+  DBUG_RETURN(rc);
 }
 
 inline
@@ -3045,7 +3094,7 @@ int ha_ndbcluster::ndb_pk_update_row(THD
       if (error != 0)
         ERR_RETURN(op->getNdbError());
     }
-    if (execute_no_commit(m_thd_ndb, trans, m_ignore_no_key) != 0) 
+    if (execute_no_commit(thd, m_thd_ndb, trans, m_ignore_no_key) != 0)
     {
       table->status= STATUS_NOT_FOUND;
       DBUG_RETURN(ndb_err(trans));
@@ -3411,7 +3460,7 @@ inline int ha_ndbcluster::fetch_next(Ndb
     */
     if (m_thd_ndb->m_unsent_bytes && m_blobs_pending)
     {
-      if (execute_no_commit(m_thd_ndb, trans, m_ignore_no_key) != 0)
+      if (execute_no_commit(table->in_use, m_thd_ndb, trans, m_ignore_no_key) != 0)
         DBUG_RETURN(ndb_err(trans));
     }
     
@@ -3760,7 +3809,7 @@ int ha_ndbcluster::ordered_index_scan(co
 
   m_active_cursor= op;
 
-  if (execute_no_commit(m_thd_ndb, trans, m_ignore_no_key) != 0)
+  if (execute_no_commit(table->in_use, m_thd_ndb, trans, m_ignore_no_key) != 0)
     DBUG_RETURN(ndb_err(trans));
   
   DBUG_RETURN(next_result(buf));
@@ -3905,7 +3954,7 @@ int ha_ndbcluster::full_table_scan(const
       get_blob_values(op, NULL, table->read_set) != 0)
     ERR_RETURN(op->getNdbError());
 
-  if (execute_no_commit(m_thd_ndb, trans, m_ignore_no_key) != 0)
+  if (execute_no_commit(table->in_use, m_thd_ndb, trans, m_ignore_no_key) != 0)
     DBUG_RETURN(ndb_err(trans));
   DBUG_PRINT("exit", ("Scan started successfully"));
   DBUG_RETURN(next_result(buf));
@@ -4100,6 +4149,15 @@ ha_ndbcluster::prepare_conflict_detectio
   conflict_handled = false;
 
   /*
+    Special check for apply_status table, as we really don't want
+    to do any special handling with it
+  */
+  if (unlikely(m_share == ndb_apply_status_share))
+  {
+    DBUG_RETURN(0);
+  }
+
+  /*
      Check transaction id first, as in transactional conflict detection,
      the transaction id is what eventually dictates whether an operation
      is applied or not.
@@ -4182,6 +4240,8 @@ ha_ndbcluster::prepare_conflict_detectio
                       "events received without transaction ids.  Check --ndb-log-transaction-id setting "
                       "on upstream Cluster.",
                       m_share->key);
+    /* This is a user error, but we want them to notice, so treat seriously */
+    DBUG_RETURN( ER_SLAVE_CORRUPT_EVENT );
   }
 
   /*
@@ -5133,14 +5193,14 @@ int ha_ndbcluster::exec_bulk_update(uint
   }
 
   uint ignore_count= 0;
-  if (execute_no_commit(m_thd_ndb, trans,
+  THD *thd= table->in_use;
+  if (execute_no_commit(thd, m_thd_ndb, trans,
                         m_ignore_no_key || m_read_before_write_removal_used,
                         &ignore_count) != 0)
   {
     no_uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
   }
-  THD *thd= table->in_use;
   if (!thd->slave_thread)
   {
     assert(m_rows_changed >= ignore_count);
@@ -5452,7 +5512,7 @@ int ha_ndbcluster::ndb_update_row(const
       !(cursor || (batch_allowed && have_pk)) ||
       need_flush)
   {
-    if (execute_no_commit(m_thd_ndb, trans,
+    if (execute_no_commit(thd, m_thd_ndb, trans,
                           m_ignore_no_key || m_read_before_write_removal_used,
                           &ignore_count) != 0)
     {
@@ -5560,7 +5620,8 @@ int ha_ndbcluster::end_bulk_delete()
   }
 
   uint ignore_count= 0;
-  if (execute_no_commit(m_thd_ndb, trans,
+  THD *thd= table->in_use;
+  if (execute_no_commit(thd, m_thd_ndb, trans,
                         m_ignore_no_key || m_read_before_write_removal_used,
                         &ignore_count) != 0)
   {
@@ -5568,7 +5629,6 @@ int ha_ndbcluster::end_bulk_delete()
     DBUG_RETURN(ndb_err(trans));
   }
 
-  THD *thd= table->in_use;
   if (!thd->slave_thread)
   {
     assert(m_rows_deleted >= ignore_count);
@@ -5770,7 +5830,7 @@ int ha_ndbcluster::ndb_delete_row(const
 
   // Execute delete operation
   uint ignore_count= 0;
-  if (execute_no_commit(m_thd_ndb, trans,
+  if (execute_no_commit(thd, m_thd_ndb, trans,
                         m_ignore_no_key || m_read_before_write_removal_used,
                         &ignore_count) != 0)
   {
@@ -6340,7 +6400,7 @@ int ha_ndbcluster::close_scan()
     */
     DBUG_PRINT("info", ("thd_ndb->m_unsent_bytes: %ld",
                         (long) m_thd_ndb->m_unsent_bytes));    
-    if (execute_no_commit(m_thd_ndb, trans, m_ignore_no_key) != 0)
+    if (execute_no_commit(table->in_use, m_thd_ndb, trans, m_ignore_no_key) != 0)
     {
       no_uncommitted_rows_execute_failure();
       DBUG_RETURN(ndb_err(trans));
@@ -6878,7 +6938,7 @@ ha_ndbcluster::flush_bulk_insert(bool al
   if (! (m_thd_ndb->trans_options & TNTO_TRANSACTIONS_OFF))
   {
     if (!allow_batch &&
-        execute_no_commit(m_thd_ndb, trans, m_ignore_no_key) != 0)
+        execute_no_commit(table->in_use, m_thd_ndb, trans, m_ignore_no_key) != 0)
     {
       no_uncommitted_rows_execute_failure();
       DBUG_RETURN(ndb_err(trans));
@@ -7683,7 +7743,7 @@ int ndbcluster_commit(handlerton *hton,
     if (g_ndb_slave_state.conflict_flags & SCS_OPS_DEFINED)
     {
       if (thd_ndb->m_unsent_bytes)
-        res = execute_no_commit(thd_ndb, trans, TRUE);
+        res = execute_no_commit(thd, thd_ndb, trans, TRUE);
     }
 
     if (likely(res == 0))

=== modified file 'sql/ndb_conflict.cc'
--- a/sql/ndb_conflict.cc	2012-01-31 13:12:32 +0000
+++ b/sql/ndb_conflict.cc	2012-03-12 15:16:41 +0000
@@ -247,6 +247,12 @@ st_ndb_slave_state::resetPerAttemptCount
 void
 st_ndb_slave_state::atTransactionAbort()
 {
+#ifdef HAVE_NDB_BINLOG
+  /* Reset any gathered transaction dependency information */
+  atEndTransConflictHandling();
+  trans_conflict_apply_state = SAS_NORMAL;
+#endif
+
   /* Reset current-transaction counters + state */
   resetPerAttemptCounters();
 }

=== modified file 'sql/ndb_conflict_trans.cc'
--- a/sql/ndb_conflict_trans.cc	2011-09-07 22:50:01 +0000
+++ b/sql/ndb_conflict_trans.cc	2012-03-12 15:16:41 +0000
@@ -401,6 +401,8 @@ track_operation(const NdbDictionary::Tab
                          packed_key_buff,
                          required_buff_size))
   {
+    if (!error_text)
+      error_text="track_operation : Failed packing key";
     DBUG_RETURN(-1);
   }
 
@@ -445,6 +447,8 @@ track_operation(const NdbDictionary::Tab
       */
       existing->updateRowTransactionId(newTransIdOnRow);
 
+      assert(res == 0 || error_text != NULL);
+
       DBUG_RETURN(res);
     }
     else
@@ -453,10 +457,18 @@ track_operation(const NdbDictionary::Tab
          How can we have two updates to the same row with the
          same transaction id?  Only if the transaction id
          is invalid (e.g. not set)
+         In normal cases with only one upstream master, each
+         distinct master user transaction will have a unique
+         id, and all operations on a row in that transaction
+         will be merged in TUP prior to emitting a SUMA
+         event.
+         This could be relaxed for more complex upstream
+         topologies, but acts as a sanity guard currently.
       */
       if (existingTransIdOnRow != InvalidTransactionId)
       {
         assert(false);
+        error_text= "Two row operations to same key sharing user transaction id";
         DBUG_RETURN(-1);
       }
     }

=== modified file 'storage/ndb/test/ndbapi/bench/asyncGenerator.cpp'
--- a/storage/ndb/test/ndbapi/bench/asyncGenerator.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/bench/asyncGenerator.cpp	2012-03-12 15:16:41 +0000
@@ -27,6 +27,8 @@
 #include <NdbOut.hpp>
 #include <NdbSleep.h>
 
+void getRandomSubscriberNumber(SubscriberNumber number);
+
 /***************************************************************
 * L O C A L   C O N S T A N T S                                *
 ***************************************************************/
@@ -39,7 +41,6 @@
 * L O C A L   F U N C T I O N S                                *
 ***************************************************************/
 
-static void getRandomSubscriberNumber(SubscriberNumber number);
 static void getRandomServerId(ServerId *serverId);
 static void getRandomChangedBy(ChangedBy changedBy);
 //static void getRandomChangedTime(ChangedTime changedTime);
@@ -88,12 +89,20 @@ static unsigned maxsize = 0;
 * L O C A L   F U N C T I O N S   C O D E   S E C T I O N      *
 ****************************************************************
 ***************************************************************/
+#ifdef	__cplusplus
+extern "C" {
+#endif
+extern int subscriberCount;
+#ifdef	__cplusplus
+}
+#endif
+
 
-static void getRandomSubscriberNumber(SubscriberNumber number)
+void getRandomSubscriberNumber(SubscriberNumber number)
 {
    uint32 tmp;
    char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
-   tmp = myRandom48(NO_OF_SUBSCRIBERS);
+   tmp = myRandom48(subscriberCount);
    sprintf(sbuf, "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
    memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
 }

=== modified file 'storage/ndb/test/ndbapi/bench/dbPopulate.cpp'
--- a/storage/ndb/test/ndbapi/bench/dbPopulate.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/bench/dbPopulate.cpp	2012-03-12 15:16:41 +0000
@@ -238,9 +238,17 @@ static void populateGroups(UserHandle *u
 ****************************************************************
 ***************************************************************/
 
+#ifdef	__cplusplus
+extern "C" {
+#endif
+  extern int subscriberCount;
+#ifdef	__cplusplus
+}
+#endif
+
 void dbPopulate(UserHandle *uh)
 {
    populate("servers", NO_OF_SERVERS, populateServers, uh);
-   populate("subscribers", NO_OF_SUBSCRIBERS, populateSubscribers, uh);
+   populate("subscribers", subscriberCount, populateSubscribers, uh);
    populate("groups", NO_OF_GROUPS, populateGroups, uh);
 }

=== modified file 'storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp'
--- a/storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp	2011-01-30 20:56:00 +0000
+++ b/storage/ndb/test/ndbapi/bench/mainAsyncGenerator.cpp	2012-03-12 15:16:41 +0000
@@ -39,6 +39,8 @@ static int   minEventSendPoll;
 static int   forceSendPoll;
 static bool  useNdbRecord;
 static bool  useCombUpd;
+int          subscriberCount;
+static bool  robustMode;
 
 static ThreadData *data;
 static Ndb_cluster_connection *g_cluster_connection= 0;
@@ -60,7 +62,7 @@ static void usage(const char *prog)
 
    ndbout_c(
            "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>]" 
-	   "[-t <num> ] [ -e <num> ] [ -f <num>] [ -ndbrecord ]\n"
+	   "[-t <num> ] [ -e <num> ] [ -f <num>] [ -ndbrecord ] [ -s <num>]\n"
            "  -proc <num>    Specifies that <num> is the number of\n"
            "                 threads. The default is 1.\n"
            "  -time <num>    Specifies that the test will run for <num> sec.\n"
@@ -78,8 +80,10 @@ static void usage(const char *prog)
            "  -ndbrecord     Use NdbRecord Api.\n"
            "                 Default is to use old Api\n"
            "  -combupdread   Use update pre-read operation where possible\n"
-           "                 Default is to use separate read+update ops\n",
-           progname);
+           "                 Default is to use separate read+update ops\n"
+           "  -s <num>       Number of subscribers to operate on, default is %u.\n"
+           "  -r             Whether to be robust to key errors\n",
+           progname, NO_OF_SUBSCRIBERS);
 }
 
 static
@@ -97,6 +101,8 @@ parse_args(int argc, const char **argv)
    forceSendPoll    = 0;
    useNdbRecord     = false;
    useCombUpd       = false;
+   subscriberCount  = NO_OF_SUBSCRIBERS;
+   robustMode       = false;
 
    i = 1;
    while (i < argc){
@@ -176,6 +182,20 @@ parse_args(int argc, const char **argv)
        useCombUpd= true;
        i++;
      }
+     else if (strcmp("-s", argv[i]) == 0) {
+       if (i + 1 >= argc) {
+         return 1;
+       }
+       if (sscanf(argv[i+1], "%u", &subscriberCount) == -1) {
+         ndbout_c("-s flag requires a positive argument.");
+         return 1;
+       }
+       i+=2;
+     }
+     else if (strcmp("-r", argv[i]) == 0) {
+       robustMode= true;
+       i++;
+     }
      else {
        return 1;
      }
@@ -548,6 +568,7 @@ NDB_COMMAND(DbAsyncGenerator, "DbAsyncGe
       data[tid].runState              = Runnable;
       data[tid].ndbRecordSharedData   = ndbRecordSharedDataPtr;
       data[tid].useCombinedUpdate     = useCombUpd;
+      data[tid].robustMode            = robustMode;
     }
     sprintf(threadName, "AsyncThread[%d]", i);
     pThread = NdbThread_Create(threadRoutine, 

=== modified file 'storage/ndb/test/ndbapi/bench/mainPopulate.cpp'
--- a/storage/ndb/test/ndbapi/bench/mainPopulate.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/bench/mainPopulate.cpp	2012-03-12 15:16:41 +0000
@@ -30,6 +30,7 @@
 extern "C" {
 #endif
 int useTableLogging;
+int subscriberCount;
 #ifdef	__cplusplus
 }
 #endif
@@ -40,9 +41,10 @@ void usage(const char *prog)
 {
   
   ndbout_c(
-	   "Usage: %s [-l]\n"
-	   "  -l                  Use logging and checkpointing on tables\n",
-	   prog);
+	   "Usage: %s [-l][-s <count>]\n"
+	   "  -l                  Use logging and checkpointing on tables\n"
+           "  -s <count>          Number of subscribers to populate, default %u\n",
+	   prog, NO_OF_SUBSCRIBERS);
   
   exit(1);
 }
@@ -54,11 +56,23 @@ NDB_COMMAND(DbCreate, "DbCreate", "DbCre
   UserHandle *uh;
   
   useTableLogging = 0;
-  
+  subscriberCount = NO_OF_SUBSCRIBERS;
+
   for(i = 1; i<argc; i++){
     if(strcmp(argv[i], "-l") == 0){
       useTableLogging = 1;
-    } else {
+    }
+    else if (strcmp(argv[i], "-s") == 0)
+    {
+      if ((i + 1 >= argc) ||
+          (sscanf(argv[i+1], "%u", &subscriberCount) == -1))
+      {
+        usage(argv[0]);
+        return 0;
+      }
+      i++;
+    }
+    else {
       usage(argv[0]);
       return 0;
     }
@@ -66,6 +80,8 @@ NDB_COMMAND(DbCreate, "DbCreate", "DbCre
 
   ndbout_c("Using %s tables",
 	   useTableLogging ? "logging" : "temporary");
+  ndbout_c("Populating %u subscribers",
+           subscriberCount);
   
   myRandom48Init(0x3e6f);
   

=== modified file 'storage/ndb/test/ndbapi/bench/ndb_error.hpp'
--- a/storage/ndb/test/ndbapi/bench/ndb_error.hpp	2011-02-02 00:40:07 +0000
+++ b/storage/ndb/test/ndbapi/bench/ndb_error.hpp	2012-03-12 15:16:41 +0000
@@ -45,6 +45,23 @@ CHECK_ALLOWED_ERROR(const char * str,
   ndbout << str << " " << error << endl
 	 << buf;
   showTime();
+
+  if (td->robustMode)
+  {
+    if ((error.code == 626) || /* NoDataFound */
+        (error.code == 630))   /* Tuple already existed */
+    {
+      /* Problem with a specific tuple, try a different one to
+       * avoid getting stuck
+       */
+      ThreadData* nctd = (ThreadData*) td;
+      getRandomSubscriberNumber(nctd->transactionData.number);
+      ndbout << "Problem with subscriber, changing to "
+             << td->transactionData.number
+             << endl;
+      return;
+    }
+  }
   
   switch(error.classification) { 
   case NdbError::TimeoutExpired:  

=== modified file 'storage/ndb/test/ndbapi/bench/testData.h'
--- a/storage/ndb/test/ndbapi/bench/testData.h	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/bench/testData.h	2012-03-12 15:16:41 +0000
@@ -156,6 +156,7 @@ typedef struct {
   class Ndb           * pNDB;
   NdbRecordSharedData*  ndbRecordSharedData;
   bool                  useCombinedUpdate;
+  bool                  robustMode;
 } ThreadData;
 
 /***************************************************************

=== modified file 'storage/ndb/test/ndbapi/bench/userInterface.h'
--- a/storage/ndb/test/ndbapi/bench/userInterface.h	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/bench/userInterface.h	2012-03-12 15:16:41 +0000
@@ -57,6 +57,8 @@ extern "C" {
   extern Ndb   *asyncDbConnect(int parallellism);
   extern void    asyncDbDisconnect(Ndb* pNDB);
 
+  extern void getRandomSubscriberNumber(SubscriberNumber number);
+
   extern void start_T1(Ndb * uh, ThreadData * data, int async);
   extern void start_T2(Ndb * uh, ThreadData * data, int async);
   extern void start_T3(Ndb * uh, ThreadData * data, int async);

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4875 to 4876)Bug#13428909Frazer Clement12 Mar