List: Internals    « Previous Message | Next Message »
From: Stewart Smith    Date: March 10 2005 11:53pm
Subject: bk commit into 5.1 tree (stewart:1.1799)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of stewart. When stewart does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1799 05/03/11 09:53:24 stewart@stripped +1 -0
  Merge

  sql/ha_ndbcluster.cc
    1.166 05/03/11 09:53:23 stewart@stripped +0 -0
    SCCS merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	stewart
# Host:	kennedy.(none)
# Root:	/home/stewart/Documents/MySQL/5.1/ndb-wl2325/RESYNC

--- 1.165/sql/ha_ndbcluster.cc	2005-03-10 19:35:25 +11:00
+++ 1.166/sql/ha_ndbcluster.cc	2005-03-11 09:53:23 +11:00
@@ -6855,12 +6855,6 @@
 #ifdef HAVE_REPLICATION
 #include "slave.h"
 
-struct tableEvent {
-  NdbEventOperation *ndbevent;
-  NdbRecAttr** recAttr;
-  NdbRecAttr** recAttrPre;
-};
-
 static int add_binlog_index(THD *thd, longlong gci, const char *master_log_file, longlong
master_log_pos)
 {
   int error= 0;
@@ -6892,7 +6886,7 @@
   return error;
 }
 
-int ndbcluster_setup_repl_for_table(Ndb *ndb, struct tableEvent* te, const char* db_name,
const char* table_name, const char* eventName)
+int ndbcluster_setup_repl_for_table(Ndb *ndb, const char* db_name, const char*
table_name, const char* eventName)
 { 
   NDBDICT *dict;
 
@@ -6935,8 +6929,8 @@
     return -1;
   }
 
-  te->ndbevent= ndb->createEventOperation(eventName,100);
-  if(!te->ndbevent)
+  NdbEventOperation *op= ndb->createEventOperation(eventName,100);
+  if(!op)
   {
     sql_print_error("NDB Replication: Creating NdbEventOperation failed for"
 		    " %s",eventName);
@@ -6944,23 +6938,14 @@
   }
 
   int n_columns= table->getNoOfColumns();
-  te->recAttr= (NdbRecAttr**)malloc(2*sizeof(NdbRecAttr*)*n_columns);
-  if(te->recAttr == NULL)
-  {
-    sql_print_error("NDB Replication: Unable to allocate memory for "
-		    "tracking table changes");
-    return -1;
-  }
-  te->recAttrPre= te->recAttr + n_columns;
-
   for (int j= 0; j < n_columns; j++)
   {
     const char *col_name= table->getColumn(j)->getName();
-    te->recAttr[j]= te->ndbevent->getValue(col_name);
-    te->recAttrPre[j]= te->ndbevent->getPreValue(col_name);
+    op->getValue(col_name);
+    op->getPreValue(col_name);
   }
   
