Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2006-08-24 07:21:10+02:00, jonas@stripped +2 -0
Merge perch.ndb.mysql.com:/home/jonas/src/50-work
into perch.ndb.mysql.com:/home/jonas/src/51-work
MERGE: 1.1810.1696.119
sql/ha_ndbcluster.cc@stripped, 2006-08-24 07:20:41+02:00, jonas@stripped +0 -0
Auto merged
MERGE: 1.175.1.99
storage/ndb/src/ndbapi/NdbScanOperation.cpp@stripped, 2006-08-24 07:21:00+02:00, jonas@stripped +0 -9
MERGE: 1.66.12.2
storage/ndb/src/ndbapi/NdbScanOperation.cpp@stripped, 2006-08-24 07:20:40+02:00, jonas@stripped +0 -0
Merge rename: ndb/src/ndbapi/NdbScanOperation.cpp -> storage/ndb/src/ndbapi/NdbScanOperation.cpp
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: jonas
# Host: perch.ndb.mysql.com
# Root: /home/jonas/src/51-work/RESYNC
--- 1.66.12.1/ndb/src/ndbapi/NdbScanOperation.cpp 2006-08-24 07:21:14 +02:00
+++ 1.92/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2006-08-24 07:21:14 +02:00
@@ -50,6 +50,7 @@
m_receivers = 0;
m_array = new Uint32[1]; // skip if on delete in fix_receivers
theSCAN_TABREQ = 0;
+ m_executed = false;
}
NdbScanOperation::~NdbScanOperation()
@@ -111,6 +112,7 @@
theNdbCon->theMagicNumber = 0xFE11DF;
theNoOfTupKeyLeft = tab->m_noOfDistributionKeys;
m_read_range_no = 0;
+ m_executed = false;
return 0;
}
@@ -162,6 +164,16 @@
}
m_keyInfo = ((scan_flags & SF_KeyInfo) || lockExcl) ? 1 : 0;
+ bool tupScan = (scan_flags & SF_TupScan);
+
+#if 1 // XXX temp for testing
+ { char* p = getenv("NDB_USE_TUPSCAN");
+ if (p != 0) {
+ unsigned n = atoi(p); // 0-10
+ if ((unsigned int) (::time(0) % 10) < n) tupScan = true;
+ }
+ }
+#endif
bool rangeScan = false;
if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex)
@@ -177,12 +189,9 @@
theStatus = GetValue;
theOperationType = OpenRangeScanRequest;
rangeScan = true;
- }
-
- bool tupScan = (scan_flags & SF_TupScan);
- if (tupScan && rangeScan)
tupScan = false;
-
+ }
+
if (rangeScan && (scan_flags & SF_OrderBy))
parallel = fragCount;
@@ -202,7 +211,7 @@
theSCAN_TABREQ->setSignal(GSN_SCAN_TABREQ);
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
req->apiConnectPtr = theNdbCon->theTCConPtr;
- req->tableId = m_accessTable->m_tableId;
+ req->tableId = m_accessTable->m_id;
req->tableSchemaVersion = m_accessTable->m_version;
req->storedProcId = 0xFFFF;
req->buddyConPtr = theNdbCon->theBuddyConPtr;
@@ -363,7 +372,7 @@
int
NdbScanOperation::executeCursor(int nodeId){
NdbTransaction * tCon = theNdbCon;
- TransporterFacade* tp = TransporterFacade::instance();
+ TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
Guard guard(tp->theMutexPtr);
Uint32 magic = tCon->theMagicNumber;
@@ -385,6 +394,7 @@
if (doSendScan(nodeId) == -1)
return -1;
+ m_executed= true; // Mark operation as executed
return 0;
} else {
if (!(tp->get_node_stopping(nodeId) &&
@@ -464,19 +474,28 @@
}
Uint32 nodeId = theNdbCon->theDBnode;
- TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
- if(theError.code)
- return -1;
+ TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
- Uint32 seq = theNdbCon->theNodeSequence;
- if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
- forceSend) == 0){
+ const Uint32 seq = theNdbCon->theNodeSequence;
+
+ if(theError.code)
+ {
+ goto err4;
+ }
+
+ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
+ {
idx = m_current_api_receiver;
last = m_api_receivers_count;
-
- Uint32 timeout = tp->m_waitfor_timeout;
do {
if(theError.code){
@@ -502,10 +521,9 @@
/**
* No completed...
*/
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(3*timeout);
- if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
+ forceSend);
+ if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
} else {
idx = last;
@@ -555,6 +573,10 @@
if(theError.code == 0)
setErrorCode(4028); // seq changed = Node fail
break;
+ case -4:
+err4:
+ setErrorCode(theError.code);
+ break;
}
theNdbCon->theTransactionIsStarted = false;
@@ -564,8 +586,8 @@
}
int
-NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
- bool forceSend){
+NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag)
+{
if(cnt > 0){
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
@@ -600,7 +622,7 @@
if(sent)
{
Uint32 nodeId = theNdbCon->theDBnode;
- TransporterFacade * tp = TransporterFacade::instance();
+ TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
if(cnt > 21){
tSignal.setLength(4);
LinearSectionPtr ptr[3];
@@ -612,9 +634,6 @@
ret = tp->sendSignal(&tSignal, nodeId);
}
}
-
- if (!ret) checkForceSend(forceSend);
-
m_sent_receivers_count = last + sent;
m_api_receivers_count -= cnt;
m_current_api_receiver = 0;
@@ -624,15 +643,6 @@
return 0;
}
-void NdbScanOperation::checkForceSend(bool forceSend)
-{
- if (forceSend) {
- TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
- } else {
- TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
- }//if
-}
-
int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{
@@ -667,10 +677,16 @@
m_conf_receivers_count,
m_sent_receivers_count);
- TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
- close_impl(tp, forceSend);
-
+ TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
+ close_impl(tp, forceSend, &poll_guard);
}
NdbConnection* tCon = theNdbCon;
@@ -680,7 +696,7 @@
if (releaseOp && tTransCon) {
NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
- tTransCon->releaseExecutedScanOperation(tOp);
+ tTransCon->releaseScanOperation(tOp);
}
tCon->theScanningOp = 0;
@@ -775,6 +791,7 @@
*/
Uint32 reqInfo = req->requestInfo;
ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
+ ScanTabReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
req->requestInfo = reqInfo;
for(Uint32 i = 0; i<theParallelism; i++){
@@ -825,7 +842,7 @@
req->requestInfo = tmp;
tSignal->setLength(ScanTabReq::StaticLength + theDistrKeyIndicator_);
- TransporterFacade *tp = TransporterFacade::instance();
+ TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
LinearSectionPtr ptr[3];
ptr[0].p = m_prepared_receivers;
ptr[0].sz = theParallelism;
@@ -909,13 +926,20 @@
* the scan process.
****************************************************************************/
int
-NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
+NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, Uint32 & size)
{
NdbRecAttr * tRecAttr = m_curr_row;
if(tRecAttr)
{
const Uint32 * src = (Uint32*)tRecAttr->aRef();
- memcpy(data, src, 4*size);
+
+ assert(tRecAttr->get_size_in_bytes() > 0);
+ assert(tRecAttr->get_size_in_bytes() < 65536);
+ const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
+
+ assert(size >= len);
+ memcpy(data, src, 4*len);
+ size = len;
return 0;
}
return -1;
@@ -940,8 +964,10 @@
}
pTrans->theSimpleState = 0;
- const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
-
+ assert(tRecAttr->get_size_in_bytes() > 0);
+ assert(tRecAttr->get_size_in_bytes() < 65536);
+ const Uint32 len = (tRecAttr->get_size_in_bytes() + 3)/4-1;
+
newOp->theTupKeyLen = len;
newOp->theOperationType = opType;
switch (opType) {
@@ -1039,23 +1065,23 @@
int
NdbIndexScanOperation::setBound(const char* anAttrName, int type,
- const void* aValue, Uint32 len)
+ const void* aValue)
{
- return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);
+ return setBound(m_accessTable->getColumn(anAttrName), type, aValue);
}
int
NdbIndexScanOperation::setBound(Uint32 anAttrId, int type,
- const void* aValue, Uint32 len)
+ const void* aValue)
{
- return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);
+ return setBound(m_accessTable->getColumn(anAttrId), type, aValue);
}
int
NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject,
- const char* aValue,
- Uint32 len){
- return setBound(anAttrObject, BoundEQ, aValue, len);
+ const char* aValue)
+{
+ return setBound(anAttrObject, BoundEQ, aValue);
}
NdbRecAttr*
@@ -1065,7 +1091,7 @@
return NdbScanOperation::getValue_impl(attrInfo, aValue);
}
- int id = attrInfo->m_attrId; // In "real" table
+ int id = attrInfo->getColumnNo(); // In "real" table
assert(m_accessTable->m_index);
int sz = (int)m_accessTable->m_index->m_key_ids.size();
if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
@@ -1102,7 +1128,7 @@
*/
int
NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo,
- int type, const void* aValue, Uint32 len)
+ int type, const void* aValue)
{
if (!tAttrInfo)
{
@@ -1110,24 +1136,23 @@
return -1;
}
if (theOperationType == OpenRangeScanRequest &&
- (0 <= type && type <= 4) &&
- len <= 8000) {
+ (0 <= type && type <= 4)) {
// insert bound type
Uint32 currLen = theTotalNrOfKeyWordInSignal;
Uint32 remaining = KeyInfo::DataLength - currLen;
- Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
bool tDistrKey = tAttrInfo->m_distributionKey;
- len = aValue != NULL ? sizeInBytes : 0;
- if (len != sizeInBytes && (len != 0)) {
- setErrorCodeAbort(4209);
- return -1;
- }
+ Uint32 len = 0;
+ if (aValue != NULL)
+ if (! tAttrInfo->get_var_length(aValue, len)) {
+ setErrorCodeAbort(4209);
+ return -1;
+ }
// insert attribute header
Uint32 tIndexAttrId = tAttrInfo->m_attrId;
Uint32 sizeInWords = (len + 3) / 4;
- AttributeHeader ah(tIndexAttrId, sizeInWords);
+ AttributeHeader ah(tIndexAttrId, sizeInWords << 2);
const Uint32 ahValue = ah.m_value;
const Uint32 align = (UintPtr(aValue) & 7);
@@ -1221,6 +1246,31 @@
return -1;
}
+Uint32
+NdbIndexScanOperation::getKeyFromSCANTABREQ(Uint32* data, Uint32 size)
+{
+ DBUG_ENTER("NdbIndexScanOperation::getKeyFromSCANTABREQ");
+ assert(size >= theTotalNrOfKeyWordInSignal);
+ size = theTotalNrOfKeyWordInSignal;
+ NdbApiSignal* tSignal = theSCAN_TABREQ->next();
+ Uint32 pos = 0;
+ while (pos < size) {
+ assert(tSignal != NULL);
+ Uint32* tData = tSignal->getDataPtrSend();
+ Uint32 rem = size - pos;
+ if (rem > KeyInfo::DataLength)
+ rem = KeyInfo::DataLength;
+ Uint32 i = 0;
+ while (i < rem) {
+ data[pos + i] = tData[KeyInfo::HeaderLength + i];
+ i++;
+ }
+ pos += rem;
+ }
+ DBUG_DUMP("key", (char*)data, size << 2);
+ DBUG_RETURN(size);
+}
+
int
NdbIndexScanOperation::readTuples(LockMode lm,
Uint32 scan_flags,
@@ -1320,10 +1370,11 @@
return (r1_null ? -1 : 1) * jdir;
}
const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
- Uint32 len = r1->theAttrSize * r1->theArraySize;
+ Uint32 len1 = r1->get_size_in_bytes();
+ Uint32 len2 = r2->get_size_in_bytes();
if(!r1_null){
const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_type);
- int r = (*sqlType.m_cmp)(col.m_cs, d1, len, d2, len, true);
+ int r = (*sqlType.m_cmp)(col.m_cs, d1, len1, d2, len2, true);
if(r){
assert(r != NdbSqlUtil::CmpUnknown);
return r * jdir;
@@ -1361,22 +1412,27 @@
if(fetchNeeded){
if(fetchAllowed){
if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
- TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
+ TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
if(theError.code)
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
- Uint32 timeout = tp->m_waitfor_timeout;
if(seq == tp->getNodeSequence(nodeId) &&
- !send_next_scan_ordered(s_idx, forceSend)){
+ !send_next_scan_ordered(s_idx)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(3*timeout);
- if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
+ forceSend);
+ if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
}
if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
@@ -1463,7 +1519,8 @@
}
int
-NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
+NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx)
+{
if(idx == theParallelism)
return 0;
@@ -1498,15 +1555,16 @@
m_sent_receivers_count = last + 1;
Uint32 nodeId = theNdbCon->theDBnode;
- TransporterFacade * tp = TransporterFacade::instance();
+ TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
tSignal.setLength(4+1);
int ret= tp->sendSignal(&tSignal, nodeId);
- if (!ret) checkForceSend(forceSend);
return ret;
}
int
-NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
+NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
+ PollGuard *poll_guard)
+{
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
@@ -1516,16 +1574,12 @@
return -1;
}
- Uint32 timeout = tp->m_waitfor_timeout;
-
/**
* Wait for outstanding
*/
while(theError.code == 0 && m_sent_receivers_count)
{
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(3*timeout);
+ int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId, forceSend);
switch(return_code){
case 0:
break;
@@ -1582,7 +1636,7 @@
}
// Send close scan
- if(send_next_scan(api+conf, true, forceSend) == -1)
+ if(send_next_scan(api+conf, true) == -1)
{
theNdbCon->theReleaseOnClose = true;
return -1;
@@ -1593,9 +1647,8 @@
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
- theNdb->theImpl->theWaiter.m_node = nodeId;
- theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
- int return_code = theNdb->receiveResponse(3*timeout);
+ int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
+ forceSend);
switch(return_code){
case 0:
break;
@@ -1634,13 +1687,20 @@
NdbScanOperation::restart(bool forceSend)
{
- TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
+ TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
Uint32 nodeId = theNdbCon->theDBnode;
{
int res;
- if((res= close_impl(tp, forceSend)))
+ if((res= close_impl(tp, forceSend, &poll_guard)))
{
return res;
}
@@ -1654,7 +1714,6 @@
theError.code = 0;
if (doSendScan(nodeId) == -1)
return -1;
-
return 0;
}
@@ -1663,9 +1722,16 @@
int res;
{
- TransporterFacade* tp = TransporterFacade::instance();
- Guard guard(tp->theMutexPtr);
- res= close_impl(tp, forceSend);
+ TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+ /*
+ The PollGuard has an implicit call of unlock_and_signal through the
+ ~PollGuard method. This method is called implicitly by the compiler
+ in all places where the object is out of context due to a return,
+ break, continue or simply end of statement block
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
+ res= close_impl(tp, forceSend, &poll_guard);
}
if(!res)
--- 1.347/sql/ha_ndbcluster.cc 2006-08-24 07:21:14 +02:00
+++ 1.348/sql/ha_ndbcluster.cc 2006-08-24 07:21:14 +02:00
@@ -47,6 +47,7 @@
// options from from mysqld.cc
extern my_bool opt_ndb_optimized_node_selection;
extern const char *opt_ndbcluster_connectstring;
+extern ulong opt_ndb_cache_check_time;
const char *ndb_distribution_names[]= {"KEYHASH", "LINHASH", NullS};
TYPELIB ndb_distribution_typelib= { array_elements(ndb_distribution_names)-1,
@@ -6468,6 +6469,7 @@
pthread_cond_init(&COND_ndb_util_thread, NULL);
+ ndb_cache_check_time = opt_ndb_cache_check_time;
// Create utility thread
pthread_t tmp;
if (pthread_create(&tmp, &connection_attrib, ndb_util_thread_func, 0))
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2281) | jonas | 24 Aug |