From: Mikael Ronstrom Date: July 4 2012 12:10pm Subject: bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3950 to 3952) List-Archive: http://lists.mysql.com/commits/144373 Message-Id: <201207041211.q64CBrln028031@dator6> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3952 Mikael Ronstrom 2012-07-04 Step 3 to make receive thread for NDB API: Make receive thread call do_poll and be constant poll owner modified: storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp 3951 Mikael Ronstrom 2012-07-04 Step 2 to make receiver threads in NDB API: Added ReceiveThreadClient class for receiver thread to become poll owner modified: storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp 3950 Mikael Ronstrom 2012-07-04 [merge] merge modified: storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp === modified file 'mysql-test/suite/ndb/t/ndb_reconnect.test' --- a/mysql-test/suite/ndb/t/ndb_reconnect.test revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd +++ b/mysql-test/suite/ndb/t/ndb_reconnect.test revid:mikael.ronstrom@stripped @@ -100,7 +100,7 @@ End // delimiter ;// # drop the ndb table inside ndb -system exec $NDB_DROP_TABLE --no-defaults -d test t1 >> $NDB_TOOLS_OUTPUT ; +--exec $NDB_DROP_TABLE --no-defaults -d test t1 >> $NDB_TOOLS_OUTPUT # Restart cluster nodes and clear all meta-data --exec $NDB_MGM --no-defaults --ndb-connectstring="$NDB_CONNECTSTRING" -e "all restart" >> $NDB_TOOLS_OUTPUT === modified file 'mysql-test/suite/ndb_memcache/r/ttls_flags.result' --- a/mysql-test/suite/ndb_memcache/r/ttls_flags.result revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd +++ b/mysql-test/suite/ndb_memcache/r/ttls_flags.result revid:mikael.ronstrom@stripped @@ -3,6 +3,6 @@ Flags Test OK USE ndbmemcache; SELECT * FROM demo_table_tabs order by mkey; mkey val1 val2 val3 flags expire_time -1 Groen NULL NULL NULL NULL +1 Groen NULL NULL 0 NULL 12 Con Brio NULL 100 NULL 13 Sul ponticello NULL NULL 200 NULL === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd +++ b/sql/ha_ndbcluster_binlog.cc revid:mikael.ronstrom@stripped @@ -1107,7 +1107,7 @@ static void clean_away_stray_files(THD * char path[FN_REFLEN + 1]; DBUG_ENTER("clean_away_stray_files"); - bzero((char*) &lookup_field_values, sizeof(LOOKUP_FIELD_VALUES)); + memset(&lookup_field_values, 0, sizeof(LOOKUP_FIELD_VALUES)); if (make_db_list(thd, &db_names, &lookup_field_values, &with_i_schema)) { thd->clear_error(); === modified file 'storage/ndb/memcache/src/ExternalValue.cc' --- a/storage/ndb/memcache/src/ExternalValue.cc revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd +++ b/storage/ndb/memcache/src/ExternalValue.cc revid:mikael.ronstrom@stripped @@ -581,8 +581,7 @@ void ExternalValue::setMiscColumns(Opera /* Set flags */ if(wqitem->prefix_info.has_flags_col) { uint32_t flags = hash_item_get_flags(wqitem->cache_item); - if(flags) - op.setColumnInt(COL_STORE_FLAGS, ntohl(flags)); + op.setColumnInt(COL_STORE_FLAGS, ntohl(flags)); } } === modified file 'storage/ndb/memcache/src/ndb_worker.cc' --- a/storage/ndb/memcache/src/ndb_worker.cc revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd +++ b/storage/ndb/memcache/src/ndb_worker.cc revid:mikael.ronstrom@stripped @@ -397,8 +397,7 @@ op_status_t WorkerStep1::do_write() { /* Set flags */ if(wqitem->prefix_info.has_flags_col) { uint32_t flags = hash_item_get_flags(wqitem->cache_item); - if(flags) - op.setColumnInt(COL_STORE_FLAGS, ntohl(flags)); + op.setColumnInt(COL_STORE_FLAGS, ntohl(flags)); } /* Start the transaction */ === modified file 'storage/ndb/memcache/unit/extvals.pl' --- a/storage/ndb/memcache/unit/extvals.pl revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd +++ b/storage/ndb/memcache/unit/extvals.pl revid:mikael.ronstrom@stripped @@ -193,3 +193,10 @@ if($do_test=='11' || $do_all) { $mc->set("b:testtoobig", $val_too_big); $mc->{error} =~ "VALUE_TOO_LARGE" || Carp::confess "Expected TOO_LARGE"; } + +if($do_test=='12' || $do_all) { + # Test SET with flags + $mc->set_flags(0); + $mc->set("b:test12", "Mikrokosmos"); +} + === modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd +++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped @@ -577,6 +577,68 @@ runReceiveResponse_C(void * me) return 0; } +ReceiveThreadClient::ReceiveThreadClient(TransporterFacade * facade) +{ + DBUG_ENTER("ReceiveThreadClient::ReceiveThreadClient"); + Uint32 ret = this->open(facade); + if (unlikely(ret == 0)) + { + ndbout_c("Failed to register receive thread, ret = %d", ret); + abort(); + } + DBUG_VOID_RETURN; +} + +ReceiveThreadClient::~ReceiveThreadClient() +{ + DBUG_ENTER("ReceiveThreadClient::~ReceiveThreadClient"); + this->close(); + DBUG_VOID_RETURN; +} + +void +ReceiveThreadClient::trp_deliver_signal(const NdbApiSignal *signal, + const LinearSectionPtr ptr[3]) +{ + DBUG_ENTER("ReceiveThreadClient::trp_deliver_signal"); + ndbout_c("Receive thread block should not receive signals, gsn: %d", + signal->theVerId_signalNumber); + abort(); + DBUG_VOID_RETURN; +} + +void +TransporterFacade::checkClusterMgr(NDB_TICKS currTime, + NDB_TICKS & lastTime) +{ + if (currTime <= (lastTime + ((NDB_TICKS)100))) + return; /* 100 milliseconds haven't passed yet */ + lastTime = NdbTick_CurrentMillisecond(); + theClusterMgr->lock(); + theTransporterRegistry->update_connections(); + theClusterMgr->flush_send_buffers(); + theClusterMgr->unlock(); +} + +bool +TransporterFacade::become_poll_owner(trp_client* clnt) +{ + bool poll_owner = false; + lock_poll_mutex(); + if (m_poll_owner == NULL) + { + poll_owner = true; + m_poll_owner = clnt; + } + unlock_poll_mutex(); + if (poll_owner) + { + clnt->m_poll.m_poll_owner = true; + return true; + } + return false; +} + /* The receiver thread is changed to only wake up once every 10 milliseconds to poll. It will first check that nobody owns the poll "right" before @@ -587,17 +649,22 @@ runReceiveResponse_C(void * me) */ void TransporterFacade::threadMainReceive(void) { + bool poll_owner = false; + NDB_TICKS currTime = NdbTick_CurrentMillisecond(); + NDB_TICKS lastTime = currTime; theTransporterRegistry->startReceiving(); #ifdef NDB_SHM_TRANSPORTER NdbThread_set_shm_sigmask(TRUE); #endif + ReceiveThreadClient *recv_client = new ReceiveThreadClient(this); while(!theStopReceive) { - theClusterMgr->lock(); - theTransporterRegistry->update_connections(); - theClusterMgr->flush_send_buffers(); - theClusterMgr->unlock(); - NdbSleep_MilliSleep(100); + currTime = NdbTick_CurrentMillisecond(); + checkClusterMgr(currTime, lastTime); + if (!poll_owner) + poll_owner = become_poll_owner(recv_client); + if (poll_owner) + do_poll(recv_client, 100, true); }//while theTransporterRegistry->stopReceiving(); } @@ -1855,29 +1922,30 @@ TransporterFacade::get_poll_owner(trp_cl void TransporterFacade::finish_poll(trp_client* clnt, + Uint32 cnt, Uint32& cnt_woken, trp_client** arr) { #ifndef NDEBUG { - Uint32 cnt = clnt->m_poll.m_locked_cnt; - assert(cnt >= 1); - assert(cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients)); + Uint32 lock_cnt = clnt->m_poll.m_locked_cnt; + assert(lock_cnt >= 1); + assert(lock_cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients)); assert(clnt->m_poll.m_locked_clients[0] == clnt); // no duplicates - if (DBG_POLL) printf("after external_poll: cnt: %u ", cnt); - for (Uint32 i = 0; i < cnt; i++) + if (DBG_POLL) printf("after external_poll: cnt: %u ", lock_cnt); + for (Uint32 i = 0; i < lock_cnt; i++) { trp_client * tmp = clnt->m_poll.m_locked_clients[i]; if (DBG_POLL) printf("%p(%u) ", tmp, tmp->m_poll.m_waiting); - for (Uint32 j = i + 1; j < cnt; j++) + for (Uint32 j = i + 1; j < lock_cnt; j++) { assert(tmp != clnt->m_poll.m_locked_clients[j]); } } if (DBG_POLL) printf("\n"); - for (Uint32 i = 1; i < cnt; i++) + for (Uint32 i = 1; i < lock_cnt; i++) { trp_client * tmp = clnt->m_poll.m_locked_clients[i]; if (tmp->m_poll.m_locked == true) @@ -1903,7 +1971,6 @@ TransporterFacade::finish_poll(trp_clien * count woken clients * and put them to the left in array */ - Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self for (Uint32 i = 0; i < cnt; i++) { trp_client * tmp = arr[i]; @@ -1917,9 +1984,9 @@ TransporterFacade::finish_poll(trp_clien if (DBG_POLL) { - Uint32 cnt = clnt->m_poll.m_locked_cnt; - printf("after sort: cnt: %u ", cnt); - for (Uint32 i = 0; i < cnt; i++) + Uint32 lock_cnt = clnt->m_poll.m_locked_cnt; + printf("after sort: cnt: %u ", lock_cnt); + for (Uint32 i = 0; i < lock_cnt; i++) { trp_client * tmp = clnt->m_poll.m_locked_clients[i]; printf("%p(%u) ", tmp, tmp->m_poll.m_waiting); @@ -1973,6 +2040,7 @@ TransporterFacade::try_lock_last_client( */ dbg("wake new_owner(%p)", new_owner); #ifndef NDEBUG + Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self for (Uint32 i = 0; i <= cnt; i++) { assert(clnt->m_poll.m_locked_clients[i] != new_owner); @@ -2012,10 +2080,11 @@ TransporterFacade::do_poll(trp_client* c external_poll(clnt, wait_time); Uint32 cnt_woken = 0; + Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self trp_client ** arr = clnt->m_poll.m_locked_clients + 1; // skip self if (!poll_owner) clnt->m_poll.m_poll_owner = false; - finish_poll(clnt, &cnt_woken, arr); + finish_poll(clnt, cnt, cnt_woken, arr); lock_poll_mutex(); @@ -2061,7 +2130,7 @@ TransporterFacade::do_poll(trp_client* c */ if (new_owner_locked == false) { - dbg("new_owner_locked == false", 0); + dbg("new_owner_locked == %s", "false"); trp_client * new_owner; while (true) { === modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd +++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped @@ -220,8 +220,11 @@ private: friend class Ndb_cluster_connection; friend class Ndb_cluster_connection_impl; + void checkClusterMgr(NDB_TICKS currTime, NDB_TICKS & lastTime); bool get_poll_owner(trp_client* clnt, Uint32 wait_time); + bool become_poll_owner(trp_client* clnt); void finish_poll(trp_client* clnt, + Uint32 cnt, Uint32& cnt_woken, trp_client** arr); void try_lock_last_client(trp_client* clnt, @@ -541,7 +544,12 @@ public : } }; - - - +class ReceiveThreadClient : public trp_client +{ + public : + ReceiveThreadClient(TransporterFacade *facade); + ~ReceiveThreadClient(); + void trp_deliver_signal(const NdbApiSignal *, + const LinearSectionPtr ptr[3]); +}; #endif // TransporterFacade_H No bundle (reason: useless for push emails).