List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:March 3 2009 10:33pm
Subject:bzr push into mysql-5.1-telco-6.4 branch (frazer:2901 to 2902)
View as plain text  
 2902 Frazer Clement	2009-03-02 [merge]
      Merge 6.3->6.4 and extend testBlobs to test HashMapPartitioning
      modified:
        storage/ndb/include/ndbapi/NdbBlob.hpp
        storage/ndb/src/ndbapi/NdbBlob.cpp
        storage/ndb/test/ndbapi/testBlobs.cpp

 2901 Jonas Oreland	2009-03-02 [merge]
      merge 63 to 64
      modified:
        storage/ndb/src/kernel/blocks/backup/Backup.cpp
        storage/ndb/test/run-test/daily-basic-tests.txt

=== modified file 'storage/ndb/include/ndbapi/NdbBlob.hpp'
--- a/storage/ndb/include/ndbapi/NdbBlob.hpp	2008-11-08 20:40:15 +0000
+++ b/storage/ndb/include/ndbapi/NdbBlob.hpp	2009-03-02 16:08:11 +0000
@@ -386,6 +386,7 @@ private:
   NdbOperation* theHeadInlineReadOp;
   bool theHeadInlineUpdateFlag;
   // partition id for data events
+  bool userDefinedPartitioning;
   Uint32 noPartitionId() { return ~(Uint32)0; }
   Uint32 thePartitionId;
   NdbRecAttr* thePartitionIdRecAttr;
@@ -440,6 +441,8 @@ private:
   int getHeadInlineValue(NdbOperation* anOp);
   void getHeadFromRecAttr();
   int setHeadInlineValue(NdbOperation* anOp);
+  void setHeadPartitionId(NdbOperation* anOp);
+  void setPartPartitionId(NdbOperation* anOp);
   // data operations
   int readDataPrivate(char* buf, Uint32& bytes);
   int writeDataPrivate(const char* buf, Uint32 bytes);

=== modified file 'storage/ndb/src/ndbapi/NdbBlob.cpp'
--- a/storage/ndb/src/ndbapi/NdbBlob.cpp	2008-11-08 21:43:03 +0000
+++ b/storage/ndb/src/ndbapi/NdbBlob.cpp	2009-03-02 17:23:44 +0000
@@ -97,8 +97,6 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, 
   bt.setLogging(t->getLogging());
   /*
     BLOB tables use the same fragmentation as the original table
-    but may change the fragment type if it is UserDefined since it
-    must be hash based so that the kernel can handle it on its own.
     It also uses the same tablespaces and it never uses any range or
     list arrays.
   */
@@ -108,27 +106,8 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, 
   bt.setFragmentCount(t->getFragmentCount());
   bt.m_tablespace_id = t->m_tablespace_id;
   bt.m_tablespace_version = t->m_tablespace_version;
-  switch (t->getFragmentType())
-  {
-    case NdbDictionary::Object::FragAllSmall:
-    case NdbDictionary::Object::FragAllMedium:
-    case NdbDictionary::Object::FragAllLarge:
-    case NdbDictionary::Object::FragSingle:
-      bt.setFragmentType(t->getFragmentType());
-      break;
-    case NdbDictionary::Object::DistrKeyLin:
-    case NdbDictionary::Object::DistrKeyHash:
-    case NdbDictionary::Object::HashMapPartition:
-      bt.setFragmentType(t->getFragmentType());
-      break;
-    case NdbDictionary::Object::UserDefined:
-      bt.setFragmentType(NdbDictionary::Object::DistrKeyHash);
-      break;
-    default:
-      DBUG_ASSERT(0);
-      DBUG_RETURN(-1);
-      break;
-  }
+  bt.setFragmentType(t->getFragmentType());
+
   DBUG_PRINT("info", ("Define BLOB table V%d with"
                       " primary table = %u and Fragment Type = %u",
                       blobVersion,
@@ -363,6 +342,7 @@ NdbBlob::init()
   theHeadInlineRecAttr = NULL;
   theHeadInlineReadOp = NULL;
   theHeadInlineUpdateFlag = false;
+  userDefinedPartitioning = false;
   thePartitionId = noPartitionId();
   thePartitionIdRecAttr = NULL;
   theNullFlag = -1;
@@ -547,6 +527,37 @@ NdbBlob::getDistKey(Uint32 part)
   return dist;
 }
 
+inline void
+NdbBlob::setHeadPartitionId(NdbOperation* anOp)
+{
+  /* For UserDefined partitioned tables,
+   * we must set the head row's partition id
+   * manually when reading/modifying it with
+   * primary key or unique key.
+   * For scans we do not have to.
+   */
+  if (userDefinedPartitioning &&
+      (thePartitionId != noPartitionId())) {
+    anOp->setPartitionId(thePartitionId);
+  }
+}
+
+inline void
+NdbBlob::setPartPartitionId(NdbOperation* anOp)
+{
+  /* For UserDefined partitioned tables
+   * we must set the part row's partition 
+   * id manually when performing operations.
+   * This means that stripe size is ignored
+   * for UserDefined partitioned tables.
+   * All part row operations use primary keys
+   */
+  if (userDefinedPartitioning) {
+    assert(thePartitionId != noPartitionId());
+    anOp->setPartitionId(thePartitionId);
+  }
+}
+
 // pack/unpack table/index key  XXX support routines, shortcuts
 
 int
@@ -938,6 +949,7 @@ NdbBlob::setPartKeyValue(NdbOperation* a
       DBUG_RETURN(-1);
     }
   }
