List:Commits« Previous MessageNext Message »
From:Mikael Ronstrom Date:July 4 2012 12:10pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3950 to 3952)
View as plain text  
 3952 Mikael Ronstrom	2012-07-04
      Step 3 to make receive thread for NDB API: Make receive thread call do_poll and be constant poll owner

    modified:
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
 3951 Mikael Ronstrom	2012-07-04
      Step 2 to make receiver threads in NDB API: Added ReceiveThreadClient class for receiver thread to become poll owner

    modified:
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
 3950 Mikael Ronstrom	2012-07-04 [merge]
      merge

    modified:
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
=== modified file 'mysql-test/suite/ndb/t/ndb_reconnect.test'
--- a/mysql-test/suite/ndb/t/ndb_reconnect.test	revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/mysql-test/suite/ndb/t/ndb_reconnect.test	revid:mikael.ronstrom@stripped
@@ -100,7 +100,7 @@ End //
 delimiter ;//
 
 # drop the ndb table inside ndb
-system exec $NDB_DROP_TABLE --no-defaults -d test t1 >> $NDB_TOOLS_OUTPUT ; 
+--exec $NDB_DROP_TABLE --no-defaults -d test t1 >> $NDB_TOOLS_OUTPUT 
 
 # Restart cluster nodes and clear all meta-data
 --exec $NDB_MGM --no-defaults --ndb-connectstring="$NDB_CONNECTSTRING" -e "all restart" >> $NDB_TOOLS_OUTPUT

=== modified file 'mysql-test/suite/ndb_memcache/r/ttls_flags.result'
--- a/mysql-test/suite/ndb_memcache/r/ttls_flags.result	revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/mysql-test/suite/ndb_memcache/r/ttls_flags.result	revid:mikael.ronstrom@stripped
@@ -3,6 +3,6 @@ Flags Test OK 
 USE ndbmemcache;
 SELECT * FROM demo_table_tabs order by mkey;
 mkey	val1	val2	val3	flags	expire_time
