Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-07-26 11:47:54+02:00, tomas@stripped +2 -0
Merge whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-telco-6.2
into whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-telco
MERGE: 1.2514.1.77
sql/ha_ndbcluster.cc@stripped, 2007-07-26 11:47:49+02:00, tomas@stripped +0 -0
Auto merged
MERGE: 1.472.1.13
sql/ha_ndbcluster_binlog.cc@stripped, 2007-07-26 11:47:49+02:00, tomas@stripped +0 -0
Auto merged
MERGE: 1.120.1.7
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: whalegate.ndb.mysql.com
# Root: /home/tomas/mysql-5.1-telco/RESYNC
--- 1.487/sql/ha_ndbcluster.cc 2007-07-23 08:20:20 +02:00
+++ 1.488/sql/ha_ndbcluster.cc 2007-07-26 11:47:49 +02:00
@@ -3253,10 +3253,16 @@
{
if (unlikely(m_slow_path))
{
- if (!(thd->options & OPTION_BIN_LOG))
- op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
- else if (thd->slave_thread)
+ /*
+ ignore OPTION_BIN_LOG for slave thd. It is used to indicate
+ log-slave-updates option. This is instead handled in the
+ injector thread, by looking explicitly at the
+ opt_log_slave_updates flag.
+ */
+ if (thd->slave_thread)
op->setAnyValue(thd->server_id);
+ else if (!(thd->options & OPTION_BIN_LOG))
+ op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
}
}
--- 1.129/sql/ha_ndbcluster_binlog.cc 2007-07-23 08:20:20 +02:00
+++ 1.130/sql/ha_ndbcluster_binlog.cc 2007-07-26 11:47:49 +02:00
@@ -117,6 +117,9 @@
NDB_SHARE *ndb_schema_share= 0;
pthread_mutex_t ndb_schema_share_mutex;
+extern my_bool opt_log_slave_updates;
+static my_bool g_ndb_log_slave_updates;
+
/* Schema object distribution handling */
HASH ndb_schema_objects;
typedef struct st_ndb_schema_object {
@@ -2424,13 +2427,20 @@
ndb_binlog_index table
*/
struct ndb_binlog_index_row {
- ulonglong gci;
+ ulonglong epoch;
const char *master_log_file;
ulonglong master_log_pos;
- ulonglong n_inserts;
- ulonglong n_updates;
- ulonglong n_deletes;
- ulonglong n_schemaops;
+ ulong n_inserts;
+ ulong n_updates;
+ ulong n_deletes;
+ ulong n_schemaops;
+
+ ulong orig_server_id;
+ ulonglong orig_epoch;
+
+ ulong gci;
+
+ struct ndb_binlog_index_row *next;
};
/*
@@ -2470,11 +2480,12 @@
Insert one row in the ndb_binlog_index
*/
-int ndb_add_ndb_binlog_index(THD *thd, void *_row)
+static int
+ndb_add_ndb_binlog_index(THD *thd, ndb_binlog_index_row *row)
{
- ndb_binlog_index_row &row= *(ndb_binlog_index_row *) _row;
int error= 0;
bool need_reopen;
+ ndb_binlog_index_row *first= row;
/*
Turn of binlogging to prevent the table changes to be written to
the binary log.
@@ -2509,24 +2520,48 @@
/*
Intialize ndb_binlog_index->record[0]
*/
- empty_record(ndb_binlog_index);
+ do
+ {
+ empty_record(ndb_binlog_index);
- ndb_binlog_index->field[0]->store(row.master_log_pos);
- ndb_binlog_index->field[1]->store(row.master_log_file,
- strlen(row.master_log_file),
- &my_charset_bin);
- ndb_binlog_index->field[2]->store(row.gci);
- ndb_binlog_index->field[3]->store(row.n_inserts);
- ndb_binlog_index->field[4]->store(row.n_updates);
- ndb_binlog_index->field[5]->store(row.n_deletes);
- ndb_binlog_index->field[6]->store(row.n_schemaops);
-
- if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
- {
- sql_print_error("NDB Binlog: Writing row to ndb_binlog_index: %d", error);
- error= -1;
- goto add_ndb_binlog_index_err;
- }
+ ndb_binlog_index->field[0]->store(first->master_log_pos);
+ ndb_binlog_index->field[1]->store(first->master_log_file,
+ strlen(first->master_log_file),
+ &my_charset_bin);
+ ndb_binlog_index->field[2]->store(first->epoch);
+ if (ndb_binlog_index->s->fields > 7)
+ {
+ ndb_binlog_index->field[3]->store(row->n_inserts);
+ ndb_binlog_index->field[4]->store(row->n_updates);
+ ndb_binlog_index->field[5]->store(row->n_deletes);
+ ndb_binlog_index->field[6]->store(row->n_schemaops);
+ ndb_binlog_index->field[7]->store(row->orig_server_id);
+ ndb_binlog_index->field[8]->store(row->orig_epoch);
+ ndb_binlog_index->field[9]->store(first->gci);
+ row= row->next;
+ }
+ else
+ {
+ while ((row= row->next))
+ {
+ first->n_inserts+= row->n_inserts;
+ first->n_updates+= row->n_updates;
+ first->n_deletes+= row->n_deletes;
+ first->n_schemaops+= row->n_schemaops;
+ }
+ ndb_binlog_index->field[3]->store((ulonglong)first->n_inserts);
+ ndb_binlog_index->field[4]->store((ulonglong)first->n_updates);
+ ndb_binlog_index->field[5]->store((ulonglong)first->n_deletes);
+ ndb_binlog_index->field[6]->store((ulonglong)first->n_schemaops);
+ }
+
+ if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
+ {
+ sql_print_error("NDB Binlog: Writing row to ndb_binlog_index: %d", error);
+ error= -1;
+ goto add_ndb_binlog_index_err;
+ }
+ } while (row);
mysql_unlock_tables(thd, thd->lock);
thd->lock= 0;
@@ -4112,15 +4147,96 @@
/*
Handle data events from the storage nodes
*/
+inline ndb_binlog_index_row *
+ndb_find_binlog_index_row(ndb_binlog_index_row **rows,
+ uint orig_server_id, int flag)
+{
+ ndb_binlog_index_row *row= *rows;
+#ifdef NOT_YET
+ ndb_binlog_index_row *first= row, *found_id= 0;
+ for (;;)
+ {
+ if (row->orig_server_id == orig_server_id)
+ {
+ /* */
+ if (!flag || !row->orig_epoch)
+ return row;
+ if (!found_id)
+ found_id= row;
+ }
+ if (row->orig_server_id == 0)
+ break;
+ row= row->next;
+ if (row == NULL)
+ {
+ row= (ndb_binlog_index_row*)sql_alloc(sizeof(ndb_binlog_index_row));
+ bzero((char*)row, sizeof(ndb_binlog_index_row));
+ row->next= first;
+ *rows= row;
+ if (found_id)
+ {
+ /*
+ If we found index_row with same server id already
+ that row will contain the current stats.
+ Copy stats over to new and reset old.
+ */
+ row->n_inserts= found_id->n_inserts;
+ row->n_updates= found_id->n_updates;
+ row->n_deletes= found_id->n_deletes;
+ found_id->n_inserts= 0;
+ found_id->n_updates= 0;
+ found_id->n_deletes= 0;
+ }
+ /* keep track of schema ops only on "first" index_row */
+ row->n_schemaops= first->n_schemaops;
+ first->n_schemaops= 0;
+ break;
+ }
+ }
+ row->orig_server_id= orig_server_id;
+#endif
+ return row;
+}
+
static int
ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
- ndb_binlog_index_row &row,
+ ndb_binlog_index_row **rows,
injector::transaction &trans)
{
Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
NDB_SHARE *share= event_data->share;
if (share == ndb_apply_status_share)
+ {
+#ifdef NOT_YET
+ switch(pOp->getEventType())
+ {
+ case NDBEVENT::TE_INSERT:
+ // fall through
+ case NDBEVENT::TE_UPDATE:
+ {
+ /* unpack data to fetch orig_server_id and orig_epoch */
+ TABLE *table= share->table;
+ uint n_fields= table->s->fields;
+ MY_BITMAP b;
+ uint32 bitbuf[128 / (sizeof(uint32) * 8)];
+ bitmap_init(&b, bitbuf, n_fields, FALSE);
+ bitmap_set_all(&b);
+ ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
+ /* store */
+ ndb_binlog_index_row *row= ndb_find_binlog_index_row
+ (rows, ((Field_long *)table->field[0])->val_int(), 1);
+ row->orig_epoch= ((Field_longlong *)table->field[1])->val_int();
+ break;
+ }
+ case NDBEVENT::TE_DELETE:
+ break;
+ default:
+ /* We should REALLY never get here */
+ abort();
+ }
+#endif
return 0;
+ }
uint32 originating_server_id= pOp->getAnyValue();
if (originating_server_id == 0)
@@ -4133,6 +4249,14 @@
originating_server_id);
return 0;
}
+ else if (!g_ndb_log_slave_updates)
+ {
+ /*
+ This event comes from a slave applier since it has an originating
+ server id set. Since option to log slave updates is not set, skip it.
+ */
+ return 0;
+ }
/*
Filter away duplicate events by checking if event was generated
@@ -4171,10 +4295,13 @@
byte* blobs_buffer[2] = { 0, 0 };
uint blobs_buffer_size[2] = { 0, 0 };
+ ndb_binlog_index_row *row=
+ ndb_find_binlog_index_row(rows, originating_server_id, 0);
+
switch(pOp->getEventType())
{
case NDBEVENT::TE_INSERT:
- row.n_inserts++;
+ row->n_inserts++;
DBUG_PRINT("info", ("INSERT INTO %s.%s",
table_s->db.str, table_s->table_name.str));
{
@@ -4196,7 +4323,7 @@
}
break;
case NDBEVENT::TE_DELETE:
- row.n_deletes++;
+ row->n_deletes++;
DBUG_PRINT("info",("DELETE FROM %s.%s",
table_s->db.str, table_s->table_name.str));
{
@@ -4236,7 +4363,7 @@
}
break;
case NDBEVENT::TE_UPDATE:
- row.n_updates++;
+ row->n_updates++;
DBUG_PRINT("info", ("UPDATE %s.%s",
table_s->db.str, table_s->table_name.str));
{
@@ -4778,9 +4905,10 @@
DBUG_PRINT("info", ("pollEvents res: %d", res));
thd->proc_info= "Processing events";
NdbEventOperation *pOp= i_ndb->nextEvent();
- ndb_binlog_index_row row;
+ ndb_binlog_index_row _row;
while (pOp != NULL)
{
+ ndb_binlog_index_row *rows= &_row;
#ifdef RUN_NDB_BINLOG_TIMER
Timer gci_timer, write_timer;
int event_count= 0;
@@ -4793,11 +4921,13 @@
! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
+ /* initialize some variables for this epoch */
+ g_ndb_log_slave_updates= opt_log_slave_updates;
i_ndb->
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
i_ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
- bzero((char*) &row, sizeof(row));
+ bzero((char*)&_row, sizeof(_row));
injector::transaction trans;
// pass table map before epoch
{
@@ -4899,7 +5029,7 @@
event_count++;
#endif
if (pOp->hasError() &&
- ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0)
+ ndb_binlog_thread_handle_error(i_ndb, pOp, *rows) < 0)
goto err;
#ifndef DBUG_OFF
@@ -4935,14 +5065,14 @@
#endif
if ((unsigned) pOp->getEventType() <
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
- ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans);
+ ndb_binlog_thread_handle_data_event(i_ndb, pOp, &rows, trans);
else
{
// set injector_ndb database/schema from table internal name
IF_DBUG(int ret=)
i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
DBUG_ASSERT(ret == 0);
- ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
+ ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, *rows);
// reset to catch errors
i_ndb->setDatabaseName("");
DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
@@ -4990,13 +5120,14 @@
r);
/* TODO: Further handling? */
}
- row.gci= gci;
- row.master_log_file= start.file_name();
- row.master_log_pos= start.file_pos();
+ rows->gci= (ulong)gci;
+ rows->epoch= gci;
+ rows->master_log_file= start.file_name();
+ rows->master_log_pos= start.file_pos();
DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
if (ndb_update_ndb_binlog_index)
- ndb_add_ndb_binlog_index(thd, &row);
+ ndb_add_ndb_binlog_index(thd, rows);
ndb_latest_applied_binlog_epoch= gci;
}
ndb_latest_handled_binlog_epoch= gci;
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.2561) | tomas | 26 Jul |