List: Commits
From: Frazer Clement
Date: December 22 2010 4:23pm
Subject:bzr commit into mysql-5.1-telco-7.0 branch (frazer:4080) Bug#59113
#At file:///home/frazer/bzr/mysql-5.1-telco-7.0/ based on revid:bocklin@stripped

 4080 Frazer Clement	2010-12-22
      Bug#59113 MySQL Cluster : Large Blob read/write operations cause excessive resource usage
      
      NdbTransaction.hpp has two settings added:
      
        MaxPendingBlobReadBytes
        MaxPendingBlobWriteBytes
      
      When the volume of data read from or written to Blob column part tables in
      the transaction exceeds these values, the transaction will implicitly execute
      the accumulated operations.
      
      This avoids an excessive build-up of pending data, which can result in resource
      exhaustion in the NDB kernel.
      
      By default these parameters are set to 0, which means no limit.
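      
      As an illustration only, here is a minimal NDB API sketch of a client
      applying these per-transaction limits (hypothetical helper; assumes an
      initialised Ndb object and a table t1 with an int primary key 'a' and a
      text/blob column 'b'; error handling omitted):
      
        #include <NdbApi.hpp>
      
        // Sketch: throttle Blob part traffic for one transaction.
        int write_big_blob(Ndb* ndb, const char* data, Uint32 len)
        {
          NdbTransaction* trans = ndb->startTransaction();
      
          // Flush accumulated Blob part writes once ~1MB is pending,
          // rather than defining them all before a single execute().
          trans->setMaxPendingBlobWriteBytes(1024 * 1024);
          trans->setMaxPendingBlobReadBytes(1024 * 1024);
      
          const NdbDictionary::Table* tab =
            ndb->getDictionary()->getTable("t1");
          NdbOperation* op = trans->getNdbOperation(tab);
          op->insertTuple();
          op->equal("a", 1);
      
          NdbBlob* bh = op->getBlobHandle("b");
          bh->setValue(data, len);  // part data is written in batches during execute()
      
          int res = trans->execute(NdbTransaction::Commit);
          ndb->closeTransaction(trans);
          return res;
        }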
      
      At the MySQLD level, two new session variables are added:
      
        ndb_blob_read_batch_bytes
        ndb_blob_write_batch_bytes
      
      These control the per-transaction settings described above.
      A default for all transactions can be given on the MySQLD command line.
      
      When no command-line default is given, both default to 64kB.
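      
      For example (illustrative values), a session about to stream large Blob
      values could tune or disable the batching:
      
        # limit pending Blob part bytes per transaction to 1MB
        set ndb_blob_read_batch_bytes  = 1024*1024;
        set ndb_blob_write_batch_bytes = 1024*1024;
        # 0 removes the limit again
        set ndb_blob_read_batch_bytes  = 0;
        set ndb_blob_write_batch_bytes = 0;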
      
      This patch should improve the stability of MySQL Cluster when large Blob 
      values are being read and written.

    added:
      mysql-test/suite/ndb/r/ndb_blob_big.result
      mysql-test/suite/ndb/t/ndb_blob_big.cnf
      mysql-test/suite/ndb/t/ndb_blob_big.test
    modified:
      sql/ha_ndbcluster.cc
      storage/ndb/include/ndbapi/NdbBlob.hpp
      storage/ndb/include/ndbapi/NdbTransaction.hpp
      storage/ndb/src/ndbapi/NdbBlob.cpp
      storage/ndb/src/ndbapi/NdbTransaction.cpp
      storage/ndb/test/ndbapi/testBlobs.cpp
=== added file 'mysql-test/suite/ndb/r/ndb_blob_big.result'
--- a/mysql-test/suite/ndb/r/ndb_blob_big.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_blob_big.result	2010-12-22 16:23:32 +0000
@@ -0,0 +1,50 @@
+Larger MaxAllowedPacket as this limits the size of Blob MySQLD can handle
+show variables like '%max_allowed_packet%';
+Variable_name	Value
+max_allowed_packet	104857600
+use test;
+create table t1 (a int primary key, b longtext) engine=ndb;
+create procedure heavy_insert(times int, bytes int)
+begin
+set @x = 0;
+repeat
+insert into t1 values (1, repeat('B', bytes));
+delete from t1 where a=1;
+set @x = @x + 1;
+until @x = times
+end repeat;
+insert into t1 values (1, repeat('B', bytes));
+end %
+create procedure heavy_read(times int)
+begin
+set @x = 0;
+repeat
+select a, length(b) from t1 where a=1;
+set @x = @x + 1;
+until @x = times
+end repeat;
+end %
+Set unlimited batch size for reads+writes
+set ndb_blob_read_batch_bytes = 0;
+set ndb_blob_write_batch_bytes = 0;
+Now try a heavy insert - idea is to show SendBuffer overload on insert
+call heavy_insert(20, 5*1024*1024);
+delete from t1;
+set ndb_blob_write_batch_bytes=100 * 1024;
+Now heavy insert should succeed
+call heavy_insert(10, 9*1024*1024);
+Now heavy read should fail
+call heavy_read(100);
+show warnings;
+Level	Code	Message
+Error	1297	Got temporary error 1218 'Send Buffers overloaded in NDB kernel' from NDB
+Error	1297	Got temporary error 1218 'Send Buffers overloaded in NDB kernel' from NDB
+Error	1297	Got temporary error 1218 'Send Buffers overloaded in NDB kernel' from NDBCLUSTER
+set ndb_blob_read_batch_bytes=100 * 1024;
+Now heavy read should succeed
+call heavy_read(100);
+set ndb_blob_write_batch_bytes=0;
+set ndb_blob_read_batch_bytes=0;
+drop procedure heavy_insert;
+drop procedure heavy_read;
+drop table t1;

=== added file 'mysql-test/suite/ndb/t/ndb_blob_big.cnf'
--- a/mysql-test/suite/ndb/t/ndb_blob_big.cnf	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_blob_big.cnf	2010-12-22 16:23:32 +0000
@@ -0,0 +1,16 @@
+!include suite/ndb/my.cnf
+
+[cluster_config]
+# Bigger (imaginary) Redo space
+Diskless=1
+NoOfFragmentLogFiles = 64
+FragmentLogFileSize = 48M
+
+# Artificially small SendBuffer (to hit error)
+SendBuffer = 512K
+# Larger LongMessageBuffer (to avoid hitting this)
+LongMessageBuffer = 16M
+
+[mysqld]
+# Allow dealing with bigger blobs
+max-allowed-packet=104857600

=== added file 'mysql-test/suite/ndb/t/ndb_blob_big.test'
--- a/mysql-test/suite/ndb/t/ndb_blob_big.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_blob_big.test	2010-12-22 16:23:32 +0000
@@ -0,0 +1,74 @@
+--source include/have_ndb.inc
+
+# Some tests with larger blobs
+
+--echo Larger MaxAllowedPacket as this limits the size of Blob MySQLD can handle
+show variables like '%max_allowed_packet%';
+
+use test;
+
+create table t1 (a int primary key, b longtext) engine=ndb;
+
+delimiter %;
+
+create procedure heavy_insert(times int, bytes int)
+begin
+  set @x = 0;
+  repeat
+    insert into t1 values (1, repeat('B', bytes));
+    delete from t1 where a=1;
+    set @x = @x + 1;
+  until @x = times
+  end repeat;
+  insert into t1 values (1, repeat('B', bytes));
+end %
+
+create procedure heavy_read(times int)
+begin
+  set @x = 0;
+  repeat
+    select a, length(b) from t1 where a=1;
+    set @x = @x + 1;
+  until @x = times
+  end repeat;
+end %
+
+delimiter ;%
+
+--echo Set unlimited batch size for reads+writes
+
+set ndb_blob_read_batch_bytes = 0;
+set ndb_blob_write_batch_bytes = 0;
+
+--echo Now try a heavy insert - idea is to show SendBuffer overload on insert
+
+--error 0,1297,1297
+call heavy_insert(20, 5*1024*1024);
+
+delete from t1;
+
+set ndb_blob_write_batch_bytes=100 * 1024;
+
+--echo Now heavy insert should succeed
+call heavy_insert(10, 9*1024*1024);
+
+--echo Now heavy read should fail
+
+--disable_result_log
+--error 0,1297,1297
+call heavy_read(100);
+--enable_result_log
+show warnings;
+
+set ndb_blob_read_batch_bytes=100 * 1024;
+
+--echo Now heavy read should succeed
+--disable_result_log
+call heavy_read(100);
+--enable_result_log
+
+set ndb_blob_write_batch_bytes=0;
+set ndb_blob_read_batch_bytes=0;
+drop procedure heavy_insert;
+drop procedure heavy_read;
+drop table t1;

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2010-12-22 11:13:45 +0000
+++ b/sql/ha_ndbcluster.cc	2010-12-22 16:23:32 +0000
@@ -221,6 +221,32 @@ static MYSQL_THDVAR_BOOL(
   FALSE                              /* default */
 );
 
+static MYSQL_THDVAR_UINT(
+  blob_read_batch_bytes,             /* name */
+  PLUGIN_VAR_RQCMDARG,
+  "Specifies the bytesize large Blob reads "
+  "should be batched into.  0 == No limit.",
+  NULL,                              /* check func */
+  NULL,                              /* update func */
+  65536,                             /* default */
+  0,                                 /* min */
+  UINT_MAX,                          /* max */
+  0                                  /* block */
+);
+
+static MYSQL_THDVAR_UINT(
+  blob_write_batch_bytes,            /* name */
+  PLUGIN_VAR_RQCMDARG,
+  "Specifies the bytesize large Blob writes "
+  "should be batched into.  0 == No limit.",
+  NULL,                              /* check func */
+  NULL,                              /* update func */
+  65536,                             /* default */
+  0,                                 /* min */
+  UINT_MAX,                          /* max */
+  0                                  /* block */
+);
+  
 
 /*
   Default value for max number of transactions createable against NDB from
@@ -1425,6 +1451,8 @@ ha_ndbcluster::get_blob_values(const Ndb
   m_blob_expected_count_per_row= 0;
   m_blob_destination_record= dst_record;
   m_blobs_row_total_size= 0;
+  ndb_op->getNdbTransaction()->
+    setMaxPendingBlobReadBytes(THDVAR(current_thd, blob_read_batch_bytes));
 
   for (i= 0; i < table_share->fields; i++) 
   {
@@ -1465,6 +1493,8 @@ ha_ndbcluster::set_blob_values(const Ndb
   if (table_share->blob_fields == 0)
     DBUG_RETURN(0);
 
+  ndb_op->getNdbTransaction()->
+    setMaxPendingBlobWriteBytes(THDVAR(current_thd, blob_write_batch_bytes));
   blob_index= table_share->blob_field;
   blob_index_end= blob_index + table_share->blob_fields;
   do
@@ -14860,6 +14890,8 @@ static struct st_mysql_sys_var* system_v
   MYSQL_SYSVAR(connectstring),
   MYSQL_SYSVAR(mgmd_host),
   MYSQL_SYSVAR(nodeid),
+  MYSQL_SYSVAR(blob_read_batch_bytes),
+  MYSQL_SYSVAR(blob_write_batch_bytes),
 
   NULL
 };

=== modified file 'storage/ndb/include/ndbapi/NdbBlob.hpp'
--- a/storage/ndb/include/ndbapi/NdbBlob.hpp	2010-01-28 15:16:46 +0000
+++ b/storage/ndb/include/ndbapi/NdbBlob.hpp	2010-12-22 16:23:32 +0000
@@ -387,6 +387,7 @@ private:
   bool theGetFlag;
   char* theGetBuf;
   bool theSetFlag;
+  bool theSetValueInPreExecFlag;
   const char* theSetBuf;
   Uint32 theGetSetBytes;
   // pending ops
@@ -493,6 +494,7 @@ private:
   int insertPart(const char* buf, Uint32 part, const Uint16& len);
   int updateParts(const char* buf, Uint32 part, Uint32 count);
   int updatePart(const char* buf, Uint32 part, const Uint16& len);
+  int deletePartsThrottled(Uint32 part, Uint32 count);
   int deleteParts(Uint32 part, Uint32 count);
   int deletePartsUnknown(Uint32 part);
   // pending ops

=== modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp'
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp	2010-12-18 08:36:08 +0000
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp	2010-12-22 16:23:32 +0000
@@ -892,6 +892,18 @@ public:
    */
   int releaseLockHandle(const NdbLockHandle* lockHandle);
 
+  /* Get maximum number of pending Blob read/write bytes before
+   * an automatic execute() occurs 
+   */
+  Uint32 getMaxPendingBlobReadBytes() const;
+  Uint32 getMaxPendingBlobWriteBytes() const;
+
+  /* Set maximum number of pending Blob read/write bytes before
+   * an automatic execute() occurs
+   */
+  void setMaxPendingBlobReadBytes(Uint32 bytes);
+  void setMaxPendingBlobWriteBytes(Uint32 bytes);
+
 private:						
   /**
    * Release completed operations
@@ -1137,6 +1149,10 @@ private:						
   // optim: any blobs
   bool theBlobFlag;
   Uint8 thePendingBlobOps;
+  Uint32 maxPendingBlobReadBytes;
+  Uint32 maxPendingBlobWriteBytes;
+  Uint32 pendingBlobReadBytes;
+  Uint32 pendingBlobWriteBytes;
   inline bool hasBlobOperation() { return theBlobFlag; }
 
   static void sendTC_COMMIT_ACK(class NdbImpl *, NdbApiSignal *,

=== modified file 'storage/ndb/src/ndbapi/NdbBlob.cpp'
--- a/storage/ndb/src/ndbapi/NdbBlob.cpp	2010-09-30 09:32:28 +0000
+++ b/storage/ndb/src/ndbapi/NdbBlob.cpp	2010-12-22 16:23:32 +0000
@@ -326,6 +326,7 @@ NdbBlob::init()
   theGetFlag = false;
   theGetBuf = NULL;
   theSetFlag = false;
+  theSetValueInPreExecFlag = false;
   theSetBuf = NULL;
   theGetSetBytes = 0;
   thePendingBlobOps = 0;
@@ -1294,7 +1295,7 @@ NdbBlob::setNull()
   }
   if (theNullFlag)
     DBUG_RETURN(0);
-  if (deleteParts(0, getPartCount()) == -1)
+  if (deletePartsThrottled(0, getPartCount()) == -1)
     DBUG_RETURN(-1);
   theNullFlag = true;
   theLength = 0;
@@ -1336,7 +1337,7 @@ NdbBlob::truncate(Uint64 length)
       Uint32 part1 = getPartNumber(length - 1);
       Uint32 part2 = getPartNumber(theLength - 1);
       assert(part2 >= part1);
-      if (part2 > part1 && deleteParts(part1 + 1, part2 - part1) == -1)
+      if (part2 > part1 && deletePartsThrottled(part1 + 1, part2 - part1) == -1)
       DBUG_RETURN(-1);
       Uint32 off = getPartOffset(length);
       if (off != 0) {
@@ -1360,7 +1361,7 @@ NdbBlob::truncate(Uint64 length)
           DBUG_RETURN(-1);
       }
     } else {
-      if (deleteParts(0, getPartCount()) == -1)
+      if (deletePartsThrottled(0, getPartCount()) == -1)
         DBUG_RETURN(-1);
     }
     theLength = length;
@@ -1471,12 +1472,36 @@ NdbBlob::readDataPrivate(char* buf, Uint
     if (len >= thePartSize) {
       Uint32 part = getPartNumber(pos);
       Uint32 count = len / thePartSize;
-      if (readParts(buf, part, count) == -1)
-        DBUG_RETURN(-1);
-      Uint32 n = thePartSize * count;
-      pos += n;
-      buf += n;
-      len -= n;
+      do
+      {
+        /* How much quota left, avoiding underflow? */
+        Uint32 partsThisTrip = count;
+        if (theEventBlobVersion == -1)
+        {
+          /* Table read as opposed to event buffer read */
+          const Uint32 remainingQuota = 
+            theNdbCon->maxPendingBlobReadBytes - 
+            MIN(theNdbCon->maxPendingBlobReadBytes, theNdbCon->pendingBlobReadBytes);
+          const Uint32 maxPartsThisTrip = MAX(remainingQuota / thePartSize, 1); // always read one part
+          partsThisTrip= MIN(count, maxPartsThisTrip);
+        }
+        
+        if (readParts(buf, part, partsThisTrip) == -1)
+          DBUG_RETURN(-1);
+        Uint32 n = thePartSize * partsThisTrip;
+
+        pos += n;
+        buf += n;
+        len -= n;
+        part += partsThisTrip;
+        count -= partsThisTrip;
+        if (count != 0)
+        {
+          /* Execute this batch before defining next */
+          if (executePendingBlobReads() == -1)
+            DBUG_RETURN(-1);
+        }
+      } while (count != 0);
     }
   }
   if (len > 0) {
@@ -1602,6 +1627,15 @@ NdbBlob::writeDataPrivate(const char* bu
         pos += n;
         buf += n;
         len -= n;
+        if (theNdbCon->pendingBlobWriteBytes > 
+            theNdbCon->maxPendingBlobWriteBytes)
+        {
+          /* Flush defined part ops */
+          if (executePendingBlobWrites() == -1)
+          {
+            DBUG_RETURN(-1);
+          }
+        }
       }
     }
   }
@@ -1731,6 +1765,7 @@ NdbBlob::readTablePart(char* buf, Uint32
   tOp->m_abortOption = NdbOperation::AbortOnError;
   thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
   theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
+  theNdbCon->pendingBlobReadBytes += len;
   DBUG_RETURN(0);
 }
 
@@ -1791,6 +1826,7 @@ NdbBlob::insertPart(const char* buf, Uin
   tOp->m_abortOption = NdbOperation::AbortOnError;
   thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
   theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
+  theNdbCon->pendingBlobWriteBytes += len;
   DBUG_RETURN(0);
 }
 
@@ -1828,6 +1864,44 @@ NdbBlob::updatePart(const char* buf, Uin
   tOp->m_abortOption = NdbOperation::AbortOnError;
   thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
   theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
+  theNdbCon->pendingBlobWriteBytes += len;
+  DBUG_RETURN(0);
+}
+
+int
+NdbBlob::deletePartsThrottled(Uint32 part, Uint32 count)
+{
+  DBUG_ENTER("NdbBlob::deletePartsThrottled");
+  DBUG_PRINT("info", ("part=%u count=%u maxPendingBlobWriteBytes=%llu", 
+                      part, count, theNdbCon->maxPendingBlobWriteBytes));
+  
+  if (thePartSize)
+  {
+    do
+    {
+      /* How much quota left, avoiding underflow? */
+      const Uint32 remainingQuota = 
+        theNdbCon->maxPendingBlobWriteBytes - 
+        MIN(theNdbCon->maxPendingBlobWriteBytes, theNdbCon->pendingBlobWriteBytes);
+      const Uint32 maxPartsThisTrip = MAX(remainingQuota / thePartSize, 1); // always delete one part
+      const Uint32 partsThisTrip= MIN(count, maxPartsThisTrip);
+      
+      int rc = deleteParts(part, partsThisTrip);
+      if (rc != 0)
+        DBUG_RETURN(rc);
+      
+      part+= partsThisTrip;
+      count-= partsThisTrip;
+      
+      if (count != 0)
+      {
+        /* Execute this batch before defining next */
+        if (executePendingBlobWrites() == -1)
+          DBUG_RETURN(-1);
+      }
+    } while (count != 0);
+  }
+
   DBUG_RETURN(0);
 }
 
@@ -1850,6 +1924,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32
     n++;
     thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
     theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
+    theNdbCon->pendingBlobWriteBytes += thePartSize; /* Assume full part */
   }
   DBUG_RETURN(0);
 }
@@ -1873,6 +1948,11 @@ NdbBlob::deletePartsUnknown(Uint32 part)
   while (true) {
     Uint32 n;
     n = 0;
+    /* How much quota left, avoiding underflow? */
+    Uint32 remainingQuota = theNdbCon->maxPendingBlobWriteBytes - 
+      MIN(theNdbCon->maxPendingBlobWriteBytes, theNdbCon->pendingBlobWriteBytes);
+    Uint32 deleteQuota = MAX(remainingQuota / thePartSize, 1);
+    bat = MIN(deleteQuota, bat);
     while (n < bat) {
       NdbOperation*& tOp = tOpList[n];  // ref
       tOp = theNdbCon->getNdbOperation(theBlobTable);
@@ -1884,6 +1964,7 @@ NdbBlob::deletePartsUnknown(Uint32 part)
       }
       tOp->m_abortOption= NdbOperation::AO_IgnoreError;
       tOp->m_noErrorPropagation = true;
+      theNdbCon->pendingBlobWriteBytes += thePartSize;
       n++;
     }
     DBUG_PRINT("info", ("bat=%u", bat));
@@ -2539,14 +2620,26 @@ NdbBlob::preExecute(NdbTransaction::Exec
      * and the transaction can fail on the AbortOnError 
      * part operations or corrupt the head with the 
      * post-update operation)
+     *
+     * Additionally, if the insert is large, we'll defer to
+     * postExecute, where we can perform the writes at a more
+     * leisurely pace.
+     * We defer if we are writing more part data than we have
+     * remaining quota for.
      */
-    bool performExtraInsertOpsInPreExec= 
-      (theNdbOp->m_abortOption != NdbOperation::AO_IgnoreError);
+    theSetValueInPreExecFlag =
+      ((theNdbOp->m_abortOption == NdbOperation::AbortOnError) &&
+       ((theGetSetBytes <= theInlineSize) ||   // No part data to write, or
+        ((theGetSetBytes - theInlineSize) <=      // Total part size <=
+         (theNdbCon->maxPendingBlobWriteBytes -   //  (Quota -
+          MIN(theNdbCon->maxPendingBlobWriteBytes,
+              theNdbCon->pendingBlobWriteBytes))  //   bytes_written)
+         )));
 
-    if (performExtraInsertOpsInPreExec)
+    if (theSetValueInPreExecFlag)
     {
       DBUG_PRINT("info", 
-                 ("Insert abortError - extra ops added in preExecute"));
+                 ("Insert extra ops added in preExecute"));
       /* Add operations to insert parts and update the
        * Blob head+inline in the main tables
        */
@@ -2556,8 +2649,12 @@ NdbBlob::preExecute(NdbTransaction::Exec
         const char* buf = theSetBuf + theInlineSize;
         Uint32 bytes = theGetSetBytes - theInlineSize;
         assert(thePos == theInlineSize);
+        Uint32 savePendingBlobWriteBytes = theNdbCon->pendingBlobWriteBytes;
         if (writeDataPrivate(buf, bytes) == -1)
           DBUG_RETURN(-1);
+        /* Assert that we didn't execute inline there */
+        assert(theNdbCon->pendingBlobWriteBytes >
+               savePendingBlobWriteBytes);
       }
       
       if (theHeadInlineUpdateFlag)
@@ -2578,7 +2675,7 @@ NdbBlob::preExecute(NdbTransaction::Exec
     else
     {
       DBUG_PRINT("info", 
-                 ("Insert ignoreError - waiting for Blob head insert"));
+                 ("Insert waiting for Blob head insert"));
       /* Require that this insert op is completed 
        * before beginning more user ops - avoid interleave
        * with delete etc.
@@ -2847,14 +2944,16 @@ NdbBlob::postExecute(NdbTransaction::Exe
   if (isInsertOp() && theSetFlag) {
     /* For Inserts where the main table operation is IgnoreError, 
      * we perform extra operations on the head and inline parts
-     * now
+     * now, as we know that the main table row was inserted 
+     * successfully.
+     *
+     * Additionally, if the insert was large, we deferred writing
+     * until now to better control the flow of part operations.
+     * See preExecute()
      */
-    bool performDelayedInsertOpsInPostExec= 
-      (theNdbOp->m_abortOption == NdbOperation::AO_IgnoreError);
-
-    if (performDelayedInsertOpsInPostExec)
+    if (! theSetValueInPreExecFlag)
     {
-      DBUG_PRINT("info", ("Insert IgnoreError adding extra ops"));
+      DBUG_PRINT("info", ("Insert adding extra ops"));
       /* Check the main table op for an error (don't proceed if 
        * it failed) 
        */
@@ -2990,7 +3089,7 @@ NdbBlob::postExecute(NdbTransaction::Exe
   if (isDeleteOp()) {
     assert(anExecType == NdbTransaction::NoCommit);
     getHeadFromRecAttr();
-    if (deleteParts(0, getPartCount()) == -1)
+    if (deletePartsThrottled(0, getPartCount()) == -1)
       DBUG_RETURN(-1);
   }
   setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);

=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp	2010-11-10 12:28:34 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp	2010-12-22 16:23:32 +0000
@@ -75,6 +75,10 @@ NdbTransaction::NdbTransaction( Ndb* aNd
   theBuddyConPtr(0xFFFFFFFF),
   theBlobFlag(false),
   thePendingBlobOps(0),
+  maxPendingBlobReadBytes(~Uint32(0)),
+  maxPendingBlobWriteBytes(~Uint32(0)),
+  pendingBlobReadBytes(0),
+  pendingBlobWriteBytes(0),
   m_theFirstLockHandle(NULL),
   m_theLastLockHandle(NULL)
 {
@@ -150,6 +154,8 @@ NdbTransaction::init()
   thePendingBlobOps = 0;
   m_theFirstLockHandle    = NULL;
   m_theLastLockHandle     = NULL;
+  pendingBlobReadBytes = 0;
+  pendingBlobWriteBytes = 0;
   if (theId == NdbObjectIdMap::InvalidId)
   {
     theId = theNdb->theImpl->theNdbObjectIdMap.map(this);
@@ -583,8 +589,10 @@ NdbTransaction::executeNoBlobs(NdbTransa
     }
   }
   thePendingBlobOps = 0;
+  pendingBlobReadBytes = 0;
+  pendingBlobWriteBytes = 0;
   DBUG_RETURN(0);
-}//NdbTransaction::execute()
+}//NdbTransaction::executeNoBlobs()
 
 /*****************************************************************************
 void executeAsynchPrepare(ExecType           aTypeOfExec,
@@ -2652,7 +2660,35 @@ NdbTransaction::scanIndex(const NdbRecor
   return op;
 } // ::scanIndex();
 
+Uint32
+NdbTransaction::getMaxPendingBlobReadBytes() const
+{
+  /* 0 == max */
+  return (maxPendingBlobReadBytes == 
+          (~Uint32(0)) ? 0 : maxPendingBlobReadBytes);
+};
+
+Uint32
+NdbTransaction::getMaxPendingBlobWriteBytes() const
+{
+  /* 0 == max */
+  return (maxPendingBlobWriteBytes == 
+          (~Uint32(0)) ? 0 : maxPendingBlobWriteBytes);
+};
+
+void
+NdbTransaction::setMaxPendingBlobReadBytes(Uint32 bytes)
+{
+  /* 0 == max */
+  maxPendingBlobReadBytes = (bytes?bytes : (~ Uint32(0)));
+}
 
+void
+NdbTransaction::setMaxPendingBlobWriteBytes(Uint32 bytes)
+{
+  /* 0 == max */
+  maxPendingBlobWriteBytes = (bytes?bytes : (~ Uint32(0)));
+}
 
 #ifdef VM_TRACE
 #define CASE(x) case x: ndbout << " " << #x; break

=== modified file 'storage/ndb/test/ndbapi/testBlobs.cpp'
--- a/storage/ndb/test/ndbapi/testBlobs.cpp	2010-02-26 12:44:34 +0000
+++ b/storage/ndb/test/ndbapi/testBlobs.cpp	2010-12-22 16:23:32 +0000
@@ -79,6 +79,9 @@ struct Opt {
   Chr m_pk2chr;
   bool m_pk2part;
   bool m_oneblob;
+
+  int m_rbatch;
+  int m_wbatch;
   // perf
   const char* m_tnameperf;
   unsigned m_rowsperf;
@@ -109,6 +112,8 @@ struct Opt {
     m_pk2chr(),
     m_pk2part(false),
     m_oneblob(false),
+    m_rbatch(-1),
+    m_wbatch(-1),
     // perf
     m_tnameperf("TB2"),
     m_rowsperf(10000),
@@ -148,6 +153,8 @@ printusage()
     << "  -pk2cs      PK2 charset or collation [" << d.m_pk2chr.m_cs << "]" << endl
     << "  -pk2part    partition primary table by PK2" << endl
     << "  -oneblob    only 1 blob attribute [default 2]" << endl
+    << "  -rbatch     N Read parts batchsize (bytes) [default -1] -1=random" << endl
+    << "  -wbatch     N Write parts batchsize (bytes) [default -1] -1=random" << endl
     << "disk or memory storage for blobs.  Don't apply to performance test" << endl
     << "  m           Blob columns stored in memory" << endl
     << "  h           Blob columns stored on disk" << endl
@@ -942,6 +949,32 @@ calcTups(bool keys, bool keepsize = fals
   }
 }
 
+static void setBatchSizes()
+{
+  if (g_opt.m_rbatch != 0)
+  {
+    Uint32 byteSize = (g_opt.m_rbatch == -1) ? 
+      urandom(~Uint32(0)) :
+      g_opt.m_rbatch;
+    
+    DBG("Setting read batch size to " << byteSize 
+        << " bytes.");
+    g_con->setMaxPendingBlobReadBytes(byteSize);
+  }
+  
+  if (g_opt.m_wbatch != 0)
+  {
+    Uint32 byteSize = (g_opt.m_wbatch == -1) ? 
+      urandom(~Uint32(0)) :
+      g_opt.m_wbatch;
+    
+    DBG("Setting write batch size to " << byteSize 
+        << " bytes.");
+    g_con->setMaxPendingBlobWriteBytes(byteSize);
+  }
+}
+    
+
 // blob handle ops
 // const version for NdbRecord defined operations
 static int
@@ -950,6 +983,8 @@ getBlobHandles(const NdbOperation* opr)
   CHK((g_bh1 = opr->getBlobHandle("BL1")) != 0);
   if (! g_opt.m_oneblob)
     CHK((g_bh2 = opr->getBlobHandle("BL2")) != 0);
+
+  setBatchSizes();
   return 0;
 }
 
@@ -961,6 +996,7 @@ getBlobHandles(NdbOperation* opr)
   CHK((g_bh1 = opr->getBlobHandle("BL1")) != 0);
   if (! g_opt.m_oneblob)
     CHK((g_bh2 = opr->getBlobHandle("BL2")) != 0);
+  setBatchSizes();
   return 0;
 }
 
@@ -971,6 +1007,7 @@ getBlobHandles(NdbScanOperation* ops)
   CHK((g_bh1 = ops->getBlobHandle("BL1")) != 0);
   if (! g_opt.m_oneblob)
     CHK((g_bh2 = ops->getBlobHandle("BL2")) != 0);
+  setBatchSizes();
   return 0;
 }
 
@@ -4925,6 +4962,18 @@ NDB_COMMAND(testOdbcDriver, "testBlobs",
       g_opt.m_oneblob = true;
       continue;
     }
+    if (strcmp(arg, "-rbatch") == 0) {
+      if (++argv, --argc > 0) {
+        g_opt.m_rbatch = atoi(argv[0]);
+        continue;
+      }
+    }
+    if (strcmp(arg, "-wbatch") == 0) {
+      if (++argv, --argc > 0) {
+        g_opt.m_wbatch = atoi(argv[0]);
+        continue;
+      }
+    }
     // bugs
     if (strcmp(arg, "-bug") == 0) {
       if (++argv, --argc > 0) {


Attachment: [text/bzr-bundle] bzr/frazer@mysql.com-20101222162332-fq0fosrp28n6gcgt.bundle