Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-05-09 13:18:58+02:00, knielsen@ymer.(none) +3 -0
Merge ymer.(none):/usr/local/mysql/mysql-5.1-wl2223
into ymer.(none):/usr/local/mysql/mysql-5.1-telco-ndbrecord
MERGE: 1.2409.11.27
sql/ha_ndbcluster.cc@stripped, 2007-05-09 13:18:55+02:00, knielsen@ymer.(none) +51 -55
Manual merge.
MERGE: 1.406.2.3
sql/ha_ndbcluster.h@stripped, 2007-05-09 13:18:55+02:00, knielsen@ymer.(none) +0 -0
Manual merge.
MERGE: 1.165.5.1
sql/structs.h@stripped, 2007-05-09 11:36:50+02:00, knielsen@ymer.(none) +0 -0
Auto merged
MERGE: 1.65.1.1
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: knielsen
# Host: ymer.(none)
# Root: /usr/local/mysql/mysql-5.1-telco-ndbrecord/RESYNC
--- 1.66/sql/structs.h 2007-05-09 13:19:05 +02:00
+++ 1.67/sql/structs.h 2007-05-09 13:19:05 +02:00
@@ -57,12 +57,14 @@ typedef struct st_key_part_info { /* Inf
/*
Number of bytes required to store the keypart value. This may be
different from the "length" field as it also counts
- - possible NULL-flag byte (see HA_KEY_NULL_LENGTH)
+ - possible NULL-flag byte (see HA_KEY_NULL_LENGTH) [if null_bit != 0,
+ the first byte stored at offset is 1 if null, 0 if non-null; the
+ actual value is stored from offset+1].
- possible HA_KEY_BLOB_LENGTH bytes needed to store actual value length.
*/
uint16 store_length;
uint16 key_type;
- uint16 fieldnr; /* Fieldnum in UNIREG */
+ uint16 fieldnr; /* Fieldnum in UNIREG (1,2,3,...) */
uint16 key_part_flag; /* 0 or HA_REVERSE_SORT */
uint8 type;
uint8 null_bit; /* Position to null_bit */
--- 1.460/sql/ha_ndbcluster.cc 2007-05-09 13:19:05 +02:00
+++ 1.461/sql/ha_ndbcluster.cc 2007-05-09 13:19:05 +02:00
@@ -157,8 +157,13 @@ static byte *ndbcluster_get_key(NDB_SHAR
#ifdef HAVE_NDB_BINLOG
static int rename_share(NDB_SHARE *share, const char *new_key);
#endif
+static
+NdbRecord *
+ndb_get_table_statistics_ndbrecord(NDBDICT *, const NDBTAB *);
static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*, const NDBTAB *,
struct Ndb_statistics *);
+static int ndb_get_table_statistics(ha_ndbcluster*, bool, Ndb*,
+ const NdbRecord *, struct Ndb_statistics *);
// Util thread variables
@@ -171,12 +176,6 @@ pthread_handler_t ndb_util_thread_func(v
ulong ndb_cache_check_time;
/*
- Dummy buffer to read zero pack_length fields
- which are mapped to 1 char
-*/
-static uint32 dummy_buf;
-
-/*
Stats that can be retrieved from ndb
*/
@@ -276,8 +275,22 @@ static int ndb_to_mysql_error(const NdbE
return error;
}
+/*
+ When execute() is called, this resets the internal state on things that
+ were awaiting execute(), such as pending scan take-over operations and
+ rows for batched operations.
+ Also used to initialize the state at start of a statement.
+*/
+void ha_ndbcluster::reset_state_at_execute()
+{
+ m_ops_pending= 0;
+ m_blobs_pending= FALSE;
+ m_row_buffer_current= m_row_buffer;
+}
+
int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
{
+ h->reset_state_at_execute();
if (trans->execute(NdbTransaction::NoCommit,
NdbOperation::AO_IgnoreError,
h->m_force_send) == -1)
@@ -302,11 +315,15 @@ int execute_no_commit(ha_ndbcluster *h,
return 0;
#endif
h->release_completed_operations(trans, force_release);
- return h->m_ignore_no_key ?
- execute_no_commit_ignore_no_key(h,trans) :
- trans->execute(NdbTransaction::NoCommit,
- NdbOperation::AbortOnError,
- h->m_force_send);
+ if (h->m_ignore_no_key)
+ return execute_no_commit_ignore_no_key(h,trans);
+ else
+ {
+ h->reset_state_at_execute();
+ return trans->execute(NdbTransaction::NoCommit,
+ NdbOperation::AbortOnError,
+ h->m_force_send);
+ }
}
inline
@@ -317,6 +334,7 @@ int execute_commit(ha_ndbcluster *h, Ndb
if (m_batch_execute)
return 0;
#endif
+ h->reset_state_at_execute();
return trans->execute(NdbTransaction::Commit,
NdbOperation::AbortOnError,
h->m_force_send);
@@ -330,6 +348,11 @@ int execute_commit(THD *thd, NdbTransact
if (m_batch_execute)
return 0;
#endif
+ /*
+ We do not have to reset_state_at_execute() here, as this is at transaction
+ end time, and we will not take further action after this (nor can we
+ easily, as we execute outside the context of the ha_ndbcluster object).
+ */
return trans->execute(NdbTransaction::Commit,
NdbOperation::AbortOnError,
thd->variables.ndb_force_send);
@@ -345,6 +368,7 @@ int execute_no_commit_ie(ha_ndbcluster *
return 0;
#endif
h->release_completed_operations(trans, force_release);
+ h->reset_state_at_execute();
return trans->execute(NdbTransaction::NoCommit,
NdbOperation::AO_IgnoreError,
h->m_force_send);
@@ -457,7 +481,7 @@ Ndb *ha_ndbcluster::get_ndb()
void ha_ndbcluster::set_rec_per_key()
{
- DBUG_ENTER("ha_ndbcluster::get_status_const");
+ DBUG_ENTER("ha_ndbcluster::set_rec_per_key");
for (uint i=0 ; i < table_share->keys ; i++)
{
table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
@@ -477,7 +501,8 @@ ha_rows ha_ndbcluster::records()
Ndb *ndb= get_ndb();
ndb->setDatabaseName(m_dbname);
struct Ndb_statistics stat;
- if (ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat) == 0)
+ if (ndb_get_table_statistics(this, TRUE, ndb, m_ndb_statistics_record,
+ &stat) == 0)
{
retval= stat.row_count;
}
@@ -511,7 +536,8 @@ int ha_ndbcluster::records_update()
{
return my_errno= HA_ERR_OUT_OF_MEM;
}
- result= ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat);
+ result= ndb_get_table_statistics(this, TRUE, ndb, m_ndb_statistics_record,
+ &stat);
if (result == 0)
{
stats.mean_rec_length= stat.row_size;
@@ -632,48 +658,29 @@ bool ha_ndbcluster::get_error_message(in
}
-#ifndef DBUG_OFF
-/*
- Check if type is supported by NDB.
-*/
+void
+ha_ndbcluster::set_hidden_key(char *row, Uint64 auto_value)
+{
+ /* The hidden primary key is stored just after the normal row data. */
+ uint32 offset= offset_hidden_key();
+ memcpy(&row[offset], &auto_value, NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+}
-static bool ndb_supported_type(enum_field_types type)
+Uint64
+ha_ndbcluster::get_hidden_key(const char *row)
{
- switch (type) {
- case MYSQL_TYPE_TINY:
- case MYSQL_TYPE_SHORT:
- case MYSQL_TYPE_LONG:
- case MYSQL_TYPE_INT24:
- case MYSQL_TYPE_LONGLONG:
- case MYSQL_TYPE_FLOAT:
- case MYSQL_TYPE_DOUBLE:
- case MYSQL_TYPE_DECIMAL:
- case MYSQL_TYPE_NEWDECIMAL:
- case MYSQL_TYPE_TIMESTAMP:
- case MYSQL_TYPE_DATETIME:
- case MYSQL_TYPE_DATE:
- case MYSQL_TYPE_NEWDATE:
- case MYSQL_TYPE_TIME:
- case MYSQL_TYPE_YEAR:
- case MYSQL_TYPE_STRING:
- case MYSQL_TYPE_VAR_STRING:
- case MYSQL_TYPE_VARCHAR:
- case MYSQL_TYPE_TINY_BLOB:
- case MYSQL_TYPE_BLOB:
- case MYSQL_TYPE_MEDIUM_BLOB:
- case MYSQL_TYPE_LONG_BLOB:
- case MYSQL_TYPE_ENUM:
- case MYSQL_TYPE_SET:
- case MYSQL_TYPE_BIT:
- case MYSQL_TYPE_GEOMETRY:
- return TRUE;
- case MYSQL_TYPE_NULL:
- break;
- }
- return FALSE;
+ Uint64 hidden_key;
+ uint32 offset= offset_hidden_key();
+ memcpy(&hidden_key, &row[offset], NDB_HIDDEN_PRIMARY_KEY_LENGTH);
+ return hidden_key;
}
-#endif /* !DBUG_OFF */
+void
+ha_ndbcluster::request_hidden_key(uchar *mask)
+{
+ uint32 field_no= field_number_hidden_key();
+ mask[field_no>>3]|= (1 << (field_no & 7));
+}
/*
Check if MySQL field type forces var part in ndb storage
@@ -695,111 +702,339 @@ static bool field_type_forces_var_part(e
}
}
-/*
- Instruct NDB to set the value of the hidden primary key
-*/
+void
+ha_ndbcluster::set_partition_function_value(char *row, uint32 func_value)
+{
+ /* The partition function value is stored just after the hidden primary
+ key (if any). */
+ uint32 offset= offset_user_partition_function();
+ memcpy(&row[offset], &func_value, 4);
+}
+
+uint32
+ha_ndbcluster::get_partition_fragment(const char *row)
+{
+ uint32 fragment;
+ uint32 offset= offset_user_partition_fragment();
+ memcpy(&fragment, &row[offset], 4);
+ return fragment;
+}
+
+void
+ha_ndbcluster::request_partition_function_value(uchar *mask)
+{
+ uint32 field_no= field_number_user_partition_function();
+ mask[field_no>>3]|= (1 << (field_no & 7));
+}
-bool ha_ndbcluster::set_hidden_key(NdbOperation *ndb_op,
- uint fieldnr, const byte *field_ptr)
+void
+ha_ndbcluster::calc_batch_buffer_size()
{
- DBUG_ENTER("set_hidden_key");
- DBUG_RETURN(ndb_op->equal(fieldnr, (char*)field_ptr) != 0);
+ /*
+ Calculate how many rows should be inserted
+ per roundtrip to NDB. This is done in order to minimize the
+ number of roundtrips as much as possible. However performance will
+ degrade if too many bytes are inserted, thus it's limited by this
+ calculation.
+ */
+ int bytes, batch;
+ const NDBTAB *tab= m_table;
+ const int bytesperbatch= 8192;
+ bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
+ batch= bytesperbatch/bytes;
+ batch= batch == 0 ? 1 : batch;
+ m_batch_buffer_size= batch * (table->s->reclength + m_extra_reclength);
+ DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes));
}
+int
+ha_ndbcluster::alloc_row_buffer()
+{
+ uint desired_size;
+ if (m_active_cursor || m_rows_to_insert > 1)
+ desired_size= m_batch_buffer_size;
+ else
+ desired_size= table->s->reclength + m_extra_reclength;
+
+ if (m_row_buffer_size < desired_size)
+ {
+ my_free(m_row_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ m_row_buffer= my_malloc(desired_size, MYF(0));
+ if (unlikely(!m_row_buffer))
+ {
+ m_row_buffer_size= 0;
+ return ER_OUTOFMEMORY;
+ }
+ m_row_buffer_size= desired_size;
+ m_row_buffer_current= m_row_buffer;
+ }
+ return 0;
+}
/*
- Instruct NDB to set the value of one primary key attribute
+ Copy a record into a (bigger) buffer.
+ This version will re-use the same buffer at every call.
*/
+char *
+ha_ndbcluster::copy_row_to_buffer(const byte *record)
+{
+ if (alloc_row_buffer() != 0)
+ return NULL;
+ memcpy(m_row_buffer, record, table->s->reclength);
+ return m_row_buffer;
+}
-int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
- uint fieldnr, const byte *field_ptr)
+char *
+ha_ndbcluster::get_row_buffer()
{
- uint32 pack_len= field->pack_length();
- DBUG_ENTER("set_ndb_key");
- DBUG_PRINT("enter", ("%d: %s, ndb_type: %u, len=%d",
- fieldnr, field->field_name, field->type(),
- pack_len));
- DBUG_DUMP("key", (char*)field_ptr, pack_len);
-
- DBUG_ASSERT(ndb_supported_type(field->type()));
- DBUG_ASSERT(! (field->flags & BLOB_FLAG));
- // Common implementation for most field types
- DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
+ if (alloc_row_buffer() != 0)
+ return NULL;
+ return m_row_buffer;
+}
+
+/*
+ Copy a record into a buffer.
+ This one returns a new buffer on each call until no more are available;
+ then it returns with buffer_full set to TRUE (and caller should execute()
+ before calling again).
+*/
+int
+ha_ndbcluster::batch_copy_row_to_buffer(const byte *record,
+ char * &row, bool & buffer_full)
+{
+ int error= alloc_row_buffer();
+ if (error != 0)
+ return error;
+ uint size= table->s->reclength + m_extra_reclength;
+ row= m_row_buffer_current;
+ m_row_buffer_current+= size;
+ DBUG_ASSERT((uint)(m_row_buffer_current - m_row_buffer) <= m_row_buffer_size);
+ buffer_full= (m_row_buffer_current - m_row_buffer + size) > m_row_buffer_size;
+ memcpy(row, record, table->s->reclength);
+ return 0;
}
/*
- Instruct NDB to set the value of one attribute
+ When using extra hidden columns, the mysqld column bitmaps do not
+ include bits for the extra columns, so we use this method to initialize
+ them (after copying the mysqld bitmap to a larger one).
*/
+void
+ha_ndbcluster::clear_extended_column_set(uchar *mask)
+{
+ if (table_share->primary_key == MAX_KEY)
+ {
+ uint32 field_no= field_number_hidden_key();
+ mask[field_no>>3]&= ~(1 << (field_no & 7));
+ }
+ if (m_use_partition_function)
+ {
+ uint32 field_no= field_number_user_partition_function();
+ mask[field_no>>3]&= ~(1 << (field_no & 7));
+ }
+}
-int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
- uint fieldnr, int row_offset,
- bool *set_blob_value)
+uchar *
+ha_ndbcluster::copy_column_set(MY_BITMAP *bitmap)
{
- const byte* field_ptr= field->ptr + row_offset;
- uint32 pack_len= field->pack_length();
- DBUG_ENTER("set_ndb_value");
- DBUG_PRINT("enter", ("%d: %s type: %u len=%d is_null=%s",
- fieldnr, field->field_name, field->type(),
- pack_len, field->is_null(row_offset) ? "Y" : "N"));
- DBUG_DUMP("value", (char*) field_ptr, pack_len);
+ bitmap_copy(&m_bitmap, bitmap);
+ uchar *mask= (uchar *)m_bitmap_buf;
+ clear_extended_column_set(mask);
+ return mask;
+}
- DBUG_ASSERT(ndb_supported_type(field->type()));
+int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
+{
+ ha_ndbcluster *ha= (ha_ndbcluster *)arg;
+ DBUG_ENTER("g_get_ndb_blobs_value");
+ DBUG_PRINT("info", ("destination row: %p", ha->m_blob_destination_record));
+
+ /* Count the total length needed for blob data. */
+ int isNull;
+ if (ndb_blob->getNull(isNull) != 0)
+ ERR_RETURN(ndb_blob->getNdbError());
+ if (isNull == 0) {
+ Uint64 len64= 0;
+ if (ndb_blob->getLength(len64) != 0)
+ ERR_RETURN(ndb_blob->getNdbError());
+ /* Align to Uint64. */
+ ha->m_blob_total_size+= (len64 + 7) & ~((Uint64)7);
+ if (ha->m_blob_total_size > 0xffffffff)
+ {
+ DBUG_ASSERT(FALSE);
+ DBUG_RETURN(-1);
+ }
+ }
+ ha->m_blob_counter++;
+
+ /*
+ Wait until all blobs are active with reading, so we can allocate
+ and use a common buffer containing all.
+ */
+ if (ha->m_blob_counter < ha->m_blob_expected_count)
+ DBUG_RETURN(0);
+ ha->m_blob_counter= 0;
+
+ /* Re-allocate bigger blob buffer if necessary. */
+ if (ha->m_blob_total_size > ha->m_blobs_buffer_size)
{
- // ndb currently does not support size 0
- uint32 empty_field;
- if (pack_len == 0)
+ my_free(ha->m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ DBUG_PRINT("info", ("allocate blobs buffer size %u",
+ (uint32)(ha->m_blob_total_size)));
+ ha->m_blobs_buffer= my_malloc(ha->m_blob_total_size, MYF(MY_WME));
+ if (ha->m_blobs_buffer == NULL)
{
- pack_len= sizeof(empty_field);
- field_ptr= (byte *)&empty_field;
- if (field->is_null(row_offset))
- empty_field= 0;
- else
- empty_field= 1;
+ ha->m_blobs_buffer_size= 0;
+ DBUG_RETURN(-1);
}
+ ha->m_blobs_buffer_size= ha->m_blob_total_size;
+ }
+
+ /*
+ Now read all blob data.
+ If we know the destination mysqld row, we also set the blob null bit and
+ pointer/length (if not, it will be done instead in
+ unpack_record_ndbrecord()).
+ */
+ uint32 offset= 0;
+ for (uint i= 0; i < ha->table->s->fields; i++)
+ {
+ Field *field= ha->table->field[i];
if (! (field->flags & BLOB_FLAG))
+ continue;
+ NdbValue value= ha->m_value[i];
+ if (value.blob == NULL)
{
- if (field->type() != MYSQL_TYPE_BIT)
- {
- if (field->is_null(row_offset))
- {
- DBUG_PRINT("info", ("field is NULL"));
- // Set value to NULL
- DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL) != 0));
- }
- // Common implementation for most field types
- DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr) != 0);
- }
- else // if (field->type() == MYSQL_TYPE_BIT)
- {
- longlong bits= field->val_int();
-
- // Round up bit field length to nearest word boundry
- pack_len= ((pack_len + 3) >> 2) << 2;
- DBUG_ASSERT(pack_len <= 8);
- if (field->is_null(row_offset))
- // Set value to NULL
- DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL) != 0));
- DBUG_PRINT("info", ("bit field"));
- DBUG_DUMP("value", (char*)&bits, pack_len);
-#ifdef WORDS_BIGENDIAN
- /* store lsw first */
- bits = ((bits >> 32) & 0x00000000FFFFFFFFLL)
- | ((bits << 32) & 0xFFFFFFFF00000000LL);
-#endif
- DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)&bits) != 0);
+ DBUG_PRINT("info",("[%u] skipped", i));
+ continue;
+ }
+ Field_blob *field_blob= (Field_blob *)field;
+ NdbBlob *ndb_blob= value.blob;
+ int isNull;
+ if (ndb_blob->getNull(isNull) != 0)
+ ERR_RETURN(ndb_blob->getNdbError());
+ if (isNull == 0) {
+ Uint64 len64= 0;
+ if (ndb_blob->getLength(len64) != 0)
+ ERR_RETURN(ndb_blob->getNdbError());
+ DBUG_ASSERT(len64 < 0xffffffff);
+ char *buf= ha->m_blobs_buffer + offset;
+ uint32 len= ha->m_blobs_buffer_size - offset;
+ if (ndb_blob->readData(buf, len) != 0)
+ ERR_RETURN(ndb_blob->getNdbError());
+ DBUG_PRINT("info", ("[%u] offset: %u buf: 0x%lx len=%u",
+ i, offset, (long) buf, len));
+ DBUG_ASSERT(len == len64);
+ if (ha->m_blob_destination_record)
+ {
+ my_ptrdiff_t ptrdiff=
+ ha->m_blob_destination_record - ha->table->record[0];
+ field_blob->move_field_offset(ptrdiff);
+ field_blob->set_ptr(len, buf);
+ field_blob->set_notnull();
+ field_blob->move_field_offset(-ptrdiff);
}
+ offset+= ((len64 + 7) & ~((Uint64)7));
}
- // Blob type
- NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
- if (ndb_blob != NULL)
+ else if (ha->m_blob_destination_record)
{
- if (field->is_null(row_offset))
- DBUG_RETURN(ndb_blob->setNull() != 0);
+ /* Have to set length even in this case. */
+ my_ptrdiff_t ptrdiff=
+ ha->m_blob_destination_record - ha->table->record[0];
+ char *buf= ha->m_blobs_buffer + offset;
+ field_blob->move_field_offset(ptrdiff);
+ field_blob->set_ptr((uint32)0, buf);
+ field_blob->set_null();
+ field_blob->move_field_offset(-ptrdiff);
+ DBUG_PRINT("info", ("[%u] isNull=%d", i, isNull));
+ }
+ }
+
+ DBUG_RETURN(0);
+}
+
+/*
+ Request reading of blob values.
+
+ If dst_record is specified, the blob null bit, pointer, and length will be
+ set in that record. Otherwise they must be set later by calling
+ unpack_record_ndbrecord().
+*/
+int
+ha_ndbcluster::get_blob_values(NdbOperation *ndb_op, byte *dst_record,
+ const MY_BITMAP *bitmap)
+{
+ uint i;
+ DBUG_ENTER("ha_ndbcluster::get_blob_values");
+
+ m_blob_counter= 0;
+ m_blob_expected_count= 0;
+ m_blob_destination_record= dst_record;
+ m_blob_total_size= 0;
+
+ for (i= 0; i < table_share->fields; i++)
+ {
+ Field *field= table->field[i];
+ if (!(field->flags & BLOB_FLAG))
+ continue;
+
+ DBUG_PRINT("info", ("fieldnr=%d", i));
+ NdbBlob *ndb_blob;
+ if (bitmap_is_set(bitmap, i))
+ {
+ if ((ndb_blob= ndb_op->getBlobHandle(i)) == NULL ||
+ ndb_blob->setActiveHook(g_get_ndb_blobs_value, this) != 0)
+ DBUG_RETURN(1);
+ m_blob_expected_count++;
+ }
+ else
+ ndb_blob= NULL;
+
+ m_value[i].blob= ndb_blob;
+ }
+
+ DBUG_RETURN(0);
+}
- Field_blob *field_blob= (Field_blob*)field;
+int
+ha_ndbcluster::set_blob_values(NdbOperation *ndb_op, my_ptrdiff_t row_offset,
+ const MY_BITMAP *bitmap, uint *set_count)
+{
+ uint field_no;
+ uint *blob_index, *blob_index_end;
+ int res= 0;
+ DBUG_ENTER("ha_ndbcluster::set_blob_values");
+
+ *set_count= 0;
+
+ if (table_share->blob_fields == 0)
+ DBUG_RETURN(0);
+
+ blob_index= table_share->blob_field;
+ blob_index_end= blob_index + table_share->blob_fields;
+ do
+ {
+ field_no= *blob_index;
+ /* A NULL bitmap sets all blobs. */
+ if (bitmap && !bitmap_is_set(bitmap, field_no))
+ continue;
+ Field *field= table->field[field_no];
+
+ NdbBlob *ndb_blob= ndb_op->getBlobHandle(field_no);
+ if (ndb_blob == NULL)
+ DBUG_RETURN(1);
+ if (field->is_null_in_record_with_offset(row_offset))
+ {
+ if (ndb_blob->setNull() != 0)
+ DBUG_RETURN(1);
+ }
+ else
+ {
+ Field_blob *field_blob= (Field_blob *)field;
// Get length and pointer to data
+ const byte* field_ptr= field->ptr + row_offset;
uint32 blob_len= field_blob->get_length(field_ptr);
char* blob_ptr= NULL;
field_blob->get_ptr(&blob_ptr);
@@ -814,40 +1049,16 @@ int ha_ndbcluster::set_ndb_value(NdbOper
(long) blob_ptr, blob_len));
DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
- if (set_blob_value)
- *set_blob_value= TRUE;
// No callback needed to write value
- DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
+ res= ndb_blob->setValue(blob_ptr, blob_len);
+ if (res != 0)
+ DBUG_RETURN(1);
}
- DBUG_RETURN(1);
- }
-}
+ ++(*set_count);
+ } while (++blob_index != blob_index_end);
-/*
- Callback to read all blob values.
- - not done in unpack_record because unpack_record is valid
- after execute(Commit) but reading blobs is not
- - may only generate read operations; they have to be executed
- somewhere before the data is available
- - due to single buffer for all blobs, we let the last blob
- process all blobs (last so that all are active)
- - null bit is still set in unpack_record
- - TODO allocate blob part aligned buffers
-*/
-
-NdbBlob::ActiveHook g_get_ndb_blobs_value;
-
-int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
-{
- DBUG_ENTER("g_get_ndb_blobs_value");
- if (ndb_blob->blobsNextBlob() != NULL)
- DBUG_RETURN(0);
- ha_ndbcluster *ha= (ha_ndbcluster *)arg;
- int ret= get_ndb_blobs_value(ha->table, ha->m_value,
- ha->m_blobs_buffer, ha->m_blobs_buffer_size,
- ha->m_blobs_offset);
- DBUG_RETURN(ret);
+ DBUG_RETURN(res);
}
/*
@@ -937,82 +1148,15 @@ int get_ndb_blobs_value(TABLE* table, Nd
/*
- Instruct NDB to fetch one field
- - data is read directly into buffer provided by field
- if field is NULL, data is read into memory provided by NDBAPI
-*/
-
-int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
- uint fieldnr, byte* buf)
-{
- DBUG_ENTER("get_ndb_value");
- DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
- (int)(field != NULL ? field->flags : 0)));
-
- if (field != NULL)
- {
- DBUG_ASSERT(buf);
- DBUG_ASSERT(ndb_supported_type(field->type()));
- DBUG_ASSERT(field->ptr != NULL);
- if (! (field->flags & BLOB_FLAG))
- {
- if (field->type() != MYSQL_TYPE_BIT)
- {
- byte *field_buf;
- if (field->pack_length() != 0)
- field_buf= buf + (field->ptr - table->record[0]);
- else
- field_buf= (byte *)&dummy_buf;
- m_value[fieldnr].rec= ndb_op->getValue(fieldnr,
- field_buf);
- }
- else // if (field->type() == MYSQL_TYPE_BIT)
- {
- m_value[fieldnr].rec= ndb_op->getValue(fieldnr);
- }
- DBUG_RETURN(m_value[fieldnr].rec == NULL);
- }
-
- // Blob type
- NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
- m_value[fieldnr].blob= ndb_blob;
- if (ndb_blob != NULL)
- {
- // Set callback
- m_blobs_offset= buf - (byte*) table->record[0];
- void *arg= (void *)this;
- DBUG_RETURN(ndb_blob->setActiveHook(g_get_ndb_blobs_value, arg) != 0);
- }
- DBUG_RETURN(1);
- }
-
- // Used for hidden key only
- m_value[fieldnr].rec= ndb_op->getValue(fieldnr, m_ref);
- DBUG_RETURN(m_value[fieldnr].rec == NULL);
-}
-
-/*
- Instruct NDB to fetch the partition id (fragment id)
-*/
-int ha_ndbcluster::get_ndb_partition_id(NdbOperation *ndb_op)
-{
- DBUG_ENTER("get_ndb_partition_id");
- DBUG_RETURN(ndb_op->getValue(NdbDictionary::Column::FRAGMENT,
- (char *)&m_part_id) == NULL);
-}
-
-/*
Check if any set or get of blob value in current query.
*/
-bool ha_ndbcluster::uses_blob_value()
+bool ha_ndbcluster::uses_blob_value(const MY_BITMAP *bitmap)
{
- MY_BITMAP *bitmap;
uint *blob_index, *blob_index_end;
if (table_share->blob_fields == 0)
return FALSE;
- bitmap= m_write_op ? table->write_set : table->read_set;
blob_index= table_share->blob_field;
blob_index_end= blob_index + table_share->blob_fields;
do
@@ -1099,11 +1243,43 @@ int ha_ndbcluster::get_metadata(const ch
DBUG_PRINT("info", ("fetched table %s", tab->getName()));
m_table= tab;
+
+ if (bitmap_init(&m_bitmap, m_bitmap_buf, table_share->fields, 0) ||
+ bitmap_init(&m_pk_bitmap, m_pk_bitmap_buf, table_share->fields, 0))
+ {
+ error= HA_ERR_OUT_OF_MEM;
+ goto err;
+ }
+ if (table_share->primary_key != MAX_KEY)
+ {
+ KEY *pk_info= table->key_info + table_share->primary_key;
+ uint i;
+ for (i= 0; i < pk_info->key_parts; i++)
+ {
+ KEY_PART_INFO *kp= &pk_info->key_part[i];
+ bitmap_set_bit(&m_pk_bitmap, kp->fieldnr - 1);
+ }
+ }
+ else
+ {
+ /* Hidden primary key. */
+ uint field_no= table_share->fields;
+ ((uchar *)m_pk_bitmap_buf)[field_no>>3]|= (1 << (field_no & 7));
+
+ if ((error= add_hidden_pk_ndb_record(dict)) != 0)
+ goto err;
+ }
+
+ calc_batch_buffer_size();
+
+ if ((error= add_table_ndb_record(dict)) != 0)
+ goto err;
if ((error= open_indexes(ndb, table, FALSE)) == 0)
{
ndbtab_g.release();
DBUG_RETURN(0);
}
+
err:
ndbtab_g.invalidate();
m_table= NULL;
@@ -1190,9 +1366,13 @@ static void ndb_init_index(NDB_INDEX_DAT
data.index_stat_cache_entries=0;
data.index_stat_update_freq=0;
data.index_stat_query_count=0;
+ data.ndb_record_key= NULL;
+ data.ndb_record_row= NULL;
+ data.ndb_unique_record_key= NULL;
+ data.ndb_unique_record_row= NULL;
}
-static void ndb_clear_index(NDB_INDEX_DATA &data)
+static void ndb_clear_index(NDBDICT *dict, NDB_INDEX_DATA &data)
{
if (data.unique_index_attrid_map)
{
@@ -1202,6 +1382,14 @@ static void ndb_clear_index(NDB_INDEX_DA
{
delete data.index_stat;
}
+ if (data.ndb_unique_record_key)
+ dict->releaseRecord(data.ndb_unique_record_key);
+ if (data.ndb_unique_record_row)
+ dict->releaseRecord(data.ndb_unique_record_row);
+ if (data.ndb_record_key)
+ dict->releaseRecord(data.ndb_record_key);
+ if (data.ndb_record_row)
+ dict->releaseRecord(data.ndb_record_row);
ndb_init_index(data);
}
@@ -1213,6 +1401,7 @@ int ha_ndbcluster::add_index_handle(THD
const char *index_name, uint index_no)
{
int error= 0;
+
NDB_INDEX_TYPE idx_type= get_index_type_from_table(index_no);
m_index[index_no].type= idx_type;
DBUG_ENTER("ha_ndbcluster::add_index_handle");
@@ -1284,6 +1473,10 @@ int ha_ndbcluster::add_index_handle(THD
m_index[index_no].unique_index= index;
error= fix_unique_index_attr_order(m_index[index_no], index, key_info);
}
+
+ if (!error)
+ error= add_index_ndb_record(dict, key_info, index_no);
+
if (!error)
m_index[index_no].status= ACTIVE;
@@ -1291,6 +1484,312 @@ int ha_ndbcluster::add_index_handle(THD
}
/*
+ We use this function to convert null bit masks, as found in class Field,
+ to bit numbers, as used in NdbRecord.
+*/
+static uint
+null_bit_mask_to_bit_number(uchar bit_mask)
+{
+ switch (bit_mask)
+ {
+ case 0x1: return 0;
+ case 0x2: return 1;
+ case 0x4: return 2;
+ case 0x8: return 3;
+ case 0x10: return 4;
+ case 0x20: return 5;
+ case 0x40: return 6;
+ case 0x80: return 7;
+ default:
+ DBUG_ASSERT(false);
+ return 0;
+ }
+}
+
+static void
+ndb_set_record_specification(uint field_no,
+ NdbDictionary::RecordSpecification *spec,
+ const TABLE *table,
+ const NdbDictionary::Table *ndb_table)
+{
+ spec->column= ndb_table->getColumn(field_no);
+ spec->offset= table->field[field_no]->ptr - table->record[0];
+ if (table->field[field_no]->null_ptr)
+ {
+ spec->nullbit_byte_offset=
+ (char *)table->field[field_no]->null_ptr - table->record[0];
+ spec->nullbit_bit_in_byte=
+ null_bit_mask_to_bit_number(table->field[field_no]->null_bit);
+ }
+ else if (table->field[field_no]->type() == MYSQL_TYPE_BIT)
+ {
+ /* We need to store the position of the overflow bits. */
+ const Field_bit* field_bit= static_cast<Field_bit*>(table->field[field_no]);
+ spec->nullbit_byte_offset=
+ (char *)field_bit->bit_ptr - table->record[0];
+ spec->nullbit_bit_in_byte= field_bit->bit_ofs;
+ }
+ else
+ {
+ spec->nullbit_byte_offset= 0;
+ spec->nullbit_bit_in_byte= 0;
+ }
+}
+
+int
+ha_ndbcluster::add_table_ndb_record(NDBDICT *dict)
+{
+ DBUG_ENTER("ha_ndbcluster::add_table_ndb_record()");
+ NdbDictionary::RecordSpecification spec[NDB_MAX_ATTRIBUTES_IN_TABLE + 2];
+ NdbRecord *rec;
+ uint i;
+
+ for (i= 0; i < table_share->fields; i++)
+ {
+ ndb_set_record_specification(i, &spec[i], table, m_table);
+ }
+
+ uint32 size= 0;
+ if (table_share->primary_key == MAX_KEY)
+ {
+ /* Access to the hidden primary key. */
+ spec[i].column= m_table->getColumn(i);
+ spec[i].offset= offset_hidden_key();
+ spec[i].nullbit_byte_offset= 0;
+ spec[i].nullbit_bit_in_byte= 0;
+ size+= NDB_HIDDEN_PRIMARY_KEY_LENGTH;
+ i++;
+ }
+ if (m_use_partition_function)
+ {
+ /* Access to the hidden partition function column. */
+ spec[i].column= m_table->getColumn(i);
+ spec[i].offset= offset_user_partition_function();
+ spec[i].nullbit_byte_offset= 0;
+ spec[i].nullbit_bit_in_byte= 0;
+ size+= 4;
+ i++;
+ }
+
+ rec= dict->createRecord(m_table, spec, i, sizeof(spec[0]),
+ NdbDictionary::RecMysqldBitfield);
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_ndb_record= rec;
+
+ /*
+ We need a different NdbRecord for reading the FRAGMENT pseudo-column,
+ as pseudo-columns cannot be enabled/disabled with bitmask.
+ */
+ if (m_use_partition_function && table_share->primary_key == MAX_KEY)
+ {
+ spec[i].column= NdbDictionary::Column::FRAGMENT;
+ spec[i].offset= offset_user_partition_fragment();
+ spec[i].nullbit_byte_offset= 0;
+ spec[i].nullbit_bit_in_byte= 0;
+ size+= 4;
+ i++;
+
+ rec= dict->createRecord(m_table, spec, i, sizeof(spec[0]),
+ NdbDictionary::RecMysqldBitfield);
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_ndb_record_fragment= rec;
+ }
+ else
+ m_ndb_record_fragment= NULL;
+
+ m_extra_reclength= size;
+
+ rec= ndb_get_table_statistics_ndbrecord(dict, m_table);
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_ndb_statistics_record= rec;
+
+ DBUG_RETURN(0);
+}
+
+/* Create NdbRecord for setting hidden primary key from Uint64. */
+int
+ha_ndbcluster::add_hidden_pk_ndb_record(NDBDICT *dict)
+{
+ DBUG_ENTER("ha_ndbcluster::add_hidden_pk_ndb_record");
+ NdbDictionary::RecordSpecification spec[1];
+ NdbRecord *rec;
+
+ spec[0].column= m_table->getColumn(table_share->fields);
+ spec[0].offset= 0;
+ spec[0].nullbit_byte_offset= 0;
+ spec[0].nullbit_bit_in_byte= 0;
+
+ rec= dict->createRecord(m_table, spec, 1, sizeof(spec[0]));
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_ndb_hidden_key_record= rec;
+
+ DBUG_RETURN(0);
+}
+
+int
+ha_ndbcluster::add_index_ndb_record(NDBDICT *dict, KEY *key_info, uint index_no)
+{
+ DBUG_ENTER("ha_ndbcluster::add_index_ndb_record");
+ NdbDictionary::RecordSpecification spec[NDB_MAX_ATTRIBUTES_IN_TABLE + 2];
+ NdbRecord *rec;
+
+ Uint32 offset= 0;
+ for (uint i= 0; i < key_info->key_parts; i++)
+ {
+ KEY_PART_INFO *kp= &key_info->key_part[i];
+
+ spec[i].column= m_table->getColumn(kp->fieldnr - 1);
+ if (! spec[i].column)
+ ERR_RETURN(dict->getNdbError());
+ if (kp->null_bit)
+ {
+ /* Nullable column. */
+ spec[i].offset= offset + 1; // First byte is NULL flag
+ spec[i].nullbit_byte_offset= offset;
+ spec[i].nullbit_bit_in_byte= 0;
+ }
+ else
+ {
+ /* Not nullable column. */
+ spec[i].offset= offset;
+ spec[i].nullbit_byte_offset= 0;
+ spec[i].nullbit_bit_in_byte= 0;
+ }
+ offset+= kp->store_length;
+ }
+
+ if (m_index[index_no].index)
+ {
+ /*
+ Enable MysqldShrinkVarchar flag so that the two-byte length used by
+ mysqld for short varchar keys is correctly converted into a one-byte
+ length used by Ndb kernel.
+ */
+ rec= dict->createRecord(m_index[index_no].index, m_table,
+ spec, key_info->key_parts, sizeof(spec[0]),
+ ( NdbDictionary::RecMysqldShrinkVarchar |
+ NdbDictionary::RecMysqldBitfield ));
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_index[index_no].ndb_record_key= rec;
+ }
+ else
+ m_index[index_no].ndb_record_key= NULL;
+
+ if (m_index[index_no].unique_index)
+ {
+ rec= dict->createRecord(m_index[index_no].unique_index, m_table,
+ spec, key_info->key_parts, sizeof(spec[0]),
+ ( NdbDictionary::RecMysqldShrinkVarchar |
+ NdbDictionary::RecMysqldBitfield ));
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_index[index_no].ndb_unique_record_key= rec;
+ }
+ else if (index_no == table_share->primary_key)
+ {
+ /* The primary key is special, there is no explicit NDB index associated. */
+ rec= dict->createRecord(m_table,
+ spec, key_info->key_parts, sizeof(spec[0]),
+ ( NdbDictionary::RecMysqldShrinkVarchar |
+ NdbDictionary::RecMysqldBitfield ));
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_index[index_no].ndb_unique_record_key= rec;
+ }
+ else
+ m_index[index_no].ndb_unique_record_key= NULL;
+
+ /* Now do the same, but this time with offsets from Field, for row access. */
+ for (uint i= 0; i < key_info->key_parts; i++)
+ {
+ const KEY_PART_INFO *kp= &key_info->key_part[i];
+
+ spec[i].offset= kp->offset;
+ if (kp->null_bit)
+ {
+ /* Nullable column. */
+ spec[i].nullbit_byte_offset= kp->null_offset;
+ spec[i].nullbit_bit_in_byte= null_bit_mask_to_bit_number(kp->null_bit);
+ }
+ else
+ {
+ /* Not nullable column. */
+ spec[i].nullbit_byte_offset= 0;
+ spec[i].nullbit_bit_in_byte= 0;
+ }
+ }
+
+ if (m_index[index_no].unique_index)
+ {
+ rec= dict->createRecord(m_index[index_no].unique_index, m_table,
+ spec, key_info->key_parts, sizeof(spec[0]),
+ NdbDictionary::RecMysqldBitfield);
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_index[index_no].ndb_unique_record_row= rec;
+ }
+ else if (index_no == table_share->primary_key)
+ {
+ rec= dict->createRecord(m_table,
+ spec, key_info->key_parts, sizeof(spec[0]),
+ NdbDictionary::RecMysqldBitfield);
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_index[index_no].ndb_unique_record_row= rec;
+ }
+ else
+ m_index[index_no].ndb_unique_record_row= NULL;
+
+ /*
+ Now create ordered index ndb record for row access with all columns.
+ We need this to properly sort rows retrieved from ordered index scan.
+ */
+ if (m_index[index_no].index)
+ {
+ uint i;
+ for (i= 0; i < table_share->fields; i++)
+ {
+ ndb_set_record_specification(i, &spec[i], table, m_table);
+ }
+
+ if (table_share->primary_key == MAX_KEY)
+ {
+ /* Access to the hidden primary key. */
+ spec[i].column= m_table->getColumn(i);
+ spec[i].offset= offset_hidden_key();
+ spec[i].nullbit_byte_offset= 0;
+ spec[i].nullbit_bit_in_byte= 0;
+ i++;
+
+ if (m_use_partition_function)
+ {
+ spec[i].column= NdbDictionary::Column::FRAGMENT;
+ spec[i].offset= offset_user_partition_fragment();
+ spec[i].nullbit_byte_offset= 0;
+ spec[i].nullbit_bit_in_byte= 0;
+ i++;
+ }
+ }
+
+ rec= dict->createRecord(m_index[index_no].index, m_table,
+ spec, i, sizeof(spec[0]),
+ NdbDictionary::RecMysqldBitfield);
+ if (! rec)
+ ERR_RETURN(dict->getNdbError());
+ m_index[index_no].ndb_record_row= rec;
+ }
+ else
+ m_index[index_no].ndb_record_row= NULL;
+
+ DBUG_RETURN(0);
+}
+
+/*
Associate index handles for each index of a table
*/
int ha_ndbcluster::open_indexes(Ndb *ndb, TABLE *tab, bool ignore_error)
@@ -1422,7 +1921,7 @@ int ha_ndbcluster::drop_indexes(Ndb *ndb
}
if (error)
DBUG_RETURN(error);
- ndb_clear_index(m_index[i]);
+ ndb_clear_index(dict, m_index[i]);
continue;
}
}
@@ -1485,6 +1984,26 @@ void ha_ndbcluster::release_metadata(THD
}
if (m_table != NULL)
{
+ if (m_ndb_record != NULL)
+ {
+ dict->releaseRecord(m_ndb_record);
+ m_ndb_record= NULL;
+ }
+ if (m_ndb_record_fragment != NULL)
+ {
+ dict->releaseRecord(m_ndb_record_fragment);
+ m_ndb_record_fragment= NULL;
+ }
+ if (m_ndb_hidden_key_record != NULL)
+ {
+ dict->releaseRecord(m_ndb_hidden_key_record);
+ m_ndb_hidden_key_record= NULL;
+ }
+ if (m_ndb_statistics_record != NULL)
+ {
+ dict->releaseRecord(m_ndb_statistics_record);
+ m_ndb_statistics_record= NULL;
+ }
if (m_table->getObjectStatus() == NdbDictionary::Object::Invalid)
invalidate_indexes= 1;
dict->removeTableGlobal(*m_table, invalidate_indexes);
@@ -1506,19 +2025,20 @@ void ha_ndbcluster::release_metadata(THD
DBUG_ASSERT(m_table != NULL);
dict->removeIndexGlobal(*m_index[i].index, invalidate_indexes);
}
- ndb_clear_index(m_index[i]);
+ ndb_clear_index(dict, m_index[i]);
}
m_table= NULL;
DBUG_VOID_RETURN;
}
-int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
+int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type,
+ const MY_BITMAP *column_bitmap) /* set checked for blobs; NULL: no check */
{
if (type >= TL_WRITE_ALLOW_WRITE)
return NdbOperation::LM_Exclusive;
if (type == TL_READ_WITH_SHARED_LOCKS ||
- uses_blob_value())
+ (column_bitmap != NULL && uses_blob_value(column_bitmap)))
return NdbOperation::LM_Read;
return NdbOperation::LM_CommittedRead;
}
@@ -1610,125 +2130,6 @@ static void shrink_varchar(Field* field,
}
}
-int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
-{
- KEY* key_info= table->key_info + table_share->primary_key;
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
- DBUG_ENTER("set_primary_key");
-
- for (; key_part != end; key_part++)
- {
- Field* field= key_part->field;
- const byte* ptr= key;
- char buf[256];
- shrink_varchar(field, ptr, buf);
- if (set_ndb_key(op, field,
- key_part->fieldnr-1, ptr))
- ERR_RETURN(op->getNdbError());
- key += key_part->store_length;
- }
- DBUG_RETURN(0);
-}
-
-
-int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *record)
-{
- KEY* key_info= table->key_info + table_share->primary_key;
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
- DBUG_ENTER("set_primary_key_from_record");
-
- for (; key_part != end; key_part++)
- {
- Field* field= key_part->field;
- if (set_ndb_key(op, field,
- key_part->fieldnr-1, record+key_part->offset))
- ERR_RETURN(op->getNdbError());
- }
- DBUG_RETURN(0);
-}
-
-int ha_ndbcluster::set_index_key_from_record(NdbOperation *op,
- const byte *record, uint keyno)
-{
- KEY* key_info= table->key_info + keyno;
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
- uint i;
- DBUG_ENTER("set_index_key_from_record");
-
- for (i= 0; key_part != end; key_part++, i++)
- {
- Field* field= key_part->field;
- if (set_ndb_key(op, field, m_index[keyno].unique_index_attrid_map[i],
- record+key_part->offset))
- ERR_RETURN(m_active_trans->getNdbError());
- }
- DBUG_RETURN(0);
-}
-
-int
-ha_ndbcluster::set_index_key(NdbOperation *op,
- const KEY *key_info,
- const byte * key_ptr)
-{
- DBUG_ENTER("ha_ndbcluster::set_index_key");
- uint i;
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
-
- for (i= 0; key_part != end; key_part++, i++)
- {
- Field* field= key_part->field;
- const byte* ptr= key_part->null_bit ? key_ptr + 1 : key_ptr;
- char buf[256];
- shrink_varchar(field, ptr, buf);
- if (set_ndb_key(op, field, m_index[active_index].unique_index_attrid_map[i], ptr))
- ERR_RETURN(m_active_trans->getNdbError());
- key_ptr+= key_part->store_length;
- }
- DBUG_RETURN(0);
-}
-
-inline
-int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
-{
- uint i;
- DBUG_ENTER("define_read_attrs");
-
- // Define attributes to read
- for (i= 0; i < table_share->fields; i++)
- {
- Field *field= table->field[i];
- if (bitmap_is_set(table->read_set, i) ||
- ((field->flags & PRI_KEY_FLAG)))
- {
- if (get_ndb_value(op, field, i, buf))
- ERR_RETURN(op->getNdbError());
- }
- else
- {
- m_value[i].ptr= NULL;
- }
- }
-
- if (table_share->primary_key == MAX_KEY)
- {
- DBUG_PRINT("info", ("Getting hidden key"));
- // Scanning table with no primary key
- int hidden_no= table_share->fields;
-#ifndef DBUG_OFF
- const NDBTAB *tab= (const NDBTAB *) m_table;
- if (!tab->getColumn(hidden_no))
- DBUG_RETURN(1);
-#endif
- if (get_ndb_value(op, NULL, hidden_no, NULL))
- ERR_RETURN(op->getNdbError());
- }
- DBUG_RETURN(0);
-}
-
/*
Read one record from NDB using primary key
@@ -1737,53 +2138,31 @@ int ha_ndbcluster::define_read_attrs(byt
int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf,
uint32 part_id)
{
- uint no_fields= table_share->fields;
NdbConnection *trans= m_active_trans;
NdbOperation *op;
-
+ char *row;
int res;
DBUG_ENTER("pk_read");
- DBUG_PRINT("enter", ("key_len: %u", key_len));
+ DBUG_PRINT("enter", ("key_len: %u read_set=%x",
+ key_len, table->read_set->bitmap[0]));
DBUG_DUMP("key", (char*)key, key_len);
- m_write_op= FALSE;
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
- op->readTuple(lm) != 0)
- ERR_RETURN(trans->getNdbError());
-
- if (table_share->primary_key == MAX_KEY)
- {
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
- DBUG_DUMP("key", (char*)key, 8);
- if (set_hidden_key(op, no_fields, key))
- ERR_RETURN(trans->getNdbError());
-
- // Read key at the same time, for future reference
- if (get_ndb_value(op, NULL, no_fields, NULL))
- ERR_RETURN(trans->getNdbError());
- }
- else
+ if (table_share->primary_key == MAX_KEY)
{
- if ((res= set_primary_key(op, key)))
- return res;
+ row= get_row_buffer();
+ if (!row)
+ DBUG_RETURN(ER_OUTOFMEMORY);
}
-
- if ((res= define_read_attrs(buf, op)))
- DBUG_RETURN(res);
+ else
+ row= buf;
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
+ if (!(op= pk_unique_index_read_key(table->s->primary_key, key, row, lm)))
+ ERR_RETURN(trans->getNdbError());
+
if (m_use_partition_function)
- {
op->setPartitionId(part_id);
- // If table has user defined partitioning
- // and no indexes, we need to read the partition id
- // to support ORDER BY queries
- if (table_share->primary_key == MAX_KEY &&
- get_ndb_partition_id(op))
- ERR_RETURN(trans->getNdbError());
- }
if ((res = execute_no_commit_ie(this,trans,FALSE)) != 0 ||
op->getNdbError().code)
@@ -1792,8 +2171,14 @@ int ha_ndbcluster::pk_read(const byte *k
DBUG_RETURN(ndb_err(trans));
}
- // The value have now been fetched from NDB
- unpack_record(buf);
+ if (table_share->primary_key == MAX_KEY)
+ {
+ memcpy(buf, row, table_share->reclength);
+ m_ref= get_hidden_key(row);
+ if (m_use_partition_function)
+ m_part_id= get_partition_fragment(row);
+ }
+
table->status= 0;
DBUG_RETURN(0);
}
@@ -1806,11 +2191,9 @@ int ha_ndbcluster::pk_read(const byte *k
int ha_ndbcluster::complemented_read(const byte *old_data, byte *new_data,
uint32 old_part_id)
{
- uint no_fields= table_share->fields, i;
NdbTransaction *trans= m_active_trans;
NdbOperation *op;
DBUG_ENTER("complemented_read");
- m_write_op= FALSE;
if (bitmap_is_set_all(table->read_set))
{
@@ -1818,62 +2201,51 @@ int ha_ndbcluster::complemented_read(con
DBUG_RETURN(0);
}
+ const NdbRecord *key_rec;
+ const char *key_row;
+ if (table_share->primary_key != MAX_KEY)
+ {
+ key_rec= m_index[table->s->primary_key].ndb_unique_record_row;
+ key_row= old_data;
+ }
+ else
+ {
+ /* Hidden primary key, previously read into m_ref. */
+ key_rec= m_ndb_hidden_key_record;
+ key_row= (const char *)(&m_ref);
+ }
+
+ /*
+ Use mask only with columns that are neither in write_set nor in
+ read_set. Primary key columns are not filtered out explicitly here.
+ */
+ bitmap_copy(&m_bitmap, table->read_set);
+ bitmap_union(&m_bitmap, table->write_set);
+ bitmap_invert(&m_bitmap);
NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
- op->readTuple(lm) != 0)
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, &m_bitmap);
+ if (!(op= trans->readTuple(key_rec, key_row, m_ndb_record, new_data,
+ lm, (const unsigned char *)(m_bitmap.bitmap))))
ERR_RETURN(trans->getNdbError());
- if (table_share->primary_key != MAX_KEY)
- {
- if (set_primary_key_from_record(op, old_data))
- ERR_RETURN(trans->getNdbError());
- }
- else
+
+ if (table_share->blob_fields > 0)
{
- // This table has no primary key, use "hidden" primary key
- if (set_hidden_key(op, table->s->fields, m_ref))
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+ int res= get_blob_values(op, new_data, &m_bitmap);
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+ if (res != 0)
ERR_RETURN(op->getNdbError());
}
if (m_use_partition_function)
op->setPartitionId(old_part_id);
- // Read all unreferenced non-key field(s)
- for (i= 0; i < no_fields; i++)
- {
- Field *field= table->field[i];
- if (!((field->flags & PRI_KEY_FLAG) ||
- bitmap_is_set(table->read_set, i)) &&
- !bitmap_is_set(table->write_set, i))
- {
- if (get_ndb_value(op, field, i, new_data))
- ERR_RETURN(trans->getNdbError());
- }
- }
-
if (execute_no_commit(this,trans,FALSE) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
}
- // The value have now been fetched from NDB
- unpack_record(new_data);
- table->status= 0;
-
- /**
- * restore m_value
- */
- for (i= 0; i < no_fields; i++)
- {
- Field *field= table->field[i];
- if (!((field->flags & PRI_KEY_FLAG) ||
- bitmap_is_set(table->read_set, i)))
- {
- m_value[i].ptr= NULL;
- }
- }
-
DBUG_RETURN(0);
}
@@ -1962,13 +2334,17 @@ check_null_in_record(const KEY* key_info
*/
}
+/* Empty column mask and dummy row, for reading no attributes via NdbRecord. */
+/* Static storage duration guarantees the mask starts out as all zeros. */
+static unsigned char empty_mask[(NDB_MAX_ATTRIBUTES_IN_TABLE+7)/8];
+static char dummy_row[1];
+
/*
* Peek to check if any rows already exist with conflicting
* primary key or unique index values
*/
-int ha_ndbcluster::peek_indexed_rows(const byte *record,
- bool check_pk)
+int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk)
{
NdbTransaction *trans= m_active_trans;
NdbOperation *op;
@@ -1978,20 +2354,20 @@ int ha_ndbcluster::peek_indexed_rows(con
DBUG_ENTER("peek_indexed_rows");
NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, NULL);
first= NULL;
if (check_pk && table->s->primary_key != MAX_KEY)
{
/*
* Fetch any row with colliding primary key
*/
- if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
- op->readTuple(lm) != 0)
+ const NdbRecord *key_rec=
+ m_index[table->s->primary_key].ndb_unique_record_row;
+ if (!(op= trans->readTuple(key_rec, (const char *)record,
+ key_rec, dummy_row, lm, empty_mask)))
ERR_RETURN(trans->getNdbError());
first= op;
- if ((res= set_primary_key_from_record(op, record)))
- ERR_RETURN(trans->getNdbError());
if (m_use_partition_function)
{
@@ -2013,7 +2389,6 @@ int ha_ndbcluster::peek_indexed_rows(con
* Fetch any rows with colliding unique indexes
*/
KEY* key_info;
- KEY_PART_INFO *key_part, *end;
for (i= 0, key_info= table->key_info; i < table->s->keys; i++, key_info++)
{
if (i != table->s->primary_key &&
@@ -2029,19 +2404,16 @@ int ha_ndbcluster::peek_indexed_rows(con
{
DBUG_PRINT("info", ("skipping check for key with NULL"));
continue;
- }
- NdbIndexOperation *iop;
- const NDBINDEX *unique_index = m_index[i].unique_index;
- key_part= key_info->key_part;
- end= key_part + key_info->key_parts;
- if (!(iop= trans->getNdbIndexOperation(unique_index, m_table)) ||
- iop->readTuple(lm) != 0)
+ }
+
+ NdbOperation *iop;
+ const NdbRecord *key_rec= m_index[i].ndb_unique_record_row;
+ if (!(iop= trans->readTuple(key_rec, record, key_rec, dummy_row,
+ lm, empty_mask)))
ERR_RETURN(trans->getNdbError());
if (!first)
first= iop;
- if ((res= set_index_key_from_record(iop, record, i)))
- ERR_RETURN(trans->getNdbError());
}
}
last= trans->getLastDefinedOperation();
@@ -2074,45 +2446,51 @@ int ha_ndbcluster::peek_indexed_rows(con
int ha_ndbcluster::unique_index_read(const byte *key,
uint key_len, byte *buf)
{
- int res;
NdbTransaction *trans= m_active_trans;
- NdbIndexOperation *op;
+ NdbOperation *op;
+ char *row; /* where the read lands: buf itself, or a separate row buffer */
DBUG_ENTER("ha_ndbcluster::unique_index_read");
DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
DBUG_DUMP("key", (char*)key, key_len);
+ if (table_share->primary_key == MAX_KEY) /* table uses the hidden key */
+ {
+ row= get_row_buffer();
+ if (!row)
+ DBUG_RETURN(ER_OUTOFMEMORY); /* NOTE(review): confirm vs. HA_ERR_* codes */
+ }
+ else
+ row= buf;
+ /* Define a read by the active unique index, then execute it. */
NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbIndexOperation(m_index[active_index].unique_index,
- m_table)) ||
- op->readTuple(lm) != 0)
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
+ if (!(op= pk_unique_index_read_key(active_index, key, row, lm)))
ERR_RETURN(trans->getNdbError());
- // Set secondary index key(s)
- if ((res= set_index_key(op, table->key_info + active_index, key)))
- DBUG_RETURN(res);
-
- if ((res= define_read_attrs(buf, op)))
- DBUG_RETURN(res);
-
if (execute_no_commit_ie(this,trans,FALSE) != 0 ||
op->getNdbError().code)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
}
- // The value have now been fetched from NDB
- unpack_record(buf);
+ /* Hidden-key table: copy row to buf and save the hidden key (and part id). */
+ if (table_share->primary_key == MAX_KEY)
+ {
+ memcpy(buf, row, table_share->reclength);
+ m_ref= get_hidden_key(row);
+ if (m_use_partition_function)
+ m_part_id= get_partition_fragment(row);
+ }
+
table->status= 0;
DBUG_RETURN(0);
}
-inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
+int
+ha_ndbcluster::scan_handle_lock_tuple(NdbScanOperation *scanOp,
+ NdbTransaction *trans)
{
- DBUG_ENTER("fetch_next");
- int local_check;
- NdbTransaction *trans= m_active_trans;
-
+ DBUG_ENTER("ha_ndbcluster::scan_handle_lock_tuple");
if (m_lock_tuple)
{
/*
@@ -2121,24 +2499,36 @@ inline int ha_ndbcluster::fetch_next(Ndb
LOCK WITH SHARE MODE) and row was not explictly unlocked
with unlock_row() call
*/
- NdbConnection *con_trans= m_active_trans;
- NdbOperation *op;
- // Lock row
- DBUG_PRINT("info", ("Keeping lock on scanned row"));
+ NdbOperation *op;
+ // Lock row
+ DBUG_PRINT("info", ("Keeping lock on scanned row"));
- if (!(op= m_active_cursor->lockCurrentTuple()))
- {
- /* purecov: begin inspected */
- m_lock_tuple= FALSE;
- ERR_RETURN(con_trans->getNdbError());
- /* purecov: end */
- }
- m_ops_pending++;
+ if (!(op= scanOp->lockCurrentTuple(trans, m_ndb_record,
+ dummy_row, empty_mask)))
+ {
+ /* purecov: begin inspected */
+ m_lock_tuple= FALSE;
+ ERR_RETURN(trans->getNdbError());
+ /* purecov: end */
+ }
+ m_ops_pending++;
}
m_lock_tuple= FALSE;
+ DBUG_RETURN(0);
+}
+
+inline int ha_ndbcluster::fetch_next(NdbScanOperation* cursor)
+{
+ DBUG_ENTER("fetch_next");
+ int local_check;
+ int error;
+ NdbTransaction *trans= m_active_trans;
+
+ if ((error= scan_handle_lock_tuple(cursor, trans)) != 0)
+ DBUG_RETURN(error);
bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE &&
- m_lock.type != TL_READ_WITH_SHARED_LOCKS;;
+ m_lock.type != TL_READ_WITH_SHARED_LOCKS;
do {
DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
/*
@@ -2148,11 +2538,10 @@ inline int ha_ndbcluster::fetch_next(Ndb
{
if (execute_no_commit(this,trans,FALSE) != 0)
DBUG_RETURN(ndb_err(trans));
- m_ops_pending= 0;
- m_blobs_pending= FALSE;
}
- if ((local_check= cursor->nextResult(contact_ndb, m_force_send)) == 0)
+ if ((local_check= cursor->nextResult(m_next_row, contact_ndb,
+ m_force_send)) == 0)
{
/*
Explicitly lock tuple if "select for update" or
@@ -2176,22 +2565,8 @@ inline int ha_ndbcluster::fetch_next(Ndb
DBUG_PRINT("info", ("ops_pending: %ld", (long) m_ops_pending));
if (m_ops_pending)
{
- if (m_transaction_on)
- {
- if (execute_no_commit(this,trans,FALSE) != 0)
- DBUG_RETURN(-1);
- }
- else
- {
- if (execute_commit(this,trans) != 0)
- DBUG_RETURN(-1);
- if (trans->restart() != 0)
- {
- DBUG_ASSERT(0);
- DBUG_RETURN(-1);
- }
- }
- m_ops_pending= 0;
+ if (execute_no_commit(this,trans,FALSE) != 0)
+ DBUG_RETURN(-1);
}
contact_ndb= (local_check == 2);
}
@@ -2227,7 +2602,14 @@ inline int ha_ndbcluster::next_result(by
{
DBUG_PRINT("info", ("One more record found"));
- unpack_record(buf);
+ if (table_share->primary_key == MAX_KEY)
+ {
+ m_ref= get_hidden_key(m_next_row);
+ if (m_use_partition_function)
+ m_part_id= get_partition_fragment(m_next_row);
+ }
+
+ unpack_record_ndbrecord(buf, m_next_row);
table->status= 0;
DBUG_RETURN(0);
}
@@ -2246,9 +2628,54 @@ inline int ha_ndbcluster::next_result(by
}
/*
+ Do a primary key or unique key index read operation.
+ The key value is taken from a buffer in mysqld key format.
+*/
+NdbOperation *
+ha_ndbcluster::pk_unique_index_read_key(uint idx, const byte *key, byte *buf,
+                                        NdbOperation::LockMode lm)
+{
+  /*
+    idx is the index to read by (MAX_KEY selects the hidden primary key),
+    key the key value, buf the row buffer the read is defined into, and lm
+    the lock mode. Returns the defined operation, or NULL on failure.
+  */
+  NdbOperation *op;
+  const NdbRecord *ndb_record= m_ndb_record;
+  uchar *mask= (uchar *)(table->read_set->bitmap);
+  const NdbRecord *key_rec= (idx != MAX_KEY) ?
+    m_index[idx].ndb_unique_record_key : m_ndb_hidden_key_record;
+
+  /* Initialize the null bitmap, setting unused null bits to 1. */
+  memset(buf, 0xff, table->s->null_bytes);
+
+  if (m_use_partition_function || table_share->primary_key == MAX_KEY)
+  {
+    /*
+      We need an extended column mask, as we may also need to read the
+      hidden primary key and the FRAGMENT pseudo-column.
+    */
+    mask= copy_column_set(table->read_set);
+    if (table_share->primary_key == MAX_KEY)
+    {
+      request_hidden_key(mask);
+      if (m_use_partition_function)
+        ndb_record= m_ndb_record_fragment;
+    }
+  }
+  op= m_active_trans->readTuple(key_rec, key, ndb_record, buf, lm, mask);
+  /* readTuple() yields NULL on failure; never hand that to get_blob_values(). */
+  if (op != NULL && uses_blob_value(table->read_set) &&
+      get_blob_values(op, buf, table->read_set) != 0)
+    return NULL;
+  return op;
+}
+
+/*
Set bounds for ordered index scan.
*/
+/* ToDo: remove if converting records_in_range() to NdbRecord. */
int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
uint inx,
bool rir,
@@ -2312,7 +2739,7 @@ int ha_ndbcluster::set_bounds(NdbIndexSc
p.key= keys[j];
p.part_ptr= &p.key->key[tot_len];
p.part_null= key_part->null_bit && *p.part_ptr;
- p.bound_ptr= (const char *)
+ p.bound_ptr=
p.part_null ? 0 : key_part->null_bit ? p.part_ptr + 1 : p.part_ptr;
if (j == 0)
@@ -2426,6 +2853,95 @@ int ha_ndbcluster::set_bounds(NdbIndexSc
DBUG_RETURN(op->end_of_bound(range_no));
}
+/* Number of leading key parts of key_info that key->length bytes span. */
+static uint
+count_key_columns(const KEY *key_info, const key_range *key)
+{
+  const KEY_PART_INFO *parts= key_info->key_part;
+  const uint total_parts= key_info->key_parts;
+  uint bytes_covered= 0;
+  uint used= 0;
+  /*
+    Consume whole key parts until their combined store_length reaches the
+    length of the supplied key value, or the index runs out of parts.
+  */
+  while (used < total_parts && bytes_covered < key->length)
+    bytes_covered+= parts[used++].store_length;
+  return used;
+}
+
+/*
+  Fill in the low and high ends of an NDB index bound from a mysqld key
+  range. Either key may be NULL, meaning no bound at that end; an exact or
+  prefix-last key supplies both ends. Note: does not set range_no.
+*/
+static void
+compute_index_bounds(NdbIndexScanOperation::IndexBound & bound,
+                     const KEY *key_info,
+                     const key_range *start_key, const key_range *end_key)
+{
+  if (start_key == NULL)
+  {
+    bound.low_key= NULL;
+    bound.low_key_count= 0;
+  }
+  else
+  {
+    bound.low_key= start_key->key;
+    bound.low_key_count= count_key_columns(key_info, start_key);
+    bound.low_inclusive= !(start_key->flag == HA_READ_AFTER_KEY ||
+                           start_key->flag == HA_READ_BEFORE_KEY);
+    if (start_key->flag == HA_READ_KEY_EXACT ||
+        start_key->flag == HA_READ_PREFIX_LAST)
+    {
+      bound.high_key= bound.low_key;
+      bound.high_key_count= bound.low_key_count;
+      bound.high_inclusive= TRUE;
+      return;
+    }
+  }
+
+  if (end_key == NULL)
+  {
+    bound.high_key= NULL;
+    bound.high_key_count= 0;
+    return;
+  }
+  bound.high_key= end_key->key;
+  bound.high_key_count= count_key_columns(key_info, end_key);
+  /*
+    An end_key flagged HA_READ_AFTER_KEY still makes the high bound
+    inclusive (e.g. 'where b >= 1 and b <= 3' arrives that way), unlike a
+    start_key with that flag; only HA_READ_BEFORE_KEY excludes it.
+  */
+  bound.high_inclusive= end_key->flag != HA_READ_BEFORE_KEY;
+  if (end_key->flag == HA_READ_KEY_EXACT ||
+      end_key->flag == HA_READ_PREFIX_LAST)
+  {
+    bound.low_key= bound.high_key;
+    bound.low_key_count= bound.high_key_count;
+    bound.low_inclusive= TRUE;
+  }
+}
+
+/* Argument for ordered_index_scan_callback(): the index and its key range. */
+struct ordered_index_scan_data {
+  const KEY *key_info;          /* Index the bounds are computed for */
+  const key_range *start_key;   /* Start of the range, may be NULL */
+  const key_range *end_key;     /* End of the range, may be NULL */
+};
+/* Bound callback for ordered_index_scan(): always describes range number 0. */
+static int
+ordered_index_scan_callback(void *arg, Uint32 i,
+                            NdbIndexScanOperation::IndexBound & bound)
+{
+  const ordered_index_scan_data *scan= (const ordered_index_scan_data *)arg;
+  compute_index_bounds(bound, scan->key_info, scan->start_key, scan->end_key);
+  bound.range_no= 0;
+  return 0;                     /* Success */
+}
+
+
/*
Start ordered index scan in NDB
*/
@@ -2435,73 +2951,72 @@ int ha_ndbcluster::ordered_index_scan(co
bool sorted, bool descending,
byte* buf, part_id_range *part_spec)
{
- int res;
- bool restart;
NdbTransaction *trans= m_active_trans;
NdbIndexScanOperation *op;
+ struct ordered_index_scan_data data;
+ uchar *mask;
+ int error;
DBUG_ENTER("ha_ndbcluster::ordered_index_scan");
- DBUG_PRINT("enter", ("index: %u, sorted: %d, descending: %d",
- active_index, sorted, descending));
+ DBUG_PRINT("enter", ("index: %u, sorted: %d, descending: %d read_set=0x%x",
+ active_index, sorted, descending, table->read_set->bitmap[0]));
DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
- m_write_op= FALSE;
// Check that sorted seems to be initialised
DBUG_ASSERT(sorted == 0 || sorted == 1);
- if (m_active_cursor == 0)
+ if (m_active_cursor && (error= close_scan()))
+ DBUG_RETURN(error);
+
+ if (m_use_partition_function || table_share->primary_key == MAX_KEY)
{
- restart= FALSE;
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- bool need_pk = (lm == NdbOperation::LM_Read);
- if (!(op= trans->getNdbIndexScanOperation(m_index[active_index].index,
- m_table)) ||
- op->readTuples(lm, 0, parallelism, sorted, descending, FALSE, need_pk))
- ERR_RETURN(trans->getNdbError());
- if (m_use_partition_function && part_spec != NULL &&
- part_spec->start_part == part_spec->end_part)
- op->setPartitionId(part_spec->start_part);
- m_active_cursor= op;
- } else {
- restart= TRUE;
- op= (NdbIndexScanOperation*)m_active_cursor;
-
- if (m_use_partition_function && part_spec != NULL &&
- part_spec->start_part == part_spec->end_part)
- op->setPartitionId(part_spec->start_part);
- DBUG_ASSERT(op->getSorted() == sorted);
- DBUG_ASSERT(op->getLockMode() ==
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
- if (op->reset_bounds(m_force_send))
- DBUG_RETURN(ndb_err(m_active_trans));
+ mask= copy_column_set(table->read_set);
+ if (table_share->primary_key == MAX_KEY)
+ request_hidden_key(mask);
}
-
+ else
+ mask= (uchar *)(table->read_set->bitmap);
+
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
+ Uint32 scan_flags= 0;
+ if (lm == NdbOperation::LM_Read)
+ scan_flags|= NdbScanOperation::SF_KeyInfo;
+ if (sorted)
+ scan_flags|= NdbScanOperation::SF_OrderBy;
+ if (descending)
+ scan_flags|= NdbScanOperation::SF_Descending;
+ const NdbRecord *key_rec= m_index[active_index].ndb_record_key;
+ const NdbRecord *row_rec= m_index[active_index].ndb_record_row;
+ Uint32 num_bounds= (start_key != NULL || end_key != NULL);
+ data.key_info= table->key_info + active_index;
+ if (!descending) {
+ data.start_key= start_key;
+ data.end_key= end_key;
+ }
+ else
{
- const key_range *keys[2]= { start_key, end_key };
- res= set_bounds(op, active_index, FALSE, keys);
- if (res)
- DBUG_RETURN(res);
+ data.start_key= end_key;
+ data.end_key= start_key;
}
- if (!restart)
- {
- if (m_cond && m_cond->generate_scan_filter(op))
- DBUG_RETURN(ndb_err(trans));
+ if (!(op= trans->scanIndex(key_rec, ordered_index_scan_callback, &data,
+ num_bounds, row_rec, lm,
+ mask,
+ scan_flags, parallelism, 0)))
+ ERR_RETURN(trans->getNdbError());
- if ((res= define_read_attrs(buf, op)))
- {
- DBUG_RETURN(res);
- }
-
- // If table has user defined partitioning
- // and no primary key, we need to read the partition id
- // to support ORDER BY queries
- if (m_use_partition_function &&
- (table_share->primary_key == MAX_KEY) &&
- (get_ndb_partition_id(op)))
- ERR_RETURN(trans->getNdbError());
- }
+ if (uses_blob_value(table->read_set) &&
+ get_blob_values(op, NULL, table->read_set) != 0)
+ ERR_RETURN(op->getNdbError());
+
+ if (m_use_partition_function && part_spec != NULL &&
+ part_spec->start_part == part_spec->end_part)
+ op->setPartitionId(part_spec->start_part);
+ m_active_cursor= op;
+
+ if (m_cond && m_cond->generate_scan_filter(op))
+ DBUG_RETURN(ndb_err(trans));
if (execute_no_commit(this,trans,FALSE) != 0)
DBUG_RETURN(ndb_err(trans));
@@ -2543,22 +3058,55 @@ int ha_ndbcluster::unique_index_scan(con
uint key_len,
byte *buf)
{
- int res;
NdbScanOperation *op;
NdbTransaction *trans= m_active_trans;
part_id_range part_spec;
+ uchar *mask= (uchar *)(table->read_set->bitmap);
+ const NdbRecord *ndb_record= m_ndb_record;
DBUG_ENTER("unique_index_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
+ if (table_share->primary_key == MAX_KEY || m_use_partition_function)
+ {
+ mask= copy_column_set(table->read_set);
+ if (m_use_partition_function)
+ {
+ part_spec.start_part= 0;
+ part_spec.end_part= m_part_info->get_tot_partitions() - 1;
+ prune_partition_set(table, &part_spec);
+ DBUG_PRINT("info", ("part_spec.start_part: %u part_spec.end_part: %u",
+ part_spec.start_part, part_spec.end_part));
+ /*
+ If partition pruning has found no partition in set
+ we can return HA_ERR_END_OF_FILE
+ */
+ if (part_spec.start_part > part_spec.end_part)
+ {
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+ }
+
+ // If table has user defined partitioning
+ // and no primary key, we need to read the partition id
+ // to support ORDER BY queries
+ if (table_share->primary_key == MAX_KEY)
+ ndb_record= m_ndb_record_fragment;
+ }
+ if (table_share->primary_key == MAX_KEY)
+ request_hidden_key(mask);
+ }
+
NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
int flags= guess_scan_flags(lm, m_table, table->read_set);
- if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
- op->readTuples(lm, flags, parallelism))
+ if (!(op=trans->scanTable(ndb_record, lm, mask, flags, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= op;
+ if (uses_blob_value(table->read_set) &&
+ get_blob_values(op, NULL, table->read_set) != 0)
+ ERR_RETURN(op->getNdbError());
+
if (m_use_partition_function)
{
part_spec.start_part= 0;
@@ -2567,16 +3115,10 @@ int ha_ndbcluster::unique_index_scan(con
DBUG_PRINT("info", ("part_spec.start_part = %u, part_spec.end_part = %u",
part_spec.start_part, part_spec.end_part));
/*
- If partition pruning has found no partition in set
- we can return HA_ERR_END_OF_FILE
If partition pruning has found exactly one partition in set
we can optimize scan to run towards that partition only.
*/
- if (part_spec.start_part > part_spec.end_part)
- {
- DBUG_RETURN(HA_ERR_END_OF_FILE);
- }
- else if (part_spec.start_part == part_spec.end_part)
+ if (part_spec.start_part == part_spec.end_part)
{
/*
Only one partition is required to scan, if sorted is required we
@@ -2585,12 +3127,6 @@ int ha_ndbcluster::unique_index_scan(con
*/
m_active_cursor->setPartitionId(part_spec.start_part);
}
- // If table has user defined partitioning
- // and no primary key, we need to read the partition id
- // to support ORDER BY queries
- if ((table_share->primary_key == MAX_KEY) &&
- (get_ndb_partition_id(op)))
- ERR_RETURN(trans->getNdbError());
}
if (!m_cond)
m_cond= new ha_ndbcluster_cond;
@@ -2601,8 +3137,6 @@ int ha_ndbcluster::unique_index_scan(con
}
if (m_cond->generate_scan_filter_from_key(op, key_info, key, key_len, buf))
DBUG_RETURN(ndb_err(trans));
- if ((res= define_read_attrs(buf, op)))
- DBUG_RETURN(res);
if (execute_no_commit(this,trans,FALSE) != 0)
DBUG_RETURN(ndb_err(trans));
@@ -2617,41 +3151,62 @@ int ha_ndbcluster::unique_index_scan(con
int ha_ndbcluster::full_table_scan(byte *buf)
{
- int res;
NdbScanOperation *op;
NdbTransaction *trans= m_active_trans;
part_id_range part_spec;
+ uchar *mask= (uchar *)(table->read_set->bitmap);
+ const NdbRecord *ndb_record= m_ndb_record;
DBUG_ENTER("full_table_scan");
DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
- m_write_op= FALSE;
+
+ if (table_share->primary_key == MAX_KEY || m_use_partition_function)
+ {
+ mask= copy_column_set(table->read_set);
+ if (m_use_partition_function)
+ {
+ part_spec.start_part= 0;
+ part_spec.end_part= m_part_info->get_tot_partitions() - 1;
+ prune_partition_set(table, &part_spec);
+ DBUG_PRINT("info", ("part_spec.start_part: %u part_spec.end_part: %u",
+ part_spec.start_part, part_spec.end_part));
+ /*
+ If partition pruning has found no partition in set
+ we can return HA_ERR_END_OF_FILE
+ */
+ if (part_spec.start_part > part_spec.end_part)
+ {
+ DBUG_RETURN(HA_ERR_END_OF_FILE);
+ }
+
+ // If table has user defined partitioning
+ // and no primary key, we need to read the partition id
+ // to support ORDER BY queries
+ if (table_share->primary_key == MAX_KEY)
+ ndb_record= m_ndb_record_fragment;
+ }
+ if (table_share->primary_key == MAX_KEY)
+ request_hidden_key(mask);
+ }
NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
int flags= guess_scan_flags(lm, m_table, table->read_set);
- if (!(op=trans->getNdbScanOperation(m_table)) ||
- op->readTuples(lm, flags, parallelism))
+ if (!(op= trans->scanTable(ndb_record, lm, mask, flags, parallelism)))
ERR_RETURN(trans->getNdbError());
m_active_cursor= op;
+ if (uses_blob_value(table->read_set) &&
+ get_blob_values(op, NULL, table->read_set) != 0)
+ ERR_RETURN(op->getNdbError());
+
if (m_use_partition_function)
{
- part_spec.start_part= 0;
- part_spec.end_part= m_part_info->get_tot_partitions() - 1;
- prune_partition_set(table, &part_spec);
- DBUG_PRINT("info", ("part_spec.start_part: %u part_spec.end_part: %u",
- part_spec.start_part, part_spec.end_part));
/*
- If partition pruning has found no partition in set
- we can return HA_ERR_END_OF_FILE
If partition pruning has found exactly one partition in set
we can optimize scan to run towards that partition only.
*/
- if (part_spec.start_part > part_spec.end_part)
- {
- DBUG_RETURN(HA_ERR_END_OF_FILE);
- }
- else if (part_spec.start_part == part_spec.end_part)
+ if (part_spec.start_part == part_spec.end_part)
{
/*
Only one partition is required to scan, if sorted is required we
@@ -2660,18 +3215,10 @@ int ha_ndbcluster::full_table_scan(byte
*/
m_active_cursor->setPartitionId(part_spec.start_part);
}
- // If table has user defined partitioning
- // and no primary key, we need to read the partition id
- // to support ORDER BY queries
- if ((table_share->primary_key == MAX_KEY) &&
- (get_ndb_partition_id(op)))
- ERR_RETURN(trans->getNdbError());
}
if (m_cond && m_cond->generate_scan_filter(op))
DBUG_RETURN(ndb_err(trans));
- if ((res= define_read_attrs(buf, op)))
- DBUG_RETURN(res);
if (execute_no_commit(this,trans,FALSE) != 0)
DBUG_RETURN(ndb_err(trans));
@@ -2679,36 +3226,39 @@ int ha_ndbcluster::full_table_scan(byte
DBUG_RETURN(next_result(buf));
}
+int ha_ndbcluster::write_row(byte *record) /* thin wrapper around ndb_write_row() */
+{
+ DBUG_ENTER("ha_ndbcluster::write_row");
+ DBUG_RETURN(ndb_write_row(record, FALSE, FALSE)); /* primary_key_update= FALSE, batched_update= FALSE */
+}
+
/*
Insert one record into NDB
*/
-int ha_ndbcluster::write_row(byte *record)
+int ha_ndbcluster::ndb_write_row(byte *record, bool primary_key_update,
+ bool batched_update)
{
bool has_auto_increment;
- uint i;
NdbTransaction *trans= m_active_trans;
NdbOperation *op;
- int res;
THD *thd= table->in_use;
- longlong func_value= 0;
- DBUG_ENTER("ha_ndbcluster::write_row");
+ uint32 part_id;
+ char *row;
+ bool need_flush;
+ int error;
+ DBUG_ENTER("ha_ndbcluster::ndb_write_row");
- m_write_op= TRUE;
has_auto_increment= (table->next_number_field && record == table->record[0]);
- if (table_share->primary_key != MAX_KEY)
+
+ if (has_auto_increment && table_share->primary_key != MAX_KEY)
{
/*
* Increase any auto_incremented primary key
*/
- if (has_auto_increment)
- {
- int error;
-
- m_skip_auto_increment= FALSE;
- if ((error= update_auto_increment()))
- DBUG_RETURN(error);
- m_skip_auto_increment= (insert_id_for_cur_row == 0);
- }
+ m_skip_auto_increment= FALSE;
+ if ((error= update_auto_increment()))
+ DBUG_RETURN(error);
+ m_skip_auto_increment= (insert_id_for_cur_row == 0);
}
/*
@@ -2731,33 +3281,42 @@ int ha_ndbcluster::write_row(byte *recor
DBUG_RETURN(peek_res);
}
- statistic_increment(thd->status_var.ha_write_count, &LOCK_status);
- if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
- table->timestamp_field->set_time();
-
- if (!(op= trans->getNdbOperation(m_table)))
- ERR_RETURN(trans->getNdbError());
-
- res= (m_use_write) ? op->writeTuple() :op->insertTuple();
- if (res != 0)
- ERR_RETURN(trans->getNdbError());
-
- if (m_use_partition_function)
+ /*
+ Since the NdbRecord operations need row data to remain valid until
+ execute(), for bulk insert we need to save rows in a buffer, and
+ execute() whenever the buffer gets full.
+
+ For non-bulk insert, we may still need to copy the row into a (bigger)
+ buffer if we need extra space for the user-defined partitioning hash
+ or the hidden primary key, but we always execute() in this case.
+ */
+ bool uses_blobs= uses_blob_value(table->write_set);
+ if ((m_rows_to_insert > 1 && !uses_blobs) || batched_update)
{
- uint32 part_id;
- int error;
- my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
- error= m_part_info->get_partition_id(m_part_info, &part_id, &func_value);
- dbug_tmp_restore_column_map(table->read_set, old_map);
- if (error)
- {
- m_part_info->err_value= func_value;
+ /* This sets row and need_flush (output parameters). */
+ error= batch_copy_row_to_buffer(record, row, need_flush);
+ DBUG_PRINT("info", ("allocating buffer for bulk insert, "
+ "m_rows_to_insert=%d write_set=0x%x",
+ (int)m_rows_to_insert, table->write_set->bitmap[0]));
+ if (error != 0)
DBUG_RETURN(error);
+ }
+ else
+ {
+ DBUG_PRINT("info", ("Non-bulk insert."));
+ need_flush= TRUE;
+ if (table_share->primary_key == MAX_KEY || m_use_partition_function)
+ {
+ DBUG_PRINT("info", ("Getting single buffer for oversize record."));
+ row= copy_row_to_buffer(record);
+ if (!row)
+ DBUG_RETURN(ER_OUTOFMEMORY);
}
- op->setPartitionId(part_id);
+ else
+ row= record;
}
- if (table_share->primary_key == MAX_KEY)
+ if (table_share->primary_key == MAX_KEY)
{
// Table has hidden primary key
Ndb *ndb= get_ndb();
@@ -2772,35 +3331,21 @@ int ha_ndbcluster::write_row(byte *recor
ndb->getNdbError().status == NdbError::TemporaryError);
if (ret == -1)
ERR_RETURN(ndb->getNdbError());
- if (set_hidden_key(op, table_share->fields, (const byte*)&auto_value))
- ERR_RETURN(op->getNdbError());
+ set_hidden_key(row, auto_value);
}
- else
- {
- int error;
- if ((error= set_primary_key_from_record(op, record)))
- DBUG_RETURN(error);
- }
- // Set non-key attribute(s)
- bool set_blob_value= FALSE;
- my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
- for (i= 0; i < table_share->fields; i++)
+ if (m_use_partition_function)
{
- Field *field= table->field[i];
- if (!(field->flags & PRI_KEY_FLAG) &&
- (bitmap_is_set(table->write_set, i) || !m_use_write) &&
- set_ndb_value(op, field, i, record-table->record[0], &set_blob_value))
+ longlong func_value= 0;
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+ error= m_part_info->get_partition_id(m_part_info, &part_id, &func_value);
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+ if (error)
{
- m_skip_auto_increment= TRUE;
- dbug_tmp_restore_column_map(table->read_set, old_map);
- ERR_RETURN(op->getNdbError());
+ m_part_info->err_value= func_value;
+ DBUG_RETURN(error);
}
- }
- dbug_tmp_restore_column_map(table->read_set, old_map);
- if (m_use_partition_function)
- {
/*
We need to set the value of the partition function value in
NDB since the NDB kernel doesn't have easy access to the function
@@ -2808,12 +3353,61 @@ int ha_ndbcluster::write_row(byte *recor
*/
if (func_value >= INT_MAX32)
func_value= INT_MAX32;
- uint32 part_func_value= (uint32)func_value;
- uint no_fields= table_share->fields;
+ set_partition_function_value(row, (uint32)func_value);
+ }
+
+ statistic_increment(thd->status_var.ha_write_count, &LOCK_status);
+ if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
+ table->timestamp_field->set_time();
+
+ /*
+ We do not use the table->write_set here.
+ The reason is that for REPLACE INTO t(a), the write_set is passed with
+ only column 'a' enabled.
+ But it is wrong not to write all columns in REPLACE, since REPLACE is
+ the same as DELETE+INSERT (i.e. not writing all columns risks losing
+ default values).
+ */
+ /*
+ ToDo: Actually, we have to use the write set, since otherwise replication
+ fails. Replication seems to rely on being able to replicate an update with
+ a write_row() with only some bits set in write_set, leaving other fields
+ intact.
+ This means that we now suffer from BUG#22045... :-/
+ */
+
+ if (m_use_write)
+ {
+ const NdbRecord *key_rec;
+ const char *key_row;
+ uchar *mask;
+ if (table_share->primary_key == MAX_KEY || m_use_partition_function)
+ {
+ mask= copy_column_set(table->write_set);
+ if (m_use_partition_function)
+ request_partition_function_value(mask);
+ if (table_share->primary_key == MAX_KEY)
+ request_hidden_key(mask);
+ }
+ else
+ mask= (uchar *)(table->write_set->bitmap);
+
if (table_share->primary_key == MAX_KEY)
- no_fields++;
- op->setValue(no_fields, part_func_value);
+ {
+ key_rec= m_ndb_hidden_key_record;
+ key_row= &row[offset_hidden_key()];
+ }
+ else
+ {
+ key_rec= m_index[table_share->primary_key].ndb_unique_record_row;
+ key_row= row;
+ }
+ op= trans->writeTuple(key_rec, key_row, m_ndb_record, row, mask);
}
+ else
+ op= trans->insertTuple(m_ndb_record, row);
+ if (!(op))
+ ERR_RETURN(trans->getNdbError());
if (unlikely(m_slow_path))
{
@@ -2822,6 +3416,20 @@ int ha_ndbcluster::write_row(byte *recor
else if (thd->slave_thread)
op->setAnyValue(thd->server_id);
}
+
+ uint blob_count= 0;
+ if (table_share->blob_fields > 0)
+ {
+ my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+ int res= set_blob_values(op, row - table->record[0], NULL, &blob_count);
+ dbug_tmp_restore_column_map(table->read_set, old_map);
+ if (res != 0)
+ ERR_RETURN(op->getNdbError());
+ }
+
+ if (m_use_partition_function)
+ op->setPartitionId(part_id);
+
m_rows_changed++;
/*
@@ -2834,39 +3442,14 @@ int ha_ndbcluster::write_row(byte *recor
m_rows_inserted++;
no_uncommitted_rows_update(1);
m_bulk_insert_not_flushed= TRUE;
- if ((m_rows_to_insert == (ha_rows) 1) ||
- ((m_rows_inserted % m_bulk_insert_rows) == 0) ||
- m_primary_key_update ||
- set_blob_value)
+ if (need_flush || primary_key_update)
{
// Send rows to NDB
- DBUG_PRINT("info", ("Sending inserts to NDB, "\
- "rows_inserted: %d bulk_insert_rows: %d",
- (int)m_rows_inserted, (int)m_bulk_insert_rows));
-
- m_bulk_insert_not_flushed= FALSE;
- if (m_transaction_on)
- {
- if (execute_no_commit(this,trans,FALSE) != 0)
- {
- m_skip_auto_increment= TRUE;
- no_uncommitted_rows_execute_failure();
- DBUG_RETURN(ndb_err(trans));
- }
- }
- else
+ int res= flush_bulk_insert();
+ if (res != 0)
{
- if (execute_commit(this,trans) != 0)
- {
- m_skip_auto_increment= TRUE;
- no_uncommitted_rows_execute_failure();
- DBUG_RETURN(ndb_err(trans));
- }
- if (trans->restart() != 0)
- {
- DBUG_ASSERT(0);
- DBUG_RETURN(-1);
- }
+ m_skip_auto_increment= TRUE;
+ DBUG_RETURN(res);
}
}
if ((has_auto_increment) && (m_skip_auto_increment))
@@ -2891,22 +3474,21 @@ int ha_ndbcluster::write_row(byte *recor
}
-/* Compare if a key in a row has changed */
-
-int ha_ndbcluster::key_cmp(uint keynr, const byte * old_row,
- const byte * new_row)
+/* Compare if an update changes the primary key in a row. */
+int ha_ndbcluster::primary_key_cmp(const byte * old_row, const byte * new_row)
{
+ uint keynr= table_share->primary_key;
KEY_PART_INFO *key_part=table->key_info[keynr].key_part;
KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts;
for (; key_part != end ; key_part++)
{
- if (key_part->null_bit)
- {
- if ((old_row[key_part->null_offset] & key_part->null_bit) !=
- (new_row[key_part->null_offset] & key_part->null_bit))
- return 1;
- }
+ if (!bitmap_is_set(table->write_set, key_part->fieldnr - 1))
+ continue;
+
+ /* The primary key does not allow NULLs. */
+ DBUG_ASSERT(!key_part->null_bit);
+
if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
{
@@ -2935,14 +3517,12 @@ int ha_ndbcluster::update_row(const byte
NdbTransaction *trans= m_active_trans;
NdbScanOperation* cursor= m_active_cursor;
NdbOperation *op;
- uint i;
uint32 old_part_id= 0, new_part_id= 0;
int error;
longlong func_value;
bool pk_update= (table_share->primary_key != MAX_KEY &&
- key_cmp(table_share->primary_key, old_data, new_data));
+ primary_key_cmp(old_data, new_data));
DBUG_ENTER("update_row");
- m_write_op= TRUE;
/*
* If IGNORE the ignore constraint violations on primary and unique keys,
@@ -2994,9 +3574,7 @@ int ha_ndbcluster::update_row(const byte
DBUG_RETURN(read_res);
}
// Delete old row
- m_primary_key_update= TRUE;
- delete_res= delete_row(old_data);
- m_primary_key_update= FALSE;
+ delete_res= ndb_delete_row(old_data, TRUE);
if (delete_res)
{
DBUG_PRINT("info", ("delete failed"));
@@ -3004,23 +3582,20 @@ int ha_ndbcluster::update_row(const byte
}
// Insert new row
DBUG_PRINT("info", ("delete succeded"));
- m_primary_key_update= TRUE;
- insert_res= write_row(new_data);
- m_primary_key_update= FALSE;
+ bool batched_update= (cursor != 0);
+ insert_res= ndb_write_row(new_data, TRUE, batched_update);
if (insert_res)
{
DBUG_PRINT("info", ("insert failed"));
if (trans->commitStatus() == NdbConnection::Started)
{
// Undo delete_row(old_data)
- m_primary_key_update= TRUE;
- undo_res= write_row((byte *)old_data);
+ undo_res= ndb_write_row((byte *)old_data, TRUE, batched_update);
if (undo_res)
push_warning(current_thd,
MYSQL_ERROR::WARN_LEVEL_WARN,
undo_res,
"NDB failed undoing delete at primary key update");
- m_primary_key_update= FALSE;
}
DBUG_RETURN(insert_res);
}
@@ -3028,80 +3603,102 @@ int ha_ndbcluster::update_row(const byte
DBUG_RETURN(0);
}
+ /*
+ Set only non-primary-key attributes.
+ We already checked that any primary key attribute in write_set has no
+ real changes.
+ */
+ bitmap_copy(&m_bitmap, table->write_set);
+ bitmap_subtract(&m_bitmap, &m_pk_bitmap);
+ uchar *mask= (uchar *)(m_bitmap.bitmap);
+ /* Need to initialize bits for any extra hidden columns. */
+ if (table_share->primary_key == MAX_KEY || m_use_partition_function)
+ clear_extended_column_set(mask);
+
+ /* Need to set the value of any user-defined partitioning function. */
+ char *row;
+ bool need_execute;
+ /*
+ Batch update operation if we are doing a scan for update, unless
+ there exist UPDATE AFTER triggers
+ */
+ if (cursor && !m_update_cannot_batch)
+ {
+ /* For a scan, we only need to execute() if the batch buffer is full. */
+ error= batch_copy_row_to_buffer(new_data, row, need_execute);
+ if (error != 0)
+ DBUG_RETURN(error);
+ }
+ else
+ {
+ need_execute= TRUE;
+ if (m_use_partition_function)
+ {
+ row= copy_row_to_buffer(new_data);
+ if (!row)
+ DBUG_RETURN(ER_OUTOFMEMORY);
+ }
+ else
+ row= new_data;
+ }
+
+ if (m_use_partition_function)
+ {
+ if (func_value >= INT_MAX32)
+ func_value= INT_MAX32;
+ set_partition_function_value(row, (uint32)func_value);
+ request_partition_function_value(mask);
+ }
+
if (cursor)
{
/*
We are scanning records and want to update the record
- that was just found, call updateTuple on the cursor
+ that was just found, call updateCurrentTuple on the cursor
to take over the lock to a new update operation
And thus setting the primary key of the record from
the active record in cursor
*/
- DBUG_PRINT("info", ("Calling updateTuple on cursor"));
- if (!(op= cursor->updateCurrentTuple()))
+ DBUG_PRINT("info", ("Calling updateTuple on cursor, write_set=0x%x",
+ table->write_set->bitmap[0]));
+ if (!(op= cursor->updateCurrentTuple(trans, m_ndb_record, row, mask)))
ERR_RETURN(trans->getNdbError());
+
m_lock_tuple= FALSE;
m_ops_pending++;
- if (uses_blob_value())
- m_blobs_pending= TRUE;
- if (m_use_partition_function)
- cursor->setPartitionId(new_part_id);
}
else
{
- if (!(op= trans->getNdbOperation(m_table)) ||
- op->updateTuple() != 0)
- ERR_RETURN(trans->getNdbError());
-
- if (m_use_partition_function)
- op->setPartitionId(new_part_id);
- if (table_share->primary_key == MAX_KEY)
+ const NdbRecord *key_rec;
+ const char *key_row;
+ if (table_share->primary_key != MAX_KEY)
{
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
-
- // Require that the PK for this record has previously been
- // read into m_ref
- DBUG_DUMP("key", m_ref, NDB_HIDDEN_PRIMARY_KEY_LENGTH);
-
- if (set_hidden_key(op, table->s->fields, m_ref))
- ERR_RETURN(op->getNdbError());
- }
- else
+ key_rec= m_index[table_share->primary_key].ndb_unique_record_row;
+ key_row= old_data;
+ }
+ else
{
- int res;
- if ((res= set_primary_key_from_record(op, old_data)))
- DBUG_RETURN(res);
+ /* Use hidden primary key previously read into m_ref. */
+ key_rec= m_ndb_hidden_key_record;
+ key_row= (const char *)(&m_ref);
}
- }
- m_rows_changed++;
+ if (!(op= trans->updateTuple(key_rec, key_row, m_ndb_record, row, mask)))
+ ERR_RETURN(trans->getNdbError());
+ }
- // Set non-key attribute(s)
- my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
- for (i= 0; i < table_share->fields; i++)
+ uint blob_count= 0; /* start at zero, as ndb_write_row() does before calling set_blob_values() */
+ if (uses_blob_value(table->write_set))
{
- Field *field= table->field[i];
- if (bitmap_is_set(table->write_set, i) &&
- (!(field->flags & PRI_KEY_FLAG)) &&
- set_ndb_value(op, field, i, new_data - table->record[0]))
- {
- dbug_tmp_restore_column_map(table->read_set, old_map);
+ int row_offset= new_data - table->record[0];
+ if (set_blob_values(op, row_offset, table->write_set, &blob_count) != 0)
ERR_RETURN(op->getNdbError());
- }
+ if (cursor && blob_count > 0)
+ m_blobs_pending= TRUE;
}
- dbug_tmp_restore_column_map(table->read_set, old_map);
if (m_use_partition_function)
- {
- if (func_value >= INT_MAX32)
- func_value= INT_MAX32;
- uint32 part_func_value= (uint32)func_value;
- uint no_fields= table_share->fields;
- if (table_share->primary_key == MAX_KEY)
- no_fields++;
- op->setValue(no_fields, part_func_value);
- }
+ op->setPartitionId(new_part_id);
if (unlikely(m_slow_path))
{
@@ -3110,13 +3707,10 @@ int ha_ndbcluster::update_row(const byte
else if (thd->slave_thread)
op->setAnyValue(thd->server_id);
}
- /*
- Execute update operation if we are not doing a scan for update
- and there exist UPDATE AFTER triggers
- */
- if ((!cursor || m_update_cannot_batch) &&
- execute_no_commit(this,trans,false) != 0) {
+ m_rows_changed++;
+
+ if (need_execute && execute_no_commit(this,trans,FALSE) != 0) {
no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
}
@@ -3125,11 +3719,16 @@ int ha_ndbcluster::update_row(const byte
}
+int ha_ndbcluster::delete_row(const byte *record) /* thin wrapper around ndb_delete_row() */
+{
+ return ndb_delete_row(record, FALSE); /* primary_key_update= FALSE */
+}
+
/*
Delete one record from NDB, using primary key
*/
-int ha_ndbcluster::delete_row(const byte *record)
+int ha_ndbcluster::ndb_delete_row(const byte *record, bool primary_key_update)
{
THD *thd= table->in_use;
NdbTransaction *trans= m_active_trans;
@@ -3138,7 +3737,6 @@ int ha_ndbcluster::delete_row(const byte
uint32 part_id;
int error;
DBUG_ENTER("delete_row");
- m_write_op= TRUE;
statistic_increment(thd->status_var.ha_delete_count,&LOCK_status);
m_rows_changed++;
@@ -3160,54 +3758,48 @@ int ha_ndbcluster::delete_row(const byte
the active record in cursor
*/
DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
- if (cursor->deleteCurrentTuple() != 0)
+ if ((op= cursor->deleteCurrentTuple(trans, m_ndb_record)) == 0)
ERR_RETURN(trans->getNdbError());
m_lock_tuple= FALSE;
m_ops_pending++;
if (m_use_partition_function)
- cursor->setPartitionId(part_id);
+ op->setPartitionId(part_id);
no_uncommitted_rows_update(-1);
if (unlikely(m_slow_path))
{
if (!(thd->options & OPTION_BIN_LOG))
- ((NdbOperation *)trans->getLastDefinedOperation())->
- setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
+ op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
else if (thd->slave_thread)
- ((NdbOperation *)trans->getLastDefinedOperation())->
- setAnyValue(thd->server_id);
+ op->setAnyValue(thd->server_id);
}
- if (!(m_primary_key_update || m_delete_cannot_batch))
+ if (!(primary_key_update || m_delete_cannot_batch)) /* same test as before the refactoring: defer only when neither flag is set */
// If deleting from cursor, NoCommit will be handled in next_result
DBUG_RETURN(0);
}
else
{
-
- if (!(op=trans->getNdbOperation(m_table)) ||
- op->deleteTuple() != 0)
+ const NdbRecord *key_rec;
+ const char *key_row;
+ if (table_share->primary_key != MAX_KEY)
+ {
+ key_rec= m_index[table_share->primary_key].ndb_unique_record_row;
+ key_row= record;
+ }
+ else
+ {
+ key_rec= m_ndb_hidden_key_record;
+ key_row= (const char *)(&m_ref);
+ }
+ if (!(op=trans->deleteTuple(key_rec, key_row)))
ERR_RETURN(trans->getNdbError());
if (m_use_partition_function)
op->setPartitionId(part_id);
no_uncommitted_rows_update(-1);
-
- if (table_share->primary_key == MAX_KEY)
- {
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
-
- if (set_hidden_key(op, table->s->fields, m_ref))
- ERR_RETURN(op->getNdbError());
- }
- else
- {
- if ((error= set_primary_key_from_record(op, record)))
- DBUG_RETURN(error);
- }
if (unlikely(m_slow_path))
{
@@ -3230,7 +3822,7 @@ int ha_ndbcluster::delete_row(const byte
Unpack a record read from NDB
SYNOPSIS
- unpack_record()
+ ndb_unpack_record()
buf Buffer to store read row
NOTE
@@ -3275,7 +3867,6 @@ void ndb_unpack_record(TABLE *table, Ndb
{
DBUG_PRINT("info",("[%u] NULL",
(*value).rec->getColumn()->getColumnNo()));
- field->set_null(row_offset);
}
else
{
@@ -3288,6 +3879,7 @@ void ndb_unpack_record(TABLE *table, Ndb
else if (field->type() == MYSQL_TYPE_BIT)
{
Field_bit *field_bit= static_cast<Field_bit*>(field);
+ field_bit->set_notnull(row_offset);
/*
Move internal field pointer to point to 'buf'. Calling
@@ -3332,6 +3924,7 @@ void ndb_unpack_record(TABLE *table, Ndb
}
else
{
+ field->set_notnull(row_offset);
DBUG_PRINT("info",("[%u] SET",
(*value).rec->getColumn()->getColumnNo()));
DBUG_DUMP("info", (const char*) field->ptr, field->pack_length());
@@ -3346,7 +3939,6 @@ void ndb_unpack_record(TABLE *table, Ndb
if (isNull == 1)
{
DBUG_PRINT("info",("[%u] NULL", col_no));
- field->set_null(row_offset);
}
else if (isNull == -1)
{
@@ -3355,6 +3947,7 @@ void ndb_unpack_record(TABLE *table, Ndb
}
else
{
+ field->set_notnull(row_offset);
#ifndef DBUG_OFF
// pointer vas set in get_ndb_blobs_value
Field_blob *field_blob= (Field_blob*)field;
@@ -3395,6 +3988,99 @@ void ha_ndbcluster::unpack_record(byte *
}
/*
+ Unpack a record returned from a scan.
+ We copy field-for-field to
+ 1. Avoid unnecessary copying for sparse rows.
+ 2. Properly initialize not used null bits.
+ Note that we do not unpack all returned rows; some primary/unique key
+ operations can read directly into the destination row.
+*/
+void ha_ndbcluster::unpack_record_ndbrecord(byte *dst_row, const byte *src_row)
+{
+ int res;
+ DBUG_ASSERT(src_row != NULL);
+
+ my_ptrdiff_t dst_offset= dst_row - table->record[0]; /* offsets are relative to table->record[0] */
+ my_ptrdiff_t src_offset= src_row - table->record[0];
+
+ /* Initialize the NULL bitmap: all null bytes are set to 0xff, so every field starts out NULL. */
+ memset(dst_row, 0xff, table->s->null_bytes);
+
+ char *blob_ptr= m_blobs_buffer; /* running position in the blob buffer, advanced per blob below */
+
+ for (uint i= 0; i < table_share->fields; i++)
+ {
+ Field *field= table->field[i];
+ if (bitmap_is_set(table->read_set, i)) /* fields outside read_set are left untouched (NULL) */
+ {
+ if (field->type() == MYSQL_TYPE_BIT)
+ {
+ Field_bit *field_bit= static_cast<Field_bit*>(field);
+ if (!field->is_null_in_record_with_offset(src_offset))
+ {
+ field->move_field_offset(src_offset); /* read the value from the source row */
+ longlong value= field_bit->val_int();
+ field->move_field_offset(dst_offset-src_offset); /* net offset is now dst_offset */
+ field_bit->set_notnull();
+ /* Field_bit in DBUG requires the bit set in write_set for store(). */
+ my_bitmap_map *old_map=
+ dbug_tmp_use_all_columns(table, table->write_set);
+ int res= field_bit->store(value, true); /* NOTE(review): shadows the outer 'res' */
+ dbug_tmp_restore_column_map(table->write_set, old_map);
+ DBUG_ASSERT(res == 0);
+ field->move_field_offset(-dst_offset); /* undo the offset applied above */
+ }
+ }
+ else if (field->flags & BLOB_FLAG)
+ {
+ Field_blob *field_blob= (Field_blob *)field;
+ NdbBlob *ndb_blob= m_value[i].blob;
+ DBUG_ASSERT(ndb_blob != 0);
+ int isNull;
+ res= ndb_blob->getNull(isNull);
+ DBUG_ASSERT(res == 0); // Already succeeded once
+ Uint64 len64= 0; /* stays 0 for a NULL blob */
+ field_blob->move_field_offset(dst_offset);
+ if (!isNull)
+ {
+ res= ndb_blob->getLength(len64);
+ DBUG_ASSERT(res == 0 && len64 <= (Uint64)0xffffffff);
+ field->set_notnull();
+ }
+ /* Need not set_null(), as we initialized null bits to 1 above. */
+ field_blob->set_ptr((uint32)len64, blob_ptr);
+ field_blob->move_field_offset(-dst_offset);
+ blob_ptr+= (len64 + 7) & ~((Uint64)7); /* advance by the length rounded up to a multiple of 8 */
+ }
+ else
+ {
+ /* Normal field (not blob or bit type). */
+ if (!field->is_null_in_record_with_offset(src_offset))
+ {
+ char *src_ptr= field->ptr + src_offset;
+ /*
+ Hm, can't use the offset argument of set_notnull(), as it
+ is of type int, not my_ptrdiff_t (so could fail if
+ sizeof(int) < sizeof(char *)). Maybe this should really
+ be fixed in field.h?
+ */
+ field->move_field_offset(dst_offset);
+ field->set_notnull();
+ /*
+ ToDo: For varchar, maybe copy only the actually used bytes.
+ Unfortunately, there seems to be no suitable method to get this
+ actual size from field::*, so would need to check types explicitly.
+ */
+ memcpy(field->ptr, src_ptr, field->pack_length());
+ field->move_field_offset(-dst_offset);
+ }
+ /* No action needed for a NULL field. */
+ }
+ }
+ }
+}
+
+/*
Utility function to print/dump the fetched field
to avoid unnecessary work, wrap in DBUG_EXECUTE as in:
@@ -3612,7 +4298,6 @@ int ha_ndbcluster::read_range_first_to_b
}
}
- m_write_op= FALSE;
switch (type){
case PRIMARY_KEY_ORDERED_INDEX:
case PRIMARY_KEY_INDEX:
@@ -3672,20 +4357,12 @@ int ha_ndbcluster::read_range_next()
int ha_ndbcluster::rnd_init(bool scan)
{
- NdbScanOperation *cursor= m_active_cursor;
+ int error;
DBUG_ENTER("rnd_init");
DBUG_PRINT("enter", ("scan: %d", scan));
- // Check if scan is to be restarted
- if (cursor)
- {
- if (!scan)
- DBUG_RETURN(1);
- if (cursor->restart(m_force_send) != 0)
- {
- DBUG_ASSERT(0);
- DBUG_RETURN(-1);
- }
- }
+
+ if (m_active_cursor && (error= close_scan())) /* an open scan is now closed here instead of being restarted */
+ DBUG_RETURN(error);
index_init(table_share->primary_key, 0);
DBUG_RETURN(0);
}
@@ -3693,34 +4370,17 @@ int ha_ndbcluster::rnd_init(bool scan)
int ha_ndbcluster::close_scan()
{
NdbTransaction *trans= m_active_trans;
+ int error;
DBUG_ENTER("close_scan");
- m_multi_cursor= 0;
- if (!m_active_cursor && !m_multi_cursor)
+ NdbScanOperation *cursor= m_active_cursor;
+
+ if (!cursor)
DBUG_RETURN(0);
- NdbScanOperation *cursor= m_active_cursor ? m_active_cursor : m_multi_cursor;
+ if ((error= scan_handle_lock_tuple(cursor, trans)) != 0)
+ DBUG_RETURN(error);
- if (m_lock_tuple)
- {
- /*
- Lock level m_lock.type either TL_WRITE_ALLOW_WRITE
- (SELECT FOR UPDATE) or TL_READ_WITH_SHARED_LOCKS (SELECT
- LOCK WITH SHARE MODE) and row was not explictly unlocked
- with unlock_row() call
- */
- NdbOperation *op;
- // Lock row
- DBUG_PRINT("info", ("Keeping lock on scanned row"));
-
- if (!(op= cursor->lockCurrentTuple()))
- {
- m_lock_tuple= FALSE;
- ERR_RETURN(trans->getNdbError());
- }
- m_ops_pending++;
- }
- m_lock_tuple= FALSE;
if (m_ops_pending)
{
/*
@@ -3732,11 +4392,10 @@ int ha_ndbcluster::close_scan()
no_uncommitted_rows_execute_failure();
DBUG_RETURN(ndb_err(trans));
}
- m_ops_pending= 0;
}
cursor->close(m_force_send, TRUE);
- m_active_cursor= m_multi_cursor= NULL;
+ m_active_cursor= NULL;
DBUG_RETURN(0);
}
@@ -3891,7 +4550,7 @@ void ha_ndbcluster::position(const byte
hidden_col->getAutoIncrement() &&
key_length == NDB_HIDDEN_PRIMARY_KEY_LENGTH);
#endif
- memcpy(ref, m_ref, key_length);
+ memcpy(ref, &m_ref, key_length);
}
#ifndef DBUG_OFF
if (table_share->primary_key == MAX_KEY && m_use_partition_function)
@@ -3936,8 +4595,8 @@ int ha_ndbcluster::info(uint flag)
DBUG_RETURN(my_errno= HA_ERR_OUT_OF_MEM);
}
if (current_thd->variables.ndb_use_exact_count &&
- (result= ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat))
- == 0)
+ (result= ndb_get_table_statistics
+ (this, TRUE, ndb, m_ndb_statistics_record, &stat)) == 0)
{
stats.mean_rec_length= stat.row_size;
stats.data_file_length= stat.fragment_memory;
@@ -4092,11 +4751,41 @@ int ha_ndbcluster::reset()
*/
-void ha_ndbcluster::start_bulk_insert(ha_rows rows)
+int
+ha_ndbcluster::flush_bulk_insert() /* returns 0 on success, ndb_err(trans) or -1 on failure */
{
- int bytes, batch;
- const NDBTAB *tab= m_table;
+ NdbTransaction *trans= m_active_trans;
+ DBUG_ENTER("ha_ndbcluster::flush_bulk_insert");
+ DBUG_PRINT("info", ("Sending inserts to NDB, rows_inserted: %d",
+ (int)m_rows_inserted));
+ m_bulk_insert_not_flushed= FALSE; /* cleared before executing, also on the error paths below */
+ if (m_transaction_on)
+ {
+ if (execute_no_commit(this,trans,FALSE) != 0) /* execute without committing */
+ {
+ no_uncommitted_rows_execute_failure();
+ DBUG_RETURN(ndb_err(trans));
+ }
+ }
+ else
+ {
+ if (execute_commit(this,trans) != 0) /* m_transaction_on is off: commit right away */
+ {
+ no_uncommitted_rows_execute_failure();
+ DBUG_RETURN(ndb_err(trans));
+ }
+ if (trans->restart() != 0) /* restart the transaction object after the commit */
+ {
+ DBUG_ASSERT(0);
+ DBUG_RETURN(-1);
+ }
+ }
+ DBUG_RETURN(0);
+}
+
+void ha_ndbcluster::start_bulk_insert(ha_rows rows)
+{
DBUG_ENTER("start_bulk_insert");
DBUG_PRINT("enter", ("rows: %d", (int)rows));
@@ -4111,7 +4800,6 @@ void ha_ndbcluster::start_bulk_insert(ha
DBUG_PRINT("info", ("Batching turned off as duplicate key is "
"ignored by using peek_row"));
m_rows_to_insert= 1;
- m_bulk_insert_rows= 1;
DBUG_VOID_RETURN;
}
if (rows == (ha_rows) 0)
@@ -4122,20 +4810,6 @@ void ha_ndbcluster::start_bulk_insert(ha
else
m_rows_to_insert= rows;
- /*
- Calculate how many rows that should be inserted
- per roundtrip to NDB. This is done in order to minimize the
- number of roundtrips as much as possible. However performance will
- degrade if too many bytes are inserted, thus it's limited by this
- calculation.
- */
- const int bytesperbatch= 8192;
- bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
- batch= bytesperbatch/bytes;
- batch= batch == 0 ? 1 : batch;
- DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes));
- m_bulk_insert_rows= batch;
-
DBUG_VOID_RETURN;
}
@@ -4150,33 +4824,9 @@ int ha_ndbcluster::end_bulk_insert()
// Check if last inserts need to be flushed
if (m_bulk_insert_not_flushed)
{
- NdbTransaction *trans= m_active_trans;
- // Send rows to NDB
- DBUG_PRINT("info", ("Sending inserts to NDB, "\
- "rows_inserted: %d bulk_insert_rows: %d",
- (int) m_rows_inserted, (int) m_bulk_insert_rows));
- m_bulk_insert_not_flushed= FALSE;
- if (m_transaction_on)
- {
- if (execute_no_commit(this, trans,FALSE) != 0)
- {
- no_uncommitted_rows_execute_failure();
- my_errno= error= ndb_err(trans);
- }
- }
- else
- {
- if (execute_commit(this, trans) != 0)
- {
- no_uncommitted_rows_execute_failure();
- my_errno= error= ndb_err(trans);
- }
- else
- {
- IF_DBUG(int res=) trans->restart();
- DBUG_ASSERT(res == 0);
- }
- }
+ error= flush_bulk_insert();
+ if (error != 0)
+ my_errno= error;
}
m_rows_inserted= (ha_rows) 0;
@@ -4457,7 +5107,7 @@ int ha_ndbcluster::external_lock(THD *th
DBUG_ASSERT(m_active_trans);
// Start of transaction
m_rows_changed= 0;
- m_ops_pending= 0;
+ reset_state_at_execute();
m_slow_path= thd_ndb->m_slow_path;
#ifdef HAVE_NDB_BINLOG
if (unlikely(m_slow_path))
@@ -4523,10 +5173,6 @@ int ha_ndbcluster::external_lock(THD *th
DBUG_PRINT("warning", ("m_active_cursor != NULL"));
m_active_cursor= NULL;
- if (m_multi_cursor)
- DBUG_PRINT("warning", ("m_multi_cursor != NULL"));
- m_multi_cursor= NULL;
-
if (m_blobs_pending)
DBUG_PRINT("warning", ("blobs_pending != 0"));
m_blobs_pending= 0;
@@ -4577,6 +5223,7 @@ int ha_ndbcluster::start_stmt(THD *thd,
trans= ndb->startTransaction();
if (trans == NULL)
ERR_RETURN(ndb->getNdbError());
+ reset_state_at_execute();
no_uncommitted_rows_reset(thd);
thd_ndb->stmt= trans;
thd_ndb->query_state&= NDB_QUERY_NORMAL;
@@ -4584,7 +5231,7 @@ int ha_ndbcluster::start_stmt(THD *thd,
}
m_active_trans= trans;
// Start of statement
- m_ops_pending= 0;
+ reset_state_at_execute();
thd->set_current_stmt_binlog_row_based_if_mixed();
DBUG_RETURN(error);
@@ -5226,7 +5873,7 @@ int ha_ndbcluster::create(const char *na
pk_length += 2;
}
- // Make sure that blob tables don't have to big part size
+ // Make sure that blob tables don't have too big part size
for (i= 0; i < form->s->fields; i++)
{
/**
@@ -6169,28 +6816,33 @@ ha_ndbcluster::ha_ndbcluster(handlerton
m_use_write(FALSE),
m_ignore_dup_key(FALSE),
m_has_unique_index(FALSE),
- m_primary_key_update(FALSE),
m_ignore_no_key(FALSE),
m_rows_to_insert((ha_rows) 1),
m_rows_inserted((ha_rows) 0),
- m_bulk_insert_rows((ha_rows) 1024),
m_rows_changed((ha_rows) 0),
m_bulk_insert_not_flushed(FALSE),
m_delete_cannot_batch(FALSE),
m_update_cannot_batch(FALSE),
m_ops_pending(0),
m_skip_auto_increment(TRUE),
+ m_row_buffer_current(NULL),
m_blobs_pending(0),
- m_blobs_offset(0),
m_blobs_buffer(0),
m_blobs_buffer_size(0),
+ m_row_buffer(0),
+ m_row_buffer_size(0),
+ m_batch_buffer_size(8192),
+ m_extra_reclength(0),
+ m_ndb_record(0),
+ m_ndb_record_fragment(0),
+ m_ndb_hidden_key_record(0),
+ m_ndb_statistics_record(0),
m_dupkey((uint) -1),
m_ha_not_exact_count(FALSE),
m_force_send(TRUE),
m_autoincrement_prefetch((ha_rows) 32),
m_transaction_on(TRUE),
- m_cond(NULL),
- m_multi_cursor(NULL)
+ m_cond(NULL)
{
int i;
@@ -6240,6 +6892,11 @@ ha_ndbcluster::~ha_ndbcluster()
my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
m_blobs_buffer= 0;
+ my_free(m_row_buffer, MYF(MY_ALLOW_ZERO_PTR));
+ m_row_buffer= 0;
+ m_row_buffer_current= 0;
+ m_row_buffer_size= 0;
+
// Check for open cursor/transaction
if (m_active_cursor) {
}
@@ -6261,6 +6918,25 @@ ha_ndbcluster::~ha_ndbcluster()
+void
+ha_ndbcluster::column_bitmaps_signal()
+{
+ DBUG_ENTER("ha_ndbcluster::column_bitmaps_signal");
+ /*
+ We need to make sure we always read all of the primary key.
+ Otherwise we cannot support position() and rnd_pos().
+ */
+ bitmap_union(table->read_set, &m_pk_bitmap); /* adds the m_pk_bitmap bits to read_set */
+ DBUG_VOID_RETURN;
+
+ /*
+ Alternatively, we could just set a flag, and in the reader methods set the
+ extra bits as required if the flag is set, followed by clearing the flag.
+ This would save doing the work of setting bits twice or more.
+ On the other hand this is quite fast in itself.
+ */
+}
+
/*
Open a table for further use
- fetch metadata for this table from NDB
@@ -6329,7 +7005,8 @@ int ha_ndbcluster::open(const char *name
ERR_RETURN(ndb->getNdbError());
}
struct Ndb_statistics stat;
- res= ndb_get_table_statistics(NULL, FALSE, ndb, m_table, &stat);
+ res= ndb_get_table_statistics(NULL, FALSE, ndb, m_ndb_statistics_record,
+ &stat);
stats.mean_rec_length= stat.row_size;
stats.data_file_length= stat.fragment_memory;
stats.records= stat.row_count;
@@ -7581,6 +8258,7 @@ void ha_ndbcluster::set_tabname(const ch
}
+/* ToDo: convert to NdbRecord? */
ha_rows
ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
key_range *max_key)
@@ -7629,7 +8307,8 @@ ha_ndbcluster::records_in_range(uint inx
else
{
Ndb_statistics stat;
- if ((res=ndb_get_table_statistics(this, TRUE, ndb, m_table, &stat)))
+ if ((res=ndb_get_table_statistics(this, TRUE, ndb,
+ m_ndb_statistics_record, &stat)))
break;
table_rows=stat.row_count;
DBUG_PRINT("info", ("use db row_count: %lu", (ulong) table_rows));
@@ -8383,6 +9062,175 @@ void ndbcluster_free_share(NDB_SHARE **s
}
+struct ndb_table_statistics_row {
+ Uint64 rows;
+ Uint64 commits;
+ Uint32 size;
+ Uint64 fixed_mem;
+ Uint64 var_mem;
+};
+
+static
+NdbRecord *
+ndb_get_table_statistics_ndbrecord(NDBDICT *dict, const NDBTAB *table)
+{
+ NdbDictionary::RecordSpecification spec[5];
+ spec[0].column= NdbDictionary::Column::ROW_COUNT;
+ spec[0].offset= offsetof(struct ndb_table_statistics_row, rows);
+ spec[0].nullbit_byte_offset= 0;
+ spec[0].nullbit_bit_in_byte= 0;
+ spec[1].column= NdbDictionary::Column::COMMIT_COUNT;
+ spec[1].offset= offsetof(struct ndb_table_statistics_row, commits);
+ spec[1].nullbit_byte_offset= 0;
+ spec[1].nullbit_bit_in_byte= 0;
+ spec[2].column= NdbDictionary::Column::ROW_SIZE;
+ spec[2].offset= offsetof(struct ndb_table_statistics_row, size);
+ spec[2].nullbit_byte_offset= 0;
+ spec[2].nullbit_bit_in_byte= 0;
+ spec[3].column= NdbDictionary::Column::FRAGMENT_FIXED_MEMORY;
+ spec[3].offset= offsetof(struct ndb_table_statistics_row, fixed_mem);
+ spec[3].nullbit_byte_offset= 0;
+ spec[3].nullbit_bit_in_byte= 0;
+ spec[4].column= NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY;
+ spec[4].offset= offsetof(struct ndb_table_statistics_row, var_mem);
+ spec[4].nullbit_byte_offset= 0;
+ spec[4].nullbit_bit_in_byte= 0;
+
+ return dict->createRecord(table, spec,
+ sizeof(spec)/sizeof(spec[0]), sizeof(spec[0]), 0);
+}
+
+static
+int
+ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb,
+ const NdbRecord *record,
+ struct Ndb_statistics * ndbstat)
+{
+ NdbTransaction* pTrans;
+ NdbError error;
+ int retries= 10;
+ int reterr= 0;
+ int retry_sleep= 30 * 1000; /* 30 milliseconds */
+ const char *row;
+ char buff[22], buff2[22], buff3[22], buff4[22];
+ DBUG_ENTER("ndb_get_table_statistics");
+
+ DBUG_ASSERT(record != 0);
+
+ do
+ {
+ Uint32 count= 0;
+ Uint64 sum_rows= 0;
+ Uint64 sum_commits= 0;
+ Uint64 sum_row_size= 0;
+ Uint64 sum_mem= 0;
+ NdbScanOperation*pOp;
+ int check;
+
+ if ((pTrans= ndb->startTransaction()) == NULL)
+ {
+ error= ndb->getNdbError();
+ goto retry;
+ }
+
+ /* Set batch_size=1, as we need only one row per fragment. */
+ if ((pOp= pTrans->scanTable(record, NdbOperation::LM_CommittedRead,
+ NULL, 0, 0, 1)) == NULL)
+ {
+ error= pTrans->getNdbError();
+ goto retry;
+ }
+
+ if (pOp->interpret_exit_last_row() == -1)
+ {
+ error= pOp->getNdbError();
+ goto retry;
+ }
+
+ if (pTrans->execute(NdbTransaction::NoCommit,
+ NdbOperation::AbortOnError,
+ TRUE) == -1)
+ {
+ error= pTrans->getNdbError();
+ goto retry;
+ }
+
+ while ((check= pOp->nextResult(row, TRUE, TRUE)) == 0)
+ {
+ /* NDB API ensures proper alignment of rows to make the cast valid. */
+ const ndb_table_statistics_row *stat=
+ (const ndb_table_statistics_row *)row;
+ sum_rows+= stat->rows;
+ sum_commits+= stat->commits;
+ if (sum_row_size < stat->size)
+ sum_row_size= stat->size;
+ sum_mem+= stat->fixed_mem + stat->var_mem;
+ count++;
+ }
+
+ if (check == -1)
+ {
+ error= pOp->getNdbError();
+ goto retry;
+ }
+
+ pOp->close(TRUE);
+
+ ndb->closeTransaction(pTrans);
+
+ ndbstat->row_count= sum_rows;
+ ndbstat->commit_count= sum_commits;
+ ndbstat->row_size= sum_row_size;
+ ndbstat->fragment_memory= sum_mem;
+
+ DBUG_PRINT("exit", ("records: %s commits: %s "
+ "row_size: %s mem: %s count: %u",
+ llstr(sum_rows, buff),
+ llstr(sum_commits, buff2),
+ llstr(sum_row_size, buff3),
+ llstr(sum_mem, buff4),
+ count));
+
+ DBUG_RETURN(0);
+retry:
+ if(report_error)
+ {
+ if (file && pTrans)
+ {
+ reterr= file->ndb_err(pTrans);
+ }
+ else
+ {
+ const NdbError& tmp= error;
+ ERR_PRINT(tmp);
+ reterr= ndb_to_mysql_error(&tmp);
+ }
+ }
+ else
+ reterr= error.code;
+
+ if (pTrans)
+ {
+ ndb->closeTransaction(pTrans);
+ pTrans= NULL;
+ }
+ if (error.status == NdbError::TemporaryError && retries--)
+ {
+ my_sleep(retry_sleep);
+ continue;
+ }
+ break;
+ } while(1);
+ DBUG_PRINT("exit", ("failed, reterr: %u, NdbError %u(%s)", reterr,
+ error.code, error.message));
+ DBUG_RETURN(reterr);
+}
+
+/*
+ The query cache code still calls this NdbRecAttr-based version, as it
+ has no easy access to a pre-computed NdbRecord for the table. It could be
+ changed to use the NdbRecord version as well.
+*/
static
int
ndb_get_table_statistics(ha_ndbcluster* file, bool report_error, Ndb* ndb, const NDBTAB *ndbtab,
@@ -8424,8 +9272,12 @@ ndb_get_table_statistics(ha_ndbcluster*
error= pTrans->getNdbError();
goto retry;
}
-
- if (pOp->readTuples(NdbOperation::LM_CommittedRead))
+
+ /*
+ Set batch_size = 1 to avoid allocating unnecessary NdbRecAttr's.
+ We will in any case only read a single row from each fragment.
+ */
+ if (pOp->readTuples(NdbOperation::LM_CommittedRead, 0, 0, 1))
{
error= pOp->getNdbError();
goto retry;
@@ -8595,6 +9447,59 @@ ha_ndbcluster::null_value_index_search(K
DBUG_RETURN(FALSE);
}
+/*
+ This is used to check if an ordered index scan is needed for a range in
+ a multi range read.
+ If a scan is not needed, we use a faster primary/unique key operation
+ instead.
+*/
+static my_bool
+read_multi_needs_scan(NDB_INDEX_TYPE cur_index_type, const KEY *key_info,
+ const KEY_MULTI_RANGE *r)
+{
+ if (cur_index_type == ORDERED_INDEX)
+ return TRUE;
+ if (cur_index_type == PRIMARY_KEY_INDEX ||
+ cur_index_type == UNIQUE_INDEX)
+ return FALSE;
+ DBUG_ASSERT(cur_index_type == PRIMARY_KEY_ORDERED_INDEX ||
+ cur_index_type == UNIQUE_ORDERED_INDEX);
+ if (r->start_key.length != key_info->key_length ||
+ r->start_key.flag != HA_READ_KEY_EXACT)
+ return TRUE; // Not exact match, need scan
+ if (cur_index_type == UNIQUE_ORDERED_INDEX &&
+ check_null_in_key(key_info, r->start_key.key,r->start_key.length))
+ return TRUE; // Can't use for NULL values
+ return FALSE;
+}
+
+struct read_multi_callback_data {
+ const KEY *key_info;
+ const KEY_MULTI_RANGE *first_range;
+ const KEY_MULTI_RANGE *range;
+};
+
+/* Callback to set up scan bounds for read multi range. */
+static int
+read_multi_bounds_callback(void *arg, Uint32 i,
+ NdbIndexScanOperation::IndexBound & bound)
+{
+ struct read_multi_callback_data *data=
+ (struct read_multi_callback_data *)arg;
+
+ /* Skip any ranges not to be included in the scan. */
+ while (data->range->range_flag & (SKIP_RANGE|UNIQUE_RANGE))
+ data->range++;
+
+ compute_index_bounds(bound, data->key_info,
+ &data->range->start_key, &data->range->end_key);
+ bound.range_no= data->range - data->first_range;
+
+ data->range++;
+
+ return 0; // Success
+}
+
int
ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
KEY_MULTI_RANGE *ranges,
@@ -8602,23 +9507,25 @@ ha_ndbcluster::read_multi_range_first(KE
bool sorted,
HANDLER_BUFFER *buffer)
{
- m_write_op= FALSE;
- int res;
KEY* key_info= table->key_info + active_index;
NDB_INDEX_TYPE cur_index_type= get_index_type(active_index);
ulong reclength= table_share->reclength;
NdbOperation* op;
Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
+ struct read_multi_callback_data data;
+
DBUG_ENTER("ha_ndbcluster::read_multi_range_first");
+ DBUG_PRINT("info", ("blob fields=%d read_set=0x%x", table_share->blob_fields, table->read_set->bitmap[0]));
/**
* blobs and unique hash index with NULL can't be batched currently
*/
- if (uses_blob_value() ||
+ if (uses_blob_value(table->read_set) ||
(cur_index_type == UNIQUE_INDEX &&
has_null_in_unique_index(active_index) &&
null_value_index_search(ranges, ranges+range_count, buffer)))
{
+ DBUG_PRINT("info", ("read_multi_range not possible, falling back to default handler implementation"));
m_disable_multi_read= TRUE;
DBUG_RETURN(handler::read_multi_range_first(found_range_p,
ranges,
@@ -8648,30 +9555,35 @@ ha_ndbcluster::read_multi_range_first(KE
* range 3 range (3,5) NOTE result rows will be intermixed
* pk-op 4 pk-op 4
* range 5
- * pk-op 6 pk-ok 6
+ * pk-op 6 pk-op 6
*/
- /**
- * Variables for loop
- */
- byte *curr= (byte*)buffer->buffer;
- byte *end_of_buffer= (byte*)buffer->buffer_end;
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- bool need_pk = (lm == NdbOperation::LM_Read);
- const NDBTAB *tab= m_table;
- const NDBINDEX *unique_idx= m_index[active_index].unique_index;
- const NDBINDEX *idx= m_index[active_index].index;
+ /*
+ We first loop over all ranges, converting into primary/unique key
+ operations if possible, and counting ranges that require an
+ ordered index scan. If the supplied HANDLER_BUFFER is too small, we
+ may also need to do only part of the multi read at once.
+
+ Afterwards, we create the ordered index scan cursor (if needed).
+ */
+
+ DBUG_ASSERT(cur_index_type != UNDEFINED_INDEX);
+
const NdbOperation* lastOp= m_active_trans->getLastDefinedOperation();
- NdbIndexScanOperation* scanOp= 0;
- for (; multi_range_curr<multi_range_end && curr+reclength <= end_of_buffer;
- multi_range_curr++)
+ NdbOperation::LockMode lm=
+ (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set);
+ byte *row_buf= (byte*)buffer->buffer;
+ byte *end_of_buffer= (byte*)buffer->buffer_end;
+ uint num_scan_ranges= 0;
+ uint i;
+ for (i= 0; i < range_count; i++)
{
+ KEY_MULTI_RANGE *r= &ranges[i];
+
part_id_range part_spec;
if (m_use_partition_function)
{
- get_partition_set(table, curr, active_index,
- &multi_range_curr->start_key,
+ get_partition_set(table, table->record[0], active_index, &r->start_key,
&part_spec);
DBUG_PRINT("info", ("part_spec.start_part: %u part_spec.end_part: %u",
part_spec.start_part, part_spec.end_part));
@@ -8685,311 +9597,288 @@ ha_ndbcluster::read_multi_range_first(KE
We can skip this partition since the key won't fit into any
partition
*/
- curr += reclength;
- multi_range_curr->range_flag |= SKIP_RANGE;
+ r->range_flag|= SKIP_RANGE;
continue;
}
}
- switch (cur_index_type) {
- case PRIMARY_KEY_ORDERED_INDEX:
- if (!(multi_range_curr->start_key.length == key_info->key_length &&
- multi_range_curr->start_key.flag == HA_READ_KEY_EXACT))
- goto range;
- // else fall through
- case PRIMARY_KEY_INDEX:
- {
- multi_range_curr->range_flag |= UNIQUE_RANGE;
- if ((op= m_active_trans->getNdbOperation(tab)) &&
- !op->readTuple(lm) &&
- !set_primary_key(op, multi_range_curr->start_key.key) &&
- !define_read_attrs(curr, op) &&
- (!m_use_partition_function ||
- (op->setPartitionId(part_spec.start_part), TRUE)))
- curr += reclength;
- else
- ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
- break;
- }
- break;
- case UNIQUE_ORDERED_INDEX:
- if (!(multi_range_curr->start_key.length == key_info->key_length &&
- multi_range_curr->start_key.flag == HA_READ_KEY_EXACT &&
- !check_null_in_key(key_info, multi_range_curr->start_key.key,
- multi_range_curr->start_key.length)))
- goto range;
- // else fall through
- case UNIQUE_INDEX:
- {
- multi_range_curr->range_flag |= UNIQUE_RANGE;
- if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) &&
- !op->readTuple(lm) &&
- !set_index_key(op, key_info, multi_range_curr->start_key.key) &&
- !define_read_attrs(curr, op))
- curr += reclength;
- else
- ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
- break;
- }
- case ORDERED_INDEX: {
- range:
- multi_range_curr->range_flag &= ~(uint)UNIQUE_RANGE;
- if (scanOp == 0)
- {
- if (m_multi_cursor)
- {
- scanOp= m_multi_cursor;
- DBUG_ASSERT(scanOp->getSorted() == sorted);
- DBUG_ASSERT(scanOp->getLockMode() ==
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
- if (scanOp->reset_bounds(m_force_send))
- DBUG_RETURN(ndb_err(m_active_trans));
-
- end_of_buffer -= reclength;
- }
- else if ((scanOp= m_active_trans->getNdbIndexScanOperation(idx, tab))
- &&!scanOp->readTuples(lm, 0, parallelism, sorted,
- FALSE, TRUE, need_pk, TRUE)
- &&!(m_cond && m_cond->generate_scan_filter(scanOp))
- &&!define_read_attrs(end_of_buffer-reclength, scanOp))
- {
- m_multi_cursor= scanOp;
- m_multi_range_cursor_result_ptr= end_of_buffer-reclength;
- }
- else
- {
- ERR_RETURN(scanOp ? scanOp->getNdbError() :
- m_active_trans->getNdbError());
- }
- }
+ r->range_flag&= ~(uint)SKIP_RANGE;
- const key_range *keys[2]= { &multi_range_curr->start_key,
- &multi_range_curr->end_key };
- if ((res= set_bounds(scanOp, active_index, FALSE, keys,
- multi_range_curr-ranges)))
- DBUG_RETURN(res);
- break;
+ if (read_multi_needs_scan(cur_index_type, key_info, r))
+ {
+ /*
+ If we reach the limit of ranges allowed in a single scan: stop
+ here, send what we have so far, and continue when done with that.
+ */
+ if (i > NdbIndexScanOperation::MaxRangeNo)
+ break;
+
+ /* Include this range in the ordered index scan. */
+ r->range_flag&= ~(uint)UNIQUE_RANGE;
+ num_scan_ranges++;
}
- case UNDEFINED_INDEX:
- DBUG_ASSERT(FALSE);
- DBUG_RETURN(1);
- break;
+ else
+ {
+ /*
+ Convert to primary/unique key operation.
+
+ If there is not enough buffer for reading the row: stop here, send
+ what we have so far, and continue when done with that.
+ */
+ if (row_buf + reclength > end_of_buffer)
+ break;
+
+ r->range_flag |= UNIQUE_RANGE;
+
+ if (!(op= pk_unique_index_read_key(active_index,
+ r->start_key.key,
+ row_buf, lm)))
+ ERR_RETURN(m_active_trans->getNdbError());
+
+ if (m_use_partition_function &&
+ (cur_index_type == PRIMARY_KEY_ORDERED_INDEX ||
+ cur_index_type == PRIMARY_KEY_INDEX))
+ op->setPartitionId(part_spec.start_part);
+
+ row_buf+= reclength;
}
+
}
-
- if (multi_range_curr != multi_range_end)
+ DBUG_ASSERT(i > 0 || i == range_count); // Require progress
+ m_multi_range_defined_end= ranges + i;
+ if (num_scan_ranges > 0)
{
- /**
- * Mark that we're using entire buffer (even if might not) as
- * we haven't read all ranges for some reason
- * This as we don't want mysqld to reuse the buffer when we read
- * the remaining ranges
- */
- buffer->end_of_used_area= (byte*)buffer->buffer_end;
+ /* Do a multi-range index scan for ranges not done by primary/unique key. */
+ uchar *mask;
+
+ data.key_info= key_info;
+ data.first_range= ranges;
+ data.range= data.first_range;
+
+ Uint32 flags= NdbScanOperation::SF_ReadRangeNo;
+ if (lm == NdbOperation::LM_Read)
+ flags|= NdbScanOperation::SF_KeyInfo;
+ if (sorted)
+ flags|= NdbScanOperation::SF_OrderBy;
+
+ if (m_use_partition_function || table_share->primary_key == MAX_KEY)
+ {
+ mask= copy_column_set(table->read_set);
+ if (table_share->primary_key == MAX_KEY)
+ request_hidden_key(mask);
+ }
+ else
+ mask= (uchar *)(table->read_set->bitmap);
+
+ NdbIndexScanOperation *scanOp= m_active_trans->scanIndex
+ (m_index[active_index].ndb_record_key, read_multi_bounds_callback,
+ &data, num_scan_ranges, m_index[active_index].ndb_record_row, lm,
+ mask, flags, parallelism, 0);
+ if (!scanOp)
+ ERR_RETURN(m_active_trans->getNdbError());
+ m_active_cursor= scanOp;
+
+ if (uses_blob_value(table->read_set) &&
+ get_blob_values(scanOp, NULL, table->read_set) != 0)
+ ERR_RETURN(scanOp->getNdbError()); // error is on the scan; 'op' may be unset here
+
+ if (m_cond && m_cond->generate_scan_filter(scanOp))
+ ERR_RETURN(scanOp->getNdbError());
+
+ /* We set m_next_row=0 to say that no row was fetched from the scan yet. */
+ m_next_row= 0;
}
else
{
- buffer->end_of_used_area= curr;
+ m_active_cursor= 0;
}
-
+
+ buffer->end_of_used_area= row_buf;
+
/**
* Set first operation in multi range
*/
m_current_multi_operation=
lastOp ? lastOp->next() : m_active_trans->getFirstDefinedOperation();
- if (!(res= execute_no_commit_ie(this, m_active_trans,true)))
- {
- m_multi_range_defined= multi_range_curr;
- multi_range_curr= ranges;
- m_multi_range_result_ptr= (byte*)buffer->buffer;
- DBUG_RETURN(read_multi_range_next(found_range_p));
- }
- ERR_RETURN(m_active_trans->getNdbError());
-}
+ if (execute_no_commit_ie(this, m_active_trans, true))
+ ERR_RETURN(m_active_trans->getNdbError());
-#if 0
-#define DBUG_MULTI_RANGE(x) DBUG_PRINT("info", ("read_multi_range_next: case %d\n", x));
-#else
-#define DBUG_MULTI_RANGE(x)
-#endif
+ m_multi_range_result_ptr= (byte*)buffer->buffer;
+
+ DBUG_RETURN(read_multi_range_next(found_range_p));
+}
int
ha_ndbcluster::read_multi_range_next(KEY_MULTI_RANGE ** multi_range_found_p)
{
+ int res;
+
DBUG_ENTER("ha_ndbcluster::read_multi_range_next");
if (m_disable_multi_read)
{
- DBUG_MULTI_RANGE(11);
DBUG_RETURN(handler::read_multi_range_next(multi_range_found_p));
}
-
- int res;
- int range_no;
- ulong reclength= table_share->reclength;
- const NdbOperation* op= m_current_multi_operation;
- for (;multi_range_curr < m_multi_range_defined; multi_range_curr++)
+
+ while (multi_range_curr < m_multi_range_defined_end)
{
- DBUG_MULTI_RANGE(12);
if (multi_range_curr->range_flag & SKIP_RANGE)
- continue;
- if (multi_range_curr->range_flag & UNIQUE_RANGE)
{
- if (op->getNdbError().code == 0)
- {
- DBUG_MULTI_RANGE(13);
- goto found_next;
- }
-
- op= m_active_trans->getNextCompletedOperation(op);
- m_multi_range_result_ptr += reclength;
- continue;
- }
- else if (m_multi_cursor && !multi_range_sorted)
+ /* Nothing in this range, move to next one. */
+ multi_range_curr++;
+ }
+ else if(multi_range_curr->range_flag & UNIQUE_RANGE)
{
- DBUG_MULTI_RANGE(1);
- if ((res= fetch_next(m_multi_cursor)) == 0)
- {
- DBUG_MULTI_RANGE(2);
- range_no= m_multi_cursor->get_range_no();
- goto found;
- }
- else
+ /*
+ Move to next range; we can have at most one record from a unique range.
+ */
+ KEY_MULTI_RANGE *old_multi_range_curr= multi_range_curr;
+ multi_range_curr= old_multi_range_curr + 1;
+ const NdbOperation *op= m_current_multi_operation;
+ m_current_multi_operation= m_active_trans->getNextCompletedOperation(op);
+ byte *src_row= m_multi_range_result_ptr;
+ m_multi_range_result_ptr= src_row + table_share->reclength;
+
+ const NdbError &error= op->getNdbError();
+ if (error.code == 0)
+ {
+ *multi_range_found_p= old_multi_range_curr;
+ memcpy(table->record[0], src_row, table_share->reclength);
+ if (table_share->primary_key == MAX_KEY)
+ {
+ m_ref= get_hidden_key(src_row);
+ if (m_use_partition_function)
+ m_part_id= get_partition_fragment(src_row);
+ }
+ DBUG_RETURN(0);
+ }
+ else if (error.classification != NdbError::NoDataFound)
{
- DBUG_MULTI_RANGE(14);
- goto close_scan;
+ DBUG_RETURN(ndb_err(m_active_trans));
}
+
+ /* No row found, so fall through to try the next range. */
}
- else if (m_multi_cursor && multi_range_sorted)
+ else
{
- if (m_active_cursor && (res= fetch_next(m_multi_cursor)))
+ /* An index scan range. */
+ if ((res= read_multi_range_fetch_next()) != 0)
+ DBUG_RETURN(res);
+
+ if (!m_next_row)
{
- DBUG_MULTI_RANGE(3);
- goto close_scan;
- }
-
- range_no= m_multi_cursor->get_range_no();
- uint current_range_no= multi_range_curr - m_multi_ranges;
- if ((uint) range_no == current_range_no)
- {
- DBUG_MULTI_RANGE(4);
- // return current row
- goto found;
- }
- else if (range_no > (int)current_range_no)
- {
- DBUG_MULTI_RANGE(5);
- // wait with current row
- m_active_cursor= 0;
- continue;
+ /*
+ The whole scan is done, and the cursor has been closed.
+ So nothing more for this range. Move to next.
+ */
+ multi_range_curr++;
}
- else
+ else
{
- DBUG_MULTI_RANGE(6);
- // First fetch from cursor
- DBUG_ASSERT(range_no == -1);
- if ((res= m_multi_cursor->nextResult(TRUE)))
+ int current_range_no= m_current_range_no;
+ int expected_range_no;
+ /*
+ For a sorted index scan, we will receive rows in increasing range_no
+ order, so we can return ranges in order, pausing when range_no
+ indicates that the currently processed range (multi_range_curr) is
+ done.
+
+ But for an unsorted scan, we may receive a high range_no from one
+ fragment followed by a low range_no from another fragment. So we
+ need to process all index scan ranges together.
+ */
+ if (!multi_range_sorted ||
+ (expected_range_no= multi_range_curr - m_multi_ranges)
+ == current_range_no)
{
- DBUG_MULTI_RANGE(15);
- goto close_scan;
+ *multi_range_found_p= m_multi_ranges + current_range_no;
+ /* Copy out data from the new row. */
+ if (table_share->primary_key == MAX_KEY)
+ {
+ m_ref= get_hidden_key(m_next_row);
+ if (m_use_partition_function)
+ m_part_id= get_partition_fragment(m_next_row);
+ }
+ unpack_record_ndbrecord(table->record[0], m_next_row);
+ /*
+ Mark that we have used this row, so we need to fetch a new
+ one on the next call.
+ */
+ m_next_row= 0;
+ DBUG_RETURN(0);
+ }
+ else if (current_range_no > expected_range_no)
+ {
+ /* Nothing more in scan for this range. Move to next. */
+ multi_range_curr++;
+ }
+ else
+ {
+ /*
+ Should not happen. Ranges should be returned from NDB API in
+ the order we requested them.
+ */
+ DBUG_ASSERT(0);
+ multi_range_curr++; // Attempt to carry on
}
- multi_range_curr--; // Will be increased in for-loop
- continue;
}
}
- else /** m_multi_cursor == 0 */
- {
- DBUG_MULTI_RANGE(7);
- /**
- * Corresponds to range 5 in example in read_multi_range_first
- */
- (void)1;
- continue;
- }
-
- DBUG_ASSERT(FALSE); // Should only get here via goto's
-close_scan:
- if (res == 1)
- {
- m_multi_cursor->close(FALSE, TRUE);
- m_active_cursor= m_multi_cursor= 0;
- DBUG_MULTI_RANGE(8);
- continue;
- }
- else
- {
- DBUG_MULTI_RANGE(9);
- DBUG_RETURN(ndb_err(m_active_trans));
- }
}
-
+
if (multi_range_curr == multi_range_end)
{
- DBUG_MULTI_RANGE(16);
Thd_ndb *thd_ndb= get_thd_ndb(current_thd);
thd_ndb->query_state&= NDB_QUERY_NORMAL;
DBUG_RETURN(HA_ERR_END_OF_FILE);
}
-
- /**
- * Read remaining ranges
- */
+
+ /*
+ Read remaining ranges
+ */
DBUG_RETURN(read_multi_range_first(multi_range_found_p,
multi_range_curr,
multi_range_end - multi_range_curr,
multi_range_sorted,
multi_range_buffer));
-
-found:
- /**
- * Found a record belonging to a scan
- */
- m_active_cursor= m_multi_cursor;
- * multi_range_found_p= m_multi_ranges + range_no;
- memcpy(table->record[0], m_multi_range_cursor_result_ptr, reclength);
- setup_recattr(m_active_cursor->getFirstRecAttr());
- unpack_record(table->record[0]);
- table->status= 0;
- DBUG_RETURN(0);
-
-found_next:
- /**
- * Found a record belonging to a pk/index op,
- * copy result and move to next to prepare for next call
- */
- * multi_range_found_p= multi_range_curr;
- memcpy(table->record[0], m_multi_range_result_ptr, reclength);
- setup_recattr(op->getFirstRecAttr());
- unpack_record(table->record[0]);
- table->status= 0;
-
- multi_range_curr++;
- m_current_multi_operation= m_active_trans->getNextCompletedOperation(op);
- m_multi_range_result_ptr += reclength;
- DBUG_RETURN(0);
}
+/*
+ Fetch next row from the ordered index cursor in multi range scan.
+
+ We keep the next row in m_next_row, and the range_no of the
+ next row in m_current_range_no. This is used in sorted index scan
+ to correctly interleave rows from primary/unique key operations with
+ rows from the scan.
+*/
int
-ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
+ha_ndbcluster::read_multi_range_fetch_next()
{
- DBUG_ENTER("setup_recattr");
+ int res;
+ NdbIndexScanOperation *cursor= (NdbIndexScanOperation *)m_active_cursor;
- Field **field, **end;
- NdbValue *value= m_value;
-
- end= table->field + table_share->fields;
-
- for (field= table->field; field < end; field++, value++)
+ if (!cursor)
+ return 0; // Scan already done.
+
+ if (!m_next_row)
{
- if ((* value).ptr)
+ res= fetch_next(cursor);
+ if (res == 0)
+ {
+ m_current_range_no= cursor->get_range_no();
+ }
+ else if (res == 1)
+ {
+ /* We have fetched the last row from the scan. */
+ cursor->close(FALSE, TRUE);
+ m_active_cursor= 0;
+ m_next_row= 0;
+ return 0;
+ }
+ else
{
- DBUG_ASSERT(curr != 0);
- NdbValue* val= m_value + curr->getColumn()->getColumnNo();
- DBUG_ASSERT(val->ptr);
- val->rec= curr;
- curr= curr->next();
+ /* An error. */
+ return res;
}
}
-
- DBUG_RETURN(0);
+ return 0;
}
char*
--- 1.178/sql/ha_ndbcluster.h 2007-05-09 13:19:05 +02:00
+++ 1.179/sql/ha_ndbcluster.h 2007-05-09 13:19:05 +02:00
@@ -79,6 +79,16 @@ typedef struct ndb_index_data {
// Simple counter mechanism to decide when to connect to db
uint index_stat_update_freq;
uint index_stat_query_count;
+ /*
+ In mysqld, keys and rows are stored differently (using KEY_PART_INFO for
+ keys and Field for rows).
+ So each index needs separate NdbRecords: one for passing values
+ from a key and one for passing values from a row.
+ */
+ NdbRecord *ndb_record_key;
+ NdbRecord *ndb_record_row;
+ NdbRecord *ndb_unique_record_key;
+ NdbRecord *ndb_unique_record_row;
} NDB_INDEX_DATA;
typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
@@ -222,6 +232,7 @@ class ha_ndbcluster: public handler
~ha_ndbcluster();
int ha_initialise();
+ void column_bitmaps_signal();
int open(const char *name, int mode, uint test_if_locked);
int close(void);
@@ -402,6 +413,10 @@ private:
int drop_indexes(Ndb *ndb, TABLE *tab);
int add_index_handle(THD *thd, NdbDictionary::Dictionary *dict,
KEY *key_info, const char *index_name, uint index_no);
+ int add_table_ndb_record(NdbDictionary::Dictionary *dict);
+ int add_hidden_pk_ndb_record(NdbDictionary::Dictionary *dict);
+ int add_index_ndb_record(NdbDictionary::Dictionary *dict,
+ KEY *key_info, uint index_no);
int get_metadata(const char* path);
void release_metadata(THD *thd, Ndb *ndb);
NDB_INDEX_TYPE get_index_type(uint idx_no) const;
@@ -431,49 +446,79 @@ private:
uint key_len,
byte *buf);
int full_table_scan(byte * buf);
+ int flush_bulk_insert();
+ int ndb_write_row(byte *record, bool primary_key_update, bool batched_update);
+ int ndb_delete_row(const byte *record, bool primary_key_update);
bool check_all_operations_for_error(NdbTransaction *trans,
const NdbOperation *first,
const NdbOperation *last,
uint errcode);
int peek_indexed_rows(const byte *record, bool check_pk);
+ int scan_handle_lock_tuple(NdbScanOperation *scanOp, NdbTransaction *trans);
int fetch_next(NdbScanOperation* op);
int next_result(byte *buf);
- int define_read_attrs(byte* buf, NdbOperation* op);
- int filtered_scan(const byte *key, uint key_len,
- byte *buf,
- enum ha_rkey_function find_flag);
int close_scan();
void unpack_record(byte *buf);
- int get_ndb_lock_type(enum thr_lock_type type);
+ void unpack_record_ndbrecord(byte *dst_row, const byte *src_row);
+ int get_ndb_lock_type(enum thr_lock_type type,
+ const MY_BITMAP *column_bitmap);
void set_dbname(const char *pathname);
void set_tabname(const char *pathname);
- bool set_hidden_key(NdbOperation*,
- uint fieldnr, const byte* field_ptr);
- int set_ndb_key(NdbOperation*, Field *field,
- uint fieldnr, const byte* field_ptr);
- int set_ndb_value(NdbOperation*, Field *field, uint fieldnr,
- int row_offset= 0, bool *set_blob_value= 0);
- int get_ndb_value(NdbOperation*, Field *field, uint fieldnr, byte*);
- int get_ndb_partition_id(NdbOperation *);
+ uint offset_hidden_key() { return table->s->reclength; }
+ uint offset_user_partition_function() {
+ return table->s->reclength +
+ (table_share->primary_key == MAX_KEY ?
+ NDB_HIDDEN_PRIMARY_KEY_LENGTH : 0);
+ }
+ uint offset_user_partition_fragment() {
+ return table->s->reclength +
+ (table_share->primary_key == MAX_KEY ?
+ NDB_HIDDEN_PRIMARY_KEY_LENGTH+4 : 4);
+ }
+ uint field_number_hidden_key() { return table->s->fields; }
+ uint field_number_user_partition_function() {
+ return table->s->fields +
+ (table_share->primary_key == MAX_KEY ? 1 : 0);
+ }
+
+ void set_hidden_key(char *row, Uint64 auto_value);
+ Uint64 get_hidden_key(const char *row);
+ void request_hidden_key(uchar *mask);
+ void set_partition_function_value(char *row, uint32 func_value);
+ uint32 get_partition_fragment(const char *row);
+ void request_partition_function_value(uchar *mask);
+ void calc_batch_buffer_size();
+ int alloc_row_buffer();
+ char *copy_row_to_buffer(const byte *record);
+ char *get_row_buffer();
+ int batch_copy_row_to_buffer(const byte *record,
+ char * &row, bool & buffer_full);
+ void clear_extended_column_set(uchar *mask);
+ uchar *copy_column_set(MY_BITMAP *bitmap);
+
+ int get_blob_values(NdbOperation *ndb_op, byte *dst_record,
+ const MY_BITMAP *bitmap);
+ int set_blob_values(NdbOperation *ndb_op, my_ptrdiff_t row_offset,
+ const MY_BITMAP *bitmap, uint *set_count);
friend int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
- int set_primary_key(NdbOperation *op, const byte *key);
- int set_primary_key_from_record(NdbOperation *op, const byte *record);
- int set_index_key_from_record(NdbOperation *op, const byte *record,
- uint keyno);
+
+ NdbOperation *pk_unique_index_read_key(uint idx, const byte *key, byte *buf,
+ NdbOperation::LockMode lm);
+ int read_multi_range_fetch_next();
+
int set_bounds(NdbIndexScanOperation*, uint inx, bool rir,
const key_range *keys[2], uint= 0);
- int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
- int set_index_key(NdbOperation *, const KEY *key_info, const byte *key_ptr);
+ int primary_key_cmp(const byte * old_row, const byte * new_row);
void print_results();
virtual void get_auto_increment(ulonglong offset, ulonglong increment,
ulonglong nb_desired_values,
ulonglong *first_value,
ulonglong *nb_reserved_values);
- bool uses_blob_value();
+ bool uses_blob_value(const MY_BITMAP *bitmap);
char *update_table_comment(const char * comment);
@@ -489,6 +534,7 @@ private:
void release_completed_operations(NdbTransaction*, bool);
+ void reset_state_at_execute();
friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);
@@ -497,6 +543,31 @@ private:
NdbTransaction *m_active_trans;
NdbScanOperation *m_active_cursor;
const NdbDictionary::Table *m_table;
+ /*
+ Normal NdbRecord for accessing rows, with all fields including hidden
+ fields (hidden primary key, user-defined partitioning function value).
+ */
+ NdbRecord *m_ndb_record;
+ /* As m_ndb_record, but adding the FRAGMENT pseudo-column at end of row. */
+ NdbRecord *m_ndb_record_fragment;
+ /* NdbRecord for accessing tuple by hidden Uint64 primary key. */
+ NdbRecord *m_ndb_hidden_key_record;
+
+ /*
+ Special NdbRecord for ndb_get_table_statistics(), reading lots of
+ pseudo-columns.
+ */
+ NdbRecord *m_ndb_statistics_record;
+ /* Bitmap used for NdbRecord operation column mask. */
+ MY_BITMAP m_bitmap;
+ my_bitmap_map m_bitmap_buf[(NDB_MAX_ATTRIBUTES_IN_TABLE +
+ 8*sizeof(my_bitmap_map) - 1) /
+ (8*sizeof(my_bitmap_map))]; // Buffer for m_bitmap
+ /* Bitmap with bit set for all primary key columns. */
+ MY_BITMAP m_pk_bitmap;
+ my_bitmap_map m_pk_bitmap_buf[(NDB_MAX_ATTRIBUTES_IN_TABLE +
+ 8*sizeof(my_bitmap_map) - 1) /
+ (8*sizeof(my_bitmap_map))]; // Buffer for m_pk_bitmap
struct Ndb_local_table_statistics *m_table_info;
char m_dbname[FN_HEADLEN];
//char m_schemaname[FN_HEADLEN];
@@ -507,24 +578,41 @@ private:
NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY];
THD_NDB_SHARE *m_thd_ndb_share;
+ /*
+ Pointer to row returned from scan nextResult().
+ */
+ const char *m_next_row;
+ /* For read_multi_range scans, the get_range_no() of current row. */
+ int m_current_range_no;
+ /*
+ A buffer of rows for when we cannot pass the mysqld record pointer directly
+ to the NDB API, either because the mysqld buffer is too small (e.g. hidden
+ primary key), or because it will not remain valid until execute() (e.g.
+ bulk insert).
+ */
+ char *m_row_buffer;
+ uint m_row_buffer_size;
+ char *m_row_buffer_current;
+ /*
+ Size of buffer to fill before sending batched rows to NDB kernel, computed
+ from a heuristic based on row size.
+ */
+ uint m_batch_buffer_size;
+ /* Extra bytes needed in row for hidden fields. */
+ uint m_extra_reclength;
// NdbRecAttr has no reference to blob
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
- byte m_ref[NDB_HIDDEN_PRIMARY_KEY_LENGTH];
+ Uint64 m_ref;
partition_info *m_part_info;
uint32 m_part_id;
- byte *m_rec0;
- Field **m_part_field_array;
bool m_use_partition_function;
bool m_sorted;
bool m_use_write;
bool m_ignore_dup_key;
bool m_has_unique_index;
- bool m_primary_key_update;
- bool m_write_op;
bool m_ignore_no_key;
ha_rows m_rows_to_insert; // TODO: merge it with handler::estimation_rows_to_insert?
ha_rows m_rows_inserted;
- ha_rows m_bulk_insert_rows;
ha_rows m_rows_changed;
bool m_bulk_insert_not_flushed;
bool m_delete_cannot_batch;
@@ -533,7 +621,13 @@ private:
bool m_skip_auto_increment;
bool m_blobs_pending;
bool m_slow_path;
- my_ptrdiff_t m_blobs_offset;
+
+ /* State for setActiveHook() callback for reading blob data. */
+ uint m_blob_counter;
+ uint m_blob_expected_count;
+ byte *m_blob_destination_record;
+ Uint64 m_blob_total_size;
+
// memory for blobs in one tuple
char *m_blobs_buffer;
uint32 m_blobs_buffer_size;
@@ -548,11 +642,13 @@ private:
bool m_disable_multi_read;
byte *m_multi_range_result_ptr;
KEY_MULTI_RANGE *m_multi_ranges;
- KEY_MULTI_RANGE *m_multi_range_defined;
+ /*
+ Points one past the last multi range operation currently being
+ executed, to support splitting large multi range reads into manageable
+ pieces.
+ */
+ KEY_MULTI_RANGE *m_multi_range_defined_end;
const NdbOperation *m_current_multi_operation;
- NdbIndexScanOperation *m_multi_cursor;
- byte *m_multi_range_cursor_result_ptr;
- int setup_recattr(const NdbRecAttr*);
Ndb *get_ndb();
};
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2525) | knielsen | 9 May |