+  setPartPartitionId(anOp);
   DBUG_RETURN(0);
 }
 
@@ -1013,14 +1025,26 @@ NdbBlob::getHeadInlineValue(NdbOperation
    * specific checks
    */
   theHeadInlineRecAttr = anOp->getValue_impl(theColumn, theHeadInlineBuf.data);
-  thePartitionIdRecAttr = 
-    anOp->getValue_impl(&NdbColumnImpl::getImpl(*NdbDictionary::Column::FRAGMENT));
-  
-  if (theHeadInlineRecAttr == NULL ||
-      thePartitionIdRecAttr == NULL) {
+  if (theHeadInlineRecAttr == NULL) {
     setErrorCode(anOp);
     DBUG_RETURN(-1);
   }
+  if (userDefinedPartitioning)
+  {
+    /* For UserDefined partitioned tables, we ask for the partition
+     * id of the main table row to use for the parts
+     * Not technically needed for main table access via PK, which must
+     * have partition id set for access, but we do it anyway and check
+     * it's as expected.
+     */
+    thePartitionIdRecAttr = 
+      anOp->getValue_impl(&NdbColumnImpl::getImpl(*NdbDictionary::Column::FRAGMENT));
+    
+    if (thePartitionIdRecAttr == NULL) {
+      setErrorCode(anOp);
+      DBUG_RETURN(-1);
+    }
+  }
   /*
    * If we get no data from this op then the operation is aborted
    * one way or other.  Following hack in 5.0 makes sure we don't read
@@ -1042,20 +1066,30 @@ NdbBlob::getHeadFromRecAttr()
   if (theNullFlag == 0) {
     unpackBlobHead();
     theLength = theHead.length;
-    if (theEventBlobVersion == -1) {
+  } else {
+    theLength = 0;
+  }
+  if (theEventBlobVersion == -1) {
+    if (userDefinedPartitioning)
+    {
+      /* Use main table fragment id as partition id
+       * for blob parts table
+       */
       Uint32 id = thePartitionIdRecAttr->u_32_value();
       DBUG_PRINT("info", ("table partition id: %u", id));
       if (thePartitionId == noPartitionId()) {
         DBUG_PRINT("info", ("discovered here"));
-        // setting even in non-partitioned case
         thePartitionId = id;
       } else {
         assert(thePartitionId == id);
       }
     }
-  } else {
-    theLength = 0;
+    else
+    {
+      assert(thePartitionIdRecAttr == NULL);
+    }
   }
+
   DBUG_PRINT("info", ("theNullFlag=%d theLength=%llu",
                       theNullFlag, theLength));
   DBUG_VOID_RETURN;
@@ -1680,10 +1714,7 @@ NdbBlob::readTablePart(char* buf, Uint32
     setErrorCode(tOp);
     DBUG_RETURN(-1);
   }
-  if (thePartitionId != noPartitionId() &&
-      theStripeSize == 0) {
-    tOp->setPartitionId(thePartitionId);
-  }
+
   tOp->m_abortOption = NdbOperation::AbortOnError;
   thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
   theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
@@ -1743,10 +1774,7 @@ NdbBlob::insertPart(const char* buf, Uin
     setErrorCode(tOp);
     DBUG_RETURN(-1);
   }
-  if (thePartitionId != noPartitionId() &&
-      theStripeSize == 0) {
-    tOp->setPartitionId(thePartitionId);
-  }
+
   tOp->m_abortOption = NdbOperation::AbortOnError;
   thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
   theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
@@ -1783,10 +1811,7 @@ NdbBlob::updatePart(const char* buf, Uin
     setErrorCode(tOp);
     DBUG_RETURN(-1);
   }
