List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:March 27 2012 11:21am
Subject:bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4906 to 4907)
Bug#12319939
View as plain text  
 4907 Jonas Oreland	2012-03-27
      ndb - bug#12319939 - commit for CluB

    modified:
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
      storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
      storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
      storage/ndb/src/ndbapi/ndb_internal.cpp
      storage/ndb/src/ndbapi/ndb_internal.hpp
 4906 magnus.blaudd@stripped	2012-03-27 [merge]
      Merge

    modified:
      mysql-test/suite/ndb/include/ndb_share_check_shares.inc
      mysql-test/suite/ndb/r/ndb_basic.result
      mysql-test/suite/ndb/r/ndb_ddl_open_trans.result
      mysql-test/suite/ndb/r/ndb_index_stat_restart.result
      mysql-test/suite/ndb/r/ndb_share.result
      mysql-test/suite/ndb/t/ndb_basic.test
      mysql-test/suite/ndb/t/ndb_ddl_open_trans.test
      mysql-test/suite/ndb/t/ndb_index_stat_restart.test
      mysql-test/suite/ndb_rpl/r/ndb_rpl_ddl_open_trans.result
      sql/ha_ndbcluster.cc
      storage/ndb/src/mgmsrv/ConfigManager.cpp
      storage/ndb/test/ndbapi/testMgmd.cpp
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2012-02-17 08:03:56 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2012-03-27 11:21:10 +0000
@@ -33,6 +33,11 @@
 #include <portlib/NdbTick.h>
 #include "ndb_table_guard.h"
 
+/**
+ * try_wakeup_event_poller
+ */
+#include "../storage/ndb/src/ndbapi/ndb_internal.hpp"
+
 /*
   defines for cluster replication table names
 */
@@ -125,6 +130,13 @@ extern THD * injector_thd; // Declared i
 static Ndb *injector_ndb= 0;
 static Ndb *schema_ndb= 0;
 
+/**
+ * This function is used by client thread to lock the injector mutex
+ *   for create/dropEventOperation...but it also wakeups
+ *   the injector thread if that has the mutex, and is waiting in pollEvents
+ */
+static pthread_mutex_t & wakeup_and_lock(pthread_mutex_t&, Ndb*);
+
 static int ndbcluster_binlog_inited= 0;
 /*
   Flag "ndbcluster_binlog_terminating" set when shutting down mysqld.
@@ -155,6 +167,12 @@ static ulonglong ndb_latest_applied_binl
 static ulonglong ndb_latest_handled_binlog_epoch= 0;
 static ulonglong ndb_latest_received_binlog_epoch= 0;
 
+/* Condition variable to wait for ndb_latest_handled_binlog_epoch*/
+static pthread_mutex_t wait_gci_mutex;
+static pthread_cond_t wait_gci_cond;
+static bool ndb_latest_handled_binlog_epoch_waiter= FALSE;
+
+
 NDB_SHARE *ndb_apply_status_share= 0;
 NDB_SHARE *ndb_schema_share= 0;
 pthread_mutex_t ndb_schema_share_mutex;
@@ -567,6 +585,18 @@ int ndbcluster_binlog_init_share(THD *th
   functions called from master sql client threads
 ****************************************************************/
 
+static void set_ndb_latest_handled_binlog_epoch(ulonglong val)
+{
+  pthread_mutex_lock(&wait_gci_mutex);
+  ndb_latest_handled_binlog_epoch= val;
+  if (ndb_latest_handled_binlog_epoch_waiter)
+  {
+    ndb_latest_handled_binlog_epoch_waiter= FALSE;
+    pthread_cond_signal(&wait_gci_cond);
+  }
+  pthread_mutex_unlock(&wait_gci_mutex);
+}
+
 /*
   called in mysql_show_binlog_events and reset_logs to make sure we wait for
   all events originating from this mysql server to arrive in the binlog
@@ -587,22 +617,29 @@ static void ndbcluster_binlog_wait(THD *
     */
     if (!wait_epoch)
       DBUG_VOID_RETURN;
+
+    Ndb *ndb= check_ndb_in_thd(thd);
+    NDBDICT *dict= ndb->getDictionary();
+
     const char *save_info= thd ? thd->proc_info : 0;
     int count= 30;
     if (thd)
       thd->proc_info= "Waiting for ndbcluster binlog update to "
 	"reach current position";
-    pthread_mutex_lock(&injector_mutex);
+    pthread_mutex_lock(&wait_gci_mutex);
     while (!(thd && thd->killed) && count && ndb_binlog_running &&
            (ndb_latest_handled_binlog_epoch == 0 ||
             ndb_latest_handled_binlog_epoch < wait_epoch))
     {
+      ndb_latest_handled_binlog_epoch_waiter= TRUE;
       count--;
       struct timespec abstime;
       set_timespec(abstime, 1);
-      pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
+      dict->forceGCPWait(1);
+      pthread_cond_timedwait(&wait_gci_cond, &wait_gci_mutex, &abstime);
     }
-    pthread_mutex_unlock(&injector_mutex);
+    pthread_mutex_unlock(&wait_gci_mutex);
+
     if (thd)
       thd->proc_info= save_info;
     DBUG_VOID_RETURN;
@@ -894,6 +931,8 @@ int ndbcluster_binlog_end(THD *thd)
     pthread_mutex_destroy(&injector_mutex);
     pthread_cond_destroy(&injector_cond);
     pthread_mutex_destroy(&ndb_schema_share_mutex);
+    pthread_mutex_destroy(&wait_gci_mutex);
+    pthread_cond_destroy(&wait_gci_cond);
   }
 
   DBUG_RETURN(0);
