List:Internals« Previous MessageNext Message »
From:pekka Date:March 21 2005 12:01pm
Subject:bk commit into 5.0 tree (pekka:1.1781)
View as plain text  
Below is the list of changes that have just been committed into a local
5.0 repository of pekka. When pekka does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1781 05/03/21 12:01:25 pekka@stripped +4 -0
  ndb - common pool for records

  ndb/src/kernel/vm/Makefile.am
    1.6 05/03/21 12:00:32 pekka@stripped +2 -1
    SuperPool.cpp

  ndb/src/kernel/vm/testSuperPool.cpp
    1.1 05/03/21 11:58:46 pekka@stripped +220 -0

  ndb/src/kernel/vm/SuperPool.hpp
    1.1 05/03/21 11:58:46 pekka@stripped +561 -0

  ndb/src/kernel/vm/SuperPool.cpp
    1.1 05/03/21 11:58:46 pekka@stripped +442 -0

  ndb/src/kernel/vm/testSuperPool.cpp
    1.0 05/03/21 11:58:46 pekka@stripped +0 -0
    BitKeeper file
/export/space/pekka/ndb/version/my50-ndb/ndb/src/kernel/vm/testSuperPool.cpp

  ndb/src/kernel/vm/SuperPool.hpp
    1.0 05/03/21 11:58:46 pekka@stripped +0 -0
    BitKeeper file
/export/space/pekka/ndb/version/my50-ndb/ndb/src/kernel/vm/SuperPool.hpp

  ndb/src/kernel/vm/SuperPool.cpp
    1.0 05/03/21 11:58:46 pekka@stripped +0 -0
    BitKeeper file
/export/space/pekka/ndb/version/my50-ndb/ndb/src/kernel/vm/SuperPool.cpp

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	pekka
# Host:	clam.ndb.mysql.com
# Root:	/export/space/pekka/ndb/version/my50-ndb
--- New file ---
+++ ndb/src/kernel/vm/SuperPool.cpp	05/03/21 11:58:46
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include <ndb_global.h>
#include "SuperPool.hpp"

// SuperPool

SuperPool::SuperPool(Uint32 pageSize, Uint32 pageBits) :
  m_pageSize(SP_ALIGN_SIZE(pageSize, SP_ALIGN)),
  m_pageBits(pageBits),
  m_memRoot(0),
  m_pageEnt(0),
  m_typeCheck(0),
  m_typeSeq(0),
  m_pageList(),
  m_totalSize(0),
  m_initSize(0),
  m_incrSize(0),
  m_maxSize(0)
{
  assert(5 <= pageBits <= 30);
}

bool
SuperPool::init()
{
  return true;
}

SuperPool::~SuperPool()
{
}

SuperPool::PageEnt::PageEnt() :
  m_pageType(0),
  m_freeRecI(RNIL),
  m_useCount(0),
  m_nextPageI(RNIL),
  m_prevPageI(RNIL)
{
}

SuperPool::PageList::PageList() :
  m_headPageI(RNIL),
  m_tailPageI(RNIL),
  m_pageCount(0)
{
}

SuperPool::PageList::PageList(PtrI pageI) :
  m_headPageI(pageI),
  m_tailPageI(pageI),
  m_pageCount(1)
{
}

SuperPool::RecInfo::RecInfo(Uint32 recType, Uint32 recSize) :
  m_recType(recType),
  m_recSize(recSize),
  m_maxUseCount(0),
  m_currPageI(RNIL),
  m_currFreeRecI(RNIL),
  m_currUseCount(0),
  m_totalUseCount(0),
  m_totalRecCount(0),
  m_freeList(),
  m_activeList(),
  m_fullList()
{
}

SuperPool::PtrI
SuperPool::getPageI(void* pageP)
{
  const Uint32 pageSize = m_pageSize;
  const Uint32 pageBits = m_pageBits;
  const Uint32 recBits = 32 - pageBits;
  void* const memRoot = m_memRoot;
  assert(pageP == SP_ALIGN_PTR(pageP, memRoot, pageSize));
  my_ptrdiff_t ipL = ((Uint8*)pageP - (Uint8*)memRoot) / pageSize;
  Int32 ip = (Int32)ipL;
  Int32 lim = 1 << (pageBits - 1);
  assert(ip == ipL && -lim <= ip && ip < lim && ip != -1);
  PtrI pageI = ip << recBits;
  assert(pageP == getPageP(pageI));
  return pageI;
}

void
SuperPool::movePages(PageList& pl1, PageList& pl2)
{
  const Uint32 recBits = 32 - m_pageBits;
  if (pl1.m_pageCount != 0) {
    if (pl2.m_pageCount != 0) {
      PtrI pageI1 = pl1.m_tailPageI;
      PtrI pageI2 = pl2.m_headPageI;
      PageEnt& pe1 = getPageEnt(pageI1);
      PageEnt& pe2 = getPageEnt(pageI2);
      pe1.m_nextPageI = pageI2;
      pe2.m_prevPageI = pageI1;
      pl1.m_pageCount += pl2.m_pageCount;
    }
  } else {
    pl1 = pl2;
  }
  pl2.m_headPageI = pl2.m_tailPageI = RNIL;
  pl2.m_pageCount = 0;
}

