Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.2370 06/05/18 10:17:53 jonas@stripped +9 -0
ndb - bug#19293 and family
introduce acc per row logical mutex to fix difficult error handling cases
storage/ndb/test/ndbapi/testOperations.cpp
1.13 06/05/18 10:17:50 jonas@stripped +2 -2
renable last 3 lock upgrade testcases since they now pass
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
1.28 06/05/18 10:17:50 jonas@stripped +0 -1
remove unused states/methods
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
1.37 06/05/18 10:17:50 jonas@stripped +16 -104
remove extremly tricky code that handles disk_insert_but_no_mem_insert
that is no long needed with current acc changes
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
1.44 06/05/18 10:17:50 jonas@stripped +0 -3
remove unused states/methods
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
1.105 06/05/18 10:17:50 jonas@stripped +259 -158
1) impl. a new read key from operation record needed by acc
2) expand TRACE_OP toolkit
3) impl. ACCKEY_ORD as needed by ACC changes
storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
1.46 06/05/18 10:17:50 jonas@stripped +26 -3
1) impl. a new read key from operation record needed by acc
2) expand TRACE_OP toolkit
3) impl. ACCKEY_ORD as needed by ACC changes
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
1.67 06/05/18 10:17:50 jonas@stripped +2009 -1097
1) Fix per row mutex so that only 1 op at a time is running on a row
2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) }
3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue
4) Impl. a validate_lock_queue/dump_lock_queue test framework
storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp
1.21 06/05/18 10:17:49 jonas@stripped +0 -4
1) Fix per row mutex so that only 1 op at a time is running on a row
2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) }
3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue
4) Impl. a validate_lock_queue/dump_lock_queue test framework
storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp
1.31 06/05/18 10:17:49 jonas@stripped +88 -45
1) Fix per row mutex so that only 1 op at a time is running on a row
2) Change TUP_ALLOC/DEALLOC semantic, so that a new record will be allocated if LQ = { T1(DEL) - T2(INS) }
3) Rewrite lock queus to be O(1) in all cases but a few abort cases where we scan parallell queue
4) Impl. a validate_lock_queue/dump_lock_queue test framework
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: perch.ndb.mysql.com
# Root: /home/jonas/src/51-jonas
--- 1.30/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2006-05-07 09:41:37 +02:00
+++ 1.31/storage/ndb/src/kernel/blocks/dbacc/Dbacc.hpp 2006-05-18 10:17:49 +02:00
@@ -17,14 +17,13 @@
#ifndef DBACC_H
#define DBACC_H
-
+#ifdef VM_TRACE
+#define ACC_SAFE_QUEUE
+#endif
#include <pc.hpp>
#include <SimulatedBlock.hpp>
-// primary key is stored in TUP
-#include "../dbtup/Dbtup.hpp"
-
#ifdef DBACC_C
// Debug Macros
#define dbgWord32(ptr, ind, val)
@@ -135,7 +134,10 @@
#define ZRIGHT 2
#define ZROOTFRAGMENTSIZE 32
#define ZSCAN_LOCK_ALL 3
-#define ZSCAN_OP 5
+/**
+ * Check kernel_types for other operation types
+ */
+#define ZSCAN_OP 6
#define ZSCAN_REC_SIZE 256
#define ZSTAND_BY 2
#define ZTABLESIZE 16
@@ -477,6 +479,7 @@
/* OPERATIONREC */
/* --------------------------------------------------------------------------------- */
struct Operationrec {
+ Uint32 m_op_bits;
Uint32 localdata[2];
Uint32 elementIsforward;
Uint32 elementPage;
@@ -485,36 +488,62 @@
Uint32 fragptr;
Uint32 hashvaluePart;
Uint32 hashValue;
- Uint32 insertDeleteLen;
Uint32 nextLockOwnerOp;
Uint32 nextOp;
Uint32 nextParallelQue;
- Uint32 nextSerialQue;
+ union {
+ Uint32 nextSerialQue;
+ Uint32 m_lock_owner_ptr_i; // if nextParallelQue = RNIL, else undefined
+ };
Uint32 prevOp;
Uint32 prevLockOwnerOp;
- Uint32 prevParallelQue;
- Uint32 prevSerialQue;
+ union {
+ Uint32 prevParallelQue;
+ Uint32 m_lo_last_parallel_op_ptr_i;
+ };
+ union {
+ Uint32 prevSerialQue;
+ Uint32 m_lo_last_serial_op_ptr_i;
+ };
Uint32 scanRecPtr;
Uint32 transId1;
Uint32 transId2;
- State opState;
Uint32 userptr;
- State transactionstate;
Uint16 elementContainer;
Uint16 tupkeylen;
Uint32 xfrmtupkeylen;
Uint32 userblockref;
Uint32 scanBits;
- Uint8 elementIsDisappeared;
- Uint8 insertIsDone;
- Uint8 lockMode;
- Uint8 lockOwner;
- Uint8 nodeType;
- Uint8 operation;
- Uint8 opSimple;
- Uint8 dirtyRead;
- Uint8 commitDeleteCheckFlag;
- Uint8 isAccLockReq;
+
+ enum OpBits {
+ OP_MASK = 0x0000F // 4 bits for operation type
+ ,OP_LOCK_MODE = 0x00010 // 0 - shared lock, 1 = exclusive lock
+ ,OP_ACC_LOCK_MODE = 0x00020 // Or:de lock mode of all operation
+ // before me
+ ,OP_LOCK_OWNER = 0x00040
+ ,OP_RUN_QUEUE = 0x00080 // In parallell queue of lock owner
+ ,OP_DIRTY_READ = 0x00100
+ ,OP_LOCK_REQ = 0x00200 // isAccLockReq
+ ,OP_COMMIT_DELETE_CHECK = 0x00400
+ ,OP_INSERT_IS_DONE = 0x00800
+ ,OP_ELEMENT_DISAPPEARED = 0x01000
+
+ ,OP_STATE_MASK = 0xF0000
+ ,OP_STATE_IDLE = 0xF0000
+ ,OP_STATE_WAITING = 0x00000
+ ,OP_STATE_RUNNING = 0x10000
+ ,OP_STATE_EXECUTED = 0x30000
+ ,OP_STATE_RUNNING_ABORT = 0x20000
+
+ ,OP_EXECUTED_DIRTY_READ = 0x3050F
+ ,OP_INITIAL = ~(Uint32)0
+ };
+
+ bool is_same_trans(const Operationrec* op) const {
+ return
+ transId1 == op->transId1 && transId2 == op->transId2;
+ }
+
}; /* p2c: size = 168 bytes */
typedef Ptr<Operationrec> OperationrecPtr;
@@ -577,7 +606,6 @@
Uint32 scanUserblockref;
Uint32 scanMask;
Uint8 scanLockMode;
- Uint8 scanKeyinfoFlag;
Uint8 scanTimer;
Uint8 scanContinuebCounter;
Uint8 scanReadCommittedFlag;
@@ -602,7 +630,8 @@
virtual ~Dbacc();
// pointer to TUP instance in this thread
- Dbtup* c_tup;
+ class Dbtup* c_tup;
+ class Dblqh* c_lqh;
void execACCMINUPDATE(Signal* signal);
@@ -640,7 +669,8 @@
void ACCKEY_error(Uint32 fromWhere);
void commitDeleteCheck();
-
+ void report_dealloc(Signal* signal, const Operationrec* opPtrP);
+
typedef void * RootfragmentrecPtr;
void initRootFragPageZero(FragmentrecPtr, Page8Ptr);
void initFragAdd(Signal*, FragmentrecPtr);
@@ -679,14 +709,30 @@
bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId);
void initOpRec(Signal* signal);
void sendAcckeyconf(Signal* signal);
- Uint32 placeReadInLockQueue(Signal* signal);
- void placeSerialQueueRead(Signal* signal);
- void checkOnlyReadEntry(Signal* signal);
Uint32 getNoParallelTransaction(const Operationrec*);
- void moveLastParallelQueue(Signal* signal);
- void moveLastParallelQueueWrite(Signal* signal);
- Uint32 placeWriteInLockQueue(Signal* signal);
- void placeSerialQueueWrite(Signal* signal);
+
+#ifdef VM_TRACE
+ Uint32 getNoParallelTransactionFull(const Operationrec*);
+#endif
+#ifdef ACC_SAFE_QUEUE
+ bool validate_lock_queue(OperationrecPtr opPtr);
+ Uint32 get_parallel_head(OperationrecPtr opPtr);
+ void dump_lock_queue(OperationrecPtr loPtr);
+#else
+ bool validate_lock_queue(OperationrecPtr) { return true;}
+#endif
+
+public:
+ void execACCKEY_ORD(Signal* signal, Uint32 opPtrI);
+ void startNext(Signal* signal, OperationrecPtr lastOp);
+
+private:
+ Uint32 placeReadInLockQueue(OperationrecPtr lockOwnerPtr);
+ Uint32 placeWriteInLockQueue(OperationrecPtr lockOwnerPtr);
+ void placeSerialQueue(OperationrecPtr lockOwner, OperationrecPtr op);
+ void abortSerieQueueOperation(Signal* signal, OperationrecPtr op);
+ void abortParallelQueueOperation(Signal* signal, OperationrecPtr op);
+
void expandcontainer(Signal* signal);
void shrinkcontainer(Signal* signal);
void nextcontainerinfoExp(Signal* signal);
@@ -716,8 +762,8 @@
void increaselistcont(Signal* signal);
void seizeLeftlist(Signal* signal);
void seizeRightlist(Signal* signal);
- Uint32 readTablePk(Uint32 localkey1);
- void getElement(Signal* signal);
+ Uint32 readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec*);
+ Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
void getdirindex(Signal* signal);
void commitdelete(Signal* signal);
void deleteElement(Signal* signal);
@@ -726,12 +772,17 @@
void releaseRightlist(Signal* signal);
void checkoverfreelist(Signal* signal);
void abortOperation(Signal* signal);
- void accAbortReqLab(Signal* signal);
void commitOperation(Signal* signal);
- void copyOpInfo(Signal* signal);
+ void copyOpInfo(OperationrecPtr dst, OperationrecPtr src);
Uint32 executeNextOperation(Signal* signal);
void releaselock(Signal* signal);
+ void release_lockowner(Signal* signal, OperationrecPtr, bool commit);
+ void startNew(Signal* signal, OperationrecPtr newOwner);
+ void abortWaitingOperation(Signal*, OperationrecPtr);
+ void abortExecutedOperation(Signal*, OperationrecPtr);
+
void takeOutFragWaitQue(Signal* signal);
+ void check_lock_upgrade(Signal* signal, OperationrecPtr release_op, bool lo);
void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner,
OperationrecPtr release_op);
void allocOverflowPage(Signal* signal);
@@ -780,8 +831,8 @@
void senddatapagesLab(Signal* signal);
void sttorrysignalLab(Signal* signal);
void sendholdconfsignalLab(Signal* signal);
- void accIsLockedLab(Signal* signal);
- void insertExistElemLab(Signal* signal);
+ void accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr);
+ void insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void refaccConnectLab(Signal* signal);
void releaseScanLab(Signal* signal);
void ndbrestart1Lab(Signal* signal);
@@ -840,8 +891,6 @@
Operationrec *operationrec;
OperationrecPtr operationRecPtr;
OperationrecPtr idrOperationRecPtr;
- OperationrecPtr copyInOperPtr;
- OperationrecPtr copyOperPtr;
OperationrecPtr mlpqOperPtr;
OperationrecPtr queOperPtr;
OperationrecPtr readWriteOpPtr;
@@ -885,8 +934,6 @@
Page8Ptr lcnPageptr;
Page8Ptr lcnCopyPageptr;
Page8Ptr lupPageptr;
- Page8Ptr priPageptr;
- Page8Ptr pwiPageptr;
Page8Ptr ciPageidptr;
Page8Ptr gsePageidptr;
Page8Ptr isoPageptr;
@@ -926,8 +973,6 @@
Tabrec *tabrec;
TabrecPtr tabptr;
Uint32 ctablesize;
- Uint32 tpwiElementptr;
- Uint32 tpriElementptr;
Uint32 tgseElementptr;
Uint32 tgseContainerptr;
Uint32 trlHead;
@@ -961,8 +1006,6 @@
Uint32 tdelForward;
Uint32 tiopPageId;
Uint32 tipPageId;
- Uint32 tgeLocked;
- Uint32 tgeResult;
Uint32 tgeContainerptr;
Uint32 tgeElementptr;
Uint32 tgeForward;
--- 1.20/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp 2006-02-13 13:12:41 +01:00
+++ 1.21/storage/ndb/src/kernel/blocks/dbacc/DbaccInit.cpp 2006-05-18 10:17:49 +02:00
@@ -130,8 +130,6 @@
&fragrecptr,
&operationRecPtr,
&idrOperationRecPtr,
- ©InOperPtr,
- ©OperPtr,
&mlpqOperPtr,
&queOperPtr,
&readWriteOpPtr,
@@ -161,8 +159,6 @@
&lcnPageptr,
&lcnCopyPageptr,
&lupPageptr,
- &priPageptr,
- &pwiPageptr,
&ciPageidptr,
&gsePageidptr,
&isoPageptr,
--- 1.66/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2006-05-07 09:41:38 +02:00
+++ 1.67/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2006-05-18 10:17:50 +02:00
@@ -41,6 +41,19 @@
#define DEBUG(x)
#endif
+#ifdef ACC_SAFE_QUEUE
+#define vlqrequire(x) do { if (unlikely(!(x))) {\
+ dump_lock_queue(loPtr); \
+ ndbrequire(false); } } while(0)
+#else
+#define vlqrequire(x) ndbrequire(x)
+#endif
+
+
+// primary key is stored in TUP
+#include "../dbtup/Dbtup.hpp"
+#include "../dblqh/Dblqh.hpp"
+
// Signal entries and statement blocks
/* --------------------------------------------------------------------------------- */
@@ -208,8 +221,8 @@
switch (tstartphase) {
case 1:
jam();
- c_tup = (Dbtup*)globalData.getBlock(DBTUP);
- ndbrequire(c_tup != 0);
+ ndbrequire((c_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0);
+ ndbrequire((c_lqh = (Dblqh*)globalData.getBlock(DBLQH)) != 0);
break;
}
tuserblockref = signal->theData[3];
@@ -435,9 +448,7 @@
for (operationRecPtr.i = 0; operationRecPtr.i < coprecsize; operationRecPtr.i++) {
refresh_watch_dog();
ptrAss(operationRecPtr, operationrec);
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- operationRecPtr.p->opState = FREE_OP;
+ operationRecPtr.p->m_op_bits = ~0;
operationRecPtr.p->nextOp = operationRecPtr.i + 1;
}//for
operationRecPtr.i = coprecsize - 1;
@@ -899,8 +910,7 @@
ptrGuard(operationRecPtr);
operationRecPtr.p->userptr = tuserptr;
operationRecPtr.p->userblockref = tuserblockref;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- operationRecPtr.p->transactionstate = IDLE;
+ operationRecPtr.p->m_op_bits = ~0;
/* ******************************< */
/* ACCSEIZECONF */
/* ******************************< */
@@ -954,22 +964,19 @@
operationRecPtr.p->xfrmtupkeylen = signal->theData[4];
operationRecPtr.p->transId1 = signal->theData[5];
operationRecPtr.p->transId2 = signal->theData[6];
- operationRecPtr.p->transactionstate = ACTIVE;
- operationRecPtr.p->commitDeleteCheckFlag = ZFALSE;
- operationRecPtr.p->operation = Treqinfo & 0x7;
- /* --------------------------------------------------------------------------------- */
- // opSimple is not used in this version. Is needed for deadlock handling later on.
- /* --------------------------------------------------------------------------------- */
- // operationRecPtr.p->opSimple = (Treqinfo >> 3) & 0x1;
-
- operationRecPtr.p->lockMode = (Treqinfo >> 4) & 0x3;
Uint32 readFlag = (((Treqinfo >> 4) & 0x3) == 0); // Only 1 if Read
Uint32 dirtyFlag = (((Treqinfo >> 6) & 0x1) == 1); // Only 1 if Dirty
Uint32 dirtyReadFlag = readFlag & dirtyFlag;
- operationRecPtr.p->dirtyRead = dirtyReadFlag;
- operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3;
+ Uint32 opbits = 0;
+ opbits |= Treqinfo & 0x7;
+ opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_LOCK_MODE : 0;
+ opbits |= ((Treqinfo >> 4) & 0x3) ? Operationrec::OP_ACC_LOCK_MODE : 0;
+ opbits |= (dirtyReadFlag) ? Operationrec::OP_DIRTY_READ : 0;
+ opbits |= ((Treqinfo >> 31) & 0x1) ? Operationrec::OP_LOCK_REQ : 0;
+
+ //operationRecPtr.p->nodeType = (Treqinfo >> 7) & 0x3;
operationRecPtr.p->fid = fragrecptr.p->myfid;
operationRecPtr.p->fragptr = fragrecptr.i;
operationRecPtr.p->nextParallelQue = RNIL;
@@ -977,14 +984,10 @@
operationRecPtr.p->nextSerialQue = RNIL;
operationRecPtr.p->prevSerialQue = RNIL;
operationRecPtr.p->elementPage = RNIL;
- operationRecPtr.p->lockOwner = ZFALSE;
- operationRecPtr.p->insertIsDone = ZFALSE;
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
- operationRecPtr.p->insertDeleteLen = fragrecptr.p->elementLength;
operationRecPtr.p->scanRecPtr = RNIL;
+ operationRecPtr.p->m_op_bits = opbits;
// bit to mark lock operation
- operationRecPtr.p->isAccLockReq = (Treqinfo >> 31) & 0x1;
// undo log is not run via ACCKEYREQ
}//Dbacc::initOpRec()
@@ -995,7 +998,7 @@
void Dbacc::sendAcckeyconf(Signal* signal)
{
signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = operationRecPtr.p->operation;
+ signal->theData[1] = operationRecPtr.p->m_op_bits & Operationrec::OP_MASK;
signal->theData[2] = operationRecPtr.p->fid;
signal->theData[3] = operationRecPtr.p->localdata[0];
signal->theData[4] = operationRecPtr.p->localdata[1];
@@ -1003,7 +1006,8 @@
}//Dbacc::sendAcckeyconf()
-void Dbacc::ACCKEY_error(Uint32 fromWhere)
+void
+Dbacc::ACCKEY_error(Uint32 fromWhere)
{
switch(fromWhere) {
case 0:
@@ -1057,7 +1061,8 @@
}//if
ptrAss(operationRecPtr, operationrec);
ptrAss(fragrecptr, fragmentrec);
- ndbrequire(operationRecPtr.p->transactionstate == IDLE);
+
+ ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL);
initOpRec(signal);
// normalize key if any char attr
@@ -1071,23 +1076,31 @@
/* WE REMEMBER THESE ADDRESS IF WE LATER NEED TO INSERT */
/* THE ITEM AFTER NOT FINDING THE ITEM. */
/*---------------------------------------------------------------*/
- getElement(signal);
-
- if (tgeResult == ZTRUE) {
- switch (operationRecPtr.p->operation) {
+ OperationrecPtr lockOwnerPtr;
+ const Uint32 found = getElement(signal, lockOwnerPtr);
+
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ if (found == ZTRUE)
+ {
+ switch (op) {
case ZREAD:
case ZUPDATE:
case ZDELETE:
case ZWRITE:
case ZSCAN_OP:
- if (!tgeLocked){
- if(operationRecPtr.p->operation == ZWRITE)
+ if (!lockOwnerPtr.p)
+ {
+ if(op == ZWRITE)
{
jam();
- operationRecPtr.p->operation = ZUPDATE;
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (op = ZUPDATE);
}
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ opbits |= Operationrec::OP_RUN_QUEUE;
sendAcckeyconf(signal);
- if (operationRecPtr.p->dirtyRead == ZFALSE) {
+ if (! (opbits & Operationrec::OP_DIRTY_READ)) {
/*---------------------------------------------------------------*/
// It is not a dirty read. We proceed by locking and continue with
// the operation.
@@ -1104,42 +1117,44 @@
dbgWord32(gePageptr, tgeElementptr, eh);
gePageptr.p->word32[tgeElementptr] = eh;
- insertLockOwnersList(signal , operationRecPtr);
- return;
+ opbits |= Operationrec::OP_LOCK_OWNER;
+ insertLockOwnersList(signal, operationRecPtr);
} else {
jam();
/*---------------------------------------------------------------*/
// It is a dirty read. We do not lock anything. Set state to
// IDLE since no COMMIT call will come.
/*---------------------------------------------------------------*/
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- return;
+ opbits = Operationrec::OP_EXECUTED_DIRTY_READ;
}//if
+ operationRecPtr.p->m_op_bits = opbits;
+ return;
} else {
jam();
- accIsLockedLab(signal);
+ accIsLockedLab(signal, lockOwnerPtr);
return;
}//if
break;
case ZINSERT:
jam();
- insertExistElemLab(signal);
+ insertExistElemLab(signal, lockOwnerPtr);
return;
break;
default:
ndbrequire(false);
break;
}//switch
- } else if (tgeResult == ZFALSE) {
- switch (operationRecPtr.p->operation) {
- case ZINSERT:
+ } else if (found == ZFALSE) {
+ switch (op){
case ZWRITE:
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (op = ZINSERT);
+ case ZINSERT:
jam();
- // If a write operation makes an insert we switch operation to ZINSERT so
- // that the commit-method knows an insert has been made and updates noOfElements.
- operationRecPtr.p->operation = ZINSERT;
- operationRecPtr.p->insertIsDone = ZTRUE;
+ opbits |= Operationrec::OP_INSERT_IS_DONE;
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ opbits |= Operationrec::OP_RUN_QUEUE;
+ operationRecPtr.p->m_op_bits = opbits;
insertelementLab(signal);
return;
break;
@@ -1157,13 +1172,285 @@
}//switch
} else {
jam();
- acckeyref1Lab(signal, tgeResult);
+ acckeyref1Lab(signal, found);
return;
}//if
return;
}//Dbacc::execACCKEYREQ()
void
+Dbacc::execACCKEY_ORD(Signal* signal, Uint32 opPtrI)
+{
+ OperationrecPtr lastOp;
+ lastOp.i = opPtrI;
+ ptrCheckGuard(lastOp, coprecsize, operationrec);
+ Uint32 opbits = lastOp.p->m_op_bits;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+
+ if (likely(opbits == Operationrec::OP_EXECUTED_DIRTY_READ))
+ {
+ jam();
+ lastOp.p->m_op_bits = Operationrec::OP_INITIAL;
+ return;
+ }
+ else if (likely(opstate == Operationrec::OP_STATE_RUNNING))
+ {
+ opbits |= Operationrec::OP_STATE_EXECUTED;
+ lastOp.p->m_op_bits = opbits;
+ startNext(signal, lastOp);
+ return;
+ }
+ else if (opstate == Operationrec::OP_STATE_RUNNING_ABORT)
+ {
+ }
+ else
+ {
+ }
+
+ ndbout_c("bits: %.8x state: %.8x", opbits, opstate);
+ ndbrequire(false);
+}
+
+void
+Dbacc::startNext(Signal* signal, OperationrecPtr lastOp)
+{
+ jam();
+ OperationrecPtr nextOp;
+ OperationrecPtr loPtr;
+ nextOp.i = lastOp.p->nextParallelQue;
+ loPtr.i = lastOp.p->m_lock_owner_ptr_i;
+ Uint32 opbits = lastOp.p->m_op_bits;
+
+ if ((opbits & Operationrec::OP_STATE_MASK)!= Operationrec::OP_STATE_EXECUTED)
+ {
+ jam();
+ return;
+ }
+
+ Uint32 nextbits;
+ if (nextOp.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(nextOp, coprecsize, operationrec);
+ nextbits = nextOp.p->m_op_bits;
+ goto checkop;
+ }
+
+ if ((opbits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ jam();
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ nextOp.i = loPtr.p->nextSerialQue;
+ }
+ else
+ {
+ jam();
+ nextOp.i = loPtr.i;
+ loPtr = lastOp;
+ }
+
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+
+ if (nextOp.i == RNIL)
+ {
+ jam();
+ return;
+ }
+
+ /**
+ * There is an op in serie queue...
+ * Check if it can run
+ */
+ ptrCheckGuard(nextOp, coprecsize, operationrec);
+ nextbits = nextOp.p->m_op_bits;
+
+ bool same = nextOp.p->is_same_trans(lastOp.p);
+
+ if (!same && ((opbits & Operationrec::OP_ACC_LOCK_MODE) ||
+ (nextbits & Operationrec::OP_LOCK_MODE)))
+ {
+ jam();
+ /**
+ * Not same transaction
+ * and either last had exclusive lock
+ * or next had exclusive lock
+ */
+ return;
+ }
+
+ /**
+ * same trans and X-lock
+ */
+ if (same && (opbits & Operationrec::OP_ACC_LOCK_MODE))
+ {
+ jam();
+ goto upgrade;
+ }
+
+ /**
+ * all shared lock...
+ */
+ if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 &&
+ (nextbits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ jam();
+ goto upgrade;
+ }
+
+ /**
+ * There is a shared parallell queue & and exclusive op is first in queue
+ */
+ ndbassert((opbits & Operationrec::OP_ACC_LOCK_MODE) == 0 &&
+ (nextbits & Operationrec::OP_LOCK_MODE));
+
+ /**
+ * We must check if there are many transactions in parallel queue...
+ */
+ OperationrecPtr tmp;
+ tmp.i = loPtr.p->nextParallelQue;
+ while (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ if (!nextOp.p->is_same_trans(tmp.p))
+ {
+ jam();
+ /**
+ * parallel queue contained another transaction, dont let it run
+ */
+ return;
+ }
+ }
+
+upgrade:
+ /**
+ * Move first op in serie queue to end of parallell queue
+ */
+
+ tmp.i = loPtr.p->nextSerialQue = nextOp.p->nextSerialQue;
+ loPtr.p->m_lo_last_parallel_op_ptr_i = nextOp.i;
+ nextOp.p->nextSerialQue = RNIL;
+ nextOp.p->prevSerialQue = RNIL;
+ nextOp.p->m_lock_owner_ptr_i = loPtr.i;
+ nextOp.p->prevParallelQue = lastOp.i;
+ lastOp.p->nextParallelQue = nextOp.i;
+
+ if (tmp.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ tmp.p->prevSerialQue = loPtr.i;
+ }
+ else
+ {
+ jam();
+ loPtr.p->m_lo_last_serial_op_ptr_i = RNIL;
+ }
+
+ nextbits |= Operationrec::OP_RUN_QUEUE;
+
+ /**
+ * Currently no grouping of ops in serie queue
+ */
+ ndbrequire(nextOp.p->nextParallelQue == RNIL);
+
+checkop:
+ Uint32 errCode = 0;
+ OperationrecPtr save = operationRecPtr;
+ operationRecPtr = nextOp;
+
+ Uint32 lastop = opbits & Operationrec::OP_MASK;
+ Uint32 nextop = nextbits & Operationrec::OP_MASK;
+
+ nextbits &= nextbits & ~(Uint32)Operationrec::OP_STATE_MASK;
+ nextbits |= Operationrec::OP_STATE_RUNNING;
+
+ if (lastop == ZDELETE)
+ {
+ jam();
+ if (nextop != ZINSERT && nextop != ZWRITE)
+ {
+ errCode = ZREAD_ERROR;
+ goto ref;
+ }
+
+ nextbits &= ~(Uint32)Operationrec::OP_MASK;
+ nextbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED;
+ nextbits |= (nextop = ZINSERT);
+ nextbits |= Operationrec::OP_INSERT_IS_DONE;
+ goto conf;
+ }
+ else if (nextop == ZINSERT)
+ {
+ jam();
+ errCode = ZWRITE_ERROR;
+ goto ref;
+ }
+ else if (nextop == ZWRITE)
+ {
+ jam();
+ nextbits &= ~(Uint32)Operationrec::OP_MASK;
+ nextbits |= (nextop = ZUPDATE);
+ goto conf;
+ }
+ else
+ {
+ jam();
+ }
+
+conf:
+ nextOp.p->m_op_bits = nextbits;
+ nextOp.p->localdata[0] = lastOp.p->localdata[0];
+ nextOp.p->localdata[1] = lastOp.p->localdata[1];
+
+ if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0)
+ {
+ jam();
+ takeOutScanLockQueue(nextOp.p->scanRecPtr);
+ putReadyScanQueue(signal, nextOp.p->scanRecPtr);
+ }
+ else
+ {
+ jam();
+ sendAcckeyconf(signal);
+ sendSignal(nextOp.p->userblockref, GSN_ACCKEYCONF,
+ signal, 6, JBA);
+ }
+
+ operationRecPtr = save;
+ return;
+
+ref:
+ nextOp.p->m_op_bits = nextbits;
+
+ if (nextop == ZSCAN_OP && (nextbits & Operationrec::OP_LOCK_REQ) == 0)
+ {
+ jam();
+ nextOp.p->m_op_bits |= Operationrec::OP_ELEMENT_DISAPPEARED;
+ takeOutScanLockQueue(nextOp.p->scanRecPtr);
+ putReadyScanQueue(signal, nextOp.p->scanRecPtr);
+ }
+ else
+ {
+ jam();
+ signal->theData[0] = nextOp.p->userptr;
+ signal->theData[1] = errCode;
+ sendSignal(nextOp.p->userblockref, GSN_ACCKEYREF, signal,
+ 2, JBB);
+ }
+
+ operationRecPtr = save;
+ return;
+}
+
+
+#if 0
+void
+Dbacc::execACCKEY_REP_REF(Signal* signal, Uint32 opPtrI)
+{
+}
+#endif
+
+void
Dbacc::xfrmKeyData(Signal* signal)
{
Uint32 table = fragrecptr.p->myTableId;
@@ -1176,23 +1463,22 @@
operationRecPtr.p->xfrmtupkeylen = len;
}
-void Dbacc::accIsLockedLab(Signal* signal)
+void
+Dbacc::accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr)
{
ndbrequire(csystemRestart == ZFALSE);
- queOperPtr.i = ElementHeader::getOpPtrI(gePageptr.p->word32[tgeElementptr]);
- ptrCheckGuard(queOperPtr, coprecsize, operationrec);
- if (operationRecPtr.p->dirtyRead == ZFALSE) {
+
+ Uint32 bits = operationRecPtr.p->m_op_bits;
+ validate_lock_queue(lockOwnerPtr);
+
+ if ((bits & Operationrec::OP_DIRTY_READ) == 0){
Uint32 return_result;
- if (operationRecPtr.p->lockMode == ZREADLOCK) {
+ if ((bits & Operationrec::OP_LOCK_MODE) == ZREADLOCK) {
jam();
- priPageptr = gePageptr;
- tpriElementptr = tgeElementptr;
- return_result = placeReadInLockQueue(signal);
+ return_result = placeReadInLockQueue(lockOwnerPtr);
} else {
jam();
- pwiPageptr = gePageptr;
- tpwiElementptr = tgeElementptr;
- return_result = placeWriteInLockQueue(signal);
+ return_result = placeWriteInLockQueue(lockOwnerPtr);
}//if
if (return_result == ZPARALLEL_QUEUE) {
jam();
@@ -1202,24 +1488,29 @@
jam();
signal->theData[0] = RNIL;
return;
- } else if (return_result == ZWRITE_ERROR) {
+ } else {
jam();
acckeyref1Lab(signal, return_result);
return;
}//if
ndbrequire(false);
- } else {
- if (queOperPtr.p->elementIsDisappeared == ZFALSE) {
+ }
+ else
+ {
+ if (!(lockOwnerPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED) &&
+ lockOwnerPtr.p->localdata[0] != ~(Uint32)0)
+ {
jam();
- /*---------------------------------------------------------------*/
- // It is a dirty read. We do not lock anything. Set state to
- // IDLE since no COMMIT call will arrive.
- /*---------------------------------------------------------------*/
+ /* ---------------------------------------------------------------
+ * It is a dirty read. We do not lock anything. Set state to
+ *IDLE since no COMMIT call will arrive.
+ * ---------------------------------------------------------------*/
sendAcckeyconf(signal);
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_EXECUTED_DIRTY_READ;
return;
- } else {
+ }
+ else
+ {
jam();
/*---------------------------------------------------------------*/
// The tuple does not exist in the committed world currently.
@@ -1231,17 +1522,18 @@
}//if
}//Dbacc::accIsLockedLab()
-/* --------------------------------------------------------------------------------- */
-/* I N S E R T E X I S T E L E M E N T */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::insertExistElemLab(Signal* signal)
+/* ------------------------------------------------------------------------ */
+/* I N S E R T E X I S T E L E M E N T */
+/* ------------------------------------------------------------------------ */
+void Dbacc::insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr)
{
- if (!tgeLocked){
+ if (!lockOwnerPtr.p)
+ {
jam();
acckeyref1Lab(signal, ZWRITE_ERROR);/* THE ELEMENT ALREADY EXIST */
return;
}//if
- accIsLockedLab(signal);
+ accIsLockedLab(signal, lockOwnerPtr);
}//Dbacc::insertExistElemLab()
/* --------------------------------------------------------------------------------- */
@@ -1259,26 +1551,8 @@
}//if
}//if
ndbrequire(operationRecPtr.p->tupkeylen <= fragrecptr.p->keyLength);
-
- Uint32 localKey;
- if(!operationRecPtr.p->isAccLockReq)
- {
- signal->theData[0] = operationRecPtr.p->userptr;
- Uint32 blockNo = refToBlock(operationRecPtr.p->userblockref);
- EXECUTE_DIRECT(blockNo, GSN_LQH_ALLOCREQ, signal, 1);
- jamEntry();
- if (signal->theData[0] != 0) {
- jam();
- Uint32 result_code = signal->theData[0];
- acckeyref1Lab(signal, result_code);
- return;
- }//if
- localKey = (signal->theData[1] << MAX_TUPLES_BITS) + signal->theData[2];
- }
- else
- {
- localKey = signal->theData[7];
- }
+ ndbassert(!(operationRecPtr.p->m_op_bits & Operationrec::OP_LOCK_REQ));
+ Uint32 localKey = ~(Uint32)0;
insertLockOwnersList(signal, operationRecPtr);
@@ -1293,196 +1567,18 @@
idrOperationRecPtr = operationRecPtr;
clocalkey[0] = localKey;
operationRecPtr.p->localdata[0] = localKey;
- /* --------------------------------------------------------------------------------- */
- /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
+ /* WE SET THE LOCAL KEY TO MINUS ONE TO INDICATE IT IS NOT YET VALID. */
+ /* ----------------------------------------------------------------------- */
insertElement(signal);
sendAcckeyconf(signal);
return;
}//Dbacc::insertelementLab()
-/* --------------------------------------------------------------------------------- */
-/* PLACE_READ_IN_LOCK_QUEUE */
-/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */
-/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */
-/* PRI_PAGEPTR PAGE POINTER OF ELEMENT */
-/* TPRI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */
-/* OUTPUT TRESULT = */
-/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */
-/* OPERATION CAN PROCEED NOW. */
-/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */
-/* ERROR CODE OPERATION NEEDS ABORTING */
-/* THE ELEMENT WAS LOCKED AND WE WANT TO READ THE TUPLE. WE WILL CHECK THE LOCK */
-/* QUEUES TO PERFORM THE PROPER ACTION. */
-/* */
-/* IN SOME PLACES IN THE CODE BELOW THAT HANDLES WHAT TO DO WHEN THE TUPLE IS LOCKED */
-/* WE DO ASSUME THAT NEXT_PARALLEL_QUEUE AND NEXT_SERIAL_QUEUE ON OPERATION_REC_PTR */
-/* HAVE BEEN INITIALISED TO RNIL. THUS WE DO NOT PERFORM THIS ONCE MORE EVEN IF IT */
-/* COULD BE NICE FOR READABILITY. */
-/* --------------------------------------------------------------------------------- */
-Uint32 Dbacc::placeReadInLockQueue(Signal* signal)
-{
- if (getNoParallelTransaction(queOperPtr.p) == 1) {
- if ((queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (queOperPtr.p->transId2 == operationRecPtr.p->transId2)) {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING A READ OPERATION AND THIS TRANSACTION ALREADY OWNS THE LOCK */
- /* ALONE. PUT THE OPERATION LAST IN THE PARALLEL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- jam();
- mlpqOperPtr = queOperPtr;
- moveLastParallelQueue(signal);
- operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1];
- operationRecPtr.p->prevParallelQue = mlpqOperPtr.i;
- mlpqOperPtr.p->nextParallelQue = operationRecPtr.i;
- switch (queOperPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- /*empty*/;
- break;
- default:
- jam();
- /* --------------------------------------------------------------------------------- */
- /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */
- /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->lockMode = queOperPtr.p->lockMode;
- break;
- }//switch
- return ZPARALLEL_QUEUE;
- }//if
- }//if
- if (queOperPtr.p->nextSerialQue == RNIL) {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING A READ OPERATION AND THERE IS NO SERIAL QUEUE. IF THERE IS NO */
- /* WRITE OPERATION THAT OWNS THE LOCK OR ANY WRITE OPERATION IN THE PARALLEL QUEUE */
- /* IT IS ENOUGH TO CHECK THE LOCK MODE OF THE LEADER IN THE PARALLEL QUEUE. IF IT IS */
- /* A READ LOCK THEN WE PLACE OURSELVES IN THE PARALLEL QUEUE OTHERWISE WE GO ON TO */
- /* PLACE OURSELVES IN THE SERIAL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- switch (queOperPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- mlpqOperPtr = queOperPtr;
- moveLastParallelQueue(signal);
- operationRecPtr.p->prevParallelQue = mlpqOperPtr.i;
- mlpqOperPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1];
- return ZPARALLEL_QUEUE;
- default:
- jam();
- queOperPtr.p->nextSerialQue = operationRecPtr.i;
- operationRecPtr.p->prevSerialQue = queOperPtr.i;
- break;
- }//switch
- } else {
- jam();
- placeSerialQueueRead(signal);
- }//if
- return ZSERIAL_QUEUE;
-}//Dbacc::placeReadInLockQueue()
-
-/* --------------------------------------------------------------------------------- */
-/* WE WILL CHECK IF THIS TRANSACTION IS ALREADY PLACED AT SOME SPOT IN THE PARALLEL */
-/* SERIAL QUEUE WITHOUT ANY NEIGHBORS FROM OTHER TRANSACTION. IF SO WE WILL INSERT */
-/* IT IN THAT PARALLEL QUEUE. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::placeSerialQueueRead(Signal* signal)
-{
- readWriteOpPtr.i = queOperPtr.p->nextSerialQue;
- ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
-PSQR_LOOP:
- jam();
- if (readWriteOpPtr.p->nextSerialQue == RNIL) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THERE WAS NO PREVIOUS OPERATION IN THIS TRANSACTION WHICH WE COULD PUT IT */
- /* IN THE PARALLEL QUEUE TOGETHER WITH. */
- /* --------------------------------------------------------------------------------- */
- checkOnlyReadEntry(signal);
- return;
- }//if
- if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */
- /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */
- /* --------------------------------------------------------------------------------- */
- if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING A READ IN THE SAME TRANSACTION WHERE WE ALREADY */
- /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */
- /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */
- /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */
- /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */
- /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */
- /* --------------------------------------------------------------------------------- */
- mlpqOperPtr = readWriteOpPtr;
- moveLastParallelQueue(signal);
- readWriteOpPtr = mlpqOperPtr;
- operationRecPtr.p->prevParallelQue = readWriteOpPtr.i;
- readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- switch (readWriteOpPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- /*empty*/;
- break;
- default:
- jam();
- /* --------------------------------------------------------------------------------- */
- /* IF THE TRANSACTION PREVIOUSLY SET A WRITE LOCK WE MUST ENSURE THAT ALL */
- /* OPERATIONS IN THE PARALLEL QUEUE HAVE WRITE LOCK MODE TO AVOID STRANGE BUGS.*/
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->lockMode = readWriteOpPtr.p->lockMode;
- break;
- }//switch
- return;
- }//if
- }//if
- readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
- ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
- goto PSQR_LOOP;
-}//Dbacc::placeSerialQueueRead()
-
-/* --------------------------------------------------------------------------------- */
-/* WE WILL CHECK IF THE LAST ENTRY IN THE SERIAL QUEUE CONTAINS ONLY READ */
-/* OPERATIONS. IF SO WE WILL INSERT IT IN THAT PARALLEL QUEUE. OTHERWISE WE */
-/* WILL PLACE IT AT THE END OF THE SERIAL QUEUE. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::checkOnlyReadEntry(Signal* signal)
-{
- switch (readWriteOpPtr.p->lockMode) {
- case ZREADLOCK:
- jam();
- /* --------------------------------------------------------------------------------- */
- /* SINCE THIS LAST QUEUE ONLY CONTAINS READ LOCKS WE CAN JOIN THE PARALLEL QUEUE AT */
- /* THE END. */
- /* --------------------------------------------------------------------------------- */
- mlpqOperPtr = readWriteOpPtr;
- moveLastParallelQueue(signal);
- readWriteOpPtr = mlpqOperPtr;
- operationRecPtr.p->prevParallelQue = readWriteOpPtr.i;
- readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- break;
- default:
- jam(); /* PUT THE OPERATION RECORD IN THE SERIAL QUEUE */
- readWriteOpPtr.p->nextSerialQue = operationRecPtr.i;
- operationRecPtr.p->prevSerialQue = readWriteOpPtr.i;
- break;
- }//switch
-}//Dbacc::checkOnlyReadEntry()
-/* --------------------------------------------------------------------------------- */
-/* GET_NO_PARALLEL_TRANSACTION */
-/* --------------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------ */
+/* GET_NO_PARALLEL_TRANSACTION */
+/* ------------------------------------------------------------------------ */
Uint32
Dbacc::getNoParallelTransaction(const Operationrec * op)
{
@@ -1502,135 +1598,641 @@
return 1;
}//Dbacc::getNoParallelTransaction()
-void Dbacc::moveLastParallelQueue(Signal* signal)
+#ifdef VM_TRACE
+Uint32
+Dbacc::getNoParallelTransactionFull(const Operationrec * op)
{
- while (mlpqOperPtr.p->nextParallelQue != RNIL) {
- jam();
- mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue;
- ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec);
- }//if
-}//Dbacc::moveLastParallelQueue()
+ ConstPtr<Operationrec> tmp;
+
+ tmp.p = op;
+ while ((tmp.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ tmp.i = tmp.p->prevParallelQue;
+ if (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ return getNoParallelTransaction(tmp.p);
+}
+#endif
+
+#ifdef ACC_SAFE_QUEUE
-void Dbacc::moveLastParallelQueueWrite(Signal* signal)
+Uint32
+Dbacc::get_parallel_head(OperationrecPtr opPtr)
{
- /* --------------------------------------------------------------------------------- */
- /* ENSURE THAT ALL OPERATIONS HAVE LOCK MODE SET TO WRITE SINCE WE INSERT A */
- /* WRITE LOCK INTO THE PARALLEL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- while (mlpqOperPtr.p->nextParallelQue != RNIL) {
- jam();
- mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode;
- mlpqOperPtr.i = mlpqOperPtr.p->nextParallelQue;
- ptrCheckGuard(mlpqOperPtr, coprecsize, operationrec);
- }//if
- mlpqOperPtr.p->lockMode = operationRecPtr.p->lockMode;
-}//Dbacc::moveLastParallelQueueWrite()
+ while ((opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ opPtr.p->prevParallelQue != RNIL)
+ {
+ opPtr.i = opPtr.p->prevParallelQue;
+ ptrCheckGuard(opPtr, coprecsize, operationrec);
+ }
+
+ return opPtr.i;
+}
-/* --------------------------------------------------------------------------------- */
-/* PLACE_WRITE_IN_LOCK_QUEUE */
-/* INPUT: OPERATION_REC_PTR OUR OPERATION POINTER */
-/* QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER */
-/* PWI_PAGEPTR PAGE POINTER OF ELEMENT */
-/* TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT */
-/* OUTPUT TRESULT = */
-/* ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE */
-/* OPERATION CAN PROCEED NOW. */
-/* ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE */
-/* ERROR CODE OPERATION NEEDS ABORTING */
-/* --------------------------------------------------------------------------------- */
-Uint32 Dbacc::placeWriteInLockQueue(Signal* signal)
+bool
+Dbacc::validate_lock_queue(OperationrecPtr opPtr)
{
- if (!((getNoParallelTransaction(queOperPtr.p) == 1) &&
- (queOperPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (queOperPtr.p->transId2 == operationRecPtr.p->transId2))) {
+ OperationrecPtr loPtr;
+ loPtr.i = get_parallel_head(opPtr);
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+
+ while((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ loPtr.p->prevSerialQue != RNIL)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+
+ // Now we have lock owner...
+ vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_RUN_QUEUE);
+
+ // 1 Validate page pointer
+ {
+ Page8Ptr pagePtr;
+ pagePtr.i = loPtr.p->elementPage;
+ ptrCheckGuard(pagePtr, cpagesize, page8);
+ arrGuard(loPtr.p->elementPointer, 2048);
+ Uint32 eh = pagePtr.p->word32[loPtr.p->elementPointer];
+ vlqrequire(ElementHeader::getLocked(eh));
+ vlqrequire(ElementHeader::getOpPtrI(eh) == loPtr.i);
+ }
+
+ // 2 Lock owner should always have same LOCK_MODE and ACC_LOCK_MODE
+ if (loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE)
+ {
+ vlqrequire(loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE);
+ }
+ else
+ {
+ vlqrequire((loPtr.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0);
+ }
+
+ // 3 Lock owner should never be waiting...
+ bool running = false;
+ {
+ Uint32 opstate = loPtr.p->m_op_bits & Operationrec::OP_STATE_MASK;
+ if (opstate == Operationrec::OP_STATE_RUNNING ||
+ opstate == Operationrec::OP_STATE_RUNNING_ABORT)
+ running = true;
+ else
+ {
+ vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED);
+ }
+ }
+
+ // Validate parallel queue
+ {
+ bool many = false;
+ bool orlockmode = loPtr.p->m_op_bits & Operationrec::OP_LOCK_MODE;
+ OperationrecPtr lastP = loPtr;
+
+ while (lastP.p->nextParallelQue != RNIL)
+ {
+ Uint32 prev = lastP.i;
+ lastP.i = lastP.p->nextParallelQue;
+ ptrCheckGuard(lastP, coprecsize, operationrec);
+
+ vlqrequire(lastP.p->prevParallelQue == prev);
+
+ Uint32 opbits = lastP.p->m_op_bits;
+ many |= loPtr.p->is_same_trans(lastP.p) ? 0 : 1;
+ orlockmode |= !!(opbits & Operationrec::OP_LOCK_MODE);
+
+ vlqrequire(opbits & Operationrec::OP_RUN_QUEUE);
+ vlqrequire((opbits & Operationrec::OP_LOCK_OWNER) == 0);
+
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+ if (running)
+ {
+ // If I found a running operation,
+ // all following should be waiting
+ vlqrequire(opstate == Operationrec::OP_STATE_WAITING);
+ }
+ else
+ {
+ if (opstate == Operationrec::OP_STATE_RUNNING ||
+ opstate == Operationrec::OP_STATE_RUNNING_ABORT)
+ running = true;
+ else
+ vlqrequire(opstate == Operationrec::OP_STATE_EXECUTED);
+ }
+
+ if (lastP.p->m_op_bits & Operationrec::OP_LOCK_MODE)
+ {
+ vlqrequire(lastP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE);
+ }
+ else
+ {
+ vlqrequire((lastP.p->m_op_bits && orlockmode) == orlockmode);
+ vlqrequire((lastP.p->m_op_bits & Operationrec::OP_MASK) == ZREAD ||
+ (lastP.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP);
+ }
+
+ if (many)
+ {
+ vlqrequire(orlockmode == 0);
+ }
+ }
+
+ if (lastP.i != loPtr.i)
+ {
+ vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == lastP.i);
+ vlqrequire(lastP.p->m_lock_owner_ptr_i == loPtr.i);
+ }
+ else
+ {
+ vlqrequire(loPtr.p->m_lo_last_parallel_op_ptr_i == RNIL);
+ }
+ }
+
+ // Validate serie queue
+ if (loPtr.p->nextSerialQue != RNIL)
+ {
+ Uint32 prev = loPtr.i;
+ OperationrecPtr lastS;
+ lastS.i = loPtr.p->nextSerialQue;
+ while (true)
+ {
+ ptrCheckGuard(lastS, coprecsize, operationrec);
+ vlqrequire(lastS.p->prevSerialQue == prev);
+ vlqrequire(getNoParallelTransaction(lastS.p) == 1);
+ vlqrequire((lastS.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0);
+ vlqrequire((lastS.p->m_op_bits & Operationrec::OP_RUN_QUEUE) == 0);
+ vlqrequire((lastS.p->m_op_bits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_WAITING);
+ if (lastS.p->nextSerialQue == RNIL)
+ break;
+ prev = lastS.i;
+ lastS.i = lastS.p->nextSerialQue;
+ }
+
+ vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == lastS.i);
+ }
+ else
+ {
+ vlqrequire(loPtr.p->m_lo_last_serial_op_ptr_i == RNIL);
+ }
+ return true;
+}
+
+NdbOut&
+operator<<(NdbOut & out, Dbacc::OperationrecPtr ptr)
+{
+ Uint32 opbits = ptr.p->m_op_bits;
+ out << "[ " << dec << ptr.i
+ << " [ " << hex << ptr.p->transId1
+ << " " << hex << ptr.p->transId2 << "] "
+ << " bits: H'" << hex << opbits << " ";
+
+ bool read = false;
+ switch(opbits & Dbacc::Operationrec::OP_MASK){
+ case ZREAD: out << "READ "; read = true; break;
+ case ZINSERT: out << "INSERT "; break;
+ case ZUPDATE: out << "UPDATE "; break;
+ case ZDELETE: out << "DELETE "; break;
+ case ZWRITE: out << "WRITE "; break;
+ case ZSCAN_OP: out << "SCAN "; read = true; break;
+ default:
+ out << "<Unknown: H'"
+ << hex << (opbits & Dbacc::Operationrec::OP_MASK)
+ << "> ";
+ }
+
+ if (read)
+ {
+ if (opbits & Dbacc::Operationrec::OP_LOCK_MODE)
+ out << "(X)";
+ else
+ out << "(S)";
+ if (opbits & Dbacc::Operationrec::OP_ACC_LOCK_MODE)
+ out << "(X)";
+ else
+ out << "(S)";
+ }
+
+ if (opbits)
+ {
+ out << "(RQ)";
+ }
+
+ switch(opbits & Dbacc::Operationrec::OP_STATE_MASK){
+ case Dbacc::Operationrec::OP_STATE_WAITING:
+ out << " WAITING "; break;
+ case Dbacc::Operationrec::OP_STATE_RUNNING:
+ out << " RUNNING "; break;
+ case Dbacc::Operationrec::OP_STATE_EXECUTED:
+ out << " EXECUTED "; break;
+ case Dbacc::Operationrec::OP_STATE_RUNNING_ABORT:
+ out << " RUNNIG_ABORT "; break;
+ case Dbacc::Operationrec::OP_STATE_IDLE:
+ out << " IDLE "; break;
+ default:
+ out << " <Unknown: H'"
+ << hex << (opbits & Dbacc::Operationrec::OP_STATE_MASK)
+ << "> ";
+ }
+
+/*
+ OP_MASK = 0x000F // 4 bits for operation type
+ ,OP_LOCK_MODE = 0x0010 // 0 - shared lock, 1 = exclusive lock
+ ,OP_ACC_LOCK_MODE = 0x0020 // Or:de lock mode of all operation
+ // before me
+ ,OP_LOCK_OWNER = 0x0040
+ ,OP_DIRTY_READ = 0x0080
+ ,OP_LOCK_REQ = 0x0100 // isAccLockReq
+ ,OP_COMMIT_DELETE_CHECK = 0x0200
+ ,OP_INSERT_IS_DONE = 0x0400
+ ,OP_ELEMENT_DISAPPEARED = 0x0800
+
+ ,OP_STATE_MASK = 0xF000
+ ,OP_STATE_IDLE = 0xF000
+ ,OP_STATE_WAITING = 0x0000
+ ,OP_STATE_RUNNING = 0x1000
+ ,OP_STATE_EXECUTED = 0x3000
+ ,OP_STATE_RUNNING_ABORT = 0x2000
+ };
+*/
+ if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER)
+ out << "LO ";
+
+ if (opbits & Dbacc::Operationrec::OP_DIRTY_READ)
+ out << "DR ";
+
+ if (opbits & Dbacc::Operationrec::OP_LOCK_REQ)
+ out << "LOCK_REQ ";
+
+ if (opbits & Dbacc::Operationrec::OP_COMMIT_DELETE_CHECK)
+ out << "COMMIT_DELETE_CHECK ";
+
+ if (opbits & Dbacc::Operationrec::OP_INSERT_IS_DONE)
+ out << "INSERT_IS_DONE ";
+
+ if (opbits & Dbacc::Operationrec::OP_ELEMENT_DISAPPEARED)
+ out << "ELEMENT_DISAPPEARED ";
+
+ if (opbits & Dbacc::Operationrec::OP_LOCK_OWNER)
+ {
+ out << "last_parallel: " << dec << ptr.p->m_lo_last_parallel_op_ptr_i << " ";
+ out << "last_serial: " << dec << ptr.p->m_lo_last_serial_op_ptr_i << " ";
+ }
+
+ out << "]";
+ return out;
+}
+
+void
+Dbacc::dump_lock_queue(OperationrecPtr loPtr)
+{
+ if ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ loPtr.p->prevParallelQue != RNIL)
+ {
+ loPtr.i = loPtr.p->prevParallelQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0 &&
+ loPtr.p->prevSerialQue != RNIL)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ }
+
+ ndbout << "-- HEAD --" << endl;
+ OperationrecPtr tmp = loPtr;
+ while (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ ndbout << tmp << " ";
+ tmp.i = tmp.p->nextParallelQue;
+
+ if (tmp.i == loPtr.i)
+ {
+ ndbout << " <LOOP>";
+ break;
+ }
+ }
+ ndbout << endl;
+
+ tmp.i = loPtr.p->nextSerialQue;
+ while (tmp.i != RNIL)
+ {
+ ptrCheckGuard(tmp, coprecsize, operationrec);
+ OperationrecPtr tmp2 = tmp;
+
+ if (tmp.i == loPtr.i)
+ {
+ ndbout << "<LOOP S>" << endl;
+ break;
+ }
+
+ while (tmp2.i != RNIL)
+ {
+ ptrCheckGuard(tmp2, coprecsize, operationrec);
+ ndbout << tmp2 << " ";
+ tmp2.i = tmp2.p->nextParallelQue;
+
+ if (tmp2.i == tmp.i)
+ {
+ ndbout << "<LOOP 3>";
+ break;
+ }
+ }
+ ndbout << endl;
+ tmp.i = tmp.p->nextSerialQue;
+ }
+}
+#endif
+
+/* -------------------------------------------------------------------------
+ * PLACE_WRITE_IN_LOCK_QUEUE
+ * INPUT: OPERATION_REC_PTR OUR OPERATION POINTER
+ * QUE_OPER_PTR LOCK QUEUE OWNER OPERATION POINTER
+ * PWI_PAGEPTR PAGE POINTER OF ELEMENT
+ * TPWI_ELEMENTPTR ELEMENT POINTER OF ELEMENT
+ * OUTPUT TRESULT =
+ * ZPARALLEL_QUEUE OPERATION PLACED IN PARALLEL QUEUE
+ * OPERATION CAN PROCEED NOW.
+ * ZSERIAL_QUEUE OPERATION PLACED IN SERIAL QUEUE
+ * ERROR CODE OPERATION NEEDS ABORTING
+ * ------------------------------------------------------------------------- */
+Uint32
+Dbacc::placeWriteInLockQueue(OperationrecPtr lockOwnerPtr)
+{
+ OperationrecPtr lastOpPtr;
+ lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+
+ if (lastOpPtr.i == RNIL)
+ {
+ lastOpPtr = lockOwnerPtr;
+ }
+ else
+ {
+ ptrCheckGuard(lastOpPtr, coprecsize, operationrec);
+ }
+
+ ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i);
+
+ Uint32 lastbits = lastOpPtr.p->m_op_bits;
+ if (lastbits & Operationrec::OP_ACC_LOCK_MODE)
+ {
+ if(operationRecPtr.p->is_same_trans(lastOpPtr.p))
+ {
+ goto checkop;
+ }
+ }
+ else
+ {
+ /**
+ * We dont have an exclusive lock on operation and
+ *
+ */
jam();
- placeSerialQueueWrite(signal);
- return ZSERIAL_QUEUE;
- }//if
+
+ /**
+ * Scan parallell queue to see if we are the only one
+ */
+ OperationrecPtr loopPtr = lockOwnerPtr;
+ do
+ {
+ ptrCheckGuard(loopPtr, coprecsize, operationrec);
+ if (!loopPtr.p->is_same_trans(operationRecPtr.p))
+ {
+ goto serial;
+ }
+ loopPtr.i = loopPtr.p->nextParallelQue;
+ } while (loopPtr.i != RNIL);
+
+ goto checkop;
+ }
+serial:
+ jam();
+ placeSerialQueue(lockOwnerPtr, operationRecPtr);
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return ZSERIAL_QUEUE;
+
+checkop:
/*
- WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME
- TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION.
- Read-All, Update-All, Insert-All and Delete-Insert are allowed
- combinations.
- Delete-Read, Delete-Update and Delete-Delete are not an allowed
- combination and will result in tuple not found error.
+ WE ARE PERFORMING AN READ EXCLUSIVE, INSERT, UPDATE OR DELETE IN THE SAME
+ TRANSACTION WHERE WE PREVIOUSLY HAVE EXECUTED AN OPERATION.
+ Read-All, Update-All, Insert-All and Delete-Insert are allowed
+ combinations.
+ Delete-Read, Delete-Update and Delete-Delete are not an allowed
+ combination and will result in tuple not found error.
*/
- mlpqOperPtr = queOperPtr;
- moveLastParallelQueueWrite(signal);
+ Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK;
- if (operationRecPtr.p->operation == ZINSERT &&
- mlpqOperPtr.p->operation != ZDELETE){
+ Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked...
+ if (lstate == Operationrec::OP_STATE_EXECUTED)
+ {
jam();
- return ZWRITE_ERROR;
- }//if
- if(operationRecPtr.p->operation == ZWRITE)
- {
- operationRecPtr.p->operation =
- (mlpqOperPtr.p->operation == ZDELETE) ? ZINSERT : ZUPDATE;
+ /**
+ * Since last operation has executed...we can now check operation types
+ * if not, we have to wait until it has executed
+ */
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ Uint32 lop = lastbits & Operationrec::OP_MASK;
+ if (op == ZINSERT && lop != ZDELETE)
+ {
+ jam();
+ return ZWRITE_ERROR;
+ }//if
+
+ /**
+ * NOTE. No checking op operation types, as one can read different save
+ * points...
+ */
+#if 0
+ if (lop == ZDELETE && (op != ZINSERT && op != ZWRITE))
+ {
+ jam();
+ return ZREAD_ERROR;
+ }
+#else
+ if (lop == ZDELETE && (op == ZUPDATE && op == ZDELETE))
+ {
+ jam();
+ return ZREAD_ERROR;
+ }
+#endif
+
+ if(op == ZWRITE)
+ {
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (lop == ZDELETE) ? ZINSERT : ZUPDATE;
+ }
+
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0];
+ operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1];
+ retValue = ZPARALLEL_QUEUE;
}
- operationRecPtr.p->localdata[0] = queOperPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = queOperPtr.p->localdata[1];
- operationRecPtr.p->prevParallelQue = mlpqOperPtr.i;
- mlpqOperPtr.p->nextParallelQue = operationRecPtr.i;
- return ZPARALLEL_QUEUE;
+ opbits |= Operationrec::OP_RUN_QUEUE;
+ operationRecPtr.p->m_op_bits = opbits;
+ operationRecPtr.p->prevParallelQue = lastOpPtr.i;
+ operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i;
+ lastOpPtr.p->nextParallelQue = operationRecPtr.i;
+ lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i;
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return retValue;
}//Dbacc::placeWriteInLockQueue()
-/* --------------------------------------------------------------------------------- */
-/* WE HAVE TO PLACE IT SOMEWHERE IN THE SERIAL QUEUE INSTEAD. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::placeSerialQueueWrite(Signal* signal)
+Uint32
+Dbacc::placeReadInLockQueue(OperationrecPtr lockOwnerPtr)
{
- readWriteOpPtr = queOperPtr;
-PSQW_LOOP:
- if (readWriteOpPtr.p->nextSerialQue == RNIL) {
+ OperationrecPtr lastOpPtr;
+ OperationrecPtr loopPtr = lockOwnerPtr;
+ lastOpPtr.i = lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+
+ if (lastOpPtr.i == RNIL)
+ {
+ lastOpPtr = lockOwnerPtr;
+ }
+ else
+ {
+ ptrCheckGuard(lastOpPtr, coprecsize, operationrec);
+ }
+
+ ndbassert(get_parallel_head(lastOpPtr) == lockOwnerPtr.i);
+
+ /**
+ * Last operation in parallell queue of lock owner is same trans
+ * and ACC_LOCK_MODE is exlusive, then we can proceed
+ */
+ Uint32 lastbits = lastOpPtr.p->m_op_bits;
+ bool same = operationRecPtr.p->is_same_trans(lastOpPtr.p);
+ if (same && (lastbits & Operationrec::OP_ACC_LOCK_MODE))
+ {
jam();
- /* --------------------------------------------------------------------------------- */
- /* WE COULD NOT PUT IN ANY PARALLEL QUEUE. WE MUST PUT IT LAST IN THE SERIAL QUEUE. */
- /* --------------------------------------------------------------------------------- */
- readWriteOpPtr.p->nextSerialQue = operationRecPtr.i;
- operationRecPtr.p->prevSerialQue = readWriteOpPtr.i;
- return;
- }//if
- readWriteOpPtr.i = readWriteOpPtr.p->nextSerialQue;
- ptrCheckGuard(readWriteOpPtr, coprecsize, operationrec);
- if (getNoParallelTransaction(readWriteOpPtr.p) == 1) {
- /* --------------------------------------------------------------------------------- */
- /* THERE WAS ONLY ONE TRANSACTION INVOLVED IN THE PARALLEL QUEUE. IF THIS IS OUR */
- /* TRANSACTION WE CAN STILL GET HOLD OF THE LOCK. */
- /* --------------------------------------------------------------------------------- */
- if ((readWriteOpPtr.p->transId1 == operationRecPtr.p->transId1) &&
- (readWriteOpPtr.p->transId2 == operationRecPtr.p->transId2)) {
+ goto checkop;
+ }
+
+ if ((lastbits & Operationrec::OP_ACC_LOCK_MODE) && !same)
+ {
+ jam();
+ /**
+ * Last op in serial queue had X-lock and was not our transaction...
+ */
+ goto serial;
+ }
+
+ if (lockOwnerPtr.p->nextSerialQue == RNIL)
+ {
+ jam();
+ goto checkop;
+ }
+
+ /**
+ * Scan parallell queue to see if we are already there...
+ */
+ do
+ {
+ ptrCheckGuard(loopPtr, coprecsize, operationrec);
+ if (loopPtr.p->is_same_trans(operationRecPtr.p))
+ goto checkop;
+ loopPtr.i = loopPtr.p->nextParallelQue;
+ } while (loopPtr.i != RNIL);
+
+serial:
+ placeSerialQueue(lockOwnerPtr, operationRecPtr);
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return ZSERIAL_QUEUE;
+
+checkop:
+ Uint32 lstate = lastbits & Operationrec::OP_STATE_MASK;
+
+ Uint32 retValue = ZSERIAL_QUEUE; // So that it gets blocked...
+ if (lstate == Operationrec::OP_STATE_EXECUTED)
+ {
+ jam();
+
+ /**
+ * NOTE. No checking op operation types, as one can read different save
+ * points...
+ */
+
+#if 0
+ /**
+ * Since last operation has executed...we can now check operation types
+ * if not, we have to wait until it has executed
+ */
+ if (lop == ZDELETE)
+ {
jam();
- /* --------------------------------------------------------------------------------- */
- /* WE ARE PERFORMING AN UPDATE OR DELETE IN THE SAME TRANSACTION WHERE WE ALREADY */
- /* PREVIOUSLY HAVE EXECUTED AN OPERATION. INSERT-DELETE, READ-UPDATE, READ-READ, */
- /* UPDATE-UPDATE, UPDATE-DELETE, READ-DELETE, INSERT-READ, INSERT-UPDATE ARE ALLOWED */
- /* COMBINATIONS. A NEW INSERT AFTER A DELETE IS NOT ALLOWED AND SUCH AN INSERT WILL */
- /* GO TO THE SERIAL LOCK QUEUE WHICH IT WILL NOT LEAVE UNTIL A TIME-OUT AND THE */
- /* TRANSACTION IS ABORTED. READS AND UPDATES AFTER DELETES IS ALSO NOT ALLOWED. */
- /* --------------------------------------------------------------------------------- */
- mlpqOperPtr = readWriteOpPtr;
- moveLastParallelQueueWrite(signal);
- readWriteOpPtr = mlpqOperPtr;
- operationRecPtr.p->prevParallelQue = readWriteOpPtr.i;
- readWriteOpPtr.p->nextParallelQue = operationRecPtr.i;
- operationRecPtr.p->localdata[0] = readWriteOpPtr.p->localdata[0];
- operationRecPtr.p->localdata[1] = readWriteOpPtr.p->localdata[1];
- return;
- }//if
- }//if
- goto PSQW_LOOP;
-}//Dbacc::placeSerialQueueWrite()
+ return ZREAD_ERROR;
+ }
+#endif
+
+ opbits |= Operationrec::OP_STATE_RUNNING;
+ operationRecPtr.p->localdata[0] = lastOpPtr.p->localdata[0];
+ operationRecPtr.p->localdata[1] = lastOpPtr.p->localdata[1];
+ retValue = ZPARALLEL_QUEUE;
+ }
+ opbits |= (lastbits & Operationrec::OP_ACC_LOCK_MODE);
+ opbits |= Operationrec::OP_RUN_QUEUE;
+ operationRecPtr.p->m_op_bits = opbits;
+
+ operationRecPtr.p->prevParallelQue = lastOpPtr.i;
+ operationRecPtr.p->m_lock_owner_ptr_i = lockOwnerPtr.i;
+ lastOpPtr.p->nextParallelQue = operationRecPtr.i;
+ lockOwnerPtr.p->m_lo_last_parallel_op_ptr_i = operationRecPtr.i;
+
+ validate_lock_queue(lockOwnerPtr);
+
+ return retValue;
+}//Dbacc::placeReadInLockQueue
+
+void Dbacc::placeSerialQueue(OperationrecPtr lockOwnerPtr,
+ OperationrecPtr opPtr)
+{
+ OperationrecPtr lastOpPtr;
+ lastOpPtr.i = lockOwnerPtr.p->m_lo_last_serial_op_ptr_i;
+
+ if (lastOpPtr.i == RNIL)
+ {
+ // Lock owner is last...
+ ndbrequire(lockOwnerPtr.p->nextSerialQue == RNIL);
+ lastOpPtr = lockOwnerPtr;
+ }
+ else
+ {
+ ptrCheckGuard(lastOpPtr, coprecsize, operationrec);
+ }
+
+ operationRecPtr.p->prevSerialQue = lastOpPtr.i;
+ lastOpPtr.p->nextSerialQue = opPtr.i;
+ lockOwnerPtr.p->m_lo_last_serial_op_ptr_i = opPtr.i;
+}
/* ------------------------------------------------------------------------- */
/* ACC KEYREQ END */
/* ------------------------------------------------------------------------- */
void Dbacc::acckeyref1Lab(Signal* signal, Uint32 result_code)
{
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
+ operationRecPtr.p->m_op_bits = ~0;
/* ************************<< */
/* ACCKEYREF */
/* ************************<< */
@@ -1639,15 +2241,15 @@
return;
}//Dbacc::acckeyref1Lab()
-/* ******************--------------------------------------------------------------- */
-/* ACCMINUPDATE UPDATE LOCAL KEY REQ */
-/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */
-/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */
-/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */
-/* OPERATION_REC_PTR, OPERATION RECORD PTR */
-/* CLOCALKEY(0), LOCAL KEY 1 */
-/* CLOCALKEY(1) LOCAL KEY 2 */
-/* ******************--------------------------------------------------------------- */
+/* ******************----------------------------------------------------- */
+/* ACCMINUPDATE UPDATE LOCAL KEY REQ */
+/* DESCRIPTION: UPDATES LOCAL KEY OF AN ELEMENTS IN THE HASH TABLE */
+/* THIS SIGNAL IS WAITED AFTER ANY INSERT REQ */
+/* ENTER ACCMINUPDATE WITH SENDER: LQH, LEVEL B */
+/* OPERATION_REC_PTR, OPERATION RECORD PTR */
+/* CLOCALKEY(0), LOCAL KEY 1 */
+/* CLOCALKEY(1) LOCAL KEY 2 */
+/* ******************----------------------------------------------------- */
void Dbacc::execACCMINUPDATE(Signal* signal)
{
Page8Ptr ulkPageidptr;
@@ -1660,19 +2262,26 @@
tlocalkey1 = signal->theData[1];
tlocalkey2 = signal->theData[2];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- if (operationRecPtr.p->transactionstate == ACTIVE) {
- fragrecptr.i = operationRecPtr.p->fragptr;
- ulkPageidptr.i = operationRecPtr.p->elementPage;
- tulkLocalPtr = operationRecPtr.p->elementPointer + operationRecPtr.p->elementIsforward;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ fragrecptr.i = operationRecPtr.p->fragptr;
+ ulkPageidptr.i = operationRecPtr.p->elementPage;
+ tulkLocalPtr = operationRecPtr.p->elementPointer +
+ operationRecPtr.p->elementIsforward;
+
+ if ((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_RUNNING)
+ {
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ptrCheckGuard(ulkPageidptr, cpagesize, page8);
dbgWord32(ulkPageidptr, tulkLocalPtr, tlocalkey1);
arrGuard(tulkLocalPtr, 2048);
ulkPageidptr.p->word32[tulkLocalPtr] = tlocalkey1;
operationRecPtr.p->localdata[0] = tlocalkey1;
- if (fragrecptr.p->localkeylen == 1) {
+ if (likely(fragrecptr.p->localkeylen == 1))
+ {
return;
- } else if (fragrecptr.p->localkeylen == 2) {
+ }
+ else if (fragrecptr.p->localkeylen == 2)
+ {
jam();
tulkLocalPtr = tulkLocalPtr + operationRecPtr.p->elementIsforward;
operationRecPtr.p->localdata[1] = tlocalkey2;
@@ -1696,15 +2305,17 @@
{
Uint8 Toperation;
jamEntry();
- operationRecPtr.i = signal->theData[0];
+ Uint32 tmp = operationRecPtr.i = signal->theData[0];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- ndbrequire(operationRecPtr.p->transactionstate == ACTIVE);
+ void* ptr = operationRecPtr.p;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
+ Toperation = opbits & Operationrec::OP_MASK;
commitOperation(signal);
- Toperation = operationRecPtr.p->operation;
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
+ ndbassert(operationRecPtr.i == tmp);
+ ndbassert(operationRecPtr.p == ptr);
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
if(Toperation != ZREAD){
fragrecptr.p->m_commit_count++;
if (Toperation != ZINSERT) {
@@ -1713,7 +2324,7 @@
} else {
jam();
fragrecptr.p->noOfElements--;
- fragrecptr.p->slack += operationRecPtr.p->insertDeleteLen;
+ fragrecptr.p->slack += fragrecptr.p->elementLength;
if (fragrecptr.p->slack > fragrecptr.p->slackCheck) {
/* TIME FOR JOIN BUCKETS PROCESS */
if (fragrecptr.p->expandCounter > 0) {
@@ -1732,7 +2343,7 @@
} else {
jam(); /* EXPAND PROCESS HANDLING */
fragrecptr.p->noOfElements++;
- fragrecptr.p->slack -= operationRecPtr.p->insertDeleteLen;
+ fragrecptr.p->slack -= fragrecptr.p->elementLength;
if (fragrecptr.p->slack >= (1u << 31)) {
/* IT MEANS THAT IF SLACK < ZERO */
if (fragrecptr.p->expandFlag == 0) {
@@ -1749,45 +2360,57 @@
return;
}//Dbacc::execACC_COMMITREQ()
-/* ******************--------------------------------------------------------------- */
-/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */
-/* ******************------------------------------+ */
-/* SENDER: LQH, LEVEL B */
-/* ******************--------------------------------------------------------------- */
-/* ACC ABORT REQ ABORT TRANSACTION */
-/* ******************------------------------------+ */
+/* ******************------------------------------------------------------- */
+/* ACC ABORT REQ ABORT ALL OPERATION OF THE TRANSACTION */
+/* ******************------------------------------+ */
+/* SENDER: LQH, LEVEL B */
+/* ******************------------------------------------------------------- */
+/* ACC ABORT REQ ABORT TRANSACTION */
+/* ******************------------------------------+ */
/* SENDER: LQH, LEVEL B */
void Dbacc::execACC_ABORTREQ(Signal* signal)
{
jamEntry();
- accAbortReqLab(signal);
-}//Dbacc::execACC_ABORTREQ()
-
-void Dbacc::accAbortReqLab(Signal* signal)
-{
operationRecPtr.i = signal->theData[0];
- bool sendConf = signal->theData[1];
+ Uint32 sendConf = signal->theData[1];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
+ fragrecptr.i = operationRecPtr.p->fragptr;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
tresult = 0; /* ZFALSE */
- if ((operationRecPtr.p->transactionstate == ACTIVE) ||
- (operationRecPtr.p->transactionstate == WAIT_COMMIT_ABORT)) {
+
+ if (opbits == Operationrec::OP_EXECUTED_DIRTY_READ)
+ {
+ jam();
+ }
+ else if (opstate == Operationrec::OP_STATE_EXECUTED ||
+ opstate == Operationrec::OP_STATE_WAITING ||
+ opstate == Operationrec::OP_STATE_RUNNING)
+ {
jam();
- fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- operationRecPtr.p->transactionstate = ABORT;
abortOperation(signal);
- } else {
- ndbrequire(operationRecPtr.p->transactionstate == IDLE);
- jam();
- }//if
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- if (! sendConf)
- return;
+ }
+
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
+
signal->theData[0] = operationRecPtr.p->userptr;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF, signal, 1, JBB);
- return;
-}//Dbacc::accAbortReqLab()
+ signal->theData[1] = 0;
+ switch(sendConf){
+ case 0:
+ return;
+ case 2:
+ if (opstate != Operationrec::OP_STATE_RUNNING)
+ {
+ return;
+ }
+ case 1:
+ sendSignal(operationRecPtr.p->userblockref, GSN_ACC_ABORTCONF,
+ signal, 1, JBB);
+ }
+
+ signal->theData[1] = RNIL;
+}
/*
* Lock or unlock tuple.
@@ -1828,8 +2451,7 @@
// init as in ACCSEIZEREQ
operationRecPtr.p->userptr = req->userPtr;
operationRecPtr.p->userblockref = req->userRef;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
- operationRecPtr.p->transactionstate = IDLE;
+ operationRecPtr.p->m_op_bits = ~0;
operationRecPtr.p->scanRecPtr = RNIL;
// do read with lock via ACCKEYREQ
Uint32 lockMode = (lockOp == AccLockReq::LockShared) ? 0 : 1;
@@ -1880,8 +2502,8 @@
jam();
// do abort via ACC_ABORTREQ (immediate)
signal->theData[0] = req->accOpPtr;
- signal->theData[1] = false; // Dont send abort
- accAbortReqLab(signal);
+ signal->theData[1] = 0; // Dont send abort
+ execACC_ABORTREQ(signal);
releaseOpRec(signal);
req->returnCode = AccLockReq::Success;
*sig = *req;
@@ -1891,8 +2513,8 @@
jam();
// do abort via ACC_ABORTREQ (with conf signal)
signal->theData[0] = req->accOpPtr;
- signal->theData[1] = true; // send abort
- accAbortReqLab(signal);
+ signal->theData[1] = 1; // send abort
+ execACC_ABORTREQ(signal);
releaseOpRec(signal);
req->returnCode = AccLockReq::Success;
*sig = *req;
@@ -2551,16 +3173,29 @@
}//Dbacc::getdirindex()
Uint32
-Dbacc::readTablePk(Uint32 localkey1)
+Dbacc::readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec* op)
{
+ int ret;
Uint32 tableId = fragrecptr.p->myTableId;
Uint32 fragId = fragrecptr.p->myfid;
- Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS;
- Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1);
+ bool xfrm = fragrecptr.p->hasCharAttr;
+
#ifdef VM_TRACE
memset(ckeys, 0x1f, (fragrecptr.p->keyLength * MAX_XFRM_MULTIPLY) << 2);
#endif
- int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, ckeys, true);
+
+ if (likely(localkey1 != ~(Uint32)0))
+ {
+ Uint32 fragPageId = localkey1 >> MAX_TUPLES_BITS;
+ Uint32 pageIndex = localkey1 & ((1 << MAX_TUPLES_BITS ) - 1);
+ ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex,
+ ckeys, true);
+ }
+ else
+ {
+ ndbrequire(ElementHeader::getLocked(eh));
+ ret = c_lqh->readPrimaryKeys(op->userptr, ckeys, xfrm);
+ }
jamEntry();
ndbrequire(ret >= 0);
return ret;
@@ -2594,11 +3229,12 @@
#endif
#endif
-void Dbacc::getElement(Signal* signal)
+Uint32
+Dbacc::getElement(Signal* signal, OperationrecPtr& lockOwnerPtr)
{
+ Uint32 errcode;
DirRangePtr geOverflowrangeptr;
DirectoryarrayPtr geOverflowDirptr;
- OperationrecPtr geTmpOperationRecPtr;
Uint32 tgeElementHeader;
Uint32 tgeElemStep;
Uint32 tgeContainerhead;
@@ -2613,7 +3249,6 @@
getdirindex(signal);
tgePageindex = tgdiPageindex;
gePageptr = gdiPageptr;
- tgeResult = ZFALSE;
/*
* The value seached is
* - table key for ACCKEYREQ, stored in TUP
@@ -2623,7 +3258,6 @@
ndbrequire(TelemLen == ZELEM_HEAD_SIZE + fragrecptr.p->localkeylen);
tgeNextptrtype = ZLEFT;
- tgeLocked = 0;
const Uint32 tmp = fragrecptr.p->k + fragrecptr.p->lhfragbits;
const Uint32 opHashValuePart = (operationRecPtr.p->hashValue >> tmp) &0xFFFF;
@@ -2636,9 +3270,17 @@
tgeKeyptr = (tgeElementptr + ZELEM_HEAD_SIZE) + fragrecptr.p->localkeylen;
tgeElemStep = TelemLen;
tgeForward = 1;
- if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;}
+ if (unlikely(tgeContainerptr >= 2048))
+ {
+ errcode = 4;
+ goto error;
+ }
tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
- if ((tgeContainerptr + tgeRemLen - 1) >= 2048) { ACCKEY_error(5); return;}
+ if (unlikely(((tgeContainerptr + tgeRemLen - 1) >= 2048)))
+ {
+ errcode = 5;
+ goto error;
+ }
} else if (tgeNextptrtype == ZRIGHT) {
jam();
tgeContainerptr = tgeContainerptr + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
@@ -2646,29 +3288,42 @@
tgeKeyptr = (tgeElementptr - ZELEM_HEAD_SIZE) - fragrecptr.p->localkeylen;
tgeElemStep = 0 - TelemLen;
tgeForward = (Uint32)-1;
- if (tgeContainerptr >= 2048) { ACCKEY_error(4); return;}
+ if (unlikely(tgeContainerptr >= 2048))
+ {
+ errcode = 4;
+ goto error;
+ }
tgeRemLen = gePageptr.p->word32[tgeContainerptr] >> 26;
- if ((tgeContainerptr - tgeRemLen) >= 2048) { ACCKEY_error(5); return;}
+ if (unlikely((tgeContainerptr - tgeRemLen) >= 2048))
+ {
+ errcode = 5;
+ goto error;
+ }
} else {
- ACCKEY_error(6); return;
+ errcode = 6;
+ goto error;
}//if
if (tgeRemLen >= ZCON_HEAD_SIZE + TelemLen) {
- if (tgeRemLen > ZBUF_SIZE) {
- ACCKEY_error(7); return;
+ if (unlikely(tgeRemLen > ZBUF_SIZE))
+ {
+ errcode = 7;
+ goto error;
}//if
- /* --------------------------------------------------------------------------------- */
- // There is at least one element in this container. Check if it is the element
- // searched for.
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------- */
+ // There is at least one element in this container.
+ // Check if it is the element searched for.
+ /* ------------------------------------------------------------------- */
do {
tgeElementHeader = gePageptr.p->word32[tgeElementptr];
tgeRemLen = tgeRemLen - TelemLen;
Uint32 hashValuePart;
+ lockOwnerPtr.i = RNIL;
+ lockOwnerPtr.p = NULL;
if (ElementHeader::getLocked(tgeElementHeader)) {
jam();
- geTmpOperationRecPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
- ptrCheckGuard(geTmpOperationRecPtr, coprecsize, operationrec);
- hashValuePart = geTmpOperationRecPtr.p->hashvaluePart;
+ lockOwnerPtr.i = ElementHeader::getOpPtrI(tgeElementHeader);
+ ptrCheckGuard(lockOwnerPtr, coprecsize, operationrec);
+ hashValuePart = lockOwnerPtr.p->hashvaluePart;
} else {
jam();
hashValuePart = ElementHeader::getHashValuePart(tgeElementHeader);
@@ -2678,21 +3333,21 @@
Uint32 localkey1 = gePageptr.p->word32[tgeElementptr + tgeForward];
Uint32 localkey2 = 0;
bool found;
- if (! searchLocalKey) {
- Uint32 len = readTablePk(localkey1);
+ if (! searchLocalKey)
+ {
+ Uint32 len = readTablePk(localkey1, tgeElementptr, lockOwnerPtr.p);
found = (len == operationRecPtr.p->xfrmtupkeylen) &&
(memcmp(Tkeydata, ckeys, len << 2) == 0);
} else {
jam();
found = (localkey1 == Tkeydata[0]);
}
- if (found) {
+ if (found)
+ {
jam();
- tgeLocked = ElementHeader::getLocked(tgeElementHeader);
- tgeResult = ZTRUE;
operationRecPtr.p->localdata[0] = localkey1;
operationRecPtr.p->localdata[1] = localkey2;
- return;
+ return ZTRUE;
}
}
if (tgeRemLen <= ZCON_HEAD_SIZE) {
@@ -2701,18 +3356,22 @@
tgeElementptr = tgeElementptr + tgeElemStep;
} while (true);
}//if
- if (tgeRemLen != ZCON_HEAD_SIZE) {
- ACCKEY_error(8); return;
+ if (unlikely(tgeRemLen != ZCON_HEAD_SIZE))
+ {
+ errcode = 8;
+ goto error;
}//if
tgeContainerhead = gePageptr.p->word32[tgeContainerptr];
tgeNextptrtype = (tgeContainerhead >> 7) & 0x3;
if (tgeNextptrtype == 0) {
jam();
- return; /* NO MORE CONTAINER */
+ return ZFALSE; /* NO MORE CONTAINER */
}//if
tgePageindex = tgeContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */
- if (tgePageindex > ZEMPTYLIST) {
- ACCKEY_error(9); return;
+ if (unlikely(tgePageindex > ZEMPTYLIST))
+ {
+ errcode = 9;
+ goto error;
}//if
if (((tgeContainerhead >> 9) & 1) == ZFALSE) {
jam();
@@ -2726,55 +3385,72 @@
ptrCheckGuard(gePageptr, cpagesize, page8);
}//if
} while (1);
- return;
+
+ return ZFALSE;
+
+error:
+ ACCKEY_error(errcode);
+ return ~0;
}//Dbacc::getElement()
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* END OF GET_ELEMENT MODULE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* */
-/* MODULE: DELETE */
-/* */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* --------------------------------------------------------------------------------- */
-/* COMMITDELETE */
-/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */
-/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */
-/* */
-/* OUTPUT: */
-/* NONE */
-/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE COMMIT OF TRANSA- */
-/* CTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND DELETES IT. IT DOES SO BY */
-/* REPLACING IT WITH THE LAST ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT */
-/* IS ALSO THE LAST ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT. */
-/* --------------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* */
+/* END OF GET_ELEMENT MODULE */
+/* */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* */
+/* MODULE: DELETE */
+/* */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* COMMITDELETE */
+/* INPUT: OPERATION_REC_PTR, PTR TO AN OPERATION RECORD. */
+/* FRAGRECPTR, PTR TO A FRAGMENT RECORD */
+/* */
+/* OUTPUT: */
+/* NONE */
+/* DESCRIPTION: DELETE OPERATIONS WILL BE COMPLETED AT THE
+ * COMMIT OF TRANSACTION. THIS SUBROUTINE SEARCHS FOR ELEMENT AND
+ * DELETES IT. IT DOES SO BY REPLACING IT WITH THE LAST
+ * ELEMENT IN THE BUCKET. IF THE DELETED ELEMENT IS ALSO THE LAST
+ * ELEMENT THEN IT IS ONLY NECESSARY TO REMOVE THE ELEMENT
+ * ------------------------------------------------------------------------- */
+void
+Dbacc::report_dealloc(Signal* signal, const Operationrec* opPtrP)
+{
+ Uint32 localKey = opPtrP->localdata[0];
+ Uint32 opbits = opPtrP->m_op_bits;
+ Uint32 userptr= opPtrP->userptr;
+ Uint32 scanInd =
+ ((opbits & Operationrec::OP_MASK) == ZSCAN_OP) ||
+ (opbits & Operationrec::OP_LOCK_REQ);
+
+ if (localKey != ~(Uint32)0)
+ {
+ signal->theData[0] = fragrecptr.p->myfid;
+ signal->theData[1] = fragrecptr.p->myTableId;
+ Uint32 pageId = localKey >> MAX_TUPLES_BITS;
+ Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1);
+ signal->theData[2] = pageId;
+ signal->theData[3] = pageIndex;
+ signal->theData[4] = userptr;
+ signal->theData[5] = scanInd;
+ EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6);
+ jamEntry();
+ }
+}
+
void Dbacc::commitdelete(Signal* signal)
{
jam();
- Uint32 localKey = operationRecPtr.p->localdata[0];
- Uint32 userptr= operationRecPtr.p->userptr;
- Uint32 scanInd = operationRecPtr.p->operation == ZSCAN_OP
- || operationRecPtr.p->isAccLockReq;
-
- signal->theData[0] = fragrecptr.p->myfid;
- signal->theData[1] = fragrecptr.p->myTableId;
- Uint32 pageId = localKey >> MAX_TUPLES_BITS;
- Uint32 pageIndex = localKey & ((1 << MAX_TUPLES_BITS) - 1);
- signal->theData[2] = pageId;
- signal->theData[3] = pageIndex;
- signal->theData[4] = userptr;
- signal->theData[5] = scanInd;
- EXECUTE_DIRECT(DBLQH, GSN_TUP_DEALLOCREQ, signal, 6);
- jamEntry();
+ report_dealloc(signal, operationRecPtr.p);
getdirindex(signal);
tlastPageindex = tgdiPageindex;
@@ -3263,31 +3939,316 @@
/*BE CHECKED. THE OPERATION RECORD WILL BE REMOVED FROM THE QUEUE IF IT */
/*BELONGED TO ANY ONE, OTHERWISE THE ELEMENT HEAD WILL BE UPDATED. */
/* ------------------------------------------------------------------------- */
+
+/**
+ *
+ * P0 - P1 - P2 - P3
+ * S0
+ * S1
+ * S2
+ */
+void
+Dbacc::abortParallelQueueOperation(Signal* signal, OperationrecPtr opPtr)
+{
+ OperationrecPtr nextP;
+ OperationrecPtr prevP;
+ OperationrecPtr loPtr;
+
+ Uint32 opbits = opPtr.p->m_op_bits;
+ Uint32 opstate = opbits & Operationrec::OP_STATE_MASK;
+ nextP.i = opPtr.p->nextParallelQue;
+ prevP.i = opPtr.p->prevParallelQue;
+ loPtr.i = opPtr.p->m_lock_owner_ptr_i;
+
+ ndbassert(! (opbits & Operationrec::OP_LOCK_OWNER));
+ ndbassert(opbits & Operationrec::OP_RUN_QUEUE);
+
+ ptrCheckGuard(prevP, coprecsize, operationrec);
+ ndbassert(prevP.p->nextParallelQue == opPtr.i);
+ prevP.p->nextParallelQue = nextP.i;
+
+ if (nextP.i != RNIL)
+ {
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ ndbassert(nextP.p->prevParallelQue == opPtr.i);
+ nextP.p->prevParallelQue = prevP.i;
+ }
+ else if (prevP.i != loPtr.i)
+ {
+ jam();
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == opPtr.i);
+ loPtr.p->m_lo_last_parallel_op_ptr_i = prevP.i;
+ prevP.p->m_lock_owner_ptr_i = loPtr.i;
+
+ /**
+ * Abort P3...check start next
+ */
+ startNext(signal, prevP);
+ validate_lock_queue(prevP);
+ return;
+ }
+ else
+ {
+ jam();
+ ndbassert(prevP.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ prevP.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ startNext(signal, prevP);
+ validate_lock_queue(prevP);
+ return;
+ }
+
+ if (opbits & Operationrec::OP_LOCK_MODE)
+ {
+ Uint32 nextbits = nextP.p->m_op_bits;
+ while ((nextbits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE);
+ nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE;
+ nextP.p->m_op_bits = nextbits;
+
+ if (nextP.p->nextParallelQue != RNIL)
+ {
+ nextP.i = nextP.p->nextParallelQue;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ nextbits = nextP.p->m_op_bits;
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Abort P1, P2
+ */
+
+ /**
+ * Scan to last of run queue
+ */
+ while (nextP.p->nextParallelQue != RNIL)
+ {
+ nextP.i = nextP.p->nextParallelQue;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ }
+
+#ifdef VM_TRACE
+ loPtr.i = nextP.p->m_lock_owner_ptr_i;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ ndbassert(loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ ndbassert(loPtr.p->m_lo_last_parallel_op_ptr_i == nextP.i);
+#endif
+ startNext(signal, nextP);
+ validate_lock_queue(nextP);
+
+ return;
+}
+
+void
+Dbacc::abortSerieQueueOperation(Signal* signal, OperationrecPtr opPtr)
+{
+ OperationrecPtr prevS, nextS;
+ OperationrecPtr prevP, nextP;
+ OperationrecPtr loPtr;
+
+ Uint32 opbits = opPtr.p->m_op_bits;
+
+ prevS.i = opPtr.p->prevSerialQue;
+ nextS.i = opPtr.p->nextSerialQue;
+
+ prevP.i = opPtr.p->prevParallelQue;
+ nextP.i = opPtr.p->nextParallelQue;
+
+ ndbassert((opbits & Operationrec::OP_LOCK_OWNER) == 0);
+ ndbassert((opbits & Operationrec::OP_RUN_QUEUE) == 0);
+
+ if (prevP.i != RNIL)
+ {
+ /**
+ * We're not list head...
+ */
+ ptrCheckGuard(prevP, coprecsize, operationrec);
+ ndbassert(prevP.p->nextParallelQue == opPtr.i);
+ prevP.p->nextParallelQue = nextP.i;
+
+ if (nextP.i != RNIL)
+ {
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ ndbassert(nextP.p->prevParallelQue == opPtr.i);
+ ndbassert((nextP.p->m_op_bits & Operationrec::OP_STATE_MASK) ==
+ Operationrec::OP_STATE_WAITING);
+ nextP.p->prevParallelQue = prevP.i;
+
+ if ((prevP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE) == 0 &&
+ opbits & Operationrec::OP_LOCK_MODE)
+ {
+ /**
+ * Scan right in parallel queue to fix OP_ACC_LOCK_MODE
+ */
+ while ((nextP.p->m_op_bits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ ndbassert(nextP.p->m_op_bits & Operationrec::OP_ACC_LOCK_MODE);
+ nextP.p->m_op_bits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE;
+ nextP.i = nextP.p->nextParallelQue;
+ if (nextP.i == RNIL)
+ break;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ }
+ }
+ }
+ validate_lock_queue(prevP);
+ return;
+ }
+ else
+ {
+ /**
+ * We're a list head
+ */
+ ptrCheckGuard(prevS, coprecsize, operationrec);
+ ndbassert(prevS.p->nextSerialQue == opPtr.i);
+
+ if (nextP.i != RNIL)
+ {
+ /**
+ * Promote nextP to list head
+ */
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ ndbassert(nextP.p->prevParallelQue == opPtr.i);
+ prevS.p->nextSerialQue = nextP.i;
+ nextP.p->prevParallelQue = RNIL;
+ nextP.p->nextSerialQue = nextS.i;
+ if (nextS.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ ndbassert(nextS.p->prevSerialQue == opPtr.i);
+ nextS.p->prevSerialQue = nextP.i;
+ validate_lock_queue(prevS);
+ return;
+ }
+ else
+ {
+ // nextS is RNIL, i.e we're last in serie queue...
+ // we must update lockOwner.m_lo_last_serial_op_ptr_i
+ loPtr = prevS;
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+ ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i);
+ loPtr.p->m_lo_last_serial_op_ptr_i = nextP.i;
+ validate_lock_queue(loPtr);
+ return;
+ }
+ }
+
+ if (nextS.i == RNIL)
+ {
+ /**
+ * Abort S2
+ */
+
+ // nextS is RNIL, i.e we're last in serie queue...
+ // and we have no parallel queue,
+ // we must update lockOwner.m_lo_last_serial_op_ptr_i
+ prevS.p->nextSerialQue = RNIL;
+
+ loPtr = prevS;
+ while ((loPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) == 0)
+ {
+ loPtr.i = loPtr.p->prevSerialQue;
+ ptrCheckGuard(loPtr, coprecsize, operationrec);
+ }
+ ndbassert(loPtr.p->m_lo_last_serial_op_ptr_i == opPtr.i);
+ if (prevS.i != loPtr.i)
+ {
+ jam();
+ loPtr.p->m_lo_last_serial_op_ptr_i = prevS.i;
+ }
+ else
+ {
+ loPtr.p->m_lo_last_serial_op_ptr_i = RNIL;
+ }
+ validate_lock_queue(loPtr);
+ }
+ else if (nextP.i == RNIL)
+ {
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ ndbassert(nextS.p->prevSerialQue == opPtr.i);
+ prevS.p->nextSerialQue = nextS.i;
+ nextS.p->prevSerialQue = prevS.i;
+
+ if (prevS.p->m_op_bits & Operationrec::OP_LOCK_OWNER)
+ {
+ /**
+ * Abort S0
+ */
+ OperationrecPtr lastOp;
+ lastOp.i = prevS.p->m_lo_last_parallel_op_ptr_i;
+ if (lastOp.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(lastOp, coprecsize, operationrec);
+ ndbassert(lastOp.p->m_lock_owner_ptr_i = prevS.i);
+ }
+ else
+ {
+ jam();
+ lastOp = prevS;
+ }
+ startNext(signal, lastOp);
+ validate_lock_queue(lastOp);
+ }
+ else
+ {
+ validate_lock_queue(prevS);
+ }
+ }
+ }
+}
+
+
void Dbacc::abortOperation(Signal* signal)
{
- OperationrecPtr aboOperRecPtr;
- OperationrecPtr TaboOperRecPtr;
- Page8Ptr aboPageidptr;
- Uint32 taboElementptr;
- Uint32 tmp2Olq;
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
- if (operationRecPtr.p->lockOwner == ZTRUE) {
+ validate_lock_queue(operationRecPtr);
+
+ if (opbits & Operationrec::OP_LOCK_OWNER)
+ {
takeOutLockOwnersList(signal, operationRecPtr);
- if (operationRecPtr.p->insertIsDone == ZTRUE) {
- jam();
- operationRecPtr.p->elementIsDisappeared = ZTRUE;
- }//if
- if ((operationRecPtr.p->nextParallelQue != RNIL) ||
- (operationRecPtr.p->nextSerialQue != RNIL)) {
+ opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER;
+ if (opbits & Operationrec::OP_INSERT_IS_DONE)
+ {
+ jam();
+ opbits |= Operationrec::OP_ELEMENT_DISAPPEARED;
+ }//if
+ operationRecPtr.p->m_op_bits = opbits;
+ const bool queue = (operationRecPtr.p->nextParallelQue != RNIL ||
+ operationRecPtr.p->nextSerialQue != RNIL);
+
+ if (queue)
+ {
jam();
- releaselock(signal);
- } else {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. IF INSERT OR STANDBY */
- /* WE DELETE THE ELEMENT OTHERWISE WE REMOVE THE LOCK FROM THE ELEMENT. */
- /* --------------------------------------------------------------------------------- */
- if (operationRecPtr.p->elementIsDisappeared == ZFALSE) {
+ release_lockowner(signal, operationRecPtr, false);
+ }
+ else
+ {
+ /* -------------------------------------------------------------------
+ * WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED.
+ * IF INSERT OR STANDBY WE DELETE THE ELEMENT OTHERWISE WE REMOVE
+ * THE LOCK FROM THE ELEMENT.
+ * ------------------------------------------------------------------ */
+ if ((opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0)
+ {
jam();
+ Page8Ptr aboPageidptr;
+ Uint32 taboElementptr;
+ Uint32 tmp2Olq;
+
taboElementptr = operationRecPtr.p->elementPointer;
aboPageidptr.i = operationRecPtr.p->elementPage;
tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
@@ -3297,87 +4258,31 @@
arrGuard(taboElementptr, 2048);
aboPageidptr.p->word32[taboElementptr] = tmp2Olq;
return;
- } else {
+ }
+ else
+ {
jam();
commitdelete(signal);
}//if
}//if
- } else {
- /* --------------------------------------------------------------- */
- // We are not the lock owner.
- /* --------------------------------------------------------------- */
- jam();
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- jam();
- /* ---------------------------------------------------------------------------------- */
- /* SINCE WE ARE NOT QUEUE LEADER WE NEED NOT CONSIDER IF THE ELEMENT IS TO BE DELETED.*/
- /* We will simply remove it from the parallel list without any other rearrangements. */
- /* ---------------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
- }//if
- } else if (operationRecPtr.p->prevSerialQue != RNIL) {
- /* ------------------------------------------------------------------------- */
- // We are not in the parallel queue owning the lock. Thus we are in another parallel
- // queue longer down in the serial queue. We are however first since prevParallelQue
- // == RNIL.
- /* ------------------------------------------------------------------------- */
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- /* ------------------------------------------------------------------------- */
- // We have an operation in the queue after us. We simply rearrange this parallel queue.
- // The new leader of this parallel queue will be operation in the serial queue.
- /* ------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
- aboOperRecPtr.p->prevParallelQue = RNIL; // Queue Leader
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- TaboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
- TaboOperRecPtr.p->prevSerialQue = aboOperRecPtr.i;
- }//if
- TaboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
- ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
- TaboOperRecPtr.p->nextSerialQue = aboOperRecPtr.i;
- } else {
- jam();
- /* ------------------------------------------------------------------------- */
- // We are the only operation in this parallel queue. We will thus shrink the serial
- // queue.
- /* ------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- aboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
- }//if
- }//if
- }//if
- }//if
- /* ------------------------------------------------------------------------- */
- // If prevParallelQue = RNIL and prevSerialQue = RNIL and we are not owner of the
- // lock then we cannot be in any lock queue at all.
- /* ------------------------------------------------------------------------- */
-}//Dbacc::abortOperation()
+ }
+ else if (opbits & Operationrec::OP_RUN_QUEUE)
+ {
+ abortParallelQueueOperation(signal, operationRecPtr);
+ }
+ else
+ {
+ abortSerieQueueOperation(signal, operationRecPtr);
+ }
+}
-void Dbacc::commitDeleteCheck()
+void
+Dbacc::commitDeleteCheck()
{
OperationrecPtr opPtr;
OperationrecPtr lastOpPtr;
OperationrecPtr deleteOpPtr;
- bool elementDeleted = false;
+ Uint32 elementDeleted = 0;
bool deleteCheckOngoing = true;
Uint32 hashValue = 0;
lastOpPtr = operationRecPtr;
@@ -3390,7 +4295,9 @@
}//while
deleteOpPtr = lastOpPtr;
do {
- if (deleteOpPtr.p->operation == ZDELETE) {
+ Uint32 opbits = deleteOpPtr.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ if (op == ZDELETE) {
jam();
/* -------------------------------------------------------------------
* IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO
@@ -3404,10 +4311,9 @@
* DELETE-OPERATION THAT HAS A HASH VALUE.
* ----------------------------------------------------------------- */
hashValue = deleteOpPtr.p->hashValue;
- elementDeleted = true;
+ elementDeleted = Operationrec::OP_ELEMENT_DISAPPEARED;
deleteCheckOngoing = false;
- } else if ((deleteOpPtr.p->operation == ZREAD) ||
- (deleteOpPtr.p->operation == ZSCAN_OP)) {
+ } else if (op == ZREAD || op == ZSCAN_OP) {
/* -------------------------------------------------------------------
* We are trying to find out whether the commit will in the end delete
* the tuple. Normally the delete will be the last operation in the
@@ -3418,7 +4324,7 @@
* we have to continue scanning the list looking for a delete operation.
*/
deleteOpPtr.i = deleteOpPtr.p->prevParallelQue;
- if (deleteOpPtr.i == RNIL) {
+ if (opbits & Operationrec::OP_LOCK_OWNER) {
jam();
deleteCheckOngoing = false;
} else {
@@ -3437,14 +4343,14 @@
opPtr = lastOpPtr;
do {
jam();
- opPtr.p->commitDeleteCheckFlag = ZTRUE;
+ opPtr.p->m_op_bits |= Operationrec::OP_COMMIT_DELETE_CHECK;
if (elementDeleted) {
jam();
- opPtr.p->elementIsDisappeared = ZTRUE;
+ opPtr.p->m_op_bits |= elementDeleted;
opPtr.p->hashValue = hashValue;
}//if
opPtr.i = opPtr.p->prevParallelQue;
- if (opPtr.i == RNIL) {
+ if (opPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER) {
jam();
break;
}//if
@@ -3460,14 +4366,13 @@
/* ------------------------------------------------------------------------- */
void Dbacc::commitOperation(Signal* signal)
{
- OperationrecPtr tolqTmpPtr;
- Page8Ptr coPageidptr;
- Uint32 tcoElementptr;
- Uint32 tmp2Olq;
-
- if ((operationRecPtr.p->commitDeleteCheckFlag == ZFALSE) &&
- (operationRecPtr.p->operation != ZSCAN_OP) &&
- (operationRecPtr.p->operation != ZREAD)) {
+ validate_lock_queue(operationRecPtr);
+
+ Uint32 opbits = operationRecPtr.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ ndbrequire((opbits & Operationrec::OP_STATE_MASK) == Operationrec::OP_STATE_EXECUTED);
+ if ((opbits & Operationrec::OP_COMMIT_DELETE_CHECK) == 0 && (op != ZREAD))
+ {
jam();
/* This method is used to check whether the end result of the transaction
will be to delete the tuple. In this case all operation will be marked
@@ -3479,16 +4384,30 @@
lock is released.
*/
commitDeleteCheck();
+ opbits = operationRecPtr.p->m_op_bits;
}//if
- if (operationRecPtr.p->lockOwner == ZTRUE) {
+
+ ndbassert(opbits & Operationrec::OP_RUN_QUEUE);
+
+ if (opbits & Operationrec::OP_LOCK_OWNER)
+ {
takeOutLockOwnersList(signal, operationRecPtr);
- if ((operationRecPtr.p->nextParallelQue == RNIL) &&
- (operationRecPtr.p->nextSerialQue == RNIL) &&
- (operationRecPtr.p->elementIsDisappeared == ZFALSE)) {
+ opbits &= ~(Uint32)Operationrec::OP_LOCK_OWNER;
+ operationRecPtr.p->m_op_bits = opbits;
+
+ const bool queue = (operationRecPtr.p->nextParallelQue != RNIL ||
+ operationRecPtr.p->nextSerialQue != RNIL);
+
+ if (!queue && (opbits & Operationrec::OP_ELEMENT_DISAPPEARED) == 0)
+ {
/*
- This is the normal path through the commit for operations owning the
- lock without any queues and not a delete operation.
- */
+ * This is the normal path through the commit for operations owning the
+ * lock without any queues and not a delete operation.
+ */
+ Page8Ptr coPageidptr;
+ Uint32 tcoElementptr;
+ Uint32 tmp2Olq;
+
coPageidptr.i = operationRecPtr.p->elementPage;
tcoElementptr = operationRecPtr.p->elementPointer;
tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
@@ -3498,445 +4417,421 @@
arrGuard(tcoElementptr, 2048);
coPageidptr.p->word32[tcoElementptr] = tmp2Olq;
return;
- } else if ((operationRecPtr.p->nextParallelQue != RNIL) ||
- (operationRecPtr.p->nextSerialQue != RNIL)) {
+ }
+ else if (queue)
+ {
jam();
/*
- The case when there is a queue lined up.
- Release the lock and pass it to the next operation lined up.
- */
- releaselock(signal);
+ * The case when there is a queue lined up.
+ * Release the lock and pass it to the next operation lined up.
+ */
+ release_lockowner(signal, operationRecPtr, true);
return;
- } else {
+ }
+ else
+ {
jam();
/*
- No queue and elementIsDisappeared is true. We perform the actual delete
- operation.
- */
+ * No queue and elementIsDisappeared is true.
+ * We perform the actual delete operation.
+ */
commitdelete(signal);
return;
}//if
- } else {
- /*
- THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE
- ELEMENT.
- */
- ndbrequire(operationRecPtr.p->prevParallelQue != RNIL);
+ }
+ else
+ {
+ /**
+ * THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE
+ * ELEMENT.
+ */
jam();
- tolqTmpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
- tolqTmpPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- tolqTmpPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
- tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
- }//if
-
+ OperationrecPtr prev, next, lockOwner;
+ prev.i = operationRecPtr.p->prevParallelQue;
+ next.i = operationRecPtr.p->nextParallelQue;
+ lockOwner.i = operationRecPtr.p->m_lock_owner_ptr_i;
+ ptrCheckGuard(prev, coprecsize, operationrec);
+
+ prev.p->nextParallelQue = next.i;
+ if (next.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(next, coprecsize, operationrec);
+ next.p->prevParallelQue = prev.i;
+ }
+ else if (prev.p->m_op_bits & Operationrec::OP_LOCK_OWNER)
+ {
+ jam();
+ ndbassert(lockOwner.i == prev.i);
+ prev.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ next = prev;
+ }
+ else
+ {
+ jam();
+ /**
+ * Last operation in parallell queue
+ */
+ ndbassert(prev.i != lockOwner.i);
+ ptrCheckGuard(lockOwner, coprecsize, operationrec);
+ ndbassert(lockOwner.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
+ lockOwner.p->m_lo_last_parallel_op_ptr_i = prev.i;
+ prev.p->m_lock_owner_ptr_i = lockOwner.i;
+ next = prev;
+ }
+
/**
* Check possible lock upgrade
- * 1) Find lock owner
- * 2) Count transactions in parallel que
- * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner)
- * upgrade next serial
*/
- if(operationRecPtr.p->lockMode)
+ if(opbits & Operationrec::OP_ACC_LOCK_MODE)
{
jam();
+
/**
- * Committing a non shared operation can't lead to lock upgrade
+ * Not lock owner...committing a exclusive operation...
+ *
+ * e.g
+ * T1(R) T1(X)
+ * T2(R/X)
+ *
+ * If T1(X) commits T2(R/X) is not supposed to run
+ * as T1(R) should also commit
+ *
+ * e.g
+ * T1(R) T1(X) T1*(R)
+ * T2(R/X)
+ *
+ * If T1*(R) commits T2(R/X) is not supposed to run
+ * as T1(R),T2(x) should also commit
*/
+ validate_lock_queue(prev);
return;
}
-
- OperationrecPtr lock_owner;
- lock_owner.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(lock_owner, coprecsize, operationrec);
- Uint32 transid[2] = { lock_owner.p->transId1,
- lock_owner.p->transId2 };
-
-
- while(lock_owner.p->prevParallelQue != RNIL)
+
+ /**
+ * We committed a shared lock
+ * Check if we can start next...
+ */
+ while(next.p->nextParallelQue != RNIL)
{
- lock_owner.i = lock_owner.p->prevParallelQue;
- ptrCheckGuard(lock_owner, coprecsize, operationrec);
+ jam();
+ next.i = next.p->nextParallelQue;
+ ptrCheckGuard(next, coprecsize, operationrec);
- if(lock_owner.p->transId1 != transid[0] ||
- lock_owner.p->transId2 != transid[1])
+ if ((next.p->m_op_bits & Operationrec::OP_STATE_MASK) !=
+ Operationrec::OP_STATE_EXECUTED)
{
jam();
- /**
- * If more than 1 trans in lock queue -> no lock upgrade
- */
return;
}
}
- check_lock_upgrade(signal, lock_owner, operationRecPtr);
+ startNext(signal, next);
+
+ validate_lock_queue(prev);
}
}//Dbacc::commitOperation()
-void
-Dbacc::check_lock_upgrade(Signal* signal,
- OperationrecPtr lock_owner,
- OperationrecPtr release_op)
-{
- if((lock_owner.p->transId1 == release_op.p->transId1 &&
- lock_owner.p->transId2 == release_op.p->transId2) ||
- release_op.p->lockMode ||
- lock_owner.p->nextSerialQue == RNIL)
- {
- jam();
- /**
- * No lock upgrade if same trans or lock owner has no serial queue
- * or releasing non shared op
- */
- return;
- }
-
- OperationrecPtr next;
- next.i = lock_owner.p->nextSerialQue;
- ptrCheckGuard(next, coprecsize, operationrec);
+void
+Dbacc::release_lockowner(Signal* signal, OperationrecPtr opPtr, bool commit)
+{
+ OperationrecPtr nextP;
+ OperationrecPtr nextS;
+ OperationrecPtr newOwner;
+ OperationrecPtr lastP;
- if(lock_owner.p->transId1 != next.p->transId1 ||
- lock_owner.p->transId2 != next.p->transId2)
+ Uint32 opbits = opPtr.p->m_op_bits;
+ nextP.i = opPtr.p->nextParallelQue;
+ nextS.i = opPtr.p->nextSerialQue;
+ lastP.i = opPtr.p->m_lo_last_parallel_op_ptr_i;
+ Uint32 lastS = opPtr.p->m_lo_last_serial_op_ptr_i;
+
+ ndbassert(lastP.i != RNIL || lastS != RNIL);
+ ndbassert(nextP.i != RNIL || nextS.i != RNIL);
+
+ enum {
+ NOTHING,
+ CHECK_LOCK_UPGRADE,
+ START_NEW
+ } action = NOTHING;
+
+ if (nextP.i != RNIL)
{
jam();
- /**
- * No lock upgrad if !same trans in serial queue
- */
- return;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ newOwner = nextP;
+
+ if (lastP.i == newOwner.i)
+ {
+ newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ lastP = nextP;
+ }
+ else
+ {
+ ptrCheckGuard(lastP, coprecsize, operationrec);
+ newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i;
+ lastP.p->m_lock_owner_ptr_i = newOwner.i;
+ }
+
+ newOwner.p->m_lo_last_serial_op_ptr_i = lastS;
+ newOwner.p->nextSerialQue = nextS.i;
+
+ if (nextS.i != RNIL)
+ {
+ jam();
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ ndbassert(nextS.p->prevSerialQue == opPtr.i);
+ nextS.p->prevSerialQue = newOwner.i;
+ }
+
+ if (commit)
+ {
+ if ((opbits & Operationrec::OP_ACC_LOCK_MODE) == ZREADLOCK)
+ {
+ jam();
+ /**
+ * Lock owner...committing a shared operation...
+ * this can be a lock upgrade
+ *
+ * e.g
+ * T1(R) T2(R)
+ * T2(X)
+ *
+ * If T1(R) commits T2(X) is supposed to run
+ *
+ * e.g
+ * T1(X) T1(R)
+ * T2(R)
+ *
+ * If T1(X) commits, then T1(R) _should_ commit before T2(R) is
+ * allowed to proceed
+ */
+ action = CHECK_LOCK_UPGRADE;
+ }
+ else
+ {
+ jam();
+ newOwner.p->m_op_bits |= Operationrec::OP_LOCK_MODE;
+ }
+ }
+ else
+ {
+ /**
+ * Aborting an operation can *always* lead to lock upgrade
+ */
+ action = CHECK_LOCK_UPGRADE;
+
+ /**
+ * Update ACC_LOCK_MODE
+ */
+ if (opbits & Operationrec::OP_LOCK_MODE)
+ {
+ Uint32 nextbits = nextP.p->m_op_bits;
+ while ((nextbits & Operationrec::OP_LOCK_MODE) == 0)
+ {
+ ndbassert(nextbits & Operationrec::OP_ACC_LOCK_MODE);
+ nextbits &= ~(Uint32)Operationrec::OP_ACC_LOCK_MODE;
+ nextP.p->m_op_bits = nextbits;
+
+ if (nextP.p->nextParallelQue != RNIL)
+ {
+ nextP.i = nextP.p->nextParallelQue;
+ ptrCheckGuard(nextP, coprecsize, operationrec);
+ nextbits = nextP.p->m_op_bits;
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
}
-
- if (getNoParallelTransaction(lock_owner.p) > 1)
+ else
{
jam();
- /**
- * No lock upgrade if more than 1 transaction in parallell queue
- */
- return;
- }
+ ptrCheckGuard(nextS, coprecsize, operationrec);
+ newOwner = nextS;
+
+ newOwner.p->m_op_bits |= Operationrec::OP_RUN_QUEUE;
+
+ if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
+ {
+ report_dealloc(signal, opPtr.p);
+ newOwner.p->localdata[0] = ~0;
+ }
+ else
+ {
+ jam();
+ newOwner.p->localdata[0] = opPtr.p->localdata[0];
+ newOwner.p->localdata[1] = opPtr.p->localdata[1];
+ }
+
+ lastP = newOwner;
+ while (lastP.p->nextParallelQue != RNIL)
+ {
+ lastP.i = lastP.p->nextParallelQue;
+ ptrCheckGuard(lastP, coprecsize, operationrec);
+ lastP.p->m_op_bits |= Operationrec::OP_RUN_QUEUE;
+ }
+
+ if (newOwner.i != lastP.i)
+ {
+ jam();
+ newOwner.p->m_lo_last_parallel_op_ptr_i = lastP.i;
+ }
+ else
+ {
+ jam();
+ newOwner.p->m_lo_last_parallel_op_ptr_i = RNIL;
+ }
- if (getNoParallelTransaction(next.p) > 1)
- {
- jam();
- /**
- * No lock upgrade if more than 1 transaction in next's parallell queue
- */
- return;
+ if (newOwner.i != lastS)
+ {
+ jam();
+ newOwner.p->m_lo_last_serial_op_ptr_i = lastS;
+ }
+ else
+ {
+ jam();
+ newOwner.p->m_lo_last_serial_op_ptr_i = RNIL;
+ }
+
+ action = START_NEW;
}
- OperationrecPtr tmp;
- tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
- if(tmp.i != RNIL)
- {
- ptrCheckGuard(tmp, coprecsize, operationrec);
- ndbassert(tmp.p->prevSerialQue == next.i);
- tmp.p->prevSerialQue = lock_owner.i;
- }
- next.p->nextSerialQue = next.p->prevSerialQue = RNIL;
+ insertLockOwnersList(signal, newOwner);
- // Find end of parallell que
- tmp = lock_owner;
- Uint32 lockMode = next.p->lockMode > lock_owner.p->lockMode ?
- next.p->lockMode : lock_owner.p->lockMode;
- while(tmp.p->nextParallelQue != RNIL)
+ /**
+ * Copy op info, and store op in element
+ *
+ */
{
- jam();
- tmp.i = tmp.p->nextParallelQue;
- tmp.p->lockMode = lockMode;
- ptrCheckGuard(tmp, coprecsize, operationrec);
- }
- tmp.p->lockMode = lockMode;
-
- next.p->prevParallelQue = tmp.i;
- tmp.p->nextParallelQue = next.i;
-
- OperationrecPtr save = operationRecPtr;
-
- Uint32 localdata[2];
- localdata[0] = lock_owner.p->localdata[0];
- localdata[1] = lock_owner.p->localdata[1];
- do {
- next.p->localdata[0] = localdata[0];
- next.p->localdata[1] = localdata[1];
- next.p->lockMode = lockMode;
-
- operationRecPtr = next;
- executeNextOperation(signal);
- if (next.p->nextParallelQue != RNIL)
+ newOwner.p->elementPage = opPtr.p->elementPage;
+ newOwner.p->elementIsforward = opPtr.p->elementIsforward;
+ newOwner.p->elementPointer = opPtr.p->elementPointer;
+ newOwner.p->elementContainer = opPtr.p->elementContainer;
+ newOwner.p->scanBits = opPtr.p->scanBits;
+ newOwner.p->hashvaluePart = opPtr.p->hashvaluePart;
+ newOwner.p->m_op_bits |= (opbits & Operationrec::OP_ELEMENT_DISAPPEARED);
+ if (opbits & Operationrec::OP_ELEMENT_DISAPPEARED)
{
+ /* ------------------------------------------------------------------- */
+ // If the elementIsDisappeared is set then we know that the
+ // hashValue is also set since it always originates from a
+ // committing abort or a aborting insert.
+ // Scans do not initialise the hashValue and must have this
+ // value initialised if they are
+ // to successfully commit the delete.
+ /* ------------------------------------------------------------------- */
jam();
- next.i = next.p->nextParallelQue;
- ptrCheckGuard(next, coprecsize, operationrec);
- } else {
- jam();
- break;
+ newOwner.p->hashValue = opPtr.p->hashValue;
}//if
- } while (1);
+
+ Page8Ptr pagePtr;
+ pagePtr.i = newOwner.p->elementPage;
+ ptrCheckGuard(pagePtr, cpagesize, page8);
+ const Uint32 tmp = ElementHeader::setLocked(newOwner.i);
+ arrGuard(newOwner.p->elementPointer, 2048);
+ pagePtr.p->word32[newOwner.p->elementPointer] = tmp;
+ }
- operationRecPtr = save;
+ switch(action){
+ case NOTHING:
+ validate_lock_queue(newOwner);
+ return;
+ case START_NEW:
+ startNew(signal, newOwner);
+ validate_lock_queue(newOwner);
+ return;
+ case CHECK_LOCK_UPGRADE:
+ startNext(signal, lastP);
+ validate_lock_queue(lastP);
+ break;
+ }
}
-/* ------------------------------------------------------------------------- */
-/* RELEASELOCK */
-/* RESETS LOCK OF AN ELEMENT. */
-/* INFORMATION ABOUT THE ELEMENT IS SAVED IN THE OPERATION RECORD */
-/* THESE INFORMATION IS USED TO UPDATE HEADER OF THE ELEMENT */
-/* ------------------------------------------------------------------------- */
-void Dbacc::releaselock(Signal* signal)
-{
- OperationrecPtr rloOperPtr;
- OperationrecPtr trlOperPtr;
- OperationrecPtr trlTmpOperPtr;
- Uint32 TelementIsDisappeared;
-
- trlOperPtr.i = RNIL;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- /** ---------------------------------------------------------------------
- * NEXT OPERATION TAKES OVER THE LOCK.
- * We will simply move the info from the leader
- * to the new queue leader.
- * -------------------------------------------------------------------- */
- trlOperPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
- copyInOperPtr = trlOperPtr;
- copyOperPtr = operationRecPtr;
- copyOpInfo(signal);
- trlOperPtr.p->prevParallelQue = RNIL;
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- /* -----------------------------------------------------------------
- * THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE
- * NEW LOCK OWNER.
- * ------------------------------------------------------------------ */
- trlOperPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- trlTmpOperPtr.i = trlOperPtr.p->nextSerialQue;
- ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
- trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i;
- }//if
-
- check_lock_upgrade(signal, copyInOperPtr, operationRecPtr);
- } else {
- ndbrequire(operationRecPtr.p->nextSerialQue != RNIL);
- jam();
- /** ---------------------------------------------------------------------
- * THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY.
- * WE NEED TO REARRANGE LISTS AND START A NUMBER OF OPERATIONS.
- * -------------------------------------------------------------------- */
- trlOperPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
- copyOperPtr = operationRecPtr;
- copyInOperPtr = trlOperPtr;
- copyOpInfo(signal);
- trlOperPtr.p->prevSerialQue = RNIL;
- ndbrequire(trlOperPtr.p->prevParallelQue == RNIL);
- /* --------------------------------------------------------------------- */
- /* WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE */
- /* OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK. */
- /* --------------------------------------------------------------------- */
- rloOperPtr = operationRecPtr;
- trlTmpOperPtr = trlOperPtr;
- TelementIsDisappeared = trlOperPtr.p->elementIsDisappeared;
- Uint32 ThashValue = trlOperPtr.p->hashValue;
- do {
- /* ------------------------------------------------------------------ */
- // Ensure that all operations in the queue are assigned with the
- // elementIsDisappeared to ensure that the element is removed after
- // a previous delete. An insert does however revert this decision
- // since the element is put back again.
- // Local checkpoints complicate life here since they do not
- // execute the next operation but simply change
- // the state on the operation.
- // We need to set-up the variable elementIsDisappeared
- // properly even when local checkpoints and inserts/writes after
- // deletes occur.
- /* ------------------------------------------------------------------- */
- trlTmpOperPtr.p->elementIsDisappeared = TelementIsDisappeared;
- if (TelementIsDisappeared == ZTRUE) {
- /* ----------------------------------------------------------------- */
- // If the elementIsDisappeared is set then we know that the
- // hashValue is also set since it always originates from a
- // committing abort or a aborting insert.
- // Scans do not initialise the hashValue and must have this
- // value initialised if they are to successfully commit the delete.
- /* ----------------------------------------------------------------- */
- jam();
- trlTmpOperPtr.p->hashValue = ThashValue;
- }//if
- trlTmpOperPtr.p->localdata[0] = trlOperPtr.p->localdata[0];
- trlTmpOperPtr.p->localdata[1] = trlOperPtr.p->localdata[1];
- /* ------------------------------------------------------------------- */
- // Restart the queued operation.
- /* ------------------------------------------------------------------- */
- operationRecPtr = trlTmpOperPtr;
- TelementIsDisappeared = executeNextOperation(signal);
- ThashValue = operationRecPtr.p->hashValue;
- if (trlTmpOperPtr.p->nextParallelQue != RNIL) {
- jam();
- /* ----------------------------------------------------------------- */
- // We will continue with the next operation in the parallel
- // queue and start this as well.
- /* ----------------------------------------------------------------- */
- trlTmpOperPtr.i = trlTmpOperPtr.p->nextParallelQue;
- ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
- } else {
- jam();
- break;
- }//if
- } while (1);
- operationRecPtr = rloOperPtr;
- }//if
-
- // Insert the next op into the lock owner list
- insertLockOwnersList(signal, trlOperPtr);
- return;
-}//Dbacc::releaselock()
+void
+Dbacc::startNew(Signal* signal, OperationrecPtr newOwner)
+{
+ OperationrecPtr save = operationRecPtr;
+ operationRecPtr = newOwner;
+
+ Uint32 opbits = newOwner.p->m_op_bits;
+ Uint32 op = opbits & Operationrec::OP_MASK;
+ Uint32 opstate = (opbits & Operationrec::OP_STATE_MASK);
+ ndbassert(opstate == Operationrec::OP_STATE_WAITING);
+ ndbassert(opbits & Operationrec::OP_LOCK_OWNER);
+ const bool deleted = opbits & Operationrec::OP_ELEMENT_DISAPPEARED;
+ Uint32 errCode = 0;
-/* --------------------------------------------------------------------------------- */
-/* COPY_OP_INFO */
-/* INPUT: COPY_IN_OPER_PTR AND COPY_OPER_PTR. */
-/* DESCRIPTION:INFORMATION ABOUT THE ELEMENT WILL BE MOVED FROM OPERATION */
-/* REC TO QUEUE OP REC. QUE OP REC TAKES OVER THE LOCK. */
-/* --------------------------------------------------------------------------------- */
-void Dbacc::copyOpInfo(Signal* signal)
-{
- Page8Ptr coiPageidptr;
-
- copyInOperPtr.p->elementPage = copyOperPtr.p->elementPage;
- copyInOperPtr.p->elementIsforward = copyOperPtr.p->elementIsforward;
- copyInOperPtr.p->elementContainer = copyOperPtr.p->elementContainer;
- copyInOperPtr.p->elementPointer = copyOperPtr.p->elementPointer;
- copyInOperPtr.p->scanBits = copyOperPtr.p->scanBits;
- copyInOperPtr.p->hashvaluePart = copyOperPtr.p->hashvaluePart;
- copyInOperPtr.p->elementIsDisappeared = copyOperPtr.p->elementIsDisappeared;
- if (copyInOperPtr.p->elementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------------------- */
- // If the elementIsDisappeared is set then we know that the hashValue is also set
- // since it always originates from a committing abort or a aborting insert. Scans
- // do not initialise the hashValue and must have this value initialised if they are
- // to successfully commit the delete.
- /* --------------------------------------------------------------------------------- */
- jam();
- copyInOperPtr.p->hashValue = copyOperPtr.p->hashValue;
- }//if
- coiPageidptr.i = copyOperPtr.p->elementPage;
- ptrCheckGuard(coiPageidptr, cpagesize, page8);
- const Uint32 tmp = ElementHeader::setLocked(copyInOperPtr.i);
- dbgWord32(coiPageidptr, copyOperPtr.p->elementPointer, tmp);
- arrGuard(copyOperPtr.p->elementPointer, 2048);
- coiPageidptr.p->word32[copyOperPtr.p->elementPointer] = tmp;
- copyInOperPtr.p->localdata[0] = copyOperPtr.p->localdata[0];
- copyInOperPtr.p->localdata[1] = copyOperPtr.p->localdata[1];
-}//Dbacc::copyOpInfo()
+ opbits &= opbits & ~(Uint32)Operationrec::OP_STATE_MASK;
+ opbits |= Operationrec::OP_STATE_RUNNING;
+
+ if (op == ZSCAN_OP && (opbits & Operationrec::OP_LOCK_REQ) == 0)
+ goto scan;
-/* ******************--------------------------------------------------------------- */
-/* EXECUTE NEXT OPERATION */
-/* NEXT OPERATION IN A LOCK QUEUE WILL BE EXECUTED. */
-/* --------------------------------------------------------------------------------- */
-Uint32 Dbacc::executeNextOperation(Signal* signal)
-{
- ndbrequire(operationRecPtr.p->transactionstate == ACTIVE);
- if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------- */
- /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS DELETED. */
- /* --------------------------------------------------------------------- */
- if (((operationRecPtr.p->operation != ZINSERT) &&
- (operationRecPtr.p->operation != ZWRITE)) ||
- (operationRecPtr.p->prevParallelQue != RNIL)) {
- if (operationRecPtr.p->operation != ZSCAN_OP ||
- operationRecPtr.p->isAccLockReq) {
- jam();
- /* ----------------------------------------------------------------- */
- // Updates and reads with a previous delete simply aborts with read
- // error indicating that tuple did not exist.
- // Also inserts and writes not being the first operation.
- /* ----------------------------------------------------------------- */
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
- signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = ZREAD_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal,
- 2, JBB);
- return operationRecPtr.p->elementIsDisappeared;
- } else {
- /* ----------------------------------------------------------------- */
- /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A
- * SCAN => SPECIAL TREATMENT.
- * IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION
- * FROM THE SCAN LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED
- * THEN WE SHOULD RESTART THE SCAN PROCESS. OTHERWISE WE SIMPLY
- * RELEASE THE OPERATION AND DECREASE THE NUMBER OF LOCKS HELD.
- * ----------------------------------------------------------------- */
- takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
- putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
- return operationRecPtr.p->elementIsDisappeared;
- }//if
- }//if
- /* --------------------------------------------------------------------- */
- // Insert and writes can continue but need to be converted to inserts.
- /* --------------------------------------------------------------------- */
- jam();
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
- operationRecPtr.p->operation = ZINSERT;
- operationRecPtr.p->insertIsDone = ZTRUE;
- } else if (operationRecPtr.p->operation == ZINSERT) {
- bool abortFlag = true;
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- OperationrecPtr prevOpPtr;
- jam();
- prevOpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
- if (prevOpPtr.p->operation == ZDELETE) {
- jam();
- abortFlag = false;
- }//if
- }//if
- if (abortFlag) {
- jam();
- /* ------------------------------------------------------------------- */
- /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. */
- /* THIS IS CLEARLY NOT A GOOD IDEA. */
- /* ------------------------------------------------------------------- */
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
- signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = ZWRITE_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal,
- 2, JBB);
- return operationRecPtr.p->elementIsDisappeared;
- }//if
- }
- else if(operationRecPtr.p->operation == ZWRITE)
+ if (deleted)
{
jam();
- operationRecPtr.p->operation = ZUPDATE;
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- OperationrecPtr prevOpPtr;
- jam();
- prevOpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
- if (prevOpPtr.p->operation == ZDELETE)
- {
- jam();
- operationRecPtr.p->operation = ZINSERT;
- }
+ if (op != ZINSERT && op != ZWRITE)
+ {
+ errCode = ZREAD_ERROR;
+ goto ref;
}
+
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits &= ~(Uint32)Operationrec::OP_ELEMENT_DISAPPEARED;
+ opbits |= (op = ZINSERT);
+ opbits |= Operationrec::OP_INSERT_IS_DONE;
+ goto conf;
}
-
- if (operationRecPtr.p->operation == ZSCAN_OP &&
- ! operationRecPtr.p->isAccLockReq) {
+ else if (op == ZINSERT)
+ {
jam();
- takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
- putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
- } else {
+ errCode = ZWRITE_ERROR;
+ goto ref;
+ }
+ else if (op == ZWRITE)
+ {
jam();
- sendAcckeyconf(signal);
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF,
- signal, 6, JBB);
- }//if
- return operationRecPtr.p->elementIsDisappeared;
-}//Dbacc::executeNextOperation()
+ opbits &= ~(Uint32)Operationrec::OP_MASK;
+ opbits |= (op = ZUPDATE);
+ goto conf;
+ }
+
+conf:
+ newOwner.p->m_op_bits = opbits;
+
+ sendAcckeyconf(signal);
+ sendSignal(newOwner.p->userblockref, GSN_ACCKEYCONF,
+ signal, 6, JBA);
+
+ operationRecPtr = save;
+ return;
+
+scan:
+ jam();
+ newOwner.p->m_op_bits = opbits;
+
+ takeOutScanLockQueue(newOwner.p->scanRecPtr);
+ putReadyScanQueue(signal, newOwner.p->scanRecPtr);
+
+ operationRecPtr = save;
+ return;
+
+ref:
+ newOwner.p->m_op_bits = opbits;
+
+ signal->theData[0] = newOwner.p->userptr;
+ signal->theData[1] = errCode;
+ sendSignal(newOwner.p->userblockref, GSN_ACCKEYREF, signal,
+ 2, JBB);
+
+ operationRecPtr = save;
+ return;
+}
/**
* takeOutLockOwnersList
@@ -3950,7 +4845,6 @@
{
const Uint32 Tprev = outOperPtr.p->prevLockOwnerOp;
const Uint32 Tnext = outOperPtr.p->nextLockOwnerOp;
-
#ifdef VM_TRACE
// Check that operation is already in the list
OperationrecPtr tmpOperPtr;
@@ -3965,8 +4859,7 @@
ndbrequire(inList == true);
#endif
- ndbrequire(outOperPtr.p->lockOwner == ZTRUE);
- outOperPtr.p->lockOwner = ZFALSE;
+ ndbassert(outOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER);
// Fast path through the code for the common case.
if ((Tprev == RNIL) && (Tnext == RNIL)) {
@@ -4007,7 +4900,6 @@
const OperationrecPtr& insOperPtr)
{
OperationrecPtr tmpOperPtr;
-
#ifdef VM_TRACE
// Check that operation is not already in list
tmpOperPtr.i = fragrecptr.p->lockOwnersList;
@@ -4017,12 +4909,12 @@
tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp;
}
#endif
+ tmpOperPtr.i = fragrecptr.p->lockOwnersList;
+
+ ndbrequire(! (insOperPtr.p->m_op_bits & Operationrec::OP_LOCK_OWNER));
- ndbrequire(insOperPtr.p->lockOwner == ZFALSE);
-
- insOperPtr.p->lockOwner = ZTRUE;
+ insOperPtr.p->m_op_bits |= Operationrec::OP_LOCK_OWNER;
insOperPtr.p->prevLockOwnerOp = RNIL;
- tmpOperPtr.i = fragrecptr.p->lockOwnersList;
insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i;
fragrecptr.p->lockOwnersList = insOperPtr.i;
@@ -5385,6 +6277,7 @@
if (!scanPtr.p->scanReadCommittedFlag) {
commitOperation(signal);
}//if
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
takeOutActiveScanOp(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -5400,9 +6293,10 @@
jam();
fragrecptr.i = scanPtr.p->activeLocalFrag;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- /* --------------------------------------------------------------------------------- */
- /* THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL. RELESE ALL INVOLVED REC. */
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------------
+ * THE SCAN PROCESS IS FINISHED. RELOCK ALL LOCKED EL.
+ * RELESE ALL INVOLVED REC.
+ * ------------------------------------------------------------------- */
releaseScanLab(signal);
return;
break;
@@ -5452,24 +6346,26 @@
scanPtr.p->nextBucketIndex++;
if (scanPtr.p->scanBucketState == ScanRec::SECOND_LAP) {
if (scanPtr.p->nextBucketIndex > scanPtr.p->maxBucketIndexToRescan) {
- /* --------------------------------------------------------------------------------- */
- // We have finished the rescan phase. We are ready to proceed with the next fragment part.
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------- */
+ // We have finished the rescan phase.
+ // We are ready to proceed with the next fragment part.
+ /* ---------------------------------------------------------------- */
jam();
checkNextFragmentLab(signal);
return;
}//if
} else if (scanPtr.p->scanBucketState == ScanRec::FIRST_LAP) {
if ((fragrecptr.p->p + fragrecptr.p->maxp) < scanPtr.p->nextBucketIndex) {
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------- */
// All buckets have been scanned a first time.
- /* --------------------------------------------------------------------------------- */
+ /* ---------------------------------------------------------------- */
if (scanPtr.p->minBucketIndexToRescan == 0xFFFFFFFF) {
jam();
- /* --------------------------------------------------------------------------------- */
- // We have not had any merges behind the scan. Thus it is not necessary to perform
- // any rescan any buckets and we can proceed immediately with the next fragment part.
- /* --------------------------------------------------------------------------------- */
+ /* -------------------------------------------------------------- */
+ // We have not had any merges behind the scan.
+ // Thus it is not necessary to perform any rescan any buckets
+ // and we can proceed immediately with the next fragment part.
+ /* --------------------------------------------------------------- */
checkNextFragmentLab(signal);
return;
} else {
@@ -5561,18 +6457,23 @@
tslElementptr = tnsElementptr;
setlock(signal);
insertLockOwnersList(signal, operationRecPtr);
+ operationRecPtr.p->m_op_bits |=
+ Operationrec::OP_STATE_RUNNING | Operationrec::OP_RUN_QUEUE;
}//if
} else {
arrGuard(tnsElementptr, 2048);
queOperPtr.i =
ElementHeader::getOpPtrI(nsPageptr.p->word32[tnsElementptr]);
ptrCheckGuard(queOperPtr, coprecsize, operationrec);
- if (queOperPtr.p->elementIsDisappeared == ZTRUE) {
+ if (queOperPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED ||
+ queOperPtr.p->localdata[0] == ~(Uint32)0)
+ {
jam();
- /* --------------------------------------------------------------------------------- */
- // If the lock owner indicates the element is disappeared then we will not report this
- // tuple. We will continue with the next tuple.
- /* --------------------------------------------------------------------------------- */
+ /* ------------------------------------------------------------------ */
+ // If the lock owner indicates the element is disappeared then
+ // we will not report this tuple. We will continue with the next tuple.
+ /* ------------------------------------------------------------------ */
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
signal->theData[0] = scanPtr.i;
@@ -5584,31 +6485,29 @@
Uint32 return_result;
if (scanPtr.p->scanLockMode == ZREADLOCK) {
jam();
- priPageptr = nsPageptr;
- tpriElementptr = tnsElementptr;
- return_result = placeReadInLockQueue(signal);
+ return_result = placeReadInLockQueue(queOperPtr);
} else {
jam();
- pwiPageptr = nsPageptr;
- tpwiElementptr = tnsElementptr;
- return_result = placeWriteInLockQueue(signal);
+ return_result = placeWriteInLockQueue(queOperPtr);
}//if
if (return_result == ZSERIAL_QUEUE) {
- /* --------------------------------------------------------------------------------- */
- /* WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO WAIT FOR */
- /* THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT. */
- /* --------------------------------------------------------------------------------- */
+ /* -----------------------------------------------------------------
+ * WE PLACED THE OPERATION INTO A SERIAL QUEUE AND THUS WE HAVE TO
+ * WAIT FOR THE LOCK TO BE RELEASED. WE CONTINUE WITH THE NEXT ELEMENT
+ * ----------------------------------------------------------------- */
putOpScanLockQue(); /* PUT THE OP IN A QUE IN THE SCAN REC */
signal->theData[0] = scanPtr.i;
signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
sendSignal(cownBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB);
return;
- } else if (return_result == ZWRITE_ERROR) {
+ } else if (return_result != ZPARALLEL_QUEUE) {
jam();
- /* --------------------------------------------------------------------------------- */
- // The tuple is either not committed yet or a delete in the same transaction (not
- // possible here since we are a scan). Thus we simply continue with the next tuple.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------- */
+ // The tuple is either not committed yet or a delete in
+ // the same transaction (not possible here since we are a scan).
+ // Thus we simply continue with the next tuple.
+ /* ----------------------------------------------------------------- */
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
signal->theData[0] = scanPtr.i;
@@ -5619,10 +6518,11 @@
ndbassert(return_result == ZPARALLEL_QUEUE);
}//if
}//if
- /* --------------------------------------------------------------------------------- */
- // Committed read proceed without caring for locks immediately down here except when
- // the tuple was deleted permanently and no new operation has inserted it again.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
+ // Committed read proceed without caring for locks immediately
+ // down here except when the tuple was deleted permanently
+ // and no new operation has inserted it again.
+ /* ----------------------------------------------------------------------- */
putActiveScanOp(signal);
sendNextScanConf(signal);
return;
@@ -5646,14 +6546,15 @@
DirRangePtr cnfDirRangePtr;
DirectoryarrayPtr cnfDirptr;
Page8Ptr cnfPageidptr;
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
// Set the active fragment part.
// Set the current bucket scanned to the first.
// Start with the first lap.
// Remember the number of buckets at start of the scan.
- // Set the minimum and maximum to values that will always be smaller and larger than.
+ // Set the minimum and maximum to values that will always be smaller and
+ // larger than.
// Reset the scan indicator on the first bucket.
- /* --------------------------------------------------------------------------------- */
+ /* ----------------------------------------------------------------------- */
scanPtr.p->activeLocalFrag = fragrecptr.i;
scanPtr.p->nextBucketIndex = 0; /* INDEX OF SCAN BUCKET */
scanPtr.p->scanBucketState = ScanRec::FIRST_LAP;
@@ -5672,11 +6573,11 @@
releaseScanBucket(signal);
}//Dbacc::initScanFragmentPart()
-/* --------------------------------------------------------------------------------- */
-/* FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED. ALL OPERATION IN THE */
-/* ACTIVE OR WAIT QUEUE ARE RELEASED, SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN */
-/* RECORD IS RELEASED. */
-/* --------------------------------------------------------------------------------- */
+/* -------------------------------------------------------------------------
+ * FLAG = 6 = ZCOPY_CLOSE THE SCAN PROCESS IS READY OR ABORTED.
+ * ALL OPERATION IN THE ACTIVE OR WAIT QUEUE ARE RELEASED,
+ * SCAN FLAG OF ROOT FRAG IS RESET AND THE SCAN RECORD IS RELEASED.
+ * ------------------------------------------------------------------------ */
void Dbacc::releaseScanLab(Signal* signal)
{
releaseAndCommitActiveOps(signal);
@@ -5715,8 +6616,17 @@
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
if (!scanPtr.p->scanReadCommittedFlag) {
jam();
- commitOperation(signal);
+ if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) ==
+ Operationrec::OP_STATE_EXECUTED)
+ {
+ commitOperation(signal);
+ }
+ else
+ {
+ abortOperation(signal);
+ }
}//if
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
takeOutActiveScanOp(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -5737,8 +6647,17 @@
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
if (!scanPtr.p->scanReadCommittedFlag) {
jam();
- commitOperation(signal);
+ if ((operationRecPtr.p->m_op_bits & Operationrec::OP_STATE_MASK) ==
+ Operationrec::OP_STATE_EXECUTED)
+ {
+ commitOperation(signal);
+ }
+ else
+ {
+ abortOperation(signal);
+ }
}//if
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
takeOutReadyScanQueue(signal);
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
@@ -5761,6 +6680,7 @@
abortOperation(signal);
}//if
takeOutScanLockQueue(scanPtr.i);
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
operationRecPtr.i = trsoOperPtr.i;
@@ -5795,9 +6715,11 @@
takeOutReadyScanQueue(signal);
fragrecptr.i = operationRecPtr.p->fragptr;
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
+ if (operationRecPtr.p->m_op_bits & Operationrec::OP_ELEMENT_DISAPPEARED)
+ {
jam();
abortOperation(signal);
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
releaseOpRec(signal);
scanPtr.p->scanOpsAllocated--;
continue;
@@ -5884,7 +6806,8 @@
jamEntry();
tatrOpPtr.i = signal->theData[1]; /* OPER PTR OF ACC */
ptrCheckGuard(tatrOpPtr, coprecsize, operationrec);
- if (tatrOpPtr.p->operation == ZSCAN_OP) {
+ if ((tatrOpPtr.p->m_op_bits & Operationrec::OP_MASK) == ZSCAN_OP)
+ {
tatrOpPtr.p->transId1 = signal->theData[2];
tatrOpPtr.p->transId2 = signal->theData[3];
} else {
@@ -5981,29 +6904,28 @@
scanPtr.p->scanOpsAllocated++;
+ Uint32 opbits = 0;
+ opbits |= ZSCAN_OP;
+ opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_LOCK_MODE : 0;
+ opbits |= scanPtr.p->scanLockMode ? Operationrec::OP_ACC_LOCK_MODE : 0;
+ opbits |= scanPtr.p->scanReadCommittedFlag ?
+ Operationrec::OP_EXECUTED_DIRTY_READ : 0;
+ opbits |= Operationrec::OP_COMMIT_DELETE_CHECK;
operationRecPtr.p->userptr = RNIL;
operationRecPtr.p->scanRecPtr = scanPtr.i;
- operationRecPtr.p->operation = ZSCAN_OP;
- operationRecPtr.p->transactionstate = ACTIVE;
- operationRecPtr.p->commitDeleteCheckFlag = ZFALSE;
- operationRecPtr.p->lockMode = scanPtr.p->scanLockMode;
operationRecPtr.p->fid = fragrecptr.p->myfid;
operationRecPtr.p->fragptr = fragrecptr.i;
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
operationRecPtr.p->nextParallelQue = RNIL;
operationRecPtr.p->prevParallelQue = RNIL;
operationRecPtr.p->nextSerialQue = RNIL;
operationRecPtr.p->prevSerialQue = RNIL;
operationRecPtr.p->transId1 = scanPtr.p->scanTrid1;
operationRecPtr.p->transId2 = scanPtr.p->scanTrid2;
- operationRecPtr.p->lockOwner = ZFALSE;
- operationRecPtr.p->dirtyRead = 0;
- operationRecPtr.p->nodeType = 0; // Not a stand-by node
operationRecPtr.p->elementIsforward = tisoIsforward;
operationRecPtr.p->elementContainer = tisoContainerptr;
operationRecPtr.p->elementPointer = tisoElementptr;
operationRecPtr.p->elementPage = isoPageptr.i;
- operationRecPtr.p->isAccLockReq = ZFALSE;
+ operationRecPtr.p->m_op_bits = opbits;
tisoLocalPtr = tisoElementptr + tisoIsforward;
guard24 = fragrecptr.p->localkeylen - 1;
for (tisoTmp = 0; tisoTmp <= guard24; tisoTmp++) {
@@ -6868,14 +7790,12 @@
}
ndbrequire(opInList == false);
#endif
- ndbrequire(operationRecPtr.p->lockOwner == ZFALSE);
+ ndbrequire(operationRecPtr.p->m_op_bits == Operationrec::OP_INITIAL);
operationRecPtr.p->nextOp = cfreeopRec;
cfreeopRec = operationRecPtr.i; /* UPDATE FREE LIST OF OP RECORDS */
operationRecPtr.p->prevOp = RNIL;
- operationRecPtr.p->opState = FREE_OP;
- operationRecPtr.p->transactionstate = IDLE;
- operationRecPtr.p->operation = ZUNDEFINED_OP;
+ operationRecPtr.p->m_op_bits = Operationrec::OP_INITIAL;
}//Dbacc::releaseOpRec()
/* --------------------------------------------------------------------------------- */
@@ -7377,8 +8297,8 @@
OperationrecPtr tmpOpPtr;
tmpOpPtr.i = recordNo;
ptrAss(tmpOpPtr, operationrec);
- infoEvent("Dbacc::operationrec[%d]: opState=%d, transid(0x%x, 0x%x)",
- tmpOpPtr.i, tmpOpPtr.p->opState, tmpOpPtr.p->transId1,
+ infoEvent("Dbacc::operationrec[%d]: transid(0x%x, 0x%x)",
+ tmpOpPtr.i, tmpOpPtr.p->transId1,
tmpOpPtr.p->transId2);
infoEvent("elementIsforward=%d, elementPage=%d, elementPointer=%d ",
tmpOpPtr.p->elementIsforward, tmpOpPtr.p->elementPage,
@@ -7386,8 +8306,7 @@
infoEvent("fid=%d, fragptr=%d, hashvaluePart=%d ",
tmpOpPtr.p->fid, tmpOpPtr.p->fragptr,
tmpOpPtr.p->hashvaluePart);
- infoEvent("hashValue=%d, insertDeleteLen=%d",
- tmpOpPtr.p->hashValue, tmpOpPtr.p->insertDeleteLen);
+ infoEvent("hashValue=%d", tmpOpPtr.p->hashValue);
infoEvent("nextLockOwnerOp=%d, nextOp=%d, nextParallelQue=%d ",
tmpOpPtr.p->nextLockOwnerOp, tmpOpPtr.p->nextOp,
tmpOpPtr.p->nextParallelQue);
@@ -7398,15 +8317,8 @@
tmpOpPtr.p->prevLockOwnerOp, tmpOpPtr.p->nextParallelQue);
infoEvent("prevSerialQue=%d, scanRecPtr=%d",
tmpOpPtr.p->prevSerialQue, tmpOpPtr.p->scanRecPtr);
- infoEvent("transactionstate=%d, elementIsDisappeared=%d, insertIsDone=%d ",
- tmpOpPtr.p->transactionstate, tmpOpPtr.p->elementIsDisappeared,
- tmpOpPtr.p->insertIsDone);
- infoEvent("lockMode=%d, lockOwner=%d, nodeType=%d ",
- tmpOpPtr.p->lockMode, tmpOpPtr.p->lockOwner,
- tmpOpPtr.p->nodeType);
- infoEvent("operation=%d, opSimple=%d, dirtyRead=%d,scanBits=%d ",
- tmpOpPtr.p->operation, tmpOpPtr.p->opSimple,
- tmpOpPtr.p->dirtyRead, tmpOpPtr.p->scanBits);
+ infoEvent("m_op_bits=0x%x, scanBits=%d ",
+ tmpOpPtr.p->m_op_bits, tmpOpPtr.p->scanBits);
return;
}
--- 1.45/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2006-02-13 13:12:42 +01:00
+++ 1.46/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp 2006-05-18 10:17:50 +02:00
@@ -33,8 +33,6 @@
// primary key is stored in TUP
#include "../dbtup/Dbtup.hpp"
-#include "../dbacc/Dbacc.hpp"
-
#ifdef DBLQH_C
// Constants
/* ------------------------------------------------------------------------- */
@@ -2556,8 +2554,19 @@
Dbtup* c_tup;
Dbacc* c_acc;
+
+ /**
+ * Read primary key from tup
+ */
Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst);
+ /**
+ * Read primary key from operation
+ */
+public:
+ Uint32 readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm);
+private:
+
void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32);
void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32);
@@ -2924,6 +2933,11 @@
}
DLHashTable<ScanRecord> c_scanTakeOverHash;
+
+#ifdef ERROR_INSERT
+ inline bool TRACE_OP_CHECK(const TcConnectionrec* regTcPtr);
+ void TRACE_OP_DUMP(const TcConnectionrec* regTcPtr, const char * pos);
+#endif
};
inline
@@ -2991,10 +3005,19 @@
signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx;
c_acc->execACCMINUPDATE(signal);
- if (ERROR_INSERTED(5712))
+ if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713))
ndbout << " LK: " << *key;
regTcPtr.p->m_row_id = *key;
}
+inline
+bool
+Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr)
+{
+ return (ERROR_INSERTED(5712) &&
+ (regTcPtr->operation == ZINSERT ||
+ regTcPtr->operation == ZDELETE)) ||
+ ERROR_INSERTED(5713);
+}
#endif
--- 1.104/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2006-05-04 18:22:13 +02:00
+++ 1.105/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2006-05-18 10:17:50 +02:00
@@ -67,48 +67,70 @@
// seen only when we debug the product
#ifdef VM_TRACE
#define DEBUG(x) ndbout << "DBLQH: "<< x << endl;
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){
out << (int)state;
return out;
}
+static
NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
out << (int)state;
return out;
}
+static
+NdbOut &
+operator<<(NdbOut& out, Operation_t op)
+{
+ switch(op){
+ case ZREAD: out << "READ"; break;
+ case ZREAD_EX: out << "READ-EX"; break;
+ case ZINSERT: out << "INSERT"; break;
+ case ZUPDATE: out << "UPDATE"; break;
+ case ZDELETE: out << "DELETE"; break;
+ case ZWRITE: out << "WRITE"; break;
+ }
+ return out;
+}
+
#else
#define DEBUG(x)
#endif
@@ -120,7 +142,7 @@
#if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR
#include <NdbConfig.h>
-NdbOut * tracenrout = 0;
+static NdbOut * tracenrout = 0;
static int TRACENR_FLAG = 0;
#define TRACENR(x) (* tracenrout) << x
#define SET_TRACENR_FLAG TRACENR_FLAG = 1
@@ -132,6 +154,13 @@
#define CLEAR_TRACENR_FLAG
#endif
+#ifdef ERROR_INSERT
+static NdbOut * traceopout = 0;
+#define TRACE_OP(regTcPtr, place) do { if (TRACE_OP_CHECK(regTcPtr)) TRACE_OP_DUMP(regTcPtr, place); } while(0)
+#else
+#define TRACE_OP(x) {}
+#endif
+
/* ------------------------------------------------------------------------- */
/* ------- SEND SYSTEM ERROR ------- */
/* */
@@ -454,6 +483,10 @@
name = NdbConfig_SignalLogFileName(getOwnNodeId());
tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+")));
#endif
+
+#ifdef ERROR_INSERT
+ traceopout = &ndbout;
+#endif
return;
break;
@@ -2531,14 +2564,15 @@
case TcConnectionrec::WAIT_ACC_ABORT:
case TcConnectionrec::ABORT_QUEUED:
jam();
-/* -------------------------------------------------------------------------- */
-/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
-/* -------------------------------------------------------------------------- */
+/* ------------------------------------------------------------------------- */
+/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
+/* ------------------------------------------------------------------------- */
break;
default:
ndbrequire(false);
break;
}//switch
+
}//Dblqh::execTUPKEYCONF()
/* ************> */
@@ -2560,6 +2594,8 @@
c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr;
+ TRACE_OP(regTcPtr, "TUPKEYREF");
+
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{
jam();
@@ -2568,7 +2604,12 @@
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
}
-
+ else if (getNodeState().startLevel == NodeState::SL_STARTED)
+ {
+ if (terrorCode == 899)
+ ndbout << "899: " << regTcPtr->m_row_id << endl;
+ }
+
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP:
jam();
@@ -3606,57 +3647,7 @@
regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
return;
}//if
-//#define TRACE_LQHKEYREQ
-#ifdef TRACE_LQHKEYREQ
- {
- ndbout << (regTcPtr->operation == ZREAD ? "READ" :
- regTcPtr->operation == ZUPDATE ? "UPDATE" :
- regTcPtr->operation == ZINSERT ? "INSERT" :
- regTcPtr->operation == ZDELETE ? "DELETE" : "<Other>")
- << "(" << (int)regTcPtr->operation << ")"
- << " from=(" << getBlockName(refToBlock(regTcPtr->clientBlockref))
- << ", " << refToNode(regTcPtr->clientBlockref) << ")"
- << " table=" << regTcPtr->tableref << " ";
-
- ndbout << "hash: " << hex << regTcPtr->hashValue << endl;
-
- ndbout << "key=[" << hex;
- Uint32 i;
- for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
- ndbout << hex << regTcPtr->tupkeyData[i] << " ";
- }
-
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- while(i < regTcPtr->primKeyLen)
- {
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
- ndbout << hex << regDatabufptr.p->data[j] << " ";
- }
- ndbout << "]" << endl;
-
- ndbout << "attr=[" << hex;
- for(i = 0; i<regTcPtr->reclenAiLqhkey && i < 5; i++)
- ndbout << hex << regTcPtr->firstAttrinfo[i] << " ";
-
- AttrbufPtr regAttrinbufptr;
- regAttrinbufptr.i= regTcPtr->firstAttrinbuf;
- while(i < regTcPtr->totReclenAi)
- {
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
- Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
- ndbrequire(dataLen != 0);
- ndbrequire(i + dataLen <= regTcPtr->totReclenAi);
- for(Uint32 j= 0; j<dataLen; j++, i++)
- ndbout << hex << regAttrinbufptr.p->attrbuf[j] << " ";
-
- regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
- }
-
- ndbout << "]" << endl;
- }
-#endif
+
/* ---------------------------------------------------------------------- */
/* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/
/* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */
@@ -3763,6 +3754,7 @@
/* ----------------------------------------------------------------- */
if (TRACENR_FLAG)
{
+ TRACE_OP(regTcPtr, "RECEIVED");
switch (regTcPtr->operation) {
case ZREAD: TRACENR("READ"); break;
case ZUPDATE: TRACENR("UPDATE"); break;
@@ -3847,6 +3839,9 @@
signal->theData[8] = sig2;
signal->theData[9] = sig3;
signal->theData[10] = sig4;
+
+ TRACE_OP(regTcPtr.p, "ACC");
+
if (regTcPtr.p->primKeyLen > 4) {
sendKeyinfoAcc(signal, 11);
}//if
@@ -4133,7 +4128,7 @@
jam();
ndbrequire(rowid == 0);
signal->theData[0] = accPtr;
- signal->theData[1] = false;
+ signal->theData[1] = 0;
EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2);
jamEntry();
return;
@@ -4144,16 +4139,18 @@
*/
ndbrequire(regTcPtr.p->m_dealloc == 0);
Local_key save = regTcPtr.p->m_row_id;
- signal->theData[0] = regTcPtr.p->accConnectrec;
+
+ c_acc->execACCKEY_ORD(signal, accPtr);
+ signal->theData[0] = accPtr;
EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1);
jamEntry();
-
+
ndbrequire(regTcPtr.p->m_dealloc == 1);
int ret = c_tup->nr_delete(signal, regTcPtr.i,
fragPtr.p->tupFragptr, ®TcPtr.p->m_row_id,
regTcPtr.p->gci);
jamEntry();
-
+
if (ret)
{
ndbassert(ret == 1);
@@ -4167,7 +4164,7 @@
}
TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl);
-
+
regTcPtr.p->m_dealloc = 0;
regTcPtr.p->m_row_id = save;
fragptr = fragPtr;
@@ -4274,6 +4271,45 @@
}
}
+Uint32
+Dblqh::readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm)
+{
+ TcConnectionrecPtr regTcPtr;
+ DatabufPtr regDatabufptr;
+ Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS >> 1];
+
+ jamEntry();
+ regTcPtr.i = opPtrI;
+ ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
+
+ Uint32 tableId = regTcPtr.p->tableref;
+ Uint32 keyLen = regTcPtr.p->primKeyLen;
+ regDatabufptr.i = regTcPtr.p->firstTupkeybuf;
+ Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst;
+
+ memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData));
+ if (keyLen > 4)
+ {
+ tmp += 4;
+ Uint32 pos = 4;
+ do {
+ ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+ memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data));
+ regDatabufptr.i = regDatabufptr.p->nextDatabuf;
+ tmp += sizeof(regDatabufptr.p->data) >> 2;
+ pos += sizeof(regDatabufptr.p->data) >> 2;
+ } while(pos < keyLen);
+ }
+
+ if (xfrm)
+ {
+ jam();
+ Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
+ return xfrm_key(tableId, (Uint32*)Tmp, dst, ~0, keyPartLen);
+ }
+
+ return keyLen;
+}
/* =*======================================================================= */
/* ======= SEND KEYINFO TO ACC ======= */
@@ -4447,10 +4483,6 @@
* ----------------------------------------------------------------------- */
Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE;
Uint32 page_no = local_key >> MAX_TUPLES_BITS;
-#ifdef TRACE_LQHKEYREQ
- ndbout << "localkey: [ " << hex << page_no << " " << page_idx << "]"
- << endl;
-#endif
Uint32 Ttupreq = regTcPtr->dirtyOp;
Ttupreq = Ttupreq + (regTcPtr->opSimple << 1);
Ttupreq = Ttupreq + (op << 6);
@@ -4506,70 +4538,13 @@
tupKeyReq->m_row_id_page_no = sig0;
tupKeyReq->m_row_id_page_idx = sig1;
- if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
- {
- ndbout << "INSERT " << regFragptrP->tabRef
- << "(" << regFragptrP->fragId << ")";
-
- {
- ndbout << "key=[" << hex;
- Uint32 i;
- for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
- ndbout << hex << regTcPtr->tupkeyData[i] << " ";
- }
-
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- while(i < regTcPtr->primKeyLen)
- {
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
- ndbout << hex << regDatabufptr.p->data[j] << " ";
- }
- ndbout << "] ";
- }
-
- if(regTcPtr->m_use_rowid)
- ndbout << " " << regTcPtr->m_row_id;
- }
-
- if (ERROR_INSERTED(5712) && regTcPtr->operation == ZDELETE)
- {
- Local_key lk; lk.assref(local_key);
-
- ndbout << "DELETE " << regFragptrP->tabRef
- << "(" << regFragptrP->fragId << ") " << lk;
-
- {
- ndbout << "key=[" << hex;
- Uint32 i;
- for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
- ndbout << hex << regTcPtr->tupkeyData[i] << " ";
- }
-
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- while(i < regTcPtr->primKeyLen)
- {
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
- ndbout << hex << regDatabufptr.p->data[j] << " ";
- }
- ndbout << "]" << endl;
- }
-
- }
-
+ TRACE_OP(regTcPtr, "TUPKEYREQ");
+
regTcPtr->m_use_rowid |= (op == ZINSERT);
regTcPtr->m_row_id.m_page_no = page_no;
regTcPtr->m_row_id.m_page_idx = page_idx;
EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
-
- if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
- {
- ndbout << endl;
- }
}//Dblqh::execACCKEYCONF()
void
@@ -4654,27 +4629,37 @@
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat;
+ Uint32 readLen = tupKeyConf->readLength;
+ Uint32 writeLen = tupKeyConf->writeLength;
+ Uint32 accOp = regTcPtr->accConnectrec;
+ c_acc->execACCKEY_ORD(signal, accOp);
+
+ TRACE_OP(regTcPtr, "TUPKEYCONF");
+
if (regTcPtr->simpleRead) {
jam();
/* ----------------------------------------------------------------------
- * THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION.
- * SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK (FOR LOCAL CHECKPOINTS) YET
+ * THE OPERATION IS A SIMPLE READ.
+ * WE WILL IMMEDIATELY COMMIT THE OPERATION.
+ * SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK
+ * (FOR LOCAL CHECKPOINTS) YET
* WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED.
- * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH
- * ---------------------------------------------------------------------- */
+ * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN
+ * READ LENGTH
+ * --------------------------------------------------------------------- */
commitContinueAfterBlockedLab(signal);
return;
}//if
- if (tupKeyConf->readLength != 0) {
+ if (readLen != 0)
+ {
jam();
/* SET BIT 15 IN REQINFO */
LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1);
-
- regTcPtr->readlenAi = tupKeyConf->readLength;
+ regTcPtr->readlenAi = readLen;
}//if
- regTcPtr->totSendlenAi = tupKeyConf->writeLength;
+ regTcPtr->totSendlenAi = writeLen;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
@@ -5597,6 +5582,8 @@
if (TRACENR_FLAG)
TRACENR("DELETED: " << regTcPtr->m_row_id << endl);
+
+ TRACE_OP(regTcPtr, "DEALLOC");
signal->theData[0] = regTcPtr->fragmentid;
signal->theData[1] = regTcPtr->tableref;
@@ -5818,6 +5805,10 @@
ptrAss(tcConnectptr, regTcConnectionrec);
if ((tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) {
+
+ TcConnectionrec * const regTcPtr = tcConnectptr.p;
+ TRACE_OP(regTcPtr, "COMMIT");
+
commitReqLab(signal, gci);
return;
}//if
@@ -5937,6 +5928,10 @@
if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) &&
(tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) {
+
+ TcConnectionrec * const regTcPtr = tcConnectptr.p;
+ TRACE_OP(regTcPtr, "COMPLETE");
+
if (tcConnectptr.p->seqNoReplica != 0 &&
tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) {
jam();
@@ -6313,12 +6308,16 @@
TRACENR(endl);
}
+ TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
+
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
} else {
if(!dirtyOp){
+ TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
+
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
@@ -6362,6 +6361,8 @@
c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr;
+ TRACE_OP(tcPtr, "ACC_COMMITREQ");
+
Uint32 acc = refToBlock(tcPtr->tcAccBlockref);
signal->theData[0] = tcPtr->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
@@ -6670,6 +6671,8 @@
regTcPtr->commitAckMarker = RNIL;
}
+ TRACE_OP(regTcPtr, "ABORT");
+
abortStateHandlerLab(signal);
return;
@@ -7087,23 +7090,30 @@
* ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING.
* WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN
* AND SEES A STATE IN TUP.
- * ------------------------------------------------------------------------ */
+ * ----------------------------------------------------------------------- */
TcConnectionrec * const regTcPtr = tcConnectptr.p;
- fragptr.i = regTcPtr->fragmentptr;
- c_fragment_pool.getPtr(fragptr);
- signal->theData[0] = regTcPtr->tupConnectrec;
- EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
+
+ TRACE_OP(regTcPtr, "ACC ABORT");
+
regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
signal->theData[0] = regTcPtr->accConnectrec;
- signal->theData[1] = true;
+ signal->theData[1] = 2; // JOB BUFFER IF NEEDED
EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2);
- /* ------------------------------------------------------------------------
- * We need to insert a real-time break by sending ACC_ABORTCONF through the
- * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
- * TUPKEYREF that are in the job buffer but not yet processed. Doing
- * everything without that would race and create a state error when they
- * are executed.
- * ----------------------------------------------------------------------- */
+
+ if (signal->theData[1] == RNIL)
+ {
+ jam();
+ /* ------------------------------------------------------------------------
+ * We need to insert a real-time break by sending ACC_ABORTCONF through the
+ * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
+ * TUPKEYREF that are in the job buffer but not yet processed. Doing
+ * everything without that would race and create a state error when they
+ * are executed.
+ * --------------------------------------------------------------------- */
+ return;
+ }
+
+ execACC_ABORTCONF(signal);
return;
}//Dblqh::abortContinueAfterBlockedLab()
@@ -7117,6 +7127,11 @@
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec * const regTcPtr = tcConnectptr.p;
ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
+
+ TRACE_OP(regTcPtr, "ACC_ABORTCONF");
+ signal->theData[0] = regTcPtr->tupConnectrec;
+ EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
+
continueAbortLab(signal);
return;
}//Dblqh::execACC_ABORTCONF()
@@ -8837,7 +8852,9 @@
}//if
// If accOperationPtr == RNIL no record was returned by ACC
- if (nextScanConf->accOperationPtr == RNIL) {
+ Uint32 accOpPtr = nextScanConf->accOperationPtr;
+ if (accOpPtr == RNIL)
+ {
jam();
/*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
@@ -8871,7 +8888,8 @@
jam();
set_acc_ptr_in_scan_record(scanptr.p,
scanptr.p->m_curr_batch_size_rows,
- nextScanConf->accOperationPtr);
+ accOpPtr);
+
jam();
nextScanConfLoopLab(signal);
}//Dblqh::nextScanConfScanLab()
@@ -9071,12 +9089,24 @@
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
+
+ Uint32 rows = scanptr.p->m_curr_batch_size_rows;
+ Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
+ if (accOpPtr != (Uint32)-1)
+ {
+ c_acc->execACCKEY_ORD(signal, accOpPtr);
+ }
+ else
+ {
+ ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
+ }
+
if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
+ if ((scanptr.p->scanLockHold == ZTRUE) && rows)
+ {
jam();
scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal);
@@ -9093,7 +9123,7 @@
}//if
ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->m_curr_batch_size_bytes+= tdata4;
- scanptr.p->m_curr_batch_size_rows++;
+ scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->m_last_row = tdata5;
if (scanptr.p->check_scan_batch_completed() | tdata5){
if (scanptr.p->scanLockHold == ZTRUE) {
@@ -9103,7 +9133,7 @@
return;
} else {
jam();
- scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
+ scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal);
return;
}
@@ -9187,12 +9217,24 @@
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
+
+ Uint32 rows = scanptr.p->m_curr_batch_size_rows;
+ Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
+ if (accOpPtr != (Uint32)-1)
+ {
+ c_acc->execACCKEY_ORD(signal, accOpPtr);
+ }
+ else
+ {
+ ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
+ }
+
if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
+ if ((scanptr.p->scanLockHold == ZTRUE) && rows)
+ {
jam();
scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal);
@@ -9213,8 +9255,8 @@
scanptr.p->scanReleaseCounter = 1;
} else {
jam();
- scanptr.p->m_curr_batch_size_rows++;
- scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
+ scanptr.p->m_curr_batch_size_rows = rows + 1;
+ scanptr.p->scanReleaseCounter = rows + 1;
}//if
/* --------------------------------------------------------------------
* WE NEED TO RELEASE ALL LOCKS CURRENTLY
@@ -9224,7 +9266,7 @@
return;
}//if
Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount;
- if (scanptr.p->m_curr_batch_size_rows > 0) {
+ if (rows) {
if (time_passed > 1) {
/* -----------------------------------------------------------------------
* WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A
@@ -9232,7 +9274,7 @@
* THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we
* send the found tuples to the API.
* ----------------------------------------------------------------------- */
- scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1;
+ scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal);
return;
}
@@ -9872,6 +9914,8 @@
fragptr.p->m_scanNumberMask.clear(NR_ScanNo);
scanptr.p->scanBlockref = DBTUP_REF;
scanptr.p->scanLockHold = ZFALSE;
+ scanptr.p->m_curr_batch_size_rows = 0;
+ scanptr.p->m_curr_batch_size_bytes= 0;
initScanTc(0,
0,
@@ -10074,7 +10118,7 @@
initCopyTc(signal, ZDELETE);
set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL);
tcConP->gci = nextScanConf->gci;
-
+
tcConP->primKeyLen = 0;
tcConP->totSendlenAi = 0;
tcConP->connectState = TcConnectionrec::COPY_CONNECTED;
@@ -10197,6 +10241,12 @@
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
ScanRecord* scanP = scanptr.p;
+
+ Uint32 rows = scanP->m_curr_batch_size_rows;
+ Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, rows, false);
+ ndbassert(accOpPtr != (Uint32)-1);
+ c_acc->execACCKEY_ORD(signal, accOpPtr);
+
if (tcConnectptr.p->errorCode != 0) {
jam();
closeCopyLab(signal);
@@ -18538,6 +18588,21 @@
}
ndbrequire(arg != 2308);
}
+
+#ifdef ERROR_INSERT
+ if (arg == 5712 || arg == 5713)
+ {
+ if (arg == 5712)
+ {
+ traceopout = &ndbout;
+ }
+ else if (arg == 5713)
+ {
+ traceopout = tracenrout;
+ }
+ SET_ERROR_INSERT_VALUE(arg);
+ }
+#endif
}//Dblqh::execDUMP_STATE_ORD()
@@ -18702,3 +18767,39 @@
logP.p->logPageWord[ZPOS_IN_WRITING]= 1;
}
+#if defined ERROR_INSERT
+void
+Dblqh::TRACE_OP_DUMP(const Dblqh::TcConnectionrec* regTcPtr, const char * pos)
+{
+ (* traceopout)
+ << "[ " << hex << regTcPtr->transid[0]
+ << " " << hex << regTcPtr->transid[1] << " ] " << dec
+ << pos
+ << " " << (Operation_t)regTcPtr->operation
+ << " " << regTcPtr->tableref
+ << "(" << regTcPtr->fragmentid << ")"
+ << "(" << (regTcPtr->seqNoReplica == 0 ? "P" : "B") << ")" ;
+
+ {
+ (* traceopout) << "key=[" << hex;
+ Uint32 i;
+ for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
+ (* traceopout) << hex << regTcPtr->tupkeyData[i] << " ";
+ }
+
+ DatabufPtr regDatabufptr;
+ regDatabufptr.i = regTcPtr->firstTupkeybuf;
+ while(i < regTcPtr->primKeyLen)
+ {
+ ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+ for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
+ (* traceopout) << hex << regDatabufptr.p->data[j] << " ";
+ }
+ (* traceopout) << "] ";
+ }
+
+ if (regTcPtr->m_use_rowid)
+ (* traceopout) << " " << regTcPtr->m_row_id;
+ (* traceopout) << endl;
+}
+#endif
--- 1.43/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2006-03-23 04:17:22 +01:00
+++ 1.44/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2006-05-18 10:17:50 +02:00
@@ -297,7 +297,6 @@
};
enum TupleState {
- TUPLE_INITIAL_INSERT = 0,
TUPLE_PREPARED = 1,
TUPLE_ALREADY_ABORTED = 2,
TUPLE_TO_BE_COMMITTED = 3
@@ -305,7 +304,6 @@
enum State {
NOT_INITIALIZED = 0,
- COMMON_AREA_PAGES = 1,
IDLE = 17,
ACTIVE = 18,
SYSTEM_RESTART = 19,
@@ -1441,7 +1439,6 @@
void execSET_VAR_REQ(Signal* signal);
void execDROP_TAB_REQ(Signal* signal);
void execALTER_TAB_REQ(Signal* signal);
- void execTUP_ALLOCREQ(Signal* signal);
void execTUP_DEALLOCREQ(Signal* signal);
void execTUP_WRITELOG_REQ(Signal* signal);
--- 1.36/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2006-05-05 00:59:21 +02:00
+++ 1.37/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp 2006-05-18 10:17:50 +02:00
@@ -202,29 +202,6 @@
}
}
-void Dbtup::execTUP_ALLOCREQ(Signal* signal)
-{
- OperationrecPtr regOperPtr;
-
- jamEntry();
-
- regOperPtr.i= signal->theData[0];
- c_operation_pool.getPtr(regOperPtr);
-
- regOperPtr.p->op_struct.tuple_state= TUPLE_INITIAL_INSERT;
- //ndbout_c("execTUP_ALLOCREQ");
-
- signal->theData[0]= 0;
- signal->theData[1]= ~0 >> MAX_TUPLES_BITS;
- signal->theData[2]= (1 << MAX_TUPLES_BITS) - 1;
- return;
-
-mem_error:
- jam();
- signal->theData[0]= ZMEM_NOMEM_ERROR;
- return;
-}
-
void
Dbtup::setChecksum(Tuple_header* tuple_ptr,
Tablerec* regTabPtr)
@@ -455,13 +432,13 @@
ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
Tablerec* regTabPtr = tabptr.p;
- if(regOperPtr->op_struct.tuple_state == TUPLE_INITIAL_INSERT)
+ if(local_key == ~(Uint32)0)
{
jam();
regOperPtr->op_struct.m_wait_log_buffer= 1;
regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
return 1;
- }
+ }
jam();
Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
@@ -663,7 +640,7 @@
regOperPtr->savepointId= sig1;
regOperPtr->op_struct.primary_replica= sig2;
- regOperPtr->m_tuple_location.m_page_idx= sig3;
+ Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
sig1= tupKeyReq->opRef;
sig2= tupKeyReq->tcOpIndex;
@@ -673,7 +650,7 @@
req_struct.tc_operation_ptr= sig1;
req_struct.TC_index= sig2;
req_struct.TC_ref= sig3;
- req_struct.frag_page_id= sig4;
+ Uint32 pageid = req_struct.frag_page_id= sig4;
req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
sig1= tupKeyReq->attrBufLen;
@@ -706,7 +683,8 @@
copyAttrinfo(regOperPtr, &cinBuffer[0]);
- if(Roptype == ZINSERT && get_tuple_state(regOperPtr)== TUPLE_INITIAL_INSERT)
+ Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
+ if(Roptype == ZINSERT && localkey == ~0)
{
// No tuple allocatated yet
goto do_insert;
@@ -1159,49 +1137,6 @@
disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
}
-void
-Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct,
- Operationrec* regOperPtr,
- Tablerec* regTabPtr)
-{
- regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
- req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
-
- const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
- const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
- Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
-
- if(cnt1)
- {
- // Disk part is 32-bit aligned
- char *varptr = req_struct->m_var_data[MM].m_data_ptr;
- ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
- }
- else
- {
- ptr -= Tuple_header::HeaderSize;
- }
-
- req_struct->m_disk_ptr= (Tuple_header*)ptr;
-
- if(cnt2)
- {
- KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
- ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
- dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
- dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
- dst->m_var_len_offset= cnt2;
- dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
- }
-
- // Set all null bits
- memset(req_struct->m_disk_ptr->m_null_bits+
- regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
- 4*regTabPtr->m_offsets[DD].m_null_words);
- req_struct->m_tuple_ptr->m_header_bits =
- (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
-}
-
int Dbtup::handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord> fragPtr,
@@ -1215,8 +1150,8 @@
Tuple_header *tuple_ptr;
bool disk = regTabPtr->m_no_of_disk_attributes > 0;
- bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT;
- bool disk_insert = regOperPtr.p->is_first_operation() && disk;
+ bool mem_insert = regOperPtr.p->is_first_operation();
+ bool disk_insert = mem_insert && disk;
bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
bool rowid = req_struct->m_use_rowid;
Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
@@ -1244,21 +1179,16 @@
if(mem_insert)
{
jam();
- ndbassert(regOperPtr.p->is_first_operation()); // disk insert
prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
}
else
{
- if (!regOperPtr.p->is_first_operation())
- {
- Operationrec* prevOp= req_struct->prevOpPtr.p;
- ndbassert(prevOp->op_struct.op_type == ZDELETE);
- tup_version= prevOp->tupVersion + 1;
-
- if(!prevOp->is_first_operation())
- org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
- }
-
+ Operationrec* prevOp= req_struct->prevOpPtr.p;
+ ndbassert(prevOp->op_struct.op_type == ZDELETE);
+ tup_version= prevOp->tupVersion + 1;
+
+ if(!prevOp->is_first_operation())
+ org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
if (regTabPtr->need_expand())
expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
else
@@ -1268,11 +1198,6 @@
if (disk_insert)
{
int res;
- if (unlikely(!mem_insert))
- {
- sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
- fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
- }
if (ERROR_INSERTED(4015))
{
@@ -1381,6 +1306,7 @@
}
if (unlikely(ptr == 0))
{
+ jam();
goto alloc_rowid_error;
}
}
@@ -1396,7 +1322,7 @@
(varsize ? Tuple_header::CHAINED_ROW : 0);
regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
}
- else if(!rowid || !regOperPtr.p->is_first_operation())
+ else
{
int ret;
if (ERROR_INSERTED(4020))
@@ -1416,20 +1342,6 @@
}
req_struct->m_use_rowid = false;
base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
- }
- else
- {
- if ((req_struct->m_row_id.m_page_no == frag_page_id &&
- req_struct->m_row_id.m_page_idx == regOperPtr.p->m_tuple_location.m_page_idx))
- {
- ndbout_c("no mem insert but rowid (same)");
- base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
- }
- else
- {
- // no mem insert, but rowid
- ndbrequire(false);
- }
}
base->m_header_bits |= Tuple_header::ALLOC &
--- 1.27/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2006-03-02 16:44:09 +01:00
+++ 1.28/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2006-05-18 10:17:50 +02:00
@@ -89,7 +89,6 @@
addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ);
- addRecSignal(GSN_TUP_ALLOCREQ, &Dbtup::execTUP_ALLOCREQ);
addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ);
addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ);
--- 1.12/storage/ndb/test/ndbapi/testOperations.cpp 2006-01-11 09:26:04 +01:00
+++ 1.13/storage/ndb/test/ndbapi/testOperations.cpp 2006-05-18 10:17:50 +02:00
@@ -665,9 +665,9 @@
for(Uint32 i = 0; i < 12; i++)
{
- if(i == 6 || i == 8 || i == 10)
+ if(false && (i == 6 || i == 8 || i == 10))
continue;
-
+
BaseString name("bug_9749");
name.appfmt("_%d", i);
NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts,
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2370) BUG#19293 | jonas | 18 May |