Below is the list of changes that have just been committed into a local
5.1-ndb repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1762 05/03/04 07:45:47 joreland@stripped +10 -0
wl1866 - ndb diskdata -
insert, update, read and scan work ok.
still not working: multi operations, timeslice tup_commit, delete
ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
1.29 05/03/04 07:45:44 joreland@stripped +507 -0
Disk data routines
ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp
1.35 05/03/04 07:45:44 joreland@stripped +3 -3
Index doesn't set up disk part
ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
1.44 05/03/04 07:45:44 joreland@stripped +120 -27
Fix diskdata in TUPKEYREQ
ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp
1.6 05/03/04 07:45:44 joreland@stripped +13 -13
Unify callback names
Comment printouts
ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
1.29 05/03/04 07:45:43 joreland@stripped +31 -17
Fix commit of initial insert/update
(without changing size)
ndb/src/kernel/blocks/dbtup/Dbtup.hpp
1.39 05/03/04 07:45:43 joreland@stripped +85 -17
Make inlines on need_expand and need_shrink
Add read/update functions for disk data
ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
1.56 05/03/04 07:45:43 joreland@stripped +136 -46
Add methods for preloading pages on pk and scan access
ndb/src/kernel/blocks/dblqh/Dblqh.hpp
1.40 05/03/04 07:45:43 joreland@stripped +19 -1
Add methods for preloading pages on pk and scan access
ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
1.22 05/03/04 07:45:43 joreland@stripped +1 -1
Increase size of global page pool
ndb/include/kernel/signaldata/TupKey.hpp
1.4 05/03/04 07:45:43 joreland@stripped +2 -1
Add diskpage to TUPKEYREQ
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: joreland
# Host: eel.hemma.oreland.se
# Root: /home/jonas/src/mysql-5.1-ndb-dd
--- 1.5/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp Tue Mar 1 19:04:25 2005
+++ 1.6/ndb/src/kernel/blocks/dbtup/DbtupDiskAlloc.cpp Fri Mar 4 07:45:44 2005
@@ -177,7 +177,7 @@
ndbassert(req.p->m_free_space >= sz);
if(i != idx)
{
- ndbout_c("moving page request %d -> %d", i, idx);
+ //ndbout_c("moving page request %d -> %d", i, idx);
Page_request_list old_list(c_page_request_pool, list[i]);
Page_request_list new_list(c_page_request_pool, list[idx]);
@@ -207,7 +207,7 @@
old_list.remove(extentPtr);
new_list.add(extentPtr);
extentPtr.p->m_free_matrix_pos= new_pos;
- ndbout_c("moving extent from %d to %d", old_pos, new_pos);
+ //ndbout_c("moving extent from %d to %d", old_pos, new_pos);
}
}
}
@@ -251,7 +251,7 @@
update_free_space(tabPtrP,*(PagePtr*)&page, alloc.m_dirty_pages, i, sz);
key->m_page_no= ((Page*)page.p)->m_page_no;
key->m_file_no= ((Page*)page.p)->m_file_no;
- ndbout_c("found dirty page");
+ //ndbout_c("found dirty page");
return 0; // Page in memory
}
}
@@ -272,7 +272,7 @@
req.p->m_ref_count++;
update_free_space(tabPtrP, req, alloc.m_page_requests, i, sz);
* key = req.p->m_key;
- ndbout_c("found transit page");
+ //ndbout_c("found transit page");
return 0;
}
}
@@ -407,7 +407,7 @@
preq.m_page = *key;
preq.m_callback.m_callbackData= req.i;
preq.m_callback.m_callbackFunction =
- safe_cast(&Dbtup::disk_alloc_page_callback);
+ safe_cast(&Dbtup::disk_page_alloc_callback);
int flags= Page_cache_client::ALLOC_REQ;
if(pageBits == 0)
@@ -415,7 +415,7 @@
//XXX empty page -> fast to map
flags |= Page_cache_client::EMPTY_PAGE;
preq.m_callback.m_callbackFunction =
- safe_cast(&Dbtup::disk_alloc_page_callback_initial);
+ safe_cast(&Dbtup::disk_page_alloc_initial_callback);
}
int res= m_pgman.get_page(signal, preq, flags);
@@ -432,10 +432,10 @@
}
void
-Dbtup::disk_alloc_page_callback(Signal* signal,
+Dbtup::disk_page_alloc_callback(Signal* signal,
Uint32 page_request, Uint32 page_id)
{
- ndbout_c("disk_alloc_page_callback id: %d", page_id);
+ //ndbout_c("disk_alloc_page_callback id: %d", page_id);
Ptr<Page_request> req;
c_page_request_pool.getPtr(req, page_request);
@@ -447,14 +447,14 @@
tabPtr.i= req.p->m_table_id;
ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
- disk_alloc_page_callback_common(signal, req, tabPtr, gpage);
+ disk_page_alloc_callback_common(signal, req, tabPtr, gpage);
}
void
-Dbtup::disk_alloc_page_callback_initial(Signal*signal , Uint32 page_request,
+Dbtup::disk_page_alloc_initial_callback(Signal*signal , Uint32 page_request,
Uint32 page_id)
{
- ndbout_c("disk_alloc_page_callback_initial id: %d", page_id);
+ //ndbout_c("disk_alloc_page_callback_initial id: %d", page_id);
/**
* 1) lookup page request
* 2) lookup page
@@ -484,11 +484,11 @@
{
abort();
}
- disk_alloc_page_callback_common(signal, req, tabPtr, gpage);
+ disk_page_alloc_callback_common(signal, req, tabPtr, gpage);
}
void
-Dbtup::disk_alloc_page_callback_common(Signal* signal,
+Dbtup::disk_page_alloc_callback_common(Signal* signal,
Ptr<Page_request> req,
Ptr<Tablerec> tabPtr,
Ptr<GlobalPage> pagePtr)
--- 1.3/ndb/include/kernel/signaldata/TupKey.hpp Wed Sep 15 16:57:12 2004
+++ 1.4/ndb/include/kernel/signaldata/TupKey.hpp Fri Mar 4 07:45:43 2005
@@ -36,7 +36,7 @@
friend bool printTUPKEYREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16
receiverBlockNo);
public:
- STATIC_CONST( SignalLength = 15 );
+ STATIC_CONST( SignalLength = 16 );
private:
@@ -58,6 +58,7 @@
Uint32 coordinatorTC;
Uint32 tcOpIndex;
Uint32 savePointId;
+ Uint32 disk_page;
};
class TupKeyConf {
--- 1.21/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp Sun Jan 23 17:45:31 2005
+++ 1.22/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp Fri Mar 4 07:45:43 2005
@@ -92,7 +92,7 @@
addRecSignal(GSN_TESTSIG, &Cmvmi::execTESTSIG);
subscriberPool.setSize(5);
- m_global_page_pool.setSize(8);
+ m_global_page_pool.setSize(100);
const ndb_mgm_configuration_iterator * db = theConfig.getOwnConfigIterator();
for(unsigned j = 0; j<LogLevel::LOGLEVEL_CATEGORIES; j++){
--- 1.39/ndb/src/kernel/blocks/dblqh/Dblqh.hpp Tue Feb 15 14:11:31 2005
+++ 1.40/ndb/src/kernel/blocks/dblqh/Dblqh.hpp Fri Mar 4 07:45:43 2005
@@ -2543,9 +2543,27 @@
Dbacc* c_acc;
Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst);
- void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32);
+ void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32);
void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32);
+
+public:
void acckeyconf_load_diskpage_callback(Signal*, Uint32, Uint32);
+
+private:
+ void next_scanconf_load_diskpage(Signal* signal,
+ ScanRecordPtr scanPtr,
+ Ptr<TcConnectionrec> regTcPtr,
+ Fragrecord* fragPtrP);
+
+ void next_scanconf_tupkeyreq(Signal* signal, ScanRecordPtr,
+ TcConnectionrec * regTcPtr,
+ Fragrecord* fragPtrP,
+ Uint32 disk_page);
+
+public:
+ void next_scanconf_load_diskpage_callback(Signal* signal, Uint32, Uint32);
+
+private:
// ----------------------------------------------------------------
// These are variables handling the records. For most records one
--- 1.55/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp Tue Feb 15 14:11:31 2005
+++ 1.56/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp Fri Mar 4 07:45:43 2005
@@ -984,7 +984,8 @@
if (tabptr.p->tableStatus == Tablerec::NOT_DEFINED){
tabptr.p->tableStatus = Tablerec::ADD_TABLE_ONGOING;
tabptr.p->tableType = tableType;
- tabptr.p->primaryTableId = primaryTableId;
+ tabptr.p->primaryTableId =
+ (primaryTableId == RNIL ? tabptr.i : primaryTableId);
tabptr.p->schemaVersion = tschemaVersion;
tabptr.p->m_disk_table= 0;
}//if
@@ -3903,14 +3904,16 @@
ndbrequire(localKeyFlag == 1);
if(!regTcPtr->m_disk_table)
- acckeyconf_tupkeyreq(signal, regTcPtr, regFragptr.p, localKey1);
+ acckeyconf_tupkeyreq(signal, regTcPtr, regFragptr.p, localKey1, 0);
else
acckeyconf_load_diskpage(signal, tcConnectptr, regFragptr.p, localKey1);
}
void
Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
- Fragrecord* regFragptrP, Uint32 local_key)
+ Fragrecord* regFragptrP,
+ Uint32 local_key,
+ Uint32 disk_page)
{
regTcPtr->transactionState = TcConnectionrec::WAIT_TUP;
/* ------------------------------------------------------------------------
@@ -3972,6 +3975,7 @@
tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
tupKeyReq->tcOpIndex = tcConnectptr.p->tcOprec;
tupKeyReq->savePointId = tcConnectptr.p->savePointId;
+ tupKeyReq->disk_page= disk_page;
EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
}//Dblqh::execACCKEYCONF()
@@ -3980,17 +3984,14 @@
Dblqh::acckeyconf_load_diskpage(Signal* signal, TcConnectionrecPtr regTcPtr,
Fragrecord* regFragptrP, Uint32 local_key)
{
- Callback cb = { safe_cast(&Dblqh::acckeyconf_load_diskpage_callback),
- regTcPtr.i };
int res;
if((res= c_tup->load_diskpage(signal,
- &cb,
regTcPtr.p->tupConnectrec,
regFragptrP->tupFragptr,
local_key,
0)) > 0)
{
- acckeyconf_tupkeyreq(signal, regTcPtr.p, regFragptrP, local_key);
+ acckeyconf_tupkeyreq(signal, regTcPtr.p, regFragptrP, local_key, res);
}
else if(res == 0)
{
@@ -4009,25 +4010,27 @@
void
Dblqh::acckeyconf_load_diskpage_callback(Signal* signal,
Uint32 callbackData,
- Uint32 returnCode)
+ Uint32 disk_page)
{
jamEntry();
tcConnectptr.i = callbackData;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if(returnCode == 0)
+ if(disk_page > 0)
{
FragrecordPtr fragPtr;
c_fragment_pool.getPtr(fragPtr, regTcPtr->fragmentptr);
- acckeyconf_tupkeyreq(signal, regTcPtr, fragPtr.p, regTcPtr->m_local_key);
+ acckeyconf_tupkeyreq(signal, regTcPtr, fragPtr.p,
+ regTcPtr->m_local_key,
+ disk_page);
}
else
{
TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
ref->userRef= callbackData;
- ref->errorCode= returnCode;
+ ref->errorCode= disk_page;
execTUPKEYREF(signal);
}
}
@@ -8136,49 +8139,130 @@
closeScanLab(signal);
return;
}//if
- jam();
- Uint32 tupFragPtr;
- Uint32 reqinfo = (scanptr.p->scanLockHold == ZFALSE);
- reqinfo = reqinfo + (tcConnectptr.p->operation << 6);
- reqinfo = reqinfo + (tcConnectptr.p->opExec << 10);
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_TUPKEY;
- fragptr.i = tcConnectptr.p->fragmentptr;
- c_fragment_pool.getPtr(fragptr);
- if (! scanptr.p->rangeScan) {
- tupFragPtr = fragptr.p->tupFragptr;
- } else {
+
+ Fragrecord* fragPtrP= fragptr.p;
+ if (scanptr.p->rangeScan) {
jam();
// for ordered index use primary table
- FragrecordPtr tFragPtr;
- tFragPtr.i = fragptr.p->tableFragptr;
- c_fragment_pool.getPtr(tFragPtr);
- tupFragPtr = tFragPtr.p->tupFragptr;
+ fragPtrP= c_fragment_pool.getPtr(fragPtrP->tableFragptr);
}
+
+ if(tcConnectptr.p->m_disk_table)
{
- jam();
- TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
+ next_scanconf_load_diskpage(signal, scanptr, tcConnectptr,fragPtrP);
+ }
+ else
+ {
+ next_scanconf_tupkeyreq(signal, scanptr, tcConnectptr.p, fragPtrP, 0);
+ }
+}
- tupKeyReq->connectPtr = tcConnectptr.p->tupConnectrec;
- tupKeyReq->request = reqinfo;
- tupKeyReq->keyRef1 = scanptr.p->scanLocalref[0];
- tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1];
- tupKeyReq->attrBufLen = 0;
- tupKeyReq->opRef = scanptr.p->scanApiOpPtr;
- tupKeyReq->applRef = scanptr.p->scanApiBlockref;
- tupKeyReq->storedProcedure = scanptr.p->scanStoredProcId;
- tupKeyReq->transId1 = tcConnectptr.p->transid[0];
- tupKeyReq->transId2 = tcConnectptr.p->transid[1];
- tupKeyReq->fragPtr = tupFragPtr;
- tupKeyReq->primaryReplica = (tcConnectptr.p->seqNoReplica == 0)?true:false;
- tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
- tupKeyReq->tcOpIndex = tcConnectptr.p->tcOprec;
- tupKeyReq->savePointId = tcConnectptr.p->savePointId;
- Uint32 blockNo = refToBlock(tcConnectptr.p->tcTupBlockref);
- EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal,
- TupKeyReq::SignalLength);
+void
+Dblqh::next_scanconf_load_diskpage(Signal* signal,
+ ScanRecordPtr scanPtr,
+ Ptr<TcConnectionrec> regTcPtr,
+ Fragrecord* fragPtrP)
+{
+ jam();
+
+ int res;
+ Uint32 local_key;
+ local_key = scanptr.p->scanLocalref[0] << MAX_TUPLES_BITS;
+ local_key += scanptr.p->scanLocalref[1];
+
+ if((res= c_tup->load_diskpage_scan(signal,
+ regTcPtr.p->tupConnectrec,
+ fragPtrP->tupFragptr,
+ local_key,
+ 0)) > 0)
+ {
+ next_scanconf_tupkeyreq(signal, scanptr, regTcPtr.p, fragPtrP, res);
+ }
+ else if(res == 0)
+ {
+ regTcPtr.p->transactionState = TcConnectionrec::WAIT_TUP;
+ }
+ else
+ {
+ TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
+ ref->userRef= regTcPtr.i;
+ ref->errorCode= ~0;
+ execTUPKEYREF(signal);
+ }
+}
+
+void
+Dblqh::next_scanconf_load_diskpage_callback(Signal* signal,
+ Uint32 callbackData,
+ Uint32 disk_page)
+{
+ jamEntry();
+
+ Ptr<TcConnectionrec> regTcPtr;
+ regTcPtr.i= callbackData;
+ ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
+
+ ScanRecordPtr scanPtr;
+ c_scanRecordPool.getPtr(scanPtr, regTcPtr.p->tcScanRec);
+
+ if(disk_page > 0)
+ {
+ FragrecordPtr fragPtr;
+ c_fragment_pool.getPtr(fragPtr, regTcPtr.p->fragmentptr);
+
+ if (scanPtr.p->rangeScan) {
+ jam();
+ // for ordered index use primary table
+ fragPtr.p = c_fragment_pool.getPtr(fragPtr.p->tableFragptr);
+ }
+
+ next_scanconf_tupkeyreq(signal, scanPtr, regTcPtr.p, fragPtr.p, disk_page);
+ }
+ else
+ {
+ TupKeyRef * ref = (TupKeyRef *)signal->getDataPtr();
+ ref->userRef= callbackData;
+ ref->errorCode= disk_page;
+ execTUPKEYREF(signal);
}
}
+void
+Dblqh::next_scanconf_tupkeyreq(Signal* signal,
+ Ptr<ScanRecord> scanPtr,
+ TcConnectionrec * regTcPtr,
+ Fragrecord* fragPtrP,
+ Uint32 disk_page)
+{
+ jam();
+ Uint32 reqinfo = (scanPtr.p->scanLockHold == ZFALSE);
+ reqinfo = reqinfo + (regTcPtr->operation << 6);
+ reqinfo = reqinfo + (regTcPtr->opExec << 10);
+ regTcPtr->transactionState = TcConnectionrec::SCAN_TUPKEY;
+
+ TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
+
+ tupKeyReq->connectPtr = regTcPtr->tupConnectrec;
+ tupKeyReq->request = reqinfo;
+ tupKeyReq->keyRef1 = scanPtr.p->scanLocalref[0];
+ tupKeyReq->keyRef2 = scanPtr.p->scanLocalref[1];
+ tupKeyReq->attrBufLen = 0;
+ tupKeyReq->opRef = scanPtr.p->scanApiOpPtr;
+ tupKeyReq->applRef = scanPtr.p->scanApiBlockref;
+ tupKeyReq->storedProcedure = scanPtr.p->scanStoredProcId;
+ tupKeyReq->transId1 = regTcPtr->transid[0];
+ tupKeyReq->transId2 = regTcPtr->transid[1];
+ tupKeyReq->fragPtr = fragPtrP->tupFragptr;
+ tupKeyReq->primaryReplica = (regTcPtr->seqNoReplica == 0)?true:false;
+ tupKeyReq->coordinatorTC = regTcPtr->tcBlockref;
+ tupKeyReq->tcOpIndex = regTcPtr->tcOprec;
+ tupKeyReq->savePointId = regTcPtr->savePointId;
+ tupKeyReq->disk_page= disk_page;
+ Uint32 blockNo = refToBlock(regTcPtr->tcTupBlockref);
+ EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal,
+ TupKeyReq::SignalLength);
+}
+
/* -------------------------------------------------------------------------
* RECEPTION OF FURTHER KEY INFORMATION WHEN KEY SIZE > 16 BYTES.
* -------------------------------------------------------------------------
@@ -8679,7 +8763,13 @@
tcConnectptr.p->commitAckMarker = RNIL;
tcConnectptr.p->m_offset_current_keybuf = 0;
tcConnectptr.p->m_scan_curr_range_no = 0;
+ tcConnectptr.p->m_disk_table = tabptr.p->m_disk_table;
+ TablerecPtr tTablePtr;
+ tTablePtr.i = tabptr.p->primaryTableId;
+ ptrCheckGuard(tTablePtr, ctabrecFileSize, tablerec);
+ tcConnectptr.p->m_disk_table = tTablePtr.p->m_disk_table;
+
tabptr.p->usageCount++;
}//Dblqh::initScanTc()
--- 1.38/ndb/src/kernel/blocks/dbtup/Dbtup.hpp Tue Mar 1 19:04:25 2005
+++ 1.39/ndb/src/kernel/blocks/dbtup/Dbtup.hpp Fri Mar 4 07:45:43 2005
@@ -719,7 +719,22 @@
Uint16 m_no_of_disk_attributes;
Uint16 noOfKeyAttr;
Uint16 noOfCharsets;
+
+ bool need_expand() const {
+ return m_no_of_attributes > m_attributes[MM].m_no_of_fixsize;
+ }
+
+ bool need_expand(bool disk) const {
+ return m_attributes[MM].m_no_of_varsize > 0 ||
+ (disk && m_no_of_disk_attributes > 0);
+ }
+ bool need_shrink() const {
+ return
+ m_attributes[MM].m_no_of_varsize > 0 ||
+ m_attributes[DD].m_no_of_varsize > 0;
+ }
+
/**
* Descriptors for MM and DD part
*/
@@ -937,9 +952,8 @@
*/
STATIC_CONST ( TUP_VERSION_MASK = 0xFFFF );
STATIC_CONST ( CHAINED_ROW = 0x10000 ); // Is var part on different page
- STATIC_CONST ( DISK_LOADED = 0x20000 ); // Is disk loaded
- STATIC_CONST ( DISK_INLINE = 0x40000 ); // Is disk directly after mm
- STATIC_CONST ( DISK_MOVED = 0x80000 ); // Has diskpart moved
+ STATIC_CONST ( DISK_ALLOC = 0x20000 ); // Is disk part allocated
+ STATIC_CONST ( DISK_INLINE = 0x40000 ); // Is disk inline
Uint32 get_tuple_version() const {
return m_header_bits & TUP_VERSION_MASK;
@@ -953,6 +967,10 @@
Uint32* get_null_bits(const Tablerec* tabPtrP) {
return m_null_bits+tabPtrP->m_offsets[MM].m_null_offset;
}
+
+ Uint32* get_null_bits(const Tablerec* tabPtrP, Uint32 mm) {
+ return m_null_bits+tabPtrP->m_offsets[mm].m_null_offset;
+ }
Uint32* get_var_part_ptr(const Tablerec* tabPtrP) {
return m_data + tabPtrP->m_offsets[MM].m_varpart_offset;
@@ -1114,8 +1132,11 @@
*/
bool tuxQueryTh(Uint32 fragPtrI, Uint32 tupAddr, Uint32 tupVersion, Uint32 transId1,
Uint32 transId2, Uint32 savePointId);
- int load_diskpage(Signal*, Callback*, Uint32 opRec, Uint32 fragPtrI,
+ int load_diskpage(Signal*, Uint32 opRec, Uint32 fragPtrI,
Uint32 local_key, Uint32 flags);
+
+ int load_diskpage_scan(Signal*, Uint32 opRec, Uint32 fragPtrI,
+ Uint32 local_key, Uint32 flags);
private:
BLOCK_DEFINES(Dbtup);
@@ -1335,10 +1356,9 @@
//------------------------------------------------------------------
//------------------------------------------------------------------
void execTUPKEYREQ(Signal* signal);
- void execTUPKEYREQ_page_callback(Signal*, Uint32 data, Uint32 errCode);
- void execTUPKEYREQ_page_retreived(Signal* signal, KeyReqStruct*,
- Operationrec*, Fragrecord*, Tablerec*);
-
+ void disk_page_load_callback(Signal*, Uint32 op, Uint32 page);
+ void disk_page_load_scan_callback(Signal*, Uint32 op, Uint32 page);
+
//------------------------------------------------------------------
//------------------------------------------------------------------
void execATTRINFO(Signal* signal);
@@ -1622,19 +1642,41 @@
bool readBitsNotNULL(Uint32* outBuffer, KeyReqStruct *req_struct, AttributeHeader*,
Uint32);
bool updateBitsNotNULL(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
- bool readFixedDiskNULLable(Uint32* outBuffer, KeyReqStruct *req_struct,
AttributeHeader*, Uint32);
bool updateFixedNULLable(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
- bool readFixedDiskNotNULL(Uint32* outBuffer, KeyReqStruct *req_struct,
AttributeHeader*, Uint32);
bool updateFixedNotNull(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
- bool readVarDiskNULLable(Uint32* outBuffer, KeyReqStruct *req_struct, AttributeHeader*,
Uint32);
bool updateVarNULLable(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
- bool readVarDiskNotNULL(Uint32* outBuffer, KeyReqStruct *req_struct, AttributeHeader*,
Uint32);
bool updateVarNotNull(Uint32* inBuffer, KeyReqStruct *req_struct, Uint32);
+
+ bool readDiskFixedSizeNotNULL(Uint32* outBuffer,
+ KeyReqStruct *req_struct,
+ AttributeHeader* ahOut,
+ Uint32 attrDes2);
+
+ bool readDiskFixedSizeNULLable(Uint32* outBuffer,
+ KeyReqStruct *req_struct,
+ AttributeHeader* ahOut,
+ Uint32 attrDes2);
+ bool readDiskVarSizeNULLable(Uint32* outBuffer, KeyReqStruct *req_struct,
AttributeHeader*, Uint32);
+ bool readDiskVarSizeNotNULL(Uint32* outBuffer, KeyReqStruct *req_struct,
AttributeHeader*, Uint32);
+
+ bool updateDiskFixedSizeNULLable(Uint32*, KeyReqStruct*, Uint32);
+ bool updateDiskFixedSizeNotNULL(Uint32*, KeyReqStruct*, Uint32);
+
+ bool updateDiskVarSizeNULLable(Uint32*, KeyReqStruct *, Uint32);
+ bool updateDiskVarSizeNotNULL(Uint32*, KeyReqStruct *, Uint32);
+
+ bool readDiskBitsNULLable(Uint32*, KeyReqStruct*, AttributeHeader*, Uint32);
+ bool readDiskBitsNotNULL(Uint32*, KeyReqStruct*, AttributeHeader*, Uint32);
+ bool updateDiskBitsNULLable(Uint32*, KeyReqStruct*, Uint32);
+ bool updateDiskBitsNotNULL(Uint32*, KeyReqStruct*, Uint32);
+
+
//------------------------------------------------------------------
//------------------------------------------------------------------
bool nullFlagCheck(KeyReqStruct *req_struct, Uint32 attrDes2);
+ bool disk_nullFlagCheck(KeyReqStruct *req_struct, Uint32 attrDes2);
Uint32 read_pseudo(Uint32 attrId,
KeyReqStruct *req_struct,
Uint32* outBuffer);
@@ -1912,7 +1954,8 @@
bool setup_read(KeyReqStruct* req_struct,
Operationrec* const regOperPtr,
Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr);
+ Tablerec* const regTabPtr,
+ bool disk);
bool getPageLastCommitted(Operationrec* const regOperPtr,
Operationrec* const leaderOpPtr);
@@ -2245,6 +2288,7 @@
Uint32* get_ptr(Var_part_ref);
Uint32* get_ptr(Ptr<Var_page>*, Var_part_ref);
Uint32* get_ptr(PagePtr*, const Local_key*, const Tablerec*);
+ Uint32* get_ptr(PagePtr*, const Local_key*, const Tablerec*, Uint32 mm);
/**
* prealloc space from disk
@@ -2261,9 +2305,9 @@
void update_free_space(const Tablerec*, PagePtr,
DLList<Page>::Head list[], Uint32 i, Uint32 sz);
- void disk_alloc_page_callback(Signal*, Uint32 page_request, Uint32 page_id);
- void disk_alloc_page_callback_initial(Signal*, Uint32, Uint32);
- void disk_alloc_page_callback_common(Signal*,
+ void disk_page_alloc_callback(Signal*, Uint32 page_request, Uint32 page_id);
+ void disk_page_alloc_initial_callback(Signal*, Uint32, Uint32);
+ void disk_page_alloc_callback_common(Signal*,
Ptr<Page_request>,
Ptr<Tablerec>,
Ptr<GlobalPage>);
@@ -2286,7 +2330,7 @@
* Setup all pointer on keyreqstruct to prepare for read
* req_struct->m_tuple_ptr is set to tuple to read
*/
- void prepare_read(KeyReqStruct* req_struct, Tablerec* const regTabPtr);
+ void prepare_read(KeyReqStruct*, Tablerec* const, bool disk);
};
inline
@@ -2405,6 +2449,30 @@
else
return ((Fix_page*)tmp.p)->
get_ptr(key->m_page_idx, regTabPtr->m_offsets[MM].m_fix_header_size);
+}
+
+inline
+Uint32*
+Dbtup::get_ptr(PagePtr* pagePtr,
+ const Local_key* key, const Tablerec* regTabPtr, Uint32 mm)
+{
+ PagePtr tmp;
+ tmp.i= key->m_page_no;
+ if(mm == MM)
+ {
+ ptrCheckGuard(tmp, cnoOfPage, page);
+ }
+ else
+ {
+ tmp.p= (Page*)m_global_page_pool.getPtr(tmp.i);
+ }
+ memcpy(pagePtr, &tmp, sizeof(tmp));
+
+ if(regTabPtr->m_attributes[mm].m_no_of_varsize)
+ return ((Var_page*)tmp.p)->get_ptr(key->m_page_idx);
+ else
+ return ((Fix_page*)tmp.p)->
+ get_ptr(key->m_page_idx, regTabPtr->m_offsets[mm].m_fix_header_size);
}
NdbOut&
--- 1.28/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp Tue Mar 1 19:04:25 2005
+++ 1.29/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp Fri Mar 4 07:45:43 2005
@@ -205,33 +205,47 @@
res *= 1;
}
+ Uint32 copy_bits= copy->m_header_bits;
Ptr<GlobalPage> gpage;
m_global_page_pool.getPtr(gpage, (Uint32)res);
Uint32 *dst;
- if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+ Page* page= (Page*)m_pgman.m_ptr_p;
+ if(copy_bits & Tuple_header::DISK_ALLOC)
{
- Fix_page* page= (Fix_page*)m_pgman.m_ptr_p;
- Uint32 idx= page->alloc_record();
- page->uncommitted_used_space--;
- dst= page->get_ptr(idx, regTabPtr->m_offsets[DD].m_fix_header_size);
-
- disk_ptr->m_operation_ptr_i= RNIL;
- memcpy(dst, disk_ptr, 4*regTabPtr->m_offsets[DD].m_fix_header_size);
- req.m_page.m_page_idx= idx;
- memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr),
- &req.m_page, sizeof(Local_key));
-
- if (regTabPtr->checksumIndicator) {
- jam();
- setChecksum(tuple_ptr, regTabPtr);
+ if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+ {
+ req.m_page.m_page_idx= ((Fix_page*)page)->alloc_record();
+ page->uncommitted_used_space--;
+ }
+ else
+ {
+ page->uncommitted_used_space -= req.m_page.m_page_idx;
+ req.m_page.m_page_idx= ((Var_page*)page)->
+ alloc_record(req.m_page.m_page_idx, (Var_page*)ctemp_page);
}
}
+
+ if(regTabPtr->m_attributes[DD].m_no_of_varsize == 0)
+ {
+ dst= ((Fix_page*)page)->
+ get_ptr(req.m_page.m_page_idx,
+ regTabPtr->m_offsets[DD].m_fix_header_size);
+ }
else
{
- abort();
+ dst= ((Var_page*)page)->get_ptr(req.m_page.m_page_idx);
}
+ disk_ptr->m_operation_ptr_i= RNIL;
+ memcpy(dst, disk_ptr, 4*regTabPtr->m_offsets[DD].m_fix_header_size);
+ memcpy(tuple_ptr->get_disk_ref_ptr(regTabPtr),
+ &req.m_page, sizeof(Local_key));
+
+ if (regTabPtr->checksumIndicator) {
+ jam();
+ setChecksum(tuple_ptr, regTabPtr);
+ }
}
tuple_ptr->m_operation_ptr_i= save;
@@ -481,7 +495,7 @@
Local_key tmp= befOpPtr.p->m_tuple_location;
Uint32 tmpType= befOpPtr.p->op_struct.op_type;
setup_fixed_part(req_struct, befOpPtr.p, regTabPtr);
- prepare_read(req_struct, regTabPtr);
+ prepare_read(req_struct, regTabPtr, false);
befOpPtr.p->op_struct.op_type= ZDELETE;
req_struct->changeMask.clear();
--- 1.43/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp Tue Mar 1 19:04:25 2005
+++ 1.44/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp Fri Mar 4 07:45:44 2005
@@ -331,14 +331,15 @@
Dbtup::setup_read(KeyReqStruct *req_struct,
Operationrec* const regOperPtr,
Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr)
+ Tablerec* const regTabPtr,
+ bool disk)
{
OperationrecPtr currOpPtr;
currOpPtr.i= req_struct->m_tuple_ptr->m_operation_ptr_i;
if (currOpPtr.i == RNIL)
{
- if (regTabPtr->var_sized_record || regTabPtr->m_no_of_disk_attributes)
- prepare_read(req_struct, regTabPtr);
+ if (regTabPtr->need_expand(disk))
+ prepare_read(req_struct, regTabPtr, disk);
return true;
}
@@ -398,8 +399,8 @@
c_undo_buffer.get_ptr(&currOpPtr.p->m_copy_tuple_location);
}
- if (regTabPtr->var_sized_record || regTabPtr->m_no_of_disk_attributes)
- prepare_read(req_struct, regTabPtr);
+ if (regTabPtr->need_expand(disk))
+ prepare_read(req_struct, regTabPtr, disk);
#if 0
ndbout_c("reading copy");
@@ -582,7 +583,7 @@
int
Dbtup::load_diskpage(Signal* signal,
- Callback* cb, Uint32 opRec, Uint32 fragPtrI,
+ Uint32 opRec, Uint32 fragPtrI,
Uint32 local_key, Uint32 flags)
{
c_operation_pool.getPtr(operPtr, opRec);
@@ -614,20 +615,84 @@
Tuple_header* ptr= (Tuple_header*)tmp;
int res;
- if(ptr->m_operation_ptr_i == RNIL)
+ Uint32 opPtr= ptr->m_operation_ptr_i;
+ if(opPtr == RNIL)
{
- // We're first
- ptr->m_operation_ptr_i= opRec;
+ // Make sure we maintain order
+ flags |= Page_cache_client::STRICT_ORDER;
+ }
+
+ Page_cache_client::Request req;
+ memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
+ req.m_callback.m_callbackData= opRec;
+ req.m_callback.m_callbackFunction=
+ safe_cast(&Dbtup::disk_page_load_callback);
+
+ if((res= m_pgman.get_page(signal, req, flags)) > 0)
+ {
+ ndbout_c("in cache");
+ // In cache
}
+ else if(res == 0)
+ {
+ ndbout_c("waiting for callback");
+ // set state
+ }
else
{
+ // Error
+ }
+ return res;
+}
+
+void
+Dbtup::disk_page_load_callback(Signal* signal, Uint32 opRec, Uint32 page_id)
+{
+ c_operation_pool.getPtr(operPtr, opRec);
+ c_lqh->acckeyconf_load_diskpage_callback(signal,
+ operPtr.p->userpointer, page_id);
+}
+
+int
+Dbtup::load_diskpage_scan(Signal* signal,
+ Uint32 opRec, Uint32 fragPtrI,
+ Uint32 local_key, Uint32 flags)
+{
+ c_operation_pool.getPtr(operPtr, opRec);
+ fragptr.i= fragPtrI;
+ ptrCheckGuard(fragptr, cnoOfFragrec, fragrecord);
+
+ Operationrec * regOperPtr= operPtr.p;
+ Fragrecord * regFragPtr= fragptr.p;
+
+ tabptr.i = regFragPtr->fragTableId;
+ ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
+ Tablerec* regTabPtr = tabptr.p;
+
+ jam();
+ Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
+ Uint32 frag_page_id= local_key >> MAX_TUPLES_BITS;
+ regOperPtr->m_tuple_location.m_page_no= getRealpid(regFragPtr,
+ frag_page_id);
+ regOperPtr->m_tuple_location.m_page_idx= page_idx;
+
+ PagePtr page_ptr;
+ Uint32* tmp= get_ptr(&page_ptr, &regOperPtr->m_tuple_location, regTabPtr);
+ Tuple_header* ptr= (Tuple_header*)tmp;
+
+ int res;
+ Uint32 opPtr= ptr->m_operation_ptr_i;
+ if(opPtr == RNIL)
+ {
// Make sure we maintain order
flags |= Page_cache_client::STRICT_ORDER;
}
Page_cache_client::Request req;
memcpy(&req.m_page, ptr->get_disk_ref_ptr(regTabPtr), sizeof(Local_key));
- memcpy(&req.m_callback, cb, sizeof(* cb));
+ req.m_callback.m_callbackData= opRec;
+ req.m_callback.m_callbackFunction=
+ safe_cast(&Dbtup::disk_page_load_scan_callback);
if((res= m_pgman.get_page(signal, req, flags)) > 0)
{
@@ -646,6 +711,15 @@
return res;
}
+void
+Dbtup::disk_page_load_scan_callback(Signal* signal,
+ Uint32 opRec, Uint32 page_id)
+{
+ c_operation_pool.getPtr(operPtr, opRec);
+ c_lqh->next_scanconf_load_diskpage_callback(signal,
+ operPtr.p->userpointer, page_id);
+}
+
void Dbtup::execTUPKEYREQ(Signal* signal)
{
TupKeyReq * const tupKeyReq= (TupKeyReq *)signal->getDataPtr();
@@ -705,7 +779,7 @@
sig1= tupKeyReq->savePointId;
sig2= tupKeyReq->primaryReplica;
sig3= tupKeyReq->keyRef2;
-
+
regOperPtr->savepointId= sig1;
regOperPtr->op_struct.primary_replica= sig2;
regOperPtr->m_tuple_location.m_page_idx= sig3;
@@ -719,17 +793,20 @@
req_struct.TC_index= sig2;
req_struct.TC_ref= sig3;
req_struct.frag_page_id= sig4;
-
+
sig1= tupKeyReq->attrBufLen;
sig2= tupKeyReq->applRef;
sig3= tupKeyReq->transId1;
sig4= tupKeyReq->transId2;
+ Uint32 disk_page= tupKeyReq->disk_page;
+
req_struct.log_size= sig1;
req_struct.attrinfo_len= sig1;
req_struct.rec_blockref= sig2;
req_struct.trans_id1= sig3;
req_struct.trans_id2= sig4;
+ req_struct.m_disk_page_ptr.i= disk_page;
Uint32 Roptype = regOperPtr->op_struct.op_type;
if (Rstoredid != ZNIL) {
@@ -760,7 +837,7 @@
if (Roptype == ZREAD) {
jam();
- if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr))
+ if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr, true))
{
if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1)
{
@@ -1020,7 +1097,7 @@
tup_version= prevOp->tupVersion;
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
}
-
+
/**
* Check consistency before update/delete
*/
@@ -1041,7 +1118,7 @@
Uint32 sizes[4];
Uint64 cmp[2];
};
- if (regTabPtr->var_sized_record || regTabPtr->m_no_of_disk_attributes)
+ if (regTabPtr->need_expand())
expand_tuple(req_struct, sizes, org, regTabPtr);
else
memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
@@ -1065,7 +1142,7 @@
return -1;
}
- if (regTabPtr->var_sized_record)
+ if (regTabPtr->need_shrink())
{
shrink_tuple(req_struct, sizes+2, regTabPtr);
if (cmp[0] != cmp[1] && !handle_size_change_after_update(req_struct,
@@ -1126,7 +1203,12 @@
pos += AttributeDescriptor::getSizeInBytes(* (Uint32*)tab_descr);
tab_descr += 2;
}
+ }
+ else
+ {
+ ptr -= Tuple_header::HeaderSize;
}
+
req_struct->m_disk_ptr= (Tuple_header*)ptr;
if(cnt2)
@@ -1146,7 +1228,7 @@
memset(req_struct->m_disk_ptr->m_null_bits+
regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
4*regTabPtr->m_offsets[DD].m_null_words);
- req_struct->m_tuple_ptr->m_header_bits= 0;
+ req_struct->m_tuple_ptr->m_header_bits= Tuple_header::DISK_ALLOC;
}
int Dbtup::handleInsertReq(Signal* signal,
@@ -1188,7 +1270,7 @@
if(!prevOp->is_first_operation())
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
- if (regTabPtr->var_sized_record || regTabPtr->m_no_of_disk_attributes)
+ if (regTabPtr->need_expand())
expand_tuple(req_struct, sizes, org, regTabPtr);
else
memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
@@ -1205,7 +1287,7 @@
return -1;
}
- if (regTabPtr->var_sized_record)
+ if (regTabPtr->need_shrink())
{
shrink_tuple(req_struct, sizes+2, regTabPtr);
}
@@ -2263,6 +2345,7 @@
Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
Uint32 *dst_ptr= ptr->get_var_part_ptr(tabPtrP);
+ const Uint32 *disk_ref= src->get_disk_ref_ptr(tabPtrP);
const Uint32 *src_ptr= src->get_var_part_ptr(tabPtrP);
const Uint32 *desc= (Uint32*)req_struct->attr_descr;
desc += (tabPtrP->m_attributes[MM].m_no_of_fixsize << 1);
@@ -2291,8 +2374,12 @@
ndbassert(dst_ptr == ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset));
src_ptr = ALIGN_WORD(((char*)src_ptr)+step);
sizes[MM]= 4*(Tuple_header::HeaderSize + tabPtrP->m_offsets[MM].m_varpart_offset)
+ ((Uint16*)src_data)[mm_vars] + (mm_vars * 2) + 2;
+ }
+ else
+ {
+ dst_ptr -= Tuple_header::HeaderSize;
}
-
+
sizes[DD]= 0;
if(dd_tot)
{
@@ -2306,10 +2393,12 @@
}
else
{
- // m_disk_tuple_location is already set
- // XXX
- //src_ptr= get_disk_ptr(req_struct, &req_struct->m_disk_tuple_location);
+ Local_key key;
+ memcpy(&key, disk_ref, sizeof(key));
+ key.m_page_no= req_struct->m_disk_page_ptr.i;
+ src_ptr= get_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP, DD);
}
+
// Fix diskpart
req_struct->m_disk_ptr= (Tuple_header*)dst_ptr;
memcpy(dst_ptr, src_ptr, 4*tabPtrP->m_offsets[DD].m_fix_header_size);
@@ -2332,7 +2421,8 @@
}
void
-Dbtup::prepare_read(KeyReqStruct* req_struct, Tablerec* const tabPtrP)
+Dbtup::prepare_read(KeyReqStruct* req_struct,
+ Tablerec* const tabPtrP, bool disk)
{
Tuple_header* ptr= req_struct->m_tuple_ptr;
@@ -2341,6 +2431,7 @@
Uint16 mm_vars= tabPtrP->m_attributes[MM].m_no_of_varsize;
const Uint32 *src_ptr= ptr->get_var_part_ptr(tabPtrP);
+ const Uint32 *disk_ref= ptr->get_disk_ref_ptr(tabPtrP);
if(mm_vars)
{
@@ -2358,7 +2449,7 @@
src_ptr = ALIGN_WORD(dst->m_data_ptr + dst->m_max_var_offset);
}
- if(dd_tot)
+ if(disk && dd_tot)
{
const Uint16 dd_vars= tabPtrP->m_attributes[DD].m_no_of_varsize;
@@ -2369,9 +2460,11 @@
}
else
{
- // m_disk_tuple_location is already set
// XXX
- //src_ptr= get_disk_ptr(req_struct, &req_struct->m_disk_tuple_location);
+ Local_key key;
+ memcpy(&key, disk_ref, sizeof(key));
+ key.m_page_no= req_struct->m_disk_page_ptr.i;
+ src_ptr= get_ptr(&req_struct->m_disk_page_ptr, &key, tabPtrP, DD);
}
// Fix diskpart
req_struct->m_disk_ptr= (Tuple_header*)src_ptr;
--- 1.34/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp Wed Feb 16 13:31:07 2005
+++ 1.35/ndb/src/kernel/blocks/dbtup/DbtupIndex.cpp Fri Mar 4 07:45:44 2005
@@ -182,7 +182,7 @@
fragptr= fragPtr;
operPtr.i= RNIL;
operPtr.p= NULL;
- prepare_read(&req_struct, tablePtr.p);
+ prepare_read(&req_struct, tablePtr.p, false);
// do it
int ret = readAttributes(&req_struct,
@@ -227,7 +227,7 @@
req_struct.m_tuple_ptr= (Tuple_header*)
c_undo_buffer.get_ptr(&opPtrP->m_copy_tuple_location);
}
- prepare_read(&req_struct, tablePtr.p);
+ prepare_read(&req_struct, tablePtr.p, false);
const Uint32* attrIds= &tableDescriptor[tablePtr.p->readKeyArray].tabDescr;
const Uint32 numAttrs= tablePtr.p->noOfKeyAttr;
@@ -328,7 +328,7 @@
req_struct.dirty_op= 1;
setup_fixed_part(&req_struct, &tempOp, tablePtr.p);
- if (setup_read(&req_struct, &tempOp, fragPtr.p, tablePtr.p)) {
+ if (setup_read(&req_struct, &tempOp, fragPtr.p, tablePtr.p, false)) {
/*
* We use the normal getPage which will return the tuple to be used
* for this transaction and savepoint id. If its tuple version
--- 1.28/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp Thu Feb 24 01:40:28 2005
+++ 1.29/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp Fri Mar 4 07:45:44 2005
@@ -119,6 +119,35 @@
} else {
ndbrequire(false);
}
+ if(AttributeDescriptor::getDiskBased(attrDescr))
+ {
+ ReadFunction r[] = {
+ &Dbtup::readDiskBitsNotNULL,
+ &Dbtup::readDiskBitsNULLable,
+ &Dbtup::readDiskFixedSizeNotNULL,
+ &Dbtup::readDiskFixedSizeNULLable,
+ &Dbtup::readDiskVarSizeNULLable,
+ &Dbtup::readDiskVarSizeNotNULL
+ };
+ UpdateFunction u[] = {
+ &Dbtup::updateDiskBitsNotNULL,
+ &Dbtup::updateDiskBitsNULLable,
+ &Dbtup::updateDiskFixedSizeNotNULL,
+ &Dbtup::updateDiskFixedSizeNULLable,
+ &Dbtup::updateDiskVarSizeNULLable,
+ &Dbtup::updateDiskVarSizeNotNULL
+ };
+ Uint32 a=
+ AttributeDescriptor::getArrayType(attrDescr)==ZFIXED_ARRAY ? 2 : 4;
+
+ if(AttributeDescriptor::getArraySize(attrDescr) == 0)
+ a= 0;
+
+ Uint32 b=
+ AttributeDescriptor::getNullable(attrDescr)? 1 : 0;
+ regTabPtr->readFunctionArray[i]= r[a+b];
+ regTabPtr->updateFunctionArray[i]= u[a+b];
+ }
} else {
if (AttributeDescriptor::getArrayType(attrDescr) == ZFIXED_ARRAY) {
ljam();
@@ -423,6 +452,16 @@
}
bool
+Dbtup::disk_nullFlagCheck(KeyReqStruct *req_struct, Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr= tabptr.p;
+ Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr);
+ Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+
+ return BitmaskImpl::get(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+}
+
+bool
Dbtup::readVarSizeNotNULL(Uint32* out_buffer,
KeyReqStruct *req_struct,
AttributeHeader* ah_out,
@@ -502,6 +541,153 @@
return false;
}//Dbtup::readDynBigVarSize()
+bool
+Dbtup::readDiskFixedSizeNotNULL(Uint32* outBuffer,
+ KeyReqStruct *req_struct,
+ AttributeHeader* ahOut,
+ Uint32 attrDes2)
+{
+ Uint32 attrDescriptor= req_struct->attr_descriptor;
+ Uint32 *tuple_header= req_struct->m_disk_ptr->m_data;
+ Uint32 indexBuf= req_struct->out_buf_index;
+ Uint32 readOffset= AttributeOffset::getOffset(attrDes2);
+ Uint32 attrNoOfWords= AttributeDescriptor::getSizeInWords(attrDescriptor);
+ Uint32 maxRead= req_struct->max_read;
+ Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
+
+ ndbrequire((readOffset + attrNoOfWords - 1) < req_struct->check_offset);
+ if (! charsetFlag || ! req_struct->xfrm_flag) {
+ Uint32 newIndexBuf = indexBuf + attrNoOfWords;
+ if (newIndexBuf <= maxRead) {
+ ljam();
+ ahOut->setDataSize(attrNoOfWords);
+ MEMCOPY_NO_WORDS(&outBuffer[indexBuf],
+ &tuple_header[readOffset],
+ attrNoOfWords);
+ req_struct->out_buf_index = newIndexBuf;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ }//if
+ } else {
+ ljam();
+ Tablerec* regTabPtr = tabptr.p;
+ Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
+ const uchar* srcPtr = (uchar*)&tuple_header[readOffset];
+ Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+ ndbrequire(i < regTabPtr->noOfCharsets);
+ CHARSET_INFO* cs = regTabPtr->charsetArray[i];
+ Uint32 typeId = AttributeDescriptor::getType(attrDescriptor);
+ Uint32 lb, len;
+ bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
+ Uint32 xmul = cs->strxfrm_multiply;
+ if (xmul == 0)
+ xmul = 1;
+ Uint32 dstLen = xmul * (srcBytes - lb);
+ Uint32 maxIndexBuf = indexBuf + (dstLen >> 2);
+ if (maxIndexBuf <= maxRead && ok) {
+ ljam();
+ uchar* dstPtr = (uchar*)&outBuffer[indexBuf];
+ const char* ssrcPtr = (const char*)srcPtr;
+ int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
+ ndbrequire(n != -1);
+ while ((n & 3) != 0) {
+ dstPtr[n++] = 0;
+ }
+ Uint32 dstWords = (n >> 2);
+ ahOut->setDataSize(dstWords);
+ Uint32 newIndexBuf = indexBuf + dstWords;
+ ndbrequire(newIndexBuf <= maxRead);
+ req_struct->out_buf_index = newIndexBuf;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ }
+ }
+ return false;
+}
+
+bool
+Dbtup::readDiskFixedSizeNULLable(Uint32* outBuffer,
+ KeyReqStruct *req_struct,
+ AttributeHeader* ahOut,
+ Uint32 attrDes2)
+{
+ if (!disk_nullFlagCheck(req_struct, attrDes2)) {
+ ljam();
+ return readDiskFixedSizeNotNULL(outBuffer,
+ req_struct,
+ ahOut,
+ attrDes2);
+ } else {
+ ljam();
+ ahOut->setNULL();
+ return true;
+ }
+}
+
+bool
+Dbtup::readDiskVarSizeNotNULL(Uint32* out_buffer,
+ KeyReqStruct *req_struct,
+ AttributeHeader* ah_out,
+ Uint32 attr_des2)
+{
+ Uint32 attr_descriptor, index_buf, var_index;
+ Uint32 vsize_in_bytes, vsize_in_words, new_index, max_var_size;
+ Uint32 var_attr_pos, max_read;
+
+ Uint32 idx= req_struct->m_var_data[DD].m_var_len_offset;
+ var_index= AttributeOffset::getOffset(attr_des2);
+ var_attr_pos= req_struct->m_var_data[DD].m_offset_array_ptr[var_index];
+ vsize_in_bytes= req_struct->m_var_data[DD].m_offset_array_ptr[var_index+idx] - var_attr_pos;
+ attr_descriptor= req_struct->attr_descriptor;
+ index_buf= req_struct->out_buf_index;
+ max_var_size= AttributeDescriptor::getSizeInWords(attr_descriptor);
+ max_read= req_struct->max_read;
+ vsize_in_words= convert_byte_to_word_size(vsize_in_bytes);
+ new_index= index_buf + vsize_in_words;
+
+ ndbrequire(vsize_in_words <= max_var_size);
+ if (new_index <= max_read) {
+ ljam();
+ ah_out->setDataSize(vsize_in_words);
+ memcpy(out_buffer+index_buf,
+ req_struct->m_var_data[DD].m_data_ptr+var_attr_pos,
+ vsize_in_bytes);
+ char* rest= (char*)(out_buffer+index_buf);
+ memset(rest+vsize_in_bytes, 0, 4*vsize_in_words - vsize_in_bytes);
+ req_struct->out_buf_index= new_index;
+ return true;
+ } else {
+ ljam();
+ terrorCode= ZTRY_TO_READ_TOO_MUCH_ERROR;
+ return false;
+ }
+}
+
+bool
+Dbtup::readDiskVarSizeNULLable(Uint32* outBuffer,
+ KeyReqStruct *req_struct,
+ AttributeHeader* ahOut,
+ Uint32 attrDes2)
+{
+ if (!disk_nullFlagCheck(req_struct, attrDes2)) {
+ ljam();
+ return readDiskVarSizeNotNULL(outBuffer,
+ req_struct,
+ ahOut,
+ attrDes2);
+ } else {
+ ljam();
+ ahOut->setNULL();
+ return true;
+ }
+}
+
+
/* ---------------------------------------------------------------------- */
/* THIS ROUTINE IS USED TO UPDATE A NUMBER OF ATTRIBUTES. IT IS */
/* USED BY THE INSERT ROUTINE, THE UPDATE ROUTINE AND IT CAN BE */
@@ -1055,6 +1241,327 @@
{
ljam();
BitmaskImpl::set(regTabPtr->m_offsets[MM].m_null_words, bits, pos);
+
+ req_struct->in_buf_index = newIndex;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZAI_INCONSISTENCY_ERROR;
+ return false;
+ }//if
+ }//if
+}
+
+bool
+Dbtup::updateDiskFixedSizeNotNULL(Uint32* inBuffer,
+ KeyReqStruct *req_struct,
+ Uint32 attrDes2)
+{
+ Uint32 attrDescriptor= req_struct->attr_descriptor;
+ Uint32 indexBuf= req_struct->in_buf_index;
+ Uint32 inBufLen= req_struct->in_buf_len;
+ Uint32 updateOffset= AttributeOffset::getOffset(attrDes2);
+ Uint32 charsetFlag = AttributeOffset::getCharsetFlag(attrDes2);
+
+ AttributeHeader ahIn(inBuffer[indexBuf]);
+ Uint32 noOfWords= AttributeDescriptor::getSizeInWords(attrDescriptor);
+ Uint32 nullIndicator= ahIn.isNULL();
+ Uint32 newIndex= indexBuf + noOfWords + 1;
+ Uint32 *tuple_header= req_struct->m_disk_ptr->m_data;
+ ndbrequire((updateOffset + noOfWords - 1) < req_struct->check_offset);
+
+ if (newIndex <= inBufLen) {
+ if (!nullIndicator) {
+ ljam();
+ if (charsetFlag) {
+ ljam();
+ Tablerec* regTabPtr = tabptr.p;
+ Uint32 typeId = AttributeDescriptor::getType(attrDescriptor);
+ Uint32 bytes = AttributeDescriptor::getSizeInBytes(attrDescriptor);
+ Uint32 i = AttributeOffset::getCharsetPos(attrDes2);
+ ndbrequire(i < regTabPtr->noOfCharsets);
+ // not const in MySQL
+ CHARSET_INFO* cs = regTabPtr->charsetArray[i];
+ const char* ssrc = (const char*)&inBuffer[indexBuf + 1];
+ Uint32 lb, len;
+ if (! NdbSqlUtil::get_var_length(typeId, ssrc, bytes, lb, len)) {
+ ljam();
+ terrorCode = ZINVALID_CHAR_FORMAT;
+ return false;
+ }
+ // fast fix bug#7340
+ if (typeId != NDB_TYPE_TEXT &&
+ (*cs->cset->well_formed_len)(cs, ssrc + lb, ssrc + lb + len, ZNIL) != len) {
+ ljam();
+ terrorCode = ZINVALID_CHAR_FORMAT;
+ return false;
+ }
+ }
+ req_struct->in_buf_index= newIndex;
+ MEMCOPY_NO_WORDS(&tuple_header[updateOffset],
+ &inBuffer[indexBuf + 1],
+ noOfWords);
+ return true;
+ } else {
+ ljam();
+ terrorCode= ZNOT_NULL_ATTR;
+ return false;
+ }
+ } else {
+ ljam();
+ terrorCode= ZAI_INCONSISTENCY_ERROR;
+ return false;
+ }
+}
+
+bool
+Dbtup::updateDiskFixedSizeNULLable(Uint32* inBuffer,
+ KeyReqStruct *req_struct,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr= tabptr.p;
+ AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
+ Uint32 nullIndicator= ahIn.isNULL();
+ Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+
+ if (!nullIndicator) {
+ ljam();
+ BitmaskImpl::clear(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+ return updateDiskFixedSizeNotNULL(inBuffer,
+ req_struct,
+ attrDes2);
+ } else {
+ Uint32 newIndex= req_struct->in_buf_index + 1;
+ if (newIndex <= req_struct->in_buf_len) {
+ BitmaskImpl::set(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+ ljam();
+ req_struct->in_buf_index= newIndex;
+ return true;
+ } else {
+ ljam();
+ terrorCode= ZAI_INCONSISTENCY_ERROR;
+ return false;
+ }
+ }
+}
+
+bool
+Dbtup::updateDiskVarSizeNotNULL(Uint32* in_buffer,
+ KeyReqStruct *req_struct,
+ Uint32 attr_des2)
+{
+ Uint32 attr_descriptor, index_buf, in_buf_len, var_index, null_ind;
+ Uint32 vsize_in_bytes, vsize_in_words, new_index, max_var_size;
+ Uint32 var_attr_pos;
+ char *var_data_start;
+ Uint16 *vpos_array;
+
+ attr_descriptor= req_struct->attr_descriptor;
+ index_buf= req_struct->in_buf_index;
+ in_buf_len= req_struct->in_buf_len;
+ var_index= AttributeOffset::getOffset(attr_des2);
+ AttributeHeader ahIn(in_buffer[index_buf]);
+ null_ind= ahIn.isNULL();
+ vsize_in_words= ahIn.getDataSize();
+ max_var_size= AttributeDescriptor::getSizeInBytes(attr_descriptor);
+ new_index= index_buf + vsize_in_words + 1;
+ vpos_array= req_struct->m_var_data[DD].m_offset_array_ptr;
+ Uint32 idx= req_struct->m_var_data[DD].m_var_len_offset;
+ Uint32 check_offset= req_struct->m_var_data[DD].m_max_var_offset;
+
+ if (new_index <= in_buf_len && vsize_in_words <= max_var_size) {
+ if (!null_ind) {
+ ljam();
+ int size_in_bytes= 4*vsize_in_words > max_var_size ? max_var_size : 4*vsize_in_words;
+
+ var_attr_pos= vpos_array[var_index];
+ var_data_start= req_struct->m_var_data[DD].m_data_ptr;
+ vpos_array[var_index+idx]= var_attr_pos+size_in_bytes;
+ req_struct->in_buf_index= new_index;
+
+ ndbrequire(var_attr_pos+vsize_in_words*4 <= check_offset);
+ memcpy(var_data_start+var_attr_pos, &in_buffer[index_buf + 1],
+ size_in_bytes);
+ return true;
+ } else {
+ ljam();
+ terrorCode= ZNOT_NULL_ATTR;
+ return false;
+ }
+ } else {
+ ljam();
+ terrorCode= ZAI_INCONSISTENCY_ERROR;
+ return false;
+ }
+}
+
+bool
+Dbtup::updateDiskVarSizeNULLable(Uint32* inBuffer,
+ KeyReqStruct *req_struct,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr= tabptr.p;
+ AttributeHeader ahIn(inBuffer[req_struct->in_buf_index]);
+ Uint32 nullIndicator= ahIn.isNULL();
+ Uint32 pos= AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+ Uint32 idx= req_struct->m_var_data[DD].m_var_len_offset;
+
+ if (!nullIndicator) {
+ ljam();
+ BitmaskImpl::clear(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+ return updateDiskVarSizeNotNULL(inBuffer,
+ req_struct,
+ attrDes2);
+ } else {
+ Uint32 newIndex= req_struct->in_buf_index + 1;
+ Uint32 var_index= AttributeOffset::getOffset(attrDes2);
+ Uint32 var_pos= req_struct->var_pos_array[var_index];
+ if (newIndex <= req_struct->in_buf_len) {
+ ljam();
+ BitmaskImpl::set(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+ req_struct->var_pos_array[var_index+idx]= var_pos;
+ req_struct->in_buf_index= newIndex;
+ return true;
+ } else {
+ ljam();
+ terrorCode= ZAI_INCONSISTENCY_ERROR;
+ return false;
+ }
+ }
+}
+
+bool
+Dbtup::readDiskBitsNotNULL(Uint32* outBuffer,
+ KeyReqStruct* req_struct,
+ AttributeHeader* ahOut,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 bitCount =
+ AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+ Uint32 indexBuf = req_struct->out_buf_index;
+ Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+ Uint32 maxRead = req_struct->max_read;
+ Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+ if (newIndexBuf <= maxRead) {
+ ljam();
+ ahOut->setDataSize((bitCount + 31) >> 5);
+ req_struct->out_buf_index = newIndexBuf;
+
+ BitmaskImpl::getField(regTabPtr->m_offsets[DD].m_null_words, bits, pos,
+ bitCount, outBuffer+indexBuf);
+
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ return false;
+ }//if
+}
+
+bool
+Dbtup::readDiskBitsNULLable(Uint32* outBuffer,
+ KeyReqStruct* req_struct,
+ AttributeHeader* ahOut,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 bitCount =
+ AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+
+ Uint32 indexBuf = req_struct->out_buf_index;
+ Uint32 newIndexBuf = indexBuf + ((bitCount + 31) >> 5);
+ Uint32 maxRead = req_struct->max_read;
+ Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+
+ if(BitmaskImpl::get(regTabPtr->m_offsets[DD].m_null_words, bits, pos))
+ {
+ ljam();
+ ahOut->setNULL();
+ return true;
+ }
+
+ if (newIndexBuf <= maxRead) {
+ ljam();
+ ahOut->setDataSize((bitCount + 31) >> 5);
+ req_struct->out_buf_index = newIndexBuf;
+ BitmaskImpl::getField(regTabPtr->m_offsets[DD].m_null_words, bits, pos+1,
+ bitCount, outBuffer+indexBuf);
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZTRY_TO_READ_TOO_MUCH_ERROR;
+ return false;
+ }//if
+}
+
+bool
+Dbtup::updateDiskBitsNotNULL(Uint32* inBuffer,
+ KeyReqStruct* req_struct,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ Uint32 indexBuf = req_struct->in_buf_index;
+ Uint32 inBufLen = req_struct->in_buf_len;
+ AttributeHeader ahIn(inBuffer[indexBuf]);
+ Uint32 nullIndicator = ahIn.isNULL();
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 bitCount =
+ AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+ Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5);
+ Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr, DD);
+
+ if (newIndex <= inBufLen) {
+ if (!nullIndicator) {
+ BitmaskImpl::setField(regTabPtr->m_offsets[DD].m_null_words, bits, pos,
+ bitCount, inBuffer+indexBuf+1);
+ req_struct->in_buf_index = newIndex;
+ return true;
+ } else {
+ ljam();
+ terrorCode = ZNOT_NULL_ATTR;
+ return false;
+ }//if
+ } else {
+ ljam();
+ terrorCode = ZAI_INCONSISTENCY_ERROR;
+ return false;
+ }//if
+ return true;
+}
+
+bool
+Dbtup::updateDiskBitsNULLable(Uint32* inBuffer,
+ KeyReqStruct* req_struct,
+ Uint32 attrDes2)
+{
+ Tablerec* const regTabPtr = tabptr.p;
+ Uint32 indexBuf = req_struct->in_buf_index;
+ AttributeHeader ahIn(inBuffer[indexBuf]);
+ Uint32 nullIndicator = ahIn.isNULL();
+ Uint32 pos = AttributeOffset::getNullFlagPos(attrDes2);
+ Uint32 bitCount =
+ AttributeDescriptor::getArraySize(req_struct->attr_descriptor);
+ Uint32 *bits= req_struct->m_disk_ptr->get_null_bits(regTabPtr);
+
+ if (!nullIndicator) {
+ BitmaskImpl::clear(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
+ BitmaskImpl::setField(regTabPtr->m_offsets[DD].m_null_words, bits, pos+1,
+ bitCount, inBuffer+indexBuf+1);
+
+ Uint32 newIndex = indexBuf + 1 + ((bitCount + 31) >> 5);
+ req_struct->in_buf_index = newIndex;
+ return true;
+ } else {
+ Uint32 newIndex = indexBuf + 1;
+ if (newIndex <= req_struct->in_buf_len)
+ {
+ ljam();
+ BitmaskImpl::set(regTabPtr->m_offsets[DD].m_null_words, bits, pos);
req_struct->in_buf_index = newIndex;
return true;
| Thread |
|---|
| • bk commit into 5.1-ndb tree (joreland:1.1762) | jonas.oreland | 4 Mar |