@@ -2563,6 +2602,16 @@ end:
   DBUG_RETURN(0);
 }
 
+static
+pthread_mutex_t &
+wakeup_and_lock(pthread_mutex_t& injector_mutex, Ndb * injector_ndb)
+{
+  Ndb_internal::block_and_try_wakeup_event_poller(injector_ndb);
+  pthread_mutex_lock(&injector_mutex);
+  Ndb_internal::unblock_event_poller(injector_ndb);
+  return injector_mutex;
+}
+
 /*
   Handle _non_ data events from the storage nodes
 */
@@ -2650,10 +2699,12 @@ ndb_handle_schema_change(THD *thd, Ndb *
     pOp->setCustomData(NULL);
   }
 
-  pthread_mutex_lock(&injector_mutex);
-  is_ndb->dropEventOperation(pOp);
-  pOp= 0;
-  pthread_mutex_unlock(&injector_mutex);
+
+  {
+    Mutex_guard g(wakeup_and_lock(injector_mutex, injector_ndb), TRUE);
+    is_ndb->dropEventOperation(pOp);
+    pOp= 0;
+  }
 
   if (do_close_cached_tables)
   {
@@ -3255,7 +3306,7 @@ ndb_binlog_thread_handle_schema_event_po
               delete event_data;
             share->op->setCustomData(NULL);
             {
-              Mutex_guard injector_mutex_g(injector_mutex);
+              Mutex_guard g(wakeup_and_lock(injector_mutex, injector_ndb), TRUE);
               injector_ndb->dropEventOperation(share->op);
             }
             share->op= 0;
@@ -3421,7 +3472,7 @@ ndb_binlog_thread_handle_schema_event_po
               delete event_data;
             share->op->setCustomData(NULL);
             {
-              Mutex_guard injector_mutex_g(injector_mutex);
+              Mutex_guard injector_mutex_g(wakeup_and_lock(injector_mutex, injector_ndb), TRUE);
               injector_ndb->dropEventOperation(share->op);
             }
             share->op= share->new_op;
@@ -3692,6 +3743,10 @@ int ndbcluster_binlog_start()
 
   pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&injector_cond, NULL);
+
+  pthread_mutex_init(&wait_gci_mutex, MY_MUTEX_INIT_FAST);
+  pthread_cond_init(&wait_gci_cond, NULL);
+
   pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
 
   /* Create injector thread */
