Below is the list of changes that have just been committed into a local
5.1 repository of stewart. When stewart does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1819 05/03/09 19:09:31 stewart@stripped +1 -0
WL2325 - further injector thread work
sql/ha_ndbcluster.cc
1.160 05/03/09 19:09:18 stewart@stripped +83 -15
Injector thread
- dynamically allocate recAttr, recAttrPre for each table
- fix signed ->unsigned warning
- display return value of NdbEventOperation::next
- sql_print_error on overrun
- DBUG_PRINT on specific event type
- deallocate memory at end of thread (not active yet though)
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: stewart
# Host: kennedy.(none)
# Root: /home/stewart/Documents/MySQL/5.1/ndb-wl2325
--- 1.159/sql/ha_ndbcluster.cc 2005-03-09 16:00:27 +11:00
+++ 1.160/sql/ha_ndbcluster.cc 2005-03-09 19:09:18 +11:00
@@ -6824,8 +6824,8 @@
struct tableEvent {
NdbEventOperation *ndbevent;
- NdbRecAttr* recAttr[1024];
- NdbRecAttr* recAttrPre[1024];
+ NdbRecAttr** recAttr;
+ NdbRecAttr** recAttrPre;
};
// Injector thread main loop
@@ -6834,12 +6834,15 @@
{
THD *thd; /* needs to be first for thread_stack */
int error= 0;
- struct timespec abstime;
struct tableEvent *tableEvents;
+ unsigned int n_tableEvents;
NdbDictionary::Dictionary::List list;
NDBDICT *dict;
String eventName(200); // should be enough to prevent a malloc
+ /*
+ * Set up the Thread
+ */
my_thread_init();
DBUG_ENTER("ndb_injector_thread");
@@ -6857,7 +6860,9 @@
DBUG_RETURN(NULL);
}
- // FIXME: report this out somehow
+ /*
+ * Set up ndb replication
+ */
sql_print_information("Setting up Cluster Replication");
Ndb *ndb;
@@ -6883,10 +6888,12 @@
goto err;
}
- tableEvents= (struct tableEvent*) malloc(sizeof(struct tableEvent)*list.count);
+ n_tableEvents= list.count;
+ tableEvents= (struct tableEvent*) malloc(sizeof(struct tableEvent)*n_tableEvents);
if(!tableEvents)
{
+ tableEvents= NULL;
sql_print_error("NDB Replication: Could not allocate enough memory"
" to track table events");
goto err;
@@ -6957,6 +6964,15 @@
}
int n_columns= table->getNoOfColumns();
+ tableEvents[i].recAttr= (NdbRecAttr**)malloc(sizeof(NdbRecAttr*)*n_columns);
+ tableEvents[i].recAttrPre= (NdbRecAttr**)malloc(sizeof(NdbRecAttr*)*n_columns);
+ if(tableEvents[i].recAttr==NULL
+ || tableEvents[i].recAttrPre==NULL)
+ {
+ sql_print_error("NDB Replication: Unable to allocate memory for "
+ "tracking table changes");
+ goto err;
+ }
for (int j = 0; j < n_columns; j++) {
tableEvents[i].recAttr[j]= tableEvents[i].ndbevent->getValue(table->getColumn(j)->getName());
tableEvents[i].recAttrPre[j]= tableEvents[i].ndbevent->getPreValue(table->getColumn(j)->getName());
@@ -6977,35 +6993,87 @@
for (;;)
{
- int res= ndb->pollEvents(1000); // wait for event or 1000 ms
+ int res= ndb->pollEvents(10000); // wait for event or 1000 ms
DBUG_PRINT("info",("pollEvents res: %d",res));
if(res>0)
{
- for(int i=0; i<list.count; i++)
+ for(unsigned int i=0; i<list.count; i++)
{
NdbEventOperation *pOp= tableEvents[i].ndbevent;
int overrun= 0;
- while(pOp->next(&overrun) > 0)
+ int n= pOp->next(&overrun);
+ DBUG_PRINT("info",("%d",n));
+ while(n>0)
{
if (overrun)
{
- DBUG_PRINT("error",("OVERRUN"));
+ sql_print_error("NDB Replication: Overrun in event buffer, "
+ "this means we have dropped events. Cannot "
+ "continue replication.");
+ goto err;
}
Uint32 gci= pOp->getGCI();
if (!pOp->isConsistent()) {
- DBUG_PRINT("error",("NOT CONSISTENT"));
+ sql_print_error("NDB Replication: Not Consistent. Cannot "
+ "continue replication.");
+ goto err;
}
DBUG_PRINT("info",("EVENT TYPE: %d GCI: %u",pOp->getEventType(),gci));
- pOp->print();
- }
- }
- }
- }
+ switch(pOp->getEventType())
+ {
+ case NdbDictionary::Event::TE_INSERT:
+ DBUG_PRINT("info",("INSERT"));
+ break;
+ case NdbDictionary::Event::TE_DELETE:
+ DBUG_PRINT("info",("DELETE"));
+ break;
+ case NdbDictionary::Event::TE_UPDATE:
+ DBUG_PRINT("info",("UPDATE"));
+ break;
+ case NdbDictionary::Event::TE_ALL:
+ DBUG_PRINT("info",("TE_ALL - uh oh, a brain broke."));
+ // We should never get here.
+ break;
+ default:
+ DBUG_PRINT("info",("default - uh oh, a brain exploded."));
+ // We should REALLY never get here.
+ break;
+ }
+ n= pOp->next(&overrun);
+ DBUG_PRINT("info",("%d",n));
+ }// while
+ } // for
+ } // if
+ } // for(;;)
err:
sql_print_information("Stopping Cluster Replication");
+
+ // We now deallocate any memory we may have used
+ if(tableEvents)
+ {
+ for(unsigned int i=0; i<n_tableEvents; i++)
+ {
+ if(!tableEvents[i].ndbevent) // last valid entry
+ break;
+ ndb->dropEventOperation(tableEvents[i].ndbevent);
+
+ // FIXME: find out if we need to delete each recAttr
+ if(tableEvents[i].recAttr)
+ free(tableEvents[i].recAttr);
+ if(tableEvents[i].recAttrPre)
+ free(tableEvents[i].recAttrPre);
+
+ // if either were null, we'd have stopped
+ if(tableEvents[i].recAttr==NULL ||
+ tableEvents[i].recAttrPre==NULL)
+ break;
+ }
+ free(tableEvents);
+ }
+
}
#endif /* HAVE_NDBCLUSTER_DB */
| Thread |
|---|
| • bk commit into 5.1 tree (stewart:1.1819) | Stewart Smith | 9 Mar |