List:Commits« Previous MessageNext Message »
From:jonas oreland Date:May 7 2011 6:16am
Subject:bzr commit into mysql-5.1-telco-7.0 branch (jonas:4357) Bug#56051
View as plain text  
#At file:///home/jonas/src/telco-7.0/ based on revid:jonas@stripped

 4357 jonas oreland	2011-05-07
      ndb - bug#56051 - remove known 899 case...reclassify 899 as internal error (instead of temporary)

    modified:
      storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp
      storage/ndb/src/kernel/blocks/backup/Backup.cpp
      storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
      storage/ndb/src/ndbapi/ndberror.c
      storage/ndb/test/ndbapi/testBasic.cpp
      storage/ndb/test/run-test/daily-basic-tests.txt
=== modified file 'storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp'
--- a/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp	2011-05-07 06:16:13 +0000
@@ -170,6 +170,7 @@ public:
     DumpBackup = 13000,
     DumpBackupSetCompressed = 13001,
     DumpBackupSetCompressedLCP = 13002,
+    BackupErrorInsert = 13003,
 
     DumpDbinfo = 14000,
     DbinfoListTables = 14001,

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2011-02-24 09:46:11 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2011-05-07 06:16:13 +0000
@@ -662,6 +662,16 @@ Backup::execDUMP_STATE_ORD(Signal* signa
     c_defaults.m_compressed_lcp= signal->theData[1];
     infoEvent("Compressed LCP: %d", c_defaults.m_compressed_lcp);
   }
+
+  if (signal->theData[0] == DumpStateOrd::BackupErrorInsert)
+  {
+    if (signal->getLength() == 1)
+      ndbout_c("BACKUP: setting error %u", signal->theData[1]);
+    else
+      ndbout_c("BACKUP: setting error %u, %u",
+               signal->theData[1], signal->theData[2]);
+    SET_ERROR_INSERT_VALUE2(signal->theData[1], signal->theData[2]);
+  }
 }
 
 void Backup::execDBINFO_SCANREQ(Signal *signal)
@@ -4578,6 +4588,13 @@ Backup::checkScan(Signal* signal, Backup
       sendSignal(ptr.p->masterRef, GSN_ABORT_BACKUP_ORD, signal, 
 		 AbortBackupOrd::SignalLength, JBB);
     }