-  if (thePartitionId != noPartitionId() &&
-      theStripeSize == 0) {
-    tOp->setPartitionId(thePartitionId);
-  }
+
   tOp->m_abortOption = NdbOperation::AbortOnError;
   thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
   theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
@@ -1807,10 +1832,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32
       setErrorCode(tOp);
       DBUG_RETURN(-1);
     }
-    if (thePartitionId != noPartitionId() &&
-        theStripeSize == 0) {
-      tOp->setPartitionId(thePartitionId);
-    }
+
     tOp->m_abortOption = NdbOperation::AbortOnError;
     n++;
     thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
@@ -1847,10 +1869,6 @@ NdbBlob::deletePartsUnknown(Uint32 part)
         setErrorCode(tOp);
         DBUG_RETURN(-1);
       }
-      if (thePartitionId != noPartitionId() &&
-          theStripeSize == 0) {
-        tOp->setPartitionId(thePartitionId);
-      }
       tOp->m_abortOption= NdbOperation::AO_IgnoreError;
       tOp->m_noErrorPropagation = true;
       n++;
@@ -2010,8 +2028,14 @@ NdbBlob::atPrepareCommon(NdbTransaction*
   // prepare blob column and table
   if (prepareColumn() == -1)
     return -1;
-  // check if mysql or user has set partition id
-  if (theNdbOp->theDistrKeyIndicator_) {
+  userDefinedPartitioning= (theTable->getFragmentType() == 
+                            NdbDictionary::Object::UserDefined);
+  /* UserDefined Partitioning
+   * If user has set partitionId specifically, take it for
+   * Blob head and part operations
+   */
+  if (userDefinedPartitioning && 
+      theNdbOp->theDistrKeyIndicator_) {
     thePartitionId = theNdbOp->getPartitionId();
     DBUG_PRINT("info", ("op partition id: %u", thePartitionId));
   }
@@ -2461,9 +2485,8 @@ NdbBlob::preExecute(NdbTransaction::Exec
           setErrorCode(NdbBlobImpl::ErrAbort);
           DBUG_RETURN(-1);
         }
-        if (thePartitionId != noPartitionId()) {
-          tOp->setPartitionId(thePartitionId);
-        }
+        setHeadPartitionId(tOp);
+
         DBUG_PRINT("info", ("Insert : added op to update head+inline in preExecute"));
       }
     }
@@ -2504,9 +2527,8 @@ NdbBlob::preExecute(NdbTransaction::Exec
         setErrorCode(tOp);
         DBUG_RETURN(-1);
       }
-      if (thePartitionId != noPartitionId()) {
-        tOp->setPartitionId(thePartitionId);
-      }
+      setHeadPartitionId(tOp);
+
       if (isWriteOp()) {
         /* There may be no data currently, so ignore tuple not found etc. */
         tOp->m_abortOption = NdbOperation::AO_IgnoreError;
@@ -2536,6 +2558,11 @@ NdbBlob::preExecute(NdbTransaction::Exec
     if (this == tFirstBlob) {
       // first blob does it for all
       if (g_ndb_blob_ok_to_read_index_table) {
+        /* Cannot work for userDefinedPartitioning + write() op as
+         * we need to read the 'main' partition Id
+         * Maybe this branch should be removed?
+         */
+        assert(!userDefinedPartitioning);
         Uint32 pkAttrId = theAccessTable->getNoOfColumns() - 1;
         NdbOperation* tOp = theNdbCon->getNdbOperation(theAccessTable, theNdbOp);
         if (tOp == NULL ||
@@ -2554,6 +2581,20 @@ NdbBlob::preExecute(NdbTransaction::Exec
           setErrorCode(tOp);
           DBUG_RETURN(-1);
         }
+        if (userDefinedPartitioning && isWriteOp())
+        {
+          /* Index Write op does not perform head read before deleting parts
+           * as it cannot safely IgnoreErrors.
+           * To get partitioning right we read partition id for main row
+           * here.
+           */
+          thePartitionIdRecAttr = tOp->getValue_impl(&NdbColumnImpl::getImpl(*NdbDictionary::Column::FRAGMENT));
+          
+          if (thePartitionIdRecAttr == NULL) {
+            setErrorCode(tOp);
+            DBUG_RETURN(-1);
+          }
+        } 
       }
       DBUG_PRINT("info", ("Index op : added op before to read table key"));
     }
@@ -2622,9 +2663,8 @@ NdbBlob::preExecute(NdbTransaction::Exec
             setErrorCode(NdbBlobImpl::ErrAbort);
             DBUG_RETURN(-1);
           }
-          if (thePartitionId != noPartitionId()) {
-            tOp->setPartitionId(thePartitionId);
-          }
+          setHeadPartitionId(tOp);
+
           DBUG_PRINT("info", ("NdbRecord table write : added op to update head+inline"));
         }
       }
@@ -2756,9 +2796,8 @@ NdbBlob::postExecute(NdbTransaction::Exe
             setErrorCode(NdbBlobImpl::ErrAbort);
             DBUG_RETURN(-1);
           }
-          if (thePartitionId != noPartitionId()) {
-            tOp->setPartitionId(thePartitionId);
-          }
+          setHeadPartitionId(tOp);
+
           DBUG_PRINT("info", ("Insert : added op to update head+inline"));
         }
       }
