MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:May 4 2009 1:03pm
Subject:bzr push into mysql-5.1-telco-7.0-spj branch (jonas:2880 to 2881)
View as plain text  
 2881 Jonas Oreland	2009-05-04
      ndb spj - add support for setting ANY_VALUE directly in LQHKEYREQ/SCAN_FRAGREQ to be used as correlation factor

    M  storage/ndb/include/kernel/AttributeHeader.hpp
    M  storage/ndb/include/kernel/signaldata/LqhKey.hpp
    M  storage/ndb/include/kernel/signaldata/ScanFrag.hpp
    M  storage/ndb/include/kernel/signaldata/TupKey.hpp
    M  storage/ndb/src/common/debugger/signaldata/LqhKey.cpp
    M  storage/ndb/src/common/debugger/signaldata/ScanFrag.cpp
    M  storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
    M  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
    M  storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
    M  storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
    M  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
    M  storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
    M  storage/ndb/test/tools/test_spj.cpp
 2880 Jonas Oreland	2009-05-04
      spj

    A  storage/ndb/include/kernel/signaldata/DbspjErr.hpp
    A  storage/ndb/include/kernel/signaldata/QueryTree.hpp
    A  storage/ndb/src/kernel/blocks/dbspj/
    A  storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
    A  storage/ndb/src/kernel/blocks/dbspj/DbspjInit.cpp
    A  storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
    A  storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.cpp
    A  storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp
    A  storage/ndb/src/kernel/vm/ArenaPool.cpp
    A  storage/ndb/src/kernel/vm/ArenaPool.hpp
    A  storage/ndb/test/tools/test_spj.cpp
    M  .bzr-mysql/default.conf
    M  storage/ndb/include/kernel/AttributeHeader.hpp
    M  storage/ndb/include/kernel/BlockNumbers.h
    M  storage/ndb/include/kernel/Interpreter.hpp
    M  storage/ndb/include/kernel/signaldata/DiGetNodes.hpp
    M  storage/ndb/include/kernel/signaldata/LqhKey.hpp
    M  storage/ndb/include/kernel/signaldata/ScanFrag.hpp
    M  storage/ndb/include/kernel/signaldata/ScanTab.hpp
    M  storage/ndb/include/kernel/signaldata/TcKeyRef.hpp
    M  storage/ndb/include/kernel/signaldata/TcKeyReq.hpp
    M  storage/ndb/include/ndbapi/NdbOperation.hpp
    M  storage/ndb/include/transporter/TransporterDefinitions.hpp
    M  storage/ndb/src/common/debugger/BlockNames.cpp
    M  storage/ndb/src/common/debugger/signaldata/LqhKey.cpp
    M  storage/ndb/src/common/debugger/signaldata/ScanTab.cpp
    M  storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp
    M  storage/ndb/src/kernel/Makefile.am
    M  storage/ndb/src/kernel/SimBlockList.cpp
    M  storage/ndb/src/kernel/blocks/Makefile.am
    M  storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
    M  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
    M  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
    M  storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
    M  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
    M  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
    M  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
    M  storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
    M  storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
    M  storage/ndb/src/kernel/blocks/record_types.hpp
    M  storage/ndb/src/kernel/vm/Makefile.am
    M  storage/ndb/src/kernel/vm/Pool.hpp
    M  storage/ndb/src/kernel/vm/RWPool.hpp
    M  storage/ndb/src/kernel/vm/SimulatedBlock.cpp
    M  storage/ndb/src/kernel/vm/SimulatedBlock.hpp
    M  storage/ndb/src/kernel/vm/mt.cpp
    M  storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
    M  storage/ndb/src/ndbapi/NdbOperation.cpp
    M  storage/ndb/src/ndbapi/NdbOperationExec.cpp
    M  storage/ndb/src/ndbapi/NdbScanOperation.cpp
    M  storage/ndb/test/tools/Makefile.am