void
SuperPool::addHeadPage(PageList& pl, PtrI pageI)
{
  PageList pl2(pageI);
  movePages(pl2, pl);
  pl = pl2;
}

void
SuperPool::addTailPage(PageList& pl, PtrI pageI)
{
  PageList pl2(pageI);
  movePages(pl, pl2);
}

void
SuperPool::removePage(PageList& pl, PtrI pageI)
{
  PageEnt& pe = getPageEnt(pageI);
  PtrI pageI1 = pe.m_prevPageI;
  PtrI pageI2 = pe.m_nextPageI;
  if (pageI1 != RNIL) {
    PageEnt& pe1 = getPageEnt(pageI1);
    pe1.m_nextPageI = pageI2;
    if (pageI2 != RNIL) {
      PageEnt& pe2 = getPageEnt(pageI2);
      pe2.m_prevPageI = pageI1;
    } else {
      pl.m_tailPageI = pageI1;
    }
  } else {
    if (pageI2 != RNIL) {
      PageEnt& pe2 = getPageEnt(pageI2);
      pe2.m_prevPageI = pageI1;
      pl.m_headPageI = pageI2;
    } else {
      pl.m_headPageI = pl.m_tailPageI = RNIL;
    }
  }
  pe.m_prevPageI = pe.m_nextPageI = RNIL;
  assert(pl.m_pageCount != 0);
  pl.m_pageCount--;
}

void
SuperPool::setCurrPage(RecInfo& ri, PtrI newPageI)
{
  PtrI oldPageI = ri.m_currPageI;
  if (oldPageI != RNIL) {
    // copy from cache
    PageEnt& pe = getPageEnt(oldPageI);
    pe.m_freeRecI = ri.m_currFreeRecI;
    pe.m_useCount = ri.m_currUseCount;
    // add to right list according to "pp2" policy
    if (pe.m_useCount == 0) {
      pe.m_pageType = 0;
      addHeadPage(m_pageList, oldPageI);
      ri.m_totalRecCount -= ri.m_maxUseCount;
    } else if (pe.m_useCount < ri.m_maxUseCount) {
      addHeadPage(ri.m_activeList, oldPageI);
    } else {
      addHeadPage(ri.m_fullList, oldPageI);
    }
  }
  if (newPageI != RNIL) {
    PageEnt& pe = getPageEnt(newPageI);
    // copy to cache
    ri.m_currPageI = newPageI;
    ri.m_currFreeRecI = pe.m_freeRecI;
    ri.m_currUseCount = pe.m_useCount;
    // remove from right list
    if (pe.m_useCount == 0) {
      removePage(ri.m_freeList, newPageI);
    } else if (pe.m_useCount < ri.m_maxUseCount) {
      removePage(ri.m_activeList, newPageI);
    } else {
      removePage(ri.m_fullList, newPageI);
    }
  } else {
    ri.m_currPageI = RNIL;
    ri.m_currFreeRecI = RNIL;
    ri.m_currUseCount = 0;
  }
}

bool
SuperPool::getAvailPage(RecInfo& ri)
{
  PtrI pageI;
  if ((pageI = ri.m_activeList.m_headPageI) != RNIL ||
      (pageI = ri.m_freeList.m_headPageI) != RNIL ||
      (pageI = getFreePage(ri)) != RNIL) {
    setCurrPage(ri, pageI);
    return true;
  }
  return false;
}

SuperPool::PtrI
SuperPool::getFreePage(RecInfo& ri)
{
  PtrI pageI;
  if (m_pageList.m_pageCount != 0) {
    pageI = m_pageList.m_headPageI;
    removePage(m_pageList, pageI);
  } else {
    pageI = getNewPage();
    if (pageI == RNIL)
      return RNIL;
  }
  void* pageP = getPageP(pageI);
  // set up free record list
  Uint32 maxUseCount = ri.m_maxUseCount;
  Uint32 recSize = ri.m_recSize;
  void* recP = (Uint8*)pageP;
  Uint32 irNext = 1;
  while (irNext < maxUseCount) {
    *(Uint32*)recP = pageI | irNext;
    recP = (Uint8*)recP + recSize;
    irNext++;
  }
  *(Uint32*)recP = RNIL;
  // add to total record count
  ri.m_totalRecCount += maxUseCount;
  // set up new page entry
  PageEnt& pe = getPageEnt(pageI);
  new (&pe) PageEnt();
  pe.m_pageType = ri.m_recType;
  pe.m_freeRecI = pageI | 0;
  pe.m_useCount = 0;
  // set type check bits
  setCheckBits(pageI, ri.m_recType);
  // add to record pool free list
  addHeadPage(ri.m_freeList, pageI);
  return pageI;
}

void
SuperPool::setSizes(size_t initSize, size_t incrSize, size_t maxSize)
{
  const Uint32 pageSize = m_pageSize;
  m_initSize = SP_ALIGN_SIZE(initSize, pageSize);
  m_incrSize = SP_ALIGN_SIZE(incrSize, pageSize);
  m_maxSize = SP_ALIGN_SIZE(maxSize, pageSize);
}

