#At file:///home/frazer/bzr/mysql-5.1-telco-7.0/ based on revid:jonas@stripped
4418 Frazer Clement 2011-05-25
WL5353 Primary Cluster Conflict Resolution
Implementation of refreshTuple() mechanism.
A new operation type, refreshTuple() is defined.
When executed, it causes an NdbApi data change event to be
generated for the affected row.
If the row does not exist, a DELETE event, with the primary key,
is generated.
If the row does exist, an INSERT event, with the primary key and
all values is generated.
A refreshTuple() operation must be the last operation on a particular
tuple in a transaction. An error will be returned if any other operation
(including refreshTuple) is defined on a refreshed row in a transaction.
refreshTuple() does not currently support tables with BLOB columns.
modified:
storage/ndb/include/kernel/kernel_types.h
storage/ndb/include/ndb_version.h.in
storage/ndb/include/ndbapi/NdbOperation.hpp
storage/ndb/include/ndbapi/NdbTransaction.hpp
storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp
storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
storage/ndb/src/ndbapi/NdbOperationExec.cpp
storage/ndb/src/ndbapi/NdbTransaction.cpp
storage/ndb/src/ndbapi/ndberror.c
storage/ndb/test/include/HugoOperations.hpp
storage/ndb/test/include/HugoTransactions.hpp
storage/ndb/test/ndbapi/testBasic.cpp
storage/ndb/test/ndbapi/testIndex.cpp
storage/ndb/test/run-test/daily-devel-tests.txt
storage/ndb/test/src/HugoOperations.cpp
storage/ndb/test/src/HugoTransactions.cpp
storage/ndb/test/tools/hugoPkUpdate.cpp
=== modified file 'storage/ndb/include/kernel/kernel_types.h'
--- a/storage/ndb/include/kernel/kernel_types.h 2011-04-19 09:01:07 +0000
+++ b/storage/ndb/include/kernel/kernel_types.h 2011-05-25 13:19:02 +0000
@@ -36,9 +36,7 @@ enum Operation_t {
,ZDELETE = 3
,ZWRITE = 4
,ZREAD_EX = 5
-#if 0
- ,ZREAD_CONSISTENT = 6
-#endif
+ ,ZREFRESH = 6
,ZUNLOCK = 7
};
=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in 2011-05-17 23:29:55 +0000
+++ b/storage/ndb/include/ndb_version.h.in 2011-05-25 13:19:02 +0000
@@ -652,4 +652,28 @@ ndb_tup_extrabits(Uint32 x)
}
}
+#define NDBD_REFRESH_TUPLE_70 NDB_MAKE_VERSION(7,0,26)
+#define NDBD_REFRESH_TUPLE_71 NDB_MAKE_VERSION(7,1,15)
+#define NDBD_REFRESH_TUPLE_72 NDB_MAKE_VERSION(7,2,1)
+
+static
+inline
+int
+ndb_refresh_tuple(Uint32 x)
+{
+ {
+ const Uint32 major = (x >> 16) & 0xFF;
+ const Uint32 minor = (x >> 8) & 0xFF;
+
+ if (major == 7 && minor < 2)
+ {
+ if (minor == 0)
+ return x >= NDBD_REFRESH_TUPLE_70;
+ else if (minor == 1)
+ return x >= NDBD_REFRESH_TUPLE_71;
+ }
+ return x >= NDBD_REFRESH_TUPLE_72;
+ }
+}
+
#endif
=== modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp 2011-05-25 13:19:02 +0000
@@ -914,6 +914,7 @@ public:
DeleteRequest = 3, ///< Delete Operation
WriteRequest = 4, ///< Write Operation
ReadExclusive = 5, ///< Read exclusive
+ RefreshRequest = 6, ///<
UnlockRequest = 7, ///< Unlock operation
OpenScanRequest, ///< Scan Operation
OpenRangeScanRequest, ///< Range scan operation
=== modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp'
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp 2011-04-27 10:48:16 +0000
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp 2011-05-25 13:19:02 +0000
@@ -752,6 +752,12 @@ public:
const NdbOperation::OperationOptions *opts = 0,
Uint32 sizeOfOptions = 0);
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+ const NdbOperation *refreshTuple(const NdbRecord *key_rec, const char *key_row,
+ const NdbOperation::OperationOptions *opts = 0,
+ Uint32 sizeOfOptions = 0);
+#endif
+
/**
* Scan a table, using NdbRecord to read out column data.
*
=== modified file 'storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp 2011-05-25 13:19:02 +0000
@@ -36,6 +36,7 @@ printTCKEYREQ(FILE * output, const Uint3
sig->getOperationType(requestInfo) == ZDELETE ? "Delete" :
sig->getOperationType(requestInfo) == ZWRITE ? "Write" :
sig->getOperationType(requestInfo) == ZUNLOCK ? "Unlock" :
+ sig->getOperationType(requestInfo) == ZREFRESH ? "Refresh" :
"Unknown");
{
if(sig->getDirtyFlag(requestInfo)){
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2011-04-19 09:01:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2011-05-25 13:19:02 +0000
@@ -140,7 +140,7 @@ ndbout << "Ptr: " << ptr.p->word32 << "
/**
* Check kernel_types for other operation types
*/
-#define ZSCAN_OP 6
+#define ZSCAN_OP 8
#define ZSCAN_REC_SIZE 256
#define ZSTAND_BY 2
#define ZTABLESIZE 16
@@ -642,6 +642,7 @@ public:
class Dblqh* c_lqh;
void execACCMINUPDATE(Signal* signal);
+ void removerow(Uint32 op, const Local_key*);
private:
BLOCK_DEFINES(Dbacc);
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2011-04-20 11:58:16 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2011-05-25 13:19:02 +0000
@@ -971,9 +971,12 @@ void Dbacc::initOpRec(Signal* signal)
Uint32 readFlag = (((Treqinfo >> 4) & 0x3) == 0); // Only 1 if Read
Uint32 dirtyFlag = (((Treqinfo >> 6) & 0x1) == 1); // Only 1 if Dirty
Uint32 dirtyReadFlag = readFlag & dirtyFlag;
+ Uint32 operation = Treqinfo & 0xf;
+ if (operation == ZREFRESH)
+ operation = ZWRITE; /* Insert if !exist, otherwise lock */
Uint32 opbits = 0;
- opbits |= Treqinfo & 0x7;
+ opbits |= operation;
opbits |= ((Treqinfo >> 4) & 0x3) ? (Uint32) Operationrec::OP_LOCK_MODE : 0;
opbits |= ((Treqinfo >> 4) & 0x3) ? (Uint32) Operationrec::OP_ACC_LOCK_MODE : 0;
opbits |= (dirtyReadFlag) ? (Uint32) Operationrec::OP_DIRTY_READ : 0;
@@ -2323,6 +2326,27 @@ void Dbacc::execACCMINUPDATE(Signal* sig
ndbrequire(false);
}//Dbacc::execACCMINUPDATE()
+void
+Dbacc::removerow(Uint32 opPtrI, const Local_key* key)
+{
+ jamEntry();
+ operationRecPtr.i = opPtrI;
+ ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ fragrecptr.i = operationRecPtr.p->fragptr;
+
+ /* Mark element disappeared */
+ opbits |= Operationrec::OP_ELEMENT_DISAPPEARED;
+ opbits &= ~Uint32(Operationrec::OP_COMMIT_DELETE_CHECK);
+ operationRecPtr.p->m_op_bits = opbits;
+
+#ifdef VM_TRACE
+ ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
+ ndbrequire(operationRecPtr.p->localdata[0] == key->m_page_no);
+ ndbrequire(operationRecPtr.p->localdata[1] == key->m_page_idx);
+#endif
+}//Dbacc::execACCMINUPDATE()
+
/* ******************--------------------------------------------------------------- */
/* ACC_COMMITREQ COMMIT TRANSACTION */
/* SENDER: LQH, LEVEL B */
@@ -2371,6 +2395,16 @@ void Dbacc::execACC_COMMITREQ(Signal* si
}//if
} else {
jam(); /* EXPAND PROCESS HANDLING */
+ if (unlikely(opbits & Operationrec::OP_ELEMENT_DISAPPEARED))
+ {
+ jam();
+ /* Commit of refresh of non existing tuple.
+ * ZREFRESH->ZWRITE->ZINSERT
+ * Do not affect element count
+ */
+ ndbrequire((opbits & Operationrec::OP_MASK) == ZINSERT);
+ return;
+ }
fragrecptr.p->noOfElements++;
fragrecptr.p->slack -= fragrecptr.p->elementLength;
if (fragrecptr.p->slack >= (1u << 31)) {
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2011-05-25 13:19:02 +0000
@@ -3159,6 +3159,7 @@ public:
bool is_same_trans(Uint32 opId, Uint32 trid1, Uint32 trid2);
void get_op_info(Uint32 opId, Uint32 *hash, Uint32* gci_hi, Uint32* gci_lo);
void accminupdate(Signal*, Uint32 opPtrI, const Local_key*);
+ void accremoverow(Signal*, Uint32 opPtrI, const Local_key*);
/**
*
@@ -3368,6 +3369,16 @@ Dblqh::accminupdate(Signal* signal, Uint
}
inline
+void
+Dblqh::accremoverow(Signal* signal, Uint32 opId, const Local_key* key)
+{
+ TcConnectionrecPtr regTcPtr;
+ regTcPtr.i= opId;
+ ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
+ c_acc->removerow(regTcPtr.p->accConnectrec, key);
+}
+
+inline
bool
Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr)
{
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2011-05-25 13:19:02 +0000
@@ -148,6 +148,7 @@ operator<<(NdbOut& out, Operation_t op)
case ZDELETE: out << "DELETE"; break;
case ZWRITE: out << "WRITE"; break;
case ZUNLOCK: out << "UNLOCK"; break;
+ case ZREFRESH: out << "REFRESH"; break;
}
return out;
}
@@ -4533,6 +4534,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
regTcPtr->lockType =
op == ZREAD_EX ? ZUPDATE :
(Operation_t) op == ZWRITE ? ZINSERT :
+ (Operation_t) op == ZREFRESH ? ZINSERT :
(Operation_t) op == ZUNLOCK ? ZREAD : // lockType not relevant for unlock req
(Operation_t) op;
}
@@ -5072,6 +5074,7 @@ void Dblqh::prepareContinueAfterBlockedL
case ZINSERT: TRACENR("INSERT"); break;
case ZDELETE: TRACENR("DELETE"); break;
case ZUNLOCK: TRACENR("UNLOCK"); break;
+ case ZREFRESH: TRACENR("REFRESH"); break;
default: TRACENR("<Unknown: " << regTcPtr->operation << ">"); break;
}
@@ -5121,7 +5124,6 @@ Dblqh::exec_acckeyreq(Signal* signal, Tc
Uint32 taccreq;
regTcPtr.p->transactionState = TcConnectionrec::WAIT_ACC;
taccreq = regTcPtr.p->operation;
- taccreq = taccreq + (regTcPtr.p->opSimple << 3);
taccreq = taccreq + (regTcPtr.p->lockType << 4);
taccreq = taccreq + (regTcPtr.p->dirtyOp << 6);
taccreq = taccreq + (regTcPtr.p->replicaType << 7);
@@ -5286,15 +5288,17 @@ Dblqh::handle_nr_copy(Signal* signal, Pt
if (match)
{
jam();
- if (op != ZDELETE)
+ if (op != ZDELETE && op != ZREFRESH)
{
if (TRACENR_FLAG)
- TRACENR(" Changing from to ZWRITE" << endl);
+ TRACENR(" Changing from INSERT/UPDATE to ZWRITE" << endl);
regTcPtr.p->operation = ZWRITE;
}
goto run;
}
-
+
+ ndbassert(!match && op == ZINSERT);
+
/**
* 1) Delete row at specified rowid (if len > 0)
* 2) Delete specified row at different rowid (if exists)
@@ -6006,7 +6010,7 @@ Dblqh::acckeyconf_tupkeyreq(Signal* sign
TRACE_OP(regTcPtr, "TUPKEYREQ");
- regTcPtr->m_use_rowid |= (op == ZINSERT);
+ regTcPtr->m_use_rowid |= (op == ZINSERT || op == ZREFRESH);
regTcPtr->m_row_id.m_page_no = page_no;
regTcPtr->m_row_id.m_page_idx = page_idx;
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2011-05-25 13:19:02 +0000
@@ -1694,6 +1694,7 @@ private:
void checkNodeFailComplete(Signal* signal, Uint32 failedNodeId, Uint32 bit);
void apiFailBlockCleanupCallback(Signal* signal, Uint32 failedNodeId, Uint32 ignoredRc);
+ bool isRefreshSupported() const;
// Initialisation
void initData();
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2011-05-25 09:31:27 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2011-05-25 13:19:02 +0000
@@ -3061,6 +3061,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
case ZINSERT:
case ZDELETE:
case ZWRITE:
+ case ZREFRESH:
jam();
break;
default:
@@ -3142,6 +3143,34 @@ handle_reorg_trigger(DiGetNodesConf * co
}
}
+bool
+Dbtc::isRefreshSupported() const
+{
+ const NodeVersionInfo& nvi = getNodeVersionInfo();
+ const Uint32 minVer = nvi.m_type[NodeInfo::DB].m_min_version;
+ const Uint32 maxVer = nvi.m_type[NodeInfo::DB].m_max_version;
+
+ if (likely (minVer == maxVer))
+ {
+ /* Normal case, use function */
+ return ndb_refresh_tuple(minVer);
+ }
+
+ /* As refresh feature was introduced across three minor versions
+ * we check that all data nodes support it. This slow path
+ * should only be hit during upgrades between versions
+ */
+ for (Uint32 i=1; i < MAX_NODES; i++)
+ {
+ const NodeInfo& nodeInfo = getNodeInfo(i);
+ if ((nodeInfo.m_type == NODE_TYPE_DB) &&
+ (nodeInfo.m_connected) &&
+ (! ndb_refresh_tuple(nodeInfo.m_version)))
+ return false;
+ }
+ return true;
+}
+
/**
* tckeyreq050Lab
* This method is executed once all KeyInfo has been obtained for
@@ -3368,6 +3397,14 @@ void Dbtc::tckeyreq050Lab(Signal* signal
TlastReplicaNo = tnoOfBackup + tnoOfStandby;
regTcPtr->lastReplicaNo = (Uint8)TlastReplicaNo;
regTcPtr->noOfNodes = (Uint8)(TlastReplicaNo + 1);
+
+ if (unlikely((Toperation == ZREFRESH) &&
+ (! isRefreshSupported())))
+ {
+ /* Function not implemented yet */
+ TCKEY_abort(signal,63);
+ return;
+ }
}//if
if (regCachePtr->isLongTcKeyReq ||
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2011-05-20 05:11:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2011-05-25 13:19:02 +0000
@@ -268,6 +268,7 @@ inline const Uint32* ALIGN_WORD(const vo
#define ZMUST_BE_ABORTED_ERROR 898
#define ZTUPLE_DELETED_ERROR 626
#define ZINSERT_ERROR 630
+#define ZOP_AFTER_REFRESH_ERROR 920
#define ZINVALID_CHAR_FORMAT 744
#define ZROWID_ALLOCATED 899
@@ -843,6 +844,19 @@ struct Operationrec {
* version even if in the same transaction.
*/
Uint16 tupVersion;
+
+ /*
+ * When refreshing a row, there are four scenarios
+ * The actual scenario is encoded in the 'copy tuple location'
+ * to enable special handling at commit time
+ */
+ enum RefreshScenario
+ {
+ RF_SINGLE_NOT_EXIST = 1, /* Refresh op first in trans, no row */
+ RF_SINGLE_EXIST = 2, /* Refresh op first in trans, row exists */
+ RF_MULTI_NOT_EXIST = 3, /* Refresh op !first in trans, row deleted */
+ RF_MULTI_EXIST = 4 /* Refresh op !first in trans, row exists */
+ };
};
typedef Ptr<Operationrec> OperationrecPtr;
@@ -2080,6 +2094,13 @@ private:
KeyReqStruct* req_struct,
bool disk);
+ int handleRefreshReq(Signal* signal,
+ Ptr<Operationrec>,
+ Ptr<Fragrecord>,
+ Tablerec*,
+ KeyReqStruct*,
+ bool disk);
+
//------------------------------------------------------------------
//------------------------------------------------------------------
int updateStartLab(Signal* signal,
@@ -3406,6 +3427,8 @@ private:
const Dbtup::ScanOp& op);
void commit_operation(Signal*, Uint32, Uint32, Tuple_header*, PagePtr,
Operationrec*, Fragrecord*, Tablerec*);
+ void commit_refresh(Signal*, Uint32, Uint32, Tuple_header*, PagePtr,
+ KeyReqStruct*, Operationrec*, Fragrecord*, Tablerec*);
int retrieve_data_page(Signal*,
Page_cache_client::Request,
OperationrecPtr);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2011-05-17 23:29:55 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp 2011-05-25 13:19:02 +0000
@@ -849,7 +849,15 @@ skip_disk:
tuple_ptr->m_operation_ptr_i = RNIL;
- if(regOperPtr.p->op_struct.op_type != ZDELETE)
+ if (regOperPtr.p->op_struct.op_type == ZDELETE)
+ {
+ jam();
+ if (get_page)
+ ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
+ dealloc_tuple(signal, gci_hi, gci_lo, page.p, tuple_ptr,
+ &req_struct, regOperPtr.p, regFragPtr.p, regTabPtr.p);
+ }
+ else if(regOperPtr.p->op_struct.op_type != ZREFRESH)
{
jam();
commit_operation(signal, gci_hi, gci_lo, tuple_ptr, page,
@@ -858,14 +866,10 @@ skip_disk:
else
{
jam();
- if (get_page)
- {
- ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
- }
- dealloc_tuple(signal, gci_hi, gci_lo, page.p, tuple_ptr,
- &req_struct, regOperPtr.p, regFragPtr.p, regTabPtr.p);
+ commit_refresh(signal, gci_hi, gci_lo, tuple_ptr, page,
+ &req_struct, regOperPtr.p, regFragPtr.p, regTabPtr.p);
}
- }
+ }
if (nextOp != RNIL)
{
@@ -917,3 +921,48 @@ Dbtup::set_commit_change_mask_info(const
}
}
}
+
+void
+Dbtup::commit_refresh(Signal* signal,
+ Uint32 gci_hi,
+ Uint32 gci_lo,
+ Tuple_header* tuple_ptr,
+ PagePtr pagePtr,
+ KeyReqStruct * req_struct,
+ Operationrec* regOperPtr,
+ Fragrecord* regFragPtr,
+ Tablerec* regTabPtr)
+{
+ /* Committing a refresh operation.
+ * Refresh of an existing row looks like an update
+ * and can commit normally.
+ * Refresh of a non-existing row looks like an Insert which
+ * is 'undone' at commit time.
+ * This is achieved by making special calls to ACC to get
+ * it to forget, before deallocating the tuple locally.
+ */
+ switch(regOperPtr->m_copy_tuple_location.m_file_no){
+ case Operationrec::RF_SINGLE_NOT_EXIST:
+ case Operationrec::RF_MULTI_NOT_EXIST:
+ break;
+ case Operationrec::RF_SINGLE_EXIST:
+ case Operationrec::RF_MULTI_EXIST:
+ // "Normal" update
+ commit_operation(signal, gci_hi, gci_lo, tuple_ptr, pagePtr,
+ regOperPtr, regFragPtr, regTabPtr);
+ return;
+
+ default:
+ ndbrequire(false);
+ }
+
+ Local_key key = regOperPtr->m_tuple_location;
+ key.m_page_no = pagePtr.p->frag_page_id;
+
+ /**
+ * Tell ACC to delete
+ */
+ c_lqh->accremoverow(signal, regOperPtr->userpointer, &key);
+ dealloc_tuple(signal, gci_hi, gci_lo, pagePtr.p, tuple_ptr,
+ req_struct, regOperPtr, regFragPtr, regTabPtr);
+}
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2011-05-20 05:11:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2011-05-25 13:19:02 +0000
@@ -214,7 +214,14 @@ Dbtup::insertActiveOpList(OperationrecPt
prevOpPtr.p->op_struct.delete_insert_flag= true;
regOperPtr.p->op_struct.delete_insert_flag= true;
return true;
- } else {
+ }
+ else if (op == ZREFRESH)
+ {
+ /* ZREFRESH after Delete - ok */
+ return true;
+ }
+ else
+ {
terrorCode= ZTUPLE_DELETED_ERROR;
return false;
}
@@ -224,6 +231,12 @@ Dbtup::insertActiveOpList(OperationrecPt
terrorCode= ZINSERT_ERROR;
return false;
}
+ else if (prevOp == ZREFRESH)
+ {
+ /* No operation after a ZREFRESH */
+ terrorCode= ZOP_AFTER_REFRESH_ERROR;
+ return false;
+ }
return true;
}
else
@@ -283,21 +296,39 @@ Dbtup::setup_read(KeyReqStruct *req_stru
dirty= false;
}
+ /* found == true indicates that savepoint is some state
+ * within tuple's current transaction's uncommitted operations
+ */
bool found= find_savepoint(currOpPtr, savepointId);
Uint32 currOp= currOpPtr.p->op_struct.op_type;
+ /* is_insert==true if tuple did not exist before its current
+ * transaction
+ */
bool is_insert = (bits & Tuple_header::ALLOC);
+
+ /* If savepoint is in transaction, and post-delete-op
+ * OR
+ * Tuple didn't exist before
+ * AND
+ * Read is dirty
+ * OR
+ * Savepoint is before-transaction
+ *
+ * Tuple does not exist in read's view
+ */
if((found && currOp == ZDELETE) ||
((dirty || !found) && is_insert))
{
+ /* Tuple not visible to this read operation */
terrorCode= ZTUPLE_DELETED_ERROR;
break;
}
if(dirty || !found)
{
-
+ /* Read existing committed tuple */
}
else
{
@@ -351,6 +382,17 @@ Dbtup::load_diskpage(Signal* signal,
jam();
regOperPtr->op_struct.m_wait_log_buffer= 1;
regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
+ if (unlikely((flags & 7) == ZREFRESH))
+ {
+ jam();
+ /* Refresh of previously nonexistant DD tuple.
+ * No diskpage to load at commit time
+ */
+ regOperPtr->op_struct.m_wait_log_buffer= 0;
+ regOperPtr->op_struct.m_load_diskpage_on_commit= 0;
+ }
+
+ /* In either case return 1 for 'proceed' */
return 1;
}
@@ -410,6 +452,7 @@ Dbtup::load_diskpage(Signal* signal,
case ZUPDATE:
case ZINSERT:
case ZWRITE:
+ case ZREFRESH:
regOperPtr->op_struct.m_wait_log_buffer= 1;
regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
}
@@ -556,7 +599,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal
Uint32 Rstoredid= tupKeyReq->storedProcedure;
regOperPtr->fragmentPtr= Rfragptr;
- regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0xf;
+ regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0x7;
regOperPtr->op_struct.delete_insert_flag = false;
regOperPtr->op_struct.m_reorg = (TrequestInfo >> 12) & 3;
@@ -635,10 +678,16 @@ void Dbtup::execTUPKEYREQ(Signal* signal
if (Roptype == ZINSERT && Local_key::isInvalid(pageid, pageidx))
{
- // No tuple allocatated yet
+ // No tuple allocated yet
goto do_insert;
}
+ if (Roptype == ZREFRESH && Local_key::isInvalid(pageid, pageidx))
+ {
+ // No tuple allocated yet
+ goto do_refresh;
+ }
+
if (unlikely(isCopyTuple(pageid, pageidx)))
{
/**
@@ -832,6 +881,23 @@ void Dbtup::execTUPKEYREQ(Signal* signal
sendTUPKEYCONF(signal, &req_struct, regOperPtr);
return;
}
+ else if (Roptype == ZREFRESH)
+ {
+ /**
+ * No TUX or immediate triggers, just detached triggers
+ */
+ do_refresh:
+ if (unlikely(handleRefreshReq(signal, operPtr,
+ fragptr, regTabPtr,
+ &req_struct, disk_page != RNIL) == -1))
+ {
+ return;
+ }
+
+ sendTUPKEYCONF(signal, &req_struct, regOperPtr);
+ return;
+
+ }
else
{
ndbrequire(false); // Invalid op type
@@ -2055,6 +2121,197 @@ error:
return -1;
}
+int
+Dbtup::handleRefreshReq(Signal* signal,
+ Ptr<Operationrec> regOperPtr,
+ Ptr<Fragrecord> regFragPtr,
+ Tablerec* regTabPtr,
+ KeyReqStruct *req_struct,
+ bool disk)
+{
+ /* Here we setup the tuple so that a transition to its current
+ * state can be observed by SUMA's detached triggers.
+ *
+ * If the tuple does not exist then we fabricate a tuple
+ * so that it can appear to be 'deleted'.
+ * The fabricated tuple may have invalid NULL values etc.
+ * If the tuple does exist then we fabricate a null-change
+ * update to the tuple.
+ *
+ * The logic differs depending on whether there are already
+ * other operations on the tuple in this transaction.
+ * No other operations (including Refresh) are allowed after
+ * a refresh.
+ */
+ Uint32 refresh_case;
+ if (regOperPtr.p->is_first_operation())
+ {
+ jam();
+ if (Local_key::isInvalid(req_struct->frag_page_id,
+ regOperPtr.p->m_tuple_location.m_page_idx))
+ {
+ jam();
+ refresh_case = Operationrec::RF_SINGLE_NOT_EXIST;
+ //ndbout_c("case 1");
+ /**
+ * This is refresh of non-existing tuple...
+ * i.e "delete", reuse initial insert
+ */
+ Local_key accminupdate;
+ Local_key * accminupdateptr = &accminupdate;
+
+ /**
+ * We don't need ...in this scenario
+ * - disk
+ * - default values
+ */
+ Uint32 save_disk = regTabPtr->m_no_of_disk_attributes;
+ Local_key save_defaults = regTabPtr->m_default_value_location;
+ Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
+ regTabPtr->notNullAttributeMask;
+
+ regTabPtr->m_no_of_disk_attributes = 0;
+ regTabPtr->m_default_value_location.setNull();
+ regOperPtr.p->op_struct.op_type = ZINSERT;
+
+ /**
+ * Update notNullAttributeMask to only include primary keys
+ */
+ regTabPtr->notNullAttributeMask.clear();
+ const Uint32 * primarykeys =
+ (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
+ for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
+ regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);
+
+ int res = handleInsertReq(signal, regOperPtr,
+ regFragPtr, regTabPtr, req_struct,
+ &accminupdateptr);
+
+ regTabPtr->m_no_of_disk_attributes = save_disk;
+ regTabPtr->m_default_value_location = save_defaults;
+ regTabPtr->notNullAttributeMask = save_mask;
+
+ if (unlikely(res == -1))
+ {
+ return -1;
+ }
+
+ regOperPtr.p->op_struct.op_type = ZREFRESH;
+
+ if (accminupdateptr)
+ {
+ /**
+ * Update ACC local-key, once *everything* has completed succesfully
+ */
+ c_lqh->accminupdate(signal,
+ regOperPtr.p->userpointer,
+ accminupdateptr);
+ }
+ }
+ else
+ {
+ refresh_case = Operationrec::RF_SINGLE_EXIST;
+ //ndbout_c("case 2");
+ jam();
+
+ Uint32 tup_version_save = req_struct->m_tuple_ptr->get_tuple_version();
+ Uint32 new_tup_version = decr_tup_version(tup_version_save);
+ Tuple_header* origTuple = req_struct->m_tuple_ptr;
+ origTuple->set_tuple_version(new_tup_version);
+ int res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
+ regTabPtr, req_struct, disk);
+ /* Now we must reset the original tuple header back
+ * to the original version.
+ * The copy tuple will have the correct version due to
+ * the update incrementing it.
+ * On commit, the tuple becomes the copy tuple.
+ * On abort, the original tuple remains. If we don't
+ * reset it here, then aborts cause the version to
+ * decrease
+ */
+ origTuple->set_tuple_version(tup_version_save);
+ if (res == -1)
+ return -1;
+ }
+ }
+ else
+ {
+ /* Not first operation on tuple in transaction */
+ jam();
+
+ Uint32 tup_version_save = req_struct->prevOpPtr.p->tupVersion;
+ Uint32 new_tup_version = decr_tup_version(tup_version_save);
+ req_struct->prevOpPtr.p->tupVersion = new_tup_version;
+
+ int res;
+ if (req_struct->prevOpPtr.p->op_struct.op_type == ZDELETE)
+ {
+ refresh_case = Operationrec::RF_MULTI_NOT_EXIST;
+ //ndbout_c("case 3");
+
+ jam();
+ /**
+ * We don't need ...in this scenario
+ * - default values
+ *
+ * We keep disk attributes to avoid issues with 'insert'
+ */
+ Local_key save_defaults = regTabPtr->m_default_value_location;
+ Bitmask<MAXNROFATTRIBUTESINWORDS> save_mask =
+ regTabPtr->notNullAttributeMask;
+
+ regTabPtr->m_default_value_location.setNull();
+ regOperPtr.p->op_struct.op_type = ZINSERT;
+
+ /**
+ * Update notNullAttributeMask to only include primary keys
+ */
+ regTabPtr->notNullAttributeMask.clear();
+ const Uint32 * primarykeys =
+ (Uint32*)&tableDescriptor[regTabPtr->readKeyArray].tabDescr;
+ for (Uint32 i = 0; i<regTabPtr->noOfKeyAttr; i++)
+ regTabPtr->notNullAttributeMask.set(primarykeys[i] >> 16);
+
+ /**
+ * This is multi-update + DELETE + REFRESH
+ */
+ Local_key * accminupdateptr = 0;
+ res = handleInsertReq(signal, regOperPtr,
+ regFragPtr, regTabPtr, req_struct,
+ &accminupdateptr);
+
+ regTabPtr->m_default_value_location = save_defaults;
+ regTabPtr->notNullAttributeMask = save_mask;
+
+ if (unlikely(res == -1))
+ {
+ return -1;
+ }
+
+ regOperPtr.p->op_struct.op_type = ZREFRESH;
+ }
+ else
+ {
+ jam();
+ refresh_case = Operationrec::RF_MULTI_EXIST;
+ //ndbout_c("case 4");
+ /**
+ * This is multi-update + INSERT/UPDATE + REFRESH
+ */
+ res = handleUpdateReq(signal, regOperPtr.p, regFragPtr.p,
+ regTabPtr, req_struct, disk);
+ }
+ req_struct->prevOpPtr.p->tupVersion = tup_version_save;
+ if (res == -1)
+ return -1;
+ }
+
+ /* Store the refresh scenario in the copy tuple location */
+ // TODO : Verify this is never used as a copy tuple location!
+ regOperPtr.p->m_copy_tuple_location.m_file_no = refresh_case;
+ return 0;
+}
+
bool
Dbtup::checkNullAttributes(KeyReqStruct * req_struct,
Tablerec* regTabPtr)
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2011-05-25 13:19:02 +0000
@@ -863,6 +863,7 @@ void Dbtup::checkDetachedTriggers(KeyReq
switch (save_type) {
case ZUPDATE:
case ZINSERT:
+ case ZREFRESH:
req_struct->m_tuple_ptr =get_copy_tuple(®OperPtr->m_copy_tuple_location);
break;
}
@@ -878,7 +879,10 @@ void Dbtup::checkDetachedTriggers(KeyReq
return;
goto end;
}
- regOperPtr->op_struct.op_type = ZINSERT;
+ else if (save_type != ZREFRESH)
+ {
+ regOperPtr->op_struct.op_type = ZINSERT;
+ }
}
else if (save_type == ZINSERT) {
/**
@@ -930,6 +934,29 @@ void Dbtup::checkDetachedTriggers(KeyReq
regTablePtr->subscriptionUpdateTriggers,
regOperPtr, disk);
break;
+ case ZREFRESH:
+ jam();
+ /* Depending on the Refresh scenario, fire Delete or Insert
+ * triggers to simulate the effect of arriving at the tuple's
+ * current state.
+ */
+ switch(regOperPtr->m_copy_tuple_location.m_file_no){
+ case Operationrec::RF_SINGLE_NOT_EXIST:
+ case Operationrec::RF_MULTI_NOT_EXIST:
+ fireDetachedTriggers(req_struct,
+ regTablePtr->subscriptionDeleteTriggers,
+ regOperPtr, disk);
+ break;
+ case Operationrec::RF_SINGLE_EXIST:
+ case Operationrec::RF_MULTI_EXIST:
+ fireDetachedTriggers(req_struct,
+ regTablePtr->subscriptionInsertTriggers,
+ regOperPtr, disk);
+ break;
+ default:
+ ndbrequire(false);
+ }
+ break;
default:
ndbrequire(false);
break;
@@ -1375,12 +1402,14 @@ out:
switch(regOperPtr->op_struct.op_type) {
case(ZINSERT):
+ is_insert:
jam();
// Send AttrInfo signals with new attribute values
trigAttrInfo->setAttrInfoType(TrigAttrInfo::AFTER_VALUES);
sendTrigAttrInfo(signal, afterBuffer, noAfterWords, executeDirect, ref);
break;
case(ZDELETE):
+ is_delete:
if (trigPtr->sendBeforeValues) {
jam();
trigAttrInfo->setAttrInfoType(TrigAttrInfo::BEFORE_VALUES);
@@ -1397,6 +1426,23 @@ out:
trigAttrInfo->setAttrInfoType(TrigAttrInfo::AFTER_VALUES);
sendTrigAttrInfo(signal, afterBuffer, noAfterWords, executeDirect, ref);
break;
+ case ZREFRESH:
+ jam();
+ /* Reuse Insert/Delete trigger firing code as necessary */
+ switch(regOperPtr->m_copy_tuple_location.m_file_no){
+ case Operationrec::RF_SINGLE_NOT_EXIST:
+ jam();
+ case Operationrec::RF_MULTI_NOT_EXIST:
+ jam();
+ goto is_delete;
+ case Operationrec::RF_SINGLE_EXIST:
+ jam();
+ case Operationrec::RF_MULTI_EXIST:
+ jam();
+ goto is_insert;
+ default:
+ ndbrequire(false);
+ }
default:
ndbrequire(false);
}
@@ -1424,6 +1470,25 @@ out:
jam();
fireTrigOrd->m_triggerEvent = TriggerEvent::TE_DELETE;
break;
+ case ZREFRESH:
+ jam();
+ switch(regOperPtr->m_copy_tuple_location.m_file_no){
+ case Operationrec::RF_SINGLE_NOT_EXIST:
+ jam();
+ case Operationrec::RF_MULTI_NOT_EXIST:
+ jam();
+ fireTrigOrd->m_triggerEvent = TriggerEvent::TE_DELETE;
+ break;
+ case Operationrec::RF_SINGLE_EXIST:
+ jam();
+ case Operationrec::RF_MULTI_EXIST:
+ jam();
+ fireTrigOrd->m_triggerEvent = TriggerEvent::TE_INSERT;
+ break;
+ default:
+ ndbrequire(false);
+ }
+ break;
default:
ndbrequire(false);
break;
@@ -1615,7 +1680,7 @@ bool Dbtup::readTriggerInfo(TupTriggerDa
// Delete without sending before values only read Primary Key
//--------------------------------------------------------------------
return true;
- } else {
+ } else if (regOperPtr->op_struct.op_type != ZREFRESH){
jam();
//--------------------------------------------------------------------
// All others send all attributes that are monitored, except:
@@ -1632,6 +1697,27 @@ bool Dbtup::readTriggerInfo(TupTriggerDa
numAttrsToRead = setAttrIds(attributeMask, regTabPtr->m_no_of_attributes,
&readBuffer[0]);
}
+ else
+ {
+ jam();
+ ndbassert(regOperPtr->op_struct.op_type == ZREFRESH);
+ /* Refresh specific before/after value hacks */
+ switch(regOperPtr->m_copy_tuple_location.m_file_no){
+ case Operationrec::RF_SINGLE_NOT_EXIST:
+ case Operationrec::RF_MULTI_NOT_EXIST:
+ return true; // generate ZDELETE...no before values
+ case Operationrec::RF_SINGLE_EXIST:
+ case Operationrec::RF_MULTI_EXIST:
+ // generate ZINSERT...all after values
+ numAttrsToRead = setAttrIds(trigPtr->attributeMask,
+ regTabPtr->m_no_of_attributes,
+ &readBuffer[0]);
+ break;
+ default:
+ ndbrequire(false);
+ }
+ }
+
ndbrequire(numAttrsToRead <= MAX_ATTRIBUTES_IN_TABLE);
//--------------------------------------------------------------------
// Read Main tuple values
@@ -1875,6 +1961,9 @@ Dbtup::executeTuxCommitTriggers(Signal*
return;
jam();
tupVersion= regOperPtr->tupVersion;
+ } else if (regOperPtr->op_struct.op_type == ZREFRESH) {
+ /* Refresh should not affect TUX */
+ return;
} else {
ndbrequire(false);
tupVersion= 0; // remove warning
@@ -1907,6 +1996,10 @@ Dbtup::executeTuxAbortTriggers(Signal* s
} else if (regOperPtr->op_struct.op_type == ZDELETE) {
jam();
return;
+ } else if (regOperPtr->op_struct.op_type == ZREFRESH) {
+ jam();
+ /* Refresh should not affect TUX */
+ return;
} else {
ndbrequire(false);
tupVersion= 0; // remove warning
=== modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2011-05-11 13:31:44 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2011-05-25 13:19:02 +0000
@@ -1120,7 +1120,8 @@ NdbOperation::buildSignalsNdbRecord(Uint
/* Final update signal words */
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest) ||
- (tOpType == UpdateRequest))
+ (tOpType == UpdateRequest) ||
+ (tOpType == RefreshRequest))
{
updRow= m_attribute_row;
NdbBlob *currentBlob= theBlobList;
@@ -1333,7 +1334,8 @@ NdbOperation::buildSignalsNdbRecord(Uint
if ((tOpType == InsertRequest) ||
(tOpType == WriteRequest) ||
- (tOpType == UpdateRequest))
+ (tOpType == UpdateRequest) ||
+ (tOpType == RefreshRequest))
{
/* Handle setAnyValue() for all cases except delete */
if ((m_flags & OF_USE_ANY_VALUE) != 0)
=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp 2011-04-27 10:48:16 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp 2011-05-25 13:19:02 +0000
@@ -2209,6 +2209,7 @@ NdbTransaction::receiveTCKEY_FAILCONF(co
case NdbOperation::DeleteRequest:
case NdbOperation::WriteRequest:
case NdbOperation::UnlockRequest:
+ case NdbOperation::RefreshRequest:
tOp = tOp->next();
break;
case NdbOperation::ReadRequest:
@@ -2713,6 +2714,52 @@ NdbTransaction::writeTuple(const NdbReco
return op;
}
+const NdbOperation *
+NdbTransaction::refreshTuple(const NdbRecord *key_rec, const char *key_row,
+ const NdbOperation::OperationOptions *opts,
+ Uint32 sizeOfOptions)
+{
+ /* Check TC node version lockless */
+ {
+ Uint32 tcVer = theNdb->theImpl->getNodeInfo(theDBnode).m_info.m_version;
+ if (unlikely(! ndb_refresh_tuple(tcVer)))
+ {
+ /* Function not implemented yet */
+ setOperationErrorCodeAbort(4003);
+ return NULL;
+ }
+ }
+
+ /* Check that the NdbRecord specifies the full primary key. */
+ if (!(key_rec->flags & NdbRecord::RecHasAllKeys))
+ {
+ setOperationErrorCodeAbort(4292);
+ return NULL;
+ }
+
+ Uint8 keymask[NDB_MAX_ATTRIBUTES_IN_TABLE/8];
+ bzero(keymask, sizeof(keymask));
+ for (Uint32 i = 0; i<key_rec->key_index_length; i++)
+ {
+ Uint32 id = key_rec->columns[key_rec->key_indexes[i]].attrId;
+ keymask[(id / 8)] |= (1 << (id & 7));
+ }
+
+ NdbOperation *op= setupRecordOp(NdbOperation::RefreshRequest,
+ NdbOperation::LM_Exclusive,
+ NdbOperation::AbortOnError,
+ key_rec, key_row,
+ key_rec, key_row,
+ keymask /* mask */,
+ opts,
+ sizeOfOptions);
+ if(!op)
+ return op;
+
+ theSimpleState= 0;
+
+ return op;
+}
NdbScanOperation *
NdbTransaction::scanTable(const NdbRecord *result_record,
=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c 2011-05-07 06:17:02 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c 2011-05-25 13:19:02 +0000
@@ -750,6 +750,7 @@ ErrorBundle ErrorCodes[] = {
{ 2810, DMEC, TR, "No space left on the device" },
{ 2811, DMEC, TR, "Error with file permissions, please check file system" },
{ 2815, DMEC, TR, "Error in reading files, please check file system" },
+ { 920, DMEC, AE, "Row operation defined after refreshTuple()" },
/**
* NdbQueryBuilder API errors
=== modified file 'storage/ndb/test/include/HugoOperations.hpp'
--- a/storage/ndb/test/include/HugoOperations.hpp 2011-02-02 00:40:07 +0000
+++ b/storage/ndb/test/include/HugoOperations.hpp 2011-05-25 13:19:02 +0000
@@ -87,6 +87,11 @@ public:
int recordNo,
int numRecords = 1);
+ int pkRefreshRecord(Ndb*,
+ int recordNo,
+ int numRecords = 1,
+ int anyValueInfo = 0); /* 0 - none, 1+ Val | record */
+
int execute_Commit(Ndb*,
AbortOption ao = AbortOnError);
int execute_NoCommit(Ndb*,
=== modified file 'storage/ndb/test/include/HugoTransactions.hpp'
--- a/storage/ndb/test/include/HugoTransactions.hpp 2011-02-02 00:40:07 +0000
+++ b/storage/ndb/test/include/HugoTransactions.hpp 2011-05-25 13:19:02 +0000
@@ -110,6 +110,9 @@ public:
int batch = 1,
bool allowConstraintViolation = true,
int doSleep = 0);
+
+ int pkRefreshRecords(Ndb*, int startFrom, int count = 1, int batch = 1);
+
int lockRecords(Ndb*,
int records,
int percentToLock = 1,
=== modified file 'storage/ndb/test/ndbapi/testBasic.cpp'
--- a/storage/ndb/test/ndbapi/testBasic.cpp 2011-05-07 06:17:02 +0000
+++ b/storage/ndb/test/ndbapi/testBasic.cpp 2011-05-25 13:19:02 +0000
@@ -2415,6 +2415,811 @@ runEnd899(NDBT_Context* ctx, NDBT_Step*
}
+int initSubscription(NDBT_Context* ctx, NDBT_Step* step){
+ /* Subscribe to events on the table, and put access
+ * to the subscription somewhere handy
+ */
+ Ndb* pNdb = GETNDB(step);
+ const NdbDictionary::Table& tab = *ctx->getTab();
+ bool merge_events = false;
+ bool report = false;
+
+ char eventName[1024];
+ sprintf(eventName,"%s_EVENT",tab.getName());
+
+ NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
+
+ if (!myDict) {
+ g_err << "Dictionary not found "
+ << pNdb->getNdbError().code << " "
+ << pNdb->getNdbError().message << endl;
+ return NDBT_FAILED;
+ }
+
+ myDict->dropEvent(eventName);
+
+ NdbDictionary::Event myEvent(eventName);
+ myEvent.setTable(tab.getName());
+ myEvent.addTableEvent(NdbDictionary::Event::TE_ALL);
+ for(int a = 0; a < tab.getNoOfColumns(); a++){
+ myEvent.addEventColumn(a);
+ }
+ myEvent.mergeEvents(merge_events);
+
+ if (report)
+ myEvent.setReport(NdbDictionary::Event::ER_SUBSCRIBE);
+
+ int res = myDict->createEvent(myEvent); // Add event to database
+
+ if (res == 0)
+ myEvent.print();
+ else if (myDict->getNdbError().classification ==
+ NdbError::SchemaObjectExists)
+ {
+ g_info << "Event creation failed event exists\n";
+ res = myDict->dropEvent(eventName);
+ if (res) {
+ g_err << "Failed to drop event: "
+ << myDict->getNdbError().code << " : "
+ << myDict->getNdbError().message << endl;
+ return NDBT_FAILED;
+ }
+ // try again
+ res = myDict->createEvent(myEvent); // Add event to database
+ if (res) {
+ g_err << "Failed to create event (1): "
+ << myDict->getNdbError().code << " : "
+ << myDict->getNdbError().message << endl;
+ return NDBT_FAILED;
+ }
+ }
+ else
+ {
+ g_err << "Failed to create event (2): "
+ << myDict->getNdbError().code << " : "
+ << myDict->getNdbError().message << endl;
+ return NDBT_FAILED;
+ }
+
+ return NDBT_OK;
+}
+
+int removeSubscription(NDBT_Context* ctx, NDBT_Step* step){
+ /* Remove subscription created above */
+ Ndb* pNdb = GETNDB(step);
+ const NdbDictionary::Table& tab = *ctx->getTab();
+
+ char eventName[1024];
+ sprintf(eventName,"%s_EVENT",tab.getName());
+
+ NdbDictionary::Dictionary *myDict = pNdb->getDictionary();
+
+ if (!myDict) {
+ g_err << "Dictionary not found "
+ << pNdb->getNdbError().code << " "
+ << pNdb->getNdbError().message << endl;
+ return NDBT_FAILED;
+ }
+
+ myDict->dropEvent(eventName);
+
+ return NDBT_OK;
+}
+
+int runVerifyRowCount(NDBT_Context* ctx, NDBT_Step* step)
+{
+ Ndb* ndb = GETNDB(step);
+
+ /* Check that number of results returned by a normal scan
+ * and per-fragment rowcount sum are equal
+ */
+ Uint32 rowCountSum = 0;
+ Uint32 rowScanCount = 0;
+
+ int result = NDBT_OK;
+ do
+ {
+ NdbTransaction* trans = ndb->startTransaction();
+ CHECK(trans != NULL);
+
+ NdbScanOperation* scan = trans->getNdbScanOperation(ctx->getTab());
+ CHECK(scan != NULL);
+
+ CHECK(scan->readTuples(NdbScanOperation::LM_CommittedRead) == 0);
+
+ NdbInterpretedCode code;
+
+ CHECK(code.interpret_exit_last_row() == 0);
+ CHECK(code.finalise() == 0);
+
+ NdbRecAttr* rowCountRA = scan->getValue(NdbDictionary::Column::ROW_COUNT);
+ CHECK(rowCountRA != NULL);
+ CHECK(scan->setInterpretedCode(&code) == 0);
+
+ CHECK(trans->execute(NoCommit) == 0);
+
+ while (scan->nextResult() == 0)
+ rowCountSum+= rowCountRA->u_32_value();
+
+ trans->close();
+
+ trans = ndb->startTransaction();
+ CHECK(trans != NULL);
+
+ scan = trans->getNdbScanOperation(ctx->getTab());
+ CHECK(scan != NULL);
+
+ CHECK(scan->readTuples(NdbScanOperation::LM_CommittedRead) == 0);
+
+ rowCountRA = scan->getValue(NdbDictionary::Column::ROW_COUNT);
+ CHECK(rowCountRA != NULL);
+
+ CHECK(trans->execute(NoCommit) == 0);
+
+ while (scan->nextResult() == 0)
+ rowScanCount++;
+
+ trans->close();
+ }
+ while(0);
+
+ if (result == NDBT_OK)
+ {
+ ndbout_c("Sum of fragment row counts : %u Number rows scanned : %u",
+ rowCountSum,
+ rowScanCount);
+
+ if (rowCountSum != rowScanCount)
+ {
+ ndbout_c("MISMATCH");
+ result = NDBT_FAILED;
+ }
+ }
+
+ return result;
+}
+
+enum ApiEventType { Insert, Update, Delete };
+
+template class Vector<ApiEventType>;
+
+struct EventInfo
+{
+ ApiEventType type;
+ int id;
+ Uint64 gci;
+};
+template class Vector<EventInfo>;
+
+int collectEvents(Ndb* ndb,
+ HugoCalculator& calc,
+ const NdbDictionary::Table& tab,
+ Vector<EventInfo>& receivedEvents,
+ int idCol,
+ int updateCol,
+ Vector<NdbRecAttr*>* beforeAttrs,
+ Vector<NdbRecAttr*>* afterAttrs)
+{
+ int MaxTimeouts = 5;
+ while (true)
+ {
+ int res = ndb->pollEvents(1000);
+
+ if (res > 0)
+ {
+ NdbEventOperation* pOp;
+ while ((pOp = ndb->nextEvent()))
+ {
+ bool isDelete = (pOp->getEventType() == NdbDictionary::Event::TE_DELETE);
+ Vector<NdbRecAttr*>* whichVersion =
+ isDelete?
+ beforeAttrs :
+ afterAttrs;
+ int id = (*whichVersion)[idCol]->u_32_value();
+ Uint64 gci = pOp->getGCI();
+ Uint32 anyValue = pOp->getAnyValue();
+ Uint32 scenario = ((anyValue >> 24) & 0xff) -1;
+ Uint32 optype = ((anyValue >> 16) & 0xff);
+ Uint32 recNum = (anyValue & 0xffff);
+
+ g_err << "# " << receivedEvents.size()
+ << " GCI : " << (gci >> 32)
+ << "/"
+ << (gci & 0xffffffff)
+ << " id : "
+ << id
+ << " scenario : " << scenario
+ << " optype : " << optype
+ << " record : " << recNum
+ << " ";
+
+ /* Check event has self-consistent data */
+ int updatesValue = (*whichVersion)[updateCol]->u_32_value();
+
+ if ((*whichVersion)[updateCol]->isNULL() ||
+ (*whichVersion)[idCol]->isNULL())
+ {
+ g_err << "Null update/id cols : REFRESH of !EXISTS ";
+ }
+
+ g_err << "(Updates val = " << updatesValue << ")";
+
+ for (int i=0; i < (int) whichVersion->size(); i++)
+ {
+ /* Check PK columns and also other columns for non-delete */
+ if (!isDelete ||
+ tab.getColumn(i)->getPrimaryKey())
+ {
+ NdbRecAttr* ra = (*whichVersion)[i];
+ if (calc.verifyRecAttr(recNum, updatesValue, ra) != 0)
+ {
+ g_err << "Verify failed on recNum : " << recNum << " with updates value "
+ << updatesValue << " for column " << ra->getColumn()->getAttrId()
+ << endl;
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ EventInfo ei;
+
+ switch (pOp->getEventType())
+ {
+ case NdbDictionary::Event::TE_INSERT:
+ g_err << " Insert event" << endl;
+ ei.type = Insert;
+ break;
+ case NdbDictionary::Event::TE_DELETE:
+ ei.type = Delete;
+ g_err << " Delete event" << endl;
+ break;
+ case NdbDictionary::Event::TE_UPDATE:
+ ei.type = Update;
+ g_err << " Update event" << endl;
+ break;
+ default:
+ g_err << " Event type : " << pOp->getEventType() << endl;
+ abort();
+ break;
+ }
+
+ ei.id = recNum;
+ ei.gci = gci;
+
+ receivedEvents.push_back(ei);
+ }
+ }
+ else
+ {
+ if (--MaxTimeouts == 0)
+ {
+ break;
+ }
+ }
+ }
+
+ return NDBT_OK;
+}
+
+int verifyEvents(const Vector<EventInfo>& receivedEvents,
+ const Vector<ApiEventType>& expectedEvents,
+ int records)
+{
+ /* Now verify received events against expected
+ * This is messy as events occurring in the same epoch are unordered
+ * except via id, so we use id-duplicates to determine which event
+ * sequence we're looking at.
+ */
+ g_err << "Received total of " << receivedEvents.size() << " events" << endl;
+ Vector<Uint32> keys;
+ Vector<Uint64> gcis;
+ Uint32 z = 0;
+ Uint64 z2 = 0;
+ keys.fill(records, z);
+ gcis.fill(records, z2);
+ Uint64 currGci = 0;
+
+ for (Uint32 e=0; e < receivedEvents.size(); e++)
+ {
+ EventInfo ei = receivedEvents[e];
+
+ if (ei.gci != currGci)
+ {
+ if (ei.gci < currGci)
+ abort();
+
+ /* Epoch boundary */
+ /* At this point, all id counts must be equal */
+ for (int i=0; i < records; i++)
+ {
+ if (keys[i] != keys[0])
+ {
+ g_err << "Count for id " << i
+ << " is " << keys[i]
+ << " but should be " << keys[0] << endl;
+ return NDBT_OK;
+ }
+ }
+
+ currGci = ei.gci;
+ }
+
+ Uint32 eventIndex = keys[ei.id];
+ keys[ei.id]++;
+
+ ApiEventType et = expectedEvents[eventIndex];
+
+ if (ei.type != et)
+ {
+ g_err << "Expected event of type " << et
+ << " but found " << ei.type
+ << " at expectedEvent " << eventIndex
+ << " and event num " << e << endl;
+ return NDBT_FAILED;
+ }
+ }
+
+ return NDBT_OK;
+}
+
+int runRefreshTuple(NDBT_Context* ctx, NDBT_Step* step){
+ int records = ctx->getNumRecords();
+ Ndb* ndb = GETNDB(step);
+
+ /* Now attempt to create EventOperation */
+ NdbEventOperation* pOp;
+ const NdbDictionary::Table& tab = *ctx->getTab();
+
+ char eventName[1024];
+ sprintf(eventName,"%s_EVENT",tab.getName());
+
+ pOp = ndb->createEventOperation(eventName);
+ if (pOp == NULL)
+ {
+ g_err << "Failed to create event operation\n";
+ return NDBT_FAILED;
+ }
+
+ HugoCalculator calc(tab);
+ Vector<NdbRecAttr*> eventAfterRecAttr;
+ Vector<NdbRecAttr*> eventBeforeRecAttr;
+ int updateCol = -1;
+ int idCol = -1;
+
+ /* Now request all attributes */
+ for (int a = 0; a < tab.getNoOfColumns(); a++)
+ {
+ eventAfterRecAttr.push_back(pOp->getValue(tab.getColumn(a)->getName()));
+ eventBeforeRecAttr.push_back(pOp->getPreValue(tab.getColumn(a)->getName()));
+ if (calc.isIdCol(a))
+ idCol = a;
+ if (calc.isUpdateCol(a))
+ updateCol = a;
+ }
+
+ /* Now execute the event */
+ if (pOp->execute())
+ {
+ g_err << "Event operation execution failed : " << pOp->getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ HugoOperations hugoOps(*ctx->getTab());
+ int scenario = 0;
+
+ Vector<ApiEventType> expectedEvents;
+
+ for (scenario = 0; scenario < 2; scenario++)
+ {
+ g_err << "Scenario = " << scenario
+ << " ( Refresh "
+ << ((scenario == 0)? "before":"after")
+ << " operations )" << endl;
+ int optype = 0;
+ bool done = false;
+ int expectedError = 0;
+ do
+ {
+ check(hugoOps.startTransaction(ndb) == 0, hugoOps);
+
+ if (scenario == 0)
+ {
+ g_err << "Refresh before operations" << endl;
+ int anyValue =
+ ((1) << 8) |
+ optype;
+ check(hugoOps.pkRefreshRecord(ndb, 0, records, anyValue) == 0, hugoOps);
+ }
+
+ switch(optype)
+ {
+ case 0:
+ {
+ /* Refresh with no data present */
+ g_err << " Do nothing" << endl;
+ expectedError = 0; /* Single refresh should always be fine */
+ expectedEvents.push_back(Delete);
+ break;
+ }
+ case 1:
+ {
+ /* [Refresh] Insert [Refresh] */
+ g_err << " Insert" << endl;
+ check(hugoOps.pkInsertRecord(ndb, 0, records, 1) == 0, hugoOps);
+ if (scenario == 0)
+ {
+ /* Tuple already existed error when we insert after refresh */
+ expectedError = 630;
+ expectedEvents.push_back(Delete);
+ }
+ else
+ {
+ expectedError = 0;
+ expectedEvents.push_back(Insert);
+ }
+ /* Tuple already existed error when we insert after refresh */
+ break;
+ }
+ case 2:
+ {
+ /* Refresh */
+ g_err << " Refresh" << endl;
+ if (scenario == 0)
+ {
+ expectedEvents.push_back(Delete);
+ }
+ else
+ {
+ expectedEvents.push_back(Insert);
+ }
+ expectedError = 0;
+ break;
+ }
+ case 3:
+ {
+ /* [Refresh] Update [Refresh] */
+ g_err << " Update" << endl;
+ check(hugoOps.pkUpdateRecord(ndb, 0, records, 3) == 0, hugoOps);
+ if (scenario == 0)
+ {
+ expectedError = 920;
+ expectedEvents.push_back(Delete);
+ }
+ else
+ {
+ expectedError = 0;
+ expectedEvents.push_back(Insert);
+ }
+ break;
+ }
+ case 4:
+ {
+ /* [Refresh] Delete [Refresh] */
+ g_err << " [Refresh] Delete [Refresh]" << endl;
+ if (scenario == 0)
+ {
+ expectedError = 920;
+ expectedEvents.push_back(Delete);
+ }
+ else
+ {
+ expectedError = 0;
+ expectedEvents.push_back(Delete);
+ }
+ check(hugoOps.pkDeleteRecord(ndb, 0, records) == 0, hugoOps);
+ break;
+ }
+ case 5:
+ {
+ g_err << " Refresh" << endl;
+ expectedError = 0;
+ expectedEvents.push_back(Delete);
+ /* Refresh with no data present */
+ break;
+ }
+ case 6:
+ {
+ g_err << " Double refresh" << endl;
+ int anyValue =
+ ((2) << 8) |
+ optype;
+ check(hugoOps.pkRefreshRecord(ndb, 0, records, anyValue) == 0, hugoOps);
+ expectedError = 920; /* Row operation defined after refreshTuple() */
+ expectedEvents.push_back(Delete);
+ }
+ default:
+ done = true;
+ break;
+ }
+
+ if (scenario == 1)
+ {
+ g_err << "Refresh after operations" << endl;
+ int anyValue =
+ ((4) << 8) |
+ optype;
+ check(hugoOps.pkRefreshRecord(ndb, 0, records, anyValue) == 0, hugoOps);
+ }
+
+ int rc = hugoOps.execute_Commit(ndb, AO_IgnoreError);
+ check(rc == expectedError, hugoOps);
+
+ check(hugoOps.closeTransaction(ndb) == 0, hugoOps);
+
+ optype++;
+
+
+ /* Now check fragment counts vs findable row counts */
+ if (runVerifyRowCount(ctx, step) != NDBT_OK)
+ return NDBT_FAILED;
+
+ } while (!done);
+ } // for scenario...
+
+ /* Now check fragment counts vs findable row counts */
+ if (runVerifyRowCount(ctx, step) != NDBT_OK)
+ return NDBT_FAILED;
+
+ /* Now let's dump and check the events */
+ g_err << "Expecting the following sequence..." << endl;
+ for (Uint32 i=0; i < expectedEvents.size(); i++)
+ {
+ g_err << i << ". ";
+ switch(expectedEvents[i])
+ {
+ case Insert:
+ g_err << "Insert" << endl;
+ break;
+ case Update:
+ g_err << "Update" << endl;
+ break;
+ case Delete:
+ g_err << "Delete" << endl;
+ break;
+ default:
+ abort();
+ }
+ }
+
+ Vector<EventInfo> receivedEvents;
+
+ int rc = collectEvents(ndb, calc, tab, receivedEvents, idCol, updateCol,
+ &eventBeforeRecAttr,
+ &eventAfterRecAttr);
+ if (rc == NDBT_OK)
+ {
+ rc = verifyEvents(receivedEvents,
+ expectedEvents,
+ records);
+ }
+
+ if (ndb->dropEventOperation(pOp) != 0)
+ {
+ g_err << "Drop Event Operation failed : " << ndb->getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ return rc;
+};
+
+enum PreRefreshOps
+{
+ PR_NONE,
+ PR_INSERT,
+ PR_INSERTDELETE,
+ PR_DELETE
+};
+
+struct RefreshScenario
+{
+ const char* name;
+ bool preExist;
+ PreRefreshOps preRefreshOps;
+};
+
+static RefreshScenario refreshTests[] = {
+ { "No row, No pre-ops", false, PR_NONE },
+ { "No row, Insert pre-op", false, PR_INSERT },
+ { "No row, Insert-Del pre-op", false, PR_INSERTDELETE },
+ { "Row exists, No pre-ops", true, PR_NONE },
+ { "Row exists, Delete pre-op", true, PR_DELETE }
+};
+
+enum OpTypes
+{
+ READ_C,
+ READ_S,
+ READ_E,
+ INSERT,
+ UPDATE,
+ WRITE,
+ DELETE,
+ LAST
+};
+
+const char* opTypeNames[] =
+{
+ "READ_C",
+ "READ_S",
+ "READ_E",
+ "INSERT",
+ "UPDATE",
+ "WRITE",
+ "DELETE"
+};
+
+
+int
+runRefreshLocking(NDBT_Context* ctx, NDBT_Step* step)
+{
+ /* Check that refresh in various situations has the
+ * locks we expect it to
+ * Scenario combinations :
+ * Now row pre-existing | Row pre-existing
+ * Trans1 : Refresh | Insert-Refresh | Insert-Delete-Refresh
+ * Delete-Refresh
+ * Trans2 : Read [Committed|Shared|Exclusive] | Insert | Update
+ * Write | Delete
+ *
+ * Expectations : Read committed always non-blocking
+ * Read committed sees pre-existing row
+ * All other trans2 operations deadlock
+ */
+
+ Ndb* ndb = GETNDB(step);
+ Uint32 numScenarios = sizeof(refreshTests) / sizeof(refreshTests[0]);
+ HugoTransactions hugoTrans(*ctx->getTab());
+
+ for (Uint32 s = 0; s < numScenarios; s++)
+ {
+ RefreshScenario& scenario = refreshTests[s];
+
+ if (scenario.preExist)
+ {
+ /* Create pre-existing tuple */
+ if (hugoTrans.loadTable(ndb, 1) != 0)
+ {
+ g_err << "Pre-exist failed : " << hugoTrans.getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+ }
+
+ if (hugoTrans.startTransaction(ndb) != 0)
+ {
+ g_err << "Start trans failed : " << hugoTrans.getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ g_err << "Scenario : " << scenario.name << endl;
+
+ /* Do pre-refresh ops */
+ switch (scenario.preRefreshOps)
+ {
+ case PR_NONE:
+ break;
+ case PR_INSERT:
+ case PR_INSERTDELETE:
+ if (hugoTrans.pkInsertRecord(ndb, 0) != 0)
+ {
+ g_err << "Pre insert failed : " << hugoTrans.getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ if (scenario.preRefreshOps == PR_INSERT)
+ break;
+ case PR_DELETE:
+ if (hugoTrans.pkDeleteRecord(ndb, 0) != 0)
+ {
+ g_err << "Pre delete failed : " << hugoTrans.getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+ break;
+ }
+
+ /* Then refresh */
+ if (hugoTrans.pkRefreshRecord(ndb, 0) != 0)
+ {
+ g_err << "Refresh failed : " << hugoTrans.getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ /* Now execute */
+ if (hugoTrans.execute_NoCommit(ndb) != 0)
+ {
+ g_err << "Execute failed : " << hugoTrans.getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ {
+ /* Now try ops from another transaction */
+ HugoOperations hugoOps(*ctx->getTab());
+ Uint32 ot = READ_C;
+
+ while (ot < LAST)
+ {
+ if (hugoOps.startTransaction(ndb) != 0)
+ {
+ g_err << "Start trans2 failed : " << hugoOps.getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+
+ g_err << "Operation type : " << opTypeNames[ot] << endl;
+ int res = 0;
+ switch (ot)
+ {
+ case READ_C:
+ res = hugoOps.pkReadRecord(ndb,0,1,NdbOperation::LM_CommittedRead);
+ break;
+ case READ_S:
+ res = hugoOps.pkReadRecord(ndb,0,1,NdbOperation::LM_Read);
+ break;
+ case READ_E:
+ res = hugoOps.pkReadRecord(ndb,0,1,NdbOperation::LM_Exclusive);
+ break;
+ case INSERT:
+ res = hugoOps.pkInsertRecord(ndb, 0);
+ break;
+ case UPDATE:
+ res = hugoOps.pkUpdateRecord(ndb, 0);
+ break;
+ case WRITE:
+ res = hugoOps.pkWriteRecord(ndb, 0);
+ break;
+ case DELETE:
+ res = hugoOps.pkDeleteRecord(ndb, 0);
+ break;
+ case LAST:
+ abort();
+ }
+
+ hugoOps.execute_Commit(ndb);
+
+ if ((ot == READ_C) && (scenario.preExist))
+ {
+ if (hugoOps.getNdbError().code == 0)
+ {
+ g_err << "Read committed succeeded" << endl;
+ }
+ else
+ {
+ g_err << "UNEXPECTED : Read committed failed. " << hugoOps.getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+ }
+ else
+ {
+ if (hugoOps.getNdbError().code == 0)
+ {
+ g_err << opTypeNames[ot] << " succeeded, should not have" << endl;
+ return NDBT_FAILED;
+ }
+ }
+
+ hugoOps.closeTransaction(ndb);
+
+ ot = ot + 1;
+ }
+
+ }
+
+ /* Close refresh transaction */
+ hugoTrans.closeTransaction(ndb);
+
+ if (scenario.preExist)
+ {
+ /* Cleanup pre-existing before next iteration */
+ if (hugoTrans.pkDelRecords(ndb, 0) != 0)
+ {
+ g_err << "Delete pre existing failed : " << hugoTrans.getNdbError() << endl;
+ return NDBT_FAILED;
+ }
+ }
+ }
+
+ return NDBT_OK;
+}
+
+
NDBT_TESTSUITE(testBasic);
TESTCASE("PkInsert",
"Verify that we can insert and delete from this table using PK"
@@ -2746,6 +3551,12 @@ TESTCASE("UnlockUpdateBatch",
STEP(runPkRead);
FINALIZER(runClearTable);
}
+TESTCASE("RefreshTuple",
+ "Test refreshTuple() operation properties"){
+ INITIALIZER(initSubscription);
+ INITIALIZER(runRefreshTuple);
+ FINALIZER(removeSubscription);
+}
TESTCASE("Bug54986", "")
{
INITIALIZER(runBug54986);
@@ -2773,6 +3584,11 @@ TESTCASE("899", "")
STEP(runTest899);
FINALIZER(runEnd899);
}
+TESTCASE("RefreshLocking",
+ "Test Refresh locking properties")
+{
+ INITIALIZER(runRefreshLocking);
+}
NDBT_TESTSUITE_END(testBasic);
#if 0
=== modified file 'storage/ndb/test/ndbapi/testIndex.cpp'
--- a/storage/ndb/test/ndbapi/testIndex.cpp 2011-04-28 07:47:53 +0000
+++ b/storage/ndb/test/ndbapi/testIndex.cpp 2011-05-25 13:19:02 +0000
@@ -1744,6 +1744,73 @@ runMixed2(NDBT_Context* ctx, NDBT_Step*
return NDBT_FAILED;
}
+#define check(b, e) \
+ if (!(b)) { g_err << "ERR: " << step->getName() << " failed on line " << __LINE__ << ": " << e.getNdbError() << endl; return NDBT_FAILED; }
+
+int runRefreshTupleAbort(NDBT_Context* ctx, NDBT_Step* step){
+ int records = ctx->getNumRecords();
+ int loops = ctx->getNumLoops();
+
+ Ndb* ndb = GETNDB(step);
+
+ const NdbDictionary::Table& tab = *ctx->getTab();
+
+ for (int i=0; i < tab.getNoOfColumns(); i++)
+ {
+ if (tab.getColumn(i)->getStorageType() == NDB_STORAGETYPE_DISK)
+ {
+ g_err << "Table has disk column(s) skipping." << endl;
+ return NDBT_OK;
+ }
+ }
+
+
+ g_err << "Loading table." << endl;
+ HugoTransactions hugoTrans(*ctx->getTab());
+ check(hugoTrans.loadTable(ndb, records) == 0, hugoTrans);
+
+ HugoOperations hugoOps(*ctx->getTab());
+
+ /* Check refresh, abort sequence with an ordered index
+ * Previously this gave bugs due to corruption of the
+ * tuple version
+ */
+ while (loops--)
+ {
+ Uint32 numRefresh = 2 + rand() % 10;
+
+ g_err << "Refresh, rollback * " << numRefresh << endl;
+
+ while (--numRefresh)
+ {
+ /* Refresh, rollback */
+ check(hugoOps.startTransaction(ndb) == 0, hugoOps);
+ check(hugoOps.pkRefreshRecord(ndb, 0, records, 0) == 0, hugoOps);
+ check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps);
+ check(hugoOps.execute_Rollback(ndb) == 0, hugoOps);
+ check(hugoOps.closeTransaction(ndb) == 0, hugoOps);
+ }
+
+ g_err << "Refresh, commit" << endl;
+ /* Refresh, commit */
+ check(hugoOps.startTransaction(ndb) == 0, hugoOps);
+ check(hugoOps.pkRefreshRecord(ndb, 0, records, 0) == 0, hugoOps);
+ check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps);
+ check(hugoOps.execute_Commit(ndb) == 0, hugoOps);
+ check(hugoOps.closeTransaction(ndb) == 0, hugoOps);
+
+ g_err << "Update, commit" << endl;
+ /* Update */
+ check(hugoOps.startTransaction(ndb) == 0, hugoOps);
+ check(hugoOps.pkUpdateRecord(ndb, 0, records, 2 + loops) == 0, hugoOps);
+ check(hugoOps.execute_NoCommit(ndb) == 0, hugoOps);
+ check(hugoOps.execute_Commit(ndb) == 0, hugoOps);
+ check(hugoOps.closeTransaction(ndb) == 0, hugoOps);
+ }
+
+ return NDBT_OK;
+}
+
int
runBuildDuring(NDBT_Context* ctx, NDBT_Step* step){
@@ -3619,6 +3686,16 @@ TESTCASE("Bug60851", "")
INITIALIZER(runBug60851);
FINALIZER(createPkIndex_Drop);
}
+TESTCASE("RefreshWithOrderedIndex",
+ "Refresh tuples with ordered index(es)")
+{
+ TC_PROPERTY("OrderedIndex", 1);
+ TC_PROPERTY("LoggedIndexes", Uint32(0));
+ INITIALIZER(createPkIndex);
+ INITIALIZER(runRefreshTupleAbort);
+ FINALIZER(createPkIndex_Drop);
+ FINALIZER(runClearTable);
+}
NDBT_TESTSUITE_END(testIndex);
int main(int argc, const char** argv){
=== modified file 'storage/ndb/test/run-test/daily-devel-tests.txt'
--- a/storage/ndb/test/run-test/daily-devel-tests.txt 2011-04-08 11:06:53 +0000
+++ b/storage/ndb/test/run-test/daily-devel-tests.txt 2011-05-25 13:19:02 +0000
@@ -129,3 +129,16 @@ max-time: 1800
cmd: testDict
args: -n SchemaTrans -l 1
+# Refresh tuple
+max-time: 300
+cmd: testBasic
+args: -n RefreshTuple T6 D1
+
+max-time: 300
+cmd: testIndex
+args: -n RefreshWithOrderedIndex T2 D2
+
+max-time: 300
+cmd: testBasic
+args: -n RefreshLocking D1
+
=== modified file 'storage/ndb/test/src/HugoOperations.cpp'
--- a/storage/ndb/test/src/HugoOperations.cpp 2011-04-07 07:22:49 +0000
+++ b/storage/ndb/test/src/HugoOperations.cpp 2011-05-25 13:19:02 +0000
@@ -567,6 +567,47 @@ int HugoOperations::pkDeleteRecord(Ndb*
return NDBT_OK;
}
+int HugoOperations::pkRefreshRecord(Ndb* pNdb,
+ int recordNo,
+ int numRecords,
+ int anyValueInfo){
+
+ char buffer[NDB_MAX_TUPLE_SIZE];
+ const NdbDictionary::Table * pTab =
+ pNdb->getDictionary()->getTable(tab.getName());
+
+ if (pTab == 0)
+ {
+ return NDBT_FAILED;
+ }
+
+ const NdbRecord * record = pTab->getDefaultRecord();
+ NdbOperation::OperationOptions opts;
+ opts.optionsPresent = NdbOperation::OperationOptions::OO_ANYVALUE;
+ for(int r=0; r < numRecords; r++)
+ {
+ bzero(buffer, sizeof(buffer));
+ if (calc.equalForRow((Uint8*)buffer, record, r + recordNo))
+ {
+ return NDBT_FAILED;
+ }
+
+ opts.anyValue = anyValueInfo?
+ (anyValueInfo << 16) | (r+recordNo) :
+ 0;
+
+ const NdbOperation* pOp = pTrans->refreshTuple(record, buffer,
+ &opts, sizeof(opts));
+ if (pOp == NULL)
+ {
+ ERR(pTrans->getNdbError());
+ setNdbError(pTrans->getNdbError());
+ return NDBT_FAILED;
+ }
+ }
+ return NDBT_OK;
+}
+
int HugoOperations::execute_Commit(Ndb* pNdb,
AbortOption eao){
=== modified file 'storage/ndb/test/src/HugoTransactions.cpp'
--- a/storage/ndb/test/src/HugoTransactions.cpp 2011-02-02 00:40:07 +0000
+++ b/storage/ndb/test/src/HugoTransactions.cpp 2011-05-25 13:19:02 +0000
@@ -1502,6 +1502,79 @@ HugoTransactions::pkDelRecords(Ndb* pNdb
}
int
+HugoTransactions::pkRefreshRecords(Ndb* pNdb,
+ int startFrom,
+ int count,
+ int batch)
+{
+ int r = 0;
+ int retryAttempt = 0;
+
+ g_info << "|- Refreshing records..." << startFrom << "-" << (startFrom+count)
+ << " (batch=" << batch << ")" << endl;
+
+ while (r < count)
+ {
+ if(r + batch > count)
+ batch = count - r;
+
+ if (retryAttempt >= m_retryMax)
+ {
+ g_info << "ERROR: has retried this operation " << retryAttempt
+ << " times, failing!" << endl;
+ return NDBT_FAILED;
+ }
+
+ pTrans = pNdb->startTransaction();
+ if (pTrans == NULL)
+ {
+ const NdbError err = pNdb->getNdbError();
+
+ if (err.status == NdbError::TemporaryError){
+ ERR(err);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ }
+ ERR(err);
+ return NDBT_FAILED;
+ }
+
+ if (pkRefreshRecord(pNdb, r, batch) != NDBT_OK)
+ {
+ ERR(pTrans->getNdbError());
+ closeTransaction(pNdb);
+ return NDBT_FAILED;
+ }
+
+ if (pTrans->execute(Commit, AbortOnError) == -1)
+ {
+ const NdbError err = pTrans->getNdbError();
+
+ switch(err.status){
+ case NdbError::TemporaryError:
+ ERR(err);
+ closeTransaction(pNdb);
+ NdbSleep_MilliSleep(50);
+ retryAttempt++;
+ continue;
+ break;
+
+ default:
+ ERR(err);
+ closeTransaction(pNdb);
+ return NDBT_FAILED;
+ }
+ }
+
+ closeTransaction(pNdb);
+ r += batch; // Read next record
+ }
+
+ return NDBT_OK;
+}
+
+int
HugoTransactions::pkReadUnlockRecords(Ndb* pNdb,
int records,
int batch,
=== modified file 'storage/ndb/test/tools/hugoPkUpdate.cpp'
--- a/storage/ndb/test/tools/hugoPkUpdate.cpp 2011-02-02 00:40:07 +0000
+++ b/storage/ndb/test/tools/hugoPkUpdate.cpp 2011-05-25 13:19:02 +0000
@@ -43,6 +43,8 @@ struct ThrOutput {
NDBT_Stats latency;
};
+static int _refresh = 0;
+
int main(int argc, const char** argv){
ndb_init();
@@ -63,7 +65,9 @@ int main(int argc, const char** argv){
// { "batch", 'b', arg_integer, &_batch, "batch value", "batch" },
{ "records", 'r', arg_integer, &_records, "Number of records", "records" },
{ "usage", '?', arg_flag, &_help, "Print help", "" },
- { "database", 'd', arg_string, &db, "Database", "" }
+ { "database", 'd', arg_string, &db, "Database", "" },
+ { "refresh", 0, arg_flag, &_refresh, "refresh record rather than update them", "" }
+
};
int num_args = sizeof(args) / sizeof(args[0]);
int optind = 0;
@@ -135,7 +139,10 @@ int main(int argc, const char** argv){
ths.stop();
if (ths.get_err())
+ {
+ ths.disconnect();
NDBT_ProgramExit(NDBT_FAILED);
+ }
if (_stats) {
NDBT_Stats latency;
@@ -160,6 +167,8 @@ int main(int argc, const char** argv){
i++;
}
+ ths.disconnect();
+
return NDBT_ProgramExit(NDBT_OK);
}
@@ -177,9 +186,19 @@ static void hugoPkUpdate(NDBT_Thread& th
hugoTrans.setThrInfo(ths.get_count(), thr.get_thread_no());
int ret;
- ret = hugoTrans.pkUpdateRecords(thr.get_ndb(),
- input->records,
- input->batch);
+ if (_refresh == 0)
+ {
+ ret = hugoTrans.pkUpdateRecords(thr.get_ndb(),
+ input->records,
+ input->batch);
+ }
+ else
+ {
+ ret = hugoTrans.pkRefreshRecords(thr.get_ndb(),
+ 0,
+ input->records,
+ input->batch);
+ }
if (ret != 0)
thr.set_err(ret);
}
Attachment: [text/bzr-bundle] bzr/frazer.clement@oracle.com-20110525131902-9mm9q5r4m74wsiix.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0 branch (frazer.clement:4418) | Frazer Clement | 25 May |