List:Commits« Previous MessageNext Message »
From:jonas Date:April 3 2007 12:43pm
Subject:bk commit into 5.1 tree (jonas:1.2335)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-04-03 14:43:37+02:00, jonas@stripped +4 -0
  Merge perch.ndb.mysql.com:/home/jonas/src/check/51-telco
  into  perch.ndb.mysql.com:/home/jonas/src/check/51-wl1190
  MERGE: 1.2321.1.196

  mysql-test/mysql-test-run.pl@stripped, 2007-04-03 14:43:33+02:00, jonas@stripped +0 -0
    Auto merged
    MERGE: 1.277.1.1

  sql/ha_ndbcluster.cc@stripped, 2007-04-03 14:43:33+02:00, jonas@stripped +0 -0
    Auto merged
    MERGE: 1.362.1.78

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2007-04-03 14:43:33+02:00, jonas@stripped +0 -0
    Auto merged
    MERGE: 1.152.1.15

  storage/ndb/src/ndbapi/TransporterFacade.cpp@stripped, 2007-04-03 14:43:33+02:00, jonas@stripped +0 -0
    Auto merged
    MERGE: 1.52.1.13

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/check/51-wl1190/RESYNC

--- 1.157/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-04-03 14:43:42 +02:00
+++ 1.158/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2007-04-03 14:43:42 +02:00
@@ -1474,7 +1474,7 @@
 NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
 {
   if(tf != 0){
-    m_globalHash = &tf->m_globalDictCache;
+    m_globalHash = tf->m_globalDictCache;
     return m_receiver.setTransporter(tf);
   }
   
@@ -1486,7 +1486,7 @@
 NdbDictionaryImpl::setTransporter(class Ndb* ndb, 
 				  class TransporterFacade * tf)
 {
-  m_globalHash = &tf->m_globalDictCache;
+  m_globalHash = tf->m_globalDictCache;
   if(m_receiver.setTransporter(ndb, tf)){
     return true;
   }

--- 1.55/storage/ndb/src/ndbapi/TransporterFacade.cpp	2007-04-03 14:43:42 +02:00
+++ 1.56/storage/ndb/src/ndbapi/TransporterFacade.cpp	2007-04-03 14:43:42 +02:00
@@ -311,14 +311,16 @@
 
      case GSN_ALTER_TABLE_REP:
      {
+       if (theFacade->m_globalDictCache == NULL)
+         break;
        const AlterTableRep* rep = (const AlterTableRep*)theData;
-       theFacade->m_globalDictCache.lock();
-       theFacade->m_globalDictCache.
+       theFacade->m_globalDictCache->lock();
+       theFacade->m_globalDictCache->
 	 alter_table_rep((const char*)ptr[0].p, 
 			 rep->tableId,
 			 rep->tableVersion,
 			 rep->changeType == AlterTableRep::CT_ALTERED);
-       theFacade->m_globalDictCache.unlock();
+       theFacade->m_globalDictCache->unlock();
        break;
      }
      case GSN_SUB_GCP_COMPLETE_REP:
@@ -641,12 +643,13 @@
   }
 }
 
-TransporterFacade::TransporterFacade() :
+TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
   theTransporterRegistry(0),
   theStopReceive(0),
   theSendThread(NULL),
   theReceiveThread(NULL),
