List:Commits« Previous MessageNext Message »
From:jonas Date:May 1 2007 7:52pm
Subject:bk commit into 5.1 tree (jonas:1.2514)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-05-01 21:52:49+02:00, jonas@stripped +7 -0
  ndb - dynarr256
    use dynarr256 as pagemap
    do single page allocations (remove 19%)
    allow sparse pagemap (holes)

  storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp@stripped, 2007-05-01 21:52:47+02:00, jonas@stripped +10 -47
    use dynarr256 for pagemap

  storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp@stripped, 2007-05-01 21:52:47+02:00, jonas@stripped +35 -10
    use dynarr256 as pagemap
    allow sparse pagemap

  storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp@stripped, 2007-05-01 21:52:47+02:00, jonas@stripped +7 -14
    use dynarr256 for pagemap

  storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp@stripped, 2007-05-01 21:52:47+02:00, jonas@stripped +89 -14
    use dyn2arr256 as pagemap

  storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp@stripped, 2007-05-01 21:52:47+02:00, jonas@stripped +63 -460
    use DynArr256 as pagemap

  storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp@stripped, 2007-05-01 21:52:47+02:00, jonas@stripped +8 -1
    allow non-existent pages

  storage/ndb/src/kernel/blocks/record_types.hpp@stripped, 2007-05-01 21:52:47+02:00, jonas@stripped +1 -0
    use datamem for pagemap

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	jonas
# Host:	perch.ndb.mysql.com
# Root:	/home/jonas/src/51-telco

--- 1.7/storage/ndb/src/kernel/blocks/record_types.hpp	2007-05-01 21:52:54 +02:00
+++ 1.8/storage/ndb/src/kernel/blocks/record_types.hpp	2007-05-01 21:52:54 +02:00
@@ -61,5 +61,6 @@
 #define RT_TSMAN_FILEGROUP         MAKE_TID( 7, RG_DISK_RECORDS)
 
 #define RT_DBTUP_PAGE              MAKE_TID( 1, RG_DATAMEM)
+#define RT_DBTUP_PAGE_MAP          MAKE_TID( 2, RG_DATAMEM)
 
 #endif

--- 1.75/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2007-05-01 21:52:54 +02:00
+++ 1.76/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2007-05-01 21:52:54 +02:00
@@ -32,6 +32,7 @@
 #include "AttributeOffset.hpp"
 #include "Undo_buffer.hpp"
 #include "tuppage.hpp"
+#include <DynArr256.hpp>
 #include <../pgman.hpp>
 #include <../tsman.hpp>
 
@@ -328,6 +329,7 @@
 #define ZFREE_EXTENT 11
 #define ZUNMAP_PAGES 12
 #define ZFREE_VAR_PAGES 13
+#define ZFREE_PAGES 14
 
 #define ZSCAN_PROCEDURE 0
 #define ZCOPY_PROCEDURE 2
@@ -698,13 +700,10 @@
   void dump_disk_alloc(Disk_alloc_info&);
 
 struct Fragrecord {
-  Uint32 nextStartRange;
-  Uint32 currentPageRange;
-  Uint32 rootPageRange;
   Uint32 noOfPages;
   Uint32 noOfVarPages;
-  Uint32 noOfPagesToGrow;
 
+  DynArr256::Head m_page_map;
   DLList<Page>::Head emptyPrimPage; // allocated pages (not init)
   DLFifoList<Page>::Head thFreeFirst;   // pages with atleast 1 free record
   SLList<Page>::Head m_empty_pages; // Empty pages not in logical/physical map
@@ -854,29 +853,6 @@
 };
 typedef Ptr<Operationrec> OperationrecPtr;
 
-          /* ****************************** PAGE RANGE RECORD ************************** */
-          /* PAGE RANGES AND BASE PAGE ID. EACH RANGE HAS A  CORRESPONDING BASE PAGE ID  */
-          /* THAT IS USED TO  CALCULATE REAL PAGE ID FROM A FRAGMENT PAGE ID AND A TABLE */
-          /* REFERENCE.                                                                  */
-          /* THE PAGE RANGES ARE ORGANISED IN A B-TREE FASHION WHERE THE VARIABLE TYPE   */
-          /* SPECIFIES IF A LEAF NODE HAS BEEN REACHED. IF A LEAF NODE HAS BEEN REACHED  */
-          /* THEN BASE_PAGE_ID IS THE BASE_PAGE_ID OF THE SET OF PAGES THAT WAS          */
-          /* ALLOCATED IN THAT RANGE. OTHERWISE BASE_PAGE_ID IS THE POINTER TO THE NEXT  */
-          /* PAGE_RANGE RECORD.                                                          */
-          /* *************************************************************************** */
-struct PageRange {
-  Uint32 startRange[4];                                  /* START OF RANGE                                   */
-  Uint32 endRange[4];                                    /* END OF THIS RANGE                                */
-  Uint32 basePageId[4];                                  /* BASE PAGE ID.                                    */
-/*----               VARIABLE BASE_PAGE_ID2 (4) 8 DS NEEDED WHEN SUPPORTING 40 BIT PAGE ID           -------*/
-  Uint8 type[4];                                        /* TYPE OF BASE PAGE ID                             */
-  Uint32 nextFree;                                       /* NEXT FREE PAGE RANGE RECORD                      */
-  Uint32 parentPtr;                                      /* THE PARENT TO THE PAGE RANGE REC IN THE B-TREE   */
-  Uint8 currentIndexPos;
-};
-typedef Ptr<PageRange> PageRangePtr;
-
-
   /* ************* TRIGGER DATA ************* */
   /* THIS RECORD FORMS LISTS OF ACTIVE       */
   /* TRIGGERS FOR EACH TABLE.                 */
@@ -2801,9 +2777,9 @@
                         Tablerec* regTabPtr,
                         Uint32 fragId);
 
-
-  void releaseFragment(Signal* signal, Uint32 tableId, Uint32);
+  void releaseFragment(Signal*, Uint32, Uint32);
   void drop_fragment_free_var_pages(Signal*);