@@ -2823,6 +2862,33 @@ NdbBlob::postExecute(NdbTransaction::Exe
   }
   if (isWriteOp() && isIndexOp()) {
     // XXX until IgnoreError fixed for index op
+    if (userDefinedPartitioning)
+    {
+      /* For Index Write with UserDefined partitioning, we get the
+       * partition id from the main table key read created in 
+       * preExecute().
+       * Extra complexity as only the first Blob does the read, other
+       * Blobs grab result from first.
+       */
+      if (thePartitionIdRecAttr != NULL)
+      {
+        assert( this == theNdbOp->theBlobList );
+        Uint32 id= thePartitionIdRecAttr->u_32_value();
+        assert( id != noPartitionId() );
+        DBUG_PRINT("info", ("Index write, setting partition id to %d", id));
+        thePartitionId= id;
+      }
+      else
+      {
+        /* First Blob (not us) in this op got the partition Id */
+        assert( theNdbOp->theBlobList );
+        assert( this != theNdbOp->theBlobList );
+
+        thePartitionId= theNdbOp->theBlobList->thePartitionId;
+
+        assert(thePartitionId != noPartitionId());
+      }
+    }
     if (deletePartsUnknown(0) == -1)
       DBUG_RETURN(-1);
     if (theSetFlag && theGetSetBytes > theInlineSize) {
@@ -2856,9 +2922,8 @@ NdbBlob::postExecute(NdbTransaction::Exe
       setErrorCode(NdbBlobImpl::ErrAbort);
       DBUG_RETURN(-1);
     }
-    if (thePartitionId != noPartitionId()) {
-      tOp->setPartitionId(thePartitionId);
-    }
+    setHeadPartitionId(tOp);
+
     tOp->m_abortOption = NdbOperation::AbortOnError;
     DBUG_PRINT("info", ("added op to update head+inline"));
   }
@@ -2890,9 +2955,8 @@ NdbBlob::preCommit()
           setErrorCode(NdbBlobImpl::ErrAbort);
           DBUG_RETURN(-1);
         }
-        if (thePartitionId != noPartitionId()) {
-          tOp->setPartitionId(thePartitionId);
-        }
+        setHeadPartitionId(tOp);
+        
         tOp->m_abortOption = NdbOperation::AbortOnError;
         DBUG_PRINT("info", ("added op to update head+inline"));
     }

=== modified file 'storage/ndb/test/ndbapi/testBlobs.cpp'
--- a/storage/ndb/test/ndbapi/testBlobs.cpp	2008-10-07 05:47:44 +0000
+++ b/storage/ndb/test/ndbapi/testBlobs.cpp	2009-03-02 17:23:44 +0000
@@ -212,6 +212,9 @@ static Uint32 g_batchSize= 0;
 static Uint32 g_scanFlags= 0;
 static Uint32 g_parallel= 0;
 static Uint32 g_usingDisk= false;
+static const Uint32 MAX_FRAGS=48 * 8 * 4; // e.g. 48 nodes, 8 frags/node, 4 replicas
+static Uint32 frag_ng_mappings[MAX_FRAGS];
+
 
 static const char* stylename[3] = {
   "style=getValue/setValue",
@@ -482,6 +485,12 @@ dropTable()
   return 0;
 }
 