void
SuperPool::verify(RecInfo& ri)
{
  PageList* plList[3] = { &ri.m_freeList, &ri.m_activeList, &ri.m_fullList };
  for (int i = 0; i < 3; i++) {
    PageList& pl = *plList[i];
    unsigned count = 0;
    PtrI pageI = pl.m_headPageI;
    while (pageI != RNIL) {
      PageEnt& pe = getPageEnt(pageI);
      PtrI pageI1 = pe.m_prevPageI;
      PtrI pageI2 = pe.m_nextPageI;
      if (count == 0) {
        assert(pageI1 == RNIL);
      } else {
        assert(pageI1 != RNIL);
        PageEnt& pe1 = getPageEnt(pageI1);
        assert(pe1.m_nextPageI == pageI);
        if (pageI2 != RNIL) {
          PageEnt& pe2 = getPageEnt(pageI2);
          assert(pe2.m_prevPageI == pageI);
        }
      }
      pageI = pageI2;
      count++;
    }
    assert(pl.m_pageCount == count);
  }
}

// HeapPool

HeapPool::HeapPool(Uint32 pageSize, Uint32 pageBits) :
  SuperPool(pageSize, pageBits),
  m_areaHead(),
  m_currArea(&m_areaHead),
  m_lastArea(&m_areaHead),
  m_mallocPart(4)
{
}

bool
HeapPool::init()
{
  const Uint32 pageBits = m_pageBits;
  if (! SuperPool::init())
    return false;;
  // allocate page entry array
  Uint32 peBytes = (1 << pageBits) * sizeof(PageEnt);
  m_pageEnt = static_cast<PageEnt*>(malloc(peBytes));
  if (m_pageEnt == 0)
    return false;
  memset(m_pageEnt, 0, peBytes);
  // allocate type check array
  Uint32 tcWords = 1 << (pageBits - (5 - SP_CHECK_LOG2));
  m_typeCheck = static_cast<Uint32*>(malloc(tcWords << 2));
  if (m_typeCheck == 0)
    return false;
  memset(m_typeCheck, 0, tcWords << 2);
  // allocate initial data
  assert(m_totalSize == 0);
  if (! allocMoreData(m_initSize))
    return false;
  return true;
}

HeapPool::~HeapPool()
{
  free(m_pageEnt);
  free(m_typeCheck);
  Area* ap;
  while ((ap = m_areaHead.m_nextArea) != 0) {
    m_areaHead.m_nextArea = ap->m_nextArea;
    free(ap->m_memory);
    free(ap);
  }
}

HeapPool::Area::Area() :
  m_nextArea(0),
  m_firstPageI(RNIL),
  m_currPage(0),
  m_numPages(0),
  m_memory(0)
{
}

SuperPool::PtrI
HeapPool::getNewPage()
{
  const Uint32 pageSize = m_pageSize;
  const Uint32 pageBits = m_pageBits;
  const Uint32 recBits= 32 - pageBits;
  Area* ap = m_currArea;
  if (ap->m_currPage == ap->m_numPages) {
    // area is used up
    if (ap->m_nextArea == 0) {
      // todo dynamic increase
      assert(m_incrSize == 0);
      return RNIL;
    }
    ap = m_currArea = ap->m_nextArea;
  }
  assert(ap->m_currPage < ap->m_numPages);
  PtrI pageI = ap->m_firstPageI;
  Int32 ip = (Int32)pageI >> recBits;
  ip += ap->m_currPage;
  pageI = ip << recBits;
  ap->m_currPage++;
  return pageI;
}

bool
HeapPool::allocMoreData(size_t size)
{
  const Uint32 pageSize = m_pageSize;
  const Uint32 pageBits = m_pageBits;
  const Uint32 recBits = 32 - pageBits;
  const Uint32 incrSize = m_incrSize;
  const Uint32 incrPages = incrSize / pageSize;
  const Uint32 mallocPart = m_mallocPart;
  size = SP_ALIGN_SIZE(size, pageSize);
  if (incrSize != 0)
    size = SP_ALIGN_SIZE(size, incrSize);
  Uint32 needPages = size / pageSize;
  while (needPages != 0) {
    Uint32 wantPages = needPages;
    if (incrPages != 0 && wantPages > incrPages)
      wantPages = incrPages;
    Uint32 tryPages = 0;
    void* p1 = 0;
    for (Uint32 i = mallocPart; i > 0 && p1 == 0; i--) {
      // one page is usually wasted due to alignment to memory root
      tryPages = ((wantPages + 1) * i) / mallocPart;
      if (tryPages < 2)
        break;
      p1 = malloc(pageSize * tryPages);
    }
    if (p1 == 0)
      return false;
    if (m_memRoot == 0) {
      // set memory root at first "big" alloc
      // assume malloc header makes later ip = -1 impossible
      m_memRoot = p1;
    }
    void* p2 = SP_ALIGN_PTR(p1, m_memRoot, pageSize);
    Uint32 numPages = tryPages - (p1 != p2);
    my_ptrdiff_t ipL = ((Uint8*)p2 - (Uint8*)m_memRoot) / pageSize;
    Int32 ip = (Int32)ipL;
    Int32 lim = 1 << (pageBits - 1);
    if (! (ip == ipL && -lim <= ip && ip + numPages < lim)) {
      free(p1);
      return false;
    }
    assert(ip != -1);
    PtrI pageI = ip << recBits;
    needPages = (needPages >= numPages ? needPages - numPages : 0);
    m_totalSize += numPages * pageSize;
    // allocate new area
    Area* ap = static_cast<Area*>(malloc(sizeof(Area)));
    if (ap == 0) {
      free(p1);
      return false;
    }
    new (ap) Area();
    ap->m_firstPageI = pageI;
    ap->m_numPages = numPages;
    ap->m_memory = p1;
    m_lastArea->m_nextArea = ap;
    m_lastArea = ap;
  }
  return true;
}

