#At file:///home/jonas/src/telco-7.0/ based on revid:jonas@stripped
4357 jonas oreland 2011-05-07
ndb - bug#56051 - remove known 899 case...reclassify 899 as internal error (instead of temporary)
modified:
storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
storage/ndb/src/ndbapi/ndberror.c
storage/ndb/test/ndbapi/testBasic.cpp
storage/ndb/test/run-test/daily-basic-tests.txt
=== modified file 'storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp'
--- a/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp 2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp 2011-05-07 06:16:13 +0000
@@ -170,6 +170,7 @@ public:
DumpBackup = 13000,
DumpBackupSetCompressed = 13001,
DumpBackupSetCompressedLCP = 13002,
+ BackupErrorInsert = 13003,
DumpDbinfo = 14000,
DbinfoListTables = 14001,
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2011-02-24 09:46:11 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2011-05-07 06:16:13 +0000
@@ -662,6 +662,16 @@ Backup::execDUMP_STATE_ORD(Signal* signa
c_defaults.m_compressed_lcp= signal->theData[1];
infoEvent("Compressed LCP: %d", c_defaults.m_compressed_lcp);
}
+
+ if (signal->theData[0] == DumpStateOrd::BackupErrorInsert)
+ {
+ if (signal->getLength() == 1)
+ ndbout_c("BACKUP: setting error %u", signal->theData[1]);
+ else
+ ndbout_c("BACKUP: setting error %u, %u",
+ signal->theData[1], signal->theData[2]);
+ SET_ERROR_INSERT_VALUE2(signal->theData[1], signal->theData[2]);
+ }
}
void Backup::execDBINFO_SCANREQ(Signal *signal)
@@ -4578,6 +4588,13 @@ Backup::checkScan(Signal* signal, Backup
sendSignal(ptr.p->masterRef, GSN_ABORT_BACKUP_ORD, signal,
AbortBackupOrd::SignalLength, JBB);
}
+#ifdef ERROR_INSERT
+ else if (ERROR_INSERTED(10042) && filePtr.p->tableId ==c_error_insert_extra)
+ {
+ sendSignalWithDelay(lqhRef, GSN_SCAN_NEXTREQ, signal,
+ 10, ScanFragNextReq::SignalLength);
+ }
+#endif
else
{
sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal,
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2011-05-07 06:16:13 +0000
@@ -723,9 +723,10 @@ struct Fragrecord {
Uint32 m_free_page_id_list;
DynArr256::Head m_page_map;
DLFifoList<Page>::Head thFreeFirst; // pages with atleast 1 free record
-
+
Uint32 m_lcp_scan_op;
- Uint32 m_lcp_keep_list;
+ Local_key m_lcp_keep_list_head;
+ Local_key m_lcp_keep_list_tail;
enum FragState
{ FS_FREE
@@ -1439,9 +1440,8 @@ typedef Ptr<HostBuffer> HostBufferPtr;
STATIC_CONST( MM_SHRINK = 0x00200000 ); // Has MM part shrunk
STATIC_CONST( MM_GROWN = 0x00400000 ); // Has MM part grown
STATIC_CONST( FREED = 0x00800000 ); // Is freed
+ STATIC_CONST( FREE = 0x00800000 ); // alias
STATIC_CONST( LCP_SKIP = 0x01000000 ); // Should not be returned in LCP
- STATIC_CONST( LCP_KEEP = 0x02000000 ); // Should be returned in LCP
- STATIC_CONST( FREE = 0x02800000 ); // Is free
STATIC_CONST( VAR_PART = 0x04000000 ); // Is there a varpart
STATIC_CONST( REORG_MOVE = 0x08000000 );
@@ -3215,6 +3215,8 @@ private:
Uint32* get_default_ptr(const Tablerec*, Uint32&);
Uint32 get_len(Ptr<Page>* pagePtr, Var_part_ref ref);
+ STATIC_CONST( COPY_TUPLE_HEADER32 = 4 );
+
Tuple_header* alloc_copy_tuple(const Tablerec* tabPtrP, Local_key* ptr){
Uint32 * dst = c_undo_buffer.alloc_copy_tuple(ptr,
tabPtrP->total_rec_size);
@@ -3224,7 +3226,7 @@ private:
bzero(dst, tabPtrP->total_rec_size);
#endif
Uint32 count = tabPtrP->m_no_of_attributes;
- ChangeMask * mask = (ChangeMask*)(dst);
+ ChangeMask * mask = (ChangeMask*)(dst + COPY_TUPLE_HEADER32);
mask->m_cols = count;
return (Tuple_header*)(mask->end_of_mask(count));
}
@@ -3234,11 +3236,12 @@ private:
}
Tuple_header * get_copy_tuple(Uint32 * rawptr) {
- return (Tuple_header*)(get_change_mask_ptr(rawptr)->end_of_mask());
+ return (Tuple_header*)
+ (get_change_mask_ptr(rawptr)->end_of_mask());
}
ChangeMask * get_change_mask_ptr(Uint32 * rawptr) {
- return (ChangeMask*)(rawptr);
+ return (ChangeMask*)(rawptr + COPY_TUPLE_HEADER32);
}
Tuple_header* get_copy_tuple(const Local_key* ptr){
@@ -3250,7 +3253,7 @@ private:
Uint32 * tmp = raw - (1 + ((tabP->m_no_of_attributes + 31) >> 5));
ChangeMask* mask = (ChangeMask*)tmp;
assert(mask->end_of_mask() == raw);
- assert(get_copy_tuple(tmp) == copytuple);
+ assert(get_copy_tuple(tmp - COPY_TUPLE_HEADER32) == copytuple);
return mask;
}
@@ -3382,10 +3385,10 @@ private:
Page_cache_client::Request,
OperationrecPtr);
int retrieve_log_page(Signal*, FragrecordPtr, OperationrecPtr);
-
- void dealloc_tuple(Signal* signal, Uint32, Page*, Tuple_header*,
- Operationrec*, Fragrecord*, Tablerec*);
-
+
+ void dealloc_tuple(Signal* signal, Uint32, Page*, Tuple_header*,
+ KeyReqStruct*, Operationrec*, Fragrecord*, Tablerec*);
+
int handle_size_change_after_update(KeyReqStruct* req_struct,
Tuple_header* org,
Operationrec*,
@@ -3411,7 +3414,31 @@ private:
void check_page_map(Fragrecord*);
bool find_page_id_in_list(Fragrecord*, Uint32 pid);
#endif
- void handle_lcp_keep(Signal*, Fragrecord*, ScanOp*, Uint32 rowid);
+ void handle_lcp_keep(Signal*, Fragrecord*, ScanOp*);
+ void handle_lcp_keep_commit(const Local_key*,
+ KeyReqStruct *,
+ Operationrec*, Fragrecord*, Tablerec*);
+
+ void setup_lcp_read_copy_tuple( KeyReqStruct *,
+ Operationrec*,
+ Fragrecord*,
+ Tablerec*);
+
+ bool isCopyTuple(Uint32 pageid, Uint32 pageidx) const {
+ return (pageidx & (Uint16(1) << 15)) != 0;
+ }
+
+ void setCopyTuple(Uint32& pageid, Uint16& pageidx) const {
+ assert(!isCopyTuple(pageid, pageidx));
+ pageidx |= (Uint16(1) << 15);
+ assert(isCopyTuple(pageid, pageidx));
+ }
+
+ void clearCopyTuple(Uint32& pageid, Uint16& pageidx) const {
+ assert(isCopyTuple(pageid, pageidx));
+ pageidx &= ~(Uint16(1) << 15);
+ assert(!isCopyTuple(pageid, pageidx));
+ }
};
#if 0
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2011-05-07 06:16:13 +0000
@@ -51,15 +51,8 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* s
PagePtr pagePtr;
Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
- ndbassert(ptr->m_header_bits & Tuple_header::FREE);
+ ndbassert(ptr->m_header_bits & Tuple_header::FREED);
- if (ptr->m_header_bits & Tuple_header::LCP_KEEP)
- {
- ndbassert(! (ptr->m_header_bits & Tuple_header::FREED));
- ptr->m_header_bits |= Tuple_header::FREED;
- return;
- }
-
if (regTabPtr.p->m_attributes[MM].m_no_of_varsize +
regTabPtr.p->m_attributes[MM].m_no_of_dynamic)
{
@@ -157,12 +150,12 @@ Dbtup::dealloc_tuple(Signal* signal,
Uint32 gci,
Page* page,
Tuple_header* ptr,
+ KeyReqStruct * req_struct,
Operationrec* regOperPtr,
Fragrecord* regFragPtr,
Tablerec* regTabPtr)
{
Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
- Uint32 lcp_keep_list = regFragPtr->m_lcp_keep_list;
Uint32 bits = ptr->m_header_bits;
Uint32 extra_bits = Tuple_header::FREED;
@@ -189,9 +182,15 @@ Dbtup::dealloc_tuple(Signal* signal,
if (!is_rowid_lcp_scanned(rowid, *scanOp.p))
{
jam();
- extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
- ptr->m_operation_ptr_i = lcp_keep_list;
- regFragPtr->m_lcp_keep_list = rowid.ref();
+
+ /**
+ * We're committing a delete, on a row that should
+ * be part of LCP. Copy original row into copy-tuple
+ * and add this copy-tuple to lcp-keep-list
+ *
+ */
+ handle_lcp_keep_commit(&rowid,
+ req_struct, regOperPtr, regFragPtr, regTabPtr);
}
}
@@ -204,6 +203,69 @@ Dbtup::dealloc_tuple(Signal* signal,
}
}
+void
+Dbtup::handle_lcp_keep_commit(const Local_key* rowid,
+ KeyReqStruct * req_struct,
+ Operationrec * opPtrP,
+ Fragrecord * regFragPtr,
+ Tablerec * regTabPtr)
+{
+ bool disk = false;
+ Uint32 sizes[4];
+ Uint32 * copytuple = get_copy_tuple_raw(&opPtrP->m_copy_tuple_location);
+ Tuple_header * dst = get_copy_tuple(copytuple);
+ Tuple_header * org = req_struct->m_tuple_ptr;
+ if (regTabPtr->need_expand(disk))
+ {
+ setup_fixed_part(req_struct, opPtrP, regTabPtr);
+ req_struct->m_tuple_ptr = dst;
+ expand_tuple(req_struct, sizes, org, regTabPtr, disk);
+ shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
+ }
+ else
+ {
+ memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
+ }
+ dst->m_header_bits |= Tuple_header::COPY_TUPLE;
+
+ /**
+ * Store original row-id in copytuple[0,1]
+ * Store next-ptr in copytuple[1,2] (set to RNIL/RNIL)
+ *
+ */
+ assert(sizeof(Local_key) == 8);
+ memcpy(copytuple+0, rowid, sizeof(Local_key));
+
+ Local_key nil;
+ nil.setNull();
+ memcpy(copytuple+2, &nil, sizeof(nil));
+
+ /**
+ * Link it to list
+ */
+ if (regFragPtr->m_lcp_keep_list_tail.isNull())
+ {
+ jam();
+ regFragPtr->m_lcp_keep_list_head = opPtrP->m_copy_tuple_location;
+ }
+ else
+ {
+ jam();
+ Uint32 * tail = get_copy_tuple_raw(®FragPtr->m_lcp_keep_list_tail);
+ Local_key nextptr;
+ memcpy(&nextptr, tail+2, sizeof(Local_key));
+ ndbassert(nextptr.isNull());
+ nextptr = opPtrP->m_copy_tuple_location;
+ memcpy(tail+2, &nextptr, sizeof(Local_key));
+ }
+ regFragPtr->m_lcp_keep_list_tail = opPtrP->m_copy_tuple_location;
+
+ /**
+ * And finally clear m_copy_tuple_location so that it won't be freed
+ */
+ opPtrP->m_copy_tuple_location.setNull();
+}
+
#if 0
static void dump_buf_hex(unsigned char *p, Uint32 bytes)
{
@@ -786,7 +848,7 @@ skip_disk:
ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
}
dealloc_tuple(signal, gci_hi, page.p, tuple_ptr,
- regOperPtr.p, regFragPtr.p, regTabPtr.p);
+ &req_struct, regOperPtr.p, regFragPtr.p, regTabPtr.p);
}
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2011-05-07 06:16:13 +0000
@@ -635,6 +635,17 @@ void Dbtup::execTUPKEYREQ(Signal* signal
goto do_insert;
}
+ if (unlikely(isCopyTuple(pageid, pageidx)))
+ {
+ /**
+ * Only LCP reads a copy-tuple "directly"
+ */
+ ndbassert(Roptype == ZREAD);
+ ndbassert(disk_page == RNIL);
+ setup_lcp_read_copy_tuple(&req_struct, regOperPtr, regFragPtr, regTabPtr);
+ goto do_read;
+ }
+
/**
* Get pointer to tuple
*/
@@ -652,6 +663,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal
if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr,
disk_page != RNIL))
{
+ do_read:
if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1)
{
req_struct.log_size= 0;
@@ -847,6 +859,44 @@ Dbtup::setup_fixed_part(KeyReqStruct* re
req_struct->attr_descr= tab_descr;
}
+void
+Dbtup::setup_lcp_read_copy_tuple(KeyReqStruct* req_struct,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr)
+{
+ Local_key tmp;
+ tmp.m_page_no = req_struct->frag_page_id;
+ tmp.m_page_idx = regOperPtr->m_tuple_location.m_page_idx;
+ clearCopyTuple(tmp.m_page_no, tmp.m_page_idx);
+
+ Uint32 * copytuple = get_copy_tuple_raw(&tmp);
+ Local_key rowid;
+ memcpy(&rowid, copytuple+0, sizeof(Local_key));
+
+ req_struct->frag_page_id = rowid.m_page_no;
+ regOperPtr->m_tuple_location.m_page_idx = rowid.m_page_idx;
+
+ Tuple_header * th = get_copy_tuple(copytuple);
+ req_struct->m_page_ptr.setNull();
+ req_struct->m_tuple_ptr = (Tuple_header*)th;
+ th->m_operation_ptr_i = RNIL;
+ ndbassert((th->m_header_bits & Tuple_header::COPY_TUPLE) != 0);
+
+ Uint32 num_attr= regTabPtr->m_no_of_attributes;
+ Uint32 descr_start= regTabPtr->tabDescriptor;
+ TableDescriptor *tab_descr= &tableDescriptor[descr_start];
+ ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
+ req_struct->attr_descr= tab_descr;
+
+ bool disk = false;
+ if (regTabPtr->need_expand(disk))
+ {
+ jam();
+ prepare_read(req_struct, regTabPtr, disk);
+ }
+}
+
/* ---------------------------------------------------------------- */
/* ------------------------ CONFIRM REQUEST ----------------------- */
/* ---------------------------------------------------------------- */
@@ -1904,6 +1954,13 @@ int Dbtup::handleDeleteReq(Signal* signa
KeyReqStruct *req_struct,
bool disk)
{
+ Tuple_header* dst = alloc_copy_tuple(regTabPtr,
+ ®OperPtr->m_copy_tuple_location);
+ if (dst == 0) {
+ terrorCode = ZMEM_NOMEM_ERROR;
+ goto error;
+ }
+
// delete must set but not increment tupVersion
if (!regOperPtr->is_first_operation())
{
@@ -1911,24 +1968,25 @@ int Dbtup::handleDeleteReq(Signal* signa
regOperPtr->tupVersion= prevOp->tupVersion;
// make copy since previous op is committed before this one
const Tuple_header* org = get_copy_tuple(&prevOp->m_copy_tuple_location);
- Tuple_header* dst = alloc_copy_tuple(regTabPtr,
- ®OperPtr->m_copy_tuple_location);
- if (dst == 0) {
- terrorCode = ZMEM_NOMEM_ERROR;
- goto error;
- }
- Uint32 len = regTabPtr->total_rec_size -
- Uint32(((Uint32*)dst) -
+ Uint32 len = regTabPtr->total_rec_size -
+ Uint32(((Uint32*)dst) -
get_copy_tuple_raw(®OperPtr->m_copy_tuple_location));
memcpy(dst, org, 4 * len);
req_struct->m_tuple_ptr = dst;
- set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
}
- else
+ else
{
regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
+ if (regTabPtr->m_no_of_disk_attributes)
+ {
+ dst->m_header_bits = req_struct->m_tuple_ptr->m_header_bits;
+ memcpy(dst->get_disk_ref_ptr(regTabPtr),
+ req_struct->m_tuple_ptr->get_disk_ref_ptr(regTabPtr),
+ sizeof(Local_key));
+ }
}
req_struct->changeMask.set();
+ set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
if(disk && regOperPtr->m_undo_buffer_space == 0)
{
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2011-05-07 06:16:13 +0000
@@ -670,7 +670,6 @@ void Dbtup::initializeDefaultValuesFrag(
DefaultValuesFragment.p->fragStatus = Fragrecord::FS_ONLINE;
DefaultValuesFragment.p->m_undo_complete= false;
DefaultValuesFragment.p->m_lcp_scan_op = RNIL;
- DefaultValuesFragment.p->m_lcp_keep_list = RNIL;
DefaultValuesFragment.p->noOfPages = 0;
DefaultValuesFragment.p->noOfVarPages = 0;
DefaultValuesFragment.p->m_max_page_no = 0;
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp 2011-04-18 15:36:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp 2011-05-07 06:16:13 +0000
@@ -703,7 +703,8 @@ void Dbtup::execTUPFRAGREQ(Signal* signa
regFragPtr.p->m_tablespace_id= tablespace_id;
regFragPtr.p->m_undo_complete= false;
regFragPtr.p->m_lcp_scan_op = RNIL;
- regFragPtr.p->m_lcp_keep_list = RNIL;
+ regFragPtr.p->m_lcp_keep_list_head.setNull();
+ regFragPtr.p->m_lcp_keep_list_tail.setNull();
regFragPtr.p->noOfPages = 0;
regFragPtr.p->noOfVarPages = 0;
regFragPtr.p->m_max_page_no = 0;
@@ -1573,6 +1574,8 @@ Dbtup::computeTableMetaData(Tablerec *re
/* Room for changemask */
total_rec_size += 1 + ((regTabPtr->m_no_of_attributes + 31) >> 5);
+ total_rec_size += COPY_TUPLE_HEADER32;
+
regTabPtr->total_rec_size= total_rec_size;
setUpQueryRoutines(regTabPtr);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2011-04-19 09:01:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2011-05-07 06:16:13 +0000
@@ -287,9 +287,8 @@ Dbtup::execACC_CHECK_SCAN(Signal* signal
}
const bool lcp = (scan.m_bits & ScanOp::SCAN_LCP);
- Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
- if (lcp && lcp_list != RNIL)
+ if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
{
jam();
/**
@@ -297,7 +296,7 @@ Dbtup::execACC_CHECK_SCAN(Signal* signal
* So that scan state is not alterer
* if lcp_keep rows are found in ScanOp::First
*/
- handle_lcp_keep(signal, fragPtr.p, scanPtr.p, lcp_list);
+ handle_lcp_keep(signal, fragPtr.p, scanPtr.p);
return;
}
@@ -692,19 +691,18 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
const bool mm = (bits & ScanOp::SCAN_DD);
const bool lcp = (bits & ScanOp::SCAN_LCP);
-
- Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
+
const Uint32 size = ((bits & ScanOp::SCAN_VS) == 0) ?
table.m_offsets[mm].m_fix_header_size : 1;
const Uint32 first = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
- if (lcp && lcp_list != RNIL)
+ if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
{
jam();
/**
* Handle lcp keep list here to, due to scanCont
*/
- handle_lcp_keep(signal, fragPtr.p, scanPtr.p, lcp_list);
+ handle_lcp_keep(signal, fragPtr.p, scanPtr.p);
return false;
}
@@ -1130,57 +1128,40 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
void
Dbtup::handle_lcp_keep(Signal* signal,
Fragrecord* fragPtrP,
- ScanOp* scanPtrP,
- Uint32 lcp_list)
+ ScanOp* scanPtrP)
{
TablerecPtr tablePtr;
tablePtr.i = scanPtrP->m_tableId;
ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
- Local_key tmp;
- tmp.assref(lcp_list);
- tmp.m_page_no = getRealpid(fragPtrP, tmp.m_page_no);
-
- Ptr<Page> pagePtr;
- c_page_pool.getPtr(pagePtr, tmp.m_page_no);
- Tuple_header* ptr = (Tuple_header*)
- ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
- Uint32 headerbits = ptr->m_header_bits;
- ndbrequire(headerbits & Tuple_header::LCP_KEEP);
-
- Uint32 next = ptr->m_operation_ptr_i;
- ptr->m_operation_ptr_i = RNIL;
- ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
-
- if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
+ ndbassert(!fragPtrP->m_lcp_keep_list_head.isNull());
+ Local_key tmp = fragPtrP->m_lcp_keep_list_head;
+ Uint32 * copytuple = get_copy_tuple_raw(&tmp);
+ memcpy(&fragPtrP->m_lcp_keep_list_head,
+ copytuple+2,
+ sizeof(Local_key));
+
+ if (fragPtrP->m_lcp_keep_list_head.isNull())
+ {
jam();
- setChecksum(ptr, tablePtr.p);
+ ndbassert(tmp.m_page_no == fragPtrP->m_lcp_keep_list_tail.m_page_no);
+ ndbassert(tmp.m_page_idx == fragPtrP->m_lcp_keep_list_tail.m_page_idx);
+ fragPtrP->m_lcp_keep_list_tail.setNull();
}
+ Local_key save = tmp;
+ setCopyTuple(tmp.m_page_no, tmp.m_page_idx);
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scanPtrP->m_userPtr;
conf->accOperationPtr = (Uint32)-1;
conf->fragId = fragPtrP->fragmentId;
- conf->localKey[0] = Local_key::ref2page_id(lcp_list);
- conf->localKey[1] = Local_key::ref2page_idx(lcp_list);
+ conf->localKey[0] = tmp.m_page_no;
+ conf->localKey[1] = tmp.m_page_idx;
conf->gci = 0;
Uint32 blockNo = refToMain(scanPtrP->m_userRef);
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 6);
-
- fragPtrP->m_lcp_keep_list = next;
- ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
- if (headerbits & Tuple_header::FREED)
- {
- if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
- tablePtr.p->m_attributes[MM].m_no_of_dynamic)
- {
- jam();
- free_var_rec(fragPtrP, tablePtr.p, &tmp, pagePtr);
- } else {
- jam();
- free_fix_rec(fragPtrP, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
- }
- }
+
+ c_undo_buffer.free_copy_tuple(&save);
}
void
@@ -1320,4 +1301,7 @@ Dbtup::execLCP_FRAG_ORD(Signal* signal)
new (scanPtr.p) ScanOp;
scanPtr.p->m_fragPtrI = fragPtr.i;
scanPtr.p->m_state = ScanOp::First;
+
+ ndbassert(frag.m_lcp_keep_list_head.isNull());
+ ndbassert(frag.m_lcp_keep_list_tail.isNull());
}
=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c 2011-05-04 11:45:33 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c 2011-05-07 06:16:13 +0000
@@ -187,7 +187,7 @@ ErrorBundle ErrorCodes[] = {
{ 805, DMEC, TR, "Out of attrinfo records in tuple manager" },
{ 830, DMEC, TR, "Out of add fragment operation records" },
{ 873, DMEC, TR, "Out of attrinfo records for scan in tuple manager" },
- { 899, DMEC, TR, "Rowid already allocated" },
+ { 899, DMEC, IE, "Internal error: rowid already allocated" },
{ 1217, DMEC, TR, "Out of operation records in local data manager (increase MaxNoOfLocalOperations)" },
{ 1218, DMEC, TR, "Send Buffers overloaded in NDB kernel" },
{ 1220, DMEC, TR, "REDO log files overloaded (increase FragmentLogFileSize)" },
=== modified file 'storage/ndb/test/ndbapi/testBasic.cpp'
--- a/storage/ndb/test/ndbapi/testBasic.cpp 2011-04-07 07:22:49 +0000
+++ b/storage/ndb/test/ndbapi/testBasic.cpp 2011-05-07 06:16:13 +0000
@@ -2267,6 +2267,154 @@ runBug59496_case2(NDBT_Context* ctx, NDB
return NDBT_OK;
}
+#define CHK_RET_FAILED(x) if (!(x)) { ndbout_c("Failed on line: %u", __LINE__); return NDBT_FAILED; }
+
+int
+runTest899(NDBT_Context* ctx, NDBT_Step* step)
+{
+ Ndb* pNdb = GETNDB(step);
+ const NdbDictionary::Table* pTab = ctx->getTab();
+
+ const int rows = ctx->getNumRecords();
+ const int loops = ctx->getNumLoops();
+ const int batch = ctx->getProperty("Batch", Uint32(50));
+ const int until_stopped = ctx->getProperty("UntilStopped");
+
+ const NdbRecord * pRowRecord = pTab->getDefaultRecord();
+ CHK_RET_FAILED(pRowRecord != 0);
+
+ const Uint32 len = NdbDictionary::getRecordRowLength(pRowRecord);
+ Uint8 * pRow = new Uint8[len];
+
+ int count_ok = 0;
+ int count_failed = 0;
+ int count_899 = 0;
+ for (int i = 0; i < loops || (until_stopped && !ctx->isTestStopped()); i++)
+ {
+ ndbout_c("loop: %d",i);
+ int result = 0;
+ for (int rowNo = 0; rowNo < rows;)
+ {
+ NdbTransaction* pTrans = pNdb->startTransaction();
+ CHK_RET_FAILED(pTrans != 0);
+
+ for (int b = 0; rowNo < rows && b < batch; rowNo++, b++)
+ {
+ bzero(pRow, len);
+
+ HugoCalculator calc(* pTab);
+
+ NdbOperation::OperationOptions opts;
+ bzero(&opts, sizeof(opts));
+
+ const NdbOperation* pOp = 0;
+ switch(i % 2){
+ case 0:
+ calc.setValues(pRow, pRowRecord, rowNo, rand());
+ pOp = pTrans->writeTuple(pRowRecord, (char*)pRow,
+ pRowRecord, (char*)pRow,
+ 0,
+ &opts,
+ sizeof(opts));
+ result = pTrans->execute(NoCommit);
+ break;
+ case 1:
+ calc.setValues(pRow, pRowRecord, rowNo, rand());
+ pOp = pTrans->deleteTuple(pRowRecord, (char*)pRow,
+ pRowRecord, (char*)pRow,
+ 0,
+ &opts,
+ sizeof(opts));
+ result = pTrans->execute(NoCommit, AO_IgnoreError);
+ break;
+ }
+
+ CHK_RET_FAILED(pOp != 0);
+
+ if (result != 0)
+ {
+ goto found_error;
+ }
+ }
+ result = pTrans->execute(Commit);
+
+ if (result != 0)
+ {
+ found_error:
+ count_failed++;
+ NdbError err = pTrans->getNdbError();
+ if (! (err.status == NdbError::TemporaryError ||
+ err.classification == NdbError::NoDataFound ||
+ err.classification == NdbError::ConstraintViolation))
+ {
+ ndbout << err << endl;
+ }
+ CHK_RET_FAILED(err.status == NdbError::TemporaryError ||
+ err.classification == NdbError::NoDataFound ||
+ err.classification == NdbError::ConstraintViolation);
+ if (err.code == 899)
+ {
+ count_899++;
+ ndbout << err << endl;
+ }
+ }
+ else
+ {
+ count_ok++;
+ }
+ pTrans->close();
+ }
+ }
+
+ ndbout_c("count_ok: %d count_failed: %d (899: %d)",
+ count_ok, count_failed, count_899);
+ delete [] pRow;
+
+ return count_899 == 0 ? NDBT_OK : NDBT_FAILED;
+}
+
+int
+runInit899(NDBT_Context* ctx, NDBT_Step* step)
+{
+ NdbRestarter restarter;
+ int val = DumpStateOrd::DihMinTimeBetweenLCP;
+ restarter.dumpStateAllNodes(&val, 1);
+
+ Ndb* pNdb = GETNDB(step);
+ const NdbDictionary::Table* pTab = ctx->getTab();
+ const NdbDictionary::Table * pTab2 = pNdb->getDictionary()->
+ getTable(pTab->getName());
+
+ int tableId = pTab2->getObjectId();
+ int val2[] = { DumpStateOrd::BackupErrorInsert, 10042, tableId };
+
+ for (int i = 0; i < restarter.getNumDbNodes(); i++)
+ {
+ if (i & 1)
+ {
+ int nodeId = restarter.getDbNodeId(i);
+ ndbout_c("Setting slow LCP of table %d on node %d",
+ tableId, nodeId);
+ restarter.dumpStateOneNode(nodeId, val2, 3);
+ }
+ }
+
+ return NDBT_OK;
+}
+
+int
+runEnd899(NDBT_Context* ctx, NDBT_Step* step)
+{
+ // reset LCP speed
+ NdbRestarter restarter;
+ int val[] = { DumpStateOrd::DihMinTimeBetweenLCP, 0 };
+ restarter.dumpStateAllNodes(val, 2);
+
+ restarter.insertErrorInAllNodes(0);
+ return NDBT_OK;
+}
+
+
NDBT_TESTSUITE(testBasic);
TESTCASE("PkInsert",
"Verify that we can insert and delete from this table using PK"
@@ -2618,6 +2766,13 @@ TESTCASE("Bug59496_case2", "")
STEP(runBug59496_case2);
STEPS(runBug59496_scan, 10);
}
+TESTCASE("899", "")
+{
+ INITIALIZER(runLoadTable);
+ INITIALIZER(runInit899);
+ STEP(runTest899);
+ FINALIZER(runEnd899);
+}
NDBT_TESTSUITE_END(testBasic);
#if 0
=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt 2011-05-03 06:21:59 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt 2011-05-07 06:16:13 +0000
@@ -32,6 +32,10 @@ max-time: 900
cmd: testIndex
args: -n NF_Mixed T1 T6 T13
+max-time: 900
+cmd: testBasic
+args: -r 5000 -n 899 T15 D1 D2
+
max-time: 600
cmd: atrt-testBackup
args: -n NFMaster T1
Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20110507061613-2g5tm04hks7hqp8m.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0 branch (jonas:4357) Bug#56051 | jonas oreland | 7 May |