+static unsigned
+urandom(unsigned n)
+{
+  return n == 0 ? 0 : ndb_rand() % n;
+}
+
 static int
 createTable(int storageType)
 {
@@ -496,7 +505,43 @@ createTable(int storageType)
   if (storageType == STORAGE_DISK)
     tab.setTablespaceName(g_tsName);
   tab.setLogging(loggingRequired);
-  tab.setFragmentType(NdbDictionary::Object::FragAllLarge);
+  
+  /* Choose from the interesting fragmentation types :
+   * DistrKeyHash, DistrKeyLin, UserDefined, HashMapPartitioned
+   * Others are obsolete fragment-count setting variants 
+   * of DistrKeyLin
+   * For UserDefined partitioning, we need to set the partition
+   * id for all PK operations.
+   */
+  Uint32 fragTypeRange= 1 + (NdbDictionary::Object::HashMapPartition - 
+                             NdbDictionary::Object::DistrKeyHash);
+  Uint32 fragType= NdbDictionary::Object::DistrKeyHash + urandom(fragTypeRange);
+
+  /* Value 8 is unused currently, map it to something else */
+  if (fragType == 8)
+    fragType= NdbDictionary::Object::UserDefined;
+
+  tab.setFragmentType((NdbDictionary::Object::FragmentType)fragType);
+
+  if (fragType == NdbDictionary::Object::UserDefined)
+  {
+    /* Need to set the FragmentCount and fragment to NG mapping
+     * for this partitioning type 
+     */
+    const Uint32 numNodes= g_ncc->no_db_nodes();
+    const Uint32 numReplicas= 2; // Assumption
+    const Uint32 guessNumNgs= numNodes/2;
+    const Uint32 numNgs= guessNumNgs?guessNumNgs : 1;
+    const Uint32 numFragsPerNode= 2 + (rand() % 3);
+    const Uint32 numPartitions= numReplicas * numNgs * numFragsPerNode;
+    
+    tab.setFragmentCount(numPartitions);
+    for (Uint32 i=0; i<numPartitions; i++)
+    {
+      frag_ng_mappings[i]= i % numNgs;
+    }
+    tab.setFragmentData(frag_ng_mappings, numPartitions);
+  }
   const Chr& pk2chr = g_opt.m_pk2chr;
   // col PK1 - Uint32
   { NdbDictionary::Column col("PK1");
@@ -617,12 +662,6 @@ createTable(int storageType)
 
 // tuples
 
-static unsigned
-urandom(unsigned n)
-{
-  return n == 0 ? 0 : ndb_rand() % n;
-}
-
 struct Bval {
   const Bcol& m_bcol;
   char* m_val;
@@ -732,6 +771,11 @@ struct Tup {
       return m_pk2;
     return urandom(2) == 0 ? m_pk2 : m_pk2eq;
   }
+  Uint32 getPartitionId(Uint32 numParts) const {
+    /* Only for UserDefined tables really */
+    return m_pk1 % numParts; // MySQLD hash(PK1) style partitioning
+  }
+
 private:
   Tup(const Tup&);
   Tup& operator=(const Tup&);
@@ -740,6 +784,32 @@ private:
 static Tup* g_tups;
 
 static void
+setUDpartId(const Tup& tup, NdbOperation* op)
+{
+  const NdbDictionary::Table* tab= op->getTable();
+  if (tab->getFragmentType() == NdbDictionary::Object::UserDefined)
+  {
+    Uint32 partId= tup.getPartitionId(tab->getFragmentCount());
+    DBG("Setting partition id to " << partId << " out of " << 
+        tab->getFragmentCount());
+    op->setPartitionId(partId);
+  }
+}
+
+static void
+setUDpartIdNdbRecord(const Tup& tup, 
+                     const NdbDictionary::Table* tab, 
+                     NdbOperation::OperationOptions& opts)
+{
+  opts.optionsPresent= 0;
+  if (tab->getFragmentType() == NdbDictionary::Object::UserDefined)
+  {
+    opts.optionsPresent= NdbOperation::OperationOptions::OO_PARTITION_ID;
+    opts.partitionId= tup.getPartitionId(tab->getFragmentCount());
+  } 
+}
+
+static void
 calcBval(const Bcol& b, Bval& v, bool keepsize)
 {
   if (b.m_nullable && urandom(10) == 0) {
@@ -1165,6 +1235,7 @@ verifyHeadInline(Tup& tup)
     CHK(g_opr->equal("PK2", tup.pk2()) == 0);
     CHK(g_opr->equal("PK3", (char*)&tup.m_pk3) == 0);
   }
+  setUDpartId(tup, g_opr);
   NdbRecAttr* ra1;
   NdbRecAttr* ra2;
   NdbRecAttr* ra_frag;
@@ -1231,6 +1302,10 @@ verifyBlobTable(const Bval& v, Uint32 pk
     CHK((ra_data = g_ops->getValue("NDB$DATA")) != 0);
   }
 
+  /* No partition id set on Blob part table scan so that we
+   * find any misplaced parts in other partitions
+   */
+
   CHK((ra_frag = g_ops->getValue(NdbDictionary::Column::FRAGMENT)) != 0);
   CHK(g_con->execute(NoCommit) == 0);
   unsigned partcount;
@@ -1253,7 +1328,8 @@ verifyBlobTable(const Bval& v, Uint32 pk
         continue;
     }
     Uint32 part = ra_part->u_32_value();
-    DBG("part " << part << " of " << partcount);
+    Uint32 frag2 = ra_frag->u_32_value();
+    DBG("part " << part << " of " << partcount << " from fragment " << frag2);
     CHK(part < partcount && ! seen[part]);
     seen[part] = 1;
     unsigned n = b.m_inline + part * b.m_partsize;
@@ -1295,7 +1371,6 @@ verifyBlobTable(const Bval& v, Uint32 pk
         i++;
       }
     }
-    Uint32 frag2 = ra_frag->u_32_value();
     DBG("frags main=" << frag << " blob=" << frag2 << " stripe=" << b.m_stripe);
     if (b.m_stripe == 0)
       CHK(frag == frag2);
@@ -1354,6 +1429,7 @@ insertPk(int style, int api)
         CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
         CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
       }
+      setUDpartId(tup, g_opr);
       CHK(getBlobHandles(g_opr) == 0);
     }
     else
@@ -1363,7 +1439,15 @@ insertPk(int style, int api)
         memcpy(&tup.m_row[g_pk2_offset], tup.m_pk2, g_opt.m_pk2chr.m_totlen);
         memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
       }
