#At file:///home/frazer/bzr/mysql-5.1-telco-7.0/ based on revid:bocklin@stripped
4080 Frazer Clement 2010-12-22
Bug#59113 MySQL Cluster : Large Blob read/write operations cause excessive resource usage
NdbTransaction.hpp has two settings added :
MaxPendingBlobReadBytes
MaxPendingBlobWriteBytes
When the volume of data being read/written to Blob column part tables in
the transaction exceeds these values, the transaction will implicitly execute
the accumulated operations.
This avoids an excessive build up of pending data which can result in resource
exhaustion in the kernel.
By default these parameters are set to 0, which means no limit.
At the MySQLD level, two new session variables are added :
ndb_blob_read_batch_bytes
ndb_blob_write_batch_bytes
These control the setting of the per-transaction values above.
They can be defaulted for all transactions on the MySQLD command line.
When no default is given, they both default to 64kB.
This patch should improve the stability of MySQL Cluster when large Blob
values are being read and written.
added:
mysql-test/suite/ndb/r/ndb_blob_big.result
mysql-test/suite/ndb/t/ndb_blob_big.cnf
mysql-test/suite/ndb/t/ndb_blob_big.test
modified:
sql/ha_ndbcluster.cc
storage/ndb/include/ndbapi/NdbBlob.hpp
storage/ndb/include/ndbapi/NdbTransaction.hpp
storage/ndb/src/ndbapi/NdbBlob.cpp
storage/ndb/src/ndbapi/NdbTransaction.cpp
storage/ndb/test/ndbapi/testBlobs.cpp
=== added file 'mysql-test/suite/ndb/r/ndb_blob_big.result'
--- a/mysql-test/suite/ndb/r/ndb_blob_big.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_blob_big.result 2010-12-22 16:23:32 +0000
@@ -0,0 +1,50 @@
+Larger MaxAllowedPacket as this limits the size of Blob MySQLD can handle
+show variables like '%max_allowed_packet%';
+Variable_name Value
+max_allowed_packet 104857600
+use test;
+create table t1 (a int primary key, b longtext) engine=ndb;
+create procedure heavy_insert(times int, bytes int)
+begin
+set @x = 0;
+repeat
+insert into t1 values (1, repeat('B', bytes));
+delete from t1 where a=1;
+set @x = @x + 1;
+until @x = times
+end repeat;
+insert into t1 values (1, repeat('B', bytes));
+end %
+create procedure heavy_read(times int)
+begin
+set @x = 0;
+repeat
+select a, length(b) from t1 where a=1;
+set @x = @x + 1;
+until @x = times
+end repeat;
+end %
+Set unlimited batch size for reads+writes
+set ndb_blob_read_batch_bytes = 0;
+set ndb_blob_write_batch_bytes = 0;
+Now try a heavy insert - idea is to show SendBuffer overload on insert
+call heavy_insert(20, 5*1024*1024);
+delete from t1;
+set ndb_blob_write_batch_bytes=100 * 1024;
+Now heavy insert should succeed
+call heavy_insert(10, 9*1024*1024);
+Now heavy read should fail
+call heavy_read(100);
+show warnings;
+Level Code Message
+Error 1297 Got temporary error 1218 'Send Buffers overloaded in NDB kernel' from NDB
+Error 1297 Got temporary error 1218 'Send Buffers overloaded in NDB kernel' from NDB
+Error 1297 Got temporary error 1218 'Send Buffers overloaded in NDB kernel' from NDBCLUSTER
+set ndb_blob_read_batch_bytes=100 * 1024;
+Now heavy read should succeed
+call heavy_read(100);
+set ndb_blob_write_batch_bytes=0;
+set ndb_blob_read_batch_bytes=0;
+drop procedure heavy_insert;
+drop procedure heavy_read;
+drop table t1;
=== added file 'mysql-test/suite/ndb/t/ndb_blob_big.cnf'
--- a/mysql-test/suite/ndb/t/ndb_blob_big.cnf 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_blob_big.cnf 2010-12-22 16:23:32 +0000
@@ -0,0 +1,16 @@
+!include suite/ndb/my.cnf
+
+[cluster_config]
+# Bigger (imaginary) Redo space
+Diskless=1
+NoOfFragmentLogFiles = 64
+FragmentLogFileSize = 48M
+
+# Artificially small SendBuffer (to hit error)
+SendBuffer = 512K
+# Larger LongMessageBuffer (to avoid hitting this)
+LongMessageBuffer = 16M
+
+[mysqld]
+# Allow dealing with bigger blobs
+max-allowed-packet=104857600
=== added file 'mysql-test/suite/ndb/t/ndb_blob_big.test'
--- a/mysql-test/suite/ndb/t/ndb_blob_big.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_blob_big.test 2010-12-22 16:23:32 +0000
@@ -0,0 +1,74 @@
+--source include/have_ndb.inc
+
+# Some tests with larger blobs
+
+--echo Larger MaxAllowedPacket as this limits the size of Blob MySQLD can handle
+show variables like '%max_allowed_packet%';
+
+use test;
+
+create table t1 (a int primary key, b longtext) engine=ndb;
+
+delimiter %;
+
+create procedure heavy_insert(times int, bytes int)
+begin
+ set @x = 0;
+ repeat
+ insert into t1 values (1, repeat('B', bytes));
+ delete from t1 where a=1;
+ set @x = @x + 1;
+ until @x = times
+ end repeat;
+ insert into t1 values (1, repeat('B', bytes));
+end %
+
+create procedure heavy_read(times int)
+begin
+ set @x = 0;
+ repeat
+ select a, length(b) from t1 where a=1;
+ set @x = @x + 1;
+ until @x = times
+ end repeat;
+end %
+
+delimiter ;%
+
+--echo Set unlimited batch size for reads+writes
+
+set ndb_blob_read_batch_bytes = 0;
+set ndb_blob_write_batch_bytes = 0;
+
+--echo Now try a heavy insert - idea is to show SendBuffer overload on insert
+
+--error 0,1297,1297
+call heavy_insert(20, 5*1024*1024);
+
+delete from t1;
+
+set ndb_blob_write_batch_bytes=100 * 1024;
+
+--echo Now heavy insert should succeed
+call heavy_insert(10, 9*1024*1024);
+
+--echo Now heavy read should fail
+
+--disable_result_log
+--error 0,1297,1297
+call heavy_read(100);
+--enable_result_log
+show warnings;
+
+set ndb_blob_read_batch_bytes=100 * 1024;
+
+--echo Now heavy read should succeed
+--disable_result_log
+call heavy_read(100);
+--enable_result_log
+
+set ndb_blob_write_batch_bytes=0;
+set ndb_blob_read_batch_bytes=0;
+drop procedure heavy_insert;
+drop procedure heavy_read;
+drop table t1;
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2010-12-22 11:13:45 +0000
+++ b/sql/ha_ndbcluster.cc 2010-12-22 16:23:32 +0000
@@ -221,6 +221,32 @@ static MYSQL_THDVAR_BOOL(
FALSE /* default */
);
+static MYSQL_THDVAR_UINT(
+ blob_read_batch_bytes, /* name */
+ PLUGIN_VAR_RQCMDARG,
+ "Specifies the bytesize large Blob reads "
+ "should be batched into. 0 == No limit.",
+ NULL, /* check func */
+ NULL, /* update func */
+ 65536, /* default */
+ 0, /* min */
+ UINT_MAX, /* max */
+ 0 /* block */
+);
+
+static MYSQL_THDVAR_UINT(
+ blob_write_batch_bytes, /* name */
+ PLUGIN_VAR_RQCMDARG,
+ "Specifies the bytesize large Blob writes "
+ "should be batched into. 0 == No limit.",
+ NULL, /* check func */
+ NULL, /* update func */
+ 65536, /* default */
+ 0, /* min */
+ UINT_MAX, /* max */
+ 0 /* block */
+);
+
/*
Default value for max number of transactions createable against NDB from
@@ -1425,6 +1451,8 @@ ha_ndbcluster::get_blob_values(const Ndb
m_blob_expected_count_per_row= 0;
m_blob_destination_record= dst_record;
m_blobs_row_total_size= 0;
+ ndb_op->getNdbTransaction()->
+ setMaxPendingBlobReadBytes(THDVAR(current_thd, blob_read_batch_bytes));
for (i= 0; i < table_share->fields; i++)
{
@@ -1465,6 +1493,8 @@ ha_ndbcluster::set_blob_values(const Ndb
if (table_share->blob_fields == 0)
DBUG_RETURN(0);
+ ndb_op->getNdbTransaction()->
+ setMaxPendingBlobWriteBytes(THDVAR(current_thd, blob_write_batch_bytes));
blob_index= table_share->blob_field;
blob_index_end= blob_index + table_share->blob_fields;
do
@@ -14860,6 +14890,8 @@ static struct st_mysql_sys_var* system_v
MYSQL_SYSVAR(connectstring),
MYSQL_SYSVAR(mgmd_host),
MYSQL_SYSVAR(nodeid),
+ MYSQL_SYSVAR(blob_read_batch_bytes),
+ MYSQL_SYSVAR(blob_write_batch_bytes),
NULL
};
=== modified file 'storage/ndb/include/ndbapi/NdbBlob.hpp'
--- a/storage/ndb/include/ndbapi/NdbBlob.hpp 2010-01-28 15:16:46 +0000
+++ b/storage/ndb/include/ndbapi/NdbBlob.hpp 2010-12-22 16:23:32 +0000
@@ -387,6 +387,7 @@ private:
bool theGetFlag;
char* theGetBuf;
bool theSetFlag;
+ bool theSetValueInPreExecFlag;
const char* theSetBuf;
Uint32 theGetSetBytes;
// pending ops
@@ -493,6 +494,7 @@ private:
int insertPart(const char* buf, Uint32 part, const Uint16& len);
int updateParts(const char* buf, Uint32 part, Uint32 count);
int updatePart(const char* buf, Uint32 part, const Uint16& len);
+ int deletePartsThrottled(Uint32 part, Uint32 count);
int deleteParts(Uint32 part, Uint32 count);
int deletePartsUnknown(Uint32 part);
// pending ops
=== modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp'
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp 2010-12-18 08:36:08 +0000
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp 2010-12-22 16:23:32 +0000
@@ -892,6 +892,18 @@ public:
*/
int releaseLockHandle(const NdbLockHandle* lockHandle);
+ /* Get maximum number of pending Blob read/write bytes before
+ * an automatic execute() occurs
+ */
+ Uint32 getMaxPendingBlobReadBytes() const;
+ Uint32 getMaxPendingBlobWriteBytes() const;
+
+ /* Set maximum number of pending Blob read/write bytes before
+ * an automatic execute() occurs
+ */
+ void setMaxPendingBlobReadBytes(Uint32 bytes);
+ void setMaxPendingBlobWriteBytes(Uint32 bytes);
+
private:
/**
* Release completed operations
@@ -1137,6 +1149,10 @@ private:
// optim: any blobs
bool theBlobFlag;
Uint8 thePendingBlobOps;
+ Uint32 maxPendingBlobReadBytes;
+ Uint32 maxPendingBlobWriteBytes;
+ Uint32 pendingBlobReadBytes;
+ Uint32 pendingBlobWriteBytes;
inline bool hasBlobOperation() { return theBlobFlag; }
static void sendTC_COMMIT_ACK(class NdbImpl *, NdbApiSignal *,
=== modified file 'storage/ndb/src/ndbapi/NdbBlob.cpp'
--- a/storage/ndb/src/ndbapi/NdbBlob.cpp 2010-09-30 09:32:28 +0000
+++ b/storage/ndb/src/ndbapi/NdbBlob.cpp 2010-12-22 16:23:32 +0000
@@ -326,6 +326,7 @@ NdbBlob::init()
theGetFlag = false;
theGetBuf = NULL;
theSetFlag = false;
+ theSetValueInPreExecFlag = false;
theSetBuf = NULL;
theGetSetBytes = 0;
thePendingBlobOps = 0;
@@ -1294,7 +1295,7 @@ NdbBlob::setNull()
}
if (theNullFlag)
DBUG_RETURN(0);
- if (deleteParts(0, getPartCount()) == -1)
+ if (deletePartsThrottled(0, getPartCount()) == -1)
DBUG_RETURN(-1);
theNullFlag = true;
theLength = 0;
@@ -1336,7 +1337,7 @@ NdbBlob::truncate(Uint64 length)
Uint32 part1 = getPartNumber(length - 1);
Uint32 part2 = getPartNumber(theLength - 1);
assert(part2 >= part1);
- if (part2 > part1 && deleteParts(part1 + 1, part2 - part1) == -1)
+ if (part2 > part1 && deletePartsThrottled(part1 + 1, part2 - part1) == -1)
DBUG_RETURN(-1);
Uint32 off = getPartOffset(length);
if (off != 0) {
@@ -1360,7 +1361,7 @@ NdbBlob::truncate(Uint64 length)
DBUG_RETURN(-1);
}
} else {
- if (deleteParts(0, getPartCount()) == -1)
+ if (deletePartsThrottled(0, getPartCount()) == -1)
DBUG_RETURN(-1);
}
theLength = length;
@@ -1471,12 +1472,36 @@ NdbBlob::readDataPrivate(char* buf, Uint
if (len >= thePartSize) {
Uint32 part = getPartNumber(pos);
Uint32 count = len / thePartSize;
- if (readParts(buf, part, count) == -1)
- DBUG_RETURN(-1);
- Uint32 n = thePartSize * count;
- pos += n;
- buf += n;
- len -= n;
+ do
+ {
+ /* How much quota left, avoiding underflow? */
+ Uint32 partsThisTrip = count;
+ if (theEventBlobVersion == -1)
+ {
+ /* Table read as opposed to event buffer read */
+ const Uint32 remainingQuota =
+ theNdbCon->maxPendingBlobReadBytes -
+ MIN(theNdbCon->maxPendingBlobReadBytes, theNdbCon->pendingBlobReadBytes);
+ const Uint32 maxPartsThisTrip = MAX(remainingQuota / thePartSize, 1); // always read one part
+ partsThisTrip= MIN(count, maxPartsThisTrip);
+ }
+
+ if (readParts(buf, part, partsThisTrip) == -1)
+ DBUG_RETURN(-1);
+ Uint32 n = thePartSize * partsThisTrip;
+
+ pos += n;
+ buf += n;
+ len -= n;
+ part += partsThisTrip;
+ count -= partsThisTrip;
+ if (count != 0)
+ {
+ /* Execute this batch before defining next */
+ if (executePendingBlobReads() == -1)
+ DBUG_RETURN(-1);
+ }
+ } while (count != 0);
}
}
if (len > 0) {
@@ -1602,6 +1627,15 @@ NdbBlob::writeDataPrivate(const char* bu
pos += n;
buf += n;
len -= n;
+ if (theNdbCon->pendingBlobWriteBytes >
+ theNdbCon->maxPendingBlobWriteBytes)
+ {
+ /* Flush defined part ops */
+ if (executePendingBlobWrites() == -1)
+ {
+ DBUG_RETURN(-1);
+ }
+ }
}
}
}
@@ -1731,6 +1765,7 @@ NdbBlob::readTablePart(char* buf, Uint32
tOp->m_abortOption = NdbOperation::AbortOnError;
thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::ReadRequest);
+ theNdbCon->pendingBlobReadBytes += len;
DBUG_RETURN(0);
}
@@ -1791,6 +1826,7 @@ NdbBlob::insertPart(const char* buf, Uin
tOp->m_abortOption = NdbOperation::AbortOnError;
thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::InsertRequest);
+ theNdbCon->pendingBlobWriteBytes += len;
DBUG_RETURN(0);
}
@@ -1828,6 +1864,44 @@ NdbBlob::updatePart(const char* buf, Uin
tOp->m_abortOption = NdbOperation::AbortOnError;
thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::UpdateRequest);
+ theNdbCon->pendingBlobWriteBytes += len;
+ DBUG_RETURN(0);
+}
+
+int
+NdbBlob::deletePartsThrottled(Uint32 part, Uint32 count)
+{
+ DBUG_ENTER("NdbBlob::deletePartsThrottled");
+ DBUG_PRINT("info", ("part=%u count=%u maxPendingBlobWriteBytes=%llu",
+ part, count, theNdbCon->maxPendingBlobWriteBytes));
+
+ if (thePartSize)
+ {
+ do
+ {
+ /* How much quota left, avoiding underflow? */
+ const Uint32 remainingQuota =
+ theNdbCon->maxPendingBlobWriteBytes -
+ MIN(theNdbCon->maxPendingBlobWriteBytes, theNdbCon->pendingBlobWriteBytes);
+ const Uint32 maxPartsThisTrip = MAX(remainingQuota / thePartSize, 1); // always read one part
+ const Uint32 partsThisTrip= MIN(count, maxPartsThisTrip);
+
+ int rc = deleteParts(part, partsThisTrip);
+ if (rc != 0)
+ DBUG_RETURN(rc);
+
+ part+= partsThisTrip;
+ count-= partsThisTrip;
+
+ if (count != 0)
+ {
+ /* Execute this batch before defining next */
+ if (executePendingBlobWrites() == -1)
+ DBUG_RETURN(-1);
+ }
+ } while (count != 0);
+ }
+
DBUG_RETURN(0);
}
@@ -1850,6 +1924,7 @@ NdbBlob::deleteParts(Uint32 part, Uint32
n++;
thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
theNdbCon->thePendingBlobOps |= (1 << NdbOperation::DeleteRequest);
+ theNdbCon->pendingBlobWriteBytes += thePartSize; /* Assume full part */
}
DBUG_RETURN(0);
}
@@ -1873,6 +1948,11 @@ NdbBlob::deletePartsUnknown(Uint32 part)
while (true) {
Uint32 n;
n = 0;
+ /* How much quota left, avoiding underflow? */
+ Uint32 remainingQuota = theNdbCon->maxPendingBlobWriteBytes -
+ MIN(theNdbCon->maxPendingBlobWriteBytes, theNdbCon->pendingBlobWriteBytes);
+ Uint32 deleteQuota = MAX(remainingQuota / thePartSize, 1);
+ bat = MIN(deleteQuota, bat);
while (n < bat) {
NdbOperation*& tOp = tOpList[n]; // ref
tOp = theNdbCon->getNdbOperation(theBlobTable);
@@ -1884,6 +1964,7 @@ NdbBlob::deletePartsUnknown(Uint32 part)
}
tOp->m_abortOption= NdbOperation::AO_IgnoreError;
tOp->m_noErrorPropagation = true;
+ theNdbCon->pendingBlobWriteBytes += thePartSize;
n++;
}
DBUG_PRINT("info", ("bat=%u", bat));
@@ -2539,14 +2620,26 @@ NdbBlob::preExecute(NdbTransaction::Exec
* and the transaction can fail on the AbortOnError
* part operations or corrupt the head with the
* post-update operation)
+ *
+ * Additionally, if the insert is large, we'll defer to
+ * postExecute, where we can perform the writes at a more
+ * leisurely pace.
+ * We defer if we are writing more part data than we have
+ * remaining quota for.
*/
- bool performExtraInsertOpsInPreExec=
- (theNdbOp->m_abortOption != NdbOperation::AO_IgnoreError);
+ theSetValueInPreExecFlag =
+ ((theNdbOp->m_abortOption == NdbOperation::AbortOnError) &&
+ ((theGetSetBytes <= theInlineSize) || // Parts being written
+ ((theGetSetBytes - theInlineSize) <= // Total part size <=
+ (theNdbCon->maxPendingBlobWriteBytes - // (Quota -
+ MIN(theNdbCon->maxPendingBlobWriteBytes,
+ theNdbCon->pendingBlobWriteBytes)) // bytes_written)
+ )));
- if (performExtraInsertOpsInPreExec)
+ if (theSetValueInPreExecFlag)
{
DBUG_PRINT("info",
- ("Insert abortError - extra ops added in preExecute"));
+ ("Insert extra ops added in preExecute"));
/* Add operations to insert parts and update the
* Blob head+inline in the main tables
*/
@@ -2556,8 +2649,12 @@ NdbBlob::preExecute(NdbTransaction::Exec
const char* buf = theSetBuf + theInlineSize;
Uint32 bytes = theGetSetBytes - theInlineSize;
assert(thePos == theInlineSize);
+ Uint32 savePendingBlobWriteBytes = theNdbCon->pendingBlobWriteBytes;
if (writeDataPrivate(buf, bytes) == -1)
DBUG_RETURN(-1);
+ /* Assert that we didn't execute inline there */
+ assert(theNdbCon->pendingBlobWriteBytes >
+ savePendingBlobWriteBytes);
}
if (theHeadInlineUpdateFlag)
@@ -2578,7 +2675,7 @@ NdbBlob::preExecute(NdbTransaction::Exec
else
{
DBUG_PRINT("info",
- ("Insert ignoreError - waiting for Blob head insert"));
+ ("Insert waiting for Blob head insert"));
/* Require that this insert op is completed
* before beginning more user ops - avoid interleave
* with delete etc.
@@ -2847,14 +2944,16 @@ NdbBlob::postExecute(NdbTransaction::Exe
if (isInsertOp() && theSetFlag) {
/* For Inserts where the main table operation is IgnoreError,
* we perform extra operations on the head and inline parts
- * now
+ * now, as we know that the main table row was inserted
+ * successfully.
+ *
+ * Additionally, if the insert was large, we deferred writing
+ * until now to better control the flow of part operations.
+ * See preExecute()
*/
- bool performDelayedInsertOpsInPostExec=
- (theNdbOp->m_abortOption == NdbOperation::AO_IgnoreError);
-
- if (performDelayedInsertOpsInPostExec)
+ if (! theSetValueInPreExecFlag)
{
- DBUG_PRINT("info", ("Insert IgnoreError adding extra ops"));
+ DBUG_PRINT("info", ("Insert adding extra ops"));
/* Check the main table op for an error (don't proceed if
* it failed)
*/
@@ -2990,7 +3089,7 @@ NdbBlob::postExecute(NdbTransaction::Exe
if (isDeleteOp()) {
assert(anExecType == NdbTransaction::NoCommit);
getHeadFromRecAttr();
- if (deleteParts(0, getPartCount()) == -1)
+ if (deletePartsThrottled(0, getPartCount()) == -1)
DBUG_RETURN(-1);
}
setState(anExecType == NdbTransaction::NoCommit ? Active : Closed);
=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp 2010-11-10 12:28:34 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp 2010-12-22 16:23:32 +0000
@@ -75,6 +75,10 @@ NdbTransaction::NdbTransaction( Ndb* aNd
theBuddyConPtr(0xFFFFFFFF),
theBlobFlag(false),
thePendingBlobOps(0),
+ maxPendingBlobReadBytes(~Uint32(0)),
+ maxPendingBlobWriteBytes(~Uint32(0)),
+ pendingBlobReadBytes(0),
+ pendingBlobWriteBytes(0),
m_theFirstLockHandle(NULL),
m_theLastLockHandle(NULL)
{
@@ -150,6 +154,8 @@ NdbTransaction::init()
thePendingBlobOps = 0;
m_theFirstLockHandle = NULL;
m_theLastLockHandle = NULL;
+ pendingBlobReadBytes = 0;
+ pendingBlobWriteBytes = 0;
if (theId == NdbObjectIdMap::InvalidId)
{
theId = theNdb->theImpl->theNdbObjectIdMap.map(this);
@@ -583,8 +589,10 @@ NdbTransaction::executeNoBlobs(NdbTransa
}
}
thePendingBlobOps = 0;
+ pendingBlobReadBytes = 0;
+ pendingBlobWriteBytes = 0;
DBUG_RETURN(0);
-}//NdbTransaction::execute()
+}//NdbTransaction::executeNoBlobs()
/*****************************************************************************
void executeAsynchPrepare(ExecType aTypeOfExec,
@@ -2652,7 +2660,35 @@ NdbTransaction::scanIndex(const NdbRecor
return op;
} // ::scanIndex();
+Uint32
+NdbTransaction::getMaxPendingBlobReadBytes() const
+{
+ /* 0 == max */
+ return (maxPendingBlobReadBytes ==
+ (~Uint32(0)) ? 0 : maxPendingBlobReadBytes);
+};
+
+Uint32
+NdbTransaction::getMaxPendingBlobWriteBytes() const
+{
+ /* 0 == max */
+ return (maxPendingBlobWriteBytes ==
+ (~Uint32(0)) ? 0 : maxPendingBlobWriteBytes);
+};
+
+void
+NdbTransaction::setMaxPendingBlobReadBytes(Uint32 bytes)
+{
+ /* 0 == max */
+ maxPendingBlobReadBytes = (bytes?bytes : (~ Uint32(0)));
+}
+void
+NdbTransaction::setMaxPendingBlobWriteBytes(Uint32 bytes)
+{
+ /* 0 == max */
+ maxPendingBlobWriteBytes = (bytes?bytes : (~ Uint32(0)));
+}
#ifdef VM_TRACE
#define CASE(x) case x: ndbout << " " << #x; break
=== modified file 'storage/ndb/test/ndbapi/testBlobs.cpp'
--- a/storage/ndb/test/ndbapi/testBlobs.cpp 2010-02-26 12:44:34 +0000
+++ b/storage/ndb/test/ndbapi/testBlobs.cpp 2010-12-22 16:23:32 +0000
@@ -79,6 +79,9 @@ struct Opt {
Chr m_pk2chr;
bool m_pk2part;
bool m_oneblob;
+
+ int m_rbatch;
+ int m_wbatch;
// perf
const char* m_tnameperf;
unsigned m_rowsperf;
@@ -109,6 +112,8 @@ struct Opt {
m_pk2chr(),
m_pk2part(false),
m_oneblob(false),
+ m_rbatch(-1),
+ m_wbatch(-1),
// perf
m_tnameperf("TB2"),
m_rowsperf(10000),
@@ -148,6 +153,8 @@ printusage()
<< " -pk2cs PK2 charset or collation [" << d.m_pk2chr.m_cs << "]" << endl
<< " -pk2part partition primary table by PK2" << endl
<< " -oneblob only 1 blob attribute [default 2]" << endl
+ << " -rbatch N Read parts batchsize (bytes) [default -1] -1=random" << endl
+ << " -wbatch N Write parts batchsize (bytes) [default -1] -1=random" << endl
<< "disk or memory storage for blobs. Don't apply to performance test" << endl
<< " m Blob columns stored in memory" << endl
<< " h Blob columns stored on disk" << endl
@@ -942,6 +949,32 @@ calcTups(bool keys, bool keepsize = fals
}
}
+static void setBatchSizes()
+{
+ if (g_opt.m_rbatch != 0)
+ {
+ Uint32 byteSize = (g_opt.m_rbatch == -1) ?
+ urandom(~Uint32(0)) :
+ g_opt.m_rbatch;
+
+ DBG("Setting read batch size to " << byteSize
+ << " bytes.");
+ g_con->setMaxPendingBlobReadBytes(byteSize);
+ }
+
+ if (g_opt.m_wbatch != 0)
+ {
+ Uint32 byteSize = (g_opt.m_wbatch == -1) ?
+ urandom(~Uint32(0)) :
+ g_opt.m_wbatch;
+
+ DBG("Setting write batch size to " << byteSize
+ << " bytes.");
+ g_con->setMaxPendingBlobWriteBytes(byteSize);
+ }
+}
+
+
// blob handle ops
// const version for NdbRecord defined operations
static int
@@ -950,6 +983,8 @@ getBlobHandles(const NdbOperation* opr)
CHK((g_bh1 = opr->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = opr->getBlobHandle("BL2")) != 0);
+
+ setBatchSizes();
return 0;
}
@@ -961,6 +996,7 @@ getBlobHandles(NdbOperation* opr)
CHK((g_bh1 = opr->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = opr->getBlobHandle("BL2")) != 0);
+ setBatchSizes();
return 0;
}
@@ -971,6 +1007,7 @@ getBlobHandles(NdbScanOperation* ops)
CHK((g_bh1 = ops->getBlobHandle("BL1")) != 0);
if (! g_opt.m_oneblob)
CHK((g_bh2 = ops->getBlobHandle("BL2")) != 0);
+ setBatchSizes();
return 0;
}
@@ -4925,6 +4962,18 @@ NDB_COMMAND(testOdbcDriver, "testBlobs",
g_opt.m_oneblob = true;
continue;
}
+ if (strcmp(arg, "-rbatch") == 0) {
+ if (++argv, --argc > 0) {
+ g_opt.m_rbatch = atoi(argv[0]);
+ continue;
+ }
+ }
+ if (strcmp(arg, "-wbatch") == 0) {
+ if (++argv, --argc > 0) {
+ g_opt.m_wbatch = atoi(argv[0]);
+ continue;
+ }
+ }
// bugs
if (strcmp(arg, "-bug") == 0) {
if (++argv, --argc > 0) {
Attachment: [text/bzr-bundle] bzr/frazer@mysql.com-20101222162332-fq0fosrp28n6gcgt.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.0 branch (frazer:4080) Bug#59113 | Frazer Clement | 22 Dec |