+#ifdef ERROR_INSERT
+    else if (ERROR_INSERTED(10042) && filePtr.p->tableId ==c_error_insert_extra)
+    {
+      sendSignalWithDelay(lqhRef, GSN_SCAN_NEXTREQ, signal,
+			  10, ScanFragNextReq::SignalLength);
+    }
+#endif
     else
     {
       sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal, 

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2011-05-07 06:16:13 +0000
@@ -723,9 +723,10 @@ struct Fragrecord {
   Uint32 m_free_page_id_list;
   DynArr256::Head m_page_map;
   DLFifoList<Page>::Head thFreeFirst;   // pages with atleast 1 free record
-  
+
   Uint32 m_lcp_scan_op;
-  Uint32 m_lcp_keep_list;
+  Local_key m_lcp_keep_list_head;
+  Local_key m_lcp_keep_list_tail;
 
   enum FragState
   { FS_FREE
@@ -1439,9 +1440,8 @@ typedef Ptr<HostBuffer> HostBufferPtr;
     STATIC_CONST( MM_SHRINK   = 0x00200000 ); // Has MM part shrunk
     STATIC_CONST( MM_GROWN    = 0x00400000 ); // Has MM part grown
     STATIC_CONST( FREED       = 0x00800000 ); // Is freed
+    STATIC_CONST( FREE        = 0x00800000 ); // alias
     STATIC_CONST( LCP_SKIP    = 0x01000000 ); // Should not be returned in LCP
-    STATIC_CONST( LCP_KEEP    = 0x02000000 ); // Should be returned in LCP
-    STATIC_CONST( FREE        = 0x02800000 ); // Is free
     STATIC_CONST( VAR_PART    = 0x04000000 ); // Is there a varpart
     STATIC_CONST( REORG_MOVE  = 0x08000000 );
 
@@ -3215,6 +3215,8 @@ private:
   Uint32* get_default_ptr(const Tablerec*, Uint32&);
   Uint32 get_len(Ptr<Page>* pagePtr, Var_part_ref ref);
 
+  STATIC_CONST( COPY_TUPLE_HEADER32 = 4 );
+
   Tuple_header* alloc_copy_tuple(const Tablerec* tabPtrP, Local_key* ptr){
     Uint32 * dst = c_undo_buffer.alloc_copy_tuple(ptr,
                                                   tabPtrP->total_rec_size);
@@ -3224,7 +3226,7 @@ private:
     bzero(dst, tabPtrP->total_rec_size);
 #endif
     Uint32 count = tabPtrP->m_no_of_attributes;
-    ChangeMask * mask = (ChangeMask*)(dst);
+    ChangeMask * mask = (ChangeMask*)(dst + COPY_TUPLE_HEADER32);
     mask->m_cols = count;
     return (Tuple_header*)(mask->end_of_mask(count));
   }
@@ -3234,11 +3236,12 @@ private:
   }
 
   Tuple_header * get_copy_tuple(Uint32 * rawptr) {
-    return (Tuple_header*)(get_change_mask_ptr(rawptr)->end_of_mask());
+    return (Tuple_header*)
+      (get_change_mask_ptr(rawptr)->end_of_mask());
   }
 
   ChangeMask * get_change_mask_ptr(Uint32 * rawptr) {
-    return (ChangeMask*)(rawptr);
+    return (ChangeMask*)(rawptr + COPY_TUPLE_HEADER32);
   }
 
   Tuple_header* get_copy_tuple(const Local_key* ptr){
@@ -3250,7 +3253,7 @@ private:
     Uint32 * tmp = raw - (1 + ((tabP->m_no_of_attributes + 31) >> 5));
     ChangeMask* mask = (ChangeMask*)tmp;
     assert(mask->end_of_mask() == raw);
-    assert(get_copy_tuple(tmp) == copytuple);
+    assert(get_copy_tuple(tmp - COPY_TUPLE_HEADER32) == copytuple);
     return mask;
   }
 
@@ -3382,10 +3385,10 @@ private:
                          Page_cache_client::Request,
                          OperationrecPtr);
   int retrieve_log_page(Signal*, FragrecordPtr, OperationrecPtr);
-  
-  void dealloc_tuple(Signal* signal, Uint32, Page*, Tuple_header*, 
-		     Operationrec*, Fragrecord*, Tablerec*);
-  
+
+  void dealloc_tuple(Signal* signal, Uint32, Page*, Tuple_header*,
+		     KeyReqStruct*, Operationrec*, Fragrecord*, Tablerec*);
+
   int handle_size_change_after_update(KeyReqStruct* req_struct,
 				      Tuple_header* org,
 				      Operationrec*,
@@ -3411,7 +3414,31 @@ private:
   void check_page_map(Fragrecord*);
   bool find_page_id_in_list(Fragrecord*, Uint32 pid);
 #endif
-  void handle_lcp_keep(Signal*, Fragrecord*, ScanOp*, Uint32 rowid);
+  void handle_lcp_keep(Signal*, Fragrecord*, ScanOp*);
+  void handle_lcp_keep_commit(const Local_key*,
+                              KeyReqStruct *,
+                              Operationrec*, Fragrecord*, Tablerec*);
+
+  void setup_lcp_read_copy_tuple( KeyReqStruct *,
+                                  Operationrec*,
+                                  Fragrecord*,
+                                  Tablerec*);
+
+  bool isCopyTuple(Uint32 pageid, Uint32 pageidx) const {
+    return (pageidx & (Uint16(1) << 15)) != 0;
+  }
+
+  void setCopyTuple(Uint32& pageid, Uint16& pageidx) const {
+    assert(!isCopyTuple(pageid, pageidx));
+    pageidx |= (Uint16(1) << 15);
+    assert(isCopyTuple(pageid, pageidx));
+  }
+
+  void clearCopyTuple(Uint32& pageid, Uint16& pageidx) const {
+    assert(isCopyTuple(pageid, pageidx));
+    pageidx &= ~(Uint16(1) << 15);
+    assert(!isCopyTuple(pageid, pageidx));
+  }
 };
 
 #if 0

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupCommit.cpp	2011-05-07 06:16:13 +0000
@@ -51,15 +51,8 @@ void Dbtup::execTUP_DEALLOCREQ(Signal* s
     PagePtr pagePtr;
     Tuple_header* ptr= (Tuple_header*)get_ptr(&pagePtr, &tmp, regTabPtr.p);
 
-    ndbassert(ptr->m_header_bits & Tuple_header::FREE);
+    ndbassert(ptr->m_header_bits & Tuple_header::FREED);
 
-    if (ptr->m_header_bits & Tuple_header::LCP_KEEP)
-    {
-      ndbassert(! (ptr->m_header_bits & Tuple_header::FREED));
-      ptr->m_header_bits |= Tuple_header::FREED;
-      return;
-    }
-    
     if (regTabPtr.p->m_attributes[MM].m_no_of_varsize +
         regTabPtr.p->m_attributes[MM].m_no_of_dynamic)
     {
@@ -157,12 +150,12 @@ Dbtup::dealloc_tuple(Signal* signal,
 		     Uint32 gci,
 		     Page* page,
 		     Tuple_header* ptr, 
+                     KeyReqStruct * req_struct,
 		     Operationrec* regOperPtr, 
 		     Fragrecord* regFragPtr, 
 		     Tablerec* regTabPtr)
 {
   Uint32 lcpScan_ptr_i= regFragPtr->m_lcp_scan_op;
-  Uint32 lcp_keep_list = regFragPtr->m_lcp_keep_list;
 
   Uint32 bits = ptr->m_header_bits;
   Uint32 extra_bits = Tuple_header::FREED;
@@ -189,9 +182,15 @@ Dbtup::dealloc_tuple(Signal* signal,
     if (!is_rowid_lcp_scanned(rowid, *scanOp.p))
     {
       jam();
-      extra_bits = Tuple_header::LCP_KEEP; // Note REMOVE FREE
-      ptr->m_operation_ptr_i = lcp_keep_list;
-      regFragPtr->m_lcp_keep_list = rowid.ref();
+
+      /**
+       * We're committing a delete, on a row that should
+       *   be part of LCP. Copy original row into copy-tuple
+       *   and add this copy-tuple to lcp-keep-list
+       *
+       */
+      handle_lcp_keep_commit(&rowid,
+                             req_struct, regOperPtr, regFragPtr, regTabPtr);
     }
   }
   
@@ -204,6 +203,69 @@ Dbtup::dealloc_tuple(Signal* signal,
   }
 }
 
+void
+Dbtup::handle_lcp_keep_commit(const Local_key* rowid,
+                              KeyReqStruct * req_struct,
+                              Operationrec * opPtrP,
+                              Fragrecord * regFragPtr,
+                              Tablerec * regTabPtr)
+{
+  bool disk = false;
+  Uint32 sizes[4];
+  Uint32 * copytuple = get_copy_tuple_raw(&opPtrP->m_copy_tuple_location);
+  Tuple_header * dst = get_copy_tuple(copytuple);
+  Tuple_header * org = req_struct->m_tuple_ptr;
+  if (regTabPtr->need_expand(disk))
+  {
+    setup_fixed_part(req_struct, opPtrP, regTabPtr);
+    req_struct->m_tuple_ptr = dst;
+    expand_tuple(req_struct, sizes, org, regTabPtr, disk);
+    shrink_tuple(req_struct, sizes+2, regTabPtr, disk);
+  }
+  else
+  {
+    memcpy(dst, org, 4*regTabPtr->m_offsets[MM].m_fix_header_size);
+  }
+  dst->m_header_bits |= Tuple_header::COPY_TUPLE;
+
+  /**
+   * Store original row-id in copytuple[0,1]
+   * Store next-ptr in copytuple[1,2] (set to RNIL/RNIL)
+   *
+   */
+  assert(sizeof(Local_key) == 8);
+  memcpy(copytuple+0, rowid, sizeof(Local_key));
+
+  Local_key nil;
+  nil.setNull();
+  memcpy(copytuple+2, &nil, sizeof(nil));
+
+  /**
+   * Link it to list
+   */
+  if (regFragPtr->m_lcp_keep_list_tail.isNull())
+  {
+    jam();
+    regFragPtr->m_lcp_keep_list_head = opPtrP->m_copy_tuple_location;
+  }
+  else
+  {
+    jam();
+    Uint32 * tail = get_copy_tuple_raw(&regFragPtr->m_lcp_keep_list_tail);
+    Local_key nextptr;
+    memcpy(&nextptr, tail+2, sizeof(Local_key));
+    ndbassert(nextptr.isNull());
+    nextptr = opPtrP->m_copy_tuple_location;
+    memcpy(tail+2, &nextptr, sizeof(Local_key));
+  }
+  regFragPtr->m_lcp_keep_list_tail = opPtrP->m_copy_tuple_location;
+
+  /**
+   * And finally clear m_copy_tuple_location so that it won't be freed
+   */
+  opPtrP->m_copy_tuple_location.setNull();
+}
+
 #if 0
 static void dump_buf_hex(unsigned char *p, Uint32 bytes)
 {
@@ -786,7 +848,7 @@ skip_disk:
 	ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
       }
       dealloc_tuple(signal, gci_hi, page.p, tuple_ptr,
-		    regOperPtr.p, regFragPtr.p, regTabPtr.p); 
+		    &req_struct, regOperPtr.p, regFragPtr.p, regTabPtr.p);
     }
   } 
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2011-05-07 06:16:13 +0000
@@ -635,6 +635,17 @@ void Dbtup::execTUPKEYREQ(Signal* signal
      goto do_insert;
    }
 
