List:Commits« Previous MessageNext Message »
From:tomas Date:April 12 2006 4:01pm
Subject:bk commit into 5.1 tree (tomas:1.2314)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.2314 06/04/12 18:01:19 tomas@stripped +1 -0
  ndb:
  freeing non freed objects on server shutdown
  corrected timeout handling on schema events

  sql/ha_ndbcluster_binlog.cc
    1.43 06/04/12 18:01:11 tomas@stripped +87 -54
    ndb:
    freeing non freed objects on server shutdown
    corrected timeout handling on schema events

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.ndb.mysql.com
# Root:	/home/tomas/mysql-5.1-new

--- 1.42/sql/ha_ndbcluster_binlog.cc	2006-04-10 16:08:34 +02:00
+++ 1.43/sql/ha_ndbcluster_binlog.cc	2006-04-12 18:01:11 +02:00
@@ -1228,9 +1228,9 @@
       struct timespec abstime;
       int i;
       set_timespec(abstime, 1);
-      (void) pthread_cond_timedwait(&injector_cond,
-                                    &ndb_schema_object->mutex,
-                                    &abstime);
+      int ret= pthread_cond_timedwait(&injector_cond,
+                                      &ndb_schema_object->mutex,
+                                      &abstime);
 
       (void) pthread_mutex_lock(&schema_share->mutex);
       for (i= 0; i < ndb_number_of_storage_nodes; i++)
@@ -1252,16 +1252,19 @@
       if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
         break;
 
-      max_timeout--;
-      if (max_timeout == 0)
+      if (ret)
       {
-        sql_print_error("NDB %s: distibuting %s timed out. Ignoring...",
-                        type_str, ndb_schema_object->key);
-        break;
+        max_timeout--;
+        if (max_timeout == 0)
+        {
+          sql_print_error("NDB %s: distibuting %s timed out. Ignoring...",
+                          type_str, ndb_schema_object->key);
+          break;
+        }
+        if (ndb_extra_logging)
+          ndb_report_waiting(type_str, max_timeout,
+                             "distributing", ndb_schema_object->key);
       }
-      if (ndb_extra_logging)
-        ndb_report_waiting(type_str, max_timeout,
-                           "distributing", ndb_schema_object->key);
     }
     (void) pthread_mutex_unlock(&ndb_schema_object->mutex);
   }
@@ -2445,7 +2448,6 @@
     }
     if (!op)
     {
-      pthread_mutex_unlock(&injector_mutex);
       sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
                       " %s",event_name);
       push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
@@ -2453,6 +2455,7 @@
                           ndb->getNdbError().code,
                           ndb->getNdbError().message,
                           "NDB");
+      pthread_mutex_unlock(&injector_mutex);
       DBUG_RETURN(-1);
     }
 
@@ -2635,20 +2638,23 @@
   {
     struct timespec abstime;
     set_timespec(abstime, 1);
-    (void) pthread_cond_timedwait(&injector_cond,
-                                  &share->mutex,
-                                  &abstime);
-    max_timeout--;
+    int ret= pthread_cond_timedwait(&injector_cond,
+                                    &share->mutex,
+                                    &abstime);
     if (share->op == 0)
       break;
-    if (max_timeout == 0)
+    if (ret)
     {
-      sql_print_error("NDB delete table: timed out. Ignoring...");
-      break;
+      max_timeout--;
+      if (max_timeout == 0)
+      {
+        sql_print_error("NDB %s: timed out. Ignoring...", type_str);
+        break;
+      }
+      if (ndb_extra_logging)
+        ndb_report_waiting(type_str, max_timeout,
+                           type_str, share->key);
     }
-    if (ndb_extra_logging)
-      ndb_report_waiting(type_str, max_timeout,
-                         type_str, share->key);
   }
   (void) pthread_mutex_unlock(&share->mutex);
 #else
@@ -2711,7 +2717,8 @@
 }
 
 static int
-ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
+ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
+                                        NdbEventOperation *pOp,
                                         Binlog_index_row &row)
 {
   NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
@@ -2720,7 +2727,7 @@
   /* make sure to flush any pending events as they can be dependent
      on one of the tables being changed below
   */
-  injector_thd->binlog_flush_pending_rows_event(true);
+  thd->binlog_flush_pending_rows_event(true);
 
   switch (type)
   {
@@ -2777,7 +2784,7 @@
     return 0;
   }
 
-  ndb_handle_schema_change(injector_thd, ndb, pOp, share);
+  ndb_handle_schema_change(thd, ndb, pOp, share);
   return 0;
 }
 