--- New file ---
+++ ndb/src/kernel/vm/SuperPool.hpp	05/03/21 11:58:46
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifndef SUPER_POOL_HPP
#define SUPER_POOL_HPP

#include <ndb_global.h>

#include <pc.hpp>
#include <ErrorReporter.hpp>

#define NDB_SP_VERIFY_LEVEL 1

/*
 * SuperPool - super pool for record pools (abstract class)
 *
 * Documents SuperPool and RecordPool<T>.
 *
 * GENERAL
 *
 * A "super pool" is a shared pool of pages of fixed size.  A "record
 * pool" is a pool of records of fixed size.  One super pool instance is
 * used by any number of record pools to allocate their memory.
 * A special case is a "page pool" where a record is a simple page,
 * possibly smaller than super pool page.
 *
 * A record pool allocates memory in pages.  Thus each used page is
 * associated with one record pool and one record type.  The records on
 * a page form an array starting at start of page.  Thus each record has
 * an index within the page.  Any last partial record which does not fit
 * on the page is disregarded.
 *
 * I-VALUE
 *
 * The old "i-p" principle is kept.  A reference to a super pool page or
 * record is stored as an "i-value" from which the record pointer "p" is
 * computed.  In super pool the i-value is a Uint32 with two parts:
 *
 * - "ip" index of page within super pool (high pageBits)
 * - "ir" index of record within page (low recBits)
 *
 * The translation between "ip" and page address is described in next
 * section.  Once page address is known, the record address is found
 * from "ir" in the obvious way.
 *
 * The main advantage with i-value is that it can be verified.  The
 * level of verification depends on compile type (release, debug).
 *
 * - "v0" minimal sanity check
 * - "v1" check record type matches page type, see below
 * - "v2" check record is in use (not yet implemented)
 *
 * Another advantage of a 32-bit i-value is that it extends the space of
 * 32-bit addressable records on a 64-bit platform.
 *
 * RNIL is 0xffffff00 and indicates NULL i-value.  To avoid hitting RNIL
 * it is required that pageBits <= 30 and that the maximum value of the
 * range (2^pageBits-1) is not used.
 *
 * MEMORY ROOT
 *
 * This super pool requires a "memory root" i.e. a memory address such
 * that the index of a page "ip" satisfies
 *
 *   page address = memory root + (signed)ip * page size
 *
 * This is possible on most platforms, provided that the memory root and
 * all pages are either on the heap or on the stack, in order to keep
 * the size of "ip" reasonably small.
 *
 * The cast (signed)ip is done as integer of pageBits bits.  "ip" has
 * same sign bit as i-value "i" so (signed)ip = (Int32)i >> recBits.
 * The RNIL restriction can be expressed as (signed)ip != -1.
 *
 * PAGE ENTRIES
 *
 * Each super pool page has a "page entry".  It contains:
 *
 * - page type
 * - i-value of first free record on page
 * - page use count, to see if page can be freed
 * - pointers (as i-values) to next and previous page in list
 *
 * Page entry cannot be stored on the page itself since this prevents
 * aligning pages to OS block size and the use of BATs (don't ask) for
 * page pools in NDB.  For now the implementation provides an array of
 * page entries with place for all (2^pageBits) entries.
 *
 * PAGE TYPE
 *
 * Page type is (in principle) unique to the record pool using the super
 * pool.  It is assigned in record pool constructor.  Page type zero
 * means that the page is free i.e. not allocated to a record pool.
 *
 * Each "i-p" conversion checks ("v1") that the record belongs to same
 * pool as the page.  This check is much more common than page or record
 * allocation.  To make it cache effective, there is a separate array of
 * reduced "type bits" (computed from real type).
 *
 * FREE LISTS
 *
 * A record is either used or on the free list of the record pool.
 * A page has a use count i.e. number of used records.  When use count
 * drops to zero the page can be returned to the super pool.  This is
 * not necessarily done at once, or ever.
 *
 * To make freeing pages feasible, the record pool free list has two
 * levels.  There are available pages (some free) and a singly linked
 * free list within the page.  A page allocated to record pool is on one
 * of 4 lists:
 *
 * - free page list (all free, available)
 * - active page list (some free, some used, available)
 * - full page list (none free)
 * - current page (list of 1), see below
 *
 * Some usage types (temporary pools) may never free records.  They pay
 * a small penalty for the extra overhead.
 *
 * RECORD POOL
 *
 * A pool of records which allocates its memory from a super pool
 * instance specified in the constructor.  There are 3 basic operations:
 *
 * - getPtr - translate i-value to pointer-to-record p
 * - seize - allocate record
 * - release - free record
 *
 * CURRENT PAGE
 *
 * getPtr is a fast computation which does not touch the page.  For
 * seize and release there is an optimization:
 *
 * Define "current page" as page of latest seize or release.  Its page
 * entry is cached under record pool instance.  The page is removed from
 * its normal list.  Seize and release on current page are fast and
 * avoid touching the page.  The current page is used until
 *
 * - seize and current page is full
 * - release and the page is not current page
 *
 * Then the real page entry is updated and the page is added to the
 * appropriate list, and a new page is made current.
 *
 * PAGE POLICY
 *
 * Allocating new page to record pool is expensive.  Therefore record
 * pool should not always return empty pages to super pool.  There are
 * two trivial policies, each with problems:
 *
 * - "pp1" never return empty page to super pool
 * - "pp2" always return empty page to super pool
 *
 * This implementation uses "pp2" for now.  A real policy is implemented
 * in next version.
 *
 * OPEN ISSUES AND LIMITATIONS
 *
 * - smarter (virtual) placement of check bits & page entries
 * - should getPtr etc be inlined?  (too much code)
 * - real page policy
 * - other implementations (only HeapPool is done)
 * - super pool list of all record pools, for statistics etc
 * - access by multiple threads is not supported
 */