+  void drop_fragment_free_pages(Signal*);
   void drop_fragment_free_extent(Signal*, TablerecPtr, FragrecordPtr, Uint32);
   void drop_fragment_free_extent_log_buffer_callback(Signal*, Uint32, Uint32);
   void drop_fragment_unmap_pages(Signal*, TablerecPtr, FragrecordPtr, Uint32);
@@ -2878,30 +2854,16 @@
 //
 // Public methods
   Uint32 getRealpid(Fragrecord* regFragPtr, Uint32 logicalPageId);
+  Uint32 getRealpidCheck(Fragrecord* regFragPtr, Uint32 logicalPageId);
   Uint32 getNoOfPages(Fragrecord* regFragPtr);
-  void initPageRangeSize(Uint32 size);
-  bool insertPageRangeTab(Fragrecord* regFragPtr,
-                          Uint32 startPageId,
-                          Uint32 noPages);
-  void releaseFragPages(Fragrecord* regFragPtr);
-  void initFragRange(Fragrecord* regFragPtr);
-  void initializePageRange();
   Uint32 getEmptyPage(Fragrecord* regFragPtr);
-  Uint32 allocFragPages(Fragrecord* regFragPtr, Uint32 noOfPagesAllocated);
+  Uint32 allocFragPage(Fragrecord* regFragPtr);
+  void releaseFragPages(Fragrecord* regFragPtr);
   Uint32 get_empty_var_page(Fragrecord* frag_ptr);
+  void init_page(Fragrecord*, PagePtr, Uint32 page_no);
   
 // Private methods
-  Uint32 leafPageRangeFull(Fragrecord* regFragPtr, PageRangePtr currPageRangePtr);
-  void releasePagerange(PageRangePtr regPRPtr);
-  void seizePagerange(PageRangePtr& regPageRangePtr);
   void errorHandler(Uint32 errorCode);
-  void allocMoreFragPages(Fragrecord* regFragPtr);
-
-// Private data
-  Uint32 cfirstfreerange;
-  PageRange *pageRange;
-  Uint32 c_noOfFreePageRanges;
-  Uint32 cnoOfPageRangeRec;
 
 //---------------------------------------------------------------
 // Variable Allocator
@@ -2991,6 +2953,7 @@
 
   HostBuffer *hostBuffer;
 
+  DynArr256Pool c_page_map_pool;
   ArrayPool<Operationrec> c_operation_pool;
 
   ArrayPool<Page> c_page_pool;

--- 1.10/storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp	2007-05-01 21:52:55 +02:00
+++ 1.11/storage/ndb/src/kernel/blocks/dbtup/DbtupFixAlloc.cpp	2007-05-01 21:52:55 +02:00
@@ -207,24 +207,49 @@
 {
   Uint32 pages = fragPtrP->noOfPages;
   
-  if (page_no >= pages)
+  DynArr256 map(c_page_map_pool, fragPtrP->m_page_map);
+  Uint32 * ptr = map.set(page_no);
+  if (unlikely(ptr == 0))
   {
-    Uint32 start = pages;
-    while(page_no >= pages)
-      pages += (pages >> 3) + (pages >> 4) + 2;
-    allocFragPages(fragPtrP, pages - start);
-    if (page_no >= (pages = fragPtrP->noOfPages))
+    jam();
+    terrorCode = ZMEM_NOMEM_ERROR;
+    return 1;
+  }
+  
+  PagePtr pagePtr;
+  LocalDLList<Page> alloc_pages(c_page_pool, fragPtrP->emptyPrimPage);
+  LocalDLFifoList<Page> free_pages(c_page_pool, fragPtrP->thFreeFirst);
+  
+  pagePtr.i = * ptr;
+  if (likely(pagePtr.i != RNIL))
+  {
+    jam();
+    c_page_pool.getPtr(pagePtr);
+  }
+  else
+  {
+    jam();
+    Uint32 noOfPagesAllocated = 0;
+    allocConsPages(1, noOfPagesAllocated, pagePtr.i);
+    if (unlikely(noOfPagesAllocated == 0))
     {
+      jam();
       terrorCode = ZMEM_NOMEM_ERROR;
       return 1;
     }
+    * ptr = pagePtr.i; // Save in map
+    c_page_pool.getPtr(pagePtr);
+    init_page(fragPtrP, pagePtr, page_no);
+    
+    if (page_no >= pages)
+    {
+      jam();
+      fragPtrP->noOfPages = page_no + 1;
+    }
+    alloc_pages.add(pagePtr);
   }
   
-  PagePtr pagePtr;
-  c_page_pool.getPtr(pagePtr, getRealpid(fragPtrP, page_no));
   
-  LocalDLList<Page> alloc_pages(c_page_pool, fragPtrP->emptyPrimPage);
-  LocalDLFifoList<Page> free_pages(c_page_pool, fragPtrP->thFreeFirst);
   if (pagePtr.p->page_state == ZEMPTY_MM)
   {
     convertThPage((Fix_page*)pagePtr.p, tabPtrP, MM);

--- 1.47/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2007-05-01 21:52:55 +02:00
+++ 1.48/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2007-05-01 21:52:55 +02:00
@@ -41,7 +41,6 @@
   cnoOfFragrec = MAX_FRAG_PER_NODE;
   cnoOfFragoprec = MAX_FRAG_PER_NODE;
   cnoOfAlterTabOps = MAX_FRAG_PER_NODE;
-  cnoOfPageRangeRec = ZNO_OF_PAGE_RANGE_REC;
   c_maxTriggersPerTable = ZDEFAULT_MAX_NO_TRIGGERS_PER_TABLE;
   c_noOfBuildIndexRec = 32;
 
@@ -106,7 +105,6 @@
   fragrecord = 0;
   alterTabOperRec = 0;
   hostBuffer = 0;
-  pageRange = 0;
   tablerec = 0;
   tableDescriptor = 0;
   totNoOfPagesAllocated = 0;
@@ -141,10 +139,6 @@
 		sizeof(HostBuffer), 
 		MAX_NODES);
   
-  deallocRecord((void **)&pageRange,"PageRange",
-		sizeof(PageRange), 
-		cnoOfPageRangeRec);
-
   deallocRecord((void **)&tablerec,"Tablerec",
 		sizeof(Tablerec), 
 		cnoOfTablerec);
@@ -251,6 +245,12 @@
     drop_fragment_free_var_pages(signal);
     return;
   }