-      CHK((g_const_opr = g_con->insertTuple(g_full_record, tup.m_row)) != 0);
+      NdbOperation::OperationOptions opts;
+      setUDpartIdNdbRecord(tup,
+                           g_ndb->getDictionary()->getTable(g_opt.m_tname),
+                           opts);
+      CHK((g_const_opr = g_con->insertTuple(g_full_record, 
+                                            tup.m_row,
+                                            NULL,
+                                            &opts,
+                                            sizeof(opts))) != 0);
       CHK(getBlobHandles(g_const_opr) == 0);
     }
     if (style == 0) {
@@ -1416,6 +1500,7 @@ readPk(int style, int api)
         CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
         CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
       }
+      setUDpartId(tup, g_opr);
       CHK(getBlobHandles(g_opr) == 0);
     }
     else
@@ -1425,13 +1510,24 @@ readPk(int style, int api)
         memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
         memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
       }
+      NdbOperation::OperationOptions opts;
+      setUDpartIdNdbRecord(tup,
+                           g_ndb->getDictionary()->getTable(g_opt.m_tname),
+                           opts);
       if (urandom(2) == 0)
         CHK((g_const_opr = g_con->readTuple(g_key_record, tup.m_key_row,
-                                            g_blob_record, tup.m_row)) != 0);
+                                            g_blob_record, tup.m_row,
+                                            NdbOperation::LM_Read,
+                                            NULL,
+                                            &opts,
+                                            sizeof(opts))) != 0);
       else
         CHK((g_const_opr = g_con->readTuple(g_key_record, tup.m_key_row,
                                             g_blob_record, tup.m_row,
-                                            NdbOperation::LM_CommittedRead)) != 0);
+                                            NdbOperation::LM_CommittedRead,
+                                            NULL,
+                                            &opts,
+                                            sizeof(opts))) != 0);
       CHK(getBlobHandles(g_const_opr) == 0);
     }
     if (style == 0) {
@@ -1487,6 +1583,7 @@ updatePk(int style, int api)
           CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
           CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
         }
+        setUDpartId(tup, g_opr);
         CHK(getBlobHandles(g_opr) == 0);
       }
       else
@@ -1496,19 +1593,27 @@ updatePk(int style, int api)
           memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
           memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
         }
+        NdbOperation::OperationOptions opts;
+        setUDpartIdNdbRecord(tup,
+                             g_ndb->getDictionary()->getTable(g_opt.m_tname),
+                             opts);
         if (mode == 0) {
           DBG("using updateTuple");
           CHK((g_const_opr= g_con->updateTuple(g_key_record, tup.m_key_row,
-                                               g_blob_record, tup.m_row)) != 0);
+                                               g_blob_record, tup.m_row,
+                                               NULL, &opts, sizeof(opts))) != 0);
         } else if (mode == 1) {
           DBG("using readTuple exclusive");
           CHK((g_const_opr= g_con->readTuple(g_key_record, tup.m_key_row,
                                              g_blob_record, tup.m_row,
-                                             NdbOperation::LM_Exclusive)) != 0);
+                                             NdbOperation::LM_Exclusive,
+                                             NULL, &opts, sizeof(opts))) != 0);
         } else {
           DBG("using readTuple - will fail and retry");
           CHK((g_const_opr= g_con->readTuple(g_key_record, tup.m_key_row,
-                                             g_blob_record, tup.m_row)) != 0);
+                                             g_blob_record, tup.m_row,
+                                             NdbOperation::LM_Read,
+                                             NULL, &opts, sizeof(opts))) != 0);
         }
         CHK(getBlobHandles(g_const_opr) == 0);
       }