// align size
#define SP_ALIGN_SIZE(sz, al) \
  (((sz) + (al) - 1) & ~((al) - 1))

// align pointer relative to base
#define SP_ALIGN_PTR(p, base, al) \
  (void*)((Uint8*)(base) + SP_ALIGN_SIZE((Uint8*)(p) - (Uint8*)(base), (al)))

class SuperPool {
public:
  // Type of i-value, used to reference both pages and records.  Page
  // index "ip" occupies the high bits.  The i-value of a page is same
  // as i-value of record 0 on the page.
  typedef Uint32 PtrI;

  // Size and address alignment given as number of bytes (power of 2).
  STATIC_CONST( SP_ALIGN = 8 );

  // Page entry.  Current|y allocated as array of (2^pageBits).
  struct PageEnt {
    PageEnt();
    Uint32 m_pageType;
    Uint32 m_freeRecI;
    Uint32 m_useCount;
    PtrI m_nextPageI;
    PtrI m_prevPageI;
  };

  // Number of bits for cache effective type check given as log of 2.
  // Example: 2 means 4 bits and uses 32k for 2g of 32k pages.
  STATIC_CONST( SP_CHECK_LOG2 = 2 );

  // Doubly-linked list of pages.  There is one free list in super pool
  // and free, active, full list in each record pool.
  struct PageList {
    PageList::PageList();
    PageList::PageList(PtrI pageI);
    PtrI m_headPageI;
    PtrI m_tailPageI;
    Uint32 m_pageCount;
  };

  // Record pool information.  Each record pool instance contains one.
  struct RecInfo {
    RecInfo(Uint32 recType, Uint32 recSize);
    const Uint32 m_recType;
    const Uint32 m_recSize;
    Uint32 m_maxUseCount;       // could be computed
    Uint32 m_currPageI;         // current page
    Uint32 m_currFreeRecI;
    Uint32 m_currUseCount;
    Uint32 m_totalUseCount;     // total per pool
    Uint32 m_totalRecCount;
    PageList m_freeList;
    PageList m_activeList;
    PageList m_fullList;
  };

  // Constructor.  Gives page size in bytes (excluding page header) and
  // number of bits to use for page index "ip" in i-value.
  SuperPool(Uint32 pageSize, Uint32 pageBits);

  // Initialize.  Must be called after setting sizes or other parameters
  // and before the pool is used.
  virtual bool init();

  // Destructor.
  virtual ~SuperPool() = 0;

  // Translate i-value to page entry.
  PageEnt& getPageEnt(PtrI pageI);

  // Translate i-value to page address.
  void* getPageP(PtrI pageI);

  // Translate page address to i-value (unused).
  PtrI getPageI(void* pageP);

  // Given type, return non-zero reduced type check bits.
  Uint32 makeCheckBits(Uint32 type);

  // Get type check bits from type check array.
  Uint32 getCheckBits(PtrI pageI);

  // Set type check bits in type check array.
  void setCheckBits(PtrI pageI, Uint32 type);

  // Translate i-value to record address.
  void* getRecP(PtrI recI, RecInfo& ri);

  // Move all pages from second list to end of first list.
  void movePages(PageList& pl1, PageList& pl2);

  // Add page to beginning of page list.
  void addHeadPage(PageList& pl, PtrI pageI);

  // Add page to end of page list.
  void addTailPage(PageList& pl, PtrI pageI);

  // Remove any page from page list.
  void removePage(PageList& pl, PtrI pageI);

  // Set current page.  Previous current page is updated and added to
  // appropriate list.
  void setCurrPage(RecInfo& ri, PtrI pageI);

  // Get page with some free records and make it current.  Takes head of
  // active or free list, or else gets free page from super pool.
  bool getAvailPage(RecInfo& ri);

  // Get free page from super pool and add it to record pool free list.
  // This is an expensive subroutine of getAvailPage().
  PtrI getFreePage(RecInfo& ri);

  // Get new free page from the implementation.
  virtual PtrI getNewPage() = 0;

  // Set 3 size parameters, rounded to page size.  If called before
  // init() then init() allocates the initial size.
  void setSizes(size_t initSize = 0, size_t incrSize = 0, size_t maxSize = 0);

  const Uint32 m_pageSize;
  const Uint32 m_pageBits;
  // implementation must set up these pointers
  void* m_memRoot;
  PageEnt* m_pageEnt;
  Uint32* m_typeCheck;
  Uint32 m_typeSeq;
  PageList m_pageList;
  size_t m_totalSize;
  size_t m_initSize;
  size_t m_incrSize;
  size_t m_maxSize;