-  if (te->ndbevent->execute())
+  if (op->execute())
   {
     sql_print_error("NDB Replication: ndbevent->execute failed for %s",
 		    eventName);
@@ -6985,8 +6970,6 @@
 {
   THD *thd; /* needs to be first for thread_stack */
   int error= 0;
-  struct tableEvent *tableEvents= NULL;
-  unsigned int n_tableEvents= 0;
   NdbDictionary::Dictionary::List list;
   NDBDICT *dict;
   String eventName(200); // should be enough to prevent a malloc
@@ -7045,16 +7028,6 @@
     goto err;
   }
 
-  /** TODO, this needs to be a dynamic structure */
-  tableEvents= (struct tableEvent*) malloc(sizeof(struct tableEvent)*list.count);
-
-  if(!tableEvents)
-  {
-    sql_print_error("NDB Replication: Could not allocate enough memory"
-		    " to track table events");
-    goto err;
-  }
-
   /**
    * Setup event watching for tables being replicated.
    */
@@ -7069,14 +7042,12 @@
 
     ndb_rep_event_name(&eventName,t.database,t.name);
 
-    if(ndbcluster_setup_repl_for_table(ndb, &(tableEvents[n_tableEvents]),
-				       t.database, t.name,
+    if(ndbcluster_setup_repl_for_table(ndb, t.database, t.name,
 				       eventName.c_ptr()) < 0)
       goto err;
 
     sql_print_information("NDB Replication: %s.%s Event:%s",
 			  t.database,t.name,eventName.c_ptr());
-    n_tableEvents++;
   } // for
 
   sql_print_information("Starting Cluster Replication with %d events",
@@ -7087,94 +7058,72 @@
    */
   for (;;)
   {
-    int res= ndb->pollEvents(10000); // wait for event or 1000 ms
+    int res= ndb->pollEvents(1000); // wait for event or 1000 ms
 
     if (abort_loop)
       break; /* Shutting down server */
 
     DBUG_PRINT("info",("pollEvents res: %d",res));
-    if(res>0)
+    if(res > 0)
     {
-      for(unsigned int i=0; i < n_tableEvents; i++)
+      error= 0;
+      for (NdbEventOperation *pOp= ndb->nextEvent(&error);
+	   pOp; pOp= ndb->nextEvent(&error) )
       {
-	NdbEventOperation *pOp= tableEvents[i].ndbevent;
-
-	int overrun= 0;
-	int n= pOp->next(&overrun);
-	DBUG_PRINT("info",("%d",n));
-	while(n>0)
+	int overrun= pOp->isOverrun();
+	if (overrun)
+	{
+	  sql_print_error("NDB Replication: Overrun in event buffer, "
+			  "this means we have dropped events. Cannot "
+			  "continue replication.");
+	  goto err;
+	}
+	
+	longlong gci= pOp->getGCI();
+	
+	if (!pOp->isConsistent())
+	{
+	  sql_print_error("NDB Replication: Not Consistent. Cannot "
+			  "continue replication.");
+	  goto err;
+	}
+	
+	if (gci != last_gci_in_binlog_index)
 	{
-	  if (overrun)
-	  {
-	    sql_print_error("NDB Replication: Overrun in event buffer, "
-			    "this means we have dropped events. Cannot "
-			    "continue replication.");
-	    goto err;
-	  }
-	  
-	  longlong gci= pOp->getGCI();
-	  
-	  if (!pOp->isConsistent())
-	  {
-	    sql_print_error("NDB Replication: Not Consistent. Cannot "
-			    "continue replication.");
-	    goto err;
-	  }
-
-	  if (gci != last_gci_in_binlog_index)
-	  {
-	    /**
-	     * TODO: handle this correctly
-	     * update the cluster_replication.binlog_index
-	     */
-	    add_binlog_index(thd, gci, "some file name from injector", (longlong)4321);
-	    last_gci_in_binlog_index= gci;
-	  }
-
-	  DBUG_PRINT("info",("EVENT TYPE: %d GCI: %u",pOp->getEventType(),gci));
-	  switch(pOp->getEventType())
-	  {
-	  case NdbDictionary::Event::TE_INSERT:
-	    DBUG_PRINT("info",("INSERT"));
-	    break;
-	  case NdbDictionary::Event::TE_DELETE:
-	    DBUG_PRINT("info",("DELETE"));
-	    break;
-	  case NdbDictionary::Event::TE_UPDATE:
-	    DBUG_PRINT("info",("UPDATE"));
-	    break;
-	  case NdbDictionary::Event::TE_ALL:
-	    DBUG_PRINT("info",("TE_ALL - uh oh, a brain broke."));
-	    // We should never get here.
-	    break;
-	  default:
-	    DBUG_PRINT("info",("default - uh oh, a brain exploded."));
-	    // We should REALLY never get here.
-	    break;
-	  }
-	  n= pOp->next(&overrun);
-	  DBUG_PRINT("info",("%d",n));
-	}// while
+	  /**
+	   * TODO: handle this correctly
+	   * update the cluster_replication.binlog_index
+	   */
+	  add_binlog_index(thd, gci, "some file name from injector", (longlong)4321);
+	  last_gci_in_binlog_index= gci;
+	}
+	
+	DBUG_PRINT("info",("EVENT TYPE: %d GCI: %u",pOp->getEventType(),gci));
+	switch(pOp->getEventType())
+	{
+	case NdbDictionary::Event::TE_INSERT:
+	  DBUG_PRINT("info",("INSERT"));
+	  break;
+	case NdbDictionary::Event::TE_DELETE:
+	  DBUG_PRINT("info",("DELETE"));
+	  break;
+	case NdbDictionary::Event::TE_UPDATE:
+	  DBUG_PRINT("info",("UPDATE"));
+	  break;
+	case NdbDictionary::Event::TE_ALL:
+	  DBUG_PRINT("info",("TE_ALL - uh oh, a brain broke."));
+	  // We should never get here.
+	  break;
+	default:
+	  DBUG_PRINT("info",("default - uh oh, a brain exploded."));
+	  // We should REALLY never get here.
+	  break;
+	}
       } // for
     } // if
   } // for(;;)
 err:
   sql_print_information("Stopping Cluster Replication");
-
-  // We now deallocate any memory we may have used
-  if(tableEvents)
-  {
-    for(unsigned int i=0; i<n_tableEvents; i++)
-    {
-      if(!tableEvents[i].ndbevent) // last valid entry
-	break;
-      ndb->dropEventOperation(tableEvents[i].ndbevent);
-
-      if(tableEvents[i].recAttr)
-	free(tableEvents[i].recAttr);
-    }
-    free(tableEvents);
-  }
 
   thd->cleanup();
   delete thd;
Thread
bk commit into 5.1 tree (stewart:1.1799) - Stewart Smith, 10 Mar