From: Jonas Oreland Date: March 27 2012 11:21am Subject: bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4906 to 4907) Bug#12319939 List-Archive: http://lists.mysql.com/commits/143338 X-Bug: 12319939 Message-Id: <20120327112135.053D855C8EA@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4907 Jonas Oreland 2012-03-27 ndb - bug#12319939 - commit for CluB modified: sql/ha_ndbcluster_binlog.cc sql/ha_ndbcluster_binlog.h storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp storage/ndb/src/ndbapi/ndb_cluster_connection.cpp storage/ndb/src/ndbapi/ndb_internal.cpp storage/ndb/src/ndbapi/ndb_internal.hpp 4906 magnus.blaudd@stripped 2012-03-27 [merge] Merge modified: mysql-test/suite/ndb/include/ndb_share_check_shares.inc mysql-test/suite/ndb/r/ndb_basic.result mysql-test/suite/ndb/r/ndb_ddl_open_trans.result mysql-test/suite/ndb/r/ndb_index_stat_restart.result mysql-test/suite/ndb/r/ndb_share.result mysql-test/suite/ndb/t/ndb_basic.test mysql-test/suite/ndb/t/ndb_ddl_open_trans.test mysql-test/suite/ndb/t/ndb_index_stat_restart.test mysql-test/suite/ndb_rpl/r/ndb_rpl_ddl_open_trans.result sql/ha_ndbcluster.cc storage/ndb/src/mgmsrv/ConfigManager.cpp storage/ndb/test/ndbapi/testMgmd.cpp === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc 2012-02-17 08:03:56 +0000 +++ b/sql/ha_ndbcluster_binlog.cc 2012-03-27 11:21:10 +0000 @@ -33,6 +33,11 @@ #include #include "ndb_table_guard.h" +/** + * try_wakeup_event_poller + */ +#include "../storage/ndb/src/ndbapi/ndb_internal.hpp" + /* defines for cluster replication table names */ @@ -125,6 +130,13 @@ extern THD * injector_thd; // Declared i static Ndb *injector_ndb= 0; static Ndb *schema_ndb= 0; +/** + * This function is used by client thread to lock the injector mutex + * for create/dropEventOperation...but it also wakeups + * the injector thread if that has the mutex, and is waiting in pollEvents + */ +static pthread_mutex_t & wakeup_and_lock(pthread_mutex_t&, Ndb*); + static int ndbcluster_binlog_inited= 0; /* Flag "ndbcluster_binlog_terminating" set when shutting down mysqld. @@ -155,6 +167,12 @@ static ulonglong ndb_latest_applied_binl static ulonglong ndb_latest_handled_binlog_epoch= 0; static ulonglong ndb_latest_received_binlog_epoch= 0; +/* Condition variable to wait for ndb_latest_handled_binlog_epoch*/ +static pthread_mutex_t wait_gci_mutex; +static pthread_cond_t wait_gci_cond; +static bool ndb_latest_handled_binlog_epoch_waiter= FALSE; + + NDB_SHARE *ndb_apply_status_share= 0; NDB_SHARE *ndb_schema_share= 0; pthread_mutex_t ndb_schema_share_mutex; @@ -567,6 +585,18 @@ int ndbcluster_binlog_init_share(THD *th functions called from master sql client threads ****************************************************************/ +static void set_ndb_latest_handled_binlog_epoch(ulonglong val) +{ + pthread_mutex_lock(&wait_gci_mutex); + ndb_latest_handled_binlog_epoch= val; + if (ndb_latest_handled_binlog_epoch_waiter) + { + ndb_latest_handled_binlog_epoch_waiter= FALSE; + pthread_cond_signal(&wait_gci_cond); + } + pthread_mutex_unlock(&wait_gci_mutex); +} + /* called in mysql_show_binlog_events and reset_logs to make sure we wait for all events originating from this mysql server to arrive in the binlog @@ -587,22 +617,29 @@ static void ndbcluster_binlog_wait(THD * */ if (!wait_epoch) DBUG_VOID_RETURN; + + Ndb *ndb= check_ndb_in_thd(thd); + NDBDICT *dict= ndb->getDictionary(); + const char *save_info= thd ? thd->proc_info : 0; int count= 30; if (thd) thd->proc_info= "Waiting for ndbcluster binlog update to " "reach current position"; - pthread_mutex_lock(&injector_mutex); + pthread_mutex_lock(&wait_gci_mutex); while (!(thd && thd->killed) && count && ndb_binlog_running && (ndb_latest_handled_binlog_epoch == 0 || ndb_latest_handled_binlog_epoch < wait_epoch)) { + ndb_latest_handled_binlog_epoch_waiter= TRUE; count--; struct timespec abstime; set_timespec(abstime, 1); - pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime); + dict->forceGCPWait(1); + pthread_cond_timedwait(&wait_gci_cond, &wait_gci_mutex, &abstime); } - pthread_mutex_unlock(&injector_mutex); + pthread_mutex_unlock(&wait_gci_mutex); + if (thd) thd->proc_info= save_info; DBUG_VOID_RETURN; @@ -894,6 +931,8 @@ int ndbcluster_binlog_end(THD *thd) pthread_mutex_destroy(&injector_mutex); pthread_cond_destroy(&injector_cond); pthread_mutex_destroy(&ndb_schema_share_mutex); + pthread_mutex_destroy(&wait_gci_mutex); + pthread_cond_destroy(&wait_gci_cond); } DBUG_RETURN(0); @@ -2563,6 +2602,16 @@ end: DBUG_RETURN(0); } +static +pthread_mutex_t & +wakeup_and_lock(pthread_mutex_t& injector_mutex, Ndb * injector_ndb) +{ + Ndb_internal::block_and_try_wakeup_event_poller(injector_ndb); + pthread_mutex_lock(&injector_mutex); + Ndb_internal::unblock_event_poller(injector_ndb); + return injector_mutex; +} + /* Handle _non_ data events from the storage nodes */ @@ -2650,10 +2699,12 @@ ndb_handle_schema_change(THD *thd, Ndb * pOp->setCustomData(NULL); } - pthread_mutex_lock(&injector_mutex); - is_ndb->dropEventOperation(pOp); - pOp= 0; - pthread_mutex_unlock(&injector_mutex); + + { + Mutex_guard g(wakeup_and_lock(injector_mutex, injector_ndb), TRUE); + is_ndb->dropEventOperation(pOp); + pOp= 0; + } if (do_close_cached_tables) { @@ -3255,7 +3306,7 @@ ndb_binlog_thread_handle_schema_event_po delete event_data; share->op->setCustomData(NULL); { - Mutex_guard injector_mutex_g(injector_mutex); + Mutex_guard g(wakeup_and_lock(injector_mutex, injector_ndb), TRUE); injector_ndb->dropEventOperation(share->op); } share->op= 0; @@ -3421,7 +3472,7 @@ ndb_binlog_thread_handle_schema_event_po delete event_data; share->op->setCustomData(NULL); { - Mutex_guard injector_mutex_g(injector_mutex); + Mutex_guard injector_mutex_g(wakeup_and_lock(injector_mutex, injector_ndb), TRUE); injector_ndb->dropEventOperation(share->op); } share->op= share->new_op; @@ -3692,6 +3743,10 @@ int ndbcluster_binlog_start() pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST); pthread_cond_init(&injector_cond, NULL); + + pthread_mutex_init(&wait_gci_mutex, MY_MUTEX_INIT_FAST); + pthread_cond_init(&wait_gci_cond, NULL); + pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST); /* Create injector thread */ @@ -5303,7 +5358,7 @@ ndbcluster_create_event_ops(THD *thd, ND int retry_sleep= 100; while (1) { - Mutex_guard injector_mutex_g(injector_mutex); + Mutex_guard injector_mutex_g(wakeup_and_lock(injector_mutex, injector_ndb), TRUE); Ndb *ndb= injector_ndb; if (do_ndb_schema_share) ndb= schema_ndb; @@ -6778,6 +6833,7 @@ restart_cluster_failure: { if (ndbcluster_binlog_terminating) goto err; + Mutex_guard injector_mutex_g(injector_mutex); res= i_ndb->pollEvents(10, &gci); } if (gci > schema_gci) @@ -6802,7 +6858,7 @@ restart_cluster_failure: (uint)(schema_gci >> 32), (uint)(schema_gci)); ndb_set_latest_trans_gci(0); - ndb_latest_handled_binlog_epoch= 0; + set_ndb_latest_handled_binlog_epoch(0); ndb_latest_applied_binlog_epoch= 0; ndb_latest_received_binlog_epoch= 0; ndb_index_stat_restart(); @@ -6879,10 +6935,15 @@ restart_cluster_failure: int res= 0, tot_poll_wait= 1000; if (ndb_binlog_running) { + Mutex_guard injector_mutex_g(injector_mutex); res= i_ndb->pollEvents(tot_poll_wait, &gci); tot_poll_wait= 0; } - int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci); + int schema_res; + { + Mutex_guard injector_mutex_g(injector_mutex); + schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci); + } ndb_latest_received_binlog_epoch= gci; while (gci > schema_gci && schema_res >= 0) @@ -6970,7 +7031,12 @@ restart_cluster_failure: e.g. node failure events */ Uint64 tmp_gci; - if (i_ndb->pollEvents(0, &tmp_gci)) + int res = 0; + { + Mutex_guard injector_mutex_g(injector_mutex); + res = i_ndb->pollEvents(0, &tmp_gci); + } + if (res) { NdbEventOperation *pOp; while ((pOp= i_ndb->nextEvent())) @@ -7293,7 +7359,7 @@ restart_cluster_failure: ndb_latest_applied_binlog_epoch= gci; break; } - ndb_latest_handled_binlog_epoch= gci; + set_ndb_latest_handled_binlog_epoch(gci); #ifdef RUN_NDB_BINLOG_TIMER gci_timer.stop(); @@ -7326,7 +7392,7 @@ restart_cluster_failure: &post_epoch_unlock_list); free_root(&mem_root, MYF(0)); *root_ptr= old_root; - ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch; + set_ndb_latest_handled_binlog_epoch(ndb_latest_received_binlog_epoch); } err: if (do_ndbcluster_binlog_close_connection != BCCC_restart) === modified file 'sql/ha_ndbcluster_binlog.h' --- a/sql/ha_ndbcluster_binlog.h 2012-01-25 17:13:13 +0000 +++ b/sql/ha_ndbcluster_binlog.h 2012-03-27 11:21:10 +0000 @@ -104,7 +104,16 @@ public: Mutex_guard(pthread_mutex_t &mutex) : m_mutex(mutex) { pthread_mutex_lock(&m_mutex); - }; + } + + Mutex_guard(pthread_mutex_t &mutex, bool already_locked) : m_mutex(mutex) + { + if (!already_locked) + { + pthread_mutex_lock(&m_mutex); + } + } + ~Mutex_guard() { pthread_mutex_unlock(&m_mutex); === modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp' --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp 2012-03-27 11:21:10 +0000 @@ -1123,11 +1123,8 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) m_flush_gci = 0; #endif - if ((p_cond = NdbCondition_Create()) == NULL) { - ndbout_c("NdbEventHandle: NdbCondition_Create() failed"); - exit(-1); - } m_mutex = 0; // Set in Ndb::init() + m_wakeup = 0; // ToDo set event buffer size // pre allocate event data array @@ -1179,8 +1176,6 @@ NdbEventBuffer::~NdbEventBuffer() } NdbMem_Free((char*)m_allocated_data[j]); } - - NdbCondition_Destroy(p_cond); } void @@ -1251,14 +1246,17 @@ NdbEventBuffer::pollEvents(int aMillisec m_latest_command= "NdbEventBuffer::pollEvents"; #endif - NdbMutex_Lock(m_mutex); + m_ndb->theImpl->start_poll(); + Uint64 tmpGCI = m_latestGCI; NdbEventOperationImpl *ev_op= move_data(); - if (unlikely(ev_op == 0 && aMillisecondNumber)) + while (ev_op == 0 && aMillisecondNumber && m_wakeup == 0 && + tmpGCI == m_latestGCI) { - NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber); + int sleepTime = aMillisecondNumber > 10 ? 10 : aMillisecondNumber; + m_ndb->theImpl->do_poll(sleepTime); + aMillisecondNumber -= sleepTime; + ev_op= move_data(); - if (unlikely(ev_op == 0)) - ret= 0; } m_latest_poll_GCI= m_latestGCI; #ifdef VM_TRACE @@ -1279,7 +1277,10 @@ NdbEventBuffer::pollEvents(int aMillisec */ deleteUsedEventOperations(m_latest_poll_GCI); } - NdbMutex_Unlock(m_mutex); // we have moved the data + m_ndb->theImpl->complete_poll(); + + if (unlikely(ev_op == 0)) + ret= 0; if (latestGCI) *latestGCI= m_latest_poll_GCI; @@ -1287,6 +1288,19 @@ NdbEventBuffer::pollEvents(int aMillisec return ret; } +void +NdbEventBuffer::block_and_try_wakeup_event_poller() +{ + m_wakeup = 1; + m_ndb->theImpl->m_transporter_facade->requestWakeup(); +} + +void +NdbEventBuffer::unblock_event_poller() +{ + m_wakeup = 0; +} + int NdbEventBuffer::flushIncompleteEvents(Uint64 gci) { @@ -2021,8 +2035,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP } // signal that somethings happened - - NdbCondition_Signal(p_cond); + m_ndb-> theImpl->wakeup(); } else { @@ -2315,7 +2328,7 @@ NdbEventBuffer::set_total_buckets(Uint32 } if (found) { - NdbCondition_Signal(p_cond); + m_ndb->theImpl->wakeup(); } } === modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp' --- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2012-01-16 07:14:30 +0000 +++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp 2012-03-27 11:21:10 +0000 @@ -527,6 +527,16 @@ public: bool isConsistent(Uint64& gci); bool isConsistentGCI(Uint64 gci); + /** + * prevent pollEvents thread from sleeping and try to wake it up + */ + void block_and_try_wakeup_event_poller(); + + /** + * allow pollEvents thread to sleep again + */ + void unblock_event_poller(); + NdbEventOperationImpl* getGCIEventOperations(Uint32* iter, Uint32* event_types); void deleteUsedEventOperations(Uint64 last_consumed_gci); @@ -578,8 +588,8 @@ public: bool m_startup_hack; + bool m_wakeup; NdbMutex *m_mutex; - struct NdbCondition *p_cond; // receive thread Gci_container m_complete_data; === modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp' --- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2012-03-05 12:53:12 +0000 +++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2012-03-27 11:21:10 +0000 @@ -870,6 +870,12 @@ int Ndb_cluster_connection_impl::connect DBUG_RETURN(-1); } + /** + * Setup extra wakeup socket...always... + * assume it doesn't cost anything + */ + m_transporter_facade->setupWakeup(); + ndb_mgm_destroy_configuration(props); m_transporter_facade->connected(); m_latest_error = 0; === modified file 'storage/ndb/src/ndbapi/ndb_internal.cpp' --- a/storage/ndb/src/ndbapi/ndb_internal.cpp 2011-02-02 00:40:07 +0000 +++ b/storage/ndb/src/ndbapi/ndb_internal.cpp 2012-03-27 11:21:10 +0000 @@ -30,4 +30,16 @@ Ndb_internal::setForceShortRequests(Ndb* { ndb->theImpl->forceShortRequests = val; } - + +void +Ndb_internal::block_and_try_wakeup_event_poller(Ndb* ndb) +{ + ndb->theEventBuffer->block_and_try_wakeup_event_poller(); +} + +void +Ndb_internal::unblock_event_poller(Ndb* ndb) +{ + ndb->theEventBuffer->unblock_event_poller(); +} + === modified file 'storage/ndb/src/ndbapi/ndb_internal.hpp' --- a/storage/ndb/src/ndbapi/ndb_internal.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/ndbapi/ndb_internal.hpp 2012-03-27 11:21:10 +0000 @@ -29,6 +29,9 @@ public: static int send_event_report(bool has_lock, Ndb *ndb, Uint32*data,Uint32 len); static void setForceShortRequests(Ndb*, bool val); + + static void block_and_try_wakeup_event_poller(Ndb*); + static void unblock_event_poller(Ndb*); }; #endif No bundle (reason: useless for push emails).