Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-05-13 21:15:04+02:00, knielsen@ymer.(none) +8 -0
WL#3733: NdbRecord optimizations for mysqld.
Implement slave batching.
mysql-test/mysql-test-run.pl@stripped, 2007-05-13 21:14:58+02:00, knielsen@ymer.(none) +2 -0
Implement slave batching.
mysql-test/r/rpl_ndb_basic.result@stripped, 2007-05-13 21:14:58+02:00, knielsen@ymer.(none) +2 -2
Implement slave batching.
sql/ha_ndbcluster.cc@stripped, 2007-05-13 21:14:58+02:00, knielsen@ymer.(none) +169 -99
Implement slave batching.
sql/ha_ndbcluster.h@stripped, 2007-05-13 21:14:58+02:00, knielsen@ymer.(none) +17 -11
Implement slave batching.
sql/log_event.cc@stripped, 2007-05-13 21:14:58+02:00, knielsen@ymer.(none) +22 -0
Implement slave batching.
sql/mysql_priv.h@stripped, 2007-05-13 21:14:58+02:00, knielsen@ymer.(none) +6 -1
Implement slave batching.
sql/mysqld.cc@stripped, 2007-05-13 21:14:59+02:00, knielsen@ymer.(none) +6 -1
Implement slave batching.
sql/set_var.cc@stripped, 2007-05-13 21:14:59+02:00, knielsen@ymer.(none) +4 -0
Implement slave batching.
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: knielsen
# Host: ymer.(none)
# Root: /usr/local/mysql/mysql-5.1-telco-ndbrecord
--- 1.276/sql/log_event.cc 2007-05-13 21:15:13 +02:00
+++ 1.277/sql/log_event.cc 2007-05-13 21:15:13 +02:00
@@ -6011,6 +6011,12 @@ int Rows_log_event::do_apply_event(RELAY
thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
else
thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+
+ if (slave_allow_batching)
+ thd->options|= OPTION_ALLOW_BATCH;
+ else
+ thd->options&= ~OPTION_ALLOW_BATCH;
+
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
@@ -7244,6 +7250,22 @@ static int find_and_fetch_row(TABLE *tab
table->s->reclength) == 0);
*/
+
+ /*
+ Ndb does not need read before delete/update (and no updates are sent)
+ if primary key specified
+
+ (Actually uniquekey will also do, but pk will be in each
+ row if table has pk)
+
+ Also set ignore no key, as we don't really know if row exists...
+ */
+ if (table->file->ht->db_type == DB_TYPE_NDBCLUSTER)
+ {
+ table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
+ DBUG_RETURN(0);
+ }
+
table->file->position(table->record[0]);
int error= table->file->rnd_pos(table->record[0], table->file->ref);
/*
--- 1.505/sql/mysql_priv.h 2007-05-13 21:15:13 +02:00
+++ 1.506/sql/mysql_priv.h 2007-05-13 21:15:13 +02:00
@@ -357,7 +357,11 @@ MY_LOCALE *my_locale_by_number(uint numb
fulltext functions when reading from it.
*/
#define TMP_TABLE_FORCE_MYISAM (ULL(1) << 32)
-
+/*
+ Dont report errors for individual rows,
+ But just report error on commit (or read ofcourse)
+*/
+#define OPTION_ALLOW_BATCH (ULL(1) << 33) // THD, intern (slave)
/*
Maximum length of time zone name that we support
@@ -1634,6 +1638,7 @@ extern ulong query_cache_size, query_cac
extern ulong slow_launch_threads, slow_launch_time;
extern ulong table_cache_size, table_def_size;
extern ulong max_connections,max_connect_errors, connect_timeout;
+extern my_bool slave_allow_batching;
extern ulong slave_net_timeout, slave_trans_retries;
extern uint max_user_connections;
extern ulong what_to_log,flush_time;
--- 1.641/sql/mysqld.cc 2007-05-13 21:15:13 +02:00
+++ 1.642/sql/mysqld.cc 2007-05-13 21:15:13 +02:00
@@ -428,6 +428,7 @@ ulong thread_stack, what_to_log;
ulong query_buff_size, slow_launch_time, slave_open_temp_tables;
ulong open_files_limit, max_binlog_size, max_relay_log_size;
ulong slave_net_timeout, slave_trans_retries;
+my_bool slave_allow_batching;
ulong thread_cache_size=0, thread_pool_size= 0;
ulong binlog_cache_size=0, max_binlog_cache_size=0;
ulong query_cache_size=0;
@@ -4938,7 +4939,7 @@ enum options_mysqld
OPT_SLAVE_LOAD_TMPDIR, OPT_NO_MIX_TYPE,
OPT_RPL_RECOVERY_RANK,OPT_INIT_RPL_ROLE,
OPT_RELAY_LOG, OPT_RELAY_LOG_INDEX, OPT_RELAY_LOG_INFO_FILE,
- OPT_SLAVE_SKIP_ERRORS, OPT_DES_KEY_FILE, OPT_LOCAL_INFILE,
+ OPT_SLAVE_SKIP_ERRORS, OPT_SLAVE_ALLOW_BATCHING, OPT_DES_KEY_FILE, OPT_LOCAL_INFILE,
OPT_SSL_SSL, OPT_SSL_KEY, OPT_SSL_CERT, OPT_SSL_CA,
OPT_SSL_CAPATH, OPT_SSL_CIPHER,
OPT_BACK_LOG, OPT_BINLOG_CACHE_SIZE,
@@ -6219,6 +6220,10 @@ The minimum value for this variable is 4
"before giving up and stopping.",
(gptr*) &slave_trans_retries, (gptr*) &slave_trans_retries, 0,
GET_ULONG, REQUIRED_ARG, 10L, 0L, (longlong) ULONG_MAX, 0, 1, 0},
+ {"slave-allow-batching", OPT_SLAVE_ALLOW_BATCHING,
+ "Allow slave to batch requests.",
+ (gptr*) &slave_allow_batching, (gptr*) &slave_allow_batching,
+ 0, GET_BOOL, NO_ARG, 0, 0, 1, 0, 1, 0},
#endif /* HAVE_REPLICATION */
{"slow_launch_time", OPT_SLOW_LAUNCH_TIME,
"If creating the thread takes longer than this value (in seconds), the Slow_launch_threads counter will be incremented.",
--- 1.285/mysql-test/mysql-test-run.pl 2007-05-13 21:15:13 +02:00
+++ 1.286/mysql-test/mysql-test-run.pl 2007-05-13 21:15:14 +02:00
@@ -3818,6 +3818,7 @@ sub mysqld_arguments ($$$$) {
$cluster->{'connect_string'});
mtr_add_arg($args, "%s--ndb-wait-connected=20", $prefix);
mtr_add_arg($args, "%s--ndb-cluster-connection-pool=3", $prefix);
+ mtr_add_arg($args, "%s--slave-allow-batching", $prefix);
if ( $mysql_version_id >= 50100 )
{
mtr_add_arg($args, "%s--ndb-extra-logging", $prefix);
@@ -3894,6 +3895,7 @@ sub mysqld_arguments ($$$$) {
$clusters->[$mysqld->{'cluster'}]->{'connect_string'});
mtr_add_arg($args, "%s--ndb-wait-connected=20", $prefix);
mtr_add_arg($args, "%s--ndb-cluster-connection-pool=3", $prefix);
+ mtr_add_arg($args, "%s--slave-allow-batching", $prefix);
if ( $mysql_version_id >= 50100 )
{
mtr_add_arg($args, "%s--ndb-extra-logging", $prefix);
--- 1.464/sql/ha_ndbcluster.cc 2007-05-13 21:15:14 +02:00
+++ 1.465/sql/ha_ndbcluster.cc 2007-05-13 21:15:14 +02:00
@@ -116,6 +116,7 @@ static uint ndbcluster_alter_table_flags
}
#define NDB_AUTO_INCREMENT_RETRIES 10
+#define BATCH_FLUSH_SIZE (32768)
#define ERR_PRINT(err) \
DBUG_PRINT("error", ("%d message: %s", err.code, err.message))
@@ -283,17 +284,19 @@ static int ndb_to_mysql_error(const NdbE
*/
void ha_ndbcluster::reset_state_at_execute()
{
+ Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
m_ops_pending= 0;
m_blobs_pending= FALSE;
- m_row_buffer_current= m_row_buffer;
+ thd_ndb->m_unsent_bytes= 0;
}
int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
{
+ int res= trans->execute(NdbTransaction::NoCommit,
+ NdbOperation::AO_IgnoreError,
+ h->m_force_send);
h->reset_state_at_execute();
- if (trans->execute(NdbTransaction::NoCommit,
- NdbOperation::AO_IgnoreError,
- h->m_force_send) == -1)
+ if (res == -1)
return -1;
const NdbError &err= trans->getNdbError();
@@ -319,10 +322,11 @@ int execute_no_commit(ha_ndbcluster *h,
return execute_no_commit_ignore_no_key(h,trans);
else
{
+ int res= trans->execute(NdbTransaction::NoCommit,
+ NdbOperation::AbortOnError,
+ h->m_force_send);
h->reset_state_at_execute();
- return trans->execute(NdbTransaction::NoCommit,
- NdbOperation::AbortOnError,
- h->m_force_send);
+ return res;
}
}
@@ -334,10 +338,11 @@ int execute_commit(ha_ndbcluster *h, Ndb
if (m_batch_execute)
return 0;
#endif
+ int res= trans->execute(NdbTransaction::Commit,
+ NdbOperation::AbortOnError,
+ h->m_force_send);
h->reset_state_at_execute();
- return trans->execute(NdbTransaction::Commit,
- NdbOperation::AbortOnError,
- h->m_force_send);
+ return res;
}
inline
@@ -368,10 +373,11 @@ int execute_no_commit_ie(ha_ndbcluster *
return 0;
#endif
h->release_completed_operations(trans, force_release);
+ int res= trans->execute(NdbTransaction::NoCommit,
+ NdbOperation::AO_IgnoreError,
+ h->m_force_send);
h->reset_state_at_execute();
- return trans->execute(NdbTransaction::NoCommit,
- NdbOperation::AO_IgnoreError,
- h->m_force_send);
+ return res;
}
/*
@@ -405,6 +411,8 @@ Thd_ndb::Thd_ndb()
options= 0;
(void) hash_init(&open_tables, &my_charset_bin, 5, 0, 0,
(hash_get_key)thd_ndb_share_get_key, 0, 0);
+ m_unsent_bytes= 0;
+ init_alloc_root(&m_batch_mem_root, BATCH_FLUSH_SIZE/4, 0);
}
Thd_ndb::~Thd_ndb()
@@ -429,6 +437,7 @@ Thd_ndb::~Thd_ndb()
}
changed_tables.empty();
hash_free(&open_tables);
+ free_root(&m_batch_mem_root, MYF(0));
}
void
@@ -727,93 +736,83 @@ ha_ndbcluster::request_partition_functio
mask[field_no>>3]|= (1 << (field_no & 7));
}
-void
-ha_ndbcluster::calc_batch_buffer_size()
+static inline char *
+alloc_batch_row(Thd_ndb *thd_ndb, uint size)
{
- /*
- Calculate how many rows that should be inserted
- per roundtrip to NDB. This is done in order to minimize the
- number of roundtrips as much as possible. However performance will
- degrade if too many bytes are inserted, thus it's limited by this
- calculation.
- */
- int bytes, batch;
- const NDBTAB *tab= m_table;
- const int bytesperbatch= 8192;
- bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
- batch= bytesperbatch/bytes;
- batch= batch == 0 ? 1 : batch;
- m_batch_buffer_size= batch * (table->s->reclength + m_extra_reclength);
- DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes));
-}
-
-int
-ha_ndbcluster::alloc_row_buffer()
-{
- uint desired_size;
- if (m_active_cursor || m_rows_to_insert > 1)
- desired_size= m_batch_buffer_size;
- else
- desired_size= table->s->reclength + m_extra_reclength;
-
- if (m_row_buffer_size < desired_size)
- {
- my_free(m_row_buffer, MYF(MY_ALLOW_ZERO_PTR));
- m_row_buffer= my_malloc(desired_size, MYF(0));
- if (unlikely(!m_row_buffer))
- {
- m_row_buffer_size= 0;
- return ER_OUTOFMEMORY;
- }
- m_row_buffer_size= desired_size;
- m_row_buffer_current= m_row_buffer;
- }
- return 0;
+ /*
+ We only reset the batch mem_root on first allocate after execute(), not
+ immediately at execute() time.
+ This is so that we have the chance after execute() to copy out data from
+ any read buffers.
+ */
+ if (thd_ndb->m_unsent_bytes == 0)
+ free_root(&(thd_ndb->m_batch_mem_root), MY_MARK_BLOCKS_FREE);
+ return alloc_root(&(thd_ndb->m_batch_mem_root), size);
}
/*
- Copy a record into a (bigger) buffer.
- This version will re-use the same buffer at every call.
+ Copy a record into a newly allocated buffer.
+ The returned record is valid until next execute() (actually until next
+ allocation after next execute()).
+ The input parameter op_batch_size is an estimate of the signal bytes
+ needed for the operation; this is used to set the output parameter
+ batch_full to true when it is time to flush the batch with execute().
*/
char *
-ha_ndbcluster::copy_row_to_buffer(const byte *record)
+ha_ndbcluster::batch_copy_row_to_buffer(Thd_ndb *thd_ndb, const byte *record,
+ bool & batch_full)
{
- if (alloc_row_buffer() != 0)
+ char *row= copy_row_to_buffer(thd_ndb, record);
+ if (unlikely(!row))
return NULL;
- memcpy(m_row_buffer, record, table->s->reclength);
- return m_row_buffer;
+ uint unsent= thd_ndb->m_unsent_bytes;
+ unsent+= m_bytes_per_write;
+ batch_full= unsent >= BATCH_FLUSH_SIZE;
+ thd_ndb->m_unsent_bytes= unsent;
+ return row;
}
char *
-ha_ndbcluster::get_row_buffer()
+ha_ndbcluster::batch_copy_key_to_buffer(Thd_ndb *thd_ndb, const byte *key,
+ uint key_len,
+ uint op_batch_size, bool & batch_full)
{
- if (alloc_row_buffer() != 0)
+ char *row= alloc_batch_row(thd_ndb, key_len);;
+ if (unlikely(!row))
return NULL;
- return m_row_buffer;
+ memcpy(row, key, key_len);
+ uint unsent= thd_ndb->m_unsent_bytes;
+ unsent+= op_batch_size;
+ DBUG_ASSERT(op_batch_size > 0);
+ batch_full= unsent >= BATCH_FLUSH_SIZE;
+ thd_ndb->m_unsent_bytes= unsent;
+ return row;
}
/*
- Copy a record into a buffer.
- This one returns a new buffer on each call until more are available;
- then it returns with buffer_full set to TRUE (and caller should execute()
- before calling again).
+ Simpler row buffer copy, for when we know we will not batch.
+ Only valid until next buffer allocation.
*/
-int
-ha_ndbcluster::batch_copy_row_to_buffer(const byte *record,
- char * &row, bool & buffer_full)
+char *
+ha_ndbcluster::copy_row_to_buffer(Thd_ndb *thd_ndb, const byte *record)
{
- int error= alloc_row_buffer();
- if (error != 0)
- return error;
+ char *row;
uint size= table->s->reclength + m_extra_reclength;
- row= m_row_buffer_current;
- m_row_buffer_current+= size;
- DBUG_ASSERT((uint)(m_row_buffer_current - m_row_buffer) <= m_row_buffer_size);
- buffer_full= (m_row_buffer_current - m_row_buffer + size) > m_row_buffer_size;
+ row= alloc_batch_row(thd_ndb, size);
+ if (unlikely(!row))
+ return NULL;
memcpy(row, record, table->s->reclength);
- return 0;
+ return row;
}
+/* Return a row buffer, valid until next execute(). */
+char *
+ha_ndbcluster::get_row_buffer()
+{
+ Thd_ndb *thd_ndb= get_thd_ndb(table->in_use);
+ return alloc_root(&(thd_ndb->m_batch_mem_root),
+ table->s->reclength + m_extra_reclength);
+}
/*
When using extra hidden columns, the mysqld column bitmaps do not
@@ -1266,10 +1265,13 @@ int ha_ndbcluster::get_metadata(const ch
goto err;
}
- calc_batch_buffer_size();
-
if ((error= add_table_ndb_record(dict)) != 0)
goto err;
+
+ /*
+ Approx. write size in bytes over transporter
+ */
+ m_bytes_per_write= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
if ((error= open_indexes(ndb, table, FALSE)) == 0)
{
ndbtab_g.release();
@@ -3250,6 +3252,7 @@ int ha_ndbcluster::ndb_write_row(byte *r
NdbTransaction *trans= m_active_trans;
NdbOperation *op;
THD *thd= table->in_use;
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
uint32 part_id;
char *row;
bool need_flush;
@@ -3297,17 +3300,23 @@ int ha_ndbcluster::ndb_write_row(byte *r
For non-bulk insert, we may still need to copy the row into a (bigger)
buffer if we need extra space for the user-defined partitioning hash
or the hidden primary key, but we always execute() in this case.
+
+ Note that when using writeTuple() with blobs, we cannot batch, as
+ NdbBlob::setValue() uses call-by-reference semantics for the blob value,
+ which must remain valid until execute(). For insertTuple(), the blob
+ value is buffered by NdbBlob::setValue().
*/
bool uses_blobs= uses_blob_value(table->write_set);
- if ((m_rows_to_insert > 1 && !uses_blobs) || batched_update)
+ if ((m_rows_to_insert > 1 && !uses_blobs) || batched_update ||
+ ( (thd->options & OPTION_ALLOW_BATCH) && !(uses_blobs && m_use_write)))
{
/* This sets row and need_flush (output parameters). */
- error= batch_copy_row_to_buffer(record, row, need_flush);
+ row= batch_copy_row_to_buffer(thd_ndb, record, need_flush);
DBUG_PRINT("info", ("allocating buffer for bulk insert, "
"m_rows_to_insert=%d write_set=0x%x",
(int)m_rows_to_insert, table->write_set->bitmap[0]));
- if (error != 0)
- DBUG_RETURN(error);
+ if (unlikely(!row))
+ DBUG_RETURN(ER_OUTOFMEMORY);
}
else
{
@@ -3316,7 +3325,7 @@ int ha_ndbcluster::ndb_write_row(byte *r
if (table_share->primary_key == MAX_KEY || m_use_partition_function)
{
DBUG_PRINT("info", ("Getting single buffer for oversize record."));
- row= copy_row_to_buffer(record);
+ row= copy_row_to_buffer(thd_ndb, record);
if (!row)
DBUG_RETURN(ER_OUTOFMEMORY);
}
@@ -3443,10 +3452,8 @@ int ha_ndbcluster::ndb_write_row(byte *r
*/
m_rows_inserted++;
no_uncommitted_rows_update(1);
- m_bulk_insert_not_flushed= TRUE;
if (need_flush || primary_key_update)
{
- // Send rows to NDB
int res= flush_bulk_insert();
if (res != 0)
{
@@ -3516,6 +3523,7 @@ int ha_ndbcluster::primary_key_cmp(const
int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
{
THD *thd= table->in_use;
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
NdbTransaction *trans= m_active_trans;
NdbScanOperation* cursor= m_active_cursor;
NdbOperation *op;
@@ -3627,16 +3635,16 @@ int ha_ndbcluster::update_row(const byte
if (cursor && !m_update_cannot_batch)
{
/* For a scan, we only need to execute() if the batch buffer is full. */
- error= batch_copy_row_to_buffer(new_data, row, need_execute);
- if (error != 0)
- DBUG_RETURN(error);
+ row= batch_copy_row_to_buffer(thd_ndb, new_data, need_execute);
+ if (!row)
+ DBUG_RETURN(ER_OUTOFMEMORY);
}
else
{
need_execute= TRUE;
if (m_use_partition_function)
{
- row= copy_row_to_buffer(new_data);
+ row= copy_row_to_buffer(thd_ndb, new_data);
if (!row)
DBUG_RETURN(ER_OUTOFMEMORY);
}
@@ -3727,12 +3735,13 @@ int ha_ndbcluster::delete_row(const byte
int ha_ndbcluster::ndb_delete_row(const byte *record, bool primary_key_update)
{
THD *thd= table->in_use;
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
NdbTransaction *trans= m_active_trans;
NdbScanOperation* cursor= m_active_cursor;
NdbOperation *op;
uint32 part_id;
int error;
- DBUG_ENTER("delete_row");
+ DBUG_ENTER("ndb_delete_row");
ha_statistic_increment(&SSV::ha_delete_count);
m_rows_changed++;
@@ -3766,7 +3775,7 @@ int ha_ndbcluster::ndb_delete_row(const
eventSetAnyValue(thd, op);
- if (!primary_key_update || m_delete_cannot_batch)
+ if (!(primary_key_update || m_delete_cannot_batch))
// If deleting from cursor, NoCommit will be handled in next_result
DBUG_RETURN(0);
}
@@ -3774,16 +3783,54 @@ int ha_ndbcluster::ndb_delete_row(const
{
const NdbRecord *key_rec;
const char *key_row;
+ uint key_len;
if (table_share->primary_key != MAX_KEY)
{
key_rec= m_index[table_share->primary_key].ndb_unique_record_row;
key_row= record;
+ key_len= table->s->reclength;
}
else
{
key_rec= m_ndb_hidden_key_record;
key_row= (const char *)(&m_ref);
+ key_len= sizeof(m_ref);
+ }
+ /*
+ Check if we can batch the delete; if so we need to buffer the key.
+
+ We do not batch deletes on tables with no primary key. For such tables,
+ replication uses full table scan to locate the row to delete. The
+ problem is the following scenario when deleting 2 (or more) rows:
+
+ 1. Table scan to locate the first row.
+ 2. Delete the row, batched so no execute.
+ 3. Table scan to locate the second row is executed, along with the
+ batched delete operation from step 2.
+ 4. The first row is returned from nextResult() (not deleted yet).
+ 5. The kernel deletes the row (operation from step 2).
+ 6. lockCurrentTuple() is called on the row returned in step 4. However,
+ as that row is now deleted, the operation fails and the transaction
+ is aborted.
+ 7. The delete of the second tuple now fails, as the transaction has
+ been aborted.
+ */
+ bool need_flush;
+ if ((thd->options & OPTION_ALLOW_BATCH) &&
+ table_share->primary_key != MAX_KEY)
+ {
+ /*
+ Poor approx. let delete ~ tabsize / 4
+ */
+ uint delete_size= 12 + m_bytes_per_write >> 2;
+ key_row= batch_copy_key_to_buffer(thd_ndb, key_row, key_len,
+ delete_size, need_flush);
+ if (unlikely(!key_row))
+ DBUG_RETURN(ER_OUTOFMEMORY);
}
+ else
+ need_flush= TRUE;
+
if (!(op=trans->deleteTuple(key_rec, key_row)))
ERR_RETURN(trans->getNdbError());
@@ -3793,6 +3840,9 @@ int ha_ndbcluster::ndb_delete_row(const
no_uncommitted_rows_update(-1);
eventSetAnyValue(thd, op);
+
+ if (!need_flush)
+ DBUG_RETURN(0);
}
// Execute delete operation
@@ -4737,8 +4787,6 @@ ha_ndbcluster::flush_bulk_insert()
DBUG_ENTER("ha_ndbcluster::flush_bulk_insert");
DBUG_PRINT("info", ("Sending inserts to NDB, rows_inserted: %d",
(int)m_rows_inserted));
-
- m_bulk_insert_not_flushed= FALSE;
if (m_transaction_on)
{
if (execute_no_commit(this,trans,FALSE) != 0)
@@ -4801,7 +4849,11 @@ int ha_ndbcluster::end_bulk_insert()
DBUG_ENTER("end_bulk_insert");
// Check if last inserts need to be flushed
- if (m_bulk_insert_not_flushed)
+
+ THD *thd= current_thd;
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+
+ if ((thd->options & OPTION_ALLOW_BATCH) == 0 && thd_ndb->m_unsent_bytes)
{
error= flush_bulk_insert();
if (error != 0)
@@ -5245,7 +5297,27 @@ static int ndbcluster_commit(handlerton
}
#endif /* HAVE_NDB_BINLOG */
- if (execute_commit(thd,trans) != 0)
+ if (thd->options & OPTION_ALLOW_BATCH)
+ {
+ if ((res= trans->execute(Commit, AO_IgnoreError, 1)) != 0)
+ {
+ const NdbError err= trans->getNdbError();
+ if (err.classification == NdbError::ConstraintViolation ||
+ err.classification == NdbError::NoDataFound)
+ {
+ /*
+ This is ok...
+ */
+ res= 0;
+ }
+ }
+ }
+ else
+ {
+ res= execute_commit(thd,trans);
+ }
+
+ if (res != 0)
{
const NdbError err= trans->getNdbError();
const NdbOperation *error_op= trans->getNdbErrorOperation();
@@ -6799,7 +6871,6 @@ ha_ndbcluster::ha_ndbcluster(handlerton
m_rows_to_insert((ha_rows) 1),
m_rows_inserted((ha_rows) 0),
m_rows_changed((ha_rows) 0),
- m_bulk_insert_not_flushed(FALSE),
m_delete_cannot_batch(FALSE),
m_update_cannot_batch(FALSE),
m_ops_pending(0),
@@ -6810,7 +6881,6 @@ ha_ndbcluster::ha_ndbcluster(handlerton
m_blobs_buffer_size(0),
m_row_buffer(0),
m_row_buffer_size(0),
- m_batch_buffer_size(8192),
m_extra_reclength(0),
m_ndb_record(0),
m_ndb_record_fragment(0),
--- 1.180/sql/ha_ndbcluster.h 2007-05-13 21:15:14 +02:00
+++ 1.181/sql/ha_ndbcluster.h 2007-05-13 21:15:14 +02:00
@@ -223,6 +223,16 @@ class Thd_ndb
List<NDB_SHARE> changed_tables;
uint query_state;
HASH open_tables;
+ /*
+ This is a memroot used to buffer rows for batched execution.
+ It is reset after every execute().
+ */
+ MEM_ROOT m_batch_mem_root;
+ /*
+ Estimated pending batched execution bytes, once this is > BATCH_FLUSH_SIZE
+ we execute() to flush the rows buffered in m_batch_mem_root.
+ */
+ uint m_unsent_bytes;
};
class ha_ndbcluster: public handler
@@ -490,12 +500,13 @@ private:
void set_partition_function_value(char *row, uint32 func_value);
uint32 get_partition_fragment(const char *row);
void request_partition_function_value(uchar *mask);
- void calc_batch_buffer_size();
- int alloc_row_buffer();
- char *copy_row_to_buffer(const byte *record);
+ char *batch_copy_row_to_buffer(Thd_ndb *thd_ndb, const byte *record,
+ bool & batch_full);
+ char *batch_copy_key_to_buffer(Thd_ndb *thd_ndb, const byte *key,
+ uint key_len,
+ uint op_batch_size, bool & batch_full);
+ char *copy_row_to_buffer(Thd_ndb *thd_ndb, const byte *record);
char *get_row_buffer();
- int batch_copy_row_to_buffer(const byte *record,
- char * &row, bool & buffer_full);
void clear_extended_column_set(uchar *mask);
uchar *copy_column_set(MY_BITMAP *bitmap);
@@ -594,11 +605,6 @@ private:
char *m_row_buffer;
uint m_row_buffer_size;
char *m_row_buffer_current;
- /*
- Size of buffer to fill before sending batched rows to NDB kernel, computed
- from a heuristic based on row size.
- */
- uint m_batch_buffer_size;
/* Extra bytes needed in row for hidden fields. */
uint m_extra_reclength;
// NdbRecAttr has no reference to blob
@@ -615,10 +621,10 @@ private:
ha_rows m_rows_to_insert; // TODO: merge it with handler::estimation_rows_to_insert?
ha_rows m_rows_inserted;
ha_rows m_rows_changed;
- bool m_bulk_insert_not_flushed;
bool m_delete_cannot_batch;
bool m_update_cannot_batch;
ha_rows m_ops_pending;
+ uint m_bytes_per_write;
bool m_skip_auto_increment;
bool m_blobs_pending;
bool m_slow_path;
--- 1.231/sql/set_var.cc 2007-05-13 21:15:14 +02:00
+++ 1.232/sql/set_var.cc 2007-05-13 21:15:14 +02:00
@@ -406,6 +406,10 @@ static sys_var_const_str_ptr sys_secure_
static sys_var_long_ptr sys_server_id(&vars, "server_id", &server_id, fix_server_id);
static sys_var_bool_ptr sys_slave_compressed_protocol(&vars, "slave_compressed_protocol",
&opt_slave_compressed_protocol);
+#ifdef HAVE_REPLICATION
+static sys_var_bool_ptr sys_slave_allow_batching(&vars, "slave_allow_batching",
+ &slave_allow_batching);
+#endif
static sys_var_long_ptr sys_slow_launch_time(&vars, "slow_launch_time",
&slow_launch_time);
static sys_var_thd_ulong sys_sort_buffer(&vars, "sort_buffer_size",
--- 1.12/mysql-test/r/rpl_ndb_basic.result 2007-05-13 21:15:14 +02:00
+++ 1.13/mysql-test/r/rpl_ndb_basic.result 2007-05-13 21:15:14 +02:00
@@ -159,8 +159,8 @@ Replicate_Do_Table
Replicate_Ignore_Table <Replicate_Ignore_Table>
Replicate_Wild_Do_Table
Replicate_Wild_Ignore_Table
-Last_Errno 1105
-Last_Error Unknown error
+Last_Errno 1205
+Last_Error Error 'Lock wait timeout exceeded; try restarting transaction' on query. Default database: ''. Query: 'COMMIT'
Skip_Counter 0
Exec_Master_Log_Pos <Exec_Master_Log_Pos>
Relay_Log_Space <Relay_Log_Space>
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2531) | knielsen | 13 May |