Below is the list of changes that have just been committed into a local
5.1 repository of marty. When marty does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-08-29 10:27:57+02:00, mskold@stripped +4 -0
WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE to
Ndb_event_data so each NdbEventOperation have dedicated buffers
sql/ha_ndbcluster.cc@stripped, 2007-08-29 10:27:46+02:00, mskold@stripped +3 -1
WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE
to Ndb_event_data so each NdbEventOperation have dedicated buffers
sql/ha_ndbcluster.h@stripped, 2007-08-29 10:27:46+02:00, mskold@stripped +1 -2
WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE
to Ndb_event_data so each NdbEventOperation have dedicated buffers
sql/ha_ndbcluster_binlog.cc@stripped, 2007-08-29 10:27:46+02:00, mskold@stripped +81 -53
WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE
to Ndb_event_data so each NdbEventOperation have dedicated buffers
sql/ha_ndbcluster_binlog.h@stripped, 2007-08-29 10:27:46+02:00, mskold@stripped +13 -7
WL3680 On-line add attribute, ndbcluster handler part: Moved record[] from NDB_SHARE
to Ndb_event_data so each NdbEventOperation have dedicated buffers
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: mskold
# Host: linux.site
# Root: /windows/Linux_space/MySQL/mysql-5.1-telco-6.2
--- 1.489/sql/ha_ndbcluster.cc 2007-08-29 10:28:08 +02:00
+++ 1.490/sql/ha_ndbcluster.cc 2007-08-29 10:28:08 +02:00
@@ -8995,9 +8995,9 @@ void ndbcluster_real_free_share(NDB_SHAR
hash_delete(&ndbcluster_open_tables, (byte*) *share);
thr_lock_delete(&(*share)->lock);
- pthread_mutex_destroy(&(*share)->mutex);
#ifdef HAVE_NDB_BINLOG
+ (void) pthread_mutex_lock(&(*share)->mutex);
if ((*share)->table)
{
// (*share)->table->mem_root is freed by closefrm
@@ -9021,7 +9021,9 @@ void ndbcluster_real_free_share(NDB_SHAR
(*share)->table= 0;
#endif
}
+ (void) pthread_mutex_unlock(&(*share)->mutex);
#endif
+ pthread_mutex_destroy(&(*share)->mutex);
free_root(&(*share)->mem_root, MYF(0));
my_free((gptr) *share, MYF(0));
*share= 0;
--- 1.188/sql/ha_ndbcluster.h 2007-08-29 10:28:08 +02:00
+++ 1.189/sql/ha_ndbcluster.h 2007-08-29 10:28:08 +02:00
@@ -138,12 +138,11 @@ typedef struct st_ndbcluster_share {
uint32 flags;
NdbEventOperation *op;
Uint64 op_gci;
- List<NdbEventOperation> old_ops; // for on-line alter table
char *old_names; // for rename table
TABLE_SHARE *table_share;
TABLE *table;
- byte *record[2]; // pointer to allocated records for receiving data
MY_BITMAP *subscriber_bitmap;
+ List<NdbEventOperation> old_ops; // for on-line alter table
#endif
} NDB_SHARE;
--- 1.130/sql/ha_ndbcluster_binlog.cc 2007-08-29 10:28:08 +02:00
+++ 1.131/sql/ha_ndbcluster_binlog.cc 2007-08-29 10:28:08 +02:00
@@ -344,8 +344,7 @@ ndbcluster_binlog_close_table(THD *thd,
static int
ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
- TABLE_SHARE *table_share, TABLE *table,
- int reopen)
+ TABLE_SHARE *table_share, TABLE *table)
{
int error;
DBUG_ENTER("ndbcluster_binlog_open_table");
@@ -361,7 +360,7 @@ ndbcluster_binlog_open_table(THD *thd, N
free_table_share(table_share);
DBUG_RETURN(error);
}
- if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,
+ if ((error= open_table_from_share(thd, table_share, "", 0 /* don't allocate buffers */,
(uint) READ_ALL, 0, table, OTM_OPEN)))
{
sql_print_error("Unable to open table for %s, error=%d(%d)",
@@ -372,23 +371,6 @@ ndbcluster_binlog_open_table(THD *thd, N
}
assign_new_table_id(table_share);
- if (!reopen)
- {
- // allocate memory on ndb share so it can be reused after online alter table
- (void)multi_alloc_root(&share->mem_root,
- &(share->record[0]), table->s->rec_buff_length,
- &(share->record[1]), table->s->rec_buff_length,
- NULL);
- }
- {
- my_ptrdiff_t row_offset= share->record[0] - table->record[0];
- Field **p_field;
- for (p_field= table->field; *p_field; p_field++)
- (*p_field)->move_field_offset(row_offset);
- table->record[0]= share->record[0];
- table->record[1]= share->record[1];
- }
-
table->in_use= injector_thd;
table->s->db.str= share->db;
@@ -423,6 +405,7 @@ void ndbcluster_binlog_init_share(NDB_SH
share->op= 0;
share->table= 0;
+ share->old_ops.empty();
if (!ndb_schema_share &&
strcmp(share->db, NDB_REP_DB) == 0 &&
@@ -459,7 +442,6 @@ void ndbcluster_binlog_init_share(NDB_SH
{
share->flags|= NSF_NO_BINLOG;
}
- share->old_ops.empty();
DBUG_VOID_RETURN;
}
while (1)
@@ -467,14 +449,13 @@ void ndbcluster_binlog_init_share(NDB_SH
int error;
TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
- if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
+ if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table)))
break;
if (table->s->primary_key == MAX_KEY)
share->flags|= NSF_HIDDEN_PK;
if (table->s->blob_fields != 0)
share->flags|= NSF_BLOB_FLAG;
- share->old_ops.empty();
break;
}
DBUG_VOID_RETURN;
@@ -1609,6 +1590,7 @@ inline
bool
ndb_is_old_event_op(NDB_SHARE *share, NdbEventOperation *pOp)
{
+ safe_mutex_assert_owner(&share->mutex);
List_iterator_fast<NdbEventOperation> it(share->old_ops);
NdbEventOperation *op;
if (share->old_ops.is_empty())
@@ -1625,6 +1607,7 @@ inline
void
ndb_remove_old_event_op(NDB_SHARE *share, NdbEventOperation *pOp)
{
+ safe_mutex_assert_owner(&share->mutex);
List_iterator<NdbEventOperation> it(share->old_ops);
NdbEventOperation *op;
if (share->old_ops.is_empty())
@@ -1642,6 +1625,7 @@ ndb_remove_old_event_op(NDB_SHARE *share
void
ndb_remove_old_event_ops(NDB_SHARE *share)
{
+ safe_mutex_assert_owner(&share->mutex);
List_iterator<NdbEventOperation> it(share->old_ops);
NdbEventOperation *op;
if (share->old_ops.is_empty())
@@ -1654,15 +1638,15 @@ ndb_remove_old_event_ops(NDB_SHARE *shar
delete event_data;
op->setCustomData(NULL);
}
- it.remove();
- return;
}
+ share->old_ops.empty();
}
inline
void
ndb_drop_old_event_ops(Ndb *ndb, NDB_SHARE *share)
{
+ safe_mutex_assert_owner(&share->mutex);
List_iterator<NdbEventOperation> it(share->old_ops);
NdbEventOperation *op;
if (share->old_ops.is_empty())
@@ -1676,9 +1660,9 @@ ndb_drop_old_event_ops(Ndb *ndb, NDB_SHA
op->setCustomData(NULL);
}
ndb->dropEventOperation(op);
- it.remove();
free_share(&share);
}
+ share->old_ops.empty();
}
@@ -1692,6 +1676,7 @@ ndb_handle_schema_change(THD *thd, Ndb *
DBUG_ENTER("ndb_handle_schema_change");
MEM_ROOT *mem_root= &share->mem_root;
TABLE_SHARE *table_share= share->table_share;
+ TABLE *table= share->table;
const char *tabname= table_share->table_name.str;
const char *dbname= table_share->db.str;
const NDBTAB *altered_table= pOp->getTable();
@@ -1804,11 +1789,15 @@ ndb_handle_schema_change(THD *thd, Ndb *
ndbcluster_binlog_close_table(thd, share);
- TABLE *table= (share->table) ?
- share->table // allready allocated
- : (TABLE*) alloc_root(mem_root, sizeof(*table));
+ TABLE_SHARE *new_table_share=
+ (share->table_share) ?
+ share->table_share // allready allocated
+ : (TABLE_SHARE *) alloc_root(mem_root, sizeof(*new_table_share));
+ TABLE *new_table= (share->table) ?
+ share->table // allready allocated
+ : (TABLE*) alloc_root(mem_root, sizeof(*new_table));
if ((error= ndbcluster_binlog_open_table(thd, share,
- table_share, table, 1)))
+ new_table_share, new_table)))
sql_print_information("NDB: Failed to re-open table %s.%s",
dbname, tabname);
pthread_mutex_unlock(&LOCK_open);
@@ -2330,7 +2319,11 @@ ndb_binlog_thread_handle_schema_event_po
// fall through
case SOT_ALTER_TABLE:
if (share && schema_type == SOT_ALTER_TABLE)
+ {
+ (void) pthread_mutex_lock(&share->mutex);
ndb_drop_old_event_ops(injector_ndb, share);
+ (void) pthread_mutex_unlock(&share->mutex);
+ }
// invalidation already handled by binlog thread
if (!share || !share->op)
{
@@ -3113,11 +3106,42 @@ ndbcluster_create_event_ops(THD *thd, ND
if (share->flags & NSF_BLOB_FLAG)
op->mergeEvents(TRUE); // currently not inherited from event
- int n_columns= ndbtab->getNoOfColumns();
- int n_fields= table ? table->s->fields : 0; // XXX ???
- Ndb_event_data *event_data= new Ndb_event_data(share, n_columns);
+ uint n_columns= ndbtab->getNoOfColumns();
+ uint n_fields= table ? table->s->fields : 0; // XXX ???
+ Ndb_event_data *event_data= new Ndb_event_data(share, n_fields);
+ uint val_length= sizeof(NdbValue) * n_columns;
+ uint rec_length= table->s->rec_buff_length;
+ /*
+ Allocate memory globally so it can be reused after online alter table
+ */
+ if (my_multi_malloc(MYF(MY_WME),
+ &event_data->ndb_value[0],
+ val_length,
+ &event_data->ndb_value[1],
+ val_length,
+ &event_data->record[0],
+ rec_length,
+ &event_data->record[1],
+ rec_length,
+ NULL) != 0)
+ {
+ /*
+ Set table record pointers to point to new records
+ */
+ my_ptrdiff_t row_offset= event_data->record[0] - table->record[0];
+ Field **p_field;
+ for (p_field= table->field; *p_field; p_field++)
+ (*p_field)->move_field_offset(row_offset);
+ table->record[0]= event_data->record[0];
+ table->record[1]= event_data->record[1];
+ }
+ else
+ {
+ DBUG_PRINT("info", ("Failed to allocate records for event operation"));
+ DBUG_RETURN(-1);
+ }
- for (int j= 0; j < n_columns; j++)
+ for (uint j= 0; j < n_columns; j++)
{
const char *col_name= ndbtab->getColumn(j)->getName();
NdbValue attr0, attr1;
@@ -3154,9 +3178,10 @@ ndbcluster_create_event_ops(THD *thd, ND
op->getNdbError().code,
op->getNdbError().message,
"NDB");
+ delete event_data;
+ op->setCustomData(NULL);
ndb->dropEventOperation(op);
pthread_mutex_unlock(&injector_mutex);
- delete event_data;
DBUG_RETURN(-1);
}
}
@@ -3206,7 +3231,6 @@ ndbcluster_create_event_ops(THD *thd, ND
}
ndb->dropEventOperation(op);
pthread_mutex_unlock(&injector_mutex);
- delete event_data;
if (retries)
{
my_sleep(retry_sleep);
@@ -3693,12 +3717,12 @@ ndb_binlog_thread_handle_data_event(Ndb
{
/* unpack data to fetch orig_server_id and orig_epoch */
TABLE *table= share->table;
- uint n_fields= table->s->fields;
+ uint n_fields= event_data->no_fields;
MY_BITMAP b;
uint32 bitbuf[128 / (sizeof(uint32) * 8)];
bitmap_init(&b, bitbuf, n_fields, FALSE);
bitmap_set_all(&b);
- ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
+ ndb_unpack_record(table, event_data->ndb_value[0], &b,
event_data->record[0]);
/* store */
ndb_binlog_index_row *row= ndb_find_binlog_index_row
(rows, ((Field_long *)table->field[0])->val_int(), 1);
@@ -3750,7 +3774,7 @@ ndb_binlog_thread_handle_data_event(Ndb
dbug_print_table("table", table);
TABLE_SHARE *table_s= table->s;
- uint n_fields= table_s->fields;
+ uint n_fields= event_data->no_fields;
DBUG_PRINT("info", ("Assuming %u columns for table %s", n_fields,
table_s->table_name.str));
MY_BITMAP b;
/* Potential buffer for the bitmap */
@@ -3791,11 +3815,11 @@ ndb_binlog_thread_handle_data_event(Ndb
ptrdiff);
DBUG_ASSERT(ret == 0);
}
- ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
+ ndb_unpack_record(table, event_data->ndb_value[0], &b,
event_data->record[0]);
IF_DBUG(int ret=) trans.write_row(originating_server_id,
injector::transaction::table(table,
TRUE),
- &b, n_fields, table->record[0]);
+ &b, n_fields, event_data->record[0]);
DBUG_ASSERT(ret == 0);
}
break;
@@ -3823,19 +3847,19 @@ ndb_binlog_thread_handle_data_event(Ndb
if (share->flags & NSF_BLOB_FLAG)
{
- my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
+ my_ptrdiff_t ptrdiff= event_data->record[n] - event_data->record[0];
IF_DBUG(int ret =) get_ndb_blobs_value(table, event_data->ndb_value[n],
blobs_buffer[n],
blobs_buffer_size[n],
ptrdiff);
DBUG_ASSERT(ret == 0);
}
- ndb_unpack_record(table, event_data->ndb_value[n], &b, table->record[n]);
- DBUG_EXECUTE("info", print_records(table, table->record[n]););
+ ndb_unpack_record(table, event_data->ndb_value[n], &b,
event_data->record[n]);
+ DBUG_EXECUTE("info", print_records(table, event_data->record[n]););
IF_DBUG(int ret =) trans.delete_row(originating_server_id,
injector::transaction::table(table,
TRUE),
- &b, n_fields, table->record[n]);
+ &b, n_fields, event_data->record[n]);
DBUG_ASSERT(ret == 0);
}
break;
@@ -3854,8 +3878,8 @@ ndb_binlog_thread_handle_data_event(Ndb
DBUG_ASSERT(ret == 0);
}
ndb_unpack_record(table, event_data->ndb_value[0],
- &b, table->record[0]);
- DBUG_EXECUTE("info", print_records(table, table->record[0]););
+ &b, event_data->record[0]);
+ DBUG_EXECUTE("info", print_records(table, event_data->record[0]););
if (table->s->primary_key != MAX_KEY)
{
/*
@@ -3864,7 +3888,7 @@ ndb_binlog_thread_handle_data_event(Ndb
*/
trans.write_row(originating_server_id,
injector::transaction::table(table, TRUE),
- &b, n_fields, table->record[0]);// after values
+ &b, n_fields, event_data->record[0]);// after values
}
else
{
@@ -3874,21 +3898,21 @@ ndb_binlog_thread_handle_data_event(Ndb
*/
if (share->flags & NSF_BLOB_FLAG)
{
- my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
+ my_ptrdiff_t ptrdiff= event_data->record[1] - event_data->record[0];
IF_DBUG(int ret =) get_ndb_blobs_value(table, event_data->ndb_value[1],
blobs_buffer[1],
blobs_buffer_size[1],
ptrdiff);
DBUG_ASSERT(ret == 0);
}
- ndb_unpack_record(table, event_data->ndb_value[1], &b,
table->record[1]);
- DBUG_EXECUTE("info", print_records(table, table->record[1]););
+ ndb_unpack_record(table, event_data->ndb_value[1], &b,
event_data->record[1]);
+ DBUG_EXECUTE("info", print_records(table, event_data->record[1]););
IF_DBUG(int ret =) trans.update_row(originating_server_id,
injector::transaction::table(table,
TRUE),
&b, n_fields,
- table->record[1], // before values
- table->record[0]);// after values
+ event_data->record[1], // before values
+ event_data->record[0]);// after values
DBUG_ASSERT(ret == 0);
}
}
@@ -4690,12 +4714,14 @@ err:
delete event_data;
op->setCustomData(NULL);
}
+ (void) pthread_mutex_lock(&share->mutex);
share->op= 0;
ndb_remove_old_event_op(share, op);
/* ndb_share reference binlog free */
DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
share->key, share->use_count));
free_share(&share);
+ (void) pthread_mutex_unlock(&share->mutex);
s_ndb->dropEventOperation(op);
}
delete s_ndb;
@@ -4720,12 +4746,14 @@ err:
delete event_data;
op->setCustomData(NULL);
}
+ (void) pthread_mutex_lock(&share->mutex);
share->op= 0;
ndb_remove_old_event_op(share, op);
/* ndb_share reference binlog free */
DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
share->key, share->use_count));
free_share(&share);
+ (void) pthread_mutex_unlock(&share->mutex);
i_ndb->dropEventOperation(op);
}
delete i_ndb;
--- 1.28/sql/ha_ndbcluster_binlog.h 2007-08-29 10:28:08 +02:00
+++ 1.29/sql/ha_ndbcluster_binlog.h 2007-08-29 10:28:08 +02:00
@@ -38,21 +38,27 @@ extern handlerton *ndbcluster_hton;
class Ndb_event_data {
public:
- Ndb_event_data(NDB_SHARE *the_share, int no_columns) : share(the_share)
+ Ndb_event_data(NDB_SHARE *the_share, uint n_fields) :
+ share(the_share), no_fields(n_fields)
{
- ndb_value[0]= (NdbValue*) my_malloc(sizeof(NdbValue) * no_columns,
- MYF(MY_WME));
- ndb_value[1]= (NdbValue*) my_malloc(sizeof(NdbValue) * no_columns,
- MYF(MY_WME));
+ ndb_value[0]= 0;
+ ndb_value[1]= 0;
+ record[0]= 0;
+ record[1]= 0;
}
~Ndb_event_data()
{
share= 0;
- my_free((gptr) ndb_value[0], MYF(MY_WME));
- my_free((gptr) ndb_value[1], MYF(MY_WME));
+ /*
+ ndbvalue[] and record[] are allocated with my_multi_malloc
+ so only first pointer should be freed
+ */
+ my_free((gptr) ndb_value[0], MYF(MY_WME|MY_ALLOW_ZERO_PTR));
}
NDB_SHARE *share;
+ uint no_fields;
NdbValue *ndb_value[2];
+ byte *record[2]; // pointer to allocated records for receiving data
};
/*
| Thread |
|---|
| • bk commit into 5.1 tree (mskold:1.2613) | Martin Skold | 29 Aug |