@@ -5303,7 +5358,7 @@ ndbcluster_create_event_ops(THD *thd, ND
   int retry_sleep= 100;
   while (1)
   {
-    Mutex_guard injector_mutex_g(injector_mutex);
+    Mutex_guard injector_mutex_g(wakeup_and_lock(injector_mutex, injector_ndb), TRUE);
     Ndb *ndb= injector_ndb;
     if (do_ndb_schema_share)
       ndb= schema_ndb;
@@ -6778,6 +6833,7 @@ restart_cluster_failure:
       {
         if (ndbcluster_binlog_terminating)
           goto err;
+        Mutex_guard injector_mutex_g(injector_mutex);
         res= i_ndb->pollEvents(10, &gci);
       }
       if (gci > schema_gci)
@@ -6802,7 +6858,7 @@ restart_cluster_failure:
                         (uint)(schema_gci >> 32),
                         (uint)(schema_gci));
         ndb_set_latest_trans_gci(0);
-        ndb_latest_handled_binlog_epoch= 0;
+        set_ndb_latest_handled_binlog_epoch(0);
         ndb_latest_applied_binlog_epoch= 0;
         ndb_latest_received_binlog_epoch= 0;
         ndb_index_stat_restart();
@@ -6879,10 +6935,15 @@ restart_cluster_failure:
     int res= 0, tot_poll_wait= 1000;
     if (ndb_binlog_running)
     {
+      Mutex_guard injector_mutex_g(injector_mutex);
       res= i_ndb->pollEvents(tot_poll_wait, &gci);
       tot_poll_wait= 0;
     }
-    int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
+    int schema_res;
+    {
+      Mutex_guard injector_mutex_g(injector_mutex);
+      schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
+    }
     ndb_latest_received_binlog_epoch= gci;
 
     while (gci > schema_gci && schema_res >= 0)
@@ -6970,7 +7031,12 @@ restart_cluster_failure:
         e.g. node failure events
       */
       Uint64 tmp_gci;
-      if (i_ndb->pollEvents(0, &tmp_gci))
+      int res = 0;
+      {
+        Mutex_guard injector_mutex_g(injector_mutex);
+        res = i_ndb->pollEvents(0, &tmp_gci);
+      }
+      if (res)
       {
         NdbEventOperation *pOp;
         while ((pOp= i_ndb->nextEvent()))
@@ -7293,7 +7359,7 @@ restart_cluster_failure:
           ndb_latest_applied_binlog_epoch= gci;
           break;
         }
-        ndb_latest_handled_binlog_epoch= gci;
+        set_ndb_latest_handled_binlog_epoch(gci);
 
 #ifdef RUN_NDB_BINLOG_TIMER
         gci_timer.stop();
@@ -7326,7 +7392,7 @@ restart_cluster_failure:
                                                      &post_epoch_unlock_list);
     free_root(&mem_root, MYF(0));
     *root_ptr= old_root;
-    ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
+    set_ndb_latest_handled_binlog_epoch(ndb_latest_received_binlog_epoch);
   }
  err:
   if (do_ndbcluster_binlog_close_connection != BCCC_restart)

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2012-01-25 17:13:13 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2012-03-27 11:21:10 +0000
@@ -104,7 +104,16 @@ public:
   Mutex_guard(pthread_mutex_t &mutex) : m_mutex(mutex)
   {
     pthread_mutex_lock(&m_mutex);
-  };
+  }
+
+  Mutex_guard(pthread_mutex_t &mutex, bool already_locked) : m_mutex(mutex)
+  {
+    if (!already_locked)
+    {
+      pthread_mutex_lock(&m_mutex);
+    }
+  }
+
   ~Mutex_guard()
   {
     pthread_mutex_unlock(&m_mutex);

=== modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2012-03-27 11:21:10 +0000
@@ -1123,11 +1123,8 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb)
   m_flush_gci = 0;
 #endif
 
-  if ((p_cond = NdbCondition_Create()) ==  NULL) {
-    ndbout_c("NdbEventHandle: NdbCondition_Create() failed");
-    exit(-1);
-  }
   m_mutex = 0; // Set in Ndb::init()
+  m_wakeup = 0;
 
   // ToDo set event buffer size
   // pre allocate event data array
@@ -1179,8 +1176,6 @@ NdbEventBuffer::~NdbEventBuffer()
     }
     NdbMem_Free((char*)m_allocated_data[j]);
   }
-
-  NdbCondition_Destroy(p_cond);
 }
 
 void