  // Debugging.
  void verify(RecInfo& ri);
};

inline SuperPool::PageEnt&
SuperPool::getPageEnt(PtrI pageI)
{
  Uint32 ip = pageI >> (32 - m_pageBits);
  return m_pageEnt[ip];
}

inline void*
SuperPool::getPageP(PtrI ptrI)
{
  Int32 ip = (Int32)ptrI >> (32 - m_pageBits);
  my_ptrdiff_t sz = m_pageSize;
  void* pageP = (Uint8*)m_memRoot + ip * sz;
  return pageP;
}

inline Uint32
SuperPool::makeCheckBits(Uint32 type)
{
  Uint32 shift = 1 << SP_CHECK_LOG2;
  Uint32 mask = (1 << shift) - 1;
  return 1 + type % mask;
}

inline Uint32
SuperPool::getCheckBits(PtrI pageI)
{
  Uint32 ip = pageI >> (32 - m_pageBits);
  Uint32 xp = ip >> (5 - SP_CHECK_LOG2);
  Uint32 yp = ip & (1 << (5 - SP_CHECK_LOG2)) - 1;
  Uint32& w = m_typeCheck[xp];
  Uint32 shift = 1 << SP_CHECK_LOG2;
  Uint32 mask = (1 << shift) - 1;
  // get
  Uint32 bits = (w >> yp * shift) & mask;
  return bits;
}

inline void
SuperPool::setCheckBits(PtrI pageI, Uint32 type)
{
  Uint32 ip = pageI >> (32 - m_pageBits);
  Uint32 xp = ip >> (5 - SP_CHECK_LOG2);
  Uint32 yp = ip & (1 << (5 - SP_CHECK_LOG2)) - 1;
  Uint32& w = m_typeCheck[xp];
  Uint32 shift = 1 << SP_CHECK_LOG2;
  Uint32 mask = (1 << shift) - 1;
  // set
  Uint32 bits = makeCheckBits(type);
  w &= ~(mask << yp * shift);
  w |= (bits << yp * shift);
}

inline void*
SuperPool::getRecP(PtrI ptrI, RecInfo& ri)
{
  const Uint32 recMask = (1 << (32 - m_pageBits)) - 1;
  PtrI pageI = ptrI & ~recMask;
#if NDB_SP_VERIFY_LEVEL >= 1
  Uint32 bits1 = getCheckBits(pageI);
  Uint32 bits2 = makeCheckBits(ri.m_recType);
  assert(bits1 == bits2);
#endif
  void* pageP = getPageP(pageI);
  Uint32 ir = ptrI & recMask;
  void* recP = (Uint8*)pageP + ir * ri.m_recSize;
  return recP;
}

/*
 * HeapPool - SuperPool on heap (concrete class)
 *
 * A super pool based on malloc with memory root on the heap.  This
 * pool type has 2 realistic uses:
 *
 * - a small pool with only initial malloc and pageBits set to match
 * - the big pool from which all heap allocations are done
 *
 * A "smart" malloc may break "ip" limit by using different VM areas for
 * different sized requests.  For this reason malloc is done in units of
 * increment size if possible.   Memory root is set to start of first
 * malloc.
 */

class HeapPool : public SuperPool {
public:
  // Describes malloc area.  The areas are kept in singly linked list.
  // There is a list head and pointers to current and last area.
  struct Area {
    Area();
    Area* m_nextArea;
    PtrI m_firstPageI;
    Uint32 m_currPage;
    Uint32 m_numPages;
    void* m_memory;
  };

  // Constructor.
  HeapPool(Uint32 pageSize, Uint32 pageBits);

  // Initialize.
  virtual bool init();

  // Destructor.
  virtual ~HeapPool();

  // Use malloc to allocate more.
  bool allocMoreData(size_t size);

  // Get new page from current area.
  virtual PtrI getNewPage();

  // List of malloc areas.
  Area m_areaHead;
  Area* m_currArea;
  Area* m_lastArea;

  // Fraction of malloc size to try if cannot get all in one.
  Uint32 m_mallocPart;
};

/*
 * RecordPool -  record pool using one super pool instance (template)
 *
 * Documented under SuperPool.  Satisfies ArrayPool interface.
 */

template <class T>
class RecordPool {
public:
  // Constructor.
  RecordPool(SuperPool& superPool);

  // Destructor.
  ~RecordPool();

  // Update pointer ptr.p according to i-value ptr.i.
  void getPtr(Ptr<T>& ptr);

  // Allocate record from the pool.
  bool seize(Ptr<T>& ptr);

  // Return record to the pool.
  void release(Ptr<T>& ptr);

  // todo variants of basic methods

  // Return all pages to super pool.  The force flag is required if
  // there are any used records.
  void free(bool force);

  SuperPool& m_superPool;
  SuperPool::RecInfo m_recInfo;
};

template <class T>
inline
RecordPool<T>::RecordPool(SuperPool& superPool) :
  m_superPool(superPool),
  m_recInfo(1 + superPool.m_typeSeq++, sizeof(T))
{
  SuperPool::RecInfo& ri = m_recInfo;
  assert(sizeof(T) == SP_ALIGN_SIZE(sizeof(T), sizeof(Uint32)));
  Uint32 maxUseCount = superPool.m_pageSize / sizeof(T);
  Uint32 sizeLimit = 1 << (32 - superPool.m_pageBits);
  if (maxUseCount >= sizeLimit)
    maxUseCount = sizeLimit;
  ri.m_maxUseCount = maxUseCount;
}