@@ -3057,7 +3064,8 @@
 pthread_handler_t ndb_binlog_thread_func(void *arg)
 {
   THD *thd; /* needs to be first for thread_stack */
-  Ndb *ndb= 0;
+  Ndb *i_ndb= 0;
+  Ndb *s_ndb= 0;
   Thd_ndb *thd_ndb=0;
   int ndb_update_binlog_index= 1;
   injector *inj= injector::instance();
@@ -3109,16 +3117,16 @@
   pthread_mutex_unlock(&LOCK_thread_count);
   thd->lex->start_transaction_opt= 0;
 
-  if (!(schema_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
-      schema_ndb->init())
+  if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
+      s_ndb->init())
   {
     sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
     goto err;
   }
 
   // empty database
-  if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
-      ndb->init())
+  if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
+      i_ndb->init())
   {
     sql_print_error("NDB Binlog: Getting Ndb object failed");
     ndb_binlog_thread_running= -1;
@@ -3139,7 +3147,8 @@
     pthread_mutex_lock(&injector_mutex);
   */
   injector_thd= thd;
-  injector_ndb= ndb;
+  injector_ndb= i_ndb;
+  schema_ndb= s_ndb;
   ndb_binlog_thread_running= 1;
   if (opt_bin_log)
   {
@@ -3221,14 +3230,14 @@
     int res= 0, tot_poll_wait= 1000;
     if (ndb_binlog_running)
     {
-      res= ndb->pollEvents(tot_poll_wait, &gci);
+      res= i_ndb->pollEvents(tot_poll_wait, &gci);
       tot_poll_wait= 0;
     }
-    int schema_res= schema_ndb->pollEvents(tot_poll_wait, &schema_gci);
+    int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
     ndb_latest_received_binlog_epoch= gci;
 
     while (gci > schema_gci && schema_res >= 0)
-      schema_res= schema_ndb->pollEvents(10, &schema_gci);
+      schema_res= s_ndb->pollEvents(10, &schema_gci);
 
     if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
         (ndb_latest_handled_binlog_epoch >= g_latest_trans_gci ||
@@ -3256,11 +3265,11 @@
     if (unlikely(schema_res > 0))
     {
       thd->proc_info= "Processing events from schema table";
-      schema_ndb->
+      s_ndb->
         setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
-      schema_ndb->
+      s_ndb->
         setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
-      NdbEventOperation *pOp= schema_ndb->nextEvent();
+      NdbEventOperation *pOp= s_ndb->nextEvent();
       while (pOp != NULL)
       {
         if (!pOp->hasError())
@@ -3273,7 +3282,7 @@
                           "binlog schema event",
                           (ulong) pOp->getNdbError().code,
                           pOp->getNdbError().message);
-        pOp= schema_ndb->nextEvent();
+        pOp= s_ndb->nextEvent();
       }
     }
 
@@ -3285,7 +3294,7 @@
       int event_count= 0;
 #endif
       thd->proc_info= "Processing events";
-      NdbEventOperation *pOp= ndb->nextEvent();
+      NdbEventOperation *pOp= i_ndb->nextEvent();
       Binlog_index_row row;
       while (pOp != NULL)
       {
@@ -3296,9 +3305,9 @@
                     ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
         DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
 
-        ndb->
+        i_ndb->
           setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
-        ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
+        i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
 
         bzero((char*) &row, sizeof(row));
         injector::transaction trans;
@@ -3307,7 +3316,7 @@
           Uint32 iter= 0;
           const NdbEventOperation *gci_op;
           Uint32 event_types;
-          while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types))
+          while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
                  != NULL)
           {
             NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData();
@@ -3393,7 +3402,7 @@
           event_count++;
 #endif
           if (pOp->hasError() &&
-              ndb_binlog_thread_handle_error(ndb, pOp, row) < 0)
+              ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0)
             goto err;
 
 #ifndef DBUG_OFF
@@ -3413,7 +3422,7 @@
             Uint32 iter= 0;
             const NdbEventOperation *gci_op;
             Uint32 event_types;
-            while ((gci_op= ndb->getGCIEventOperations(&iter, &event_types))
+            while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
                    != NULL)
             {
               if (gci_op == pOp)
@@ -3425,19 +3434,19 @@
 #endif
           if ((unsigned) pOp->getEventType() <
               (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
-            ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
+            ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans);
           else
           {
             // set injector_ndb database/schema from table internal name
             int ret=
-              ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
+              i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
             DBUG_ASSERT(ret == 0);
-            ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
+            ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
             // reset to catch errors
-            ndb->setDatabaseName("");
+            i_ndb->setDatabaseName("");
           }
 
-          pOp= ndb->nextEvent();
+          pOp= i_ndb->nextEvent();
         } while (pOp && pOp->getGCI() == gci);
 
         /*
@@ -3495,7 +3504,9 @@
   close_thread_tables(thd);
   pthread_mutex_lock(&injector_mutex);
   /* don't mess with the injector_ndb anymore from other threads */
+  injector_thd= 0;
   injector_ndb= 0;
+  schema_ndb= 0;
   pthread_mutex_unlock(&injector_mutex);
   thd->db= 0; // as not to try to free memory
   sql_print_information("Stopping Cluster Binlog");
@@ -3512,21 +3523,43 @@
   }
 
   /* remove all event operations */
-  if (ndb)
+  if (s_ndb)
   {
     NdbEventOperation *op;
     DBUG_PRINT("info",("removing all event operations"));
-    while ((op= ndb->getEventOperation()))
+    while ((op= s_ndb->getEventOperation()))
     {
       DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
       DBUG_PRINT("info",("removing event operation on %s",
                          op->getEvent()->getName()));
       NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
+      DBUG_ASSERT(share->op == op ||
+                  share->op_old == op);
+      share->op= share->op_old= 0;
       free_share(&share);
-      ndb->dropEventOperation(op);
+      s_ndb->dropEventOperation(op);
+    }
+    delete s_ndb;
+    s_ndb= 0;
+  }
+  if (i_ndb)
+  {
+    NdbEventOperation *op;
+    DBUG_PRINT("info",("removing all event operations"));
+    while ((op= i_ndb->getEventOperation()))
+    {
+      DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
+      DBUG_PRINT("info",("removing event operation on %s",
+                         op->getEvent()->getName()));
+      NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
+      DBUG_ASSERT(share->op == op ||
+                  share->op_old == op);
+      share->op= share->op_old= 0;
+      free_share(&share);
+      i_ndb->dropEventOperation(op);
     }
-    delete ndb;
-    ndb= 0;
+    delete i_ndb;
+    i_ndb= 0;
   }
 
   hash_free(&ndb_schema_objects);
Thread
bk commit into 5.1 tree (tomas:1.2314)tomas12 Apr