List: Internals « Previous Message | Next Message »
From: Stewart Smith  Date: March 10 2005 5:41am
Subject: bk commit into 5.1 tree (stewart:1.1796)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of stewart. When stewart does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1796 05/03/10 16:41:36 stewart@stripped +1 -0
  Further work on WL2325 - NDB Injector thread

  sql/ha_ndbcluster.cc
    1.164 05/03/10 16:41:25 stewart@stripped +86 -70
    Injector thread:
    
    Move the setup of events for a table to a separate function. In the future we'll
    support adding/dropping of tables while replicating. We'll be calling this function
    to set things up.

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	stewart
# Host:	kennedy.(none)
# Root:	/home/stewart/Documents/MySQL/5.1/ndb-wl2325

--- 1.163/sql/ha_ndbcluster.cc	2005-03-10 16:10:22 +11:00
+++ 1.164/sql/ha_ndbcluster.cc	2005-03-10 16:41:25 +11:00
@@ -6869,6 +6869,84 @@
   return error;
 }
 
+int ndbcluster_setup_repl_for_table(Ndb *ndb, struct tableEvent* te, const char* db_name, const char* table_name, const char* eventName)
+{ 
+  NDBDICT *dict;
+
+  dict= ndb->getDictionary();
+  if(!dict)
+  {
+    sql_print_error("NDB Replication: could not setup replication, "
+		    "Invalid NdbDictionary");
+    return -1;
+  }
+
+  ndb->setDatabaseName(db_name);
+  const NdbDictionary::Table *table= dict->getTable(table_name);
+
+  NdbDictionary::Event myEvent(eventName);
+  myEvent.setTable(table_name);
+  myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); 
+
+  // add all columns to the event
+  for(int a= 0; a < table->getNoOfColumns(); a++)
+  {
+    myEvent.addEventColumn(a);
+  }
+
+  int res= dict->createEvent(myEvent); // Add event to database
+  if (res && 
+      !(dict->getNdbError().classification ==
+	NdbError::SchemaObjectExists))
+  {
+    /*
+     * if the event already exists, that's okay, another mysqld
+     * may be doing replication as well. It's just if it's another
+     * error that we report it.
+     */
+    sql_print_error("NDB Replication: Unable to create event in database. "
+		    "Event: %s Error Code: %d Message: %s",
+		    eventName,
+		    dict->getNdbError().code,
+		    dict->getNdbError().message);
+    return -1;
+  }
+
+  te->ndbevent= ndb->createEventOperation(eventName,100);
+  if(!te->ndbevent)
+  {
+    sql_print_error("NDB Replication: Creating NdbEventOperation failed for"
+		    " %s",eventName);
+    return -1;
+  }
+
+  int n_columns= table->getNoOfColumns();
+  te->recAttr= (NdbRecAttr**)malloc(2*sizeof(NdbRecAttr*)*n_columns);
+  if(te->recAttr == NULL)
+  {
+    sql_print_error("NDB Replication: Unable to allocate memory for "
+		    "tracking table changes");
+    return -1;
+  }
+  te->recAttrPre= te->recAttr + n_columns;
+
+  for (int j= 0; j < n_columns; j++)
+  {
+    const char *col_name= table->getColumn(j)->getName();
+    te->recAttr[j]= te->ndbevent->getValue(col_name);
+    te->recAttrPre[j]= te->ndbevent->getPreValue(col_name);
+  }
+  
+  if (te->ndbevent->execute())
+  {
+    sql_print_error("NDB Replication: ndbevent->execute failed for %s",
+		    eventName);
+    return -1;
+  }
+
+  return 0;
+}
+
 // Injector thread main loop
 extern "C" pthread_handler_decl(ndb_injector_thread_func,
                                 arg __attribute__((unused)))
@@ -6953,89 +7031,27 @@
       /* skip this table */
       continue;
     }
-
-    ndb->setDatabaseName(t.database);
-    const NdbDictionary::Table *table= dict->getTable(t.name);
-
     eventName.set_ascii("REPL$",5);
     eventName.append(t.database);
     eventName.append('/');
     eventName.append(t.name);
 
-    NdbDictionary::Event myEvent(eventName.ptr());
-    myEvent.setTable(t.name);
-    myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); 
-    // add all columns to the event
-    for(int a= 0; a < table->getNoOfColumns(); a++)
-    {
-      myEvent.addEventColumn(a);
-    }
-
-    int res= dict->createEvent(myEvent); // Add event to database
-    if (res && 
-	!(dict->getNdbError().classification ==
-	  NdbError::SchemaObjectExists))
-    {
-      /*
-       * if the event already exists, that's okay, another mysqld
-       * may be doing replication as well. It's just if it's another
-       * error that we report it.
-       */
-      sql_print_error("NDB Replication: Unable to create event in database. "
-		      "Event: %s Error Code: %d Message: %s",
-		      eventName.ptr(),
-		      dict->getNdbError().code,
-		      dict->getNdbError().message);
-      goto err;
-    }
-
-    tableEvents[n_tableEvents].ndbevent=
-      ndb->createEventOperation(eventName.ptr(),100);
-    if(!tableEvents[n_tableEvents].ndbevent)
-    {
-      sql_print_error("NDB Replication: Creating NdbEventOperation failed for"
-		      " %s",eventName.ptr());
-      goto err;
-    }
-
-    int n_columns= table->getNoOfColumns();
-    tableEvents[n_tableEvents].recAttr=
-      (NdbRecAttr**)malloc(2*sizeof(NdbRecAttr*)*n_columns);
-    if(tableEvents[n_tableEvents].recAttr == NULL)
-    {
-      sql_print_error("NDB Replication: Unable to allocate memory for "
-		      "tracking table changes");
+    if(ndbcluster_setup_repl_for_table(ndb, &(tableEvents[n_tableEvents]),
+				       t.database, t.name,
+				       eventName.ptr()) < 0)
       goto err;
-    }
-    tableEvents[n_tableEvents].recAttrPre=
-      tableEvents[n_tableEvents].recAttr+n_columns;
 
-    for (int j= 0; j < n_columns; j++)
-    {
-      tableEvents[n_tableEvents].recAttr[j]=
-	tableEvents[n_tableEvents].ndbevent
-	->getValue(table->getColumn(j)->getName());
-
-      tableEvents[n_tableEvents].recAttrPre[j]=
-	tableEvents[n_tableEvents].ndbevent
-	->getPreValue(table->getColumn(j)->getName());
-    }
-
-    if (tableEvents[n_tableEvents].ndbevent->execute())
-    {
-      sql_print_error("NDB Replication: ndbevent->execute failed for %s",
-		      eventName.ptr());
-      goto err;
-    }
-    
-    sql_print_information("NDB Replication: %s.%s Event:%s NumColumns:%d",
-			  t.database,t.name,eventName.ptr(),n_columns);
+    sql_print_information("NDB Replication: %s.%s Event:%s",
+			  t.database,t.name,eventName.ptr());
     n_tableEvents++;
   } // for
 
   sql_print_information("Starting Cluster Replication with %d events",
 			list.count);
 
+  /**
+   * Main NDB Injector loop
+   */
   for (;;)
   {
     int res= ndb->pollEvents(10000); // wait for event or 1000 ms
Thread
bk commit into 5.1 tree (stewart:1.1796) - Stewart Smith - 10 Mar