@@ -1251,14 +1246,17 @@ NdbEventBuffer::pollEvents(int aMillisec
   m_latest_command= "NdbEventBuffer::pollEvents";
 #endif
 
-  NdbMutex_Lock(m_mutex);
+  m_ndb->theImpl->start_poll();
+  Uint64 tmpGCI = m_latestGCI;
   NdbEventOperationImpl *ev_op= move_data();
-  if (unlikely(ev_op == 0 && aMillisecondNumber))
+  while (ev_op == 0 && aMillisecondNumber && m_wakeup == 0 &&
+         tmpGCI == m_latestGCI)
   {
-    NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
+    int sleepTime = aMillisecondNumber > 10 ? 10 : aMillisecondNumber;
+    m_ndb->theImpl->do_poll(sleepTime);
+    aMillisecondNumber -= sleepTime;
+
     ev_op= move_data();
-    if (unlikely(ev_op == 0))
-      ret= 0;
   }
   m_latest_poll_GCI= m_latestGCI;
 #ifdef VM_TRACE
@@ -1279,7 +1277,10 @@ NdbEventBuffer::pollEvents(int aMillisec
     */
     deleteUsedEventOperations(m_latest_poll_GCI);
   }
-  NdbMutex_Unlock(m_mutex); // we have moved the data
+  m_ndb->theImpl->complete_poll();
+
+  if (unlikely(ev_op == 0))
+    ret= 0;
 
   if (latestGCI)
     *latestGCI= m_latest_poll_GCI;
@@ -1287,6 +1288,19 @@ NdbEventBuffer::pollEvents(int aMillisec
   return ret;
 }
 
+void
+NdbEventBuffer::block_and_try_wakeup_event_poller()
+{
+  m_wakeup = 1;
+  m_ndb->theImpl->m_transporter_facade->requestWakeup();
+}
+
+void
+NdbEventBuffer::unblock_event_poller()
+{
+  m_wakeup = 0;
+}
+
 int
 NdbEventBuffer::flushIncompleteEvents(Uint64 gci)
 {
@@ -2021,8 +2035,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP
       }
 
       // signal that somethings happened
-
-      NdbCondition_Signal(p_cond);
+      m_ndb-> theImpl->wakeup();
     }
     else
     {
@@ -2315,7 +2328,7 @@ NdbEventBuffer::set_total_buckets(Uint32
   }
   if (found)
   {
-    NdbCondition_Signal(p_cond);
+    m_ndb->theImpl->wakeup();
   }
 }
 

=== modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2012-01-16 07:14:30 +0000
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2012-03-27 11:21:10 +0000
@@ -527,6 +527,16 @@ public:
   bool isConsistent(Uint64& gci);
   bool isConsistentGCI(Uint64 gci);
 
+  /**
+   * prevent pollEvents thread from sleeping and try to wake it up
+   */
+  void block_and_try_wakeup_event_poller();
+
+  /**
+   * allow pollEvents thread to sleep again
+   */
+  void unblock_event_poller();
+
   NdbEventOperationImpl* getGCIEventOperations(Uint32* iter,
                                                Uint32* event_types);
   void deleteUsedEventOperations(Uint64 last_consumed_gci);
@@ -578,8 +588,8 @@ public:
 
   bool m_startup_hack;
 
+  bool m_wakeup;
   NdbMutex *m_mutex;
-  struct NdbCondition *p_cond;
 
   // receive thread
   Gci_container m_complete_data;

=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2012-03-05 12:53:12 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2012-03-27 11:21:10 +0000
@@ -870,6 +870,12 @@ int Ndb_cluster_connection_impl::connect
       DBUG_RETURN(-1);
     }
 
+    /**
+     * Setup extra wakeup socket...always...
+     *   assume it doesn't cost anything
+     */
+    m_transporter_facade->setupWakeup();
+
     ndb_mgm_destroy_configuration(props);
     m_transporter_facade->connected();
     m_latest_error = 0;

=== modified file 'storage/ndb/src/ndbapi/ndb_internal.cpp'
--- a/storage/ndb/src/ndbapi/ndb_internal.cpp	2011-02-02 00:40:07 +0000
+++ b/storage/ndb/src/ndbapi/ndb_internal.cpp	2012-03-27 11:21:10 +0000
@@ -30,4 +30,16 @@ Ndb_internal::setForceShortRequests(Ndb*
 {
   ndb->theImpl->forceShortRequests = val;
 }
-                                    
+
+void
+Ndb_internal::block_and_try_wakeup_event_poller(Ndb* ndb)
+{
+  ndb->theEventBuffer->block_and_try_wakeup_event_poller();
+}
+
+void
+Ndb_internal::unblock_event_poller(Ndb* ndb)
+{
+  ndb->theEventBuffer->unblock_event_poller();
+}
+

=== modified file 'storage/ndb/src/ndbapi/ndb_internal.hpp'
--- a/storage/ndb/src/ndbapi/ndb_internal.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/ndb_internal.hpp	2012-03-27 11:21:10 +0000
@@ -29,6 +29,9 @@ public:
 
   static int send_event_report(bool has_lock, Ndb *ndb, Uint32*data,Uint32 len);
   static void setForceShortRequests(Ndb*, bool val);
+
+  static void block_and_try_wakeup_event_poller(Ndb*);
+  static void unblock_event_poller(Ndb*);
 };
 
 #endif

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4906 to 4907)Bug#12319939Jonas Oreland28 Mar