From: justin.he  Date: April 2, 2008 10:20am
Subject:bk commit into 5.1 tree (Justin.He:1.2589)
Below is the list of changes that have just been committed into a local
5.1 repository of Justin.He.  When Justin.He does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2008-04-02 18:20:04+08:00, Justin.He@stripped +26 -0
  WL3085 optimize table moving fixpart (except dbtup stuff)
  1) add two scans to move varpart and fixpart in NDBAPI
  2) use an NdbRecord interface
  3) introduce a descending tuple scan
  4) add a MOVE operation

  sql/ha_ndbcluster.cc@stripped, 2008-04-02 18:19:01+08:00, Justin.He@stripped +3 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    release resources if initialization fails

  storage/ndb/include/kernel/AttributeHeader.hpp@stripped, 2008-04-02 18:19:02+08:00, Justin.He@stripped +1 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add a pseudo column to get the fixed page size from NDBD

  storage/ndb/include/kernel/kernel_types.h@stripped, 2008-04-02 18:19:09+08:00, Justin.He@stripped +1 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add MOVE operation definition

  storage/ndb/include/ndbapi/NdbDictionary.hpp@stripped, 2008-04-02 18:19:10+08:00, Justin.He@stripped +1 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add declaration for the pseudo column for fixed page size

  storage/ndb/include/ndbapi/NdbOperation.hpp@stripped, 2008-04-02 18:19:12+08:00, Justin.He@stripped +20 -2
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add NdbRecord-interface support for optimizing tables

  storage/ndb/include/ndbapi/NdbScanOperation.hpp@stripped, 2008-04-02 18:19:13+08:00, Justin.He@stripped +25 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add a new interface to move tuples while scanning a table

  storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp@stripped, 2008-04-02 18:19:14+08:00, Justin.He@stripped +18 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add handling for the ZMOVE operation; also update element data when the ACC key needs updating

  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp@stripped, 2008-04-02 18:19:22+08:00, Justin.He@stripped +8 -1
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add handling for the ZMOVE operation

  storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp@stripped, 2008-04-02 18:19:22+08:00, Justin.He@stripped +1 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add handling for the ZMOVE operation

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp@stripped, 2008-04-02 18:19:23+08:00, Justin.He@stripped +11 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add a handleMoveReq() function to process the MOVE operation

  storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp@stripped, 2008-04-02 18:19:23+08:00, Justin.He@stripped +9 -2
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add an additional condition check for the MOVE operation;
    fixpart moving still needs further implementation

  storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp@stripped, 2008-04-02 18:19:24+08:00, Justin.He@stripped +83 -19
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add a move function;
    fixpart moving still needs further implementation

  storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp@stripped, 2008-04-02 18:19:25+08:00, Justin.He@stripped +10 -4
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add reading/writing of pseudo-column data

  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp@stripped, 2008-04-02 18:19:25+08:00, Justin.He@stripped +55 -8
    WL3085 optimize table moving fixpart (except dbtup stuff)
    introduce a descending tuple scan

  storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp@stripped, 2008-04-02 18:19:26+08:00, Justin.He@stripped +8 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add handling for the ZMOVE operation

  storage/ndb/src/ndbapi/NdbBlob.cpp@stripped, 2008-04-02 18:19:26+08:00, Justin.He@stripped +2 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add handling for the ZMOVE operation

  storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2008-04-02 18:19:26+08:00, Justin.He@stripped +516 -128
    WL3085 optimize table moving fixpart (except dbtup stuff)
    use two scans to move varpart and fixpart in NDBAPI

  storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp@stripped, 2008-04-02 18:19:29+08:00, Justin.He@stripped +231 -2
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add definitions for the two scans

  storage/ndb/src/ndbapi/NdbOperation.cpp@stripped, 2008-04-02 18:19:30+08:00, Justin.He@stripped +1 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    optimize table with NdbRecord interface

  storage/ndb/src/ndbapi/NdbOperationDefine.cpp@stripped, 2008-04-02 18:19:31+08:00, Justin.He@stripped +16 -1
    WL3085 optimize table moving fixpart (except dbtup stuff)
    support for the NdbRecord interface

  storage/ndb/src/ndbapi/NdbOperationExec.cpp@stripped, 2008-04-02 18:19:31+08:00, Justin.He@stripped +18 -1
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add attrinfo data for the OPTIMIZE pseudo column

  storage/ndb/src/ndbapi/NdbOperationSearch.cpp@stripped, 2008-04-02 18:19:33+08:00, Justin.He@stripped +1 -1
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add ZMOVE operation

  storage/ndb/src/ndbapi/NdbScanOperation.cpp@stripped, 2008-04-02 18:19:34+08:00, Justin.He@stripped +38 -7
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add a new function moveCurrentTuple() to move tuples while scanning a table

  storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2008-04-02 18:19:34+08:00, Justin.He@stripped +1 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    add ZMOVE operation

  storage/ndb/src/ndbapi/ndb_cluster_connection.cpp@stripped, 2008-04-02 18:19:35+08:00, Justin.He@stripped +4 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    initialize the pseudo column for fixed page size

  storage/ndb/tools/select_all.cpp@stripped, 2008-04-02 18:19:37+08:00, Justin.He@stripped +4 -0
    WL3085 optimize table moving fixpart (except dbtup stuff)
    update the example and add support for descending tuple scans

diff -Nrup a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
--- a/sql/ha_ndbcluster.cc	2008-03-28 06:48:31 +08:00
+++ b/sql/ha_ndbcluster.cc	2008-04-02 18:19:01 +08:00
@@ -7615,6 +7615,7 @@ int ha_ndbcluster::ndb_optimize_table(TH
   {
     DBUG_PRINT("info",
                ("Optimze table %s returned %d", m_tabname, error));
+    th.close();
     ERR_RETURN(ndb->getNdbError());
   }
   while((result= th.next()) == 1)
@@ -7645,6 +7646,7 @@ int ha_ndbcluster::ndb_optimize_table(TH
           DBUG_PRINT("info",
                      ("Optimze index %s returned %d", 
                       index->getName(), error));
+          ih.close();
           ERR_RETURN(ndb->getNdbError());
           
         }
@@ -7668,6 +7670,7 @@ int ha_ndbcluster::ndb_optimize_table(TH
           DBUG_PRINT("info",
                      ("Optimze unique index %s returned %d", 
                       unique_index->getName(), error));
+          ih.close();
           ERR_RETURN(ndb->getNdbError());
         } 
         while((result= ih.next()) == 1)
diff -Nrup a/storage/ndb/include/kernel/AttributeHeader.hpp b/storage/ndb/include/kernel/AttributeHeader.hpp
--- a/storage/ndb/include/kernel/AttributeHeader.hpp	2008-03-18 15:12:37 +08:00
+++ b/storage/ndb/include/kernel/AttributeHeader.hpp	2008-04-02 18:19:02 +08:00
@@ -50,6 +50,7 @@ public:
   STATIC_CONST( COPY_ROWID   = 0xFFF1 );
   STATIC_CONST( READ_ALL     = 0xFFF0 );
   STATIC_CONST( READ_LCP     = 0xFFEF );
+  STATIC_CONST( FIXPAGE_SIZE = 0xFFEE );
   
   /**
    * Optimize pseudo column and optimization options
diff -Nrup a/storage/ndb/include/kernel/kernel_types.h b/storage/ndb/include/kernel/kernel_types.h
--- a/storage/ndb/include/kernel/kernel_types.h	2006-12-24 03:20:03 +08:00
+++ b/storage/ndb/include/kernel/kernel_types.h	2008-04-02 18:19:09 +08:00
@@ -35,6 +35,7 @@ enum Operation_t {
 #if 0
   ,ZREAD_CONSISTENT = 6
 #endif
+  ,ZMOVE    = 7
 };
 
 /**
diff -Nrup a/storage/ndb/include/ndbapi/NdbDictionary.hpp b/storage/ndb/include/ndbapi/NdbDictionary.hpp
--- a/storage/ndb/include/ndbapi/NdbDictionary.hpp	2008-02-20 16:34:18 +08:00
+++ b/storage/ndb/include/ndbapi/NdbDictionary.hpp	2008-04-02 18:19:10 +08:00
@@ -572,6 +572,7 @@ public:
     static const Column * ANY_VALUE;
     static const Column * COPY_ROWID;
     static const Column * OPTIMIZE;
+    static const Column * FIXPAGE_SIZE;
     
     int getSizeInBytes() const;
 
diff -Nrup a/storage/ndb/include/ndbapi/NdbOperation.hpp b/storage/ndb/include/ndbapi/NdbOperation.hpp
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp	2008-02-20 20:30:12 +08:00
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp	2008-04-02 18:19:12 +08:00
@@ -419,7 +419,7 @@ public:
   int  setValue(const char* anAttrName, double aValue);
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
   int  setAnyValue(Uint32 aValue);
-  int  setOptimize(Uint32 options);
+  int  setOptimize(Uint64 options);
 #endif
 
 #ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
@@ -869,6 +869,7 @@ public:
     DeleteRequest = 3,            ///< Delete Operation
     WriteRequest = 4,             ///< Write Operation
     ReadExclusive = 5,            ///< Read exclusive
+    MoveRequest = 7,              ///< Move operation, equal to ZMOVE in kernel_types.h
     OpenScanRequest,              ///< Scan Operation
     OpenRangeScanRequest,         ///< Range scan operation
     NotDefined2,                  ///< Internal for debugging
@@ -986,7 +987,8 @@ public:
                  OO_PARTITION_ID = 0x08, 
                  OO_INTERPRETED  = 0x10,
                  OO_ANYVALUE     = 0x20,
-                 OO_CUSTOMDATA   = 0x40 };
+                 OO_CUSTOMDATA   = 0x40,
+                 OO_OPTIMIZE     = 0x80};
 
     /* An operation-specific abort option.
      * Only necessary if the default abortoption behaviour
@@ -1015,6 +1017,19 @@ public:
 
     /* customData ptr for this operation */
     void * customData;
+
+    /* 64-bit table optimization flags:
+     * the high 32 bits hold the ROWID page no.;
+     * of the low 32 bits, the high 16 bits hold the ROWID page index
+     * and the low 16 bits hold the optimize-table flags;
+     * the ROWID bits are applicable when the flags request a FIXPART move;
+     * 0123456701234567012345670123456701234567012345670123456701234567
+     * ppppppppppppppppppppppppppppppppiiiiiiiiiiiiiiiiffffffffffffffff
+     * p: page no. of the ROWID to move the fixpart to
+     * i: page idx of the ROWID to move the fixpart to
+     * f: optimization flags, e.g. optimize varpart, optimize fixpart, etc.
+     */
+    Uint64 optimizeFlags;
   };
 
 
@@ -1418,6 +1433,9 @@ protected:
   Uint32 m_numExtraSetValues;
 
   Uint32 m_any_value;                           // Valid if m_use_any_value!=0
+
+  bool m_use_optimize_flags;
+  Uint64 m_optimize_flags;
 
   // Blobs in this operation
   NdbBlob* theBlobList;
diff -Nrup a/storage/ndb/include/ndbapi/NdbScanOperation.hpp b/storage/ndb/include/ndbapi/NdbScanOperation.hpp
--- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp	2008-02-19 23:00:27 +08:00
+++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp	2008-04-02 18:19:13 +08:00
@@ -55,6 +55,10 @@ public:
     */
     SF_OrderBy = (1 << 24),
     /* Index scan in descending order, instead of default ascending. */
+    /*
       Now SF_TupScan combined with SF_Descending is also used to scan
       tuples in reverse order of their logical storage location.
+    */
     SF_Descending = (2 << 24),
     /*
       Enable @ref get_range_no (index scan only).
@@ -364,6 +368,27 @@ public:
                                          const unsigned char *mask= 0,
                                          const NdbOperation::OperationOptions *opts = 0,
                                          Uint32 sizeOfOptions = 0);
+
+  /*
+   * Moving the current tuple covers two aspects:
+   * 1) move varpart:
+   *    let the kernel decide whether the varpart needs to be moved and where
+   * 2) move fixpart:
+   *    the caller specifies the rowid of the destination to move the fixpart to
+   *
+   * Parameters:
+   * Uint32 optimize_option:
+   *   == 1, move the varpart
+   *   == 2, move the fixpart
+   *   == 3, move both varpart and fixpart
+   * Uint32 page_no, Uint32 page_idx:
+   *   the rowid of the destination where the current tuple should be moved
+   */
+  const NdbOperation *moveCurrentTuple(NdbTransaction *takeOverTrans, 
+                                       const NdbRecord *attr_rec,
+                                       const Uint32 optimize_option,
+                                       const Uint32 page_no,
+                                       const Uint32 page_idx);
 
   /* Delete the current tuple. NdbRecord version.
    * The tuple can be read before being deleted.  Specify the columns to read
diff -Nrup a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2008-02-03 21:16:33 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2008-04-02 18:19:14 +08:00
@@ -1099,6 +1099,7 @@ void Dbacc::execACCKEYREQ(Signal* signal
     switch (op) {
     case ZREAD:
     case ZUPDATE:
+    case ZMOVE:
     case ZDELETE:
     case ZWRITE:
     case ZSCAN_OP:
@@ -1144,6 +1145,21 @@ void Dbacc::execACCKEYREQ(Signal* signal
 	operationRecPtr.p->m_op_bits = opbits;
 	return;
       } else {
+        if (op == ZMOVE) {
+          //TODO, for fixpart moving
+          /**
+           * We need to update the element data since the ACC key must be
+           * updated after moving the fixpart of an optimized table;
+           * this should not affect locking.
+           * On a NON-primary fragment node the update must be forced,
+           * because a lock normally cannot be taken in the commit phase.
+           */
+          Uint32 eh = gePageptr.p->word32[tgeElementptr];
+          operationRecPtr.p->elementPage = gePageptr.i;
+          operationRecPtr.p->elementContainer = tgeContainerptr;
+          operationRecPtr.p->elementPointer = tgeElementptr;
+          operationRecPtr.p->elementIsforward = tgeForward;
+        }
         jam();
         accIsLockedLab(signal, lockOwnerPtr);
         return;
@@ -1174,6 +1190,7 @@ void Dbacc::execACCKEYREQ(Signal* signal
       break;
     case ZREAD:
     case ZUPDATE:
+    case ZMOVE:
     case ZDELETE:
     case ZSCAN_OP:
       jam();
@@ -1812,6 +1829,7 @@ operator<<(NdbOut & out, Dbacc::Operatio
   case ZREAD: out << "READ "; read = true; break;
   case ZINSERT: out << "INSERT "; break;
   case ZUPDATE: out << "UPDATE "; break;
+  case ZMOVE: out << "MOVE "; break;
   case ZDELETE: out << "DELETE "; break;
   case ZWRITE: out << "WRITE "; break;
   case ZSCAN_OP: out << "SCAN "; read = true; break;
diff -Nrup a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-02-06 20:18:07 +08:00
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-04-02 18:19:22 +08:00
@@ -128,6 +128,7 @@ operator<<(NdbOut& out, Operation_t op)
   case ZREAD_EX: out << "READ-EX"; break;
   case ZINSERT: out << "INSERT"; break;
   case ZUPDATE: out << "UPDATE"; break;
+  case ZMOVE: out << "MOVE"; break;
   case ZDELETE: out << "DELETE"; break;
   case ZWRITE: out << "WRITE"; break;
   }
@@ -3730,7 +3731,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal
   regTcPtr->m_disk_table = tabptr.p->m_disk_table;
   if(refToBlock(signal->senderBlockRef()) == RESTORE)
     regTcPtr->m_disk_table &= !LqhKeyReq::getNoDiskFlag(Treqinfo);
-  else if(op == ZREAD || op == ZREAD_EX || op == ZUPDATE)
+  else if(op == ZREAD || op == ZREAD_EX || op == ZUPDATE || op == ZMOVE)
     regTcPtr->m_disk_table &= !LqhKeyReq::getNoDiskFlag(Treqinfo);
   
   tabptr.p->usageCount++;
@@ -3982,6 +3983,7 @@ void Dblqh::prepareContinueAfterBlockedL
     switch (regTcPtr->operation) {
     case ZREAD: TRACENR("READ"); break;
     case ZUPDATE: TRACENR("UPDATE"); break;
+    case ZMOVE: TRACENR("MOVE"); break;
     case ZWRITE: TRACENR("WRITE"); break;
     case ZINSERT: TRACENR("INSERT"); break;
     case ZDELETE: TRACENR("DELETE"); break;
@@ -6550,6 +6552,7 @@ void Dblqh::commitContinueAfterBlockedLa
 	switch (regTcPtr.p->operation) {
 	case ZREAD: TRACENR("READ"); break;
 	case ZUPDATE: TRACENR("UPDATE"); break;
+	case ZMOVE: TRACENR("MOVE"); break;
 	case ZWRITE: TRACENR("WRITE"); break;
 	case ZINSERT: TRACENR("INSERT"); break;
 	case ZDELETE: TRACENR("DELETE"); break;
@@ -7043,6 +7046,7 @@ void Dblqh::execACCKEYREF(Signal* signal
     switch (tcPtr->operation) {
     case ZREAD: TRACENR("READ"); break;
     case ZUPDATE: TRACENR("UPDATE"); break;
+    case ZMOVE: TRACENR("MOVE"); break;
     case ZWRITE: TRACENR("WRITE"); break;
     case ZINSERT: TRACENR("INSERT"); break;
     case ZDELETE: TRACENR("DELETE"); break;
@@ -15924,6 +15928,7 @@ void Dblqh::logLqhkeyrefLab(Signal* sign
   Uint32 result = returnExecLog(signal);
   switch (tcConnectptr.p->operation) {
   case ZUPDATE:
+  case ZMOVE:
   case ZDELETE:
     jam();
     if (unlikely(terrorCode != ZNO_TUPLE_FOUND))
@@ -15963,6 +15968,7 @@ error:
 	     " Failed op (%s) during REDO table: %d fragment: %d err: %d",
 	     tcConnectptr.p->operation == ZINSERT ? "INSERT" :
 	     tcConnectptr.p->operation == ZUPDATE ? "UPDATE" :
+	     tcConnectptr.p->operation == ZMOVE   ? "MOVE"   :
 	     tcConnectptr.p->operation == ZDELETE ? "DELETE" :
 	     tcConnectptr.p->operation == ZWRITE ? "WRITE" : "<unknown>",
 	     tcConnectptr.p->tableref,
@@ -19033,6 +19039,7 @@ Dblqh::match_and_print(Signal* signal, P
       break;
     case ZINSERT: op = "INSERT"; break;
     case ZUPDATE: op = "UPDATE"; break;
+    case ZMOVE  : op = "MOVE"  ; break;
     case ZDELETE: op = "DELETE"; break;
     case ZWRITE: op = "WRITE"; break;
     }
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-02-20 16:34:18 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2008-04-02 18:19:22 +08:00
@@ -2931,6 +2931,7 @@ void Dbtc::execTCKEYREQ(Signal* signal) 
     c_counters.cwriteCount = TwriteCount + 1;
     switch (TOperationType) {
     case ZUPDATE:
+    case ZMOVE:
     case ZINSERT:
     case ZDELETE:
     case ZWRITE:
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-03-19 20:57:40 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2008-04-02 18:19:23 +08:00
@@ -516,6 +516,7 @@ typedef Ptr<Fragoperrec> FragoperrecPtr;
       SCAN_DD        = 0x01,        // scan disk pages
       SCAN_VS        = 0x02,        // page format is var size
       SCAN_LCP       = 0x04,        // LCP mem page scan
+      SCAN_DESCEND   = 0x08,        // scan in descending order for mem tuple
       SCAN_LOCK_SH   = 0x10,        // lock mode shared
       SCAN_LOCK_EX   = 0x20,        // lock mode exclusive
       SCAN_LOCK_WAIT = 0x40,        // lock wait
@@ -1565,6 +1566,7 @@ struct KeyReqStruct {
   PagePtr m_disk_page_ptr;       //
   Local_key m_row_id;
   Uint32 optimize_options;
+  Local_key optimize_row_id;  /* move fixpart to when optimizing table */
   
   bool            dirty_op;
   bool            interpreted_exec;
@@ -1980,6 +1982,15 @@ private:
                       Tablerec* regTabPtr,
                       KeyReqStruct* req_struct,
 		      bool disk);
+
+//------------------------------------------------------------------
+//------------------------------------------------------------------
+  int handleMoveReq(Signal* signal,
+                    Operationrec* regOperPtr,
+                    Fragrecord* regFragPtr,
+                    Tablerec* regTabPtr,
+                    KeyReqStruct* req_struct, 
+                    bool disk);
 
 //------------------------------------------------------------------
 //------------------------------------------------------------------
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2008-03-25 23:47:04 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2008-04-02 18:19:23 +08:00
@@ -205,7 +205,12 @@ Dbtup::commit_operation(Signal* signal,
 			Tablerec* regTabPtr)
 {
   ndbassert(regOperPtr->op_struct.op_type != ZDELETE);
-  
+
+  if(regOperPtr->op_struct.op_type == ZMOVE) {
+    //TODO, for fixpart ???
+    return;
+  }
+ 
   Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
   Uint32 save= tuple_ptr->m_operation_ptr_i;
   Uint32 bits= tuple_ptr->m_header_bits;
@@ -697,6 +702,7 @@ skip_disk:
    */
   regOperPtr.p->nextActiveOp = RNIL;
   regOperPtr.p->prevActiveOp = RNIL;
+
   if(tuple_ptr->m_operation_ptr_i == regOperPtr.i)
   {
     jam();
@@ -754,7 +760,8 @@ Dbtup::set_commit_change_mask_info(const
   Uint32 masklen = (regTabPtr->m_no_of_attributes + 31) >> 5;
   if (regOperPtr->m_copy_tuple_location.isNull())
   {
-    ndbassert(regOperPtr->op_struct.op_type == ZDELETE);
+    ndbassert(regOperPtr->op_struct.op_type == ZDELETE ||
+              regOperPtr->op_struct.op_type == ZMOVE);
     req_struct->changeMask.set();
   }
   else
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-03-28 00:04:19 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2008-04-02 18:19:24 +08:00
@@ -846,6 +846,17 @@ void Dbtup::execTUPKEYREQ(Signal* signal
        sendTUPKEYCONF(signal, &req_struct, regOperPtr);
        return;
      }
+     else if(Roptype == ZMOVE)
+     {
+       jam();
+       if (handleMoveReq(signal, regOperPtr, regFragPtr, regTabPtr,
+                         &req_struct, disk_page != RNIL) == -1) {
+         return;
+       }
+
+       sendTUPKEYCONF(signal, &req_struct, regOperPtr);
+       return;
+     }
      else
      {
        ndbrequire(false); // Invalid op type
@@ -1057,8 +1068,6 @@ int Dbtup::handleUpdateReq(Signal* signa
   tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
   operPtrP->tupVersion= tup_version;
 
-  req_struct->optimize_options = 0;
-  
   if (!req_struct->interpreted_exec) {
     jam();
     int retValue = updateAttributes(req_struct,
@@ -1076,23 +1085,6 @@ int Dbtup::handleUpdateReq(Signal* signa
                           change_mask_ptr,
                           req_struct->changeMask.rep.data);
 
-  switch (req_struct->optimize_options) {
-    case AttributeHeader::OPTIMIZE_MOVE_VARPART:
-      /**
-       * optimize varpart of tuple,  move varpart of tuple from
-       * big-free-size page list into small-free-size page list
-       */
-      if(base->m_header_bits & Tuple_header::VAR_PART)
-        optimize_var_part(req_struct, base, operPtrP,
-                          regFragPtr, regTabPtr);
-      break;
-    case AttributeHeader::OPTIMIZE_MOVE_FIXPART:
-      //TODO: move fix part of tuple
-      break;
-    default:
-      break;
-  }
-
   if (regTabPtr->need_shrink())
   {  
     shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
@@ -1115,6 +1107,78 @@ int Dbtup::handleUpdateReq(Signal* signa
   
 error:
   tupkeyErrorLab(signal);  
+  return -1;
+}
+
+/* ---------------------------------------------------------------- */
+/* ----------------------------- MOVE ----------------------------- */
+/* ---------------------------------------------------------------- */
+int Dbtup::handleMoveReq(Signal* signal,
+                         Operationrec* operPtrP,
+                         Fragrecord* regFragPtr,
+                         Tablerec* regTabPtr,
+                         KeyReqStruct* req_struct,
+                         bool disk) 
+{
+  disk = disk ||
+         (req_struct->m_tuple_ptr->m_header_bits & Tuple_header::DISK_INLINE);
+  /* for now we only support moving tuples held in memory */
+  if (unlikely(disk))
+    return 0;
+
+  /**
+   * Check consistency before move 
+   */
+  Uint32  tup_version= req_struct->m_tuple_ptr->get_tuple_version();
+  if ((regTabPtr->m_bits & Tablerec::TR_Checksum) &&
+      (calculateChecksum(req_struct->m_tuple_ptr, regTabPtr) != 0)) 
+  {
+    terrorCode= ZTUPLE_CORRUPTED_ERROR;
+    goto error;
+  }
+
+  tup_version= (tup_version + 1) & ZTUP_VERSION_MASK;
+  operPtrP->tupVersion= tup_version;
+
+  req_struct->optimize_options = 0;
+  req_struct->optimize_row_id.setNull();
+
+  if (!req_struct->interpreted_exec) {
+    jam();
+    int retValue = updateAttributes(req_struct,
+                                    &cinBuffer[0],
+                                    req_struct->attrinfo_len);
+    if (unlikely(retValue == -1))
+      goto error;
+  } else {
+    ndbrequire(false);
+    goto error;
+  }
+
+  if (req_struct->optimize_options) {
+    if (req_struct->optimize_options & AttributeHeader::OPTIMIZE_MOVE_VARPART) {
+      /**
+       * optimize the varpart of the tuple: move it from the
+       * big-free-size page list to the small-free-size page list
+       */
+      if(req_struct->m_tuple_ptr->m_header_bits & Tuple_header::VAR_PART)
+        optimize_var_part(req_struct, req_struct->m_tuple_ptr, operPtrP,
+                          regFragPtr, regTabPtr);
+    }
+    if (req_struct->optimize_options & AttributeHeader::OPTIMIZE_MOVE_FIXPART) {
+      //TODO: move fix part of tuple
+    }
+  }
+
+  req_struct->m_tuple_ptr->set_tuple_version(tup_version);
+  if (regTabPtr->m_bits & Tablerec::TR_Checksum) {
+    jam();
+    setChecksum(req_struct->m_tuple_ptr, regTabPtr);
+  }
+  return 0;
+
+error:
+  tupkeyErrorLab(signal);
   return -1;
 }
 
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2008-03-25 21:35:27 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp	2008-04-02 18:19:25 +08:00
@@ -1557,13 +1557,15 @@ int Dbtup::updateAttributes(KeyReqStruct
     {
       jam();
       Uint32 sz= ahIn.getDataSize();
-      ndbrequire(sz == 1);
+      ndbrequire(sz == 2);
       /**
        * get optimize options
        */
-      req_struct->optimize_options = * (inBuffer + inBufIndex + 1);
-      req_struct->optimize_options &=
-        AttributeHeader::OPTIMIZE_OPTIONS_MASK;
+      Uint64 options = 0;
+      memcpy(&options, inBuffer+inBufIndex+1, sz << 2);
+      req_struct->optimize_options = options & AttributeHeader::OPTIMIZE_OPTIONS_MASK;
+      req_struct->optimize_row_id.m_page_no = options >> 32;
+      req_struct->optimize_row_id.m_page_idx = (options & 0xFFFF0000) >> 16;
       inBufIndex += 1 + sz;
       req_struct->in_buf_index = inBufIndex;
     }
@@ -2222,6 +2224,10 @@ Dbtup::read_pseudo(const Uint32 * inBuff
   }
   case AttributeHeader::ROW_SIZE:
     outBuffer[1] = tabptr.p->m_offsets[MM].m_fix_header_size << 2;
+    sz = 1;
+    break;
+  case AttributeHeader::FIXPAGE_SIZE:
+    outBuffer[1] = Fix_page::DATA_WORDS;  /* in words */
     sz = 1;
     break;
   case AttributeHeader::ROW_COUNT:
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2008-03-25 23:47:04 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2008-04-02 18:19:25 +08:00
@@ -119,6 +119,9 @@ Dbtup::execACC_SCANREQ(Signal* signal)
       ndbrequire((bits & ScanOp::SCAN_DD) == 0);
       ndbrequire((bits & ScanOp::SCAN_LOCK) == 0);
     }
+
+    if (AccScanReq::getDescendingFlag(req->requestInfo))
+      bits |= ScanOp::SCAN_DESCEND;
     
     // set up scan op
     new (scanPtr.p) ScanOp();
@@ -570,7 +573,11 @@ Dbtup::scanFirst(Signal*, ScanOpPtr scan
   }
   if (! (bits & ScanOp::SCAN_DD)) {
     key.m_file_no = ZNIL;
-    key.m_page_no = 0;
+    if (bits & ScanOp::SCAN_DESCEND && frag.noOfPages > 0)
+      key.m_page_no = frag.m_max_page_no - 1;
+    else
+      key.m_page_no = 0; 
+
     pos.m_get = ScanPos::Get_page_mm;
     // for MM scan real page id is cached for efficiency
     pos.m_realpid_mm = RNIL;
@@ -588,7 +595,19 @@ Dbtup::scanFirst(Signal*, ScanOpPtr scan
     key.m_page_no = ext->m_first_page_no;
     pos.m_get = ScanPos::Get_page_dd;
   }
-  key.m_page_idx = 0;
+  if (bits & ScanOp::SCAN_DESCEND) {
+    TablerecPtr tablePtr;
+    tablePtr.i = scan.m_tableId;
+    ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
+    Tablerec& table = *tablePtr.p;
+    const bool mm = (bits & ScanOp::SCAN_DD);
+    Uint32 fixpart_size = table.m_offsets[mm].m_fix_header_size;
+    const Uint32 page_size = Fix_page::DATA_WORDS;
+    ndbassert(page_size >= fixpart_size);
+    /* set page_idx to the maximum valid page index in a fixed-size page */
+    key.m_page_idx = page_size - (page_size % fixpart_size) - fixpart_size;
+  } else
+    key.m_page_idx = 0;
   // let scanNext() do the work
   scan.m_state = ScanOp::Next;
 }
@@ -623,6 +642,10 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
   Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
   Uint32 size = table.m_offsets[mm].m_fix_header_size;
 
+  const Uint32 page_size = Fix_page::DATA_WORDS;
+  ndbassert(page_size >= size);
+  const Uint16 max_page_idx = page_size - (page_size % size) - size;
+  
   if (lcp && lcp_list != RNIL)
     goto found_lcp_keep;
 
@@ -630,7 +653,13 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
   case ScanPos::Get_next_tuple:
   case ScanPos::Get_next_tuple_fs:
     jam();
-    key.m_page_idx += size;
+    if (bits & ScanOp::SCAN_DESCEND) {
+      if (key.m_page_idx == 0)
+        key.m_page_idx = (Uint16)-1;  // wrap marks the page as exhausted; idx is Uint16
+      else
+        key.m_page_idx -= size;
+    } else
+      key.m_page_idx += size;
     // fall through
   case ScanPos::Get_tuple:
   case ScanPos::Get_tuple_fs:
@@ -671,8 +700,13 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
       // move to next logical TUP page
       jam();
       {
-        key.m_page_no++;
-        if (key.m_page_no >= frag.m_max_page_no) {
+        if (bits & ScanOp::SCAN_DESCEND)
+          key.m_page_no--;
+        else
+          key.m_page_no++;
+        if (((bits & ScanOp::SCAN_DESCEND) && key.m_page_no == ((Uint32)-1)) ||
+            (!(bits & ScanOp::SCAN_DESCEND) && key.m_page_no >= frag.m_max_page_no))
+        {
           jam();
 
           if ((bits & ScanOp::SCAN_NR) && (scan.m_endPage != RNIL))
@@ -691,7 +725,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
           return true;
         }
     cont:
-        key.m_page_idx = 0;
+        if (bits & ScanOp::SCAN_DESCEND)
+          key.m_page_idx = max_page_idx;
+        else
+          key.m_page_idx = 0;
+
         pos.m_get = ScanPos::Get_page_mm;
         // clear cached value
         pos.m_realpid_mm = RNIL;
@@ -867,7 +905,15 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
       // move to next fixed size tuple
       jam();
       {
-        key.m_page_idx += size;
+        if (bits & ScanOp::SCAN_DESCEND)
+        {
+          if (key.m_page_idx == 0)
+            key.m_page_idx = (Uint16)-1;  // wrap marks the page as exhausted; idx is Uint16
+          else
+            key.m_page_idx -= size;
+        }
+        else
+          key.m_page_idx += size;
         pos.m_get = ScanPos::Get_tuple_fs;
       }
       /*FALLTHRU*/
@@ -877,7 +923,8 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
       jam();
       {
         Fix_page* page = (Fix_page*)pos.m_page;
-        if (key.m_page_idx + size <= Fix_page::DATA_WORDS) 
+        if (((bits & ScanOp::SCAN_DESCEND) && (key.m_page_idx != ((Uint16)-1))) ||
+            (!(bits & ScanOp::SCAN_DESCEND) && (key.m_page_idx + size <= Fix_page::DATA_WORDS)))
 	{
 	  pos.m_get = ScanPos::Get_next_tuple_fs;
 #ifdef VM_TRACE
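
The descending-scan arithmetic in the hunks above steps the Uint16 page index backward and wraps it to (Uint16)-1 (0xFFFF) as an end-of-page sentinel when it would underflow. A minimal standalone sketch of that stepping, with hypothetical names (not the kernel code itself):

```cpp
#include <cassert>
#include <cstdint>

// Sentinel meaning "stepped below index 0"; page indices are Uint16,
// so (uint16_t)-1 (0xFFFF) is never a valid tuple start index here.
static const uint16_t IDX_END = (uint16_t)-1;

// Step one fixed-size tuple backward within a page, mirroring the
// SCAN_DESCEND branch: wrap to the sentinel instead of underflowing.
uint16_t step_descending(uint16_t page_idx, uint16_t tuple_size)
{
  if (page_idx == 0)
    return IDX_END;  // caller then moves to the previous page
  return (uint16_t)(page_idx - tuple_size);
}

// The ascending step used by a normal (forward) scan, for comparison.
uint16_t step_ascending(uint16_t page_idx, uint16_t tuple_size)
{
  return (uint16_t)(page_idx + tuple_size);
}
```

The sentinel is what the `key.m_page_idx != ((Uint16)-1)` test in the Get_tuple_fs hunk checks before continuing within the current page.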
diff -Nrup a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2008-03-19 20:56:57 +08:00
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2008-04-02 18:19:26 +08:00
@@ -502,6 +502,10 @@ void Dbtup::checkDetachedTriggers(KeyReq
   }
   
   switch(regOperPtr->op_struct.op_type) {
+  case(ZMOVE):
+      //TODO: handle detached triggers for fixpart moving
+      jam();
+      return;
   case(ZINSERT):
     jam();
     if (regTablePtr->subscriptionInsertTriggers.isEmpty()) {
@@ -1225,6 +1229,10 @@ Dbtup::executeTuxCommitTriggers(Signal* 
       return;
     jam();
     tupVersion= regOperPtr->tupVersion;
+  } else if (regOperPtr->op_struct.op_type == ZMOVE) {
+    //TODO: handle TUX commit triggers for fixpart moving
+    tupVersion= regOperPtr->tupVersion;
+    return;
   } else {
     ndbrequire(false);
     tupVersion= 0; // remove warning
diff -Nrup a/storage/ndb/src/ndbapi/NdbBlob.cpp b/storage/ndb/src/ndbapi/NdbBlob.cpp
--- a/storage/ndb/src/ndbapi/NdbBlob.cpp	2008-02-19 23:00:27 +08:00
+++ b/storage/ndb/src/ndbapi/NdbBlob.cpp	2008-04-02 18:19:26 +08:00
@@ -441,6 +441,7 @@ NdbBlob::isKeyOp()
   return
     theNdbOp->theOperationType == NdbOperation::InsertRequest ||
     theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
+    theNdbOp->theOperationType == NdbOperation::MoveRequest ||
     theNdbOp->theOperationType == NdbOperation::WriteRequest ||
     theNdbOp->theOperationType == NdbOperation::ReadRequest ||
     theNdbOp->theOperationType == NdbOperation::ReadExclusive ||
@@ -497,6 +498,7 @@ NdbBlob::isReadOnlyOp()
   return ! (
     theNdbOp->theOperationType == NdbOperation::InsertRequest ||
     theNdbOp->theOperationType == NdbOperation::UpdateRequest ||
+    theNdbOp->theOperationType == NdbOperation::MoveRequest ||
     theNdbOp->theOperationType == NdbOperation::WriteRequest
   );
 }
diff -Nrup a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2008-03-28 06:48:31 +08:00
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp	2008-04-02 18:19:26 +08:00
@@ -405,8 +405,13 @@ NdbColumnImpl::create_pseudo(const char 
     col->m_impl.m_attrSize = 4;
     col->m_impl.m_arraySize = 2;
   } else if(!strcmp(name, "NDB$OPTIMIZE")){
-    col->setType(NdbDictionary::Column::Unsigned);
+    col->setType(NdbDictionary::Column::Bigunsigned);
     col->m_impl.m_attrId = AttributeHeader::OPTIMIZE;
+    col->m_impl.m_attrSize = 8;
+    col->m_impl.m_arraySize = 1;
+  } else if(!strcmp(name, "NDB$FIXPAGE_SIZE")){
+    col->setType(NdbDictionary::Column::Unsigned);
+    col->m_impl.m_attrId = AttributeHeader::FIXPAGE_SIZE;
     col->m_impl.m_attrSize = 4;
     col->m_impl.m_arraySize = 1;
   } else {
@@ -1204,13 +1209,36 @@ NdbIndexImpl::getIndexTable() const
  */
 
 NdbOptimizeTableHandleImpl::NdbOptimizeTableHandleImpl(NdbDictionary::OptimizeTableHandle &f)
-  : NdbDictionary::OptimizeTableHandle(* this),
-    m_state(NdbOptimizeTableHandleImpl::CREATED),
-    m_ndb(NULL), m_table(NULL),
+  : NdbDictionary::OptimizeTableHandle(* this), MAX_LIST_ELMT_NUM(32),
+    m_state(NdbOptimizeTableHandleImpl::CREATED), m_ndb(NULL), m_table(NULL),
     m_table_queue(NULL), m_table_queue_first(NULL), m_table_queue_end(NULL),
-    m_trans(NULL), m_scan_op(NULL),
+    SIZE_NULL((Uint32)-1), m_row_size(0), m_fixpage_size(0),
+    m_forward_trans(NULL), m_forward_scan_op(NULL),
+    m_backward_trans(NULL), m_backward_scan_op(NULL), 
     m_facade(this)
 {
+  m_forward_scan_status.init();
+  m_backward_scan_status.init();
+
+  m_forward_extraGets[0].column = NdbDictionary::Column::FRAGMENT;
+  m_forward_extraGets[0].appStorage = 0;
+  m_forward_extraGets[0].recAttr = 0;
+  m_forward_extraGets[1].column = NdbDictionary::Column::ROWID;
+  m_forward_extraGets[1].appStorage = 0;
+  m_forward_extraGets[1].recAttr = 0;
+  m_forward_extraGets[2].column = NdbDictionary::Column::ROW_SIZE;
+  m_forward_extraGets[2].appStorage = 0;
+  m_forward_extraGets[2].recAttr = 0;
+  m_forward_extraGets[3].column = NdbDictionary::Column::FIXPAGE_SIZE;
+  m_forward_extraGets[3].appStorage = 0;
+  m_forward_extraGets[3].recAttr = 0;
+
+  m_backward_extraGets[0].column = NdbDictionary::Column::FRAGMENT;
+  m_backward_extraGets[0].appStorage = 0;
+  m_backward_extraGets[0].recAttr = 0;
+  m_backward_extraGets[1].column = NdbDictionary::Column::ROWID;
+  m_backward_extraGets[1].appStorage = 0;
+  m_backward_extraGets[1].recAttr = 0;
 }
 
 NdbOptimizeTableHandleImpl::~NdbOptimizeTableHandleImpl()
@@ -1220,65 +1248,229 @@ NdbOptimizeTableHandleImpl::~NdbOptimize
   DBUG_VOID_RETURN;
 }
 
+void NdbOptimizeTableHandleImpl::close_trans_op()
+{
+  if (m_forward_scan_op) {
+    m_forward_scan_op->close();
+    m_forward_scan_op = NULL;
+  }
+  if (m_forward_trans) {
+    m_ndb->closeTransaction(m_forward_trans);
+    m_forward_trans = NULL;
+  }
+  if (m_backward_scan_op) {
+    m_backward_scan_op->close();
+    m_backward_scan_op = NULL;
+  }
+  if (m_backward_trans) {
+    m_ndb->closeTransaction(m_backward_trans);
+    m_backward_trans = NULL;
+  }
+}
+
+int NdbOptimizeTableHandleImpl::init_page_row_size()
+{
+  DBUG_ENTER("NdbOptimizeTableImpl::init_page_row_size()");
+  int noRetries = 100;
+  if (m_table_queue) {
+    const NdbTableImpl * table = m_table_queue->table;
+
+    while (noRetries-- > 0) {
+      NdbSleep_MilliSleep(50);
+      close_trans_op();
+      /*
+       * Start a forward scan to fetch the fixed-page size and
+       * the size of one row in a fixed page; these sizes must
+       * be fetched for each table.
+       */
+      if (!m_forward_trans)
+        m_forward_trans = m_ndb->startTransaction();
+      if (!m_forward_trans) {
+        if (m_ndb->getNdbError().status == NdbError::TemporaryError)
+          continue;   /* next retry */
+        break;  /* goto do_error */
+      }
+
+      NdbScanOperation::ScanOptions forward_scan_options;
+      forward_scan_options.optionsPresent =
+        NdbScanOperation::ScanOptions::SO_GETVALUE;
+
+      forward_scan_options.extraGetValues = &m_forward_extraGets[0];
+      forward_scan_options.numExtraGetValues = 4;
+
+      if ((m_forward_scan_op =
+          m_forward_trans->scanTable(table->m_ndbrecord,
+                                     NdbOperation::LM_CommittedRead,
+                                     table->m_emptyMask, /* no columns */
+                                     &forward_scan_options,
+                                     sizeof(NdbScanOperation::ScanOptions)))
+          == NULL) {
+        break;  /* goto do_error */
+      }
+
+      /* one tuple is enough to fetch the row size and fixpage size */
+      m_forward_recAttrRowsize = m_forward_extraGets[2].recAttr;
+      m_forward_recAttrFixpagesize = m_forward_extraGets[3].recAttr;
+
+      /* execute (NoCommit) to fetch row_size & fixpage_size */
+      if (m_forward_trans->execute(NdbTransaction::NoCommit) != 0) {
+        if (m_forward_trans->getNdbError().status == NdbError::TemporaryError)
+          continue;  /* next retry */
+        m_ndb->getNdbError(m_forward_trans->getNdbError().code);
+        break;  /* goto do_error */
+      }
+
+      const char* dummyOutRowPtr;
+      int check = 0;
+      if ((check = m_forward_scan_op->nextResult(&dummyOutRowPtr,
+                                                 true,
+                                                 false)) == 0) {
+        m_row_size = m_forward_recAttrRowsize->u_32_value() >> 2;  /* into words */
+        m_fixpage_size = m_forward_recAttrFixpagesize->u_32_value();
+      } else if (check == -1) {
+        if (m_forward_trans->getNdbError().status == NdbError::TemporaryError) {
+          if (noRetries > 0)
+            continue;  /* next retry */
+        }
+        m_ndb->getNdbError(m_forward_trans->getNdbError().code);
+        break;  /* goto do_error */
+      } else {
+        /* the table is empty, no rows */
+        m_row_size = m_fixpage_size = SIZE_NULL;
+      }
+
+      close_trans_op();
+      DBUG_RETURN(0);  /* ok! initialized size of page and row */
+    }
+  }
+/*do_error: */
+  close_trans_op();
+  DBUG_PRINT("info", ("NdbOptimizeTableImpl::init_page_row_size() failed"));
+  DBUG_RETURN(-1);
+}
+
 int NdbOptimizeTableHandleImpl::start()
 {
   int noRetries = 100;
-  DBUG_ENTER("NdbOptimizeTableImpl::start");
+  DBUG_ENTER("NdbOptimizeTableImpl::start()");
 
-  if (m_table_queue)
-  {
+  if (m_table_queue) {
     const NdbTableImpl * table = m_table_queue->table;
 
+    if (m_row_size == 0 && m_fixpage_size == 0) {
+      if (init_page_row_size())
+        goto do_error;
+      else if (m_row_size == SIZE_NULL && m_fixpage_size == SIZE_NULL) {
+        /* empty table after init_page_row_size() */
+        m_state = NdbOptimizeTableHandleImpl::INITIALIZED;
+        DBUG_RETURN(0);
+      }
+    }
+
+    m_forward_scan_status.free_list();
+    m_forward_scan_status.init();
+    m_backward_scan_status.init();
     /*
-     * Start/Restart transaction
+     * Start/Restart forward and backward transactions
      */
-    while (noRetries-- > 0)
-    {
-      if (m_trans && (m_trans->restart() != 0))
-      {
-        m_ndb->closeTransaction(m_trans);
-        m_trans = NULL;
+    while (noRetries-- > 0) {
+      NdbSleep_MilliSleep(50);
+      close_trans_op();
+      if (!m_forward_trans)
+        m_forward_trans = m_ndb->startTransaction();
+      if (!m_forward_trans) {
+        if (m_ndb->getNdbError().status == NdbError::TemporaryError) {
+          if (noRetries > 0)
+            continue;   /* next retry */
+        }
+        goto do_error;
       }
-      else
-        m_trans = m_ndb->startTransaction();
-      if (!m_trans)
-      {
-        if (noRetries == 0)
-          goto do_error;
-        continue;
+
+      if (!m_backward_trans)
+        m_backward_trans = m_ndb->startTransaction();
+      if (!m_backward_trans) {
+        if (m_ndb->getNdbError().status == NdbError::TemporaryError) {
+          if (noRetries > 0)
+            continue;   /* next retry */
+        }
+        goto do_error;
       }
-      
+
       /*
        * Get first scan operation
-       */ 
-      if ((m_scan_op = m_trans->getNdbScanOperation(table->m_facade)) 
-          == NULL)
-      {
-        m_ndb->getNdbError(m_trans->getNdbError().code);
+       * LM_Exclusive means that key information will be available
+       * for subsequent lock takeover operations.
+       */
+      NdbScanOperation::ScanOptions forward_scan_options, backward_scan_options;
+      forward_scan_options.optionsPresent = backward_scan_options.optionsPresent =
+        NdbScanOperation::ScanOptions::SO_SCANFLAGS |
+        NdbScanOperation::ScanOptions::SO_PARALLEL |
+        NdbScanOperation::ScanOptions::SO_GETVALUE;
+      forward_scan_options.extraGetValues = &m_forward_extraGets[0];
+      forward_scan_options.numExtraGetValues =
+        backward_scan_options.numExtraGetValues = 2;
+      forward_scan_options.scan_flags = NdbScanOperation::SF_TupScan;
+      backward_scan_options.extraGetValues = &m_backward_extraGets[0];
+      backward_scan_options.scan_flags = NdbScanOperation::SF_TupScan |
+                                         NdbScanOperation::SF_Descending;
+      /*
+       * scan fragments one at a time, so we can easily
+       * ensure that a tuple is moved within the same fragment
+       */
+      forward_scan_options.parallel = backward_scan_options.parallel = 1;
+
+
+      if ((m_forward_scan_op =
+          m_forward_trans->scanTable(table->m_ndbrecord,
+                                     NdbOperation::LM_CommittedRead, /* no lock */
+                                     table->m_emptyMask, /* no columns */
+                                     &forward_scan_options,
+                                     sizeof(NdbScanOperation::ScanOptions)))
+          == NULL) {
+        m_ndb->getNdbError(m_forward_trans->getNdbError().code);
         goto do_error;
       }
-      
-      /**
-       * Define a result set for the scan.
-       */ 
-      if (m_scan_op->readTuples(NdbOperation::LM_Exclusive)) {
-        m_ndb->getNdbError(m_trans->getNdbError().code);
+      m_forward_recAttrFragno = m_forward_extraGets[0].recAttr;
+      m_forward_recAttrRowid = m_forward_extraGets[1].recAttr;
+
+      if ((m_backward_scan_op =
+          m_backward_trans->scanTable(table->m_ndbrecord,
+                                      NdbOperation::LM_Exclusive, /* exclusive lock */
+                                      table->m_emptyMask, /* no columns */
+                                      &backward_scan_options,
+                                      sizeof(NdbScanOperation::ScanOptions)))
+          == NULL) {
+        m_ndb->getNdbError(m_backward_trans->getNdbError().code);
         goto do_error;
       }
-      
+      m_backward_recAttrFragno = m_backward_extraGets[0].recAttr;
+      m_backward_recAttrRowid = m_backward_extraGets[1].recAttr;
+
       /**
        * Start scan    (NoCommit since we are only reading at this stage);
        */
-      if (m_trans->execute(NdbTransaction::NoCommit) != 0) {
-        if (m_trans->getNdbError().status == NdbError::TemporaryError)
-          continue;  /* goto next_retry */
-        m_ndb->getNdbError(m_trans->getNdbError().code);
+      if (m_forward_trans->execute(NdbTransaction::NoCommit) != 0) {
+        if (m_forward_trans->getNdbError().status == NdbError::TemporaryError) {
+          if (noRetries > 0)
+            continue;  /* goto next_retry */
+        }
+        m_ndb->getNdbError(m_forward_trans->getNdbError().code);
         goto do_error;
       }
-      break;
-    } // while (noRetries-- > 0)
+
+      if (m_backward_trans->execute(NdbTransaction::NoCommit) != 0) {
+        if (m_backward_trans->getNdbError().status == NdbError::TemporaryError) {
+          if (noRetries > 0)
+            continue;  /* goto next_retry */
+        }
+        m_ndb->getNdbError(m_backward_trans->getNdbError().code);
+        goto do_error;
+      }
+
+      break;    /* okay, initialization is done */
+    } /* while (noRetries-- > 0) */
     m_state = NdbOptimizeTableHandleImpl::INITIALIZED;
-  } // if (m_table_queue)
+  } /* if (m_table_queue) */
   else
     m_state = NdbOptimizeTableHandleImpl::FINISHED;
 
@@ -1294,27 +1486,24 @@ int NdbOptimizeTableHandleImpl::init(Ndb
   DBUG_ENTER("NdbOptimizeTableHandleImpl::init");
   NdbDictionary::Dictionary* dict = ndb->getDictionary();
   Uint32 sz = table.m_columns.size();
-  bool found_varpart = false;
+  bool found_disk = false;
   int blob_num = table.m_noOfBlobs;
 
   m_ndb = ndb;
   m_table = &table;
 
   /**
-   * search whether there are var size columns in the table,
-   * in first step, we only optimize var part, then if the
-   * table has no var size columns, we do not do optimizing
+   * check whether the table is memory-based;
+   * currently we only optimize memory tables
    */
   for (Uint32 i = 0; i < sz; i++) {
     const NdbColumnImpl *col = m_table->m_columns[i];
-    if (col != 0 && col->m_storageType == NDB_STORAGETYPE_MEMORY &&
-        (col->m_dynamic || col->m_arrayType != NDB_ARRAYTYPE_FIXED)) {
-      found_varpart= true;
+    if (col != 0 && col->m_storageType == NDB_STORAGETYPE_DISK) {
+      found_disk= true;
       break;
     }
   }
-  if (!found_varpart)
-  {
+  if (found_disk) {
     m_state = NdbOptimizeTableHandleImpl::FINISHED;
     DBUG_RETURN(0);
   }
@@ -1338,8 +1527,7 @@ int NdbOptimizeTableHandleImpl::init(Ndb
     blob_num--;
     const NdbTableImpl * blob_table = 
       (const NdbTableImpl *)dict->getBlobTable(m_table, c.m_attrId);
-    if (blob_table)
-    {
+    if (blob_table) {
       m_table_queue_end = new fifo_element_st(blob_table, m_table_queue_end);
     }
   }
@@ -1351,100 +1539,299 @@ int NdbOptimizeTableHandleImpl::init(Ndb
  
 int NdbOptimizeTableHandleImpl::next()
 {
-  int noRetries = 100;
-  int done, check;
   DBUG_ENTER("NdbOptimizeTableHandleImpl::next");
+  int noRetries = 100;
+  const char* dummyOutRowPtr;
 
   if (m_state == NdbOptimizeTableHandleImpl::FINISHED)
-    DBUG_RETURN(0);
+    DBUG_RETURN(0);   /* all tables are scanned */
   else if (m_state != NdbOptimizeTableHandleImpl::INITIALIZED)
     DBUG_RETURN(-1);
 
-  while (noRetries-- > 0)
-  {
-    if ((done = check = m_scan_op->nextResult(true)) == 0)
-    {
-      do 
-      {
-        /** 
-         * Get update operation
-         */
-        NdbOperation * myUpdateOp = m_scan_op->updateCurrentTuple();
-        if (myUpdateOp == 0)
-        {
-          m_ndb->getNdbError(m_trans->getNdbError().code);
+  assert(m_row_size != 0 && m_fixpage_size != 0);
+  /* empty table, then scan next table */
+  if (m_row_size == SIZE_NULL && m_fixpage_size == SIZE_NULL) {
+      fifo_element_st *current = m_table_queue;
+      m_table_queue = current->next;
+      /* Start scan of next table */
+      m_row_size = m_fixpage_size = 0;
+      if (start() != 0)
+        goto do_error;
+      DBUG_RETURN(1);
+  }
+
+  while (noRetries-- > 0) {
+
+    /*
+     * do forward scan (holes producer):
+     * if the list can accommodate more holes, scan the
+     * table to find holes and store them in the list
+     */
+    if (m_forward_scan_status.list_elements_num < MAX_LIST_ELMT_NUM &&
+        m_forward_scan_status.finished == 0) {
+      int f_check1 = 0, f_check2 = 0;
+      if ((f_check1 = m_forward_scan_op->nextResult(&dummyOutRowPtr,
+                                                    true,
+                                                    false)) == 0) {
+        do {
+          Uint32 frag_no = m_forward_recAttrFragno->u_32_value();
+          Uint32 page_no = m_forward_recAttrRowid->u_32_value();
+          //TODO: check big-endian handling
+          Uint32 page_idx = *(Uint32*)(m_forward_recAttrRowid->aRef() + 4);
+          row_spec current_row(frag_no, page_no, page_idx);
+          row_spec prev_row = m_forward_scan_status.latest_row;
+          /*
+           * ROWID of current row should be larger than ROWID of previous row,
+           * since it is in a forward scan now.
+           */
+          if (! prev_row.is_null()) {
+            if (prev_row.frag_no == current_row.frag_no)
+              assert(current_row > prev_row);
+            else {
+              /* with parallel = 1, fragments are scanned in order 0, 1, 2, ... */
+              assert(current_row.frag_no > prev_row.frag_no);
+            }
+          }
+
+          m_forward_scan_status.latest_row = current_row;
+
+          /* find holes and store their rowids into the list */
+          if (! current_row.is_neighbor(prev_row, m_row_size, m_fixpage_size)) {
+            /* maximal page index in a fixsize page */
+            Uint32 max_idx = 
+              m_fixpage_size - (m_fixpage_size % m_row_size) - m_row_size;
+            if (prev_row.is_null() ||
+                (!prev_row.is_null() && current_row.frag_no != prev_row.frag_no)) {
+              assert(current_row.page_idx > 0);
+              row_spec hole(current_row.frag_no,
+                            current_row.page_no,
+                            0);
+              Uint32 holes_num = current_row.page_idx / m_row_size;
+              m_forward_scan_status.add_holes(hole, holes_num);
+            } else { /* in same frag */
+              if (current_row.page_no != prev_row.page_no) {
+                if (prev_row.page_idx < max_idx) {
+                  row_spec hole(prev_row.frag_no,
+                                prev_row.page_no,
+                                prev_row.page_idx + m_row_size);
+                  Uint32 holes_num = (max_idx - prev_row.page_idx) / m_row_size;
+                  m_forward_scan_status.add_holes(hole, holes_num);
+                }
+                if (current_row.page_idx > 0) {
+                  row_spec hole(current_row.frag_no,
+                                current_row.page_no,
+                                0);
+                  Uint32 holes_num = current_row.page_idx / m_row_size;
+                  m_forward_scan_status.add_holes(hole, holes_num);
+                }
+              } else { /* in same page */
+                assert(current_row.page_idx - prev_row.page_idx > m_row_size);
+                row_spec hole(prev_row.frag_no,
+                              prev_row.page_no,
+                              prev_row.page_idx + m_row_size);
+                Uint32 holes_num =
+                  (current_row.page_idx - prev_row.page_idx) / m_row_size; 
+                m_forward_scan_status.add_holes(hole, holes_num);
+              }
+            }
+            /* list is full, then suspend forward scan */
+            if (m_forward_scan_status.list_elements_num >= MAX_LIST_ELMT_NUM)
+              break;
+          } /* end if (! current_row.is_neighbor(...)) */
+
+        } while ((f_check2 = m_forward_scan_op->nextResult(&dummyOutRowPtr,
+                                                           false,
+                                                           false)) == 0);
+      } /* end of if nextResult(true, false) */
+
+      if (f_check1 == -1 || f_check2 == -1) {
+        if (m_forward_trans->getNdbError().status == NdbError::TemporaryError &&
+            noRetries > 0) {
+          if (start() != 0)
+            goto do_error;
+          continue;  /* next retry */
+        } else {
+          m_ndb->getNdbError(m_forward_trans->getNdbError().code);
           goto do_error;
         }
-        /**
-         * optimize a tuple through doing the update
-         * first step, move varpart
-         */
-        Uint32 options = 0 | AttributeHeader::OPTIMIZE_MOVE_VARPART;
-        myUpdateOp->setOptimize(options);
-        /**
-         * nextResult(false) means that the records
-         * cached in the NDBAPI are modified before
-         * fetching more rows from NDB.
-         */
-      } while ((check = m_scan_op->nextResult(false)) == 0);
-    }
+      }
+
+      if (f_check1 == 1)
+        m_forward_scan_status.finished = 1; /* forward scan has ended */
+
+    } /* end if forward scan */
+
+    int need_backward_scan = 0;
+    int have_holes = (m_forward_scan_status.list_elements_num > 0 ? 1 : 0);
 
     /**
-     * Commit when all cached tuple have been updated
+     * when the forward scan is not finished and there are no
+     * holes, we should not do the backward scan; we need to
+     * wait for the forward scan to produce holes.
      */
-    if (check != -1)
-      check = m_trans->execute(NdbTransaction::Commit);
-    
-    if (done == 1)
-    {
+    if (m_forward_scan_status.finished == 1 || have_holes)
+      need_backward_scan = 1;
+
+    /*
+     * do backward scan:
+     * --holes consumer, i.e. moving fixpart
+     * --moving varpart
+     */
+    if (need_backward_scan) {
+      int b_check1 = 0, b_check2 = 0;
+      if ((b_check1 = m_backward_scan_op->nextResult(&dummyOutRowPtr,
+                                                     true,
+                                                     false)) == 0) {
+        do {
+          /* get current tuple's frag_no and ROWID */
+          Uint32 frag_no = m_backward_recAttrFragno->u_32_value();
+          Uint32 page_no = m_backward_recAttrRowid->u_32_value();
+          //TODO: check big-endian handling
+          Uint32 page_idx = *(Uint32*)(m_backward_recAttrRowid->aRef() + 4);
+          row_spec current_row(frag_no, page_no, page_idx);
+          row_spec prev_row = m_backward_scan_status.latest_row;
+
+          /*
+           * ROWID of current row should be less than ROWID of previous row,
+           * since this is a backward scan.
+           */
+          if (! prev_row.is_null()) {
+            if (prev_row.frag_no == current_row.frag_no)
+              assert(prev_row > current_row);
+            else {
+              /* with parallel = 1, fragments are scanned in order 0, 1, 2, ... */
+              assert(current_row.frag_no > prev_row.frag_no);
+            }
+          }
+
+          m_backward_scan_status.latest_row = current_row;
+          row_spec valid_hole;
+
+          /* Setup OperationOptions struct for the update takeover operation */
+          //NdbOperation::OperationOptions options;
+          //options.optionsPresent = NdbOperation::OperationOptions::OO_OPTIMIZE;
+          //options.optimizeFlags = AttributeHeader::OPTIMIZE_MOVE_VARPART;
+
+          Uint32 optimize_option = 1;  /* move varpart */
+
+          if (have_holes) {
+            /* get one hole from the holes list built by the forward scan, then consume it */
+            int need_move_fixpart =
+              m_forward_scan_status.get_valid_hole(current_row, valid_hole, m_row_size);
+            /* we got one valid hole */
+            if (need_move_fixpart == 1) {
+              assert(!valid_hole.is_null());
+              //options.optimizeFlags |= AttributeHeader::OPTIMIZE_MOVE_FIXPART;
+              //options.optimizeFlags |= ((Uint64)valid_hole.page_no) << 32;
+              /* page_idx is used only 16bits in kernel */
+              //options.optimizeFlags |= (valid_hole.page_idx & 0x0000FFFF) << 16;
+              optimize_option |= 2;  /* move fixpart */
+            }
+          }
+
+          const NdbTableImpl * table = m_table_queue->table;
+          /**
+           * Get update operation which updates nothing,
+           * but has optimize set in OperationOptions
+           */
+          const NdbOperation * myUpdateOp =
+          /*
+            m_backward_scan_op->updateCurrentTuple(m_backward_trans,
+                                                   table->m_ndbrecord,
+                                                   NULL,  // No column values updated
+                                                   table->m_emptyMask,
+                                                   &options,
+                                                   sizeof(NdbOperation::OperationOptions));
+          */
+            m_backward_scan_op->moveCurrentTuple(m_backward_trans,
+                                                 table->m_ndbrecord,
+                                                 optimize_option,
+                                                 valid_hole.page_no,
+                                                 valid_hole.page_idx);
+          if (myUpdateOp == 0) {
+            m_ndb->getNdbError(m_backward_trans->getNdbError().code);
+            goto do_error;
+          }
+
+          /* 
+           * if the list is empty and the forward scan is not
+           * finished, we need to wait for more holes, so we
+           * suspend the backward scan
+           */
+          if (m_forward_scan_status.list_elements_num == 0 &&
+              m_forward_scan_status.finished == 0)
+            break;
+
+          /**
+           * nextResult(false) means that the records
+           * cached in the NDBAPI are modified before
+           * fetching more rows from NDB.
+           */
+        } while ((b_check2 = m_backward_scan_op->nextResult(&dummyOutRowPtr,
+                                                            false,
+                                                            false)) == 0);
+        if (b_check2 != -1)
+          b_check2 = m_backward_trans->execute(NdbTransaction::NoCommit,
+                                               NdbOperation::AbortOnError);
+      } /* end if nextResult(true, false) */
+
+      if (b_check1 == -1 || b_check2 == -1) {
+        if (m_backward_trans->getNdbError().status == NdbError::TemporaryError &&
+            noRetries > 0) {
+          if (start() != 0)
+            goto do_error;
+          continue;  /* next retry */
+        } else {
+          //TODO: check which operation caused the error
+          m_ndb->getNdbError(m_backward_trans->getNdbError().code);
+          goto do_error;
+        }
+      }
+
+      if (b_check1 == 1)
+        m_backward_scan_status.finished = 1;
+    }  /* end if backward scan */
+
+    if (m_backward_scan_status.finished == 1) {
+      /**
+       * Commit when all cached tuples have been updated
+       */
+      int check = m_backward_trans->execute(NdbTransaction::Commit,
+                                            NdbOperation::AbortOnError);
+      if (check == -1) {
+        if (m_backward_trans->getNdbError().status == NdbError::TemporaryError &&
+          noRetries > 0) {
+          if (start() != 0)
+            goto do_error;
+          continue;  /* next retry */
+        } else {
+          //TODO: check which operation caused the error
+          m_ndb->getNdbError(m_backward_trans->getNdbError().code);
+          goto do_error;
+        }
+      }
       DBUG_PRINT("info", ("Done with table %s",
                           m_table_queue->table->getName()));
       /*
-       * We are done with optimizing current table
-       * move to next
+       * Done optimizing the current table;
+       * close the scan operation and move to the next table
        */
       fifo_element_st *current = m_table_queue;
       m_table_queue = current->next;
+
+      m_row_size = m_fixpage_size = 0;
       /*
        * Start scan of next table
        */
       if (start() != 0) {
-        m_ndb->getNdbError(m_trans->getNdbError().code);
         goto do_error;
       }
       DBUG_RETURN(1);
     }
-    if (check == -1)
-    {
-      if (m_trans->getNdbError().status == NdbError::TemporaryError)
-      {
-        /*
-         * If we encountered temporary error, retry
-         */
-        m_ndb->closeTransaction(m_trans);
-        m_trans = NULL;
-        if (start() != 0) {
-          m_ndb->getNdbError(m_trans->getNdbError().code);
-          goto do_error;
-        }
-        continue; //retry
-      }
-      m_ndb->getNdbError(m_trans->getNdbError().code);
-      goto do_error;
-    }
-    if (m_trans->restart() != 0)
-    {
-      DBUG_PRINT("info", ("Failed to restart transaction"));
-      m_ndb->closeTransaction(m_trans);
-      m_trans = NULL;
-      if (start() != 0) {
-        m_ndb->getNdbError(m_trans->getNdbError().code);
-        goto do_error;
-      }
-    }
- 
-    DBUG_RETURN(1);
-  }
+
+    DBUG_RETURN(1);     /* one next() round completed successfully */
+  } /* end of while (noRetries-- > 0)*/
+
 do_error:
   DBUG_PRINT("info", ("NdbOptimizeTableHandleImpl::next aborted"));
   m_state = NdbOptimizeTableHandleImpl::ABORTED;
@@ -1463,12 +1850,12 @@ int NdbOptimizeTableHandleImpl::close()
     delete m_table_queue_first;
     m_table_queue_first = next;
   }
-  m_table_queue = m_table_queue_first = m_table_queue_end = NULL;
-  if (m_trans)
-  {
-    m_ndb->closeTransaction(m_trans);
-    m_trans = NULL;
-  }
+  /*
+   * release possible holes list in forward scan
+   */
+  m_forward_scan_status.free_list();
+
+  close_trans_op();
   m_state = NdbOptimizeTableHandleImpl::CLOSED;
   DBUG_RETURN(0);
 }
@@ -6647,3 +7034,4 @@ const NdbDictionary::Column * NdbDiction
 const NdbDictionary::Column * NdbDictionary::Column::ANY_VALUE = 0;
 const NdbDictionary::Column * NdbDictionary::Column::COPY_ROWID = 0;
 const NdbDictionary::Column * NdbDictionary::Column::OPTIMIZE = 0;
+const NdbDictionary::Column * NdbDictionary::Column::FIXPAGE_SIZE = 0;
diff -Nrup a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2008-02-20 16:34:19 +08:00
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp	2008-04-02 18:19:29 +08:00
@@ -324,7 +324,196 @@ class NdbOptimizeTableHandleImpl : publi
 {
   enum State { CREATED, INITIALIZED, FINISHED, ABORTED, CLOSED };
 private:
+  typedef struct row_spec {
+    Uint32 frag_no;
+    Uint32 page_no;
+    Uint32 page_idx;
+    row_spec() : frag_no((Uint32)-1), page_no((Uint32)-1), page_idx((Uint32)-1) {};
+    row_spec(const Uint32 a, const Uint32 b, const Uint32 c) {
+      frag_no = a;
+      page_no = b;
+      page_idx = c;
+    }
+    void set_null() {
+      frag_no = (Uint32)-1;
+      page_no = (Uint32)-1;
+      page_idx = (Uint32)-1;
+    }
+    bool is_null() {
+      if (frag_no == (Uint32)-1 ||
+          page_no == (Uint32)-1 ||
+          page_idx == (Uint32)-1)
+        return true;
+      else
+        return false;
+    }
+    bool operator > (row_spec & row) {
+      /* we only compare two valid rows in the same fragment */
+      assert((! row.is_null()) && (! is_null()) && frag_no == row.frag_no);
+      if (page_no > row.page_no ||
+          (page_no == row.page_no && page_idx > row.page_idx)) {
+        return true;
+      }
+      return false;
+    }
+    /*
+     * is_neighbor() should be invoked only in the forward scan
+     */
+    bool is_neighbor(row_spec prev_row,
+                     const Uint32 row_size,
+                     const Uint32 page_size) {
+      /*
+       * In the forward scan:
+       * assert frag_no starts from 0 and increases (0, 1, 2, ...)
+       * assert frag_no >= prev_row.frag_no
+       * assert page_no >= prev_row.page_no
+       * assert page_idx > prev_row.page_idx
+       */
+      if (prev_row.is_null()) {
+        if (frag_no == 0 && page_idx == 0)
+          return true;
+      } else if (frag_no == prev_row.frag_no) {
+        if (page_no == prev_row.page_no) {
+          if (page_idx == (prev_row.page_idx + row_size))
+            return true;
+        } else {
+          Uint32 max_idx = page_size - (page_size % row_size) - row_size;
+          if ( prev_row.page_idx == max_idx && page_idx == 0)
+            return true;
+        }
+      } else if (page_idx == 0) {
+        return true;
+      }
+      return false;
+    }
+  };  /* end of struct row_spec */
+
+  typedef struct backward_scan_status {
+    row_spec latest_row;
+    int finished;
+    backward_scan_status() {
+      init();
+    }
+    void init() {
+      latest_row.set_null();
+      finished = 0;
+    }
+  };
+
+  typedef struct fifo_holes_list {
+    fifo_holes_list(const row_spec hole, const Uint32 num) {
+      start_hole = hole;
+      holes_num = num;
+      next = NULL;
+    }
+    row_spec start_hole;
+    Uint32 holes_num;
+    fifo_holes_list * next;
+  };
+
+  const Uint32 MAX_LIST_ELMT_NUM;
+  typedef struct forward_scan_status: public backward_scan_status {
+    /*
+     * Maximal number of elements in holes list
+     * should be less than const MAX_LIST_ELMT_NUM.
+     */
+    Uint32 list_elements_num;
+    fifo_holes_list * holes_list_first;
+    fifo_holes_list * holes_list_end;
+    forward_scan_status() {
+      init();
+    }
+    void init() {
+      latest_row.set_null();
+      finished = 0;
+      list_elements_num = 0;
+      holes_list_first = holes_list_end = NULL;
+    }
+    void free_list() {
+      fifo_holes_list * now = holes_list_first;
+      Uint32 count = 0;
+      while(now) {
+        count++;
+        now = now->next;
+      }
+      assert(count == list_elements_num);
+      while(holes_list_first != NULL) {
+        fifo_holes_list *next = holes_list_first->next;
+        delete holes_list_first;
+        list_elements_num--;
+        holes_list_first = next;
+      }
+    }
+    /*
+     * add_holes() should be invoked only in the forward scan,
+     * which finds holes and appends them to the list.
+     */
+    void add_holes(const row_spec hole, const Uint32 holes_num) {
+      assert(holes_num > 0);
+      fifo_holes_list * new_element = new fifo_holes_list(hole, holes_num);
+      assert(new_element);
+      if (holes_list_end == NULL)   /* we add first element */
+        holes_list_first = holes_list_end = new_element;
+      else {
+        holes_list_end->next = new_element;  /* add to list */
+        holes_list_end = new_element;  /* let end point to last element */
+      }
+      list_elements_num++;
+    }
+    /*
+     * get_valid_hole() should be invoked only in the backward scan,
+     * which consumes holes and removes them from the list.
+     */
+    int get_valid_hole(const row_spec tuple, row_spec &out_hole, const Uint32 row_size) {
+      if (holes_list_first && list_elements_num > 0) {
+        /* get a hole from list */
+        row_spec hole = holes_list_first->start_hole;
+
+        assert(hole.frag_no >= tuple.frag_no);
+        if (hole.frag_no == tuple.frag_no && hole.page_no < tuple.page_no) {
+          out_hole = hole;
+          assert(holes_list_first->holes_num >= 1);
+          if (holes_list_first->holes_num == 1) {
+            fifo_holes_list * next = holes_list_first->next;
+            delete holes_list_first;
+            holes_list_first = next;
+            list_elements_num--;
+            if (list_elements_num == 0)
+              holes_list_end = NULL;
+          } else {
+            holes_list_first->start_hole.page_idx += row_size;
+            holes_list_first->holes_num--;
+          }
+          return 1; /* okay, get a valid hole and remove it from list */
+        }
+        /*
+         * We must make sure the ROWID of the backward tuple is
+         * larger than the ROWID of the forward hole, so we
+         * remove the now-invalid holes once the forward and
+         * backward scans meet.
+         * NOTE: we do not move a tuple to a hole in the same page.
+         */
+        if (hole.frag_no == tuple.frag_no && hole.page_no >= tuple.page_no) {
+          while (holes_list_first &&
+                 holes_list_first->start_hole.frag_no == tuple.frag_no) {
+            fifo_holes_list * next = holes_list_first->next;
+            delete holes_list_first;
+            holes_list_first = next;
+            list_elements_num--;
+            if (list_elements_num == 0)
+              holes_list_end = NULL;
+          }
+        }
+      }
+      return 0;
+    }
+  };  /* end of struct forward_scan_status */
+
+  forward_scan_status m_forward_scan_status;
+  backward_scan_status m_backward_scan_status;
+
   int start();
+  int init_page_row_size();
+  void close_trans_op();
 
   State m_state;
   Ndb *m_ndb;
@@ -345,8 +534,48 @@ private:
   fifo_element_st * m_table_queue;
   fifo_element_st * m_table_queue_first;
   fifo_element_st * m_table_queue_end;
-  NdbTransaction * m_trans;
-  NdbScanOperation * m_scan_op;
+
+  /*
+   * Two full table scans are performed while optimizing a table:
+   * 1) the forward scan acts as:
+   *    -- holes producer: finds free rows in fixed-size pages;
+   * 2) the backward scan acts as:
+   *    -- holes consumer: moves the fixpart into the holes;
+   *       i.e. in the NDBAPI we already know where to move the fixpart.
+   *    -- varpart optimizer: moves the varpart of each tuple;
+   *       i.e. we issue moveTuple (move varpart) on every tuple, and
+   *       the kernel decides whether the varpart of a tuple needs to
+   *       be moved, and where.
+   * 3) how holes are found:
+   *    the forward scan reads the table in TupleScan order; knowing
+   *    how many rows fit in a fixed-size page and the size of each
+   *    row, we can calculate the gap between two non-contiguous
+   *    tuples within a fixed-size page.
+   */
+
+  const Uint32 SIZE_NULL;
+  /* Size of the fixpart of a tuple in a fixed-size page, in words */
+  Uint32 m_row_size;
+  /* Size of a fixed-size page, in words */
+  Uint32 m_fixpage_size;
+
+  NdbTransaction * m_forward_trans;
+  NdbScanOperation * m_forward_scan_op;
+  /*
+   * Extra getValue()s to fetch the fragment number,
+   * ROWID, row size and fixpage size
+   */
+  NdbOperation::GetValueSpec m_forward_extraGets[4];
+  NdbRecAttr *m_forward_recAttrFragno;
+  NdbRecAttr *m_forward_recAttrRowid;
+  NdbRecAttr *m_forward_recAttrRowsize;
+  NdbRecAttr *m_forward_recAttrFixpagesize;
+
+  NdbTransaction * m_backward_trans;
+  NdbScanOperation * m_backward_scan_op;
+  NdbOperation::GetValueSpec m_backward_extraGets[2];
+  NdbRecAttr *m_backward_recAttrFragno;
+  NdbRecAttr *m_backward_recAttrRowid;
 
   NdbDictionary::OptimizeTableHandle * m_facade;
 public:
diff -Nrup a/storage/ndb/src/ndbapi/NdbOperation.cpp b/storage/ndb/src/ndbapi/NdbOperation.cpp
--- a/storage/ndb/src/ndbapi/NdbOperation.cpp	2008-02-20 16:34:19 +08:00
+++ b/storage/ndb/src/ndbapi/NdbOperation.cpp	2008-04-02 18:19:30 +08:00
@@ -181,6 +181,7 @@ NdbOperation::init(const NdbTableImpl* t
   m_extraSetValues = NULL;
   m_numExtraSetValues = 0;
   m_use_any_value = 0;
+  m_use_optimize_flags = false;
 
   tSignal = theNdb->getSignal();
   if (tSignal == NULL)
diff -Nrup a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp
--- a/storage/ndb/src/ndbapi/NdbOperationDefine.cpp	2008-02-20 20:30:13 +08:00
+++ b/storage/ndb/src/ndbapi/NdbOperationDefine.cpp	2008-04-02 18:19:31 +08:00
@@ -661,8 +661,16 @@ NdbOperation::setAnyValue(Uint32 any_val
 }
 
 int
-NdbOperation::setOptimize(Uint32 options)
+NdbOperation::setOptimize(Uint64 options)
 {
+  if (theStatus == UseNdbRecord)
+  {
+    /* Method not allowed for NdbRecord, use OperationOptions or 
+       ScanOptions structure instead */
+    setErrorCodeAbort(4515);
+    return -1;
+  }
+
   return setValue(&NdbColumnImpl::getImpl(*NdbDictionary::Column::OPTIMIZE),
                   (const char*)&options);
 }
@@ -1281,6 +1289,13 @@ NdbOperation::handleOperationOptions (co
   {
     /* Set the operation's customData ptr */
     op->m_customData = opts->customData;
+  }
+
+  if (opts->optionsPresent & OperationOptions::OO_OPTIMIZE)
+  {
+    /* Set the operation's optimize flags var */
+    op->m_use_optimize_flags= true;
+    op->m_optimize_flags= opts->optimizeFlags;
   }
 
   return 0;
diff -Nrup a/storage/ndb/src/ndbapi/NdbOperationExec.cpp b/storage/ndb/src/ndbapi/NdbOperationExec.cpp
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2008-02-22 17:53:01 +08:00
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2008-04-02 18:19:31 +08:00
@@ -117,6 +117,7 @@ NdbOperation::prepareSend(Uint32 aTC_Con
     OperationType tOpType = theOperationType;
     OperationStatus tStatus = theStatus;
     if ((tOpType == UpdateRequest) ||
+	(tOpType == MoveRequest) ||
 	(tOpType == InsertRequest) ||
 	(tOpType == WriteRequest)) {
       if (tStatus != SetValue) {
@@ -977,9 +978,10 @@ NdbOperation::buildSignalsNdbRecord(Uint
   if ((tOpType == InsertRequest) ||
       (tOpType == WriteRequest) ||
       (tOpType == UpdateRequest) ||
+      (tOpType == MoveRequest) ||
       (tOpType == DeleteRequest))
   {
-    /* Handle any setAnyValue(). */
+    /* Handle AnyValue if it's there. */
     if (m_use_any_value)
     {
       res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
@@ -989,6 +991,21 @@ NdbOperation::buildSignalsNdbRecord(Uint
         return res;
       res= insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
                                         (const char *)(&m_any_value), 4,
+                                        &attrInfoPtr, &remain);
+      if(res)
+        return res;
+    }
+
+    /* Handle any Optimize flags required */
+    if (m_use_optimize_flags)
+    {
+      res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
+                                       AttributeHeader::OPTIMIZE, 8,
+                                       &attrInfoPtr, &remain);
+      if(res)
+        return res;
+      res= insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+                                        (const char *)(&m_optimize_flags), 8,
                                         &attrInfoPtr, &remain);
       if(res)
         return res;
diff -Nrup a/storage/ndb/src/ndbapi/NdbOperationSearch.cpp b/storage/ndb/src/ndbapi/NdbOperationSearch.cpp
--- a/storage/ndb/src/ndbapi/NdbOperationSearch.cpp	2008-02-19 23:00:27 +08:00
+++ b/storage/ndb/src/ndbapi/NdbOperationSearch.cpp	2008-04-02 18:19:33 +08:00
@@ -241,7 +241,7 @@ NdbOperation::equal_impl(const NdbColumn
           }
         }
 
-	if (tOpType == UpdateRequest) {
+	if (tOpType == UpdateRequest || tOpType == MoveRequest) {
 	  if (tInterpretInd == 1) {
 	    theStatus = GetValue;
 	  } else {
diff -Nrup a/storage/ndb/src/ndbapi/NdbScanOperation.cpp b/storage/ndb/src/ndbapi/NdbScanOperation.cpp
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2008-03-29 01:47:09 +08:00
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2008-04-02 18:19:34 +08:00
@@ -462,6 +462,37 @@ NdbScanOperation::scanTableImpl(const Nd
   return 0;
 }
 
+const NdbOperation *
+NdbScanOperation::moveCurrentTuple(NdbTransaction *takeOverTrans,
+                                   const NdbRecord *attr_rec,
+                                   const Uint32 optimize_option,
+                                   const Uint32 page_no,
+                                   const Uint32 page_idx)
+{
+  if (optimize_option == 0 || optimize_option > 3) {
+    /* if option is invalid, return null pointer */
+    return NULL;
+  }
+
+  NdbOperation::OperationOptions options;
+  options.optionsPresent = NdbOperation::OperationOptions::OO_OPTIMIZE;
+  options.optimizeFlags = 0;
+
+  if (optimize_option & 1)
+    options.optimizeFlags |= AttributeHeader::OPTIMIZE_MOVE_VARPART;
+
+  if (optimize_option & 2) {
+    options.optimizeFlags |= AttributeHeader::OPTIMIZE_MOVE_FIXPART;
+    options.optimizeFlags |= ((Uint64)page_no) << 32;
+    /* only the low 16 bits of page_idx are used in the kernel */
+    options.optimizeFlags |= (page_idx & 0x0000FFFF) << 16;
+  }
+
+  return takeOverScanOpNdbRecord(NdbOperation::MoveRequest, takeOverTrans,
+                                 attr_rec, NULL, NULL,
+                                 &options, sizeof(NdbOperation::OperationOptions));
+}
+
 /*
   Compare two rows on some prefix of the index.
   This is used to see if we can determine that all rows in an index range scan
@@ -936,7 +967,10 @@ NdbScanOperation::processTableScanDefs(N
                                        Uint32 parallel,
                                        Uint32 batch)
 {
-  m_ordered = m_descending = false;
+  /* m_ordered applies only to ordered-index scans */
+  m_ordered = false;
+  /* m_descending: ordered-index scan, or reversed order of memory-based tuples */
+  m_descending = scan_flags & SF_Descending;
   Uint32 fragCount = m_currentTable->m_fragmentCount;
 
   if (parallel > fragCount || parallel == 0) {
@@ -1011,6 +1045,8 @@ NdbScanOperation::processTableScanDefs(N
   ScanTabReq::setScanBatch(reqInfo, 0);
   ScanTabReq::setRangeScanFlag(reqInfo, rangeScan);
   ScanTabReq::setTupScanFlag(reqInfo, tupScan);
+  /* now we support descending tuple scan */
+  ScanTabReq::setDescendingFlag(reqInfo, m_descending);
   req->requestInfo = reqInfo;
 
   m_keyInfo = (scan_flags & SF_KeyInfo) ? 1 : 0;
@@ -2184,12 +2220,6 @@ NdbScanOperation::takeOverScanOpNdbRecor
     setErrorCodeAbort(4604);
     return NULL;
   }
-  if (record->flags & NdbRecord::RecIsIndex)
-  {
-    /* result_record must be a base table ndbrecord, not an index ndbrecord */
-    setErrorCodeAbort(4340);
-    return NULL;
-  }
 
   NdbOperation *op= pTrans->getNdbOperation(record->table, NULL, true);
   if (!op)
@@ -2266,6 +2296,7 @@ NdbScanOperation::takeOverScanOpNdbRecor
   {
   case ReadRequest:
   case UpdateRequest:
+  case MoveRequest:
     if (unlikely(record->flags & NdbRecord::RecHasBlob))
     {
       if (op->getBlobHandlesNdbRecord(pTrans) == -1)
diff -Nrup a/storage/ndb/src/ndbapi/NdbTransaction.cpp b/storage/ndb/src/ndbapi/NdbTransaction.cpp
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp	2008-03-27 21:14:09 +08:00
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp	2008-04-02 18:19:34 +08:00
@@ -1973,6 +1973,7 @@ NdbTransaction::receiveTCKEY_FAILCONF(co
       case NdbOperation::InsertRequest:
       case NdbOperation::DeleteRequest:
       case NdbOperation::WriteRequest:
+      case NdbOperation::MoveRequest:
 	tOp = tOp->next();
 	break;
       case NdbOperation::ReadRequest:
diff -Nrup a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2007-12-14 23:20:18 +08:00
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2008-04-02 18:19:35 +08:00
@@ -357,6 +357,8 @@ Ndb_cluster_connection_impl(const char *
       NdbColumnImpl::create_pseudo("NDB$COPY_ROWID");
     NdbDictionary::Column::OPTIMIZE=
       NdbColumnImpl::create_pseudo("NDB$OPTIMIZE");
+    NdbDictionary::Column::FIXPAGE_SIZE=
+      NdbColumnImpl::create_pseudo("NDB$FIXPAGE_SIZE");
   }
   NdbMutex_Unlock(g_ndb_connection_mutex);
 
@@ -417,6 +419,7 @@ Ndb_cluster_connection_impl::~Ndb_cluste
     delete NdbDictionary::Column::ROW_GCI;
     delete NdbDictionary::Column::ANY_VALUE;
     delete NdbDictionary::Column::OPTIMIZE;
+    delete NdbDictionary::Column::FIXPAGE_SIZE;
     NdbDictionary::Column::FRAGMENT= 0;
     NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= 0;
     NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= 0;
@@ -430,6 +433,7 @@ Ndb_cluster_connection_impl::~Ndb_cluste
     NdbDictionary::Column::ROW_GCI= 0;
     NdbDictionary::Column::ANY_VALUE= 0;
     NdbDictionary::Column::OPTIMIZE= 0;
+    NdbDictionary::Column::FIXPAGE_SIZE= 0;
 
     delete NdbDictionary::Column::COPY_ROWID;
     NdbDictionary::Column::COPY_ROWID = 0;
diff -Nrup a/storage/ndb/tools/select_all.cpp b/storage/ndb/tools/select_all.cpp
--- a/storage/ndb/tools/select_all.cpp	2008-02-19 23:00:28 +08:00
+++ b/storage/ndb/tools/select_all.cpp	2008-04-02 18:19:37 +08:00
@@ -169,10 +169,13 @@ int main(int argc, char** argv){
     return NDBT_ProgramExit(NDBT_WRONGARGS);
   }
 
+  /* now we support descending tuple scan */
+#if 0
   if (_descending && ! _order) {
     ndbout << " Descending flag given without order flag" << endl;
     return NDBT_ProgramExit(NDBT_WRONGARGS);
   }
+#endif
 
   if (scanReadRecords(&MyNdb, 
 		      pTab, 
@@ -241,6 +244,7 @@ int scanReadRecords(Ndb* pNdb, 
     int rs;
     unsigned scan_flags = 0;
     if (_tup) scan_flags |= NdbScanOperation::SF_TupScan;
+    if (descending) scan_flags |= NdbScanOperation::SF_Descending;
     switch(_lock + (3 * order)){
     case 1:
       rs = pOp->readTuples(NdbScanOperation::LM_Read, scan_flags, parallel);