template <class T>
inline
RecordPool<T>::~RecordPool()
{
  free(true);
}

template <class T>
inline void
RecordPool<T>::getPtr(Ptr<T>& ptr)
{
  void* recP = m_superPool.getRecP(ptr.i, m_recInfo);
  ptr.p = static_cast<T*>(recP);
}

template <class T>
inline bool
RecordPool<T>::seize(Ptr<T>& ptr)
{
  SuperPool& sp = m_superPool;
  SuperPool::RecInfo& ri = m_recInfo;
  if (ri.m_currFreeRecI != RNIL || sp.getAvailPage(ri)) {
    SuperPool::PtrI recI = ri.m_currFreeRecI;
    void* recP = sp.getRecP(recI, ri);
    ri.m_currFreeRecI = *(Uint32*)recP;
    Uint32 useCount = ri.m_currUseCount;
    assert(useCount < ri.m_maxUseCount);
    ri.m_currUseCount = useCount + 1;
    ri.m_totalUseCount++;
    ptr.i = recI;
    ptr.p = static_cast<T*>(recP);
    return true;
  }
  return false;
}

template <class T>
inline void
RecordPool<T>::release(Ptr<T>& ptr)
{
  SuperPool& sp = m_superPool;
  SuperPool::RecInfo& ri = m_recInfo;
  const Uint32 recMask = (1 << (32 - sp.m_pageBits)) - 1;
  SuperPool::PtrI recI = ptr.i;
  SuperPool::PtrI pageI = recI & ~recMask;
  if (pageI != ri.m_currPageI) {
    sp.setCurrPage(ri, pageI);
  }
  void* recP = sp.getRecP(recI, ri);
  *(Uint32*)recP = ri.m_currFreeRecI;
  ri.m_currFreeRecI = recI;
  Uint32 useCount = ri.m_currUseCount;
  assert(useCount != 0);
  ri.m_currUseCount = useCount - 1;
  ri.m_totalUseCount--;
  ptr.i = RNIL;
  ptr.p = 0;
}

template <class T>
inline void
RecordPool<T>::free(bool force)
{
  SuperPool& sp = m_superPool;
  SuperPool::RecInfo& ri = m_recInfo;
  sp.setCurrPage(ri, RNIL);
  assert(force || ri.m_totalUseCount == 0);
  sp.movePages(sp.m_pageList, ri.m_freeList);
  sp.movePages(sp.m_pageList, ri.m_activeList);
  sp.movePages(sp.m_pageList, ri.m_fullList);
  ri.m_totalRecCount = 0;
}

#endif

--- New file ---
+++ ndb/src/kernel/vm/testSuperPool.cpp	05/03/21 11:58:46
#if 0
make -f Makefile -f - testSuperPool <<'_eof_'
testSuperPool: testSuperPool.cpp libkernel.a
	$(CXXCOMPILE) -o $@ $@.cpp libkernel.a -L../../common/util/.libs -lgeneral
_eof_
exit $?
#endif

/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#include "SuperPool.hpp"
#include <NdbOut.hpp>

template <Uint32 sz>
struct A {
  Uint32 a[sz];
  void fill() {
    Uint32 c = 0;
    for (Uint32 i = 0; i + 1 < sz; i++) {
      a[i] = random();
      c = (c << 1) ^ a[i];
    }
    a[sz - 1] = c;
  }
  void check() {
    Uint32 c = 0;
    for (Uint32 i = 0; i + 1 < sz; i++) {
      c = (c << 1) ^ a[i];
    }
    assert(a[sz - 1] == c);
  }
};

static Uint32
urandom(Uint32 n)
{
  return (Uint32)random() % n;
}

static Uint32
random_coprime(Uint32 n)
{
  Uint32 prime[] = { 101, 211, 307, 401, 503, 601, 701, 809, 907 };
  Uint32 count = sizeof(prime) / sizeof(prime[0]);
  while (1) {
    Uint32 i = urandom(count);
    if (n % prime[i] != 0)
      return prime[i];
  }
}

static int
cmpPtrI(const void* a, const void* b)
{
  Ptr<const void> u = *(Ptr<const void>*)a;
  Ptr<const void> v = *(Ptr<const void>*)b;
  return u.i < v.i ? -1 : u.i > v.i ? +1 : 0;
}

static int
cmpPtrP(const void* a, const void* b)
{
  Ptr<const void> u = *(Ptr<const void>*)a;
  Ptr<const void> v = *(Ptr<const void>*)b;
  return u.p < v.p ? -1 : u.p > v.p ? +1 : 0;
}

static Uint32 loopcount = 3;