@@ -1554,6 +1659,7 @@ writePk(int style, int api)
         CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
         CHK(g_opr->equal("PK3", tup.m_pk3) == 0);
       }
+      setUDpartId(tup, g_opr);
       CHK(getBlobHandles(g_opr) == 0);
     }
     else
@@ -1566,8 +1672,13 @@ writePk(int style, int api)
         memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
         memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
       }
+      NdbOperation::OperationOptions opts;
+      setUDpartIdNdbRecord(tup,
+                           g_ndb->getDictionary()->getTable(g_opt.m_tname),
+                           opts);
       CHK((g_const_opr= g_con->writeTuple(g_key_record, tup.m_key_row,
-                                          g_full_record, tup.m_row)) != 0);
+                                          g_full_record, tup.m_row,
+                                          NULL, &opts, sizeof(opts))) != 0);
       CHK(getBlobHandles(g_const_opr) == 0);
     }
     if (style == 0) {
@@ -1603,6 +1714,11 @@ deletePk(int api)
     {
       CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
       CHK(g_opr->deleteTuple() == 0);
+      /* Must set explicit partitionId before equal() calls as that's
+       * where implicit Blob handles are created which need the 
+       * partitioning info
+       */
+      setUDpartId(tup, g_opr);
       CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
       if (g_opt.m_pk2chr.m_len != 0)
       {
@@ -1617,8 +1733,13 @@ deletePk(int api)
         memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
         memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
       }
+      NdbOperation::OperationOptions opts;
+      setUDpartIdNdbRecord(tup,
+                           g_ndb->getDictionary()->getTable(g_opt.m_tname),
+                           opts);
       CHK((g_const_opr= g_con->deleteTuple(g_key_record, tup.m_key_row,
-                                           g_full_record)) != 0);
+                                           g_full_record, NULL,
+                                           NULL, &opts, sizeof(opts))) != 0);
     }
     if (++n == g_opt.m_batch) {
       CHK(g_con->execute(Commit) == 0);
@@ -1667,6 +1788,7 @@ deleteNoPk()
   DBG("deletePk pk1=" << hex << tup.m_pk1);
   CHK((g_opr = g_con->getNdbOperation(g_opt.m_tname)) != 0);
   CHK(g_opr->deleteTuple() == 0);
+  setUDpartId(tup, g_opr);
   CHK(g_opr->equal("PK1", tup.m_pk1) == 0);
   if (pk2chr.m_len != 0) {
     CHK(g_opr->equal("PK2", tup.m_pk2) == 0);
@@ -1702,12 +1824,14 @@ readIdx(int style, int api)
         CHK(g_opx->readTuple(NdbOperation::LM_CommittedRead) == 0);
       CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
       CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
+      /* No need to set partition Id for unique indexes */
       CHK(getBlobHandles(g_opx) == 0);
     }
     else
     {
       memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
       memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      /* No need to set partition Id for unique indexes */
       if (urandom(2) == 0)
         CHK((g_const_opr= g_con->readTuple(g_idx_record, tup.m_key_row,
                                            g_blob_record, tup.m_row)) != 0);
@@ -1755,12 +1879,14 @@ updateIdx(int style, int api)
       CHK(g_opx->updateTuple() == 0);
       CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
       CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
+      /* No need to set partition Id for unique indexes */
       CHK(getBlobHandles(g_opx) == 0);
     }
     else
     {
       memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
       memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      /* No need to set partition Id for unique indexes */
       CHK((g_const_opr= g_con->updateTuple(g_idx_record, tup.m_key_row,
                                            g_blob_record, tup.m_row)) != 0);
       CHK(getBlobHandles(g_const_opr) == 0);
@@ -1797,6 +1923,7 @@ writeIdx(int style, int api)
       CHK(g_opx->writeTuple() == 0);
       CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
       CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
+      /* No need to set partition Id for unique indexes */
       CHK(getBlobHandles(g_opx) == 0);
     }
     else
@@ -1806,6 +1933,7 @@ writeIdx(int style, int api)
       memcpy(&tup.m_row[g_pk1_offset], &tup.m_pk1, sizeof(tup.m_pk1));
       memcpy(&tup.m_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
       memcpy(&tup.m_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      /* No need to set partition Id for unique indexes */
       CHK((g_const_opr= g_con->writeTuple(g_idx_record, tup.m_key_row,
                                           g_full_record, tup.m_row)) != 0);
       CHK(getBlobHandles(g_const_opr) == 0);
@@ -1847,11 +1975,13 @@ deleteIdx(int api)
       CHK(g_opx->deleteTuple() == 0);
       CHK(g_opx->equal("PK2", tup.m_pk2) == 0);
       CHK(g_opx->equal("PK3", tup.m_pk3) == 0);
+      /* No need to set partition Id for unique indexes */
     }
     else
     {
       memcpy(&tup.m_key_row[g_pk2_offset], tup.pk2(), g_opt.m_pk2chr.m_totlen);
       memcpy(&tup.m_key_row[g_pk3_offset], &tup.m_pk3, sizeof(tup.m_pk3));
+      /* No need to set partition Id for unique indexes */
       CHK((g_const_opr= g_con->deleteTuple(g_idx_record, tup.m_key_row,
                                            g_full_record)) != 0);
     }
@@ -1908,10 +2038,12 @@ readScan(int style, int api, bool idx)
       CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
       CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
     }
+    /* Don't bother setting UserDefined partitions for scan tests */
     CHK(getBlobHandles(g_ops) == 0);   
   }
   else
   {
+    /* Don't bother setting UserDefined partitions for scan tests */
     if (urandom(2) == 0)
       if (! idx)
         CHK((g_ops= g_con->scanTable(g_full_record,
@@ -2010,9 +2142,11 @@ updateScan(int style, int api, bool idx)
       CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
       CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
     }
+    /* Don't bother setting UserDefined partitions for scan tests */
   }
   else
   {
+    /* Don't bother setting UserDefined partitions for scan tests */
     if (! idx)
       CHK((g_ops= g_con->scanTable(g_key_record,
                                    NdbOperation::LM_Exclusive)) != 0);
@@ -2110,9 +2244,11 @@ deleteScan(int api, bool idx)
       CHK(g_ops->getValue("PK2", tup.m_pk2) != 0);
       CHK(g_ops->getValue("PK3", (char *) &tup.m_pk3) != 0);
     }
+    /* Don't bother setting UserDefined partitions for scan tests */
   }
   else
   {
+    /* Don't bother setting UserDefined partitions for scan tests */
     if (! idx)
       CHK((g_ops= g_con->scanTable(g_key_record,
                                    NdbOperation::LM_Exclusive)) != 0);
@@ -2293,6 +2429,7 @@ setupOperation(NdbOperation*& op, OpType
   
   if (pkop)
   {
+    setUDpartId(tup, op);
     CHK(op->equal("PK1", tup.m_pk1) == 0);
     if (g_opt.m_pk2chr.m_len != 0)
     {
@@ -2433,6 +2570,7 @@ bugtest_36756()
       CHK(g_opr->equal("PK2", tupExists.m_pk2) == 0);
       CHK(g_opr->equal("PK3", tupExists.m_pk3) == 0);
     }
+    setUDpartId(tupExists, g_opr);
     CHK(getBlobHandles(g_opr) == 0);
     
     CHK(setBlobValue(tupExists) == 0);
@@ -2499,6 +2637,7 @@ bugtest_36756()
     CHK((g_con= g_ndb->startTransaction()) != 0);
     CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0);
     CHK(g_opr->deleteTuple() == 0);
+    setUDpartId(tupExists, g_opr);
     CHK(g_opr->equal("PK1", tupExists.m_pk1) == 0);
     if (g_opt.m_pk2chr.m_len != 0)
     {
@@ -2548,7 +2687,7 @@ testmain()
     ndb_srand(g_opt.m_seed);
   }
   for (g_loop = 0; g_opt.m_loop == 0 || g_loop < g_opt.m_loop; g_loop++) {
-        for (int storage= 0; storage < 2; storage++) {
+    for (int storage= 0; storage < 2; storage++) {
       if (!testcase(storageSymbol[storage]))
         continue;
       
@@ -2557,6 +2696,7 @@ testmain()
       CHK(createTable(storage) == 0);
       { /* Dump created table information */
         Bcol& b1 = g_blob1;
+        DBG("FragType: " << g_dic->getTable(g_opt.m_tname)->getFragmentType()); 
         CHK(NdbBlob::getBlobTableName(b1.m_btname, g_ndb, g_opt.m_tname, "BL1") == 0);
         DBG("BL1: inline=" << b1.m_inline << " part=" << b1.m_partsize << " table=" << b1.m_btname);
         if (! g_opt.m_oneblob) {

Thread
bzr push into mysql-5.1-telco-6.4 branch (frazer:2901 to 2902)Frazer Clement3 Mar