+  case ZFREE_PAGES:
+  {
+    jam();
+    drop_fragment_free_pages(signal);
+    return;
+  }
   default:
     ndbrequire(false);
     break;
@@ -309,9 +309,6 @@
   
   Uint32 noOfTriggers= 0;
   
-  Uint32 tmp= 0;
-  ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_PAGE_RANGE, &tmp));
-  initPageRangeSize(tmp);
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_TABLE, &cnoOfTablerec));
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUP_TABLE_DESC, 
 					&cnoOfTabDescrRec));
@@ -335,6 +332,7 @@
   pc.m_block = this;
   c_page_request_pool.wo_pool_init(RT_DBTUP_PAGE_REQUEST, pc);
   c_extent_pool.init(RT_DBTUP_EXTENT_INFO, pc);
+  c_page_map_pool.init(RT_DBTUP_PAGE_MAP, pc);
   
   Uint32 nScanOp;       // use TUX config for now
   ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_TUX_SCAN_OP, &nScanOp));
@@ -405,10 +403,6 @@
   c_operation_pool.setSize(tmp, false, true, true, 
       tmp1 == 0 ? CFG_DB_NO_OPS : CFG_DB_NO_LOCAL_OPS);
   
-  pageRange = (PageRange*)allocRecord("PageRange",
-				      sizeof(PageRange), 
-				      cnoOfPageRangeRec);
-  
   tablerec = (Tablerec*)allocRecord("Tablerec",
 				    sizeof(Tablerec), 
 				    cnoOfTablerec);
@@ -455,7 +449,6 @@
     break;
   case 8:
     jam();
-    initializePageRange();
     break;
   case 9:
     jam();

--- 1.45/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2007-05-01 21:52:55 +02:00
+++ 1.46/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2007-05-01 21:52:55 +02:00
@@ -118,19 +118,12 @@
     fragrefuse1Lab(signal, fragOperPtr);
     return;
   }
-  initFragRange(regFragPtr.p);
   if (!addfragtotab(regTabPtr.p, fragId, regFragPtr.i)) {
     jam();
     terrorCode= ZNO_FREE_TAB_ENTRY_ERROR;
     fragrefuse2Lab(signal, fragOperPtr, regFragPtr);
     return;
   }
-  if (cfirstfreerange == RNIL) {
-    jam();
-    terrorCode= ZNO_FREE_PAGE_RANGE_ERROR;
-    fragrefuse3Lab(signal, fragOperPtr, regFragPtr, regTabPtr.p, fragId);
-    return;
-  }
 
   regFragPtr.p->fragTableId= regTabPtr.i;
   regFragPtr.p->fragmentId= fragId;
@@ -139,6 +132,9 @@
   regFragPtr.p->m_lcp_scan_op = RNIL; 
   regFragPtr.p->m_lcp_keep_list = RNIL;
   regFragPtr.p->m_var_page_chunks = RNIL;  
+  regFragPtr.p->noOfPages = 0;
+  regFragPtr.p->noOfVarPages = 0;
+  ndbrequire(regFragPtr.p->m_page_map.isEmpty());
 
   if (ERROR_INSERTED(4007) && regTabPtr.p->fragid[0] == fragId ||
       ERROR_INSERTED(4008) && regTabPtr.p->fragid[1] == fragId) {
@@ -517,6 +513,7 @@
 #endif
   
   {
+#ifdef MIN_ROWS_NOT_SUPPORTED
     Uint32 fix_tupheader = regTabPtr.p->m_offsets[MM].m_fix_header_size;
     ndbassert(fix_tupheader > 0);
     Uint32 noRowsPerPage = ZWORDS_ON_PAGE / fix_tupheader;
@@ -526,8 +523,10 @@
       noAllocatedPages = 2;
     else if (noAllocatedPages == 0)
       noAllocatedPages = 2;
-    noAllocatedPages = allocFragPages(regFragPtr.p, noAllocatedPages);
-
+#endif
+    
+    Uint32 noAllocatedPages = allocFragPage(regFragPtr.p);
+    
     if (noAllocatedPages == 0) {
       jam();
       terrorCode = ZNO_PAGES_ALLOCATED_ERROR;
@@ -1336,7 +1335,6 @@
                              Tablerec* const regTabPtr,
                              Uint32 fragId) 
 {
-  releaseFragPages(regFragPtr.p);
   deleteFragTab(regTabPtr, fragId);
   releaseFragrec(regFragPtr);
   releaseTabDescr(regTabPtr);
@@ -1355,7 +1353,6 @@
                            Tablerec* const regTabPtr,
                            Uint32 fragId) 
 {
-  releaseFragPages(regFragPtr.p);
   fragrefuse3Lab(signal, fragOperPtr, regFragPtr, regTabPtr, fragId);
   initTab(regTabPtr);
 }
@@ -1367,7 +1364,6 @@
                            Uint32 fragId) 
 {
   fragrefuse2Lab(signal, fragOperPtr, regFragPtr);
-  deleteFragTab(regTabPtr, fragId);
 }
 
 void Dbtup::fragrefuse2Lab(Signal* signal,
@@ -1854,12 +1850,91 @@
     sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);  
     return;
   }
