Below is the list of changes that have just been committed into a local
5.1 repository of stewart. When stewart does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1842 05/04/06 11:58:38 stewart@stripped +1 -0
resolve merge
sql/ha_ndbcluster.cc
1.194 05/04/06 11:58:31 stewart@stripped +3 -4
resolve merge
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: stewart
# Host: kennedy.(none)
# Root: /home/stewart/Documents/MySQL/5.1/ndb-wl2325/RESYNC
--- 1.193/sql/ha_ndbcluster.cc 2005-04-06 11:52:36 +10:00
+++ 1.194/sql/ha_ndbcluster.cc 2005-04-06 11:58:31 +10:00
@@ -133,6 +133,7 @@
int ndbcluster_create_event_ops(Ndb *ndb, const NDBTAB* table,
const char *dbname, const char* event_name);
int ndbcluster_drop_event_ops(const char *tab);
+int replication_handle_drop_table(Ndb *ndb, const char *dbname, const char *tabname);
inline void ndb_rep_event_name(String *event_name, const char* db, const char* tbl);
extern "C" pthread_handler_decl(ndb_injector_thread_func, arg);
@@ -269,9 +270,11 @@
if (m_batch_execute)
return 0;
#endif
- return trans->execute(NdbTransaction::Commit,
+ int r= trans->execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
h->m_force_send);
+ DBUG_PRINT("info",("trans: 0x%x at commit gci: %u", trans, trans->getGCI()));
+ return r;
}
inline
@@ -282,9 +285,11 @@
if (m_batch_execute)
return 0;
#endif
- return trans->execute(NdbTransaction::Commit,
+ int r= trans->execute(NdbTransaction::Commit,
NdbTransaction::AbortOnError,
thd->variables.ndb_force_send);
+ DBUG_PRINT("info",("trans: 0x%x at commit gci: %u", trans, trans->getGCI()));
+ return r;
}
inline
@@ -3902,28 +3907,28 @@
}
#ifdef HAVE_REPLICATION
- if(opt_bin_log && m_tabname[0]!='#')
+ if(ndb_injector_thread_running > 0 && m_tabname[0] != '#') // FIXME tabname hack
{
const NDBTAB *t= dict->getTable(m_tabname);
String event_name(INJECTOR_EVENT_LEN);
ndb_rep_event_name(&event_name,m_dbname,m_tabname);
+ const NdbError err;
/* always create an event for the table */
if ( ndbcluster_create_event(ndb,t,event_name.c_ptr()) >= 0 )
{
+ sql_print_information("NDB Replication: CREATE TABLE Event: %s",
+ event_name.c_ptr());
if (injector_ndb &&
ndbcluster_create_event_ops(injector_ndb, t, m_dbname,
event_name.c_ptr() ) < 0)
{
- ; /* ToDo; handle error? */
- ndbcluster_drop_event(ndb,t,event_name.c_ptr());
+ /* ToDo; handle error? */
}
}
else
{
- ; /* ToDo handle error */
+ /* ToDo; handle error? */
}
- sql_print_information("NDB Replication: CREATE TABLE Event: %s",
- event_name.c_ptr());
}
#endif /* HAVE_REPLICATION */
@@ -4021,6 +4026,15 @@
set_tabname(from);
set_tabname(to, new_tabname);
+#ifdef HAVE_REPLICATION
+ bool handle_drop_table= (m_tabname[0] != '#' && ndb_injector_thread_running > 0);
+ char old_db_tabname[FN_REFLEN];
+ if (handle_drop_table)
+ {
+ (void)strxnmov(old_db_tabname, FN_REFLEN, m_dbname,"/",m_tabname,NullS);
+ }
+#endif
+
if (check_ndb_connection())
DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
@@ -4034,19 +4048,6 @@
set_dbname(to);
ndb->setDatabaseName(m_dbname);
-#ifdef HAVE_REPLICATION
- if(opt_bin_log && m_tabname[0]!='#') // FIXME tabname hack
- {
- ndbcluster_drop_event_ops(from);
-
- String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name,m_dbname,m_tabname);
-
- if ( ndbcluster_drop_event(ndb,0/* ToDo */,event_name.c_ptr()) < 0 )
- ; /* ToDo; handle error? */
- }
-#endif
-
if (!(result= alter_table_name(new_tabname)))
{
// Rename .ndb file
@@ -4054,28 +4055,35 @@
}
#ifdef HAVE_REPLICATION
- if(opt_bin_log && new_tabname[0]!='#') // FIXME tabname hack
+ if( !result && ndb_injector_thread_running > 0 ) // FIXME tabname hack
{
- const NDBTAB *t= dict->getTable(new_tabname);
- String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name,m_dbname,new_tabname);
- /* always create an event for the table */
- if ( ndbcluster_create_event(ndb,t,event_name.c_ptr()) >= 0 )
+ if (handle_drop_table)
{
- if (injector_ndb &&
- ndbcluster_create_event_ops(injector_ndb, t,
- m_dbname, event_name.c_ptr() ) < 0)
- {
- ; /* ToDo; handle error? */
- ndbcluster_drop_event(ndb,t,event_name.c_ptr());
- }
+ replication_handle_drop_table(ndb, old_db_tabname, 0);
}
- else
+
+ if (new_tabname[0] != '#') // FIXME tabname hack
{
- ; /* ToDo handle error */
+ const NDBTAB *t= dict->getTable(new_tabname);
+ String event_name(INJECTOR_EVENT_LEN);
+ ndb_rep_event_name(&event_name,m_dbname,new_tabname);
+ /* always create an event for the table */
+ if ( ndbcluster_create_event(ndb,t,event_name.c_ptr()) >= 0 )
+ {
+ sql_print_information("NDB Replication: RENAME Event: %s",
+ event_name.c_ptr());
+ if (injector_ndb &&
+ ndbcluster_create_event_ops(injector_ndb, t,
+ m_dbname, event_name.c_ptr() ) < 0)
+ {
+ /* ToDo; handle error? */
+ }
+ }
+ else
+ {
+ ; /* ToDo handle error */
+ }
}
- sql_print_information("NDB Replication: RENAME Event: %s",
- event_name.c_ptr());
}
#endif
@@ -4132,45 +4140,11 @@
}
#ifdef HAVE_REPLICATION
+ if ( ndb_injector_thread_running > 0 && m_tabname[0] != '#' ) // FIXME tabname hack
{
- Ndb *ndb= get_ndb();
- String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name,m_dbname,m_tabname);
- if ( ndbcluster_drop_event(ndb,0/* ToDo */,event_name.c_ptr()) < 0 )
- ; /* ToDo; handle error? */
-
- char name[FN_REFLEN];
- (void)strxnmov(name, FN_REFLEN, "./",m_dbname,"/",m_tabname,NullS);
- NDB_SHARE *share= get_share(name,false);
- if (share)
- {
- int max_timeout= 10;
- (void) pthread_mutex_lock(&share->mutex);
- while (share->op && max_timeout)
- {
- struct timespec abstime;
- set_timespec(abstime, 1);
- sql_print_information("NDB delete table: waiting max %u sec for drop table %s.",
- max_timeout, name);
- (void) pthread_cond_timedwait(&injector_cond,
- &share->mutex,
- &abstime);
- max_timeout--;
- }
- (void) pthread_mutex_unlock(&share->mutex);
- free_share(share);
- if (max_timeout == 0)
- {
- sql_print_error("NDB delete table: timed out. Ignoring...");
- // ToDo: handle possible mem leak if cluter goes down and share is not freed...
- }
- }
- else
- {
- sql_print_error("NDB delete table: share does not exist for %s. Ignoring...", name);
- }
+ replication_handle_drop_table(get_ndb(),m_dbname,m_tabname);
}
-#endif /* HAVE_REPLICATION */
+#endif
DBUG_RETURN(0);
}
@@ -7188,8 +7162,17 @@
#ifdef HAVE_REPLICATION
#include "slave.h"
-static int add_binlog_index(THD *thd, longlong gci, const char *master_log_file,
- longlong master_log_pos)
+struct Binlog_index_row {
+ longlong gci;
+ const char *master_log_file;
+ longlong master_log_pos;
+ longlong n_inserts;
+ longlong n_updates;
+ longlong n_deletes;
+ longlong n_schemaops;
+};
+
+static int add_binlog_index(THD *thd, Binlog_index_row *row)
{
int error= 0;
TABLE *binlog_index;
@@ -7206,9 +7189,13 @@
goto add_binlog_index_err;
}
binlog_index= tables.table;
- binlog_index->field[0]->store(gci);
- binlog_index->field[1]->store(master_log_file,strlen(master_log_file),&my_charset_bin);
- binlog_index->field[2]->store(master_log_pos);
+ binlog_index->field[0]->store(row->gci);
+ binlog_index->field[1]->store(row->master_log_file,strlen(row->master_log_file),&my_charset_bin);
+ binlog_index->field[2]->store(row->master_log_pos);
+ binlog_index->field[3]->store(row->n_inserts);
+ binlog_index->field[4]->store(row->n_updates);
+ binlog_index->field[5]->store(row->n_deletes);
+ binlog_index->field[6]->store(row->n_schemaops);
if (binlog_index->file->ha_write_row(binlog_index->record[0]))
{
sql_print_error("write_row");
@@ -7336,6 +7323,8 @@
DBUG_PRINT("info",("op=%x", op));
share->op= op; // assign op in NDB_SHARE
pthread_mutex_unlock(&share->mutex);
+ if (op)
+ op->setCustomData((void *)share);
// do not free_share since this will delete the share
}
@@ -7403,13 +7392,67 @@
DBUG_RETURN(0);
}
+int replication_handle_drop_table(Ndb *ndb, const char *dbname, const char *tabname)
+{
+ DBUG_ENTER("replication_handle_drop_table");
+
+ char name[FN_REFLEN];
+ (void)strxnmov(name, FN_REFLEN, "./",dbname,tabname ? "/" : NullS, tabname, NullS);
+ String event_name(INJECTOR_EVENT_LEN);
+ ndb_rep_event_name(&event_name,name+sizeof("./")-1,0);
+ NDB_SHARE *share= get_share(name,false);
+
+ /* ToDo; handle error? */
+ if ( ndbcluster_drop_event(ndb,0/* ToDo */,event_name.c_ptr()) < 0 )
+ {
+ if (share->op)
+ {
+ DBUG_ASSERT(false);
+ DBUG_RETURN(-1);
+ }
+ DBUG_RETURN(0);
+ }
+
+ if (share)
+ {
+ int max_timeout= 10;
+ (void) pthread_mutex_lock(&share->mutex);
+ while (share->op && max_timeout)
+ {
+ struct timespec abstime;
+ set_timespec(abstime, 1);
+ sql_print_information("NDB delete table: waiting max %u sec for drop table %s.",
+ max_timeout, name);
+ (void) pthread_cond_timedwait(&injector_cond,
+ &share->mutex,
+ &abstime);
+ max_timeout--;
+ }
+ (void) pthread_mutex_unlock(&share->mutex);
+ free_share(share);
+ if (max_timeout == 0)
+ {
+ sql_print_error("NDB delete table: timed out. Ignoring...");
+ // ToDo: handle possible mem leak if cluter goes down and share is not freed...
+ }
+ }
+ else
+ {
+ sql_print_error("NDB delete table: share does not exist for %s. Ignoring...", name);
+ }
+ DBUG_RETURN(0);
+}
+
inline void
ndb_rep_event_name(String *event_name,const char* db, const char* tbl)
{
event_name->set_ascii("REPL$",5);
event_name->append(db);
- event_name->append('/');
- event_name->append(tbl);
+ if (tbl)
+ {
+ event_name->append('/');
+ event_name->append(tbl);
+ }
}
/**
@@ -7428,8 +7471,8 @@
longlong last_gci_in_binlog_index= 0;
#ifndef DISABLE_INJECTOR
injector *inj= injector::instance();
-#endif
bitvector *b;
+#endif
pthread_mutex_lock(&injector_startup_mutex);
/*
@@ -7508,6 +7551,8 @@
DBUG_PRINT("info",("pollEvents res: %d",res));
if(res > 0) // ToDo should perhaps be >= 0 to get empty transactions...
{
+ Binlog_index_row row;
+ bzero((char*) &row,sizeof(row));
#ifndef DISABLE_INJECTOR
injector::transaction trans= inj->new_trans(thd);
#endif
@@ -7566,7 +7611,11 @@
* TODO: handle this correctly
* update the cluster_replication.binlog_index
*/
- add_binlog_index(thd, gci, "some file name from injector", 4321LL);
+ row.gci= gci;
+ row.master_log_file= "some file name from injector";
+ row.master_log_pos= 4321LL;
+ add_binlog_index(thd, &row);
+ bzero((char*) &row,sizeof(row));
last_gci_in_binlog_index= gci;
#ifndef DISABLE_INJECTOR
@@ -7579,46 +7628,36 @@
pOp->getEventType(),gci,last_gci_in_binlog_index));
const NDBTAB *table= pOp->getTable();
+ NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
+ DBUG_ASSERT(share != 0);
if ( pOp->getEventType() == NdbDictionary::Event::TE_DROP )
{
- char name[FN_REFLEN];
- (void)strxnmov(name, FN_REFLEN, "./",table->getMysqlName(),NullS);
-
- sql_print_information("NDB Replication: drop table %s.", name);
-
+ row.n_schemaops++;
+ sql_print_information("NDB Replication: drop table %s.", table->getMysqlName());
ndb->dropEventOperation(pOp);
-
- NDB_SHARE *share= get_share(name, false);
- DBUG_ASSERT(share != 0);
-
if (share)
{
(void) pthread_mutex_lock(&share->mutex);
- DBUG_PRINT("info",("TABLE DROP EVENT: %s received op: %x dahare op: %x",
- name, pOp, share->op));
+ DBUG_PRINT("info",("TABLE DROP EVENT: %s received op: %x hhare op: %x",
+ table->getMysqlName(), pOp, share->op));
assert(share->op == pOp);
share->op= 0;
// Signal ha_ndbcluster::delete_table that drop is done
(void) pthread_cond_signal(&injector_cond);
(void) pthread_mutex_unlock(&share->mutex);
- free_share(share);
- // extra for the previous lock
+ // release previous lock
free_share(share);
}
else
{
- sql_print_error("NDB Replication: share does not exist for %s. Ignoring...", name);
+ sql_print_error("NDB Replication: share does not exist for %s. Ignoring...",
+ table->getMysqlName());
}
continue;
}
- String db_tab(200);
- db_tab.append("./");
- db_tab.append(table->getMysqlName());
- NDB_SHARE *share= get_share(db_tab.c_ptr(),false); // remember to free share
- DBUG_ASSERT(share != 0);
#ifndef DISABLE_INJECTOR
byte* row_buf= new byte[share->table->s->rec_buff_length];
byte* field_buf= NULL;
@@ -7672,19 +7711,24 @@
case NdbDictionary::Event::TE_INSERT:
{
int res;
+ row.n_inserts++;
DBUG_PRINT("info",("INSERT INTO %s (%s)",table->getName(),col_names.c_ptr()));
#ifndef DISABLE_INJECTOR
- injector::transaction::table tbl(share->table);
- bitvector cols(i,true);
- res= trans.write_row(::server_id, tbl, cols, row_buf);
- DBUG_PRINT("info",("WRITE_ROW: %d", res));
+ {
+ injector::transaction::table tbl(share->table);
+ bitvector cols(i,true);
+ res= trans.write_row(::server_id, tbl, cols, row_buf);
+ DBUG_PRINT("info",("WRITE_ROW: %d",res));
+ }
#endif
}
break;
case NdbDictionary::Event::TE_DELETE:
+ row.n_deletes++;
DBUG_PRINT("info",("DELETE FROM %s",table->getName()));
break;
case NdbDictionary::Event::TE_UPDATE:
+ row.n_updates++;
DBUG_PRINT("info",("UPDATE %s",table->getName()));
break;
case NdbDictionary::Event::TE_ALL:
@@ -7696,8 +7740,6 @@
// We should REALLY never get here.
break;
}
-
- free_share(share);
} // while
#ifndef DISABLE_INJECTOR
@@ -7712,7 +7754,10 @@
* TODO: handle this correctly
* update the cluster_replication.binlog_index
*/
- add_binlog_index(thd, gci, "some file name from injector", 4321LL);
+ row.gci= gci;
+ row.master_log_file= "some file name from injector";
+ row.master_log_pos= 4321LL;
+ add_binlog_index(thd, &row);
last_gci_in_binlog_index= gci;
} // if
} // for(;;)
| Thread |
|---|
| • bk commit into 5.1 tree (stewart:1.1842) | Stewart Smith | 6 Apr |