From: Frazer Clement Date: May 25 2011 2:11pm Subject: bzr commit into mysql-5.1-telco-7.1 branch (frazer.clement:4225) List-Archive: http://lists.mysql.com/commits/138082 Message-Id: <201105251411.p4PEBkBd015410@acsmt357.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit #At file:///home/frazer/bzr/mysql-5.1-telco-7.1/ based on revid:jonas@stripped 4225 Frazer Clement 2011-05-25 [merge] Merge 7.0->7.1 modified: storage/ndb/include/kernel/kernel_types.h storage/ndb/include/ndb_version.h.in storage/ndb/include/ndbapi/NdbOperation.hpp storage/ndb/include/ndbapi/NdbTransaction.hpp storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp storage/ndb/src/ndbapi/NdbOperationExec.cpp storage/ndb/src/ndbapi/NdbTransaction.cpp storage/ndb/src/ndbapi/ndberror.c storage/ndb/test/include/HugoOperations.hpp storage/ndb/test/include/HugoTransactions.hpp storage/ndb/test/ndbapi/testBasic.cpp storage/ndb/test/ndbapi/testIndex.cpp storage/ndb/test/run-test/daily-devel-tests.txt storage/ndb/test/src/HugoOperations.cpp storage/ndb/test/src/HugoTransactions.cpp storage/ndb/test/tools/hugoPkUpdate.cpp === modified file 'storage/ndb/include/kernel/kernel_types.h' --- a/storage/ndb/include/kernel/kernel_types.h 2011-04-19 09:01:07 +0000 +++ b/storage/ndb/include/kernel/kernel_types.h 2011-05-25 13:19:02 +0000 @@ -36,9 +36,7 @@ enum Operation_t { ,ZDELETE = 3 ,ZWRITE = 4 ,ZREAD_EX = 5 -#if 0 - ,ZREAD_CONSISTENT = 6 -#endif + ,ZREFRESH = 6 ,ZUNLOCK = 7 }; === modified file 'storage/ndb/include/ndb_version.h.in' --- a/storage/ndb/include/ndb_version.h.in 2011-05-17 23:37:03 +0000 +++ b/storage/ndb/include/ndb_version.h.in 2011-05-25 14:11:22 +0000 @@ -652,4 +652,28 @@ ndb_tup_extrabits(Uint32 x) } } +#define NDBD_REFRESH_TUPLE_70 NDB_MAKE_VERSION(7,0,26) +#define NDBD_REFRESH_TUPLE_71 NDB_MAKE_VERSION(7,1,15) +#define NDBD_REFRESH_TUPLE_72 NDB_MAKE_VERSION(7,2,1) + +static +inline +int +ndb_refresh_tuple(Uint32 x) +{ + { + const Uint32 major = (x >> 16) & 0xFF; + const Uint32 minor = (x >> 8) & 0xFF; + + if (major == 7 && minor < 2) + { + if (minor == 0) + return x >= NDBD_REFRESH_TUPLE_70; + else if (minor == 1) + return x >= NDBD_REFRESH_TUPLE_71; + } + return x >= NDBD_REFRESH_TUPLE_72; + } +} + #endif === modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp' --- a/storage/ndb/include/ndbapi/NdbOperation.hpp 2011-04-28 07:47:53 +0000 +++ b/storage/ndb/include/ndbapi/NdbOperation.hpp 2011-05-25 13:19:02 +0000 @@ -914,6 +914,7 @@ public: DeleteRequest = 3, ///< Delete Operation WriteRequest = 4, ///< Write Operation ReadExclusive = 5, ///< Read exclusive + RefreshRequest = 6, ///< UnlockRequest = 7, ///< Unlock operation OpenScanRequest, ///< Scan Operation OpenRangeScanRequest, ///< Range scan operation === modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp' --- a/storage/ndb/include/ndbapi/NdbTransaction.hpp 2011-04-27 10:48:16 +0000 +++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp 2011-05-25 13:19:02 +0000 @@ -752,6 +752,12 @@ public: const NdbOperation::OperationOptions *opts = 0, Uint32 sizeOfOptions = 0); +#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL + const NdbOperation *refreshTuple(const NdbRecord *key_rec, const char *key_row, + const NdbOperation::OperationOptions *opts = 0, + Uint32 sizeOfOptions = 0); +#endif + /** * Scan a table, using NdbRecord to read out column data. * === modified file 'storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp' --- a/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp 2011-04-28 07:47:53 +0000 +++ b/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp 2011-05-25 13:19:02 +0000 @@ -36,6 +36,7 @@ printTCKEYREQ(FILE * output, const Uint3 sig->getOperationType(requestInfo) == ZDELETE ? "Delete" : sig->getOperationType(requestInfo) == ZWRITE ? "Write" : sig->getOperationType(requestInfo) == ZUNLOCK ? "Unlock" : + sig->getOperationType(requestInfo) == ZREFRESH ? "Refresh" : "Unknown"); { if(sig->getDirtyFlag(requestInfo)){ === modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp' --- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2011-04-19 09:01:07 +0000 +++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2011-05-25 13:19:02 +0000 @@ -140,7 +140,7 @@ ndbout << "Ptr: " << ptr.p->word32 << " /** * Check kernel_types for other operation types */ -#define ZSCAN_OP 6 +#define ZSCAN_OP 8 #define ZSCAN_REC_SIZE 256 #define ZSTAND_BY 2 #define ZTABLESIZE 16 @@ -642,6 +642,7 @@ public: class Dblqh* c_lqh; void execACCMINUPDATE(Signal* signal); + void removerow(Uint32 op, const Local_key*); private: BLOCK_DEFINES(Dbacc); === modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2011-04-20 11:58:16 +0000 +++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2011-05-25 13:19:02 +0000 @@ -971,9 +971,12 @@ void Dbacc::initOpRec(Signal* signal) Uint32 readFlag = (((Treqinfo >> 4) & 0x3) == 0); // Only 1 if Read Uint32 dirtyFlag = (((Treqinfo >> 6) & 0x1) == 1); // Only 1 if Dirty Uint32 dirtyReadFlag = readFlag & dirtyFlag; + Uint32 operation = Treqinfo & 0xf; + if (operation == ZREFRESH) + operation = ZWRITE; /* Insert if !exist, otherwise lock */ Uint32 opbits = 0; - opbits |= Treqinfo & 0x7; + opbits |= operation; opbits |= ((Treqinfo >> 4) & 0x3) ? (Uint32) Operationrec::OP_LOCK_MODE : 0; opbits |= ((Treqinfo >> 4) & 0x3) ? (Uint32) Operationrec::OP_ACC_LOCK_MODE : 0; opbits |= (dirtyReadFlag) ? (Uint32) Operationrec::OP_DIRTY_READ : 0; @@ -2323,6 +2326,27 @@ void Dbacc::execACCMINUPDATE(Signal* sig ndbrequire(false); }//Dbacc::execACCMINUPDATE() +void +Dbacc::removerow(Uint32 opPtrI, const Local_key* key) +{ + jamEntry(); + operationRecPtr.i = opPtrI; + ptrCheckGuard(operationRecPtr, coprecsize, operationrec); + Uint32 opbits = operationRecPtr.p->m_op_bits; + fragrecptr.i = operationRecPtr.p->fragptr; + + /* Mark element disappeared */ + opbits |= Operationrec::OP_ELEMENT_DISAPPEARED; + opbits &= ~Uint32(Operationrec::OP_COMMIT_DELETE_CHECK); + operationRecPtr.p->m_op_bits = opbits; + +#ifdef VM_TRACE + ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec); + ndbrequire(operationRecPtr.p->localdata[0] == key->m_page_no); + ndbrequire(operationRecPtr.p->localdata[1] == key->m_page_idx); +#endif +}//Dbacc::execACCMINUPDATE() + /* ******************--------------------------------------------------------------- */ /* ACC_COMMITREQ COMMIT TRANSACTION */ /* SENDER: LQH, LEVEL B */ @@ -2371,6 +2395,16 @@ void Dbacc::execACC_COMMITREQ(Signal* si }//if } else { jam(); /* EXPAND PROCESS HANDLING */ + if (unlikely(opbits & Operationrec::OP_ELEMENT_DISAPPEARED)) + { + jam(); + /* Commit of refresh of non existing tuple. + * ZREFRESH->ZWRITE->ZINSERT + * Do not affect element count + */ + ndbrequire((opbits & Operationrec::OP_MASK) == ZINSERT); + return; + } fragrecptr.p->noOfElements++; fragrecptr.p->slack -= fragrecptr.p->elementLength; if (fragrecptr.p->slack >= (1u << 31)) { === modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp' --- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2011-04-28 07:47:53 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2011-05-25 13:19:02 +0000 @@ -3159,6 +3159,7 @@ public: bool is_same_trans(Uint32 opId, Uint32 trid1, Uint32 trid2); void get_op_info(Uint32 opId, Uint32 *hash, Uint32* gci_hi, Uint32* gci_lo); void accminupdate(Signal*, Uint32 opPtrI, const Local_key*); + void accremoverow(Signal*, Uint32 opPtrI, const Local_key*); /** * @@ -3368,6 +3369,16 @@ Dblqh::accminupdate(Signal* signal, Uint } inline +void +Dblqh::accremoverow(Signal* signal, Uint32 opId, const Local_key* key) +{ + TcConnectionrecPtr regTcPtr; + regTcPtr.i= opId; + ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec); + c_acc->removerow(regTcPtr.p->accConnectrec, key); +} + +inline bool Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr) { === modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp' --- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-05-17 23:37:03 +0000 +++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-05-25 14:11:22 +0000 @@ -148,6 +148,7 @@ operator<<(NdbOut& out, Operation_t op) case ZDELETE: out << "DELETE"; break; case ZWRITE: out << "WRITE"; break; case ZUNLOCK: out << "UNLOCK"; break; + case ZREFRESH: out << "REFRESH"; break; } return out; } @@ -4533,6 +4534,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal regTcPtr->lockType = op == ZREAD_EX ? ZUPDATE : (Operation_t) op == ZWRITE ? ZINSERT : + (Operation_t) op == ZREFRESH ? ZINSERT : (Operation_t) op == ZUNLOCK ? ZREAD : // lockType not relevant for unlock req (Operation_t) op; } @@ -5072,6 +5074,7 @@ void Dblqh::prepareContinueAfterBlockedL case ZINSERT: TRACENR("INSERT"); break; case ZDELETE: TRACENR("DELETE"); break; case ZUNLOCK: TRACENR("UNLOCK"); break; + case ZREFRESH: TRACENR("REFRESH"); break; default: TRACENR("operation << ">"); break; } @@ -5121,7 +5124,6 @@ Dblqh::exec_acckeyreq(Signal* signal, Tc Uint32 taccreq; regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC; taccreq = regTcPtr.p->operation; - taccreq = taccreq + (regTcPtr.p->opSimple << 3); taccreq = taccreq + (regTcPtr.p->lockType << 4); taccreq = taccreq + (regTcPtr.p->dirtyOp << 6); taccreq = taccreq + (regTcPtr.p->replicaType << 7); @@ -5286,15 +5288,17 @@ Dblqh::handle_nr_copy(Signal* signal, Pt if (match) { jam(); - if (op != ZDELETE) + if (op != ZDELETE && op != ZREFRESH) { if (TRACENR_FLAG) - TRACENR(" Changing from to ZWRITE" << endl); + TRACENR(" Changing from INSERT/UPDATE to ZWRITE" << endl); regTcPtr.p->operation = ZWRITE; } goto run; } - + + ndbassert(!match && op == ZINSERT); + /** * 1) Delete row at specified rowid (if len > 0) * 2) Delete specified row at different rowid (if exists) @@ -6006,7 +6010,7 @@ Dblqh::acckeyconf_tupkeyreq(Signal* sign TRACE_OP(regTcPtr, "TUPKEYREQ"); - regTcPtr->m_use_rowid |= (op == ZINSERT); + regTcPtr->m_use_rowid |= (op == ZINSERT || op == ZREFRESH); regTcPtr->m_row_id.m_page_no = page_no; regTcPtr->m_row_id.m_page_idx = page_idx; === modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp' --- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2011-04-28 09:42:34 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2011-05-25 14:11:22 +0000 @@ -1694,6 +1694,7 @@ private: void checkNodeFailComplete(Signal* signal, Uint32 failedNodeId, Uint32 bit); void apiFailBlockCleanupCallback(Signal* signal, Uint32 failedNodeId, Uint32 ignoredRc); + bool isRefreshSupported() const; // Initialisation void initData(); === modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2011-05-25 10:11:58 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2011-05-25 14:11:22 +0000 @@ -3061,6 +3061,7 @@ void Dbtc::execTCKEYREQ(Signal* signal) case ZINSERT: case ZDELETE: case ZWRITE: + case ZREFRESH: jam(); break; default: @@ -3142,6 +3143,34 @@ handle_reorg_trigger(DiGetNodesConf * co } } +bool +Dbtc::isRefreshSupported() const +{ + const NodeVersionInfo& nvi = getNodeVersionInfo(); + const Uint32 minVer = nvi.m_type[NodeInfo::DB].m_min_version; + const Uint32 maxVer = nvi.m_type[NodeInfo::DB].m_max_version; + + if (likely (minVer == maxVer)) + { + /* Normal case, use function */ + return ndb_refresh_tuple(minVer); + } + + /* As refresh feature was introduced across three minor versions + * we check that all data nodes support it. This slow path + * should only be hit during upgrades between versions + */ + for (Uint32 i=1; i < MAX_NODES; i++) + { + const NodeInfo& nodeInfo = getNodeInfo(i); + if ((nodeInfo.m_type == NODE_TYPE_DB) && + (nodeInfo.m_connected) && + (! ndb_refresh_tuple(nodeInfo.m_version))) + return false; + } + return true; +} + /** * tckeyreq050Lab * This method is executed once all KeyInfo has been obtained for @@ -3368,6 +3397,14 @@ void Dbtc::tckeyreq050Lab(Signal* signal TlastReplicaNo = tnoOfBackup + tnoOfStandby; regTcPtr->lastReplicaNo = (Uint8)TlastReplicaNo; regTcPtr->noOfNodes = (Uint8)(TlastReplicaNo + 1); + + if (unlikely((Toperation == ZREFRESH) && + (! isRefreshSupported()))) + { + /* Function not implemented yet */ + TCKEY_abort(signal,63); + return; + } }//if if (regCachePtr->isLongTcKeyReq || === modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp' --- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2011-05-20 05:11:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2011-05-25 13:19:02 +0000 @@ -268,6 +268,7 @@ inline const Uint32* ALIGN_WORD(const vo #define ZMUST_BE_ABORTED_ERROR 898 #define ZTUPLE_DELETED_ERROR 626 #define ZINSERT_ERROR 630 +#define ZOP_AFTER_REFRESH_ERROR 920 #define ZINVALID_CHAR_FORMAT 744 #define ZROWID_ALLOCATED 899 @@ -843,6 +844,19 @@ struct Operationrec { * version even if in the same transaction. */ Uint16 tupVersion; + + /* + * When refreshing a row, there are four scenarios + * The actual scenario is encoded in the 'copy tuple location' + * to enable special handling at commit time + */ + enum RefreshScenario + { + RF_SINGLE_NOT_EXIST = 1, /* Refresh op first in trans, no row */ + RF_SINGLE_EXIST = 2, /* Refresh op first in trans, row exists */ + RF_MULTI_NOT_EXIST = 3, /* Refresh op !first in trans, row deleted */ + RF_MULTI_EXIST = 4 /* Refresh op !first in trans, row exists */ + }; }; typedef Ptr OperationrecPtr; @@ -2080,6 +2094,13 @@ private: KeyReqStruct* req_struct, bool disk); + int handleRefreshReq(Signal* signal, + Ptr, + Ptr, + Tablerec*, + KeyReqStruct*, + bool disk); + //------------------------------------------------------------------ //------------------------------------------------------------------ int updateStartLab(Signal* signal, @@ -3406,6 +3427,8 @@ private: const Dbtup::ScanOp& op); void commit_operation(Signal*, Uint32, Uint32, Tuple_header*, PagePtr, Operationrec*, Fragrecord*, Tablerec*); + void commit_refresh(Signal*, Uint32, Uint32, Tuple_header*, PagePtr, + KeyReqStruct*, Operationrec*, Fragrecord*, Tablerec*); int retrieve_data_page(Signal*, Page_cache_client::Request, OperationrecPtr); === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2011-05-17 23:29:55 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2011-05-25 13:19:02 +0000 @@ -849,7 +849,15 @@ skip_disk: tuple_ptr->m_operation_ptr_i = RNIL; - if(regOperPtr.p->op_struct.op_type != ZDELETE) + if (regOperPtr.p->op_struct.op_type == ZDELETE) + { + jam(); + if (get_page) + ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART); + dealloc_tuple(signal, gci_hi, gci_lo, page.p, tuple_ptr, + &req_struct, regOperPtr.p, regFragPtr.p, regTabPtr.p); + } + else if(regOperPtr.p->op_struct.op_type != ZREFRESH) { jam(); commit_operation(signal, gci_hi, gci_lo, tuple_ptr, page, @@ -858,14 +866,10 @@ skip_disk: else { jam(); - if (get_page) - { - ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART); - } - dealloc_tuple(signal, gci_hi, gci_lo, page.p, tuple_ptr, - &req_struct, regOperPtr.p, regFragPtr.p, regTabPtr.p); + commit_refresh(signal, gci_hi, gci_lo, tuple_ptr, page, + &req_struct, regOperPtr.p, regFragPtr.p, regTabPtr.p); } - } + } if (nextOp != RNIL) { @@ -917,3 +921,48 @@ Dbtup::set_commit_change_mask_info(const } } } + +void +Dbtup::commit_refresh(Signal* signal, + Uint32 gci_hi, + Uint32 gci_lo, + Tuple_header* tuple_ptr, + PagePtr pagePtr, + KeyReqStruct * req_struct, + Operationrec* regOperPtr, + Fragrecord* regFragPtr, + Tablerec* regTabPtr) +{ + /* Committing a refresh operation. + * Refresh of an existing row looks like an update + * and can commit normally. + * Refresh of a non-existing row looks like an Insert which + * is 'undone' at commit time. + * This is achieved by making special calls to ACC to get + * it to forget, before deallocating the tuple locally. + */ + switch(regOperPtr->m_copy_tuple_location.m_file_no){ + case Operationrec::RF_SINGLE_NOT_EXIST: + case Operationrec::RF_MULTI_NOT_EXIST: + break; + case Operationrec::RF_SINGLE_EXIST: + case Operationrec::RF_MULTI_EXIST: + // "Normal" update + commit_operation(signal, gci_hi, gci_lo, tuple_ptr, pagePtr, + regOperPtr, regFragPtr, regTabPtr); + return; + + default: + ndbrequire(false); + } + + Local_key key = regOperPtr->m_tuple_location; + key.m_page_no = pagePtr.p->frag_page_id; + + /** + * Tell ACC to delete + */ + c_lqh->accremoverow(signal, regOperPtr->userpointer, &key); + dealloc_tuple(signal, gci_hi, gci_lo, pagePtr.p, tuple_ptr, + req_struct, regOperPtr, regFragPtr, regTabPtr); +} === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2011-05-20 05:11:01 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2011-05-25 13:19:02 +0000 @@ -214,7 +214,14 @@ Dbtup::insertActiveOpList(OperationrecPt prevOpPtr.p->op_struct.delete_insert_flag= true; regOperPtr.p->op_struct.delete_insert_flag= true; return true; - } else { + } + else if (op == ZREFRESH) + { + /* ZREFRESH after Delete - ok */ + return true; + } + else + { terrorCode= ZTUPLE_DELETED_ERROR; return false; } @@ -224,6 +231,12 @@ Dbtup::insertActiveOpList(OperationrecPt terrorCode= ZINSERT_ERROR; return false; } + else if (prevOp == ZREFRESH) + { + /* No operation after a ZREFRESH */ + terrorCode= ZOP_AFTER_REFRESH_ERROR; + return false; + } return true; } else @@ -283,21 +296,39 @@ Dbtup::setup_read(KeyReqStruct *req_stru dirty= false; } + /* found == true indicates that savepoint is some state + * within tuple's current transaction's uncommitted operations + */ bool found= find_savepoint(currOpPtr, savepointId); Uint32 currOp= currOpPtr.p->op_struct.op_type; + /* is_insert==true if tuple did not exist before its current + * transaction + */ bool is_insert = (bits & Tuple_header::ALLOC); + + /* If savepoint is in transaction, and post-delete-op + * OR + * Tuple didn't exist before + * AND + * Read is dirty + * OR + * Savepoint is before-transaction + * + * Tuple does not exist in read's view + */ if((found && currOp == ZDELETE) || ((dirty || !found) && is_insert)) { + /* Tuple not visible to this read operation */ terrorCode= ZTUPLE_DELETED_ERROR; break; } if(dirty || !found) { - + /* Read existing committed tuple */ } else { @@ -351,6 +382,17 @@ Dbtup::load_diskpage(Signal* signal, jam(); regOperPtr->op_struct.m_wait_log_buffer= 1; regOperPtr->op_struct.m_load_diskpage_on_commit= 1; + if (unlikely((flags & 7) == ZREFRESH)) + { + jam(); + /* Refresh of previously nonexistant DD tuple. + * No diskpage to load at commit time + */ + regOperPtr->op_struct.m_wait_log_buffer= 0; + regOperPtr->op_struct.m_load_diskpage_on_commit= 0; + } + + /* In either case return 1 for 'proceed' */ return 1; } @@ -410,6 +452,7 @@ Dbtup::load_diskpage(Signal* signal, case ZUPDATE: case ZINSERT: case ZWRITE: + case ZREFRESH: regOperPtr->op_struct.m_wait_log_buffer= 1; regOperPtr->op_struct.m_load_diskpage_on_commit= 1; } @@ -556,7 +599,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal Uint32 Rstoredid= tupKeyReq->storedProcedure; regOperPtr->fragmentPtr= Rfragptr; - regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0xf; + regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0x7; regOperPtr->op_struct.delete_insert_flag = false; regOperPtr->op_struct.m_reorg = (TrequestInfo >> 12) & 3; @@ -635,10 +678,16 @@ void Dbtup::execTUPKEYREQ(Signal* signal if (Roptype == ZINSERT && Local_key::isInvalid(pageid, pageidx)) { - // No tuple allocatated yet + // No tuple allocated yet goto do_insert; } + if (Roptype == ZREFRESH && Local_key::isInvalid(pageid, pageidx)) + { + // No tuple allocated yet + goto do_refresh; + } + if (unlikely(isCopyTuple(pageid, pageidx))) { /** @@ -832,6 +881,23 @@ void Dbtup::execTUPKEYREQ(Signal* signal sendTUPKEYCONF(signal, &req_struct, regOperPtr); return; } + else if (Roptype == ZREFRESH) + { + /** + * No TUX or immediate triggers, just detached triggers + */ + do_refresh: + if (unlikely(handleRefreshReq(signal, operPtr, + fragptr, regTabPtr, + &req_struct, disk_page != RNIL) == -1)) + { + return; + } + + sendTUPKEYCONF(signal, &req_struct, regOperPtr); + return; + + } else { ndbrequire(false); // Invalid op type @@ -2055,6 +2121,197 @@ error: return -1; } +int +Dbtup::handleRefreshReq(Signal* signal, + Ptr regOperPtr, + Ptr regFragPtr, + Tablerec* regTabPtr, + KeyReqStruct *req_struct, + bool disk) +{ + /* Here we setup the tuple so that a transition to its current + * state can be observed by SUMA's detached triggers. + * + * If the tuple does not exist then we fabricate a tuple + * so that it can appear to be 'deleted'. + * The fabricated tuple may have invalid NULL values etc. + * If the tuple does exist then we fabricate a null-change + * update to the tuple. + * + * The logic differs depending on whether there are already + * other operations on the tuple in this transaction. + * No other operations (including Refresh) are allowed after + * a refresh. + */ + Uint32 refresh_case; + if (regOperPtr.p->is_first_operation()) + { + jam(); + if (Local_key::isInvalid(req_struct->frag_page_id, + regOperPtr.p->m_tuple_location.m_page_idx)) + { + jam(); + refresh_case = Operationrec::RF_SINGLE_NOT_EXIST; + //ndbout_c("case 1"); + /** + * This is refresh of non-existing tuple... + * i.e "delete", reuse initial insert + */ + Local_key accminupdate; + Local_key * accminupdateptr = &accminupdate; + + /** + * We don't need ...in this scenario + * - disk + * - default values + */ + Uint32 save_disk = regTabPtr->m_no_of_disk_attributes; + Local_key save_defaults = regTabPtr->m_default_value_location; + Bitmask save_mask = + regTabPtr->notNullAttributeMask; + + regTabPtr->m_no_of_disk_attributes = 0; + regTabPtr->m_default_value_location.setNull(); + regOperPtr.p->op_struct.op_type = ZINSERT; + + /** + * Update notNullAttributeMask to only include primary keys + */ + regTabPtr->notNullAttributeMask.clear(); + const Uint32 * primarykeys = + (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr; + for (Uint32 i = 0; inoOfKeyAttr; i++) + regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16); + + int res = handleInsertReq(signal, regOperPtr, + regFragPtr, regTabPtr, req_struct, + &accminupdateptr); + + regTabPtr->m_no_of_disk_attributes = save_disk; + regTabPtr->m_default_value_location = save_defaults; + regTabPtr->notNullAttributeMask = save_mask; + + if (unlikely(res == -1)) + { + return -1; + } + + regOperPtr.p->op_struct.op_type = ZREFRESH; + + if (accminupdateptr) + { + /** + * Update ACC local-key, once *everything* has completed succesfully + */ + c_lqh->accminupdate(signal, + regOperPtr.p->userpointer, + accminupdateptr); + } + } + else + { + refresh_case = Operationrec::RF_SINGLE_EXIST; + //ndbout_c("case 2"); + jam(); + + Uint32 tup_version_save = req_struct->m_tuple_ptr->get_tuple_version(); + Uint32 new_tup_version = decr_tup_version(tup_version_save); + Tuple_header* origTuple = req_struct->m_tuple_ptr; + origTuple->set_tuple_version(new_tup_version); + int res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p, + regTabPtr, req_struct, disk); + /* Now we must reset the original tuple header back + * to the original version. + * The copy tuple will have the correct version due to + * the update incrementing it. + * On commit, the tuple becomes the copy tuple. + * On abort, the original tuple remains. If we don't + * reset it here, then aborts cause the version to + * decrease + */ + origTuple->set_tuple_version(tup_version_save); + if (res == -1) + return -1; + } + } + else + { + /* Not first operation on tuple in transaction */ + jam(); + + Uint32 tup_version_save = req_struct->prevOpPtr.p->tupVersion; + Uint32 new_tup_version = decr_tup_version(tup_version_save); + req_struct->prevOpPtr.p->tupVersion = new_tup_version; + + int res; + if (req_struct->prevOpPtr.p->op_struct.op_type == ZDELETE) + { + refresh_case = Operationrec::RF_MULTI_NOT_EXIST; + //ndbout_c("case 3"); + + jam(); + /** + * We don't need ...in this scenario + * - default values + * + * We keep disk attributes to avoid issues with 'insert' + */ + Local_key save_defaults = regTabPtr->m_default_value_location; + Bitmask save_mask = + regTabPtr->notNullAttributeMask; + + regTabPtr->m_default_value_location.setNull(); + regOperPtr.p->op_struct.op_type = ZINSERT; + + /** + * Update notNullAttributeMask to only include primary keys + */ + regTabPtr->notNullAttributeMask.clear(); + const Uint32 * primarykeys = + (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr; + for (Uint32 i = 0; inoOfKeyAttr; i++) + regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16); + + /** + * This is multi-update + DELETE + REFRESH + */ + Local_key * accminupdateptr = 0; + res = handleInsertReq(signal, regOperPtr, + regFragPtr, regTabPtr, req_struct, + &accminupdateptr); + + regTabPtr->m_default_value_location = save_defaults; + regTabPtr->notNullAttributeMask = save_mask; + + if (unlikely(res == -1)) + { + return -1; + } + + regOperPtr.p->op_struct.op_type = ZREFRESH; + } + else + { + jam(); + refresh_case = Operationrec::RF_MULTI_EXIST; + //ndbout_c("case 4"); + /** + * This is multi-update + INSERT/UPDATE + REFRESH + */ + res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p, + regTabPtr, req_struct, disk); + } + req_struct->prevOpPtr.p->tupVersion = tup_version_save; + if (res == -1) + return -1; + } + + /* Store the refresh scenario in the copy tuple location */ + // TODO : Verify this is never used as a copy tuple location! + regOperPtr.p->m_copy_tuple_location.m_file_no = refresh_case; + return 0; +} + bool Dbtup::checkNullAttributes(KeyReqStruct * req_struct, Tablerec* regTabPtr) === modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp' --- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2011-04-28 09:42:34 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2011-05-25 14:11:22 +0000 @@ -863,6 +863,7 @@ void Dbtup::checkDetachedTriggers(KeyReq switch (save_type) { case ZUPDATE: case ZINSERT: + case ZREFRESH: req_struct->m_tuple_ptr =get_copy_tuple(®OperPtr->m_copy_tuple_location); break; } @@ -878,7 +879,10 @@ void Dbtup::checkDetachedTriggers(KeyReq return; goto end; } - regOperPtr->op_struct.op_type = ZINSERT; + else if (save_type != ZREFRESH) + { + regOperPtr->op_struct.op_type = ZINSERT; + } } else if (save_type == ZINSERT) { /** @@ -930,6 +934,29 @@ void Dbtup::checkDetachedTriggers(KeyReq regTablePtr->subscriptionUpdateTriggers, regOperPtr, disk); break; + case ZREFRESH: + jam(); + /* Depending on the Refresh scenario, fire Delete or Insert + * triggers to simulate the effect of arriving at the tuple's + * current state. + */ + switch(regOperPtr->m_copy_tuple_location.m_file_no){ + case Operationrec::RF_SINGLE_NOT_EXIST: + case Operationrec::RF_MULTI_NOT_EXIST: + fireDetachedTriggers(req_struct, + regTablePtr->subscriptionDeleteTriggers, + regOperPtr, disk); + break; + case Operationrec::RF_SINGLE_EXIST: + case Operationrec::RF_MULTI_EXIST: + fireDetachedTriggers(req_struct, + regTablePtr->subscriptionInsertTriggers, + regOperPtr, disk); + break; + default: + ndbrequire(false); + } + break; default: ndbrequire(false); break; @@ -1375,12 +1402,14 @@ out: switch(regOperPtr->op_struct.op_type) { case(ZINSERT): + is_insert: jam(); // Send AttrInfo signals with new attribute values trigAttrInfo->setAttrInfoType(TrigAttrInfo::AFTER_VALUES); sendTrigAttrInfo(signal, afterBuffer, noAfterWords, executeDirect, ref); break; case(ZDELETE): + is_delete: if (trigPtr->sendBeforeValues) { jam(); trigAttrInfo->setAttrInfoType(TrigAttrInfo::BEFORE_VALUES); @@ -1397,6 +1426,23 @@ out: trigAttrInfo->setAttrInfoType(TrigAttrInfo::AFTER_VALUES); sendTrigAttrInfo(signal, afterBuffer, noAfterWords, executeDirect, ref); break; + case ZREFRESH: + jam(); + /* Reuse Insert/Delete trigger firing code as necessary */ + switch(regOperPtr->m_copy_tuple_location.m_file_no){ + case Operationrec::RF_SINGLE_NOT_EXIST: + jam(); + case Operationrec::RF_MULTI_NOT_EXIST: + jam(); + goto is_delete; + case Operationrec::RF_SINGLE_EXIST: + jam(); + case Operationrec::RF_MULTI_EXIST: + jam(); + goto is_insert; + default: + ndbrequire(false); + } default: ndbrequire(false); } @@ -1424,6 +1470,25 @@ out: jam(); fireTrigOrd->m_triggerEvent = TriggerEvent::TE_DELETE; break; + case ZREFRESH: + jam(); + switch(regOperPtr->m_copy_tuple_location.m_file_no){ + case Operationrec::RF_SINGLE_NOT_EXIST: + jam(); + case Operationrec::RF_MULTI_NOT_EXIST: + jam(); + fireTrigOrd->m_triggerEvent = TriggerEvent::TE_DELETE; + break; + case Operationrec::RF_SINGLE_EXIST: + jam(); + case Operationrec::RF_MULTI_EXIST: + jam(); + fireTrigOrd->m_triggerEvent = TriggerEvent::TE_INSERT; + break; + default: + ndbrequire(false); + } + break; default: ndbrequire(false); break; @@ -1615,7 +1680,7 @@ bool Dbtup::readTriggerInfo(TupTriggerDa // Delete without sending before values only read Primary Key //-------------------------------------------------------------------- return true; - } else { + } else if (regOperPtr->op_struct.op_type != ZREFRESH){ jam(); //-------------------------------------------------------------------- // All others send all attributes that are monitored, except: @@ -1632,6 +1697,27 @@ bool Dbtup::readTriggerInfo(TupTriggerDa numAttrsToRead = setAttrIds(attributeMask, regTabPtr->m_no_of_attributes, &readBuffer[0]); } + else + { + jam(); + ndbassert(regOperPtr->op_struct.op_type == ZREFRESH); + /* Refresh specific before/after value hacks */ + switch(regOperPtr->m_copy_tuple_location.m_file_no){ + case Operationrec::RF_SINGLE_NOT_EXIST: + case Operationrec::RF_MULTI_NOT_EXIST: + return true; // generate ZDELETE...no before values + case Operationrec::RF_SINGLE_EXIST: + case Operationrec::RF_MULTI_EXIST: + // generate ZINSERT...all after values + numAttrsToRead = setAttrIds(trigPtr->attributeMask, + regTabPtr->m_no_of_attributes, + &readBuffer[0]); + break; + default: + ndbrequire(false); + } + } + ndbrequire(numAttrsToRead <= MAX_ATTRIBUTES_IN_TABLE); //-------------------------------------------------------------------- // Read Main tuple values @@ -1875,6 +1961,9 @@ Dbtup::executeTuxCommitTriggers(Signal* return; jam(); tupVersion= regOperPtr->tupVersion; + } else if (regOperPtr->op_struct.op_type == ZREFRESH) { + /* Refresh should not affect TUX */ + return; } else { ndbrequire(false); tupVersion= 0; // remove warning @@ -1907,6 +1996,10 @@ Dbtup::executeTuxAbortTriggers(Signal* s } else if (regOperPtr->op_struct.op_type == ZDELETE) { jam(); return; + } else if (regOperPtr->op_struct.op_type == ZREFRESH) { + jam(); + /* Refresh should not affect TUX */ + return; } else { ndbrequire(false); tupVersion= 0; // remove warning === modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp' --- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2011-05-11 13:31:44 +0000 +++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2011-05-25 13:19:02 +0000 @@ -1120,7 +1120,8 @@ NdbOperation::buildSignalsNdbRecord(Uint /* Final update signal words */ if ((tOpType == InsertRequest) || (tOpType == WriteRequest) || - (tOpType == UpdateRequest)) + (tOpType == UpdateRequest) || + (tOpType == RefreshRequest)) { updRow= m_attribute_row; NdbBlob *currentBlob= theBlobList; @@ -1333,7 +1334,8 @@ NdbOperation::buildSignalsNdbRecord(Uint if ((tOpType == InsertRequest) || (tOpType == WriteRequest) || - (tOpType == UpdateRequest)) + (tOpType == UpdateRequest) || + (tOpType == RefreshRequest)) { /* Handle setAnyValue() for all cases except delete */ if ((m_flags & OF_USE_ANY_VALUE) != 0) === modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp' --- a/storage/ndb/src/ndbapi/NdbTransaction.cpp 2011-04-27 10:48:16 +0000 +++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp 2011-05-25 13:19:02 +0000 @@ -2209,6 +2209,7 @@ NdbTransaction::receiveTCKEY_FAILCONF(co case NdbOperation::DeleteRequest: case NdbOperation::WriteRequest: case NdbOperation::UnlockRequest: + case NdbOperation::RefreshRequest: tOp = tOp->next(); break; case NdbOperation::ReadRequest: @@ -2713,6 +2714,52 @@ NdbTransaction::writeTuple(const NdbReco return op; } +const NdbOperation * +NdbTransaction::refreshTuple(const NdbRecord *key_rec, const char *key_row, + const NdbOperation::OperationOptions *opts, + Uint32 sizeOfOptions) +{ + /* Check TC node version lockless */ + { + Uint32 tcVer = theNdb->theImpl->getNodeInfo(theDBnode).m_info.m_version; + if (unlikely(! ndb_refresh_tuple(tcVer))) + { + /* Function not implemented yet */ + setOperationErrorCodeAbort(4003); + return NULL; + } + } + + /* Check that the NdbRecord specifies the full primary key. */ + if (!(key_rec->flags & NdbRecord::RecHasAllKeys)) + { + setOperationErrorCodeAbort(4292); + return NULL; + } + + Uint8 keymask[NDB_MAX_ATTRIBUTES_IN_TABLE/8]; + bzero(keymask, sizeof(keymask)); + for (Uint32 i = 0; ikey_index_length; i++) + { + Uint32 id = key_rec->columns[key_rec->key_indexes[i]].attrId; + keymask[(id / 8)] |= (1 << (id & 7)); + } + + NdbOperation *op= setupRecordOp(NdbOperation::RefreshRequest, + NdbOperation::LM_Exclusive, + NdbOperation::AbortOnError, + key_rec, key_row, + key_rec, key_row, + keymask /* mask */, + opts, + sizeOfOptions); + if(!op) + return op; + + theSimpleState= 0; + + return op; +} NdbScanOperation * NdbTransaction::scanTable(const NdbRecord *result_record, === modified file 'storage/ndb/src/ndbapi/ndberror.c' --- a/storage/ndb/src/ndbapi/ndberror.c 2011-05-07 06:17:02 +0000 +++ b/storage/ndb/src/ndbapi/ndberror.c 2011-05-25 13:19:02 +0000 @@ -750,6 +750,7 @@ ErrorBundle ErrorCodes[] = { { 2810, DMEC, TR, "No space left on the device" }, { 2811, DMEC, TR, "Error with file permissions, please check file system" }, { 2815, DMEC, TR, "Error in reading files, please check file system" }, + { 920, DMEC, AE, "Row operation defined after refreshTuple()" }, /** * NdbQueryBuilder API errors === modified file 'storage/ndb/test/include/HugoOperations.hpp' --- a/storage/ndb/test/include/HugoOperations.hpp 2011-02-02 00:40:07 +0000 +++ b/storage/ndb/test/include/HugoOperations.hpp 2011-05-25 13:19:02 +0000 @@ -87,6 +87,11 @@ public: int recordNo, int numRecords = 1); + int pkRefreshRecord(Ndb*, + int recordNo, + int numRecords = 1, + int anyValueInfo = 0); /* 0 - none, 1+ Val | record */ + int execute_Commit(Ndb*, AbortOption ao = AbortOnError); int execute_NoCommit(Ndb*, === modified file 'storage/ndb/test/include/HugoTransactions.hpp' --- a/storage/ndb/test/include/HugoTransactions.hpp 2011-02-02 00:40:07 +0000 +++ b/storage/ndb/test/include/HugoTransactions.hpp 2011-05-25 13:19:02 +0000 @@ -110,6 +110,9 @@ public: int batch = 1, bool allowConstraintViolation = true, int doSleep = 0); + + int pkRefreshRecords(Ndb*, int startFrom, int count = 1, int batch = 1); + int lockRecords(Ndb*, int records, int percentToLock = 1, === modified file 'storage/ndb/test/ndbapi/testBasic.cpp' --- a/storage/ndb/test/ndbapi/testBasic.cpp 2011-05-07 06:17:02 +0000 +++ b/storage/ndb/test/ndbapi/testBasic.cpp 2011-05-25 13:19:02 +0000 @@ -2415,6 +2415,811 @@ runEnd899(NDBT_Context* ctx, NDBT_Step* } +int initSubscription(NDBT_Context* ctx, NDBT_Step* step){ + /* Subscribe to events on the table, and put access + * to the subscription somewhere handy + */ + Ndb* pNdb = GETNDB(step); + const NdbDictionary::Table& tab = *ctx->getTab(); + bool merge_events = false; + bool report = false; + + char eventName[1024]; + sprintf(eventName,"%s_EVENT",tab.getName()); + + NdbDictionary::Dictionary *myDict = pNdb->getDictionary(); + + if (!myDict) { + g_err << "Dictionary not found " + << pNdb->getNdbError().code << " " + << pNdb->getNdbError().message << endl; + return NDBT_FAILED; + } + + myDict->dropEvent(eventName); + + NdbDictionary::Event myEvent(eventName); + myEvent.setTable(tab.getName()); + myEvent.addTableEvent(NdbDictionary::Event::TE_ALL); + for(int a = 0; a < tab.getNoOfColumns(); a++){ + myEvent.addEventColumn(a); + } + myEvent.mergeEvents(merge_events); + + if (report) + myEvent.setReport(NdbDictionary::Event::ER_SUBSCRIBE); + + int res = myDict->createEvent(myEvent); // Add event to database + + if (res == 0) + myEvent.print(); + else if (myDict->getNdbError().classification == + NdbError::SchemaObjectExists) + { + g_info << "Event creation failed event exists\n"; + res = myDict->dropEvent(eventName); + if (res) { + g_err << "Failed to drop event: " + << myDict->getNdbError().code << " : " + << myDict->getNdbError().message << endl; + return NDBT_FAILED; + } + // try again + res = myDict->createEvent(myEvent); // Add event to database + if (res) { + g_err << "Failed to create event (1): " + << myDict->getNdbError().code << " : " + << myDict->getNdbError().message << endl; + return NDBT_FAILED; + } + } + else + { + g_err << "Failed to create event (2): " + << myDict->getNdbError().code << " : " + << myDict->getNdbError().message << endl; + return NDBT_FAILED; + } + + return NDBT_OK; +} + +int removeSubscription(NDBT_Context* ctx, NDBT_Step* step){ + /* Remove subscription created above */ + Ndb* pNdb = GETNDB(step); + const NdbDictionary::Table& tab = *ctx->getTab(); + + char eventName[1024]; + sprintf(eventName,"%s_EVENT",tab.getName()); + + NdbDictionary::Dictionary *myDict = pNdb->getDictionary(); + + if (!myDict) { + g_err << "Dictionary not found " + << pNdb->getNdbError().code << " " + << pNdb->getNdbError().message << endl; + return NDBT_FAILED; + } + + myDict->dropEvent(eventName); + + return NDBT_OK; +} + +int runVerifyRowCount(NDBT_Context* ctx, NDBT_Step* step) +{ + Ndb* ndb = GETNDB(step); + + /* Check that number of results returned by a normal scan + * and per-fragment rowcount sum are equal + */ + Uint32 rowCountSum = 0; + Uint32 rowScanCount = 0; + + int result = NDBT_OK; + do + { + NdbTransaction* trans = ndb->startTransaction(); + CHECK(trans != NULL); + + NdbScanOperation* scan = trans->getNdbScanOperation(ctx->getTab()); + CHECK(scan != NULL); + + CHECK(scan->readTuples(NdbScanOperation::LM_CommittedRead) == 0); + + NdbInterpretedCode code; + + CHECK(code.interpret_exit_last_row() == 0); + CHECK(code.finalise() == 0); + + NdbRecAttr* rowCountRA = scan->getValue(NdbDictionary::Column::ROW_COUNT); + CHECK(rowCountRA != NULL); + CHECK(scan->setInterpretedCode(&code) == 0); + + CHECK(trans->execute(NoCommit) == 0); + + while (scan->nextResult() == 0) + rowCountSum+= rowCountRA->u_32_value(); + + trans->close(); + + trans = ndb->startTransaction(); + CHECK(trans != NULL); + + scan = trans->getNdbScanOperation(ctx->getTab()); + CHECK(scan != NULL); + + CHECK(scan->readTuples(NdbScanOperation::LM_CommittedRead) == 0); + + rowCountRA = scan->getValue(NdbDictionary::Column::ROW_COUNT); + CHECK(rowCountRA != NULL); + + CHECK(trans->execute(NoCommit) == 0); + + while (scan->nextResult() == 0) + rowScanCount++; + + trans->close(); + } + while(0); + + if (result == NDBT_OK) + { + ndbout_c("Sum of fragment row counts : %u Number rows scanned : %u", + rowCountSum, + rowScanCount); + + if (rowCountSum != rowScanCount) + { + ndbout_c("MISMATCH"); + result = NDBT_FAILED; + } + } + + return result; +} + +enum ApiEventType { Insert, Update, Delete }; + +template class Vector; + +struct EventInfo +{ + ApiEventType type; + int id; + Uint64 gci; +}; +template class Vector; + +int collectEvents(Ndb* ndb, + HugoCalculator& calc, + const NdbDictionary::Table& tab, + Vector& receivedEvents, + int idCol, + int updateCol, + Vector* beforeAttrs, + Vector* afterAttrs) +{ + int MaxTimeouts = 5; + while (true) + { + int res = ndb->pollEvents(1000); + + if (res > 0) + { + NdbEventOperation* pOp; + while ((pOp = ndb->nextEvent())) + { + bool isDelete = (pOp->getEventType() == NdbDictionary::Event::TE_DELETE); + Vector* whichVersion = + isDelete? + beforeAttrs : + afterAttrs; + int id = (*whichVersion)[idCol]->u_32_value(); + Uint64 gci = pOp->getGCI(); + Uint32 anyValue = pOp->getAnyValue(); + Uint32 scenario = ((anyValue >> 24) & 0xff) -1; + Uint32 optype = ((anyValue >> 16) & 0xff); + Uint32 recNum = (anyValue & 0xffff); + + g_err << "# " << receivedEvents.size() + << " GCI : " << (gci >> 32) + << "/" + << (gci & 0xffffffff) + << " id : " + << id + << " scenario : " << scenario + << " optype : " << optype + << " record : " << recNum + << " "; + + /* Check event has self-consistent data */ + int updatesValue = (*whichVersion)[updateCol]->u_32_value(); + + if ((*whichVersion)[updateCol]->isNULL() || + (*whichVersion)[idCol]->isNULL()) + { + g_err << "Null update/id cols : REFRESH of !EXISTS "; + } + + g_err << "(Updates val = " << updatesValue << ")"; + + for (int i=0; i < (int) whichVersion->size(); i++) + { + /* Check PK columns and also other columns for non-delete */ + if (!isDelete || + tab.getColumn(i)->getPrimaryKey()) + { + NdbRecAttr* ra = (*whichVersion)[i]; + if (calc.verifyRecAttr(recNum, updatesValue, ra) != 0) + { + g_err << "Verify failed on recNum : " << recNum << " with updates value " + << updatesValue << " for column " << ra->getColumn()->getAttrId() + << endl; + return NDBT_FAILED; + } + } + } + + EventInfo ei; + + switch (pOp->getEventType()) + { + case NdbDictionary::Event::TE_INSERT: + g_err << " Insert event" << endl; + ei.type = Insert; + break; + case NdbDictionary::Event::TE_DELETE: + ei.type = Delete; + g_err << " Delete event" << endl; + break; + case NdbDictionary::Event::TE_UPDATE: + ei.type = Update; + g_err << " Update event" << endl; + break; + default: + g_err << " Event type : " << pOp->getEventType() << endl; + abort(); + break; + } + + ei.id = recNum; + ei.gci = gci; + + receivedEvents.push_back(ei); + } + } + else + { + if (--MaxTimeouts == 0) + { + break; + } + } + } + + return NDBT_OK; +} + +int verifyEvents(const Vector& receivedEvents, + const Vector& expectedEvents, + int records) +{ + /* Now verify received events against expected + * This is messy as events occurring in the same epoch are unordered + * except via id, so we use id-duplicates to determine which event + * sequence we're looking at. + */ + g_err << "Received total of " << receivedEvents.size() << " events" << endl; + Vector keys; + Vector gcis; + Uint32 z = 0; + Uint64 z2 = 0; + keys.fill(records, z); + gcis.fill(records, z2); + Uint64 currGci = 0; + + for (Uint32 e=0; e < receivedEvents.size(); e++) + { + EventInfo ei = receivedEvents[e]; + + if (ei.gci != currGci) + { + if (ei.gci < currGci) + abort(); + + /* Epoch boundary */ + /* At this point, all id counts must be equal */ + for (int i=0; i < records; i++) + { + if (keys[i] != keys[0]) + { + g_err << "Count for id " << i + << " is " << keys[i] + << " but should be " << keys[0] << endl; + return NDBT_OK; + } + } + + currGci = ei.gci; + } + + Uint32 eventIndex = keys[ei.id]; + keys[ei.id]++; + + ApiEventType et = expectedEvents[eventIndex]; + + if (ei.type != et) + { + g_err << "Expected event of type " << et + << " but found " << ei.type + << " at expectedEvent " << eventIndex + << " and event num " << e << endl; + return NDBT_FAILED; + } + } + + return NDBT_OK; +} + +int runRefreshTuple(NDBT_Context* ctx, NDBT_Step* step){ + int records = ctx->getNumRecords(); + Ndb* ndb = GETNDB(step); + + /* Now attempt to create EventOperation */ + NdbEventOperation* pOp; + const NdbDictionary::Table& tab = *ctx->getTab(); + + char eventName[1024]; + sprintf(eventName,"%s_EVENT",tab.getName()); + + pOp = ndb->createEventOperation(eventName); + if (pOp == NULL) + { + g_err << "Failed to create event operation\n"; + return NDBT_FAILED; + } + + HugoCalculator calc(tab); + Vector eventAfterRecAttr; + Vector eventBeforeRecAttr; + int updateCol = -1; + int idCol = -1; + + /* Now request all attributes */ + for (int a = 0; a < tab.getNoOfColumns(); a++) + { + eventAfterRecAttr.push_back(pOp->getValue(tab.getColumn(a)->getName())); + eventBeforeRecAttr.push_back(pOp->getPreValue(tab.getColumn(a)->getName())); + if (calc.isIdCol(a)) + idCol = a; + if (calc.isUpdateCol(a)) + updateCol = a; + } + + /* Now execute the event */ + if (pOp->execute()) + { + g_err << "Event operation execution failed : " << pOp->getNdbError() << endl; + return NDBT_FAILED; + } + + HugoOperations hugoOps(*ctx->getTab()); + int scenario = 0; + + Vector expectedEvents; + + for (scenario = 0; scenario < 2; scenario++) + { + g_err << "Scenario = " << scenario + << " ( Refresh " + << ((scenario == 0)? "before":"after") + << " operations )" << endl; + int optype = 0; + bool done = false; + int expectedError = 0; + do + { + check(hugoOps.startTransaction(ndb) == 0, hugoOps); + + if (scenario == 0) + { + g_err << "Refresh before operations" << endl; + int anyValue = + ((1) << 8) | + optype; + check(hugoOps.pkRefreshRecord(ndb, 0, records, anyValue) == 0, hugoOps); + } + + switch(optype) + { + case 0: + { + /* Refresh with no data present */ + g_err << " Do nothing" << endl; + expectedError = 0; /* Single refresh should always be fine */ + expectedEvents.push_back(Delete); + break; + } + case 1: + { + /* [Refresh] Insert [Refresh] */ + g_err << " Insert" << endl; + check(hugoOps.pkInsertRecord(ndb, 0, records, 1) == 0, hugoOps); + if (scenario == 0) + { + /* Tuple already existed error when we insert after refresh */ + expectedError = 630; + expectedEvents.push_back(Delete); + } + else + { + expectedError = 0; + expectedEvents.push_back(Insert); + } + /* Tuple already existed error when we insert after refresh */ + break; + } + case 2: + { + /* Refresh */ + g_err << " Refresh" << endl; + if (scenario == 0) + { + expectedEvents.push_back(Delete); + } + else + { + expectedEvents.push_back(Insert); + } + expectedError = 0; + break; + } + case 3: + { + /* [Refresh] Update [Refresh] */ + g_err << " Update" << endl; + check(hugoOps.pkUpdateRecord(ndb, 0, records, 3) == 0, hugoOps); + if (scenario == 0) + { + expectedError = 920; + expectedEvents.push_back(Delete); + } + else + { + expectedError = 0; + expectedEvents.push_back(Insert); + } + break; + } + case 4: + { + /* [Refresh] Delete [Refresh] */ + g_err << " [Refresh] Delete [Refresh]" << endl; + if (scenario == 0) + { + expectedError = 920; + expectedEvents.push_back(Delete); + } + else + { + expectedError = 0; + expectedEvents.push_back(Delete); + } + check(hugoOps.pkDeleteRecord(ndb, 0, records) == 0, hugoOps); + break; + } + case 5: + { + g_err << " Refresh" << endl; + expectedError = 0; + expectedEvents.push_back(Delete); + /* Refresh with no data present */ + break; + } + case 6: + { + g_err << " Double refresh" << endl; + int anyValue = + ((2) << 8) | + optype; + check(hugoOps.pkRefreshRecord(ndb, 0, records, anyValue) == 0, hugoOps); + expectedError = 920; /* Row operation defined after refreshTuple() */ + expectedEvents.push_back(Delete); + } + default: + done = true; + break; + } + + if (scenario == 1) + { + g_err << "Refresh after operations" << endl; + int anyValue = + ((4) << 8) | + optype; + check(hugoOps.pkRefreshRecord(ndb, 0, records, anyValue) == 0, hugoOps); + } + + int rc = hugoOps.execute_Commit(ndb, AO_IgnoreError); + check(rc == expectedError, hugoOps); + + check(hugoOps.closeTransaction(ndb) == 0, hugoOps); + + optype++; + + + /* Now check fragment counts vs findable row counts */ + if (runVerifyRowCount(ctx, step) != NDBT_OK) + return NDBT_FAILED; + + } while (!done); + } // for scenario... + + /* Now check fragment counts vs findable row counts */ + if (runVerifyRowCount(ctx, step) != NDBT_OK) + return NDBT_FAILED; + + /* Now let's dump and check the events */ + g_err << "Expecting the following sequence..." << endl; + for (Uint32 i=0; i < expectedEvents.size(); i++) + { + g_err << i << ". "; + switch(expectedEvents[i]) + { + case Insert: + g_err << "Insert" << endl; + break; + case Update: + g_err << "Update" << endl; + break; + case Delete: + g_err << "Delete" << endl; + break; + default: + abort(); + } + } + + Vector receivedEvents; + + int rc = collectEvents(ndb, calc, tab, receivedEvents, idCol, updateCol, + &eventBeforeRecAttr, + &eventAfterRecAttr); + if (rc == NDBT_OK) + { + rc = verifyEvents(receivedEvents, + expectedEvents, + records); + } + + if (ndb->dropEventOperation(pOp) != 0) + { + g_err << "Drop Event Operation failed : " << ndb->getNdbError() << endl; + return NDBT_FAILED; + } + + return rc; +}; + +enum PreRefreshOps +{ + PR_NONE, + PR_INSERT, + PR_INSERTDELETE, + PR_DELETE +}; + +struct RefreshScenario +{ + const char* name; + bool preExist; + PreRefreshOps preRefreshOps; +}; + +static RefreshScenario refreshTests[] = { + { "No row, No pre-ops", false, PR_NONE }, + { "No row, Insert pre-op", false, PR_INSERT }, + { "No row, Insert-Del pre-op", false, PR_INSERTDELETE }, + { "Row exists, No pre-ops", true, PR_NONE }, + { "Row exists, Delete pre-op", true, PR_DELETE } +}; + +enum OpTypes +{ + READ_C, + READ_S, + READ_E, + INSERT, + UPDATE, + WRITE, + DELETE, + LAST +}; + +const char* opTypeNames[] = +{ + "READ_C", + "READ_S", + "READ_E", + "INSERT", + "UPDATE", + "WRITE", + "DELETE" +}; + + +int +runRefreshLocking(NDBT_Context* ctx, NDBT_Step* step) +{ + /* Check that refresh in various situations has the + * locks we expect it to + * Scenario combinations : + * Now row pre-existing | Row pre-existing + * Trans1 : Refresh | Insert-Refresh | Insert-Delete-Refresh + * Delete-Refresh + * Trans2 : Read [Committed|Shared|Exclusive] | Insert | Update + * Write | Delete + * + * Expectations : Read committed always non-blocking + * Read committed sees pre-existing row + * All other trans2 operations deadlock + */ + + Ndb* ndb = GETNDB(step); + Uint32 numScenarios = sizeof(refreshTests) / sizeof(refreshTests[0]); + HugoTransactions hugoTrans(*ctx->getTab()); + + for (Uint32 s = 0; s < numScenarios; s++) + { + RefreshScenario& scenario = refreshTests[s]; + + if (scenario.preExist) + { + /* Create pre-existing tuple */ + if (hugoTrans.loadTable(ndb, 1) != 0) + { + g_err << "Pre-exist failed : " << hugoTrans.getNdbError() << endl; + return NDBT_FAILED; + } + } + + if (hugoTrans.startTransaction(ndb) != 0) + { + g_err << "Start trans failed : " << hugoTrans.getNdbError() << endl; + return NDBT_FAILED; + } + + g_err << "Scenario : " << scenario.name << endl; + + /* Do pre-refresh ops */ + switch (scenario.preRefreshOps) + { + case PR_NONE: + break; + case PR_INSERT: + case PR_INSERTDELETE: + if (hugoTrans.pkInsertRecord(ndb, 0) != 0) + { + g_err << "Pre insert failed : " << hugoTrans.getNdbError() << endl; + return NDBT_FAILED; + } + + if (scenario.preRefreshOps == PR_INSERT) + break; + case PR_DELETE: + if (hugoTrans.pkDeleteRecord(ndb, 0) != 0) + { + g_err << "Pre delete failed : " << hugoTrans.getNdbError() << endl; + return NDBT_FAILED; + } + break; + } + + /* Then refresh */ + if (hugoTrans.pkRefreshRecord(ndb, 0) != 0) + { + g_err << "Refresh failed : " << hugoTrans.getNdbError() << endl; + return NDBT_FAILED; + } + + /* Now execute */ + if (hugoTrans.execute_NoCommit(ndb) != 0) + { + g_err << "Execute failed : " << hugoTrans.getNdbError() << endl; + return NDBT_FAILED; + } + + { + /* Now try ops from another transaction */ + HugoOperations hugoOps(*ctx->getTab()); + Uint32 ot = READ_C; + + while (ot < LAST) + { + if (hugoOps.startTransaction(ndb) != 0) + { + g_err << "Start trans2 failed : " << hugoOps.getNdbError() << endl; + return NDBT_FAILED; + } + + g_err << "Operation type : " << opTypeNames[ot] << endl; + int res = 0; + switch (ot) + { + case READ_C: + res = hugoOps.pkReadRecord(ndb,0,1,NdbOperation::LM_CommittedRead); + break; + case READ_S: + res = hugoOps.pkReadRecord(ndb,0,1,NdbOperation::LM_Read); + break; + case READ_E: + res = hugoOps.pkReadRecord(ndb,0,1,NdbOperation::LM_Exclusive); + break; + case INSERT: + res = hugoOps.pkInsertRecord(ndb, 0); + break; + case UPDATE: + res = hugoOps.pkUpdateRecord(ndb, 0); + break; + case WRITE: + res = hugoOps.pkWriteRecord(ndb, 0); + break; + case DELETE: + res = hugoOps.pkDeleteRecord(ndb, 0); + break; + case LAST: + abort(); + } + + hugoOps.execute_Commit(ndb); + + if ((ot == READ_C) && (scenario.preExist)) + { + if (hugoOps.getNdbError().code == 0) + { + g_err << "Read committed succeeded" << endl; + } + else + { + g_err << "UNEXPECTED : Read committed failed. " << hugoOps.getNdbError() << endl; + return NDBT_FAILED; + } + } + else + { + if (hugoOps.getNdbError().code == 0) + { + g_err << opTypeNames[ot] << " succeeded, should not have" << endl; + return NDBT_FAILED; + } + } + + hugoOps.closeTransaction(ndb); + + ot = ot + 1; + } + + } + + /* Close refresh transaction */ + hugoTrans.closeTransaction(ndb); + + if (scenario.preExist) + { + /* Cleanup pre-existing before next iteration */ + if (hugoTrans.pkDelRecords(ndb, 0) != 0) + { + g_err << "Delete pre existing failed : " << hugoTrans.getNdbError() << endl; + return NDBT_FAILED; + } + } + } + + return NDBT_OK; +} + + NDBT_TESTSUITE(testBasic); TESTCASE("PkInsert", "Verify that we can insert and delete from this table using PK" @@ -2746,6 +3551,12 @@ TESTCASE("UnlockUpdateBatch", STEP(runPkRead); FINALIZER(runClearTable); } +TESTCASE("RefreshTuple", + "Test refreshTuple() operation properties"){ + INITIALIZER(initSubscription); + INITIALIZER(runRefreshTuple); + FINALIZER(removeSubscription); +} TESTCASE("Bug54986", "") { INITIALIZER(runBug54986); @@ -2773,6 +3584,11 @@ TESTCASE("899", "") STEP(runTest899); FINALIZER(runEnd899); } +TESTCASE("RefreshLocking", + "Test Refresh locking properties") +{ + INITIALIZER(runRefreshLocking); +} NDBT_TESTSUITE_END(testBasic); #if 0 === modified file 'storage/ndb/test/ndbapi/testIndex.cpp' --- a/storage/ndb/test/ndbapi/testIndex.cpp 2011-04-28 07:47:53 +0000 +++ b/storage/ndb/test/ndbapi/testIndex.cpp 2011-05-25 13:19:02 +0000 @@ -1744,6 +1744,73 @@ runMixed2(NDBT_Context* ctx, NDBT_Step* return NDBT_FAILED; } +#define check(b, e) \ + if (!(b)) { g_err << "ERR: " << step->getName() << " failed on line " << __LINE__ << ": " << e.getNdbError() << endl; return NDBT_FAILED; } + +int runRefreshTupleAbort(NDBT_Context* ctx, NDBT_Step* step){ + int records = ctx->getNumRecords(); + int loops = ctx->getNumLoops(); + + Ndb* ndb = GETNDB(step); + + const NdbDictionary::Table& tab = *ctx->getTab(); + + for (int i=0; i < tab.getNoOfColumns(); i++) + { + if (tab.getColumn(i)->getStorageType() == NDB_STORAGETYPE_DISK) + { + g_err << "Table has disk column(s) skipping." << endl; + return NDBT_OK; + } + } + + + g_err << "Loading table." << endl; + HugoTransactions hugoTrans(*ctx->getTab()); + check(hugoTrans.loadTable(ndb, records) == 0, hugoTrans); + + HugoOperations hugoOps(*ctx->getTab()); + + /* Check refresh, abort sequence with an ordered index + * Previously this gave bugs due to corruption of the + * tuple version + */ + while (loops--) + { + Uint32 numRefresh = 2 + rand() % 10; + + g_err << "Refresh, rollback * " << numRefresh << endl; + + while (--numRefresh) + { + /* Refresh, rollback */ + check(hugoOps.startTransaction(ndb) == 0, hugoOps); + check(hugoOps.pkRefreshRecord(ndb, 0, records, 0) == 0, hugoOps); + check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps); + check(hugoOps.execute_Rollback(ndb) == 0, hugoOps); + check(hugoOps.closeTransaction(ndb) == 0, hugoOps); + } + + g_err << "Refresh, commit" << endl; + /* Refresh, commit */ + check(hugoOps.startTransaction(ndb) == 0, hugoOps); + check(hugoOps.pkRefreshRecord(ndb, 0, records, 0) == 0, hugoOps); + check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps); + check(hugoOps.execute_Commit(ndb) == 0, hugoOps); + check(hugoOps.closeTransaction(ndb) == 0, hugoOps); + + g_err << "Update, commit" << endl; + /* Update */ + check(hugoOps.startTransaction(ndb) == 0, hugoOps); + check(hugoOps.pkUpdateRecord(ndb, 0, records, 2 + loops) == 0, hugoOps); + check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps); + check(hugoOps.execute_Commit(ndb) == 0, hugoOps); + check(hugoOps.closeTransaction(ndb) == 0, hugoOps); + } + + return NDBT_OK; +} + int runBuildDuring(NDBT_Context* ctx, NDBT_Step* step){ @@ -3619,6 +3686,16 @@ TESTCASE("Bug60851", "") INITIALIZER(runBug60851); FINALIZER(createPkIndex_Drop); } +TESTCASE("RefreshWithOrderedIndex", + "Refresh tuples with ordered index(es)") +{ + TC_PROPERTY("OrderedIndex", 1); + TC_PROPERTY("LoggedIndexes", Uint32(0)); + INITIALIZER(createPkIndex); + INITIALIZER(runRefreshTupleAbort); + FINALIZER(createPkIndex_Drop); + FINALIZER(runClearTable); +} NDBT_TESTSUITE_END(testIndex); int main(int argc, const char** argv){ === modified file 'storage/ndb/test/run-test/daily-devel-tests.txt' --- a/storage/ndb/test/run-test/daily-devel-tests.txt 2011-04-08 11:06:53 +0000 +++ b/storage/ndb/test/run-test/daily-devel-tests.txt 2011-05-25 13:19:02 +0000 @@ -129,3 +129,16 @@ max-time: 1800 cmd: testDict args: -n SchemaTrans -l 1 +# Refresh tuple +max-time: 300 +cmd: testBasic +args: -n RefreshTuple T6 D1 + +max-time: 300 +cmd: testIndex +args: -n RefreshWithOrderedIndex T2 D2 + +max-time: 300 +cmd: testBasic +args: -n RefreshLocking D1 + === modified file 'storage/ndb/test/src/HugoOperations.cpp' --- a/storage/ndb/test/src/HugoOperations.cpp 2011-04-07 07:22:49 +0000 +++ b/storage/ndb/test/src/HugoOperations.cpp 2011-05-25 13:19:02 +0000 @@ -567,6 +567,47 @@ int HugoOperations::pkDeleteRecord(Ndb* return NDBT_OK; } +int HugoOperations::pkRefreshRecord(Ndb* pNdb, + int recordNo, + int numRecords, + int anyValueInfo){ + + char buffer[NDB_MAX_TUPLE_SIZE]; + const NdbDictionary::Table * pTab = + pNdb->getDictionary()->getTable(tab.getName()); + + if (pTab == 0) + { + return NDBT_FAILED; + } + + const NdbRecord * record = pTab->getDefaultRecord(); + NdbOperation::OperationOptions opts; + opts.optionsPresent = NdbOperation::OperationOptions::OO_ANYVALUE; + for(int r=0; r < numRecords; r++) + { + bzero(buffer, sizeof(buffer)); + if (calc.equalForRow((Uint8*)buffer, record, r + recordNo)) + { + return NDBT_FAILED; + } + + opts.anyValue = anyValueInfo? + (anyValueInfo << 16) | (r+recordNo) : + 0; + + const NdbOperation* pOp = pTrans->refreshTuple(record, buffer, + &opts, sizeof(opts)); + if (pOp == NULL) + { + ERR(pTrans->getNdbError()); + setNdbError(pTrans->getNdbError()); + return NDBT_FAILED; + } + } + return NDBT_OK; +} + int HugoOperations::execute_Commit(Ndb* pNdb, AbortOption eao){ === modified file 'storage/ndb/test/src/HugoTransactions.cpp' --- a/storage/ndb/test/src/HugoTransactions.cpp 2011-02-02 00:40:07 +0000 +++ b/storage/ndb/test/src/HugoTransactions.cpp 2011-05-25 13:19:02 +0000 @@ -1502,6 +1502,79 @@ HugoTransactions::pkDelRecords(Ndb* pNdb } int +HugoTransactions::pkRefreshRecords(Ndb* pNdb, + int startFrom, + int count, + int batch) +{ + int r = 0; + int retryAttempt = 0; + + g_info << "|- Refreshing records..." << startFrom << "-" << (startFrom+count) + << " (batch=" << batch << ")" << endl; + + while (r < count) + { + if(r + batch > count) + batch = count - r; + + if (retryAttempt >= m_retryMax) + { + g_info << "ERROR: has retried this operation " << retryAttempt + << " times, failing!" << endl; + return NDBT_FAILED; + } + + pTrans = pNdb->startTransaction(); + if (pTrans == NULL) + { + const NdbError err = pNdb->getNdbError(); + + if (err.status == NdbError::TemporaryError){ + ERR(err); + NdbSleep_MilliSleep(50); + retryAttempt++; + continue; + } + ERR(err); + return NDBT_FAILED; + } + + if (pkRefreshRecord(pNdb, r, batch) != NDBT_OK) + { + ERR(pTrans->getNdbError()); + closeTransaction(pNdb); + return NDBT_FAILED; + } + + if (pTrans->execute(Commit, AbortOnError) == -1) + { + const NdbError err = pTrans->getNdbError(); + + switch(err.status){ + case NdbError::TemporaryError: + ERR(err); + closeTransaction(pNdb); + NdbSleep_MilliSleep(50); + retryAttempt++; + continue; + break; + + default: + ERR(err); + closeTransaction(pNdb); + return NDBT_FAILED; + } + } + + closeTransaction(pNdb); + r += batch; // Read next record + } + + return NDBT_OK; +} + +int HugoTransactions::pkReadUnlockRecords(Ndb* pNdb, int records, int batch, === modified file 'storage/ndb/test/tools/hugoPkUpdate.cpp' --- a/storage/ndb/test/tools/hugoPkUpdate.cpp 2011-02-02 00:40:07 +0000 +++ b/storage/ndb/test/tools/hugoPkUpdate.cpp 2011-05-25 13:19:02 +0000 @@ -43,6 +43,8 @@ struct ThrOutput { NDBT_Stats latency; }; +static int _refresh = 0; + int main(int argc, const char** argv){ ndb_init(); @@ -63,7 +65,9 @@ int main(int argc, const char** argv){ // { "batch", 'b', arg_integer, &_batch, "batch value", "batch" }, { "records", 'r', arg_integer, &_records, "Number of records", "records" }, { "usage", '?', arg_flag, &_help, "Print help", "" }, - { "database", 'd', arg_string, &db, "Database", "" } + { "database", 'd', arg_string, &db, "Database", "" }, + { "refresh", 0, arg_flag, &_refresh, "refresh record rather than update them", "" } + }; int num_args = sizeof(args) / sizeof(args[0]); int optind = 0; @@ -135,7 +139,10 @@ int main(int argc, const char** argv){ ths.stop(); if (ths.get_err()) + { + ths.disconnect(); NDBT_ProgramExit(NDBT_FAILED); + } if (_stats) { NDBT_Stats latency; @@ -160,6 +167,8 @@ int main(int argc, const char** argv){ i++; } + ths.disconnect(); + return NDBT_ProgramExit(NDBT_OK); } @@ -177,9 +186,19 @@ static void hugoPkUpdate(NDBT_Thread& th hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no()); int ret; - ret = hugoTrans.pkUpdateRecords(thr.get_ndb(), - input->records, - input->batch); + if (_refresh == 0) + { + ret = hugoTrans.pkUpdateRecords(thr.get_ndb(), + input->records, + input->batch); + } + else + { + ret = hugoTrans.pkRefreshRecords(thr.get_ndb(), + 0, + input->records, + input->batch); + } if (ret != 0) thr.set_err(ret); } No bundle (reason: revision is a merge (you can force generation of a bundle with env var BZR_FORCE_BUNDLE=1)).