-  m_fragmented_signal_id(0)
+  m_fragmented_signal_id(0),
+  m_globalDictCache(cache)
 {
   DBUG_ENTER("TransporterFacade::TransporterFacade");
   init_cond_wait_queue();

--- 1.366/sql/ha_ndbcluster.cc	2007-04-03 14:43:42 +02:00
+++ 1.367/sql/ha_ndbcluster.cc	2007-04-03 14:43:42 +02:00
@@ -48,6 +48,7 @@
 extern const char *opt_ndbcluster_connectstring;
 extern ulong opt_ndb_cache_check_time;
 extern ulong opt_ndb_wait_connected;
+extern ulong opt_ndb_cluster_connection_pool;
 
 // ndb interface initialization/cleanup
 #ifdef  __cplusplus
@@ -138,6 +139,10 @@
 
 static Ndb* g_ndb= NULL;
 Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+Ndb_cluster_connection **g_ndb_cluster_connection_pool= NULL;
+ulong g_ndb_cluster_connection_pool_alloc= 0;
+ulong g_ndb_cluster_connection_pool_pos= 0;
+pthread_mutex_t g_ndb_cluster_connection_pool_mutex;
 uchar g_node_id_map[max_ndb_nodes];
 
 // Handler synchronization
@@ -351,7 +356,15 @@
 
 Thd_ndb::Thd_ndb()
 {
-  ndb= new Ndb(g_ndb_cluster_connection, "");
+  pthread_mutex_lock(&g_ndb_cluster_connection_pool_mutex);
+  connection=
+    g_ndb_cluster_connection_pool[g_ndb_cluster_connection_pool_pos];
+  g_ndb_cluster_connection_pool_pos++;
+  if (g_ndb_cluster_connection_pool_pos ==
+      g_ndb_cluster_connection_pool_alloc)
+    g_ndb_cluster_connection_pool_pos= 0;
+  pthread_mutex_unlock(&g_ndb_cluster_connection_pool_mutex);
+  ndb= new Ndb(connection, "");
   lock_count= 0;
   count= 0;
   all= NULL;
@@ -4130,6 +4143,58 @@
   - refresh list of the indexes for the table if needed (if altered)
  */
 
+#ifdef HAVE_NDB_BINLOG
+extern MASTER_INFO *active_mi;
+static int ndbcluster_update_apply_status(THD *thd, int do_update)
+{
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  Ndb *ndb= thd_ndb->ndb;
+  NDBDICT *dict= ndb->getDictionary();
+  const NDBTAB *ndbtab;
+  NdbTransaction *trans= thd_ndb->all ? thd_ndb->all : thd_ndb->stmt;
+  ndb->setDatabaseName(NDB_REP_DB);
+  Ndb_table_guard ndbtab_g(dict, NDB_APPLY_TABLE);
+  if (!(ndbtab= ndbtab_g.get_table()))
+  {
+    return -1;
+  }
+  NdbOperation *op= 0;
+  int r= 0;
+  r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+  DBUG_ASSERT(r == 0);
+  if (do_update)
+    r|= op->updateTuple();
+  else
+    r|= op->writeTuple();
+  DBUG_ASSERT(r == 0);
+  // server_id
+  r|= op->equal(0u, (Uint32)thd->server_id);
+  DBUG_ASSERT(r == 0);
+  if (!do_update)
+  {
+    // epoch
+    r|= op->setValue(1u, (Uint64)0);
+    DBUG_ASSERT(r == 0);
+  }
+  // log_name
+  char tmp_buf[FN_REFLEN];
+  ndb_pack_varchar(ndbtab->getColumn(2u), tmp_buf,
+                   active_mi->rli.group_master_log_name,
+                   strlen(active_mi->rli.group_master_log_name));
+  r|= op->setValue(2u, tmp_buf);
+  DBUG_ASSERT(r == 0);
+  // start_pos
+  r|= op->setValue(3u, (Uint64)active_mi->rli.group_master_log_pos);
+  DBUG_ASSERT(r == 0);
+  // end_pos
+  r|= op->setValue(4u, (Uint64)active_mi->rli.group_master_log_pos + 
+                   ((Uint64)active_mi->rli.future_event_relay_log_pos -
+                    (Uint64)active_mi->rli.group_relay_log_pos));
+  DBUG_ASSERT(r == 0);
+  return 0;
+}
+#endif /* HAVE_NDB_BINLOG */
+
 int ha_ndbcluster::external_lock(THD *thd, int lock_type)
 {
   int error=0;
@@ -4180,6 +4245,7 @@
         thd_ndb->init_open_tables();
         thd_ndb->stmt= trans;
 	thd_ndb->query_state&= NDB_QUERY_NORMAL;
+        thd_ndb->trans_options= 0;
         trans_register_ha(thd, FALSE, ndbcluster_hton);
       } 
       else 
@@ -4196,6 +4262,7 @@
           thd_ndb->init_open_tables();
           thd_ndb->all= trans; 
 	  thd_ndb->query_state&= NDB_QUERY_NORMAL;
+          thd_ndb->trans_options= 0;
           trans_register_ha(thd, TRUE, ndbcluster_hton);
 
           /*
@@ -4236,7 +4303,10 @@
     // Start of transaction
     m_rows_changed= 0;
     m_ops_pending= 0;
-
+#ifdef HAVE_NDB_BINLOG
+    if (m_share == ndb_apply_status_share && thd->slave_thread)
+      thd_ndb->trans_options|= TNTO_INJECTED_APPLY_STATUS;
+#endif
     // TODO remove double pointers...
     m_thd_ndb_share= thd_ndb->get_open_table(thd, m_table);
     m_table_info= &m_thd_ndb_share->stat;
@@ -4379,6 +4449,11 @@
                             "stmt" : "all"));
   DBUG_ASSERT(ndb && trans);
 
+#ifdef HAVE_NDB_BINLOG
+  if (thd->slave_thread)
+    ndbcluster_update_apply_status(thd, thd_ndb->trans_options & TNTO_INJECTED_APPLY_STATUS);
+#endif /* HAVE_NDB_BINLOG */
+
   if (execute_commit(thd,trans) != 0)
   {
     const NdbError err= trans->getNdbError();
@@ -6814,6 +6889,8 @@
   if ((g_ndb_cluster_connection=
        new Ndb_cluster_connection(opt_ndbcluster_connectstring)) == 0)
   {
+    sql_print_error("NDB: failed to allocate global "
+                    "ndb cluster connection object");
     DBUG_PRINT("error",("Ndb_cluster_connection(%s)",
                         opt_ndbcluster_connectstring));
     goto ndbcluster_init_error;
@@ -6829,6 +6906,7 @@
   // Create a Ndb object to open the connection  to NDB
   if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
   {
+    sql_print_error("NDB: failed to allocate global ndb object");
     DBUG_PRINT("error", ("failed to create global ndb object"));
     goto ndbcluster_init_error;
   }
@@ -6855,38 +6933,90 @@
     sleep(1);
   }
 
+  {
+    g_ndb_cluster_connection_pool_alloc= opt_ndb_cluster_connection_pool;
+    g_ndb_cluster_connection_pool= (Ndb_cluster_connection**)
+      my_malloc(g_ndb_cluster_connection_pool_alloc *
+                sizeof(Ndb_cluster_connection*),
+                MYF(MY_WME | MY_ZEROFILL));
+    pthread_mutex_init(&g_ndb_cluster_connection_pool_mutex,
+                       MY_MUTEX_INIT_FAST);
+    g_ndb_cluster_connection_pool[0]= g_ndb_cluster_connection;
+    for (unsigned i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
+    {
+      if ((g_ndb_cluster_connection_pool[i]=
+           new Ndb_cluster_connection(opt_ndbcluster_connectstring,
+                                      g_ndb_cluster_connection)) == 0)
+      {
+        sql_print_error("NDB[%u]: failed to allocate cluster connect object",
+                        i);
+        DBUG_PRINT("error",("Ndb_cluster_connection[%u](%s)",
+                            i, opt_ndbcluster_connectstring));
+        goto ndbcluster_init_error;
+      }
+      {
+        char buf[128];
+        my_snprintf(buf, sizeof(buf), "mysqld --server-id=%lu (connection %u)",
+                    server_id, i+1);
+        g_ndb_cluster_connection_pool[i]->set_name(buf);
+      }
+      g_ndb_cluster_connection_pool[i]->set_optimized_node_selection
+        (opt_ndb_optimized_node_selection);
+    }
+  }
+
   if (res == 0)
   {
     connect_callback();
-    DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d",
-                       g_ndb_cluster_connection->get_connected_host(),
-                       g_ndb_cluster_connection->get_connected_port()));
+    for (unsigned i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
     {
+      if (g_ndb_cluster_connection_pool[i]->node_id() == 0)
+      {
+        // not connected to mgmd yet, try again
+        g_ndb_cluster_connection_pool[i]->connect(0,0,0);
+        if (g_ndb_cluster_connection_pool[i]->node_id() == 0)
+        {
+          sql_print_warning("NDB[%u]: starting connect thread", i);
+          g_ndb_cluster_connection_pool[i]->start_connect_thread();
+          continue;
+        }
+      }
+      DBUG_PRINT("info",
+                 ("NDBCLUSTER storage engine (%u) at %s on port %d", i,
+                  g_ndb_cluster_connection_pool[i]->get_connected_host(),
+                  g_ndb_cluster_connection_pool[i]->get_connected_port()));
+
       struct timeval now_time;
       gettimeofday(&now_time, 0);
       ulong wait_until_ready_time = (end_time.tv_sec > now_time.tv_sec) ?
         end_time.tv_sec - now_time.tv_sec : 1;
-      res= g_ndb_cluster_connection->wait_until_ready(wait_until_ready_time,3);
-    }
-    if (res == 0)
-    {
-      sql_print_information("NDB: all storage nodes connected");
-    }
-    else if (res > 0)
-    {
-      sql_print_information("NDB: some storage nodes connected");
-    }
-    else if (res < 0)
-    {
-      sql_print_information("NDB: no storage nodes connected (timed out)");
+      res= g_ndb_cluster_connection_pool[i]->
+        wait_until_ready(wait_until_ready_time,3);
+      if (res == 0)
+      {
+        sql_print_information("NDB[%u]: all storage nodes connected", i);
+      }
+      else if (res > 0)
+      {
+        sql_print_information("NDB[%u]: some storage nodes connected", i);
+      }
+      else if (res < 0)
+      {
+        sql_print_information("NDB[%u]: no storage nodes connected (timed out)", i);
+      }
     }
   } 
   else if (res == 1)
   {
-    if (g_ndb_cluster_connection->start_connect_thread(connect_callback)) 
+    for (unsigned i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
     {
-      DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
-      goto ndbcluster_init_error;
+      if (g_ndb_cluster_connection_pool[i]->
+          start_connect_thread(i == 0 ? connect_callback :  NULL)) 
+      {
+        sql_print_error("NDB[%u]: failed to start connect thread", i);
+        DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
+        goto ndbcluster_init_error;
+      }
     }
 #ifndef DBUG_OFF
     {
@@ -6952,6 +7082,22 @@
   if (g_ndb)
     delete g_ndb;
   g_ndb= NULL;
+  {
+    if (g_ndb_cluster_connection_pool)
+    {
+      /* first in pool is the main one, wait with release */
+      for (unsigned i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
+      {
+        if (g_ndb_cluster_connection_pool[i])
+          delete g_ndb_cluster_connection_pool[i];
+      }
+      my_free((gptr) g_ndb_cluster_connection_pool, MYF(MY_ALLOW_ZERO_PTR));
+      pthread_mutex_destroy(&g_ndb_cluster_connection_pool_mutex);
+      g_ndb_cluster_connection_pool= 0;
+    }
+    g_ndb_cluster_connection_pool_alloc= 0;
+    g_ndb_cluster_connection_pool_pos= 0;
+  }
   if (g_ndb_cluster_connection)
     delete g_ndb_cluster_connection;
   g_ndb_cluster_connection= NULL;
@@ -7015,6 +7161,22 @@
     delete g_ndb;
     g_ndb= NULL;
   }
+  {
+    if (g_ndb_cluster_connection_pool)
+    {
+      /* first in pool is the main one, wait with release */
+      for (unsigned i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
+      {
+        if (g_ndb_cluster_connection_pool[i])
+          delete g_ndb_cluster_connection_pool[i];
+      }
+      my_free((gptr) g_ndb_cluster_connection_pool, MYF(MY_ALLOW_ZERO_PTR));
+      pthread_mutex_destroy(&g_ndb_cluster_connection_pool_mutex);
+      g_ndb_cluster_connection_pool= 0;
+    }
+    g_ndb_cluster_connection_pool_alloc= 0;
+    g_ndb_cluster_connection_pool_pos= 0;
+  }
   delete g_ndb_cluster_connection;
   g_ndb_cluster_connection= NULL;
 
@@ -8597,8 +8759,9 @@
 {
   THD *thd; /* needs to be first for thread_stack */
   struct timespec abstime;
-  List<NDB_SHARE> util_open_tables;
   Thd_ndb *thd_ndb;
+  uint share_list_size= 0;
+  NDB_SHARE **share_list= NULL;
 
   my_thread_init();
   DBUG_ENTER("ndb_util_thread");
@@ -8718,7 +8881,22 @@
     /* Lock mutex and fill list with pointers to all open tables */
     NDB_SHARE *share;
     pthread_mutex_lock(&ndbcluster_mutex);
-    for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+    uint i, open_count, record_count= ndbcluster_open_tables.records;
+    if (share_list_size < record_count)
+    {
+      NDB_SHARE ** new_share_list= new NDB_SHARE * [record_count];
+      if (!new_share_list)
+      {
+        sql_print_warning("ndb util thread: malloc failure, "
+                          "query cache not maintained properly");
+        pthread_mutex_unlock(&ndbcluster_mutex);
+        goto next;                               // At least do not crash
+      }
+      delete [] share_list;
+      share_list_size= record_count;
+      share_list= new_share_list;
+    }
+    for (i= 0, open_count= 0; i < record_count; i++)
     {
       share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
 #ifdef HAVE_NDB_BINLOG
@@ -8736,14 +8914,14 @@
                   i, share->table_name, share->use_count));
 
       /* Store pointer to table */
-      util_open_tables.push_back(share);
+      share_list[open_count++]= share;
     }
     pthread_mutex_unlock(&ndbcluster_mutex);
 
-    /* Iterate through the  open files list */
-    List_iterator_fast<NDB_SHARE> it(util_open_tables);
-    while ((share= it++))
+    /* Iterate through the open files list */
+    for (i= 0; i < open_count; i++)
     {
+      share= share_list[i];
 #ifdef HAVE_NDB_BINLOG
       if ((share->use_count - (int) (share->op != 0) - (int) (share->op != 0))
           <= 1)
@@ -8804,10 +8982,7 @@
                                share->key, share->use_count));
       free_share(&share);
     }
-
-    /* Clear the list of open tables */
-    util_open_tables.empty();
-
+next:
     /* Calculate new time to wake up */
     int secs= 0;
     int msecs= ndb_cache_check_time;
@@ -8835,6 +9010,8 @@
 ndb_util_thread_end:
   net_end(&thd->net);
 ndb_util_thread_fail:
+  if (share_list)
+    delete [] share_list;
   thd->cleanup();
   delete thd;
   
Thread
bk commit into 5.1 tree (jonas:1.2335)jonas3 Apr