template <Uint32 sz>
void
sp_test(SuperPool& sp)
{
  typedef A<sz> T;
  RecordPool<T> rp(sp);
  SuperPool::RecInfo& ri = rp.m_recInfo;
  Uint32 pageCount = sp.m_totalSize / sp.m_pageSize;
  Uint32 perPage = rp.m_recInfo.m_maxUseCount;
  Uint32 perPool = perPage * pageCount;
  ndbout << "pages=" << pageCount << " perpage=" << perPage
<< " perpool=" << perPool << endl;
  Ptr<T>* ptrList = new Ptr<T> [perPool];
  memset(ptrList, 0x1f, perPool * sizeof(Ptr<T>));
  Uint32 loop;
  for (loop = 0; loop < loopcount; loop++) {
    ndbout << "loop " << loop << endl;
    Uint32 i, j;
    // seize all
    ndbout << "seize all" << endl;
    for (i = 0; i < perPool + 1; i++) {
      j = i;
      sp.verify(ri);
      Ptr<T> ptr1 = { 0, RNIL };
      if (! rp.seize(ptr1))
        break;
      // write value
      ptr1.p->fill();
      ptr1.p->check();
      // verify getPtr
      Ptr<T> ptr2 = { 0, ptr1.i };
      rp.getPtr(ptr2);
      assert(ptr1.i == ptr2.i && ptr1.p == ptr2.p);
      // save
      ptrList[j] = ptr1;
    }
    assert(i == perPool);
    assert(ri.m_totalUseCount == perPool && ri.m_totalRecCount == perPool);
    sp.verify(ri);
    // check duplicates
    {
      Ptr<T>* ptrList2 = new Ptr<T> [perPool];
      memcpy(ptrList2, ptrList, perPool * sizeof(Ptr<T>));
      qsort(ptrList2, perPool, sizeof(Ptr<T>), cmpPtrI);
      for (i = 1; i < perPool; i++)
        assert(ptrList2[i - 1].i != ptrList2[i].i);
      qsort(ptrList2, perPool, sizeof(Ptr<T>), cmpPtrP);
      for (i = 1; i < perPool; i++)
        assert(ptrList2[i - 1].p != ptrList2[i].p);
      delete [] ptrList2;
    }
    // release all in various orders
    ndbout << "release all" << endl;
    Uint32 coprime = random_coprime(perPool);
    for (i = 0; i < perPool; i++) {
      sp.verify(ri);
      switch (loop % 3) {
      case 0:   // ascending
        j = i;
        break;
      case 1:   // descending
        j = perPool - 1 - i;
        break;
      case 2:   // pseudo-random
        j = (coprime * i) % perPool;
        break;
      }
      Ptr<T>& ptr = ptrList[j];
      assert(ptr.i != RNIL && ptr.p != 0);
      ptr.p->check();
      rp.release(ptr);
      assert(ptr.i == RNIL && ptr.p == 0);
    }
    sp.setCurrPage(ri, RNIL);
    assert(ri.m_totalUseCount == 0 && ri.m_totalRecCount == 0);
    sp.verify(ri);
    // seize/release at random
    ndbout << "seize/release at random" << endl;
    for (i = 0; i < loopcount * perPool; i++) {
      j = urandom(perPool);
      Ptr<T>& ptr = ptrList[j];
      if (ptr.i == RNIL) {
        rp.seize(ptr);
        ptr.p->fill();
      } else {
        ptr.p->check();
        rp.release(ptr);
      }
    }
    ndbout << "used " << ri.m_totalUseCount << endl;
    sp.verify(ri);
    // release all
    ndbout << "release all" << endl;
    for (i = 0; i < perPool; i++) {
      j = i;
      Ptr<T>& ptr = ptrList[j];
      if (ptr.i != RNIL) {
        ptr.p->check();
        rp.release(ptr);
      }
    }
    sp.setCurrPage(ri, RNIL);
    assert(ri.m_totalUseCount == 0 && ri.m_totalRecCount == 0);
    sp.verify(ri);
  }
  // done
  delete [] ptrList;
}

static Uint32 pageCount = 99;
static Uint32 pageSize = 32768;
static Uint32 pageBits = 15;

const Uint32 sz1 = 3, sz2 = 4, sz3 = 53, sz4 = 424, sz5 = 5353;

template void sp_test<sz1>(SuperPool& sp);
template void sp_test<sz2>(SuperPool& sp);
template void sp_test<sz3>(SuperPool& sp);
template void sp_test<sz4>(SuperPool& sp);
template void sp_test<sz5>(SuperPool& sp);

int
main()
{
  HeapPool sp(pageSize, pageBits);
  sp.setSizes(pageCount * pageSize);
  if (! sp.init())
    assert(false);
  Uint16 s = (Uint16)getpid();
  srandom(s);
  ndbout << "rand " << s << endl;
  sp_test<sz1>(sp);
  sp_test<sz2>(sp);
  sp_test<sz3>(sp);
  sp_test<sz4>(sp);
  sp_test<sz5>(sp);
  return 0;
}


--- 1.5/ndb/src/kernel/vm/Makefile.am	2004-11-11 21:25:32 +01:00
+++ 1.6/ndb/src/kernel/vm/Makefile.am	2005-03-21 12:00:32 +01:00
@@ -18,7 +18,8 @@
         SimplePropertiesSection.cpp \
         SectionReader.cpp \
 	MetaData.cpp \
-        Mutex.cpp SafeCounter.cpp
+        Mutex.cpp SafeCounter.cpp \
+	SuperPool.cpp
 
 INCLUDES_LOC = -I$(top_srcdir)/ndb/src/mgmapi
 
Thread
bk commit into 5.0 tree (pekka:1.1781)pekka21 Mar