List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:September 30 2010 10:36am
Subject:bzr commit into mysql-5.1-telco-7.0 branch (jonas:3809)
View as plain text  
#At file:///home/jonas/src/telco-7.0/ based on revid:jonas@stripped

 3809 Jonas Oreland	2010-09-30
      ndb - this patch moves ndbapi config parameters (batch size etc) from TransporterFacade to ndb_cluster_connection

    modified:
      storage/ndb/src/ndbapi/NdbImpl.hpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
      storage/ndb/src/ndbapi/NdbTransaction.cpp
      storage/ndb/src/ndbapi/Ndbif.cpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
      storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
      storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
=== modified file 'storage/ndb/src/ndbapi/NdbImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbImpl.hpp	2010-09-30 09:32:28 +0000
+++ b/storage/ndb/src/ndbapi/NdbImpl.hpp	2010-09-30 10:36:47 +0000
@@ -113,6 +113,13 @@ public:
     ndb->theImpl->forceShortRequests = val;
   }
 
+  Uint32 get_waitfor_timeout() const {
+    return m_ndb_cluster_connection.m_config.m_waitfor_timeout;
+  }
+  const NdbApiConfig& get_ndbapi_config_parameters() const {
+    return m_ndb_cluster_connection.m_config;
+  }
+
 
   BaseString m_systemPrefix; // Buffer for preformatted for <sys>/<def>/
 