=== modified file 'storage/ndb/include/kernel/AttributeHeader.hpp'
--- a/storage/ndb/include/kernel/AttributeHeader.hpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/include/kernel/AttributeHeader.hpp	2009-05-04 13:01:07 +0000
@@ -51,6 +51,7 @@ public:
   STATIC_CONST( READ_ALL     = 0xFFF0 );
   STATIC_CONST( READ_LCP     = 0xFFEF );
   STATIC_CONST( FLUSH_AI     = 0xFFEE );
+  STATIC_CONST( READ_ANY_VALUE = 0xFFED );
   
   /**
    * Optimize pseudo column and optimization options

=== modified file 'storage/ndb/include/kernel/signaldata/LqhKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/LqhKey.hpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/include/kernel/signaldata/LqhKey.hpp	2009-05-04 13:01:07 +0000
@@ -153,6 +153,12 @@ private:
    */
   static UintR getNormalProtocolFlag(const UintR & requestInfo);
   static void setNormalProtocolFlag(UintR & requestInfo, UintR val);
+
+  /**
+   * Include any-value
+   */
+  static UintR getAnyValueFlag(const UintR & requestInfo);
+  static void setAnyValueFlag(UintR & requestInfo, UintR val);
 };
 
 /**
@@ -180,6 +186,7 @@ private:
  * g = gci flag               - 1  Bit (12)
  * n = NR copy                - 1  Bit (13)
  * P = Do normal protocol even if dirty-read - 1 Bit (14)
+ * A = AnyValue flag          - 1  Bit (24)
 
  * Short LQHKEYREQ :
  *             1111111111222222222233
@@ -190,7 +197,7 @@ private:
  * Long LQHKEYREQ :
  *             1111111111222222222233
  *   01234567890123456789012345678901
- *             llgn pdisooorr   cumxz
+ *             llgnPpdisooorrA  cumxz
  *
  */
 
@@ -218,6 +225,7 @@ private:
 #define RI_GCI_SHIFT         (12)
 #define RI_NR_COPY_SHIFT     (13)
 #define RI_NORMAL_DIRTY      (14)
+#define RI_ANY_VALUE         (24)
 
 /**
  * Scan Info
@@ -594,6 +602,19 @@ LqhKeyReq::getNormalProtocolFlag(const U
   return (requestInfo >> RI_NORMAL_DIRTY) & 1;
 }
 
+inline
+void
+LqhKeyReq::setAnyValueFlag(UintR & requestInfo, UintR val){
+  ASSERT_BOOL(val, "LqhKeyReq::setNrCopyFlag");
+  requestInfo |= (val << RI_ANY_VALUE);
+}
+
+inline
+UintR
+LqhKeyReq::getAnyValueFlag(const UintR & requestInfo){
+  return (requestInfo >> RI_ANY_VALUE) & 1;
+}
+
 class LqhKeyConf {
   /**
    * Reciver(s)

=== modified file 'storage/ndb/include/kernel/signaldata/ScanFrag.hpp'
--- a/storage/ndb/include/kernel/signaldata/ScanFrag.hpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/include/kernel/signaldata/ScanFrag.hpp	2009-05-04 13:01:07 +0000
@@ -63,6 +63,7 @@ public:
   };
   Uint32 batch_size_rows;
   Uint32 batch_size_bytes;
+  Uint32 variableData[1];
   
   static Uint32 getLockMode(const Uint32 & requestInfo);
   static Uint32 getHoldLockFlag(const Uint32 & requestInfo);
@@ -90,6 +91,9 @@ public:
 
   static void setReorgFlag(Uint32 & requestInfo, Uint32 val);
   static Uint32 getReorgFlag(const Uint32 & requestInfo);
+
+  static void setAnyValueFlag(Uint32 & requestInfo, Uint32 val);
+  static Uint32 getAnyValueFlag(const Uint32 & requestInfo);
 };
 
 /*
@@ -252,11 +256,12 @@ public:
  * t = tup scan              - 1  Bit 11 (implies x=z=0)
  * p = Scan prio             - 4  Bits (12-15) -> max 15
  * r = Reorg flag            - 2  Bits (1-2)
+ * A = any value flag        - 1  Bit  (16)
  *
  *           1111111111222222222233
  * 01234567890123456789012345678901
  *  rrcdlxhkrztppppaaaaaaaaaaaaaaaa   Short variant ( < 6.4.0)
- *  rrcdlxhkrztpppp                   Long variant (6.4.0 +)
+ *  rrcdlxhkrztppppA                  Long variant (6.4.0 +)
  */
 #define SF_LOCK_MODE_SHIFT   (5)
 #define SF_LOCK_MODE_MASK    (1)