+   if (unlikely(isCopyTuple(pageid, pageidx)))
+   {
+     /**
+      * Only LCP reads a copy-tuple "directly"
+      */
+     ndbassert(Roptype == ZREAD);
+     ndbassert(disk_page == RNIL);
+     setup_lcp_read_copy_tuple(&req_struct, regOperPtr, regFragPtr, regTabPtr);
+     goto do_read;
+   }
+
    /**
     * Get pointer to tuple
     */
@@ -652,6 +663,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal
      if (setup_read(&req_struct, regOperPtr, regFragPtr, regTabPtr, 
 		    disk_page != RNIL))
      {
+   do_read:
        if(handleReadReq(signal, regOperPtr, regTabPtr, &req_struct) != -1) 
        {
 	 req_struct.log_size= 0;
@@ -847,6 +859,44 @@ Dbtup::setup_fixed_part(KeyReqStruct* re
   req_struct->attr_descr= tab_descr; 
 }
 
+void
+Dbtup::setup_lcp_read_copy_tuple(KeyReqStruct* req_struct,
+                                 Operationrec* regOperPtr,
+                                 Fragrecord* regFragPtr,
+                                 Tablerec* regTabPtr)
+{
+  Local_key tmp;
+  tmp.m_page_no = req_struct->frag_page_id;
+  tmp.m_page_idx = regOperPtr->m_tuple_location.m_page_idx;
+  clearCopyTuple(tmp.m_page_no, tmp.m_page_idx);
+
+  Uint32 * copytuple = get_copy_tuple_raw(&tmp);
+  Local_key rowid;
+  memcpy(&rowid, copytuple+0, sizeof(Local_key));
+
+  req_struct->frag_page_id = rowid.m_page_no;
+  regOperPtr->m_tuple_location.m_page_idx = rowid.m_page_idx;
+
+  Tuple_header * th = get_copy_tuple(copytuple);
+  req_struct->m_page_ptr.setNull();
+  req_struct->m_tuple_ptr = (Tuple_header*)th;
+  th->m_operation_ptr_i = RNIL;
+  ndbassert((th->m_header_bits & Tuple_header::COPY_TUPLE) != 0);
+
+  Uint32 num_attr= regTabPtr->m_no_of_attributes;
+  Uint32 descr_start= regTabPtr->tabDescriptor;
+  TableDescriptor *tab_descr= &tableDescriptor[descr_start];
+  ndbrequire(descr_start + (num_attr << ZAD_LOG_SIZE) <= cnoOfTabDescrRec);
+  req_struct->attr_descr= tab_descr;
+
+  bool disk = false;
+  if (regTabPtr->need_expand(disk))
+  {
+    jam();
+    prepare_read(req_struct, regTabPtr, disk);
+  }
+}
+
  /* ---------------------------------------------------------------- */
  /* ------------------------ CONFIRM REQUEST ----------------------- */
  /* ---------------------------------------------------------------- */
@@ -1904,6 +1954,13 @@ int Dbtup::handleDeleteReq(Signal* signa
                            KeyReqStruct *req_struct,
 			   bool disk)
 {
+  Tuple_header* dst = alloc_copy_tuple(regTabPtr,
+                                       &regOperPtr->m_copy_tuple_location);
+  if (dst == 0) {
+    terrorCode = ZMEM_NOMEM_ERROR;
+    goto error;
+  }
+
   // delete must set but not increment tupVersion
   if (!regOperPtr->is_first_operation())
   {
@@ -1911,24 +1968,25 @@ int Dbtup::handleDeleteReq(Signal* signa
     regOperPtr->tupVersion= prevOp->tupVersion;
     // make copy since previous op is committed before this one
     const Tuple_header* org = get_copy_tuple(&prevOp->m_copy_tuple_location);
-    Tuple_header* dst = alloc_copy_tuple(regTabPtr,
-                                         &regOperPtr->m_copy_tuple_location);
-    if (dst == 0) {
-      terrorCode = ZMEM_NOMEM_ERROR;
-      goto error;
-    }
-    Uint32 len = regTabPtr->total_rec_size - 
-      Uint32(((Uint32*)dst) - 
+    Uint32 len = regTabPtr->total_rec_size -
+      Uint32(((Uint32*)dst) -
              get_copy_tuple_raw(&regOperPtr->m_copy_tuple_location));
     memcpy(dst, org, 4 * len);
     req_struct->m_tuple_ptr = dst;
-    set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
   }
-  else 
+  else
   {
     regOperPtr->tupVersion= req_struct->m_tuple_ptr->get_tuple_version();
+    if (regTabPtr->m_no_of_disk_attributes)
+    {
+      dst->m_header_bits = req_struct->m_tuple_ptr->m_header_bits;
+      memcpy(dst->get_disk_ref_ptr(regTabPtr),
+	     req_struct->m_tuple_ptr->get_disk_ref_ptr(regTabPtr),
+             sizeof(Local_key));
+    }
   }
   req_struct->changeMask.set();
+  set_change_mask_info(regTabPtr, get_change_mask_ptr(regTabPtr, dst));
 
   if(disk && regOperPtr->m_undo_buffer_space == 0)
   {

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2011-05-07 06:16:13 +0000
@@ -670,7 +670,6 @@ void Dbtup::initializeDefaultValuesFrag(
   DefaultValuesFragment.p->fragStatus = Fragrecord::FS_ONLINE;
   DefaultValuesFragment.p->m_undo_complete= false;
   DefaultValuesFragment.p->m_lcp_scan_op = RNIL;
-  DefaultValuesFragment.p->m_lcp_keep_list = RNIL;
   DefaultValuesFragment.p->noOfPages = 0;
   DefaultValuesFragment.p->noOfVarPages = 0;
   DefaultValuesFragment.p->m_max_page_no = 0;

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2011-04-18 15:36:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupMeta.cpp	2011-05-07 06:16:13 +0000
@@ -703,7 +703,8 @@ void Dbtup::execTUPFRAGREQ(Signal* signa
   regFragPtr.p->m_tablespace_id= tablespace_id;
   regFragPtr.p->m_undo_complete= false;
   regFragPtr.p->m_lcp_scan_op = RNIL;
-  regFragPtr.p->m_lcp_keep_list = RNIL;
+  regFragPtr.p->m_lcp_keep_list_head.setNull();
+  regFragPtr.p->m_lcp_keep_list_tail.setNull();
   regFragPtr.p->noOfPages = 0;
   regFragPtr.p->noOfVarPages = 0;
   regFragPtr.p->m_max_page_no = 0;
@@ -1573,6 +1574,8 @@ Dbtup::computeTableMetaData(Tablerec *re
   /* Room for changemask */
   total_rec_size += 1 + ((regTabPtr->m_no_of_attributes + 31) >> 5);
 
+  total_rec_size += COPY_TUPLE_HEADER32;
+
   regTabPtr->total_rec_size= total_rec_size;
 
   setUpQueryRoutines(regTabPtr);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2011-04-19 09:01:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp	2011-05-07 06:16:13 +0000
@@ -287,9 +287,8 @@ Dbtup::execACC_CHECK_SCAN(Signal* signal
   }
 
   const bool lcp = (scan.m_bits & ScanOp::SCAN_LCP);
-  Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
 
-  if (lcp && lcp_list != RNIL)
+  if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
   {
     jam();
     /**
@@ -297,7 +296,7 @@ Dbtup::execACC_CHECK_SCAN(Signal* signal
      *   So that scan state is not alterer
      *   if lcp_keep rows are found in ScanOp::First
      */
-    handle_lcp_keep(signal, fragPtr.p, scanPtr.p, lcp_list);
+    handle_lcp_keep(signal, fragPtr.p, scanPtr.p);
     return;
   }
 
@@ -692,19 +691,18 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
  
   const bool mm = (bits & ScanOp::SCAN_DD);
   const bool lcp = (bits & ScanOp::SCAN_LCP);
-  
-  Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
+
   const Uint32 size = ((bits & ScanOp::SCAN_VS) == 0) ?
     table.m_offsets[mm].m_fix_header_size : 1;
   const Uint32 first = ((bits & ScanOp::SCAN_VS) == 0) ? 0 : 1;
 
-  if (lcp && lcp_list != RNIL)
+  if (lcp && ! fragPtr.p->m_lcp_keep_list_head.isNull())
   {
     jam();
     /**
      * Handle lcp keep list here to, due to scanCont
      */
-    handle_lcp_keep(signal, fragPtr.p, scanPtr.p, lcp_list);
+    handle_lcp_keep(signal, fragPtr.p, scanPtr.p);
     return false;
   }
 
@@ -1130,57 +1128,40 @@ Dbtup::scanNext(Signal* signal, ScanOpPt
 void
 Dbtup::handle_lcp_keep(Signal* signal,
                        Fragrecord* fragPtrP,
-                       ScanOp* scanPtrP,
-                       Uint32 lcp_list)
+                       ScanOp* scanPtrP)
 {
   TablerecPtr tablePtr;
   tablePtr.i = scanPtrP->m_tableId;
   ptrCheckGuard(tablePtr, cnoOfTablerec, tablerec);
 
-  Local_key tmp;
-  tmp.assref(lcp_list);
-  tmp.m_page_no = getRealpid(fragPtrP, tmp.m_page_no);
-  
-  Ptr<Page> pagePtr;
-  c_page_pool.getPtr(pagePtr, tmp.m_page_no);
-  Tuple_header* ptr = (Tuple_header*)
-    ((Fix_page*)pagePtr.p)->get_ptr(tmp.m_page_idx, 0);
-  Uint32 headerbits = ptr->m_header_bits;
-  ndbrequire(headerbits & Tuple_header::LCP_KEEP);
-  
-  Uint32 next = ptr->m_operation_ptr_i;
-  ptr->m_operation_ptr_i = RNIL;
-  ptr->m_header_bits = headerbits & ~(Uint32)Tuple_header::FREE;
-  
-  if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
+  ndbassert(!fragPtrP->m_lcp_keep_list_head.isNull());
+  Local_key tmp = fragPtrP->m_lcp_keep_list_head;
+  Uint32 * copytuple = get_copy_tuple_raw(&tmp);
+  memcpy(&fragPtrP->m_lcp_keep_list_head,
+         copytuple+2,
+         sizeof(Local_key));
+
+  if (fragPtrP->m_lcp_keep_list_head.isNull())
+  {
     jam();
-    setChecksum(ptr, tablePtr.p);
+    ndbassert(tmp.m_page_no == fragPtrP->m_lcp_keep_list_tail.m_page_no);
+    ndbassert(tmp.m_page_idx == fragPtrP->m_lcp_keep_list_tail.m_page_idx);
+    fragPtrP->m_lcp_keep_list_tail.setNull();
   }
 
+  Local_key save = tmp;
+  setCopyTuple(tmp.m_page_no, tmp.m_page_idx);
   NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
   conf->scanPtr = scanPtrP->m_userPtr;
   conf->accOperationPtr = (Uint32)-1;
   conf->fragId = fragPtrP->fragmentId;
-  conf->localKey[0] = Local_key::ref2page_id(lcp_list);
-  conf->localKey[1] = Local_key::ref2page_idx(lcp_list);
+  conf->localKey[0] = tmp.m_page_no;
+  conf->localKey[1] = tmp.m_page_idx;
   conf->gci = 0;
   Uint32 blockNo = refToMain(scanPtrP->m_userRef);
   EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 6);
-  
-  fragPtrP->m_lcp_keep_list = next;
-  ptr->m_header_bits |= Tuple_header::FREED; // RESTORE free flag
-  if (headerbits & Tuple_header::FREED)
-  {
-    if (tablePtr.p->m_attributes[MM].m_no_of_varsize +
-        tablePtr.p->m_attributes[MM].m_no_of_dynamic)
-    {
-      jam();
-      free_var_rec(fragPtrP, tablePtr.p, &tmp, pagePtr);
-    } else {
-      jam();
-      free_fix_rec(fragPtrP, tablePtr.p, &tmp, (Fix_page*)pagePtr.p);
-    }
-  }
+
+  c_undo_buffer.free_copy_tuple(&save);
 }
 
 void
@@ -1320,4 +1301,7 @@ Dbtup::execLCP_FRAG_ORD(Signal* signal)
   new (scanPtr.p) ScanOp;
   scanPtr.p->m_fragPtrI = fragPtr.i;
   scanPtr.p->m_state = ScanOp::First;
+
+  ndbassert(frag.m_lcp_keep_list_head.isNull());
+  ndbassert(frag.m_lcp_keep_list_tail.isNull());
 }

=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c	2011-05-04 11:45:33 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c	2011-05-07 06:16:13 +0000
@@ -187,7 +187,7 @@ ErrorBundle ErrorCodes[] = {
   { 805,  DMEC, TR, "Out of attrinfo records in tuple manager" },
   { 830,  DMEC, TR, "Out of add fragment operation records" },
   { 873,  DMEC, TR, "Out of attrinfo records for scan in tuple manager" },
-  { 899,  DMEC, TR, "Rowid already allocated" },
+  { 899,  DMEC, IE, "Internal error: rowid already allocated" },
   { 1217, DMEC, TR, "Out of operation records in local data manager (increase MaxNoOfLocalOperations)" },
   { 1218, DMEC, TR, "Send Buffers overloaded in NDB kernel" },
   { 1220, DMEC, TR, "REDO log files overloaded (increase FragmentLogFileSize)" },

=== modified file 'storage/ndb/test/ndbapi/testBasic.cpp'
--- a/storage/ndb/test/ndbapi/testBasic.cpp	2011-04-07 07:22:49 +0000
+++ b/storage/ndb/test/ndbapi/testBasic.cpp	2011-05-07 06:16:13 +0000
@@ -2267,6 +2267,154 @@ runBug59496_case2(NDBT_Context* ctx, NDB
   return NDBT_OK;
 }
 
+#define CHK_RET_FAILED(x) if (!(x)) { ndbout_c("Failed on line: %u", __LINE__); return NDBT_FAILED; }
+
+int
+runTest899(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb* pNdb = GETNDB(step);
+  const NdbDictionary::Table* pTab = ctx->getTab();
+
+  const int rows = ctx->getNumRecords();
+  const int loops = ctx->getNumLoops();
+  const int batch = ctx->getProperty("Batch", Uint32(50));
+  const int until_stopped = ctx->getProperty("UntilStopped");
+
+  const NdbRecord * pRowRecord = pTab->getDefaultRecord();
+  CHK_RET_FAILED(pRowRecord != 0);
+
+  const Uint32 len = NdbDictionary::getRecordRowLength(pRowRecord);
+  Uint8 * pRow = new Uint8[len];
+
+  int count_ok = 0;
+  int count_failed = 0;
+  int count_899 = 0;
+  for (int i = 0; i < loops || (until_stopped && !ctx->isTestStopped()); i++)
+  {
+    ndbout_c("loop: %d",i);
+    int result = 0;
+    for (int rowNo = 0; rowNo < rows;)
+    {
+      NdbTransaction* pTrans = pNdb->startTransaction();
+      CHK_RET_FAILED(pTrans != 0);
+
+      for (int b = 0; rowNo < rows && b < batch; rowNo++, b++)
+      {
+        bzero(pRow, len);
+
+        HugoCalculator calc(* pTab);
+
+        NdbOperation::OperationOptions opts;
+        bzero(&opts, sizeof(opts));
+
+        const NdbOperation* pOp = 0;
+        switch(i % 2){
+        case 0:
+          calc.setValues(pRow, pRowRecord, rowNo, rand());
+          pOp = pTrans->writeTuple(pRowRecord, (char*)pRow,
+                                   pRowRecord, (char*)pRow,
+                                   0,
+                                   &opts,
+                                   sizeof(opts));
+          result = pTrans->execute(NoCommit);
+          break;
+        case 1:
+          calc.setValues(pRow, pRowRecord, rowNo, rand());
+          pOp = pTrans->deleteTuple(pRowRecord, (char*)pRow,
+                                    pRowRecord, (char*)pRow,
+                                    0,
+                                    &opts,
+                                    sizeof(opts));
+          result = pTrans->execute(NoCommit, AO_IgnoreError);
+          break;
+        }
+
+        CHK_RET_FAILED(pOp != 0);
+
+        if (result != 0)
+        {
+          goto found_error;
+        }
+      }
+      result = pTrans->execute(Commit);
+
+      if (result != 0)
+      {
+    found_error:
+        count_failed++;
+        NdbError err = pTrans->getNdbError();
+        if (! (err.status == NdbError::TemporaryError ||
+               err.classification == NdbError::NoDataFound ||
+               err.classification == NdbError::ConstraintViolation))
+        {
+          ndbout << err << endl;
+        }
+        CHK_RET_FAILED(err.status == NdbError::TemporaryError ||
+                       err.classification == NdbError::NoDataFound ||
+                       err.classification == NdbError::ConstraintViolation);
+        if (err.code == 899)
+        {
+          count_899++;
+          ndbout << err << endl;
+        }
+      }
+      else
+      {
+        count_ok++;
+      }
+      pTrans->close();
+    }
+  }
+
+  ndbout_c("count_ok: %d count_failed: %d (899: %d)",
+           count_ok, count_failed, count_899);
+  delete [] pRow;
+
+  return count_899 == 0 ? NDBT_OK : NDBT_FAILED;
+}
+
+int
+runInit899(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbRestarter restarter;
+  int val = DumpStateOrd::DihMinTimeBetweenLCP;
+  restarter.dumpStateAllNodes(&val, 1);
+
+  Ndb* pNdb = GETNDB(step);
+  const NdbDictionary::Table* pTab = ctx->getTab();
+  const NdbDictionary::Table * pTab2 = pNdb->getDictionary()->
+    getTable(pTab->getName());
+
+  int tableId = pTab2->getObjectId();
+  int val2[] = { DumpStateOrd::BackupErrorInsert, 10042, tableId };
+
+  for (int i = 0; i < restarter.getNumDbNodes(); i++)
+  {
+    if (i & 1)
+    {
+      int nodeId = restarter.getDbNodeId(i);
+      ndbout_c("Setting slow LCP of table %d on node %d",
+               tableId, nodeId);
+      restarter.dumpStateOneNode(nodeId, val2, 3);
+    }
+  }
+
+  return NDBT_OK;
+}
+
+int
+runEnd899(NDBT_Context* ctx, NDBT_Step* step)
+{
+  // reset LCP speed
+  NdbRestarter restarter;
+  int val[] = { DumpStateOrd::DihMinTimeBetweenLCP, 0 };
+  restarter.dumpStateAllNodes(val, 2);
+
+  restarter.insertErrorInAllNodes(0);
+  return NDBT_OK;
+}
+
+
 NDBT_TESTSUITE(testBasic);
 TESTCASE("PkInsert", 
 	 "Verify that we can insert and delete from this table using PK"
@@ -2618,6 +2766,13 @@ TESTCASE("Bug59496_case2", "")
   STEP(runBug59496_case2);
   STEPS(runBug59496_scan, 10);
 }
+TESTCASE("899", "")
+{
+  INITIALIZER(runLoadTable);
+  INITIALIZER(runInit899);
+  STEP(runTest899);
+  FINALIZER(runEnd899);
+}
 NDBT_TESTSUITE_END(testBasic);
 
 #if 0

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2011-05-03 06:21:59 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2011-05-07 06:16:13 +0000
@@ -32,6 +32,10 @@ max-time: 900
 cmd: testIndex
 args: -n NF_Mixed T1 T6 T13
 
+max-time: 900
+cmd: testBasic
+args: -r 5000 -n 899 T15 D1 D2
+
 max-time: 600
 cmd: atrt-testBackup
 args: -n NFMaster T1


Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20110507061613-2g5tm04hks7hqp8m.bundle
Thread
bzr commit into mysql-5.1-telco-7.0 branch (jonas:4357) Bug#56051jonas oreland7 May