=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp	2010-09-29 13:25:19 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp	2010-09-30 10:36:47 +0000
@@ -140,10 +140,10 @@ NdbReceiver::calculate_batch_size(Uint32
                                   Uint32& first_batch_size,
                                   const NdbRecord *record)
 {
-  TransporterFacade *tp= m_ndb->theImpl->m_transporter_facade;
-  Uint32 max_scan_batch_size= tp->get_scan_batch_size();
-  Uint32 max_batch_byte_size= tp->get_batch_byte_size();
-  Uint32 max_batch_size= tp->get_batch_size();
+  const NdbApiConfig & cfg = m_ndb->theImpl->get_ndbapi_config_parameters();
+  Uint32 max_scan_batch_size= cfg.m_scan_batch_size;
+  Uint32 max_batch_byte_size= cfg.m_batch_byte_size;
+  Uint32 max_batch_size= cfg.m_batch_size;
   Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
 
   if (record)

=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2010-09-29 13:25:19 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2010-09-30 10:36:47 +0000
@@ -1829,6 +1829,7 @@ NdbScanOperation::nextResultNdbRecord(co
 
   /* Now we have to wait for more rows (or end-of-file on all receivers). */
   Uint32 nodeId = theNdbCon->theDBnode;
+  Uint32 timeout= theNdb->theImpl->get_waitfor_timeout();
   TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
   int retVal= 2;
   Uint32 idx, last;
@@ -1851,7 +1852,6 @@ NdbScanOperation::nextResultNdbRecord(co
   {
     idx= m_current_api_receiver;
     last= m_api_receivers_count;
-    Uint32 timeout= tp->m_waitfor_timeout;
 
     do {
       if (theError.code){
@@ -3709,6 +3709,7 @@ NdbIndexScanOperation::ordered_insert_re
 int
 NdbIndexScanOperation::ordered_send_scan_wait_for_all(bool forceSend)
 {
+  Uint32 timeout= theNdb->theImpl->get_waitfor_timeout();
   TransporterFacade* tp= theNdb->theImpl->m_transporter_facade;
 
   PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
@@ -3718,7 +3719,6 @@ NdbIndexScanOperation::ordered_send_scan
 
   Uint32 seq= theNdbCon->theNodeSequence;
   Uint32 nodeId= theNdbCon->theDBnode;
-  Uint32 timeout= tp->m_waitfor_timeout;
   if (seq == tp->getNodeSequence(nodeId) &&
       !send_next_scan_ordered(m_current_api_receiver))
   {
@@ -3808,6 +3808,7 @@ int
 NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
                              PollGuard *poll_guard)
 {
+  Uint32 timeout= theNdb->theImpl->get_waitfor_timeout();
   Uint32 seq = theNdbCon->theNodeSequence;
   Uint32 nodeId = theNdbCon->theDBnode;
   
@@ -3817,7 +3818,6 @@ NdbScanOperation::close_impl(Transporter
     return -1;
   }
   
-  Uint32 timeout = tp->m_waitfor_timeout;
   /**
    * Wait for outstanding
    */

=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp	2010-09-30 09:32:28 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp	2010-09-30 10:36:47 +0000
@@ -525,7 +525,7 @@ NdbTransaction::executeNoBlobs(NdbTransa
 //------------------------------------------------------------------------
   Ndb* tNdb = theNdb;
 
-  Uint32 timeout = theNdb->theImpl->m_transporter_facade->m_waitfor_timeout;
+  Uint32 timeout = theNdb->theImpl->get_waitfor_timeout();
   m_waitForReply = false;
   executeAsynchPrepare(aTypeOfExec, NULL, NULL, abortOption);
   if (m_waitForReply){

=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp	2010-09-30 09:32:28 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp	2010-09-30 10:36:47 +0000
@@ -1085,7 +1085,7 @@ Ndb::pollCompleted(NdbTransaction** aCop
 void
 Ndb::check_send_timeout()
 {
-  Uint32 timeout = theImpl->m_transporter_facade->m_waitfor_timeout;
+  Uint32 timeout = theImpl->get_ndbapi_config_parameters().m_waitfor_timeout;
   NDB_TICKS current_time = NdbTick_CurrentMillisecond();
   assert(current_time >= the_last_check_time);
   if (current_time - the_last_check_time > 1000) {

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	2010-09-30 09:32:28 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	2010-09-30 10:36:47 +0000
@@ -725,9 +725,6 @@ TransporterFacade::TransporterFacade(Glo
   theArbitMgr(NULL),
   checkCounter(4),
   currentSendLimit(1),
-  m_scan_batch_size(MAX_SCAN_BATCH_SIZE),
-  m_batch_byte_size(SCAN_BATCH_SIZE),
-  m_batch_size(DEF_BATCH_SIZE),
   theStopReceive(0),
   theSendThread(NULL),
   theReceiveThread(NULL),
@@ -849,20 +846,6 @@ TransporterFacade::configure(NodeId node
     theArbitMgr= NULL;
   }
 
-  // Configure scan settings
-  Uint32 scan_batch_size= 0;
-  if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
-    m_scan_batch_size= scan_batch_size;
-  }
-  Uint32 batch_byte_size= 0;
-  if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
-    m_batch_byte_size= batch_byte_size;
-  }
-  Uint32 batch_size= 0;
-  if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
-    m_batch_size= batch_size;
-  }
-
   Uint32 auto_reconnect=1;
   iter.get(CFG_AUTO_RECONNECT, &auto_reconnect);
 
@@ -880,19 +863,6 @@ TransporterFacade::configure(NodeId node
     theClusterMgr->m_auto_reconnect = auto_reconnect;
   }
   
-  // Configure timeouts
-  Uint32 timeout = 120000;
-  for (iter.first(); iter.valid(); iter.next())
-  {
-    Uint32 tmp1 = 0, tmp2 = 0;
-    iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);
-    iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);
-    tmp1 += tmp2;
-    if (tmp1 > timeout)
-      timeout = tmp1;
-  }
-  m_waitfor_timeout = timeout;
-
 #ifdef API_TRACE
   signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
 #endif

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	2010-09-30 09:37:36 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	2010-09-30 10:36:47 +0000
@@ -130,12 +130,6 @@ public:
   // Close this block number
   int close_local(BlockNumber blockNumber);
 
-  // Scan batch configuration parameters
-  Uint32 get_scan_batch_size();
-  Uint32 get_batch_byte_size();
-  Uint32 get_batch_size();
-  Uint32 m_waitfor_timeout; // in milli seconds...
-
   TransporterRegistry* get_registry() { return theTransporterRegistry;};
 
 /*
@@ -243,11 +237,6 @@ private:
   
   void calculateSendLimit();
 
-  // Scan batch configuration parameters
-  Uint32 m_scan_batch_size;
-  Uint32 m_batch_byte_size;
-  Uint32 m_batch_size;
-
   // Declarations for the receive and send thread
   int  theStopReceive;
 
@@ -454,24 +443,6 @@ TransporterFacade::getMinDbNodeVersion()
     return 0;
 }
 
-inline
-Uint32
-TransporterFacade::get_scan_batch_size() {
-  return m_scan_batch_size;
-}
-
-inline
-Uint32
-TransporterFacade::get_batch_byte_size() {
-  return m_batch_byte_size;
-}
-
-inline
-Uint32
-TransporterFacade::get_batch_size() {
-  return m_batch_size;
-}
-
 /** 
  * LinearSectionIterator
  *

=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2010-09-30 09:32:28 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2010-09-30 10:36:47 +0000
@@ -592,6 +592,46 @@ Ndb_cluster_connection_impl::init_nodes_
   DBUG_RETURN(0);
 }
 
+int
+Ndb_cluster_connection_impl::configure(Uint32 nodeId,
+                                       const ndb_mgm_configuration &config)
+{
+  DBUG_ENTER("Ndb_cluster_connection_impl::configure");
+  {
+    ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
+    if(iter.find(CFG_NODE_ID, nodeId))
+      DBUG_RETURN(-1);
+
+    // Configure scan settings
+    Uint32 scan_batch_size= 0;
+    if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
+      m_config.m_scan_batch_size= scan_batch_size;
+    }
+    Uint32 batch_byte_size= 0;
+    if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
+      m_config.m_batch_byte_size= batch_byte_size;
+    }
+    Uint32 batch_size= 0;
+    if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
+      m_config.m_batch_size= batch_size;
+    }
+
+    // Configure timeouts
+    Uint32 timeout = 120000;
+    for (iter.first(); iter.valid(); iter.next())
+    {
+      Uint32 tmp1 = 0, tmp2 = 0;
+      iter.get(CFG_DB_TRANSACTION_CHECK_INTERVAL, &tmp1);
+      iter.get(CFG_DB_TRANSACTION_DEADLOCK_TIMEOUT, &tmp2);
+      tmp1 += tmp2;
+      if (tmp1 > timeout)
+        timeout = tmp1;
+    }
+    m_config.m_waitfor_timeout = timeout;
+  }
+  DBUG_RETURN(init_nodes_vector(nodeId, config));
+}
+
 void
 Ndb_cluster_connection_impl::do_test()
 {
@@ -686,16 +726,16 @@ int Ndb_cluster_connection_impl::connect
     if(props == 0)
       break;
 
-    if (m_transporter_facade->start_instance(nodeId, props) < 0)
+    if (configure(nodeId, *props))
     {
       ndb_mgm_destroy_configuration(props);
+      DBUG_PRINT("exit", ("malloc failure, ret: -1"));
       DBUG_RETURN(-1);
     }
 
-    if (init_nodes_vector(nodeId, *props))
+    if (m_transporter_facade->start_instance(nodeId, props) < 0)
     {
       ndb_mgm_destroy_configuration(props);
-      DBUG_PRINT("exit", ("malloc failure, ret: -1"));
       DBUG_RETURN(-1);
     }
 

=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp	2010-05-04 14:34:54 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp	2010-09-30 10:36:47 +0000
@@ -37,6 +37,21 @@ extern "C" {
   void* run_ndb_cluster_connection_connect_thread(void*);
 }
 
+struct NdbApiConfig
+{
+  NdbApiConfig() :
+    m_scan_batch_size(MAX_SCAN_BATCH_SIZE),
+    m_batch_byte_size(SCAN_BATCH_SIZE),
+    m_batch_size(DEF_BATCH_SIZE),
+    m_waitfor_timeout(120000)
+    {}
+
+  Uint32 m_scan_batch_size;
+  Uint32 m_batch_byte_size;
+  Uint32 m_batch_size;
+  Uint32 m_waitfor_timeout; // in milli seconds...
+};
+
 class Ndb_cluster_connection_impl : public Ndb_cluster_connection
 {
   Ndb_cluster_connection_impl(const char *connectstring,
@@ -75,6 +90,7 @@ private:
 
   Vector<Node> m_all_nodes;
   int init_nodes_vector(Uint32 nodeid, const ndb_mgm_configuration &config);
+  int configure(Uint32 nodeid, const ndb_mgm_configuration &config);
   void connect_thread();
   void set_name(const char *name);
 
@@ -101,6 +117,9 @@ private:
 
   BaseString m_latest_error_msg;
   unsigned m_latest_error;
+
+  // Scan batch configuration parameters
+  NdbApiConfig m_config;
 };
 
 #endif


Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20100930103647-g410tfag52d77ieo.bundle
Thread
bzr commit into mysql-5.1-telco-7.0 branch (jonas:3809) Jonas Oreland30 Sep