-  Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id ;
-  releaseFragPages(fragPtr.p);
+
+  DynArr256::ReleaseIterator iter;
+  DynArr256 map(c_page_map_pool, fragPtr.p->m_page_map);
+  map.init(iter);
+  signal->theData[0] = ZFREE_PAGES;
+  signal->theData[1] = tabPtr.i;
+  signal->theData[2] = fragPtrI;
+  memcpy(signal->theData+3, &iter, sizeof(iter));
+  sendSignal(reference(), GSN_CONTINUEB, signal, 3 + sizeof(iter)/4, JBB);
+}
+
+void
+Dbtup::drop_fragment_free_pages(Signal* signal)
+{
   Uint32 i;
+  Uint32 tableId = signal->theData[1];
+  Uint32 fragPtrI = signal->theData[2];
+  DynArr256::ReleaseIterator iter;
+  memcpy(&iter, signal->theData+3, sizeof(iter));
+
+  FragrecordPtr fragPtr;
+  fragPtr.i = fragPtrI;
+  ptrCheckGuard(fragPtr, cnoOfFragrec, fragrecord);
+  
+  DynArr256 map(c_page_map_pool, fragPtr.p->m_page_map);
+  Uint32 realpid;
+  for (i = 0; i<16; i++)
+  {
+    switch(map.release(iter, &realpid)){
+    case 0:
+      jam();
+      goto done;
+    case 1:
+      if (realpid != RNIL)
+      {
+	jam();
+	returnCommonArea(realpid, 1);
+      }
+    case 2:
+      jam();
+      break;
+    }
+  }
+  
+  signal->theData[0] = ZFREE_PAGES;
+  signal->theData[1] = tableId;
+  signal->theData[2] = fragPtrI;
+  memcpy(signal->theData+3, &iter, sizeof(iter));
+  sendSignal(reference(), GSN_CONTINUEB, signal, 3 + sizeof(iter)/4, JBB);
+  return;
+
+done:
+  for (i = 0; i<MAX_FREE_LIST; i++)
+  {
+    LocalDLList<Page> tmp(c_page_pool, fragPtr.p->free_var_page_array[i]);
+    tmp.remove();
+  }
+  
+  {
+    LocalDLList<Page> tmp(c_page_pool, fragPtr.p->emptyPrimPage);
+    tmp.remove();
+  }
+  
+  {
+    LocalDLFifoList<Page> tmp(c_page_pool, fragPtr.p->thFreeFirst);
+    tmp.remove();
+  }
+  
+  {
+    LocalSLList<Page> tmp(c_page_pool, fragPtr.p->m_empty_pages);
+    tmp.remove();
+  }
+
+  /**
+   * Finish
+   */
+  TablerecPtr tabPtr;
+  tabPtr.i= tableId;
+  ptrCheckGuard(tabPtr, cnoOfTablerec, tablerec);
+  
   for(i= 0; i<MAX_FRAG_PER_NODE; i++)
     if(tabPtr.p->fragrec[i] == fragPtr.i)
       break;
+
+  Uint32 logfile_group_id = fragPtr.p->m_logfile_group_id ;
   
   ndbrequire(i != MAX_FRAG_PER_NODE);
   tabPtr.p->fragid[i]= RNIL;

--- 1.14/storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp	2007-05-01 21:52:55 +02:00
+++ 1.15/storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp	2007-05-01 21:52:55 +02:00
@@ -91,7 +91,7 @@
   Uint32 pageId = regFragPtr->emptyPrimPage.firstItem;
   if (pageId == RNIL) {
     jam();
-    allocMoreFragPages(regFragPtr);
+    allocFragPage(regFragPtr);
     pageId = regFragPtr->emptyPrimPage.firstItem;
     if (pageId == RNIL) {
       jam();
@@ -107,475 +107,78 @@
 
 Uint32 Dbtup::getRealpid(Fragrecord* regFragPtr, Uint32 logicalPageId) 
 {
-  PageRangePtr grpPageRangePtr;
-  Uint32 loopLimit;
-  Uint32 loopCount = 0;
-  Uint32 pageRangeLimit = cnoOfPageRangeRec;
-  ndbassert(logicalPageId < getNoOfPages(regFragPtr));
-  grpPageRangePtr.i = regFragPtr->rootPageRange;
-  while (true) {
-    ndbassert(loopCount++ < 100);
-    ndbrequire(grpPageRangePtr.i < pageRangeLimit);
-    ptrAss(grpPageRangePtr, pageRange);
-    loopLimit = grpPageRangePtr.p->currentIndexPos;
-    ndbrequire(loopLimit <= 3);
-    for (Uint32 i = 0; i <= loopLimit; i++) {
-      jam();
-      if (grpPageRangePtr.p->startRange[i] <= logicalPageId) {
-        if (grpPageRangePtr.p->endRange[i] >= logicalPageId) {
-          if (grpPageRangePtr.p->type[i] == ZLEAF) {
-            jam();
-            Uint32 realPageId = (logicalPageId - grpPageRangePtr.p->startRange[i]) +
-                                 grpPageRangePtr.p->basePageId[i];
-            return realPageId;
-          } else {
-            ndbrequire(grpPageRangePtr.p->type[i] == ZNON_LEAF);
-            grpPageRangePtr.i = grpPageRangePtr.p->basePageId[i];
-          }//if
-        }//if
-      }//if
-    }//for
-  }//while
-  return 0;
-}//Dbtup::getRealpid()
+  DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
+  Uint32 * ptr = map.get(logicalPageId);
+  if (likely(ptr != 0))
+  {
+    return * ptr;
+  }
+  ndbrequire(false);
+  return RNIL;
+}
+
+Uint32 
+Dbtup::getRealpidCheck(Fragrecord* regFragPtr, Uint32 logicalPageId) 
+{
+  DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
+  Uint32 * ptr = map.get(logicalPageId);
+  if (likely(ptr != 0))
+  {
+    return * ptr;
+  }
+  return RNIL;
+}
 
 Uint32 Dbtup::getNoOfPages(Fragrecord* const regFragPtr)
 {
   return regFragPtr->noOfPages;
 }//Dbtup::getNoOfPages()
 
-void Dbtup::initPageRangeSize(Uint32 size)
-{
-  cnoOfPageRangeRec = size;
-}//Dbtup::initPageRangeSize()
-
-/* ---------------------------------------------------------------- */
-/* ----------------------- INSERT_PAGE_RANGE_TAB ------------------ */
-/* ---------------------------------------------------------------- */
-/*       INSERT A PAGE RANGE INTO THE FRAGMENT                      */
-/*                                                                  */
-/*       NOTE:   THE METHOD IS ATOMIC. EITHER THE ACTION IS         */
-/*               PERFORMED FULLY OR NO ACTION IS PERFORMED AT ALL.  */
-/*               TO SUPPORT THIS THE CODE HAS A CLEANUP PART AFTER  */
-/*               ERRORS.                                            */
-/* ---------------------------------------------------------------- */
-bool Dbtup::insertPageRangeTab(Fragrecord*  const regFragPtr,
-                               Uint32 startPageId,
-                               Uint32 noPages) 
-{
-  PageRangePtr currPageRangePtr;
-  if (cfirstfreerange == RNIL) {
-    jam();
-    return false;
-  }//if
-  currPageRangePtr.i = regFragPtr->currentPageRange;
-  if (currPageRangePtr.i == RNIL) {
-    jam();
-/* ---------------------------------------------------------------- */
-/*       THE FIRST PAGE RANGE IS HANDLED WITH SPECIAL CODE          */
-/* ---------------------------------------------------------------- */
-    seizePagerange(currPageRangePtr);
-    regFragPtr->rootPageRange = currPageRangePtr.i;
-    currPageRangePtr.p->currentIndexPos = 0;
-    currPageRangePtr.p->parentPtr = RNIL;
-  } else {
-    jam();
-    ptrCheckGuard(currPageRangePtr, cnoOfPageRangeRec, pageRange);
-    if (currPageRangePtr.p->currentIndexPos < 3) {
-      jam();
-/* ---------------------------------------------------------------- */
-/*       THE SIMPLE CASE WHEN IT IS ONLY NECESSARY TO FILL IN THE   */
-/*       NEXT EMPTY POSITION IN THE PAGE RANGE RECORD IS TREATED    */
-/*       BY COMMON CODE AT THE END OF THE SUBROUTINE.               */
-/* ---------------------------------------------------------------- */
-      currPageRangePtr.p->currentIndexPos++;
-    } else {
-      jam();
-      ndbrequire(currPageRangePtr.p->currentIndexPos == 3);
-      currPageRangePtr.i = leafPageRangeFull(regFragPtr, currPageRangePtr);
-      if (currPageRangePtr.i == RNIL) {
-        return false;
-      }//if
-      ptrCheckGuard(currPageRangePtr, cnoOfPageRangeRec, pageRange);
-    }//if
-  }//if
-  currPageRangePtr.p->startRange[currPageRangePtr.p->currentIndexPos] = regFragPtr->nextStartRange;
-/* ---------------------------------------------------------------- */
-/*       NOW SET THE LEAF LEVEL PAGE RANGE RECORD PROPERLY          */
-/*       PAGE_RANGE_PTR REFERS TO LEAF RECORD WHEN ARRIVING HERE    */
-/* ---------------------------------------------------------------- */
-  currPageRangePtr.p->endRange[currPageRangePtr.p->currentIndexPos] = 
-                                                   (regFragPtr->nextStartRange + noPages) - 1;
-  currPageRangePtr.p->basePageId[currPageRangePtr.p->currentIndexPos] = startPageId;
-  currPageRangePtr.p->type[currPageRangePtr.p->currentIndexPos] = ZLEAF;
-/* ---------------------------------------------------------------- */
-/*       WE NEED TO UPDATE THE CURRENT PAGE RANGE IN CASE IT HAS    */
-/*       CHANGED. WE ALSO NEED TO UPDATE THE NEXT START RANGE       */
-/* ---------------------------------------------------------------- */
-  regFragPtr->currentPageRange = currPageRangePtr.i;
-  regFragPtr->nextStartRange += noPages;
-/* ---------------------------------------------------------------- */
-/*       WE NEED TO UPDATE THE END RANGE IN ALL PAGE RANGE RECORDS  */
-/*       UP TO THE ROOT.                                            */
-/* ---------------------------------------------------------------- */
-  PageRangePtr loopPageRangePtr;
-  loopPageRangePtr = currPageRangePtr;
-  while (true) {
-    jam();
-    loopPageRangePtr.i = loopPageRangePtr.p->parentPtr;
-    if (loopPageRangePtr.i != RNIL) {
-      jam();
-      ptrCheckGuard(loopPageRangePtr, cnoOfPageRangeRec, pageRange);
-      ndbrequire(loopPageRangePtr.p->currentIndexPos < 4);
-      loopPageRangePtr.p->endRange[loopPageRangePtr.p->currentIndexPos] += noPages;
-    } else {
-      jam();
-      break;
-    }//if
-  }//while
-  regFragPtr->noOfPages += noPages;
-  return true;
-}//Dbtup::insertPageRangeTab()
-
-
-void Dbtup::releaseFragPages(Fragrecord* regFragPtr) 
+void
+Dbtup::init_page(Fragrecord* regFragPtr, PagePtr pagePtr, Uint32 pageId)
 {
-  if (regFragPtr->rootPageRange == RNIL) {
+  pagePtr.p->page_state = ZEMPTY_MM;
+  pagePtr.p->frag_page_id = pageId;
+  pagePtr.p->physical_page_id = pagePtr.i;
+  pagePtr.p->nextList = RNIL;
+  pagePtr.p->prevList = RNIL;
+}
+
+Uint32 
+Dbtup::allocFragPage(Fragrecord* regFragPtr) 
+{
+  Uint32 noOfPagesAllocated = 0;
+  Uint32 retPageRef = RNIL;
+  allocConsPages(1, noOfPagesAllocated, retPageRef);
+  if (noOfPagesAllocated == 0) 
+  {
     jam();
-    return;
+    return 0;
   }//if
-  PageRangePtr regPRPtr;
-  regPRPtr.i = regFragPtr->rootPageRange;
-  ptrCheckGuard(regPRPtr, cnoOfPageRangeRec, pageRange);
-  while (true) {
-    jam();
-    const Uint32 indexPos = regPRPtr.p->currentIndexPos;
-    ndbrequire(indexPos < 4);
-
-    const Uint32 basePageId = regPRPtr.p->basePageId[indexPos];
-    regPRPtr.p->basePageId[indexPos] = RNIL;
-    if (basePageId == RNIL) {
-      jam();
-      /**
-       * Finished with indexPos continue with next
-       */
-      if (indexPos > 0) {
-        jam();
-	regPRPtr.p->currentIndexPos--;
-	continue;
-      }//if
-      
-      /* ---------------------------------------------------------------- */
-      /* THE PAGE RANGE REC IS EMPTY. RELEASE IT.                         */
-      /*----------------------------------------------------------------- */
-      Uint32 parentPtr = regPRPtr.p->parentPtr;
-      releasePagerange(regPRPtr);
-      
-      if (parentPtr != RNIL) {
-	jam();
-	regPRPtr.i = parentPtr;
-	ptrCheckGuard(regPRPtr, cnoOfPageRangeRec, pageRange);
-	continue;
-      }//if
-
-      jam();
-      ndbrequire(regPRPtr.i == regFragPtr->rootPageRange);
-      initFragRange(regFragPtr);
-      for (Uint32 i = 0; i<MAX_FREE_LIST; i++)
-      {
-	LocalDLList<Page> tmp(c_page_pool, regFragPtr->free_var_page_array[i]);
-	tmp.remove();
-      }
-
-      {
-	LocalDLList<Page> tmp(c_page_pool, regFragPtr->emptyPrimPage);
-	tmp.remove();
-      }
-
-      {
-	LocalDLFifoList<Page> tmp(c_page_pool, regFragPtr->thFreeFirst);
-	tmp.remove();
-      }
-
-      {
-	LocalSLList<Page> tmp(c_page_pool, regFragPtr->m_empty_pages);
-	tmp.remove();
-      }
-      
-      return;
-    } else {
-      if (regPRPtr.p->type[indexPos] == ZNON_LEAF) {
-        jam();
-	/* ---------------------------------------------------------------- */
-	// A non-leaf node, we must release everything below it before we 
-	// release this node.
-	/* ---------------------------------------------------------------- */
-        regPRPtr.i = basePageId;
-        ptrCheckGuard(regPRPtr, cnoOfPageRangeRec, pageRange);
-      } else {
-        jam();
-        ndbrequire(regPRPtr.p->type[indexPos] == ZLEAF);
-	/* ---------------------------------------------------------------- */
-	/* PAGE_RANGE_PTR /= RNIL AND THE CURRENT POS IS NOT A CHLED.       */
-	/*----------------------------------------------------------------- */
-	const Uint32 start = regPRPtr.p->startRange[indexPos];
-	const Uint32 stop = regPRPtr.p->endRange[indexPos];
-	ndbrequire(stop >= start);
-	const Uint32 retNo = (stop - start + 1);
-	returnCommonArea(basePageId, retNo);
-      }//if
-    }//if
-  }//while
-}//Dbtup::releaseFragPages()
-
-void Dbtup::initializePageRange() 
-{
-  PageRangePtr regPTRPtr;
-  for (regPTRPtr.i = 0;
-       regPTRPtr.i < cnoOfPageRangeRec; regPTRPtr.i++) {
-    ptrAss(regPTRPtr, pageRange);
-    regPTRPtr.p->nextFree = regPTRPtr.i + 1;
-  }//for
-  regPTRPtr.i = cnoOfPageRangeRec - 1;
-  ptrAss(regPTRPtr, pageRange);
-  regPTRPtr.p->nextFree = RNIL;
-  cfirstfreerange = 0;
-  c_noOfFreePageRanges = cnoOfPageRangeRec;
-}//Dbtup::initializePageRange()
-
-void Dbtup::initFragRange(Fragrecord* const regFragPtr)
-{
-  regFragPtr->rootPageRange = RNIL;
-  regFragPtr->currentPageRange = RNIL;
-  regFragPtr->noOfPages = 0;
-  regFragPtr->noOfVarPages = 0;
-  regFragPtr->noOfPagesToGrow = 2;
-  regFragPtr->nextStartRange = 0;
-}//initFragRange()
-
-Uint32 Dbtup::allocFragPages(Fragrecord* regFragPtr, Uint32 tafpNoAllocRequested) 
-{
-  Uint32 tafpPagesAllocated = 0;
-  while (true) {
-    Uint32 noOfPagesAllocated = 0;
-    Uint32 noPagesToAllocate = tafpNoAllocRequested - tafpPagesAllocated;
-    Uint32 retPageRef = RNIL;
-    allocConsPages(noPagesToAllocate, noOfPagesAllocated, retPageRef);
-    if (noOfPagesAllocated == 0) {
-      jam();
-      return tafpPagesAllocated;
-    }//if
-/* ---------------------------------------------------------------- */
-/*       IT IS NOW TIME TO PUT THE ALLOCATED AREA INTO THE PAGE     */
-/*       RANGE TABLE.                                               */
-/* ---------------------------------------------------------------- */
-    Uint32 startRange = regFragPtr->nextStartRange;
-    if (!insertPageRangeTab(regFragPtr, retPageRef, noOfPagesAllocated)) {
-      jam();
-      returnCommonArea(retPageRef, noOfPagesAllocated);
-      return tafpPagesAllocated;
-    }//if
-    tafpPagesAllocated += noOfPagesAllocated;
-    Uint32 loopLimit = retPageRef + noOfPagesAllocated;
-    PagePtr loopPagePtr;
-/* ---------------------------------------------------------------- */
-/*       SINCE A NUMBER OF PAGES WERE ALLOCATED FROM COMMON AREA    */
-/*       WITH SUCCESS IT IS NOW TIME TO CHANGE THE STATE OF         */
-/*       THOSE PAGES TO EMPTY_MM AND LINK THEM INTO THE EMPTY       */
-/*       PAGE LIST OF THE FRAGMENT.                                 */
-/* ---------------------------------------------------------------- */
-    Uint32 prev = RNIL;
-    for (loopPagePtr.i = retPageRef; loopPagePtr.i < loopLimit; loopPagePtr.i++) {
-      jam();
-      c_page_pool.getPtr(loopPagePtr);
-      loopPagePtr.p->page_state = ZEMPTY_MM;
-      loopPagePtr.p->frag_page_id = startRange +
-	(loopPagePtr.i - retPageRef);
-      loopPagePtr.p->physical_page_id = loopPagePtr.i;
-      loopPagePtr.p->nextList = loopPagePtr.i + 1;
-      loopPagePtr.p->prevList = prev;
-      prev = loopPagePtr.i;
-    }//for
-    loopPagePtr.i--;
-    ndbassert(loopPagePtr.p == c_page_pool.getPtr(loopPagePtr.i));
-    loopPagePtr.p->nextList = RNIL;
-    
+  
+  Uint32 pageId = regFragPtr->noOfPages;
+  DynArr256 map(c_page_map_pool, regFragPtr->m_page_map);
+  
+  Uint32 * ptr = map.set(pageId);
+  if (likely(ptr != 0))
+  {
+    jam();
+    * ptr = retPageRef;
+    regFragPtr->noOfPages = pageId + 1;
+
+    Ptr<Page> pagePtr;
+    pagePtr.i = retPageRef;
+    c_page_pool.getPtr(pagePtr);
+    init_page(regFragPtr, pagePtr, pageId);
     LocalDLList<Page> alloc(c_page_pool, regFragPtr->emptyPrimPage);
-    if (noOfPagesAllocated > 1)
-    {
-      alloc.add(retPageRef, loopPagePtr);
-    }
-    else
-    {
-      alloc.add(loopPagePtr);
-    }
-
-/* ---------------------------------------------------------------- */
-/*       WAS ENOUGH PAGES ALLOCATED OR ARE MORE NEEDED.             */
-/* ---------------------------------------------------------------- */
-    if (tafpPagesAllocated < tafpNoAllocRequested) {
-      jam();
-    } else {
-      ndbrequire(tafpPagesAllocated == tafpNoAllocRequested);
-      jam();
-      return tafpNoAllocRequested;
-    }//if
-  }//while
-}//Dbtup::allocFragPages()
-
-void Dbtup::allocMoreFragPages(Fragrecord* const regFragPtr) 
-{
-  Uint32 noAllocPages = regFragPtr->noOfPagesToGrow >> 3; // 12.5%
-  noAllocPages += regFragPtr->noOfPagesToGrow >> 4; // 6.25%
-  noAllocPages += 2;
-/* -----------------------------------------------------------------*/
-// We will grow by 18.75% plus two more additional pages to grow
-// a little bit quicker in the beginning.
-/* -----------------------------------------------------------------*/
-  Uint32 allocated = allocFragPages(regFragPtr, noAllocPages);
-  regFragPtr->noOfPagesToGrow += allocated;
-}//Dbtup::allocMoreFragPages()
-
-Uint32 Dbtup::leafPageRangeFull(Fragrecord*  const regFragPtr, PageRangePtr currPageRangePtr)
-{
-/* ---------------------------------------------------------------- */
-/*       THE COMPLEX CASE WHEN THE LEAF NODE IS FULL. GO UP THE TREE*/
-/*       TO FIND THE FIRST RECORD WITH A FREE ENTRY. ALLOCATE NEW   */
-/*       PAGE RANGE RECORDS THEN ALL THE WAY DOWN TO THE LEAF LEVEL */
-/*       AGAIN. THE TREE SHOULD ALWAYS REMAIN BALANCED.             */
-/* ---------------------------------------------------------------- */
-  PageRangePtr parentPageRangePtr;
-  PageRangePtr foundPageRangePtr;
-  parentPageRangePtr = currPageRangePtr;
-  Uint32 tiprNoLevels = 1;
-  while (true) {
-    jam();
-    parentPageRangePtr.i = parentPageRangePtr.p->parentPtr;
-    if (parentPageRangePtr.i == RNIL) {
-      jam();
-/* ---------------------------------------------------------------- */
-/*       WE HAVE REACHED THE ROOT. A NEW ROOT MUST BE ALLOCATED.    */
-/* ---------------------------------------------------------------- */
-      if (c_noOfFreePageRanges < tiprNoLevels) {
-        jam();
-        return RNIL;
-      }//if
-      PageRangePtr oldRootPRPtr;
-      PageRangePtr newRootPRPtr;
-      oldRootPRPtr.i = regFragPtr->rootPageRange;
-      ptrCheckGuard(oldRootPRPtr, cnoOfPageRangeRec, pageRange);
-      seizePagerange(newRootPRPtr);
-      regFragPtr->rootPageRange = newRootPRPtr.i;
-      oldRootPRPtr.p->parentPtr = newRootPRPtr.i;
-
-      newRootPRPtr.p->basePageId[0] = oldRootPRPtr.i;
-      newRootPRPtr.p->parentPtr = RNIL;
-      newRootPRPtr.p->startRange[0] = 0;
-      newRootPRPtr.p->endRange[0] = regFragPtr->nextStartRange - 1;
-      newRootPRPtr.p->type[0] = ZNON_LEAF;
-      newRootPRPtr.p->startRange[1] = regFragPtr->nextStartRange;
-      newRootPRPtr.p->endRange[1] = regFragPtr->nextStartRange - 1;
-      newRootPRPtr.p->type[1] = ZNON_LEAF;
-      newRootPRPtr.p->currentIndexPos = 1;
-      foundPageRangePtr = newRootPRPtr;
-      break;
-    } else {
-      jam();
-      ptrCheckGuard(parentPageRangePtr, cnoOfPageRangeRec, pageRange);
-      if (parentPageRangePtr.p->currentIndexPos < 3) {
-        jam();
-
-        if (c_noOfFreePageRanges < tiprNoLevels) 
-        {
-          jam();
-          return RNIL;
-        }//if
-	
-/* ---------------------------------------------------------------- */
-/*       WE HAVE FOUND AN EMPTY ENTRY IN A PAGE RANGE RECORD.       */
-/*       ALLOCATE A NEW PAGE RANGE RECORD, FILL IN THE START RANGE, */
-/*       ALLOCATE A NEW PAGE RANGE RECORD AND UPDATE THE POINTERS   */
-/* ---------------------------------------------------------------- */
-        parentPageRangePtr.p->currentIndexPos++;
-        parentPageRangePtr.p->startRange[parentPageRangePtr.p->currentIndexPos] = regFragPtr->nextStartRange;
-        parentPageRangePtr.p->endRange[parentPageRangePtr.p->currentIndexPos] = regFragPtr->nextStartRange - 1;
-        parentPageRangePtr.p->type[parentPageRangePtr.p->currentIndexPos] = ZNON_LEAF;
-        foundPageRangePtr = parentPageRangePtr;
-        break;
-      } else {
-        jam();
-        ndbrequire(parentPageRangePtr.p->currentIndexPos == 3);
-/* ---------------------------------------------------------------- */
-/*       THE PAGE RANGE RECORD WAS FULL. FIND THE PARENT RECORD     */
-/*       AND INCREASE THE NUMBER OF LEVELS WE HAVE TRAVERSED        */
-/*       GOING UP THE TREE.                                         */
-/* ---------------------------------------------------------------- */
-        tiprNoLevels++;
-      }//if
-    }//if
-  }//while
-/* ---------------------------------------------------------------- */
-/*       REMEMBER THE ERROR LEVEL IN CASE OF ALLOCATION ERRORS      */
-/* ---------------------------------------------------------------- */
-  PageRangePtr newPageRangePtr;
-  PageRangePtr prevPageRangePtr;
-  prevPageRangePtr = foundPageRangePtr;
-  if (c_noOfFreePageRanges < tiprNoLevels) {
-    jam();
-    return RNIL;
-  }//if
-/* ---------------------------------------------------------------- */
-/*       NOW WE HAVE PERFORMED THE SEARCH UPWARDS AND FILLED IN THE */
-/*       PROPER FIELDS IN THE PAGE RANGE RECORD WHERE SOME SPACE    */
-/*       WAS FOUND. THE NEXT STEP IS TO ALLOCATE PAGE RANGES SO     */
-/*       THAT WE KEEP THE B-TREE BALANCED. THE NEW PAGE RANGE       */
-/*       ARE ALSO PROPERLY UPDATED ON THE PATH TO THE LEAF LEVEL.   */
-/* ---------------------------------------------------------------- */
-  while (true) {
-    jam();
-    seizePagerange(newPageRangePtr);
-    tiprNoLevels--;
-    ndbrequire(prevPageRangePtr.p->currentIndexPos < 4);
-    prevPageRangePtr.p->basePageId[prevPageRangePtr.p->currentIndexPos] = newPageRangePtr.i;
-    newPageRangePtr.p->parentPtr = prevPageRangePtr.i;
-    newPageRangePtr.p->currentIndexPos = 0;
-    if (tiprNoLevels > 0) {
-      jam();
-      newPageRangePtr.p->startRange[0] = regFragPtr->nextStartRange;
-      newPageRangePtr.p->endRange[0] = regFragPtr->nextStartRange - 1;
-      newPageRangePtr.p->type[0] = ZNON_LEAF;
-      prevPageRangePtr = newPageRangePtr;
-    } else {
-      jam();
-      break;
-    }//if
-  }//while
-  return newPageRangePtr.i;
-}//Dbtup::leafPageRangeFull()
-
-void Dbtup::releasePagerange(PageRangePtr regPRPtr) 
-{
-  regPRPtr.p->nextFree = cfirstfreerange;
-  cfirstfreerange = regPRPtr.i;
-  c_noOfFreePageRanges++;
-}//Dbtup::releasePagerange()
-
-void Dbtup::seizePagerange(PageRangePtr& regPageRangePtr) 
-{
-  regPageRangePtr.i = cfirstfreerange;
-  ptrCheckGuard(regPageRangePtr, cnoOfPageRangeRec, pageRange);
-  cfirstfreerange = regPageRangePtr.p->nextFree;
-  regPageRangePtr.p->nextFree = RNIL;
-  regPageRangePtr.p->currentIndexPos = 0;
-  regPageRangePtr.p->parentPtr = RNIL;
-  for (Uint32 i = 0; i < 4; i++) {
-    regPageRangePtr.p->startRange[i] = 1;
-    regPageRangePtr.p->endRange[i] = 0;
-    regPageRangePtr.p->type[i] = ZNON_LEAF;
-    regPageRangePtr.p->basePageId[i] = (Uint32)-1;
-  }//for
-  c_noOfFreePageRanges--;
-}//Dbtup::seizePagerange()
+    alloc.add(pagePtr);
+    return 1;
+  }
+  
+  jam();
+  returnCommonArea(retPageRef, noOfPagesAllocated);
+  return 0;
+}//Dbtup::allocFragPage()
 
 void Dbtup::errorHandler(Uint32 errorCode)
 {

--- 1.28/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2007-05-01 21:52:55 +02:00
+++ 1.29/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2007-05-01 21:52:55 +02:00
@@ -653,7 +653,14 @@
       {
         if (pos.m_realpid_mm == RNIL) {
           jam();
-          pos.m_realpid_mm = getRealpid(fragPtr.p, key.m_page_no);
+          pos.m_realpid_mm = getRealpidCheck(fragPtr.p, key.m_page_no);
+          
+          if (pos.m_realpid_mm == RNIL)
+          {
+            jam();
+            pos.m_get = ScanPos::Get_next_page_mm;
+            break; // incr loop count
+          }
         }
         PagePtr pagePtr;
 	c_page_pool.getPtr(pagePtr, pos.m_realpid_mm);
Thread
bk commit into 5.1 tree (jonas:1.2514)jonas1 May