3952 Mikael Ronstrom 2012-07-04
Step 3 to make receive thread for NDB API: Make receive thread call do_poll and be constant poll owner
modified:
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
3951 Mikael Ronstrom 2012-07-04
Step 2 to make receiver threads in NDB API: Added ReceiveThreadClient class for receiver thread to become poll owner
modified:
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
3950 Mikael Ronstrom 2012-07-04 [merge]
merge
modified:
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
=== modified file 'mysql-test/suite/ndb/t/ndb_reconnect.test'
--- a/mysql-test/suite/ndb/t/ndb_reconnect.test revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/mysql-test/suite/ndb/t/ndb_reconnect.test revid:mikael.ronstrom@stripped
@@ -100,7 +100,7 @@ End //
delimiter ;//
# drop the ndb table inside ndb
-system exec $NDB_DROP_TABLE --no-defaults -d test t1 >> $NDB_TOOLS_OUTPUT ;
+--exec $NDB_DROP_TABLE --no-defaults -d test t1 >> $NDB_TOOLS_OUTPUT
# Restart cluster nodes and clear all meta-data
--exec $NDB_MGM --no-defaults --ndb-connectstring="$NDB_CONNECTSTRING" -e "all restart" >> $NDB_TOOLS_OUTPUT
=== modified file 'mysql-test/suite/ndb_memcache/r/ttls_flags.result'
--- a/mysql-test/suite/ndb_memcache/r/ttls_flags.result revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/mysql-test/suite/ndb_memcache/r/ttls_flags.result revid:mikael.ronstrom@stripped
@@ -3,6 +3,6 @@ Flags Test OK
USE ndbmemcache;
SELECT * FROM demo_table_tabs order by mkey;
mkey val1 val2 val3 flags expire_time
-1 Groen NULL NULL NULL NULL
+1 Groen NULL NULL 0 NULL
12 Con Brio NULL 100 NULL
13 Sul ponticello NULL NULL 200 NULL
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/sql/ha_ndbcluster_binlog.cc revid:mikael.ronstrom@stripped
@@ -1107,7 +1107,7 @@ static void clean_away_stray_files(THD *
char path[FN_REFLEN + 1];
DBUG_ENTER("clean_away_stray_files");
- bzero((char*) &lookup_field_values, sizeof(LOOKUP_FIELD_VALUES));
+ memset(&lookup_field_values, 0, sizeof(LOOKUP_FIELD_VALUES));
if (make_db_list(thd, &db_names, &lookup_field_values, &with_i_schema))
{
thd->clear_error();
=== modified file 'storage/ndb/memcache/src/ExternalValue.cc'
--- a/storage/ndb/memcache/src/ExternalValue.cc revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/memcache/src/ExternalValue.cc revid:mikael.ronstrom@stripped
@@ -581,8 +581,7 @@ void ExternalValue::setMiscColumns(Opera
/* Set flags */
if(wqitem->prefix_info.has_flags_col) {
uint32_t flags = hash_item_get_flags(wqitem->cache_item);
- if(flags)
- op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
+ op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
}
}
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/memcache/src/ndb_worker.cc revid:mikael.ronstrom@stripped
@@ -397,8 +397,7 @@ op_status_t WorkerStep1::do_write() {
/* Set flags */
if(wqitem->prefix_info.has_flags_col) {
uint32_t flags = hash_item_get_flags(wqitem->cache_item);
- if(flags)
- op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
+ op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
}
/* Start the transaction */
=== modified file 'storage/ndb/memcache/unit/extvals.pl'
--- a/storage/ndb/memcache/unit/extvals.pl revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/memcache/unit/extvals.pl revid:mikael.ronstrom@stripped
@@ -193,3 +193,10 @@ if($do_test=='11' || $do_all) {
$mc->set("b:testtoobig", $val_too_big);
$mc->{error} =~ "VALUE_TOO_LARGE" || Carp::confess "Expected TOO_LARGE";
}
+
+if($do_test=='12' || $do_all) {
+ # Test SET with flags
+ $mc->set_flags(0);
+ $mc->set("b:test12", "Mikrokosmos");
+}
+
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp revid:mikael.ronstrom@stripped
@@ -577,6 +577,68 @@ runReceiveResponse_C(void * me)
return 0;
}
+ReceiveThreadClient::ReceiveThreadClient(TransporterFacade * facade)
+{
+ DBUG_ENTER("ReceiveThreadClient::ReceiveThreadClient");
+ Uint32 ret = this->open(facade);
+ if (unlikely(ret == 0))
+ {
+ ndbout_c("Failed to register receive thread, ret = %d", ret);
+ abort();
+ }
+ DBUG_VOID_RETURN;
+}
+
+ReceiveThreadClient::~ReceiveThreadClient()
+{
+ DBUG_ENTER("ReceiveThreadClient::~ReceiveThreadClient");
+ this->close();
+ DBUG_VOID_RETURN;
+}
+
+void
+ReceiveThreadClient::trp_deliver_signal(const NdbApiSignal *signal,
+ const LinearSectionPtr ptr[3])
+{
+ DBUG_ENTER("ReceiveThreadClient::trp_deliver_signal");
+ ndbout_c("Receive thread block should not receive signals, gsn: %d",
+ signal->theVerId_signalNumber);
+ abort();
+ DBUG_VOID_RETURN;
+}
+
+void
+TransporterFacade::checkClusterMgr(NDB_TICKS currTime,
+ NDB_TICKS & lastTime)
+{
+ if (currTime <= (lastTime + ((NDB_TICKS)100)))
+ return; /* 100 milliseconds haven't passed yet */
+ lastTime = NdbTick_CurrentMillisecond();
+ theClusterMgr->lock();
+ theTransporterRegistry->update_connections();
+ theClusterMgr->flush_send_buffers();
+ theClusterMgr->unlock();
+}
+
+bool
+TransporterFacade::become_poll_owner(trp_client* clnt)
+{
+ bool poll_owner = false;
+ lock_poll_mutex();
+ if (m_poll_owner == NULL)
+ {
+ poll_owner = true;
+ m_poll_owner = clnt;
+ }
+ unlock_poll_mutex();
+ if (poll_owner)
+ {
+ clnt->m_poll.m_poll_owner = true;
+ return true;
+ }
+ return false;
+}
+
/*
The receiver thread is changed to only wake up once every 10 milliseconds
to poll. It will first check that nobody owns the poll "right" before
@@ -587,17 +649,22 @@ runReceiveResponse_C(void * me)
*/
void TransporterFacade::threadMainReceive(void)
{
+ bool poll_owner = false;
+ NDB_TICKS currTime = NdbTick_CurrentMillisecond();
+ NDB_TICKS lastTime = currTime;
theTransporterRegistry->startReceiving();
#ifdef NDB_SHM_TRANSPORTER
NdbThread_set_shm_sigmask(TRUE);
#endif
+ ReceiveThreadClient *recv_client = new ReceiveThreadClient(this);
while(!theStopReceive)
{
- theClusterMgr->lock();
- theTransporterRegistry->update_connections();
- theClusterMgr->flush_send_buffers();
- theClusterMgr->unlock();
- NdbSleep_MilliSleep(100);
+ currTime = NdbTick_CurrentMillisecond();
+ checkClusterMgr(currTime, lastTime);
+ if (!poll_owner)
+ poll_owner = become_poll_owner(recv_client);
+ if (poll_owner)
+ do_poll(recv_client, 100, true);
}//while
theTransporterRegistry->stopReceiving();
}
@@ -1855,29 +1922,30 @@ TransporterFacade::get_poll_owner(trp_cl
void
TransporterFacade::finish_poll(trp_client* clnt,
+ Uint32 cnt,
Uint32& cnt_woken,
trp_client** arr)
{
#ifndef NDEBUG
{
- Uint32 cnt = clnt->m_poll.m_locked_cnt;
- assert(cnt >= 1);
- assert(cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients));
+ Uint32 lock_cnt = clnt->m_poll.m_locked_cnt;
+ assert(lock_cnt >= 1);
+ assert(lock_cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients));
assert(clnt->m_poll.m_locked_clients[0] == clnt);
// no duplicates
- if (DBG_POLL) printf("after external_poll: cnt: %u ", cnt);
- for (Uint32 i = 0; i < cnt; i++)
+ if (DBG_POLL) printf("after external_poll: cnt: %u ", lock_cnt);
+ for (Uint32 i = 0; i < lock_cnt; i++)
{
trp_client * tmp = clnt->m_poll.m_locked_clients[i];
if (DBG_POLL) printf("%p(%u) ", tmp, tmp->m_poll.m_waiting);
- for (Uint32 j = i + 1; j < cnt; j++)
+ for (Uint32 j = i + 1; j < lock_cnt; j++)
{
assert(tmp != clnt->m_poll.m_locked_clients[j]);
}
}
if (DBG_POLL) printf("\n");
- for (Uint32 i = 1; i < cnt; i++)
+ for (Uint32 i = 1; i < lock_cnt; i++)
{
trp_client * tmp = clnt->m_poll.m_locked_clients[i];
if (tmp->m_poll.m_locked == true)
@@ -1903,7 +1971,6 @@ TransporterFacade::finish_poll(trp_clien
* count woken clients
* and put them to the left in array
*/
- Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
for (Uint32 i = 0; i < cnt; i++)
{
trp_client * tmp = arr[i];
@@ -1917,9 +1984,9 @@ TransporterFacade::finish_poll(trp_clien
if (DBG_POLL)
{
- Uint32 cnt = clnt->m_poll.m_locked_cnt;
- printf("after sort: cnt: %u ", cnt);
- for (Uint32 i = 0; i < cnt; i++)
+ Uint32 lock_cnt = clnt->m_poll.m_locked_cnt;
+ printf("after sort: cnt: %u ", lock_cnt);
+ for (Uint32 i = 0; i < lock_cnt; i++)
{
trp_client * tmp = clnt->m_poll.m_locked_clients[i];
printf("%p(%u) ", tmp, tmp->m_poll.m_waiting);
@@ -1973,6 +2040,7 @@ TransporterFacade::try_lock_last_client(
*/
dbg("wake new_owner(%p)", new_owner);
#ifndef NDEBUG
+ Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
for (Uint32 i = 0; i <= cnt; i++)
{
assert(clnt->m_poll.m_locked_clients[i] != new_owner);
@@ -2012,10 +2080,11 @@ TransporterFacade::do_poll(trp_client* c
external_poll(clnt, wait_time);
Uint32 cnt_woken = 0;
+ Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
trp_client ** arr = clnt->m_poll.m_locked_clients + 1; // skip self
if (!poll_owner)
clnt->m_poll.m_poll_owner = false;
- finish_poll(clnt, &cnt_woken, arr);
+ finish_poll(clnt, cnt, cnt_woken, arr);
lock_poll_mutex();
@@ -2061,7 +2130,7 @@ TransporterFacade::do_poll(trp_client* c
*/
if (new_owner_locked == false)
{
- dbg("new_owner_locked == false", 0);
+ dbg("new_owner_locked == %s", "false");
trp_client * new_owner;
while (true)
{
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp revid:mikael.ronstrom@stripped
@@ -220,8 +220,11 @@ private:
friend class Ndb_cluster_connection;
friend class Ndb_cluster_connection_impl;
+ void checkClusterMgr(NDB_TICKS currTime, NDB_TICKS & lastTime);
bool get_poll_owner(trp_client* clnt, Uint32 wait_time);
+ bool become_poll_owner(trp_client* clnt);
void finish_poll(trp_client* clnt,
+ Uint32 cnt,
Uint32& cnt_woken,
trp_client** arr);
void try_lock_last_client(trp_client* clnt,
@@ -541,7 +544,12 @@ public :
}
};
-
-
-
+class ReceiveThreadClient : public trp_client
+{
+ public :
+ ReceiveThreadClient(TransporterFacade *facade);
+ ~ReceiveThreadClient();
+ void trp_deliver_signal(const NdbApiSignal *,
+ const LinearSectionPtr ptr[3]);
+};
#endif // TransporterFacade_H
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3950 to 3952) | Mikael Ronstrom | 4 Jul |