@@ -279,6 +284,8 @@ public:
 #define SF_REORG_SHIFT      (1)
 #define SF_REORG_MASK       (3)
 
+#define SF_ANY_VALUE_SHIFT  (16)
+
 inline 
 Uint32
 ScanFragReq::getLockMode(const Uint32 & requestInfo){
@@ -455,4 +462,17 @@ ScanFragReq::setReorgFlag(UintR & reques
   requestInfo |= (val << SF_REORG_SHIFT);
 }
 
+inline
+Uint32
+ScanFragReq::getAnyValueFlag(const Uint32 & requestInfo){
+  return (requestInfo >> SF_ANY_VALUE_SHIFT) & 1;
+}
+
+inline
+void
+ScanFragReq::setAnyValueFlag(UintR & requestInfo, UintR val){
+  ASSERT_BOOL(val, "ScanFragReq::setAnyValueFlag");
+  requestInfo |= (val << SF_ANY_VALUE_SHIFT);
+}
+
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/TupKey.hpp'
--- a/storage/ndb/include/kernel/signaldata/TupKey.hpp	2008-08-07 11:52:50 +0000
+++ b/storage/ndb/include/kernel/signaldata/TupKey.hpp	2009-05-04 13:01:07 +0000
@@ -35,7 +35,7 @@ class TupKeyReq {
   friend bool printTUPKEYREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
 
 public:
-  STATIC_CONST( SignalLength = 19 );
+  STATIC_CONST( SignalLength = 20 );
 
 private:
 
@@ -61,6 +61,7 @@ private:
   Uint32 m_row_id_page_no;
   Uint32 m_row_id_page_idx;
   Uint32 attrInfoIVal;
+  Uint32 anyValue;
 };
 
 class TupKeyConf {

=== modified file 'storage/ndb/src/common/debugger/signaldata/LqhKey.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/LqhKey.cpp	2009-05-04 13:01:07 +0000
@@ -143,6 +143,12 @@ printLQHKEYREQ(FILE * output, const Uint
     fprintf(output, " GCI: %u", sig->variableData[nextPos + 0]);
     nextPos++;
   }
+
+  if (LqhKeyReq::getAnyValueFlag(reqInfo))
+  {
+    fprintf(output, " AnyValue: 0x%x", sig->variableData[nextPos + 0]);
+    nextPos++;
+  }
   
   if(!LqhKeyReq::getInterpretedFlag(reqInfo)){
     fprintf(output, " AttrInfo: ");

=== modified file 'storage/ndb/src/common/debugger/signaldata/ScanFrag.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/ScanFrag.cpp	2008-06-05 20:29:13 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/ScanFrag.cpp	2009-05-04 13:01:07 +0000
@@ -23,9 +23,9 @@ bool
 printSCAN_FRAGREQ(FILE * output, const Uint32 * theData, 
 		  Uint32 len, Uint16 receiverBlockNo) {
   const ScanFragReq * const sig = (ScanFragReq *)theData;
-  fprintf(output, " senderData: %x\n", sig->senderData);
-  fprintf(output, " resultRef: %x\n", sig->resultRef);
-  fprintf(output, " savePointId: %x\n", sig->savePointId);
+  fprintf(output, " senderData: 0x%x\n", sig->senderData);
+  fprintf(output, " resultRef: 0x%x\n", sig->resultRef);
+  fprintf(output, " savePointId: %u\n", sig->savePointId);
 
   fprintf(output, " flags: ");
   if (ScanFragReq::getLockMode(sig->requestInfo))
@@ -51,15 +51,20 @@ printSCAN_FRAGREQ(FILE * output, const U
   fprintf(output, " prio: %u\n",
           ScanFragReq::getScanPrio(sig->requestInfo));
 
-  fprintf(output, " tableId: %x\n", sig->tableId);
-  fprintf(output, " fragmentNo: %x\n", sig->fragmentNoKeyLen & 0xFFFF);
-  fprintf(output, " keyLen: %x\n", sig->fragmentNoKeyLen >> 16);
-  fprintf(output, " schemaVersion: %x\n", sig->schemaVersion);
-  fprintf(output, " transId1: %x\n", sig->transId1);
-  fprintf(output, " transId2: %x\n", sig->transId2);
-  fprintf(output, " clientOpPtr: %x\n", sig->clientOpPtr);
-  fprintf(output, " batch_size_rows: %x\n", sig->batch_size_rows);
-  fprintf(output, " batch_size_bytes: %x\n", sig->batch_size_bytes);
+  fprintf(output, " tableId: %u\n", sig->tableId);
+  fprintf(output, " fragmentNo: %u\n", sig->fragmentNoKeyLen & 0xFFFF);
+  fprintf(output, " keyLen: %u\n", sig->fragmentNoKeyLen >> 16);
+  fprintf(output, " schemaVersion: 0x%x\n", sig->schemaVersion);
+  fprintf(output, " transId1: 0x%x\n", sig->transId1);
+  fprintf(output, " transId2: 0x%x\n", sig->transId2);
+  fprintf(output, " clientOpPtr: 0x%x\n", sig->clientOpPtr);
+  fprintf(output, " batch_size_rows: %u\n", sig->batch_size_rows);
+  fprintf(output, " batch_size_bytes: %u\n", sig->batch_size_bytes);
+  if (len > ScanFragReq::SignalLength)
+  {
+    fprintf(output, " AnyValue: 0x%x\n", sig->variableData[0]);
+  }
+
   return true;
 }
 

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2009-04-13 04:21:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2009-05-04 13:01:07 +0000
@@ -1978,6 +1978,7 @@ public:
       Uint32 m_scan_curr_range_no;
       UintR noFiredTriggers;
     };
+    Uint32 m_anyValue;
     Uint16 errorCode;
     Uint16 logStartPageIndex;
     Uint16 logStartPageNo;

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2009-05-04 13:01:07 +0000
@@ -4010,6 +4010,13 @@ void Dblqh::execLQHKEYREQ(Signal* signal
     nextPos++;
   }//if
 
+  Uint32 TanyValueFlag = LqhKeyReq::getAnyValueFlag(Treqinfo);
+  regTcPtr->m_anyValue = 0;
+  if (TanyValueFlag == 1) {
+    regTcPtr->m_anyValue = lqhKeyReq->variableData[nextPos];
+    nextPos++;
+  }
+
   UintR TitcKeyLen = 0;
   Uint32 keyLenWithLQHReq = 0;
   UintR TreclenAiLqhkey   = 0;
@@ -5235,6 +5242,7 @@ Dblqh::acckeyconf_tupkeyreq(Signal* sign
 
   sig0 = regTcPtr->m_row_id.m_page_no;
   sig1 = regTcPtr->m_row_id.m_page_idx;
+  sig2 = regTcPtr->m_anyValue;
   
   tupKeyReq->primaryReplica = (tcConnectptr.p->seqNoReplica == 0)?true:false;
   tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
@@ -5244,6 +5252,7 @@ Dblqh::acckeyconf_tupkeyreq(Signal* sign
 
   tupKeyReq->m_row_id_page_no = sig0;
   tupKeyReq->m_row_id_page_idx = sig1;
+  tupKeyReq->anyValue = sig2;
   
   TRACE_OP(regTcPtr, "TUPKEYREQ");
   
@@ -9226,6 +9235,16 @@ void Dblqh::execSCAN_FRAGREQ(Signal* sig
     handle.clear();
   }
 
+  if (true)
+  {
+    /**
+     * TODO this needs to be parameterized using get/setAnyValueFlag
+     *      and nextPos (see execLQHKEYREQ)
+     *      but since long/short SCAN_FRAGREQ can arrive...we skip it for now
+     */
+    tcConnectptr.p->m_anyValue = scanFragReq->variableData[0];
+  }
+
   errorCode = initScanrec(scanFragReq, aiLen);
   if (errorCode != ZOK) {
     jam();
@@ -9962,6 +9981,7 @@ Dblqh::next_scanconf_tupkeyreq(Signal* s
   tupKeyReq->disk_page= disk_page;
   /* No AttrInfo sent to TUP, it uses a stored procedure */
   tupKeyReq->attrInfoIVal= RNIL;
+  tupKeyReq->anyValue = regTcPtr->m_anyValue+scanPtr.p->m_curr_batch_size_rows;
   Uint32 blockNo = refToMain(regTcPtr->tcTupBlockref);
   EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal, 
 		 TupKeyReq::SignalLength);

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2009-05-04 13:01:07 +0000
@@ -81,6 +81,7 @@ public:
     Uint32 m_type;
     Uint32 m_src_node_no;
     Uint32 m_src_node_ptrI;
+    Uint32 m_src_correlation;
 
     struct Header
     {
@@ -215,7 +216,7 @@ public:
     Uint32 m_api_resultRef;
     Uint32 m_api_resultData;
     Uint32 m_outstanding;
-    Uint32 m_lqhKeyReq[LqhKeyReq::FixedSignalLength + 2];
+    Uint32 m_lqhKeyReq[LqhKeyReq::FixedSignalLength + 3];
   };
 
   struct ScanFragData
@@ -242,7 +243,7 @@ public:
     Uint32 m_scan_status;    // fragmentCompleted
     Uint32 m_rows_received;  // #execTRANSID_AI
     Uint32 m_rows_expecting; // ScanFragConf
-    Uint32 m_scanFragReq[ScanFragReq::SignalLength];
+    Uint32 m_scanFragReq[ScanFragReq::SignalLength + 1];
   };
 
   /**
@@ -336,6 +337,7 @@ public:
 
     struct {
       Uint32 m_ref;              // dst for signal
+      Uint32 m_correlation;      // correlation value
       Uint32 m_keyInfoPtrI;      // keyInfoSection
       Uint32 m_attrInfoPtrI;     // attrInfoSection
       Uint32 m_attrInfoParamPtrI;// attrInfoParamSection
@@ -447,6 +449,7 @@ private:
    */
   Uint32 buildRowHeader(RowRef::Header *, SegmentedSectionPtr);
   Uint32 buildRowHeader(RowRef::Header *, const Uint32 * src, Uint32 len);
+  Uint32 getColData32(const RowRef::Section&, Uint32 colNo);
   Uint32 appendToPattern(Local_pattern_store &, DABuffer & tree, Uint32);
   Uint32 appendColToPattern(Local_pattern_store&,const RowRef::Linear&, Uint32);
 

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2009-05-04 13:01:07 +0000
@@ -600,6 +600,7 @@ Dbspj::createNode(Build_context& ctx, Pt
     treeNodePtr.p->m_requestPtrI = requestPtr.i;
     treeNodePtr.p->m_bits = TreeNode::T_LEAF; // start as leaf...
     treeNodePtr.p->m_state = TreeNode::TN_BUILDING;
+    treeNodePtr.p->m_send.m_correlation = 0;
     treeNodePtr.p->m_send.m_keyInfoPtrI = RNIL;
     treeNodePtr.p->m_send.m_attrInfoPtrI = RNIL;
     treeNodePtr.p->m_send.m_attrInfoParamPtrI = RNIL;
@@ -908,8 +909,8 @@ Dbspj::execTRANSID_AI(Signal* signal)
   /**
    * build easy-access-array for row
    */
-  Uint32 tmp[1+MAX_ATTRIBUTES_IN_TABLE];
-  buildRowHeader((RowRef::Header*)tmp, dataPtr);
+  Uint32 tmp[2+MAX_ATTRIBUTES_IN_TABLE];
+  Uint32 cnt = buildRowHeader((RowRef::Header*)tmp, dataPtr);
 
   /**
    * TODO: If row needs to be buffered (m_bits & ROW_BUFFER)
@@ -921,6 +922,7 @@ Dbspj::execTRANSID_AI(Signal* signal)
   row.m_src_node_no = treeNodePtr.p->m_node_no;
   row.m_row_data.m_section.m_header = (RowRef::Header*)tmp;
   row.m_row_data.m_section.m_dataPtr = dataPtr;
+  row.m_src_correlation = getColData32(row.m_row_data.m_section, cnt - 1);
 
   ndbrequire(treeNodePtr.p->m_info&&treeNodePtr.p->m_info->m_execTRANSID_AI);
   (this->*(treeNodePtr.p->m_info->m_execTRANSID_AI))(signal,
@@ -988,6 +990,8 @@ Dbspj::lookup_build(Build_context& ctx,
       /**
        * TODO reference()+treeNodePtr.i is passed twice
        *   this can likely be optimized using the requestInfo-bits
+       * UPDATE: This can be accomplished by *not* setApplicationAddressFlag
+       *         and patch LQH to then instead use tcBlockref/clientConnectPtr
        */
       dst->transId1 = transId1;
       dst->transId2 = transId2;
@@ -1002,6 +1006,7 @@ Dbspj::lookup_build(Build_context& ctx,
       LqhKeyReq::setDirtyFlag(requestInfo, 1);
       LqhKeyReq::setSimpleFlag(requestInfo, 1);
       LqhKeyReq::setNormalProtocolFlag(requestInfo, 1);
+      LqhKeyReq::setAnyValueFlag(requestInfo, 1);
       dst->requestInfo = requestInfo;
     }
 
@@ -1138,6 +1143,7 @@ Dbspj::lookup_send(Signal* signal,
   LqhKeyReq* req = (LqhKeyReq*)signal->getDataPtrSend();
   memcpy(req, treeNodePtr.p->m_lookup_data.m_lqhKeyReq,
 	 sizeof(treeNodePtr.p->m_lookup_data.m_lqhKeyReq));
+  req->variableData[2] = treeNodePtr.p->m_send.m_correlation;
 
   SectionHandle handle(this);
 
@@ -1190,7 +1196,8 @@ Dbspj::lookup_send(Signal* signal,
 #ifdef DEBUG_LQHKEYREQ
   ndbout_c("LQHKEYREQ to %x", ref);
   printLQHKEYREQ(stdout, signal->getDataPtrSend(),
-		 LqhKeyReq::FixedSignalLength + 2, DBLQH);
+		 NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
+                 DBLQH);
   printf("KEYINFO: ");
   print(handle.m_ptr[0], stdout);
   printf("ATTRINFO: ");
@@ -1198,8 +1205,8 @@ Dbspj::lookup_send(Signal* signal,
 #endif
 
   sendSignal(ref, GSN_LQHKEYREQ, signal,
-	     LqhKeyReq::FixedSignalLength + 2, JBB,
-	     &handle);
+	     NDB_ARRAY_SIZE(treeNodePtr.p->m_lookup_data.m_lqhKeyReq),
+             JBB, &handle);
 
   Uint32 add = 2;
   if (treeNodePtr.p->m_bits & TreeNode::T_LEAF)
@@ -1320,6 +1327,7 @@ Dbspj::lookup_start_child(Signal* signal
   Uint32 err;
   const LqhKeyReq* src = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
   const Uint32 tableId = LqhKeyReq::getTableId(src->tableSchemaVersion);
+  const Uint32 corrVal = rowRef.m_src_correlation;
 
   do
   {
@@ -1358,6 +1366,10 @@ Dbspj::lookup_start_child(Signal* signal
      * TODO merge better with lookup_start (refactor)
      */
     {
+      /**
+       * TODO, how should correlation factor really be constructed
+       */
+      treeNodePtr.p->m_send.m_correlation = (corrVal << 16); 
       treeNodePtr.p->m_send.m_ref = tmp.receiverRef;
       LqhKeyReq * dst = (LqhKeyReq*)treeNodePtr.p->m_lookup_data.m_lqhKeyReq;
       dst->hashValue = tmp.hashInfo[0];
@@ -1497,6 +1509,8 @@ Dbspj::scanFrag_build(Build_context& ctx
     Uint32 requestInfo = 0;
     ScanFragReq::setReadCommittedFlag(requestInfo, 1);
     ScanFragReq::setScanPrio(requestInfo, ctx.m_scanPrio);
+    ScanFragReq::setAnyValueFlag(requestInfo, 1);
+
 #if 0
     static void setDescendingFlag(Uint32 & requestInfo, Uint32 descending);
     static void setTupScanFlag(Uint32 & requestInfo, Uint32 tupScan);
@@ -1630,6 +1644,7 @@ Dbspj::scanFrag_send(Signal* signal,
   ScanFragReq* req = (ScanFragReq*)signal->getDataPtrSend();
   memcpy(req, treeNodePtr.p->m_scanfrag_data.m_scanFragReq,
 	 sizeof(treeNodePtr.p->m_scanfrag_data.m_scanFragReq));
+  req->variableData[0] = treeNodePtr.p->m_send.m_correlation;
 
   SectionHandle handle(this);
 
@@ -1692,7 +1707,8 @@ Dbspj::scanFrag_send(Signal* signal,
 #ifdef DEBUG_SCAN_FRAGREQ
   ndbout_c("SCAN_FRAGREQ to %x", ref);
   printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
-                    ScanFragReq::SignalLength, DBLQH);
+                    NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
+                    DBLQH);
   printf("ATTRINFO: ");
   print(handle.m_ptr[0], stdout);
   if (handle.m_cnt > 1)
@@ -1703,7 +1719,8 @@ Dbspj::scanFrag_send(Signal* signal,
 #endif
 
   sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
-	     ScanFragReq::SignalLength, JBB, &handle);
+	     NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
+             JBB, &handle);
 
   treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
   treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
@@ -1930,7 +1947,7 @@ Dbspj::buildRowHeader(RowRef::Header * h
     * dst++ = tmp;
   } while (r0.step(len));
 
-  return header->m_len = (save - dst);
+  return header->m_len = (dst - save);
 }
 
 /**
@@ -1950,7 +1967,7 @@ Dbspj::buildRowHeader(RowRef::Header * h
     src += tmp_len;
   }
 
-  return header->m_len = (save - dst);
+  return header->m_len = (dst - save);
 }
 
 Uint32
@@ -2009,6 +2026,29 @@ Dbspj::appendTreeToSection(Uint32 & ptrI
 }
 
 Uint32
+Dbspj::getColData32(const RowRef::Section & row, Uint32 col)
+{
+  /**
+   * TODO handle errors
+   */
+  const Uint32 * header = (const Uint32*)row.m_header->m_headers;
+  SegmentedSectionPtr ptr(row.m_dataPtr);
+  SectionReader reader(ptr, getSectionSegmentPool());
+  Uint32 offset = 0;
+  for (Uint32 i = 0; i<col; i++)
+  {
+    offset += 1 + AttributeHeader::getDataSize(* header++);
+  }
+  ndbrequire(reader.step(offset));
+  Uint32 tmp;
+  ndbrequire(reader.getWord(&tmp));
+  Uint32 len = AttributeHeader::getDataSize(tmp);
+  ndbrequire(len == 1);
+  ndbrequire(reader.getWord(&tmp));
+  return tmp;
+}
+
+Uint32
 Dbspj::appendColToSection(Uint32 & dst, const RowRef::Section & row, Uint32 col)
 {
   /**
@@ -2509,7 +2549,7 @@ Dbspj::parseDA(Build_context& ctx,
       }
 
       Uint32 sum_read = 0;
-      Uint32 dst[MAX_ATTRIBUTES_IN_TABLE];
+      Uint32 dst[MAX_ATTRIBUTES_IN_TABLE + 2];
 
       if (paramBits & DABits::PI_ATTR_LIST)
       {
@@ -2560,14 +2600,19 @@ Dbspj::parseDA(Build_context& ctx,
         for (Uint32 i = 0; i<cnt; i++)
           dst[i] <<= 16;
 
+        /**
+         * Read correlation factor
+         */
+        dst[cnt] = AttributeHeader::READ_ANY_VALUE << 16;
+
         err = DbspjErr::OutOfSectionMemory;
-        if (!appendToSection(attrInfoPtrI, dst, cnt))
+        if (!appendToSection(attrInfoPtrI, dst, cnt + 1))
         {
           DEBUG_CRASH();
           break;
         }
 
-        sum_read += cnt;
+        sum_read += cnt + 1;
       }
 
       if (interpreted)

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2009-05-04 13:01:07 +0000
@@ -111,8 +111,6 @@ void Dbtup::copyAttrinfo(Operationrec * 
     copy(inBuffer, attrInfoIVal);
   }
 
-  regOperPtr->m_any_value= 0;
-  
   return;
 }
 
@@ -533,11 +531,13 @@ void Dbtup::execTUPKEYREQ(Signal* signal
  /* -----------    INITIATE THE OPERATION RECORD       -------------- */
  /* ----------------------------------------------------------------- */
    Uint32 Rstoredid= tupKeyReq->storedProcedure;
+   Uint32 anyValue = tupKeyReq->anyValue;
 
    regOperPtr->fragmentPtr= Rfragptr;
    regOperPtr->op_struct.op_type= (TrequestInfo >> 6) & 0xf;
    regOperPtr->op_struct.delete_insert_flag = false;
    regOperPtr->op_struct.m_reorg = (TrequestInfo >> 12) & 3;
+   regOperPtr->m_any_value = anyValue;
 
    regOperPtr->m_copy_tuple_location.setNull();
    regOperPtr->tupVersion= ZNIL;

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2009-05-04 13:01:07 +0000
@@ -2399,6 +2399,12 @@ Dbtup::read_pseudo(const Uint32 * inBuff
     flush_read_buffer(req_struct, outBuf, resultRef, resultData);
     return 2;
   }
+  case AttributeHeader::READ_ANY_VALUE:{
+    jam();
+    sz = 1;
+    outBuffer[1] = operPtr.p->m_any_value;
+    break;
+  }
   default:
     return 0;
   }

=== modified file 'storage/ndb/test/tools/test_spj.cpp'
--- a/storage/ndb/test/tools/test_spj.cpp	2009-05-04 07:12:52 +0000
+++ b/storage/ndb/test/tools/test_spj.cpp	2009-05-04 13:01:07 +0000
@@ -303,10 +303,15 @@ int main(int argc, char** argv){
     };
     p1.requestInfo = DABits::PI_ATTR_LIST;
     p1.resultData = 0x10000; //NdbScanFilterImpl::getTransPtr(pOp);
-    p1.optional[0] = 1; // Length of user projecttion
+    p1.optional[0] = 2; // Length of user projecttion
     AttributeHeader::init(p1.optional + 1, AttributeHeader::READ_ALL,
                           pTab->getNoOfColumns());
-    QueryNode::setOpLen(p1.len, QueryNode::QN_SCAN_FRAG, p1.NodeSize + 2);
+    /**
+     * correlation value
+     */
+    AttributeHeader::init(p1.optional + 2, AttributeHeader::READ_ANY_VALUE,
+                          0);
+    QueryNode::setOpLen(p1.len, QueryNode::QN_SCAN_FRAG, p1.NodeSize + 3);
 
 
     union {
@@ -332,10 +337,14 @@ int main(int argc, char** argv){
     };
     p2.requestInfo = DABits::PI_ATTR_LIST;
     p2.resultData = 0x20000; //NdbScanFilterImpl::getTransPtr(pOp);
-    p2.optional[0] = 1; // Length of user projection
+    p2.optional[0] = 2; // Length of user projection
     AttributeHeader::init(p2.optional+1, AttributeHeader::READ_ALL,
                           pTab->getNoOfColumns());
-    QueryNode::setOpLen(p2.len, QueryNode::QN_LOOKUP, p2.NodeSize + 2);
+    /**
+     * correlation value
+     */
+    AttributeHeader::init(p2.optional+2, AttributeHeader::READ_ANY_VALUE, 0);
+    QueryNode::setOpLen(p2.len, QueryNode::QN_LOOKUP, p2.NodeSize + 3);
 
 
     union {


Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20090504130107-4kspdeht1clcu4ly.bundle
Thread
bzr push into mysql-5.1-telco-7.0-spj branch (jonas:2880 to 2881)Jonas Oreland4 May