From: Sergey Petrunia Date: February 2 2008 3:41pm Subject: bk commit into 6.0 tree (sergefp:1.2799) WL#4086 List-Archive: http://lists.mysql.com/commits/41597 Message-Id: <20080202154146.2DEB2FBC15@pylon.mylan> Below is the list of changes that have just been committed into a local 6.0 repository of sergefp. When sergefp does a push these changes will be propagated to the main repository and, within 24 hours after the push, to the public repository. For information on how to access the public repository see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html ChangeSet@stripped, 2008-02-02 18:41:37+03:00, sergefp@stripped +8 -0 Kristian's patch: WL#4086: NdbRecord interface modifications Remove extra copy of rows between NdbOperation creation and execute(), no longer needed with new NdbRecord interface. Add some new copying for two special cases in batched slave operation: conflict resolution and blob updates. Some small fixes. sql/ha_ndbcluster.cc@stripped, 2008-02-02 18:41:30+03:00, sergefp@stripped +264 -557 Remove extra copy of rows between NdbOperation creation and execute(). Fix bug in read_multi_range with wrong range_no. Remove bad setting of partitionId in updateCurrentTuple() and deleteCurrentTuple(). Add new row copy needed for two special cases: update with conflict resolution in slave thread, and batched slave updates with blobs. sql/ha_ndbcluster.h@stripped, 2008-02-02 18:41:30+03:00, sergefp@stripped +16 -42 Remove extra copy of rows between NdbOperation creation and execute(). Add new row copy needed for two special cases: update with conflict resolution in slave thread, and batched slave updates with blobs. storage/ndb/include/ndbapi/NdbTransaction.hpp@stripped, 2008-02-02 18:41:30+03:00, sergefp@stripped +2 -1 Add the ability to specify key and row with different NdbRecord for insertTuple(). storage/ndb/src/ndbapi/NdbOperationDefine.cpp@stripped, 2008-02-02 18:41:30+03:00, sergefp@stripped +4 -4 Add the ability to use extra setValue to set primary key columns in insertTuple(). storage/ndb/src/ndbapi/NdbOperationExec.cpp@stripped, 2008-02-02 18:41:30+03:00, sergefp@stripped +6 -4 Fix the simple and dirty flags for key operations; they were copied into the signal train too early, before being correctly computed. storage/ndb/src/ndbapi/NdbScanFilter.cpp@stripped, 2008-02-02 18:41:31+03:00, sergefp@stripped +2 -0 Fix scanfilter to work (at least for ha_ndbcluster.cc usage). storage/ndb/src/ndbapi/NdbScanOperation.cpp@stripped, 2008-02-02 18:41:31+03:00, sergefp@stripped +12 -10 Fix scanfilter to work (at least for ha_ndbcluster.cc usage). storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2008-02-02 18:41:31+03:00, sergefp@stripped +11 -7 Add the ability to specify key and row with different NdbRecord for insertTuple(). diff -Nrup a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc --- a/sql/ha_ndbcluster.cc 2008-02-02 18:28:00 +03:00 +++ b/sql/ha_ndbcluster.cc 2008-02-02 18:41:30 +03:00 @@ -629,31 +629,6 @@ bool ha_ndbcluster::get_error_message(in } -void -ha_ndbcluster::set_hidden_key(uchar *row, Uint64 auto_value) -{ - /* The hidden primary key is stored just after the normal row data. */ - uint32 offset= offset_hidden_key(); - DBUG_ASSERT(offset + NDB_HIDDEN_PRIMARY_KEY_LENGTH <= table->s->reclength + m_extra_reclength); - memcpy(&row[offset], &auto_value, NDB_HIDDEN_PRIMARY_KEY_LENGTH); -} - -Uint64 -ha_ndbcluster::get_hidden_key(const uchar *row) -{ - Uint64 hidden_key; - uint32 offset= offset_hidden_key(); - memcpy(&hidden_key, &row[offset], NDB_HIDDEN_PRIMARY_KEY_LENGTH); - return hidden_key; -} - -void -ha_ndbcluster::request_hidden_key(uchar *mask) -{ - uint32 field_no= field_number_hidden_key(); - mask[field_no>>3]|= (1 << (field_no & 7)); -} - /* Check if MySQL field type forces var part in ndb storage */ @@ -674,139 +649,58 @@ static bool field_type_forces_var_part(e } } -void -ha_ndbcluster::set_partition_function_value(uchar *row, uint32 func_value) -{ - /* The partition function value is stored just after the hidden primary - key (if any). */ - uint32 offset= offset_user_partition_function(); - DBUG_ASSERT(offset + 4 <= table->s->reclength + m_extra_reclength); - memcpy(&row[offset], &func_value, 4); -} - -uint32 -ha_ndbcluster::get_partition_fragment(const uchar *row) -{ - uint32 fragment; - uint32 offset= offset_user_partition_fragment(); - memcpy(&fragment, &row[offset], 4); - return fragment; -} - -void -ha_ndbcluster::request_partition_function_value(uchar *mask) -{ - uint32 field_no= field_number_user_partition_function(); - mask[field_no>>3]|= (1 << (field_no & 7)); -} - -static inline uchar * -alloc_batch_row(Thd_ndb *thd_ndb, uint size) +/* + * This is used for every additional row operation, to update the guesstimate + * of pending bytes to send, and to check if it is now time to flush a batch. + */ +bool +ha_ndbcluster::add_row_check_if_batch_full_size(Thd_ndb *thd_ndb, uint size) { - /* - We only reset the batch mem_root on first allocate after execute(), not - immediately at execute() time. - This is so that we have the chance after execute() to copy out data from - any read buffers. - */ if (thd_ndb->m_unsent_bytes == 0) free_root(&(thd_ndb->m_batch_mem_root), MY_MARK_BLOCKS_FREE); - return (uchar*)alloc_root(&(thd_ndb->m_batch_mem_root), size); -} -/* - Copy a record into a newly allocated buffer. - The returned record is valid until next execute() (actually until next - allocation after next execute()). - The input parameter op_batch_size is an estimate of the signal bytes - needed for the operation; this is used to set the output parameter - batch_full to true when it is time to flush the batch with execute(). -*/ -uchar * -ha_ndbcluster::batch_copy_row_to_buffer(Thd_ndb *thd_ndb, const uchar *record, - bool & batch_full) -{ - uchar *row= copy_row_to_buffer(thd_ndb, record); - if (unlikely(!row)) - return NULL; uint unsent= thd_ndb->m_unsent_bytes; - unsent+= m_bytes_per_write; - batch_full= unsent >= BATCH_FLUSH_SIZE; + unsent+= size; thd_ndb->m_unsent_bytes= unsent; - return row; + return unsent >= BATCH_FLUSH_SIZE; } +/* + Return a generic buffer that will remain valid until after next execute. + + The memory is freed by the first call to add_row_check_if_batch_full_size() + following any execute() call. The intention is that the memory is associated + with one batch of operations during batched slave updates. + + Note in particular that using get_buffer() / copy_row_to_buffer() separately + from add_row_check_if_batch_full_size() could make meory usage grow without + limit, and that this sequence: + + execute() + get_buffer() / copy_row_to_buffer() + add_row_check_if_batch_full_size() + ... + execute() + + will free the memory already at add_row_check_if_batch_full_size() time, it + will not remain valid until the second execute(). +*/ uchar * -ha_ndbcluster::batch_copy_key_to_buffer(Thd_ndb *thd_ndb, const uchar *key, - uint key_len, - uint op_batch_size, bool & batch_full) +ha_ndbcluster::get_buffer(Thd_ndb *thd_ndb, uint size) { - uchar *row= alloc_batch_row(thd_ndb, key_len); - if (unlikely(!row)) - return NULL; - memcpy(row, key, key_len); - uint unsent= thd_ndb->m_unsent_bytes; - unsent+= op_batch_size; - DBUG_ASSERT(op_batch_size > 0); - batch_full= unsent >= BATCH_FLUSH_SIZE; - thd_ndb->m_unsent_bytes= unsent; - return row; + return (uchar*)alloc_root(&(thd_ndb->m_batch_mem_root), size); } -/* - Simpler row buffer copy, for when we know we will not batch. - Only valid until next buffer allocation. -*/ uchar * ha_ndbcluster::copy_row_to_buffer(Thd_ndb *thd_ndb, const uchar *record) { - uchar *row; - uint size= table->s->reclength + m_extra_reclength; - row= alloc_batch_row(thd_ndb, size); + uchar *row= get_buffer(thd_ndb, table->s->reclength); if (unlikely(!row)) return NULL; memcpy(row, record, table->s->reclength); return row; } -/* Return a row buffer, valid until next execute(). */ -uchar * -ha_ndbcluster::get_row_buffer() -{ - Thd_ndb *thd_ndb= get_thd_ndb(table->in_use); - return (uchar*)alloc_root(&(thd_ndb->m_batch_mem_root), - table->s->reclength + m_extra_reclength); -} - -/* - When using extra hidden columns, the mysqld column bitmaps do not - include bits for the extra columns, so we use this method to initialize - them (after copying the mysqld bitmap to a larger one). -*/ -void -ha_ndbcluster::clear_extended_column_set(uchar *mask) -{ - if (table_share->primary_key == MAX_KEY) - { - uint32 field_no= field_number_hidden_key(); - mask[field_no>>3]&= ~(1 << (field_no & 7)); - } - if (m_user_defined_partitioning) - { - uint32 field_no= field_number_user_partition_function(); - mask[field_no>>3]&= ~(1 << (field_no & 7)); - } -} - -uchar * -ha_ndbcluster::copy_column_set(MY_BITMAP *bitmap) -{ - bitmap_copy(&m_bitmap, bitmap); - uchar *mask= (uchar *)m_bitmap_buf; - clear_extended_column_set(mask); - return mask; -} - int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg) { ha_ndbcluster *ha= (ha_ndbcluster *)arg; @@ -961,8 +855,9 @@ ha_ndbcluster::get_blob_values(const Ndb } int -ha_ndbcluster::set_blob_values(const NdbOperation *ndb_op, my_ptrdiff_t row_offset, - const MY_BITMAP *bitmap, uint *set_count) +ha_ndbcluster::set_blob_values(const NdbOperation *ndb_op, + my_ptrdiff_t row_offset, const MY_BITMAP *bitmap, + uint *set_count, bool batch) { uint field_no; uint *blob_index, *blob_index_end; @@ -986,11 +881,11 @@ ha_ndbcluster::set_blob_values(const Ndb NdbBlob *ndb_blob= ndb_op->getBlobHandle(field_no); if (ndb_blob == NULL) - DBUG_RETURN(1); + ERR_RETURN(ndb_op->getNdbError()); if (field->is_null_in_record_with_offset(row_offset)) { if (ndb_blob->setNull() != 0) - DBUG_RETURN(1); + ERR_RETURN(ndb_op->getNdbError()); } else { @@ -1012,10 +907,21 @@ ha_ndbcluster::set_blob_values(const Ndb (long) blob_ptr, blob_len)); DBUG_DUMP("value", blob_ptr, min(blob_len, 26)); - // No callback needed to write value + /* + NdbBlob requires the data pointer to remain valid until execute() time. + So when batching, we need to copy the value to a temporary buffer. + */ + if (batch && blob_len > 0) + { + uchar *tmp_buf= get_buffer(m_thd_ndb, blob_len); + if (!tmp_buf) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + memcpy(tmp_buf, blob_ptr, blob_len); + blob_ptr= tmp_buf; + } res= ndb_blob->setValue((char*)blob_ptr, blob_len); if (res != 0) - DBUG_RETURN(1); + ERR_RETURN(ndb_op->getNdbError()); } ++(*set_count); @@ -1511,58 +1417,12 @@ ha_ndbcluster::add_table_ndb_record(NDBD ndb_set_record_specification(i, &spec[i], table, m_table); } - uint32 size= 0; - if (table_share->primary_key == MAX_KEY) - { - /* Access to the hidden primary key. */ - spec[i].column= m_table->getColumn(i); - spec[i].offset= offset_hidden_key(); - spec[i].nullbit_byte_offset= 0; - spec[i].nullbit_bit_in_byte= 0; - size+= NDB_HIDDEN_PRIMARY_KEY_LENGTH; - i++; - } - if (m_user_defined_partitioning) - { - /* Access to the hidden partition function column. */ - spec[i].column= m_table->getColumn(i); - spec[i].offset= offset_user_partition_function(); - spec[i].nullbit_byte_offset= 0; - spec[i].nullbit_bit_in_byte= 0; - size+= 4; - i++; - } - rec= dict->createRecord(m_table, spec, i, sizeof(spec[0]), NdbDictionary::RecMysqldBitfield); if (! rec) ERR_RETURN(dict->getNdbError()); m_ndb_record= rec; - /* - We need a different NdbRecord for reading the FRAGMENT pseudo-column, - as pseudo-columns cannot be enabled/disabled with bitmask. - */ - if (m_user_defined_partitioning && table_share->primary_key == MAX_KEY) - { - spec[i].column= NdbDictionary::Column::FRAGMENT; - spec[i].offset= offset_user_partition_fragment(); - spec[i].nullbit_byte_offset= 0; - spec[i].nullbit_bit_in_byte= 0; - size+= 4; - i++; - - rec= dict->createRecord(m_table, spec, i, sizeof(spec[0]), - NdbDictionary::RecMysqldBitfield); - if (! rec) - ERR_RETURN(dict->getNdbError()); - m_ndb_record_fragment= rec; - } - else - m_ndb_record_fragment= NULL; - - m_extra_reclength= size; - rec= ndb_get_table_statistics_ndbrecord(dict, m_table); if (! rec) ERR_RETURN(dict->getNdbError()); @@ -1719,25 +1579,6 @@ ha_ndbcluster::add_index_ndb_record(NDBD ndb_set_record_specification(i, &spec[i], table, m_table); } - if (table_share->primary_key == MAX_KEY) - { - /* Access to the hidden primary key. */ - spec[i].column= m_table->getColumn(i); - spec[i].offset= offset_hidden_key(); - spec[i].nullbit_byte_offset= 0; - spec[i].nullbit_bit_in_byte= 0; - i++; - - if (m_user_defined_partitioning) - { - spec[i].column= NdbDictionary::Column::FRAGMENT; - spec[i].offset= offset_user_partition_fragment(); - spec[i].nullbit_byte_offset= 0; - spec[i].nullbit_bit_in_byte= 0; - i++; - } - } - rec= dict->createRecord(m_index[index_no].index, m_table, spec, i, sizeof(spec[0]), NdbDictionary::RecMysqldBitfield); @@ -1951,11 +1792,6 @@ void ha_ndbcluster::release_metadata(THD dict->releaseRecord(m_ndb_record); m_ndb_record= NULL; } - if (m_ndb_record_fragment != NULL) - { - dict->releaseRecord(m_ndb_record_fragment); - m_ndb_record_fragment= NULL; - } if (m_ndb_hidden_key_record != NULL) { dict->releaseRecord(m_ndb_hidden_key_record); @@ -2122,31 +1958,20 @@ int ha_ndbcluster::pk_read(const uchar * { NdbConnection *trans= m_thd_ndb->trans; const NdbOperation *op; - uchar *row; int res; DBUG_ENTER("pk_read"); DBUG_PRINT("enter", ("key_len: %u read_set=%x", key_len, table->read_set->bitmap[0])); DBUG_DUMP("key", key, key_len); - if (table_share->primary_key == MAX_KEY) - { - row= get_row_buffer(); - if (!row) - DBUG_RETURN(ER_OUTOFMEMORY); - } - else - row= buf; - NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set); - if (!(op= pk_unique_index_read_key(table->s->primary_key, key, row, lm, - (m_user_defined_partitioning? - &part_id: + if (!(op= pk_unique_index_read_key(table->s->primary_key, key, buf, lm, + (m_user_defined_partitioning ? + &part_id : NULL)))) ERR_RETURN(trans->getNdbError()); - if ((res = execute_no_commit_ie(this,trans,FALSE)) != 0 || op->getNdbError().code) @@ -2155,14 +1980,6 @@ int ha_ndbcluster::pk_read(const uchar * DBUG_RETURN(ndb_err(trans)); } - if (table_share->primary_key == MAX_KEY) - { - memcpy(buf, row, table_share->reclength); - m_ref= get_hidden_key(row); - if (m_user_defined_partitioning) - m_part_id= get_partition_fragment(row); - } - table->status= 0; DBUG_RETURN(0); } @@ -2176,7 +1993,6 @@ int ha_ndbcluster::ndb_pk_update_row(THD uint32 old_part_id) { NdbTransaction *trans= m_thd_ndb->trans; - int read_needed= !bitmap_is_set_all(table->read_set); int error; const NdbOperation *op; DBUG_ENTER("ndb_pk_update_row"); @@ -2211,45 +2027,43 @@ int ha_ndbcluster::ndb_pk_update_row(THD key_row= (const uchar*)(&m_ref); } - if (!read_needed) + if (!bitmap_is_set_all(table->read_set)) { - // We have allready retrieved all fields - goto delete_tuple; - } + /* + Need to read rest of columns for later re-insert. + + Use mask only with columns that are not in write_set, not in + read_set, and not part of the primary key. + */ - /* - Use mask only with columns that are not in write_set, not in - read_set, and not part of the primary key. - */ - { bitmap_copy(&m_bitmap, table->read_set); bitmap_union(&m_bitmap, table->write_set); bitmap_invert(&m_bitmap); NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, &m_bitmap); if (!(op= trans->readTuple(key_rec, (const char *)key_row, - m_ndb_record, (char *)new_data, - lm, (const unsigned char *)(m_bitmap.bitmap), - poptions, sizeof(NdbOperation::OperationOptions)))) + m_ndb_record, (char *)new_data, lm, + (const unsigned char *)(m_bitmap.bitmap), + poptions, + sizeof(NdbOperation::OperationOptions)))) ERR_RETURN(trans->getNdbError()); - } - if (table_share->blob_fields > 0) - { - my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set); - error= get_blob_values(op, new_data, &m_bitmap); - dbug_tmp_restore_column_map(table->read_set, old_map); - if (error != 0) - ERR_RETURN(op->getNdbError()); - } + if (table_share->blob_fields > 0) + { + my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set); + error= get_blob_values(op, new_data, &m_bitmap); + dbug_tmp_restore_column_map(table->read_set, old_map); + if (error != 0) + ERR_RETURN(op->getNdbError()); + } - if (execute_no_commit(this, trans, FALSE) != 0) - { - table->status= STATUS_NOT_FOUND; - DBUG_RETURN(ndb_err(trans)); + if (execute_no_commit(this, trans, FALSE) != 0) + { + table->status= STATUS_NOT_FOUND; + DBUG_RETURN(ndb_err(trans)); + } } -delete_tuple: // Delete old row error= ndb_delete_row(old_data, TRUE); if (error) @@ -2507,23 +2321,13 @@ int ha_ndbcluster::unique_index_read(con { NdbTransaction *trans= m_thd_ndb->trans; const NdbOperation *op; - uchar *row; DBUG_ENTER("ha_ndbcluster::unique_index_read"); DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index)); DBUG_DUMP("key", key, key_len); - - if (table_share->primary_key == MAX_KEY) - { - row= get_row_buffer(); - if (!row) - DBUG_RETURN(ER_OUTOFMEMORY); - } - else - row= buf; NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set); - if (!(op= pk_unique_index_read_key(active_index, key, row, lm, NULL))) + if (!(op= pk_unique_index_read_key(active_index, key, buf, lm, NULL))) ERR_RETURN(trans->getNdbError()); if (execute_no_commit_ie(this,trans,FALSE) != 0 || @@ -2538,14 +2342,6 @@ int ha_ndbcluster::unique_index_read(con DBUG_RETURN(err); } - if (table_share->primary_key == MAX_KEY) - { - memcpy(buf, row, table_share->reclength); - m_ref= get_hidden_key(row); - if (m_user_defined_partitioning) - m_part_id= get_partition_fragment(row); - } - table->status= 0; DBUG_RETURN(0); } @@ -2673,13 +2469,6 @@ inline int ha_ndbcluster::next_result(uc if ((res= fetch_next(m_active_cursor)) == 0) { DBUG_PRINT("info", ("One more record found")); - - if (table_share->primary_key == MAX_KEY) - { - m_ref= get_hidden_key(m_next_row); - if (m_user_defined_partitioning) - m_part_id= get_partition_fragment(m_next_row); - } unpack_record(buf, m_next_row); table->status= 0; @@ -2709,12 +2498,11 @@ ha_ndbcluster::pk_unique_index_read_key( Uint32 *ppartition_id) { const NdbOperation *op; - const NdbRecord *ndb_record= m_ndb_record; - uchar *mask= (uchar *)(table->read_set->bitmap); const NdbRecord *key_rec; NdbOperation::OperationOptions options; NdbOperation::OperationOptions *poptions = NULL; - options.optionsPresent=0; + options.optionsPresent= 0; + NdbOperation::GetValueSpec gets[2]; if (idx != MAX_KEY) key_rec= m_index[idx].ndb_unique_record_key; @@ -2724,31 +2512,22 @@ ha_ndbcluster::pk_unique_index_read_key( /* Initialize the null bitmap, setting unused null bits to 1. */ memset(buf, 0xff, table->s->null_bytes); - if (m_user_defined_partitioning || table_share->primary_key == MAX_KEY) + if (table_share->primary_key == MAX_KEY) { - /* - We need an extended column mask. - We may also need to read the hidden primary key and the FRAGMENT - pseudo-column. - */ - mask= copy_column_set(table->read_set); - if (table_share->primary_key == MAX_KEY) - { - request_hidden_key(mask); - if (m_user_defined_partitioning) - ndb_record= m_ndb_record_fragment; - } + get_hidden_fields_keyop(&options, gets); + poptions= &options; + } - if (ppartition_id !=NULL) - { - options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID; - options.partitionId=*ppartition_id; - poptions=&options; - } + if (ppartition_id != NULL) + { + options.optionsPresent|= NdbOperation::OperationOptions::OO_PARTITION_ID; + options.partitionId= *ppartition_id; + poptions= &options; } - op= m_thd_ndb->trans->readTuple(key_rec, (const char *)key, - ndb_record, (char *)buf, lm, mask, - poptions, + + op= m_thd_ndb->trans->readTuple(key_rec, (const char *)key, m_ndb_record, + (char *)buf, lm, + (uchar *)(table->read_set->bitmap), poptions, sizeof(NdbOperation::OperationOptions)); if (uses_blob_value(table->read_set) && @@ -3021,7 +2800,6 @@ int ha_ndbcluster::ordered_index_scan(co { NdbTransaction *trans= m_thd_ndb->trans; NdbIndexScanOperation *op; - uchar *mask; int error; DBUG_ENTER("ha_ndbcluster::ordered_index_scan"); @@ -3035,15 +2813,6 @@ int ha_ndbcluster::ordered_index_scan(co if (m_active_cursor && (error= close_scan())) DBUG_RETURN(error); - if (m_user_defined_partitioning || table_share->primary_key == MAX_KEY) - { - mask= copy_column_set(table->read_set); - if (table_share->primary_key == MAX_KEY) - request_hidden_key(mask); - } - else - mask= (uchar *)(table->read_set->bitmap); - NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set); @@ -3051,6 +2820,10 @@ int ha_ndbcluster::ordered_index_scan(co options.optionsPresent=NdbTransaction::ScanOptions::SO_SCANFLAGS; options.scan_flags=0; + NdbOperation::GetValueSpec gets[2]; + if (table_share->primary_key == MAX_KEY) + get_hidden_fields_scan(&options, gets); + if (lm == NdbOperation::LM_Read) options.scan_flags|= NdbScanOperation::SF_KeyInfo; if (sorted) @@ -3088,7 +2861,7 @@ int ha_ndbcluster::ordered_index_scan(co } if (!(op= trans->scanIndex(key_rec, row_rec, lm, - mask, + (uchar *)(table->read_set->bitmap), pbound, &options, sizeof(NdbTransaction::ScanOptions)))) @@ -3145,15 +2918,11 @@ int ha_ndbcluster::full_table_scan(const NdbScanOperation *op; NdbTransaction *trans= m_thd_ndb->trans; part_id_range part_spec; - uchar *mask= (uchar *)(table->read_set->bitmap); - const NdbRecord *ndb_record= m_ndb_record; + NdbOperation::GetValueSpec gets[2]; DBUG_ENTER("full_table_scan"); DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname)); - if (table_share->primary_key == MAX_KEY || m_user_defined_partitioning) - mask= copy_column_set(table->read_set); - NdbOperation::LockMode lm= (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type, table->read_set); NdbTransaction::ScanOptions options; @@ -3189,18 +2958,10 @@ int ha_ndbcluster::full_table_scan(const } } if (table_share->primary_key == MAX_KEY) - { - request_hidden_key(mask); - if (m_user_defined_partitioning) - { - // If table has user defined partitioning - // and no primary key, we need to read the partition id - // to support ORDER BY queries - ndb_record= m_ndb_record_fragment; - } - } + get_hidden_fields_scan(&options, gets); - if (!(op= trans->scanTable(ndb_record, lm, mask, + if (!(op= trans->scanTable(m_ndb_record, lm, + (uchar *)(table->read_set->bitmap), &options, sizeof(NdbTransaction::ScanOptions)))) ERR_RETURN(trans->getNdbError()); m_active_cursor= op; @@ -3258,6 +3019,47 @@ ha_ndbcluster::set_auto_inc(THD *thd, Fi DBUG_RETURN(0); } +Uint32 +ha_ndbcluster::setup_get_hidden_fields(NdbOperation::GetValueSpec gets[2]) +{ + Uint32 num_gets= 0; + /* + We need to read the hidden primary key, and possibly the FRAGMENT + pseudo-column. + */ + gets[num_gets].column= get_hidden_key_column(); + gets[num_gets].appStorage= &m_ref; + num_gets++; + if (m_user_defined_partitioning) + { + /* Need to read partition id to support ORDER BY columns. */ + gets[num_gets].column= NdbDictionary::Column::FRAGMENT; + gets[num_gets].appStorage= &m_part_id; + num_gets++; + } + return num_gets; +} + +void +ha_ndbcluster::get_hidden_fields_keyop(NdbOperation::OperationOptions *options, + NdbOperation::GetValueSpec gets[2]) +{ + Uint32 num_gets= setup_get_hidden_fields(gets); + options->optionsPresent|= NdbOperation::OperationOptions::OO_GETVALUE; + options->extraGetValues= gets; + options->numExtraGetValues= num_gets; +} + +void +ha_ndbcluster::get_hidden_fields_scan(NdbTransaction::ScanOptions *options, + NdbOperation::GetValueSpec gets[2]) +{ + Uint32 num_gets= setup_get_hidden_fields(gets); + options->optionsPresent|= NdbTransaction::ScanOptions::SO_GETVALUE; + options->extraGetValues= gets; + options->numExtraGetValues= num_gets; +} + inline void ha_ndbcluster::eventSetAnyValue(THD *thd, NdbOperation::OperationOptions *options) @@ -3303,9 +3105,9 @@ int ha_ndbcluster::ndb_write_row(uchar * THD *thd= table->in_use; Thd_ndb *thd_ndb= get_thd_ndb(thd); uint32 part_id; - uchar *row; - bool need_execute; int error; + NdbOperation::SetValueSpec sets[2]; + Uint32 num_sets= 0; DBUG_ENTER("ha_ndbcluster::ndb_write_row"); has_auto_increment= (table->next_number_field && record == table->record[0]); @@ -3341,52 +3143,13 @@ int ha_ndbcluster::ndb_write_row(uchar * DBUG_RETURN(peek_res); } - /* - Since the NdbRecord operations need row data to remain valid until - execute(), for bulk insert we need to save rows in a buffer, and - execute() whenever the buffer gets full. - - For non-bulk insert, we may still need to copy the row into a (bigger) - buffer if we need extra space for the user-defined partitioning hash - or the hidden primary key, but we always execute() in this case. - - Note that when using writeTuple() with blobs, we cannot batch, as - NdbBlob::setValue() uses call-by-reference semantics for the blob value, - which must remain valid until execute(). For insertTuple(), the blob - value is buffered by NdbBlob::setValue(). - */ bool uses_blobs= uses_blob_value(table->write_set); - if ((m_rows_to_insert > 1 && !uses_blobs) || batched_update || - ( (thd->options & OPTION_ALLOW_BATCH) && !(uses_blobs && m_use_write))) - { - /* This sets row and need_execute (output parameters). */ - row= batch_copy_row_to_buffer(thd_ndb, record, need_execute); - DBUG_PRINT("info", ("allocating buffer for bulk insert, " - "m_rows_to_insert=%d write_set=0x%x", - (int)m_rows_to_insert, table->write_set->bitmap[0])); - if (unlikely(!row)) - DBUG_RETURN(ER_OUTOFMEMORY); - } - else - { - DBUG_PRINT("info", ("Non-bulk insert.")); - need_execute= TRUE; - if (table_share->primary_key == MAX_KEY || m_user_defined_partitioning) - { - DBUG_PRINT("info", ("Getting single buffer for oversize record.")); - row= copy_row_to_buffer(thd_ndb, record); - if (unlikely(!row)) - DBUG_RETURN(ER_OUTOFMEMORY); - } - else - row= record; - } + Uint64 auto_value; if (table_share->primary_key == MAX_KEY) { - // Table has hidden primary key + /* Table has hidden primary key. */ Ndb *ndb= get_ndb(thd); - Uint64 auto_value; uint retries= NDB_AUTO_INCREMENT_RETRIES; int retry_sleep= 30; /* 30 milliseconds, transaction */ for (;;) @@ -3404,7 +3167,9 @@ int ha_ndbcluster::ndb_write_row(uchar * } break; } - set_hidden_key(row, auto_value); + sets[num_sets].column= get_hidden_key_column(); + sets[num_sets].value= &auto_value; + num_sets++; } if (m_user_defined_partitioning) @@ -3426,7 +3191,9 @@ int ha_ndbcluster::ndb_write_row(uchar * */ if (func_value >= INT_MAX32) func_value= INT_MAX32; - set_partition_function_value(row, (uint32)func_value); + sets[num_sets].column= get_partition_id_column(); + sets[num_sets].value= &func_value; + num_sets++; } ha_statistic_increment(&SSV::ha_write_count); @@ -3445,11 +3212,30 @@ int ha_ndbcluster::ndb_write_row(uchar * if (m_user_defined_partitioning) { options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID; - options.partitionId=part_id; + options.partitionId= part_id; + } + if (num_sets) + { + options.optionsPresent |= NdbOperation::OperationOptions::OO_SETVALUE; + options.extraSetValues= sets; + options.numExtraSetValues= num_sets; } if (options.optionsPresent != 0) poptions=&options; + const NdbRecord *key_rec; + const uchar *key_row; + if (table_share->primary_key == MAX_KEY) + { + key_rec= m_ndb_hidden_key_record; + key_row= (const uchar *)&auto_value; + } + else + { + key_rec= m_index[table_share->primary_key].ndb_unique_record_row; + key_row= record; + } + /* We do not use the table->write_set here. The reason is that for REPLACE INTO t(a), the write_set is passed with @@ -3469,60 +3255,36 @@ int ha_ndbcluster::ndb_write_row(uchar * if (m_use_write) { - const NdbRecord *key_rec; - const uchar *key_row; - uchar *mask; - /* Using write, the only user-visible cols we write are in the write_set */ user_cols_written_bitmap= table->write_set; - - if (table_share->primary_key == MAX_KEY || m_user_defined_partitioning) - { - mask= copy_column_set(table->write_set); - if (m_user_defined_partitioning) - request_partition_function_value(mask); - if (table_share->primary_key == MAX_KEY) - request_hidden_key(mask); - } - else - mask= (uchar *)(table->write_set->bitmap); - - if (table_share->primary_key == MAX_KEY) - { - key_rec= m_ndb_hidden_key_record; - key_row= &row[offset_hidden_key()]; - } - else - { - key_rec= m_index[table_share->primary_key].ndb_unique_record_row; - key_row= row; - } - op= trans->writeTuple(key_rec, (const char *)key_row, - m_ndb_record, (char *)row, mask, - poptions, - sizeof(NdbOperation::OperationOptions)); + op= trans->writeTuple(key_rec, (const char *)key_row, m_ndb_record, + (char *)record, (uchar *)(table->write_set->bitmap), + poptions, sizeof(NdbOperation::OperationOptions)); } else { /* Using insert, we write all user visible columns */ user_cols_written_bitmap= NULL; - op= trans->insertTuple(m_ndb_record, (char *)row, - NULL, // No mask - poptions, - sizeof(NdbOperation::OperationOptions)); + op= trans->insertTuple(key_rec, (const char *)key_row, m_ndb_record, + (char *)record, NULL, // No mask + poptions, sizeof(NdbOperation::OperationOptions)); } if (!(op)) ERR_RETURN(trans->getNdbError()); + bool need_flush= add_row_check_if_batch_full(thd_ndb); + bool do_batch= !need_flush && + (batched_update || (thd->options & OPTION_ALLOW_BATCH)); uint blob_count= 0; if (table_share->blob_fields > 0) { my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set); /* Set Blob values for all columns updated by the operation */ - int res= set_blob_values(op, row - table->record[0], user_cols_written_bitmap, &blob_count); + int res= set_blob_values(op, record - table->record[0], + user_cols_written_bitmap, &blob_count, do_batch); dbug_tmp_restore_column_map(table->read_set, old_map); if (res != 0) - ERR_RETURN(op->getNdbError()); + DBUG_RETURN(res); } m_rows_changed++; @@ -3536,7 +3298,9 @@ int ha_ndbcluster::ndb_write_row(uchar * */ m_rows_inserted++; no_uncommitted_rows_update(1); - if (need_execute || primary_key_update) + if (( (m_rows_to_insert == 1 || uses_blobs) && !do_batch ) || + primary_key_update || + need_flush) { int res= flush_bulk_insert(); if (res != 0) @@ -3607,8 +3371,13 @@ int ha_ndbcluster::update_row(const ucha uint32 old_part_id= 0, new_part_id= 0; int error; longlong func_value; - bool pk_update= (table_share->primary_key != MAX_KEY && - primary_key_cmp(old_data, new_data)); + Uint32 func_value_uint32; + bool have_pk= (table_share->primary_key != MAX_KEY); + bool pk_update= (have_pk && + primary_key_cmp(old_data, new_data)); + bool batch_allowed= (thd->options & OPTION_ALLOW_BATCH) != 0; + NdbOperation::SetValueSpec sets[1]; + DBUG_ENTER("update_row"); /* @@ -3672,38 +3441,6 @@ int ha_ndbcluster::update_row(const ucha bitmap_copy(&m_bitmap, table->write_set); bitmap_subtract(&m_bitmap, &m_pk_bitmap); uchar *mask= (uchar *)(m_bitmap.bitmap); - /* Need to initialize bits for any extra hidden columns. */ - if (table_share->primary_key == MAX_KEY || m_user_defined_partitioning) - clear_extended_column_set(mask); - - /* Need to set the value of any user-defined partitioning function. */ - uchar *row; - bool need_execute; - /* - Batch update operation if we are doing a scan for update, unless - there exist UPDATE AFTER triggers - */ - if (!m_update_cannot_batch && - (cursor || ((thd->options & OPTION_ALLOW_BATCH) && - (table_share->primary_key != MAX_KEY)))) - { - /* For a scan, we only need to execute() if the batch buffer is full. */ - row= batch_copy_row_to_buffer(thd_ndb, new_data, need_execute); - if (unlikely(!row)) - DBUG_RETURN(ER_OUTOFMEMORY); - } - else - { - need_execute= TRUE; - if (m_user_defined_partitioning) - { - row= copy_row_to_buffer(thd_ndb, new_data); - if (unlikely(!row)) - DBUG_RETURN(ER_OUTOFMEMORY); - } - else - row= new_data; - } NdbOperation::OperationOptions *poptions = NULL; NdbOperation::OperationOptions options; @@ -3712,16 +3449,26 @@ int ha_ndbcluster::update_row(const ucha if (m_user_defined_partitioning) { if (func_value >= INT_MAX32) - func_value= INT_MAX32; - set_partition_function_value(row, (uint32)func_value); - request_partition_function_value(mask); + func_value_uint32= INT_MAX32; + else + func_value_uint32= (uint32)func_value; + sets[0].column= get_partition_id_column(); + sets[0].value= &func_value_uint32; + options.optionsPresent|= NdbOperation::OperationOptions::OO_SETVALUE; + options.extraSetValues= sets; + options.numExtraSetValues= 1; - options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID; - options.partitionId = new_part_id; + if (!cursor) + { + options.optionsPresent|= NdbOperation::OperationOptions::OO_PARTITION_ID; + options.partitionId= new_part_id; + } } eventSetAnyValue(thd, &options); + bool need_flush= add_row_check_if_batch_full(thd_ndb); + if (cursor) { /* @@ -3738,7 +3485,7 @@ int ha_ndbcluster::update_row(const ucha poptions = &options; if (!(op= cursor->updateCurrentTuple(trans, m_ndb_record, - (const char*)row, mask, + (const char*)new_data, mask, poptions, sizeof(NdbOperation::OperationOptions)))) ERR_RETURN(trans->getNdbError()); @@ -3750,38 +3497,25 @@ int ha_ndbcluster::update_row(const ucha { const NdbRecord *key_rec; const uchar *key_row; - uint key_len; - if (table_share->primary_key != MAX_KEY) + if (have_pk) { key_rec= m_index[table_share->primary_key].ndb_unique_record_row; - key_row= row; - key_len= table->s->reclength; + key_row= new_data; } else { /* Use hidden primary key previously read into m_ref. */ key_rec= m_ndb_hidden_key_record; key_row= (const uchar *)(&m_ref); - key_len= sizeof(m_ref); - } - - if (!need_execute) - { - /* - Poor approx. let delete ~ tabsize / 4 - */ - uint delete_size= 12 + m_bytes_per_write >> 2; - key_row= batch_copy_key_to_buffer(thd_ndb, key_row, key_len, - delete_size, need_execute); - if (unlikely(!key_row)) - DBUG_RETURN(ER_OUTOFMEMORY); } if (options.optionsPresent !=0) poptions= &options; if (!(op= trans->updateTuple(key_rec, (const char *)key_row, - m_ndb_record, (const char*)row, mask))) + m_ndb_record, (const char*)new_data, mask, + poptions, + sizeof(NdbOperation::OperationOptions)))) ERR_RETURN(trans->getNdbError()); } @@ -3789,13 +3523,21 @@ int ha_ndbcluster::update_row(const ucha if (uses_blob_value(table->write_set)) { int row_offset= new_data - table->record[0]; - if (set_blob_values(op, row_offset, table->write_set, &blob_count) != 0) - ERR_RETURN(op->getNdbError()); + int res= set_blob_values(op, row_offset, table->write_set, &blob_count, + (batch_allowed && !need_flush)); + if (res != 0) + DBUG_RETURN(res); } m_rows_changed++; - if (need_execute) + /* + Batch update operation if we are doing a scan for update, unless + there exist UPDATE AFTER triggers + */ + if (m_update_cannot_batch || + !(cursor || (batch_allowed && have_pk)) || + need_flush) { if (execute_no_commit(this,trans,FALSE) != 0) { @@ -3844,19 +3586,13 @@ int ha_ndbcluster::ndb_delete_row(const NdbOperation::OperationOptions *poptions = NULL; options.optionsPresent=0; - if (m_user_defined_partitioning) - { - options.optionsPresent |= NdbOperation::OperationOptions::OO_PARTITION_ID; - options.partitionId = part_id; - } - eventSetAnyValue(thd, &options); - if (options.optionsPresent != 0) - poptions = &options; - if (cursor) { + if (options.optionsPresent != 0) + poptions = &options; + /* We are scanning records and want to delete the record that was just found, call deleteTuple on the cursor @@ -3884,21 +3620,38 @@ int ha_ndbcluster::ndb_delete_row(const { const NdbRecord *key_rec; const uchar *key_row; - uint key_len; + + if (m_user_defined_partitioning) + { + options.optionsPresent|= NdbOperation::OperationOptions::OO_PARTITION_ID; + options.partitionId= part_id; + } + + if (options.optionsPresent != 0) + poptions= &options; + if (table_share->primary_key != MAX_KEY) { key_rec= m_index[table_share->primary_key].ndb_unique_record_row; key_row= record; - key_len= table->s->reclength; } else { key_rec= m_ndb_hidden_key_record; key_row= (const uchar *)(&m_ref); - key_len= sizeof(m_ref); } + if (!(op=trans->deleteTuple(key_rec, (const char *)key_row, + NULL, // result_rec + NULL, // result_row + NULL, // result_mask + poptions, + sizeof(NdbOperation::OperationOptions)))) + ERR_RETURN(trans->getNdbError()); + + no_uncommitted_rows_update(-1); + /* - Check if we can batch the delete; if so we need to buffer the key. + Check if we can batch the delete. We do not batch deletes on tables with no primary key. For such tables, replication uses full table scan to locate the row to delete. The @@ -3916,33 +3669,16 @@ int ha_ndbcluster::ndb_delete_row(const 7. The delete of the second tuple now fails, as the transaction has been aborted. */ - bool need_execute; - if ((thd->options & OPTION_ALLOW_BATCH) && - table_share->primary_key != MAX_KEY) - { - /* - Poor approx. let delete ~ tabsize / 4 - */ - uint delete_size= 12 + m_bytes_per_write >> 2; - key_row= batch_copy_key_to_buffer(thd_ndb, key_row, key_len, - delete_size, need_execute); - if (unlikely(!key_row)) - DBUG_RETURN(ER_OUTOFMEMORY); - } - else - need_execute= TRUE; - if (!(op=trans->deleteTuple(key_rec, (const char *)key_row, - NULL, // result_rec - NULL, // result_row - NULL, // result_mask - poptions, - sizeof(NdbOperation::OperationOptions)))) - ERR_RETURN(trans->getNdbError()); - - no_uncommitted_rows_update(-1); - - if (!need_execute) + /* + Poor approx. let delete ~ tabsize / 4 + */ + uint delete_size= 12 + m_bytes_per_write >> 2; + bool need_flush= add_row_check_if_batch_full_size(thd_ndb, delete_size); + if ( (thd->options & OPTION_ALLOW_BATCH) && + table_share->primary_key != MAX_KEY && + !primary_key_update && + !need_flush) DBUG_RETURN(0); } @@ -6946,15 +6682,10 @@ ha_ndbcluster::ha_ndbcluster(handlerton m_delete_cannot_batch(FALSE), m_update_cannot_batch(FALSE), m_skip_auto_increment(TRUE), - m_row_buffer_current(NULL), m_blobs_pending(0), m_blobs_buffer(0), m_blobs_buffer_size(0), - m_row_buffer(0), - m_row_buffer_size(0), - m_extra_reclength(0), m_ndb_record(0), - m_ndb_record_fragment(0), m_ndb_hidden_key_record(0), m_ndb_statistics_record(0), m_dupkey((uint) -1), @@ -7011,11 +6742,6 @@ ha_ndbcluster::~ha_ndbcluster() my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); m_blobs_buffer= 0; - my_free(m_row_buffer, MYF(MY_ALLOW_ZERO_PTR)); - m_row_buffer= 0; - m_row_buffer_current= 0; - m_row_buffer_size= 0; - // Check for open cursor/transaction DBUG_ASSERT(m_active_cursor == NULL); DBUG_ASSERT(m_thd_ndb == NULL); @@ -9229,7 +8955,7 @@ ndb_get_table_statistics(ha_ndbcluster* } NdbTransaction::ScanOptions options; - options.optionsPresent=NdbTransaction::ScanOptions::SO_BATCH; + options.optionsPresent= NdbTransaction::ScanOptions::SO_BATCH; /* Set batch_size=1, as we need only one row per fragment. */ options.batch=1; @@ -9746,7 +9472,6 @@ int ha_ndbcluster::multi_range_start_ret uint num_scan_ranges= 0; int range_no= -1; int mrr_range_no= starting_range; - // ToDo: proper interface to save/retrieve point to iterate from void *saved_start_pos; uint saved_start_no; @@ -9804,7 +9529,6 @@ int ha_ndbcluster::multi_range_start_ret { /* Do a multi-range index scan for ranges not done by primary/unique key. */ NdbTransaction::ScanOptions options; - uchar *mask; options.optionsPresent= NdbTransaction::ScanOptions::SO_SCANFLAGS | @@ -9821,21 +9545,16 @@ int ha_ndbcluster::multi_range_start_ret options.parallel=parallelism; - if (m_user_defined_partitioning || table_share->primary_key == MAX_KEY) - { - mask= copy_column_set(table->read_set); - if (table_share->primary_key == MAX_KEY) - request_hidden_key(mask); - } - else - mask= (uchar *)(table->read_set->bitmap); + NdbOperation::GetValueSpec gets[2]; + if (table_share->primary_key == MAX_KEY) + get_hidden_fields_scan(&options, gets); /* Define scan */ NdbIndexScanOperation *scanOp= m_thd_ndb->trans->scanIndex (m_index[active_index].ndb_record_key, m_index[active_index].ndb_record_row, lm, - mask, + (uchar *)(table->read_set->bitmap), NULL, /* All bounds specified below */ &options, sizeof(NdbTransaction::ScanOptions)); @@ -9962,12 +9681,6 @@ int ha_ndbcluster::multi_range_read_next { *range_info= mrr_get_ptr_by_idx2(this, mrr_iter, first_running_range++); memcpy(table->record[0], src_row, table_share->reclength); - if (table_share->primary_key == MAX_KEY) - { - m_ref= get_hidden_key(src_row); - if (m_user_defined_partitioning) - m_part_id= get_partition_fragment(src_row); - } DBUG_RETURN(0); } else if (error.classification != NdbError::NoDataFound) @@ -10015,12 +9728,6 @@ int ha_ndbcluster::multi_range_read_next { *range_info= mrr_get_ptr_by_idx2(this, mrr_iter, current_range_no); /* Copy out data from the new row. */ - if (table_share->primary_key == MAX_KEY) - { - m_ref= get_hidden_key(m_next_row); - if (m_user_defined_partitioning) - m_part_id= get_partition_fragment(m_next_row); - } unpack_record(table->record[0], m_next_row); /* Mark that we have used this row, so we need to fetch a new diff -Nrup a/sql/ha_ndbcluster.h b/sql/ha_ndbcluster.h --- a/sql/ha_ndbcluster.h 2008-02-02 18:28:00 +03:00 +++ b/sql/ha_ndbcluster.h 2008-02-02 18:41:30 +03:00 @@ -582,44 +582,31 @@ private: void set_dbname(const char *pathname); void set_tabname(const char *pathname); - uint offset_hidden_key() { return table->s->reclength; } - uint offset_user_partition_function() { - return table->s->reclength + - (table_share->primary_key == MAX_KEY ? - NDB_HIDDEN_PRIMARY_KEY_LENGTH : 0); + const NdbDictionary::Column *get_hidden_key_column() { + return m_table->getColumn(table_share->fields); } - uint offset_user_partition_fragment() { - return table->s->reclength + - (table_share->primary_key == MAX_KEY ? - NDB_HIDDEN_PRIMARY_KEY_LENGTH+4 : 4); - } - uint field_number_hidden_key() { return table->s->fields; } - uint field_number_user_partition_function() { - return table->s->fields + - (table_share->primary_key == MAX_KEY ? 1 : 0); + const NdbDictionary::Column *get_partition_id_column() { + Uint32 index= table_share->fields + (table_share->primary_key == MAX_KEY); + return m_table->getColumn(index); } - void set_hidden_key(uchar *row, Uint64 auto_value); - Uint64 get_hidden_key(const uchar *row); - void request_hidden_key(uchar *mask); - void set_partition_function_value(uchar *row, uint32 func_value); - uint32 get_partition_fragment(const uchar *row); - void request_partition_function_value(uchar *mask); - uchar *batch_copy_row_to_buffer(Thd_ndb *thd_ndb, const uchar *record, - bool & batch_full); - uchar *batch_copy_key_to_buffer(Thd_ndb *thd_ndb, const uchar *key, - uint key_len, - uint op_batch_size, bool & batch_full); + bool add_row_check_if_batch_full_size(Thd_ndb *thd_ndb, uint size); + bool add_row_check_if_batch_full(Thd_ndb *thd_ndb) { + return add_row_check_if_batch_full_size(thd_ndb, m_bytes_per_write); + } + uchar *get_buffer(Thd_ndb *thd_ndb, uint size); uchar *copy_row_to_buffer(Thd_ndb *thd_ndb, const uchar *record); - uchar *get_row_buffer(); - void clear_extended_column_set(uchar *mask); - uchar *copy_column_set(MY_BITMAP *bitmap); int get_blob_values(const NdbOperation *ndb_op, uchar *dst_record, const MY_BITMAP *bitmap); int set_blob_values(const NdbOperation *ndb_op, my_ptrdiff_t row_offset, - const MY_BITMAP *bitmap, uint *set_count); + const MY_BITMAP *bitmap, uint *set_count, bool batch); friend int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg); + Uint32 setup_get_hidden_fields(NdbOperation::GetValueSpec gets[2]); + void get_hidden_fields_keyop(NdbOperation::OperationOptions *options, + NdbOperation::GetValueSpec gets[2]); + void get_hidden_fields_scan(NdbTransaction::ScanOptions *options, + NdbOperation::GetValueSpec gets[2]); void eventSetAnyValue(THD *thd, NdbOperation::OperationOptions *options); bool check_index_fields_in_write_set(uint keyno); @@ -671,8 +658,6 @@ private: fields (hidden primary key, user-defined partitioning function value). */ NdbRecord *m_ndb_record; - /* As m_ndb_record, but adding the FRAGMENT pseudo-column at end of row. */ - NdbRecord *m_ndb_record_fragment; /* NdbRecord for accessing tuple by hidden Uint64 primary key. */ NdbRecord *m_ndb_hidden_key_record; @@ -711,17 +696,6 @@ private: }; /* For read_multi_range scans, the get_range_no() of current row. */ int m_current_range_no; - /* - A buffer of rows for when we cannot pass the mysqld record pointer directly - to the NDB API, either because the mysqld buffer is too small (eg. hidden - primary key), or because it will not remain valid until execute() (eg. - bulk insert). - */ - char *m_row_buffer; - uint m_row_buffer_size; - char *m_row_buffer_current; - /* Extra bytes needed in row for hidden fields. */ - uint m_extra_reclength; // NdbRecAttr has no reference to blob NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE]; Uint64 m_ref; diff -Nrup a/storage/ndb/include/ndbapi/NdbTransaction.hpp b/storage/ndb/include/ndbapi/NdbTransaction.hpp --- a/storage/ndb/include/ndbapi/NdbTransaction.hpp 2008-02-02 18:28:00 +03:00 +++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp 2008-02-02 18:41:30 +03:00 @@ -718,7 +718,8 @@ public: const unsigned char *result_mask= 0, const NdbOperation::OperationOptions *opts = 0, Uint32 sizeOfOptions = 0); - const NdbOperation *insertTuple(const NdbRecord *rec, const char *row, + const NdbOperation *insertTuple(const NdbRecord *key_rec, const char *key_row, + const NdbRecord *attr_rec, const char *attr_row, const unsigned char *mask= 0, const NdbOperation::OperationOptions *opts = 0, Uint32 sizeOfOptions = 0); diff -Nrup a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp --- a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp 2008-02-02 18:28:01 +03:00 +++ b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp 2008-02-02 18:41:30 +03:00 @@ -1167,11 +1167,11 @@ NdbOperation::handleOperationOptions (co return 4295; } - if (pcol->getPrimaryKey()) + if (type == UpdateRequest && pcol->getPrimaryKey()) { - // For insert, NdbRecord needed the full PK before we got here - // So if we get a setValue on a PK column here, it's a problem - // Set value on tuple key attribute is not allowed + // It is not possible to update a primary key column. + // It can be set like this for insert and write (but it + // still needs to be included in the key NdbRecord and row). return 4202; } diff -Nrup a/storage/ndb/src/ndbapi/NdbOperationExec.cpp b/storage/ndb/src/ndbapi/NdbOperationExec.cpp --- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2008-02-02 18:28:01 +03:00 +++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2008-02-02 18:41:30 +03:00 @@ -1016,6 +1016,8 @@ NdbOperation::prepareSendNdbRecord(Abort TcKeyReq::setAbortOption(tcKeyReq->requestInfo, m_abortOption); TcKeyReq::setCommitFlag(tcKeyReq->requestInfo, theCommitIndicator); TcKeyReq::setStartFlag(tcKeyReq->requestInfo, theStartIndicator); + TcKeyReq::setSimpleFlag(tcKeyReq->requestInfo, theSimpleIndicator); + TcKeyReq::setDirtyFlag(tcKeyReq->requestInfo, theDirtyIndicator); theStatus= WaitResponse; theReceiver.prepareSend(); @@ -1046,12 +1048,12 @@ NdbOperation::fillTcKeyReqHdr(TcKeyReq * tcKeyReq->attrLen= attrLen; UintR reqInfo= 0; - TcKeyReq::setSimpleFlag(reqInfo, theSimpleIndicator); - // CommitFlag set later in prepareSendNdbRecord() - // StartFlag set later in prepareSendNdbRecord() + /* + * The flags Commit, Start, Simple, and Dirty are set later, + * in prepareSendNdbRecord(). + */ TcKeyReq::setInterpretedFlag(reqInfo, (m_interpreted_code != NULL)); /* We will setNoDiskFlag() later when we have checked all columns. */ - TcKeyReq::setDirtyFlag(reqInfo, theDirtyIndicator); TcKeyReq::setOperationType(reqInfo, theOperationType); // AbortOption set later in prepareSendNdbRecord() TcKeyReq::setDistributionKeyFlag(reqInfo, theDistrKeyIndicator_); diff -Nrup a/storage/ndb/src/ndbapi/NdbScanFilter.cpp b/storage/ndb/src/ndbapi/NdbScanFilter.cpp --- a/storage/ndb/src/ndbapi/NdbScanFilter.cpp 2008-02-02 18:28:01 +03:00 +++ b/storage/ndb/src/ndbapi/NdbScanFilter.cpp 2008-02-02 18:41:31 +03:00 @@ -120,6 +120,7 @@ NdbScanFilter::~NdbScanFilter(){ int NdbScanFilter::begin(Group group){ +#if 0 // TODO : REMOVE when ScanFilter fixed for NdbRecord // if (m_impl.m_operation->theStatus == NdbOperation::UseNdbRecord) { @@ -130,6 +131,7 @@ NdbScanFilter::begin(Group group){ return -1; } // /TODO hack +#endif if (m_impl.m_stack2.push_back(m_impl.m_negative)) { m_impl.m_operation->setErrorCodeAbort(4000); diff -Nrup a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp --- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2008-02-02 18:28:01 +03:00 +++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2008-02-02 18:41:31 +03:00 @@ -1008,11 +1008,6 @@ int NdbScanOperation::prepareSendScan(Ui theErrorLine = 0; - // In prepareSendInterpreted we set the sizes (word 4-8) in the - // first ATTRINFO signal. - if (prepareSendInterpreted() == -1) - return -1; - /* When using getValue() in ordered scans, we need to request "behind the scenes" any part of the primary key that is not request explicitly by the @@ -1025,8 +1020,6 @@ int NdbScanOperation::prepareSendScan(Ui ((NdbIndexScanOperation*)this)->fix_get_values(); } - theCurrentATTRINFO->setLength(theAI_LenInCurrAI); - /** * Prepare all receivers */ @@ -1149,6 +1142,13 @@ NdbScanOperation::doSendScan(int aProces assert(theSCAN_TABREQ != NULL); tSignal = theSCAN_TABREQ; + // In prepareSendInterpreted we set the sizes (word 4-8) in the + // first ATTRINFO signal. + if (prepareSendInterpreted() == -1) + return -1; + + theCurrentATTRINFO->setLength(theAI_LenInCurrAI); + Uint32 tupKeyLen = theTupKeyLen; Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr; Uint64 transId = theNdbCon->theTransactionId; @@ -1543,22 +1543,24 @@ NdbRecAttr* NdbScanOperation::getValue_NdbRecord_scan(const NdbColumnImpl* attrInfo, char* aValue) { + DBUG_ENTER("NdbScanOperation::getValue_NdbRecord_scan"); int res; Uint32 ah; NdbRecAttr *ra; + DBUG_PRINT("info", ("Column: %u", attrInfo->m_attrId)); AttributeHeader::init(&ah, attrInfo->m_attrId, 0); res= insertATTRINFO(ah); if (res==-1) - return NULL; + DBUG_RETURN(NULL); theInitialReadSize= theTotalCurrAI_Len - 5; ra= theReceiver.getValue(attrInfo, aValue); if (!ra) { setErrorCodeAbort(4000); - return NULL; + DBUG_RETURN(NULL); } theErrorLine++; - return ra; + DBUG_RETURN(ra); } NdbRecAttr* diff -Nrup a/storage/ndb/src/ndbapi/NdbTransaction.cpp b/storage/ndb/src/ndbapi/NdbTransaction.cpp --- a/storage/ndb/src/ndbapi/NdbTransaction.cpp 2008-02-02 18:28:01 +03:00 +++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp 2008-02-02 18:41:31 +03:00 @@ -2378,13 +2378,14 @@ NdbTransaction::readTuple(const NdbRecor } const NdbOperation * -NdbTransaction::insertTuple(const NdbRecord *rec, const char *row, +NdbTransaction::insertTuple(const NdbRecord *key_rec, const char *key_row, + const NdbRecord *attr_rec, const char *attr_row, const unsigned char *mask, const NdbOperation::OperationOptions *opts, Uint32 sizeOfOptions) { /* Check that the NdbRecord specifies the full primary key. */ - if (!(rec->flags & NdbRecord::RecHasAllKeys)) + if (!(key_rec->flags & NdbRecord::RecHasAllKeys)) { setOperationErrorCodeAbort(4292); return NULL; @@ -2393,8 +2394,8 @@ NdbTransaction::insertTuple(const NdbRec NdbOperation *op= setupRecordOp(NdbOperation::InsertRequest, NdbOperation::LM_Exclusive, NdbOperation::AbortOnError, - rec, row, - rec, row, mask, + key_rec, key_row, + attr_rec, attr_row, mask, opts, sizeOfOptions); if (!op) @@ -2584,6 +2585,9 @@ NdbTransaction::scanTable(const NdbRecor const ScanOptions *options, Uint32 sizeOfOptions) { + DBUG_ENTER("NdbTransaction::scanTable"); + DBUG_PRINT("info", ("Options=%p(0x%x)", options, + (options ? (unsigned)(options->optionsPresent) : 0))); /* Normal scan operations are created as NdbIndexScanOperations. The reason for this is that they can then share a pool of allocated @@ -2604,7 +2608,7 @@ NdbTransaction::scanTable(const NdbRecor if (op_idx==NULL) { setOperationErrorCodeAbort(4000); - return NULL; + DBUG_RETURN(NULL); } op= op_idx; @@ -2709,12 +2713,12 @@ NdbTransaction::scanTable(const NdbRecor /* Error code should be set */ goto giveup_err; - return op_idx; + DBUG_RETURN(op_idx); giveup_err: releaseScanOperation(&m_theFirstScanOperation, &m_theLastScanOperation, op_idx); - return NULL; + DBUG_RETURN(NULL); } /*