-1	Groen	NULL	NULL	NULL	NULL
+1	Groen	NULL	NULL	0	NULL
 12	Con	Brio	NULL	100	NULL
 13	Sul ponticello	NULL	NULL	200	NULL

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/sql/ha_ndbcluster_binlog.cc	revid:mikael.ronstrom@stripped
@@ -1107,7 +1107,7 @@ static void clean_away_stray_files(THD *
   char path[FN_REFLEN + 1];
  
   DBUG_ENTER("clean_away_stray_files");
-  bzero((char*) &lookup_field_values, sizeof(LOOKUP_FIELD_VALUES));
+  memset(&lookup_field_values, 0, sizeof(LOOKUP_FIELD_VALUES));
   if (make_db_list(thd, &db_names, &lookup_field_values, &with_i_schema))
   {
     thd->clear_error();

=== modified file 'storage/ndb/memcache/src/ExternalValue.cc'
--- a/storage/ndb/memcache/src/ExternalValue.cc	revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/memcache/src/ExternalValue.cc	revid:mikael.ronstrom@stripped
@@ -581,8 +581,7 @@ void ExternalValue::setMiscColumns(Opera
   /* Set flags */
   if(wqitem->prefix_info.has_flags_col) {
     uint32_t flags = hash_item_get_flags(wqitem->cache_item);
-    if(flags)
-      op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
+    op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
   }
 }
 

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/memcache/src/ndb_worker.cc	revid:mikael.ronstrom@stripped
@@ -397,8 +397,7 @@ op_status_t WorkerStep1::do_write() {
   /* Set flags */
   if(wqitem->prefix_info.has_flags_col) {
     uint32_t flags = hash_item_get_flags(wqitem->cache_item);
-    if(flags)
-      op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
+    op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
   }
   
   /* Start the transaction */

=== modified file 'storage/ndb/memcache/unit/extvals.pl'
--- a/storage/ndb/memcache/unit/extvals.pl	revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/memcache/unit/extvals.pl	revid:mikael.ronstrom@stripped
@@ -193,3 +193,10 @@ if($do_test=='11' || $do_all) {
   $mc->set("b:testtoobig", $val_too_big);
   $mc->{error} =~ "VALUE_TOO_LARGE" || Carp::confess "Expected TOO_LARGE";
 }
+
+if($do_test=='12' || $do_all) {
+  # Test SET with flags
+  $mc->set_flags(0);
+  $mc->set("b:test12", "Mikrokosmos");
+}
+

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	revid:mikael.ronstrom@stripped
@@ -577,6 +577,68 @@ runReceiveResponse_C(void * me)
   return 0;
 }
 
+ReceiveThreadClient::ReceiveThreadClient(TransporterFacade * facade)
+{
+  DBUG_ENTER("ReceiveThreadClient::ReceiveThreadClient");
+  Uint32 ret = this->open(facade);
+  if (unlikely(ret == 0))
+  {
+    ndbout_c("Failed to register receive thread, ret = %d", ret);
+    abort();
+  }
+  DBUG_VOID_RETURN;
+}
+
+ReceiveThreadClient::~ReceiveThreadClient()
+{
+  DBUG_ENTER("ReceiveThreadClient::~ReceiveThreadClient");
+  this->close();
+  DBUG_VOID_RETURN;
+}
+
+void
+ReceiveThreadClient::trp_deliver_signal(const NdbApiSignal *signal,
+                                        const LinearSectionPtr ptr[3])
+{
+  DBUG_ENTER("ReceiveThreadClient::trp_deliver_signal");
+  ndbout_c("Receive thread block should not receive signals, gsn: %d",
+           signal->theVerId_signalNumber);
+  abort();
+  DBUG_VOID_RETURN;
+}
+
+void
+TransporterFacade::checkClusterMgr(NDB_TICKS currTime, 
+                                   NDB_TICKS & lastTime)
+{
+  if (currTime <= (lastTime + ((NDB_TICKS)100)))
+    return; /* 100 milliseconds haven't passed yet */
+  lastTime = NdbTick_CurrentMillisecond();
+  theClusterMgr->lock();
+  theTransporterRegistry->update_connections();
+  theClusterMgr->flush_send_buffers();
+  theClusterMgr->unlock();
+}
+
+bool
+TransporterFacade::become_poll_owner(trp_client* clnt)
+{
+  bool poll_owner = false;
+  lock_poll_mutex();
+  if (m_poll_owner == NULL)
+  {
+    poll_owner = true;
+    m_poll_owner = clnt;
+  }
+  unlock_poll_mutex();
+  if (poll_owner)
+  {
+    clnt->m_poll.m_poll_owner = true;
+    return true;
+  }
+  return false;
+}
+
 /*
   The receiver thread is changed to only wake up once every 10 milliseconds
   to poll. It will first check that nobody owns the poll "right" before
@@ -587,17 +649,22 @@ runReceiveResponse_C(void * me)
 */
 void TransporterFacade::threadMainReceive(void)
 {
+  bool poll_owner = false;
+  NDB_TICKS currTime = NdbTick_CurrentMillisecond();
+  NDB_TICKS lastTime = currTime;
   theTransporterRegistry->startReceiving();
 #ifdef NDB_SHM_TRANSPORTER
   NdbThread_set_shm_sigmask(TRUE);
 #endif
+  ReceiveThreadClient *recv_client = new ReceiveThreadClient(this);
   while(!theStopReceive)
   {
-    theClusterMgr->lock();
-    theTransporterRegistry->update_connections();
-    theClusterMgr->flush_send_buffers();
-    theClusterMgr->unlock();
-    NdbSleep_MilliSleep(100);
+    currTime = NdbTick_CurrentMillisecond();
+    checkClusterMgr(currTime, lastTime);
+    if (!poll_owner)
+      poll_owner = become_poll_owner(recv_client);
+    if (poll_owner)
+      do_poll(recv_client, 100, true);
   }//while
   theTransporterRegistry->stopReceiving();
 }
@@ -1855,29 +1922,30 @@ TransporterFacade::get_poll_owner(trp_cl
 
 void
 TransporterFacade::finish_poll(trp_client* clnt,
+                               Uint32 cnt,
                                Uint32& cnt_woken,
                                trp_client** arr)
 {
 #ifndef NDEBUG
   {
-    Uint32 cnt = clnt->m_poll.m_locked_cnt;
-    assert(cnt >= 1);
-    assert(cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients));
+    Uint32 lock_cnt = clnt->m_poll.m_locked_cnt;
+    assert(lock_cnt >= 1);
+    assert(lock_cnt <= NDB_ARRAY_SIZE(clnt->m_poll.m_locked_clients));
     assert(clnt->m_poll.m_locked_clients[0] == clnt);
     // no duplicates
-    if (DBG_POLL) printf("after external_poll: cnt: %u ", cnt);
-    for (Uint32 i = 0; i < cnt; i++)
+    if (DBG_POLL) printf("after external_poll: cnt: %u ", lock_cnt);
+    for (Uint32 i = 0; i < lock_cnt; i++)
     {
       trp_client * tmp = clnt->m_poll.m_locked_clients[i];
       if (DBG_POLL) printf("%p(%u) ", tmp, tmp->m_poll.m_waiting);
-      for (Uint32 j = i + 1; j < cnt; j++)
+      for (Uint32 j = i + 1; j < lock_cnt; j++)
       {
         assert(tmp != clnt->m_poll.m_locked_clients[j]);
       }
     }
     if (DBG_POLL) printf("\n");
 
-    for (Uint32 i = 1; i < cnt; i++)
+    for (Uint32 i = 1; i < lock_cnt; i++)
     {
       trp_client * tmp = clnt->m_poll.m_locked_clients[i];
       if (tmp->m_poll.m_locked == true)
@@ -1903,7 +1971,6 @@ TransporterFacade::finish_poll(trp_clien
    * count woken clients
    *   and put them to the left in array
    */
-  Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
   for (Uint32 i = 0; i < cnt; i++)
   {
     trp_client * tmp = arr[i];
@@ -1917,9 +1984,9 @@ TransporterFacade::finish_poll(trp_clien
 
   if (DBG_POLL)
   {
-    Uint32 cnt = clnt->m_poll.m_locked_cnt;
-    printf("after sort: cnt: %u ", cnt);
-    for (Uint32 i = 0; i < cnt; i++)
+    Uint32 lock_cnt = clnt->m_poll.m_locked_cnt;
+    printf("after sort: cnt: %u ", lock_cnt);
+    for (Uint32 i = 0; i < lock_cnt; i++)
     {
       trp_client * tmp = clnt->m_poll.m_locked_clients[i];
       printf("%p(%u) ", tmp, tmp->m_poll.m_waiting);
@@ -1973,6 +2040,7 @@ TransporterFacade::try_lock_last_client(
      */
     dbg("wake new_owner(%p)", new_owner);
 #ifndef NDEBUG
+    Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
     for (Uint32 i = 0; i <= cnt; i++)
     {
       assert(clnt->m_poll.m_locked_clients[i] != new_owner);
@@ -2012,10 +2080,11 @@ TransporterFacade::do_poll(trp_client* c
   external_poll(clnt, wait_time);
 
   Uint32 cnt_woken = 0;
+  Uint32 cnt = clnt->m_poll.m_locked_cnt - 1; // skip self
   trp_client ** arr = clnt->m_poll.m_locked_clients + 1; // skip self
   if (!poll_owner)
     clnt->m_poll.m_poll_owner = false;
-  finish_poll(clnt, &cnt_woken, arr);
+  finish_poll(clnt, cnt, cnt_woken, arr);
 
   lock_poll_mutex();
 
@@ -2061,7 +2130,7 @@ TransporterFacade::do_poll(trp_client* c
    */
   if (new_owner_locked == false)
   {
-    dbg("new_owner_locked == false", 0);
+    dbg("new_owner_locked == %s", "false");
     trp_client * new_owner;
     while (true)
     {

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	revid:mikael@dator9-20120703185601-o0n43mvf74jmedhd
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	revid:mikael.ronstrom@stripped
@@ -220,8 +220,11 @@ private:
   friend class Ndb_cluster_connection;
   friend class Ndb_cluster_connection_impl;
 
+  void checkClusterMgr(NDB_TICKS currTime, NDB_TICKS & lastTime);
   bool get_poll_owner(trp_client* clnt, Uint32 wait_time);
+  bool become_poll_owner(trp_client* clnt);
   void finish_poll(trp_client* clnt,
+                   Uint32 cnt,
                    Uint32& cnt_woken,
                    trp_client** arr);
   void try_lock_last_client(trp_client* clnt,
@@ -541,7 +544,12 @@ public :
   }
 };
 
-  
-
-
+class ReceiveThreadClient : public trp_client
+{
+  public :
+  ReceiveThreadClient(TransporterFacade *facade);
+  ~ReceiveThreadClient();
+  void trp_deliver_signal(const NdbApiSignal *,
+                          const LinearSectionPtr ptr[3]);
+};
 #endif // TransporterFacade_H

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3950 to 3952) Mikael Ronstrom4 Jul