List:Commits« Previous MessageNext Message »
From:jonas oreland Date:January 31 2012 6:39am
Subject:bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3724 to 3742)
View as plain text  
 3742 jonas oreland	2012-01-31 [merge]
      ndb - merge 72 to wl5929_72

    added:
      mysql-test/suite/ndb_big/bug13637411-master.opt
      mysql-test/suite/ndb_big/bug13637411.cnf
      mysql-test/suite/ndb_big/bug13637411.test
    modified:
      mysql-test/include/default_ndbd.cnf
      mysql-test/suite/ndb/r/ndb_index_stat_partitions.result
      mysql-test/suite/ndb/t/ndb_index_stat_partitions.test
      storage/ndb/include/kernel/kernel_types.h
      storage/ndb/include/kernel/ndb_limits.h
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/blocks/backup/BackupInit.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp
      storage/ndb/src/kernel/ndbd.cpp
      storage/ndb/src/kernel/vm/dummy_nonmt.cpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt_thr_config.cpp
      storage/ndb/src/mgmsrv/ConfigInfo.cpp
      storage/ndb/src/ndbapi/ndberror.c
      storage/ndb/src/ndbjtie/NdbApiWrapper.hpp
      storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbScanOperation.java
      storage/ndb/src/ndbjtie/ndbapi_jtie.hpp
 3741 Mikael Ronstrom	2012-01-27
      add static to local method

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3740 Mikael Ronstrom	2012-01-27
      Enhanced handling of send buffer memory

    modified:
      storage/ndb/include/mgmapi/mgmapi_config_parameters.h
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TCP_Transporter.cpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/ndbd.cpp
      storage/ndb/src/kernel/vm/Configuration.cpp
      storage/ndb/src/kernel/vm/Emulator.hpp
      storage/ndb/src/kernel/vm/dummy_nonmt.cpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/mgmsrv/ConfigInfo.cpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
 3739 Mikael Ronstrom	2012-01-26
      Remove pack of non-existing nodes, remove extra printout

    modified:
      storage/ndb/src/common/transporter/Transporter.hpp
      storage/ndb/src/kernel/vm/mt.cpp
 3738 Mikael Ronstrom	2012-01-26
      Minor fix

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3737 Mikael Ronstrom	2012-01-26
      Minor fix

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3736 Mikael Ronstrom	2012-01-26
      Second merge step

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/vm/mt.cpp
 3735 Mikael Ronstrom	2012-01-26 [merge]
      merge

    added:
      sql/ndb_repl_tab.cc
      sql/ndb_repl_tab.h
    modified:
      mysql-test/suite/ndb/r/ndb_condition_pushdown.result
      mysql-test/suite/ndb/t/ndb_condition_pushdown.test
      mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test
      mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_cond.cc
      sql/item_cmpfunc.h
      sql/ndb_share.h
      sql/ndb_table_guard.h
      storage/ndb/CMakeLists.txt
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/mgmsrv/ConfigInfo.cpp
 3734 Mikael Ronstrom	2012-01-25
      Add pack send buffers before starting a new loop to minimize risk of send buffer overload

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3733 Mikael Ronstrom	2012-01-25
      Fix overload handling of scans

    modified:
      storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
 3732 Mikael Ronstrom	2012-01-25
      Regulate scan already on slowdown limit, add some stuff for debugging

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TCP_Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
 3731 Mikael Ronstrom	2012-01-25
      compilation fix

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3730 Mikael Ronstrom	2012-01-25
      Don't send so immediately, only when absolutely necessary from flush_send_buffer

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3729 Mikael Ronstrom	2012-01-25
      Need to send when grabbing send node lock

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3728 Mikael Ronstrom	2012-01-24
      More to fix send buffer overload issue

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3727 Mikael Ronstrom	2012-01-24
      Improved pack page patch

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3726 Mikael Ronstrom	2012-01-24
      Release memory and add a debug printout to see how much it is used

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3725 Mikael Ronstrom	2012-01-24
      pack sb_pages

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
 3724 Mikael Ronstrom	2012-01-24 [merge]
      merge

    added:
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImplForNdbRecord.java
    renamed:
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result => mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test => mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test
    modified:
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/vm/CMakeLists.txt
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt.hpp
=== modified file 'mysql-test/include/default_ndbd.cnf'
--- a/mysql-test/include/default_ndbd.cnf	2012-01-19 12:28:49 +0000
+++ b/mysql-test/include/default_ndbd.cnf	2012-01-31 06:37:19 +0000
@@ -1,7 +1,5 @@
 
 [cluster_config]
-ThreadConfig=ldm={count=16},tc={count=16},send={count=4},recv={count=4}
-NoOfFragmentLogParts=16
 MaxNoOfSavedMessages=          1000
 MaxNoOfConcurrentTransactions= 2048
 MaxNoOfConcurrentOperations=   10000

=== modified file 'mysql-test/suite/ndb/r/ndb_condition_pushdown.result'
--- a/mysql-test/suite/ndb/r/ndb_condition_pushdown.result	2012-01-19 12:28:47 +0000
+++ b/mysql-test/suite/ndb/r/ndb_condition_pushdown.result	2012-01-24 14:02:05 +0000
@@ -2404,5 +2404,21 @@ a	b
 3	bbb
 4	ccc
 drop table t;
+create table escapetest ( emailaddress varchar(255) default null, id int not
+null default '0') engine=ndbcluster;
+insert into escapetest values('test_data@stripped', 1);
+explain select * from escapetest where emailaddress like "test_%";
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	escapetest	ALL	NULL	NULL	NULL	NULL	2	Using where with pushed condition
+select * from escapetest where emailaddress like "test_%";
+emailaddress	id
+test_data@stripped	1
+explain select * from escapetest where emailaddress like "test|_%" escape '|';
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	escapetest	ALL	NULL	NULL	NULL	NULL	2	Using where
+select * from escapetest where emailaddress like "test|_%" escape '|';
+emailaddress	id
+test_data@stripped	1
+drop table escapetest;
 set @@session.optimizer_switch = @old_ecpd;
 DROP TABLE t1,t2,t3,t4,t5;

=== modified file 'mysql-test/suite/ndb/r/ndb_index_stat_partitions.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat_partitions.result	2011-09-19 19:51:16 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat_partitions.result	2012-01-26 12:11:53 +0000
@@ -32,14 +32,14 @@ partition by key (K) partitions 1;
 INSERT INTO t1(I,J,L) VALUES
 (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
 (6,6,6),(7,7,7),(8,8,8),(9,9,9),(0,0,0);
-INSERT INTO t1(I,J,L) SELECT I,1,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,2,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,3,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,4,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,5,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,6,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,7,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,8,I FROM t1;
+INSERT INTO t1(I,J,L) SELECT I,1,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,2,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,3,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,4,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,5,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,6,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,7,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,8,I FROM t1 ORDER BY K;
 select i, count(*) from t1 group by 1 order by 1;
 i	count(*)
 0	256

=== modified file 'mysql-test/suite/ndb/t/ndb_condition_pushdown.test'
--- a/mysql-test/suite/ndb/t/ndb_condition_pushdown.test	2012-01-19 12:28:47 +0000
+++ b/mysql-test/suite/ndb/t/ndb_condition_pushdown.test	2012-01-24 14:02:05 +0000
@@ -2424,6 +2424,16 @@ select * from t where b like 'a%';
 select * from t where b not like 'a%';
 drop table t;
 
+# Bug #13604447 61064: SELECT QUERIES USING ESCAPE SYNTAX FAIL
+create table escapetest ( emailaddress varchar(255) default null, id int not
+null default '0') engine=ndbcluster;
+insert into escapetest values('test_data@stripped', 1);
+explain select * from escapetest where emailaddress like "test_%";
+select * from escapetest where emailaddress like "test_%";
+explain select * from escapetest where emailaddress like "test|_%" escape '|';
+select * from escapetest where emailaddress like "test|_%" escape '|';
+drop table escapetest;
+
 set @@session.optimizer_switch = @old_ecpd;
 DROP TABLE t1,t2,t3,t4,t5;
 

=== modified file 'mysql-test/suite/ndb/t/ndb_index_stat_partitions.test'
--- a/mysql-test/suite/ndb/t/ndb_index_stat_partitions.test	2011-09-19 08:16:01 +0000
+++ b/mysql-test/suite/ndb/t/ndb_index_stat_partitions.test	2012-01-26 12:11:53 +0000
@@ -24,14 +24,14 @@ INSERT INTO t1(I,J,L) VALUES
 (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
 (6,6,6),(7,7,7),(8,8,8),(9,9,9),(0,0,0);
 
-INSERT INTO t1(I,J,L) SELECT I,1,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,2,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,3,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,4,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,5,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,6,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,7,I FROM t1;
-INSERT INTO t1(I,J,L) SELECT I,8,I FROM t1;
+INSERT INTO t1(I,J,L) SELECT I,1,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,2,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,3,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,4,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,5,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,6,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,7,I FROM t1 ORDER BY K;
+INSERT INTO t1(I,J,L) SELECT I,8,I FROM t1 ORDER BY K;
 
 select i, count(*) from t1 group by 1 order by 1;
 select l, count(*) from t1 group by 1 order by 1;

=== added file 'mysql-test/suite/ndb_big/bug13637411-master.opt'
--- a/mysql-test/suite/ndb_big/bug13637411-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_big/bug13637411-master.opt	2012-01-28 09:09:19 +0000
@@ -0,0 +1 @@
+--testcase-timeout=60

=== added file 'mysql-test/suite/ndb_big/bug13637411.cnf'
--- a/mysql-test/suite/ndb_big/bug13637411.cnf	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_big/bug13637411.cnf	2012-01-28 09:09:19 +0000
@@ -0,0 +1,29 @@
+!include include/default_mysqld.cnf
+
+[cluster_config.1]
+ndbd=
+ndb_mgmd=
+mysqld=
+
+NoOfReplicas=1
+DataMemory=21G
+IndexMemory=220M
+Diskless=1
+
+[mysqld]
+# Make all mysqlds use cluster
+ndbcluster
+
+ndb-cluster-connection-pool=1
+ndb-force-send=1
+ndb-use-exact-count=0
+ndb-extra-logging=1
+ndb-autoincrement-prefetch-sz=256
+engine-condition-pushdown=1
+ndb-wait-connected=600
+ndb-wait-setup=300
+
+[ENV]
+NDB_CONNECTSTRING=             @mysql_cluster.1.ndb_connectstring
+MASTER_MYPORT=                 @mysqld.1.1.port
+

=== added file 'mysql-test/suite/ndb_big/bug13637411.test'
--- a/mysql-test/suite/ndb_big/bug13637411.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_big/bug13637411.test	2012-01-28 15:29:25 +0000
@@ -0,0 +1,140 @@
+source suite.inc;
+source include/have_ndb.inc;
+result_format 2;
+
+call mtr.add_suppression("The table '.*' is full");
+
+select version();
+
+CREATE TABLE t1 (
+  c0 int unsigned not null primary key,
+  c00 char(255) not null default '',
+  c01 char(255) not null default '',
+  c02 char(255) not null default '',
+  c03 char(255) not null default '',
+  c04 char(255) not null default '',
+  c05 char(255) not null default '',
+  c06 char(255) not null default '',
+  c07 char(255) not null default '',
+  c08 char(255) not null default '',
+  c09 char(255) not null default '',
+  c10 char(255) not null default '',
+  c11 char(255) not null default '',
+  c12 char(255) not null default '',
+  c13 char(255) not null default '',
+  c14 char(255) not null default '',
+  c15 char(255) not null default '',
+  c16 char(255) not null default '',
+  c17 char(255) not null default '',
+  c18 char(255) not null default '',
+  c19 char(255) not null default '',
+  c20 char(255) not null default '',
+  c21 char(255) not null default '',
+  c22 char(255) not null default '',
+  c23 char(255) not null default '',
+  c24 char(255) not null default '',
+  c25 char(255) not null default '',
+  c26 char(255) not null default '',
+  c27 char(255) not null default '',
+  c28 char(255) not null default '',
+  c29 char(255) not null default ''
+) COMMENT='NDB_TABLE=NOLOGGING' ENGINE=ndbcluster partition by key(c0) partitions 1;
+
+let $batch = 200;
+
+## Load table...
+--echo Filling table with 15Gb of data
+disable_query_log;
+let $i = 0;
+let $lastgb = 0;
+while (`select (DATA_LENGTH / 1024 / 1024 / 1024) < 15 from INFORMATION_SCHEMA.PARTITIONS where table_name = 't1'`)
+{
+  let $b = $batch; # Number of values to INSERT per batch
+  let $separator = ;
+  let $sql = INSERT t1 (c0) VALUES;
+  while($b)
+  {
+    let $sql=$sql$separator($i*$batch + $b);
+    let $separator = ,;
+    dec $b;
+  }
+
+  --error 0,1297
+  eval $sql;
+  if (!$mysql_errno)
+  {
+    inc $i;
+  }
+
+  let $gb = `select round(DATA_LENGTH / 1024 / 1024 / 1024) from INFORMATION_SCHEMA.PARTITIONS where table_name = 't1'`;
+  if ($gb != $lastgb)
+  {
+    --echo $gb gb...
+    let $lastgb = $gb;
+  }
+}
+
+--echo Filling table up to 20Gb, expect error
+let $done = 0;
+while (!$done)
+{
+  let $b = $batch; # Number of values to INSERT per batch
+  let $separator = ;
+  let $sql = INSERT t1 (c0) VALUES;
+  while($b)
+  {
+    let $sql=$sql$separator($i*$batch + $b);
+    let $separator = ,;
+    dec $b;
+  }
+
+  --error 0,1114,1297
+  eval $sql;
+  if (!$mysql_errno)
+  {
+    inc $i;
+  }
+  if ($mysql_errno == 1114)
+  {
+    show warnings;
+    inc $done;
+  }
+  if (`select (DATA_LENGTH / 1024 / 1024 / 1024) >= 20 from INFORMATION_SCHEMA.PARTITIONS where table_name = 't1'`)
+  {
+    inc $done;
+    --echo 20g loaded!
+  }
+}
+enable_query_log;
+
+select count(*),max(c0)
+from t1;
+
+select (DATA_LENGTH / 1024 / 1024 / 1024)
+from INFORMATION_SCHEMA.PARTITIONS
+where table_name = 't1';
+
+--echo Clearing table
+disable_query_log;
+while ($i > 0)
+{
+  let $b = $batch; # 
+  let $separator = ;
+  let $sql = delete from t1 where c0 in (;
+  while($b)
+  {
+    let $sql=$sql$separator($i*$batch + $b);
+    let $separator = ,;
+    dec $b;
+  }
+  let $sql=$sql);
+  source run_query_with_retry.inc;
+
+  dec $i;
+}
+enable_query_log;
+
+drop table t1;
+
+## Test suceeded
+exit;

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2012-01-17 14:08:16 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2012-01-25 17:50:29 +0000
@@ -1,9 +1,51 @@
 include/master-slave.inc
 [connection master]
+Wrong schema for the table, too few pks
 create table t1 (a int key, X int) engine ndb;
 Warnings:
 Warning	1625	Bad schema for mysql.ndb_replication table. Message: Wrong number of primary key parts, expected 3
 drop table t1;
+Wrong schema for the table, incorrect pk
+create table t1 (a int key, X int) engine ndb;
+Warnings:
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Missing or wrong type for column 'server_id'
+drop table t1;
+Wrong schema for the table, binlog_type is signed
+create table t1 (a int key, X int) engine ndb;
+Warnings:
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Missing or wrong type for column 'binlog_type'
+drop table t1;
+Wrong schema for the table, conflict_fn is too long
+create table t1 (a int key, X int) engine ndb;
+Warnings:
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Missing or wrong type for column 'conflict_fn'
+drop table t1;
+Correct schema for the table but no conflict fn
+insert into mysql.ndb_replication values ("test", "t1", 0, 7);
+create table test.t1 (a int primary key) engine=ndb;
+show warnings;
+Level	Code	Message
+drop table test.t1;
+MySQLD error output for server 1.1 matching pattern %NDB Binlog: logging%
+relevant
+[note] ndb binlog: logging ./test/t1 (full,use_update)
+delete from mysql.ndb_replication;
+Check that NULL uses server defaults
+show variables like 'ndb_log_update_as_write';
+Variable_name	Value
+ndb_log_update_as_write	ON
+show variables like 'ndb_log_updated_only';
+Variable_name	Value
+ndb_log_updated_only	ON
+insert into mysql.ndb_replication values ("test", "t1", 0, NULL);
+create table test.t1 (a int primary key) engine=ndb;
+show warnings;
+Level	Code	Message
+drop table test.t1;
+MySQLD error output for server 1.1 matching pattern %NDB Binlog: logging%
+relevant
+[note] ndb binlog: logging ./test/t1 (updated,use_write)
+Correct schema for the table but other errors
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
 create table t1 (a int key, X int) engine ndb;
 ERROR HY000: Can't create table 'test.t1' (errno: 1626)
@@ -34,3 +76,249 @@ Level	Code	Message
 Warning	1626	Error in parsing conflict function. Message: NDB$MAX(X Y), missing ')' at 'Y)'
 Error	1005	Can't create table 'test.t1' (errno: 1626)
 delete from mysql.ndb_replication;
+show variables like 'server_id';
+Variable_name	Value
+server_id	1
+create database europenorth;
+create database europesouth;
+create database usnorth;
+create database ussouth;
+Basic wildcard tests
+Note that we put in bad conflict fn names so that the warnings generated
+when the table create fails give an indication of which ndb_replication
+table row was chosen.
+
+Wild serverid
+insert into mysql.ndb_replication values ("europenorth", "france", 1, NULL, "NDB$A()");
+insert into mysql.ndb_replication values ("europenorth", "france", 0, NULL, "NDB$B()");
+Should match specific entry (1) with algorithm A.
+create table europenorth.france (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.france' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$A(), unknown conflict resolution function at 'NDB$A()'
+Error	1005	Can't create table 'europenorth.france' (errno: 1626)
+delete from mysql.ndb_replication where server_id=1;
+Should match generic entry (0) with algorthin B.
+create table europenorth.france (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.france' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$B(), unknown conflict resolution function at 'NDB$B()'
+Error	1005	Can't create table 'europenorth.france' (errno: 1626)
+delete from mysql.ndb_replication;
+Wild table_name
+insert into mysql.ndb_replication values ("europenorth", "fr_nce", 1, NULL, "NDB$A()");
+insert into mysql.ndb_replication values ("europenorth", "%any", 1, NULL, "NDB$B()");
+Should match specific entry (fr_nce) with algorithm A.
+create table europenorth.france (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.france' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$A(), unknown conflict resolution function at 'NDB$A()'
+Error	1005	Can't create table 'europenorth.france' (errno: 1626)
+Should match specific entry (%any) with algorithm B.
+create table europenorth.germany (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.germany' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$B(), unknown conflict resolution function at 'NDB$B()'
+Error	1005	Can't create table 'europenorth.germany' (errno: 1626)
+Should match specific entry (%any) with algorithm B.
+create table europenorth.romany (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.romany' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$B(), unknown conflict resolution function at 'NDB$B()'
+Error	1005	Can't create table 'europenorth.romany' (errno: 1626)
+No match, should be fine
+create table europenorth.uk (a int primary key) engine=ndb;
+show warnings;
+Level	Code	Message
+drop table europenorth.uk;
+delete from mysql.ndb_replication;
+Wild db
+insert into mysql.ndb_replication values ("%north", "countries", 1, NULL, "NDB$A()");
+insert into mysql.ndb_replication values ("%south", "countries", 1, NULL, "NDB$B()");
+Should match north with A
+create table europenorth.countries (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.countries' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$A(), unknown conflict resolution function at 'NDB$A()'
+Error	1005	Can't create table 'europenorth.countries' (errno: 1626)
+Should match north with A
+create table usnorth.countries (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'usnorth.countries' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$A(), unknown conflict resolution function at 'NDB$A()'
+Error	1005	Can't create table 'usnorth.countries' (errno: 1626)
+Should match south with B
+create table europesouth.countries (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europesouth.countries' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$B(), unknown conflict resolution function at 'NDB$B()'
+Error	1005	Can't create table 'europesouth.countries' (errno: 1626)
+Should match south with B
+create table ussouth.countries (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'ussouth.countries' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$B(), unknown conflict resolution function at 'NDB$B()'
+Error	1005	Can't create table 'ussouth.countries' (errno: 1626)
+delete from mysql.ndb_replication;
+Now test wildcard matching precedence
+Wildcards match in the following precedence (highest->lowest)
+Exact match
+Wild serverid
+Wild table_name
+Wild serverid + wild table_name
+Wild db
+Wild db + wild serverid
+Wild db + wild table_name
+All wild
+
+The 'wild serverid' is 0
+
+Multiple matches at the same precedence are ambiguous and result
+in an error.
+
+Start with full set of potential matches, and chip away
+insert into mysql.ndb_replication values ("europenorth", "netherlands", 1, NULL, "NDB$A()");
+insert into mysql.ndb_replication values ("europenorth", "netherlands", 0, NULL, "NDB$B()");
+insert into mysql.ndb_replication values ("europenorth", "%lands", 1, NULL, "NDB$C()");
+insert into mysql.ndb_replication values ("europenorth", "nether%s", 1, NULL, "NDB$D()");
+insert into mysql.ndb_replication values ("europenorth", "%lands", 0, NULL, "NDB$E()");
+insert into mysql.ndb_replication values ("europenorth", "nether%s", 0, NULL, "NDB$F()");
+insert into mysql.ndb_replication values ("Europe%", "netherlands", 1, NULL, "NDB$G()");
+insert into mysql.ndb_replication values ("%North", "netherlands", 1, NULL, "NDB$H()");
+insert into mysql.ndb_replication values ("Europe%", "netherlands", 0, NULL, "NDB$I()");
+insert into mysql.ndb_replication values ("%North", "netherlands", 0, NULL, "NDB$J()");
+insert into mysql.ndb_replication values ("Europe%", "%lands", 1, NULL, "NDB$K()");
+insert into mysql.ndb_replication values ("%North", "nether%s", 1, NULL, "NDB$L()");
+insert into mysql.ndb_replication values ("Europe%", "%lands", 0, NULL, "NDB$M()");
+insert into mysql.ndb_replication values ("%North", "nether%s", 0, NULL, "NDB$N()");
+Unique match (A)
+create table europenorth.netherlands (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.netherlands' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$A(), unknown conflict resolution function at 'NDB$A()'
+Error	1005	Can't create table 'europenorth.netherlands' (errno: 1626)
+delete from mysql.ndb_replication where db="europenorth" and table_name="netherlands" and server_id=1;
+Unique match with wild serverid (B)
+create table europenorth.netherlands (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.netherlands' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$B(), unknown conflict resolution function at 'NDB$B()'
+Error	1005	Can't create table 'europenorth.netherlands' (errno: 1626)
+delete from mysql.ndb_replication where db="europenorth" and table_name="netherlands" and server_id=0;
+Ambiguous wild table name
+create table europenorth.netherlands (a int primary key) engine=ndb;
+Warnings:
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+show warnings;
+Level	Code	Message
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+drop table europenorth.netherlands;
+delete from mysql.ndb_replication where db="europenorth" and table_name="%lands" and server_id=1;
+Wild table name (D)
+create table europenorth.netherlands (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.netherlands' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$D(), unknown conflict resolution function at 'NDB$D()'
+Error	1005	Can't create table 'europenorth.netherlands' (errno: 1626)
+delete from mysql.ndb_replication where db="europenorth" and table_name="nether%s" and server_id=1;
+Ambiguous wild server id and table name
+create table europenorth.netherlands (a int primary key) engine=ndb;
+Warnings:
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+show warnings;
+Level	Code	Message
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+drop table europenorth.netherlands;
+delete from mysql.ndb_replication where db="europenorth" and table_name="nether%s" and server_id=0;
+Wild server id and table name (E)
+create table europenorth.netherlands (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.netherlands' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$E(), unknown conflict resolution function at 'NDB$E()'
+Error	1005	Can't create table 'europenorth.netherlands' (errno: 1626)
+delete from mysql.ndb_replication where db="europenorth" and table_name="%lands" and server_id=0;
+Amiguous wild db
+create table europenorth.netherlands (a int primary key) engine=ndb;
+Warnings:
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+show warnings;
+Level	Code	Message
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+drop table europenorth.netherlands;
+delete from mysql.ndb_replication where db="Europe%" and table_name="netherlands" and server_id=1;
+Wild db (H)
+create table europenorth.netherlands (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.netherlands' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$H(), unknown conflict resolution function at 'NDB$H()'
+Error	1005	Can't create table 'europenorth.netherlands' (errno: 1626)
+delete from mysql.ndb_replication where db="%North" and table_name="netherlands" and server_id=1;
+Ambiguous wild db + server_id
+create table europenorth.netherlands (a int primary key) engine=ndb;
+Warnings:
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+show warnings;
+Level	Code	Message
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+drop table europenorth.netherlands;
+delete from mysql.ndb_replication where db="Europe%" and table_name="netherlands" and server_id=0;
+Wild db + server id (J)
+create table europenorth.netherlands (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.netherlands' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$J(), unknown conflict resolution function at 'NDB$J()'
+Error	1005	Can't create table 'europenorth.netherlands' (errno: 1626)
+delete from mysql.ndb_replication where db="%North" and table_name="netherlands" and server_id=0;
+Ambiguous wild db + table_name
+create table europenorth.netherlands (a int primary key) engine=ndb;
+Warnings:
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+show warnings;
+Level	Code	Message
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+drop table europenorth.netherlands;
+delete from mysql.ndb_replication where db="Europe%" and table_name="%lands" and server_id=1;
+Wild db + table_name (L)
+create table europenorth.netherlands (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.netherlands' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$L(), unknown conflict resolution function at 'NDB$L()'
+Error	1005	Can't create table 'europenorth.netherlands' (errno: 1626)
+delete from mysql.ndb_replication where db="%North" and table_name="nether%s" and server_id=1;
+Ambiguous all wild
+create table europenorth.netherlands (a int primary key) engine=ndb;
+Warnings:
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+show warnings;
+Level	Code	Message
+Warning	1625	Bad schema for mysql.ndb_replication table. Message: Ambiguous matches in mysql.ndb_replication for europenorth.nethe
+drop table europenorth.netherlands;
+delete from mysql.ndb_replication where db="Europe%" and table_name="%lands" and server_id=0;
+All wild (N)
+create table europenorth.netherlands (a int primary key) engine=ndb;
+ERROR HY000: Can't create table 'europenorth.netherlands' (errno: 1626)
+show warnings;
+Level	Code	Message
+Warning	1626	Error in parsing conflict function. Message: NDB$N(), unknown conflict resolution function at 'NDB$N()'
+Error	1005	Can't create table 'europenorth.netherlands' (errno: 1626)
+delete from mysql.ndb_replication;
+drop database europenorth;
+drop database europesouth;
+drop database usnorth;
+drop database ussouth;

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2012-01-17 13:51:35 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2012-01-25 17:13:13 +0000
@@ -6,8 +6,25 @@
 --source include/have_binlog_format_mixed_or_row.inc
 --source suite/ndb_rpl/ndb_master-slave.inc
 
+# Need suppressions on all servers where warnings/errors can be seen.
+--disable_query_log
+--connection server1
+call mtr.add_suppression("NDB: .*Bad schema for mysql.ndb_replication table.*");
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection server2
+call mtr.add_suppression("NDB: .*Bad schema for mysql.ndb_replication table.*");
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection default
+--enable_query_log
+
 #
-# wrong schema for the table
+--echo Wrong schema for the table, too few pks
 #
 --disable_warnings
 --disable_query_log
@@ -28,37 +45,129 @@ CREATE TABLE mysql.ndb_replication
 create table t1 (a int key, X int) engine ndb;
 drop table t1;
 
+
 #
-# correct schema for the table
-# but other errors
+--echo Wrong schema for the table, incorrect pk
 #
 --disable_warnings
 --disable_query_log
-drop table mysql.ndb_replication;
+drop table if exists mysql.ndb_replication;
 CREATE TABLE mysql.ndb_replication
   (db VARBINARY(63),
    table_name VARBINARY(63),
    server_id INT UNSIGNED,
    binlog_type INT UNSIGNED,
    conflict_fn VARBINARY(128),
+   PRIMARY KEY USING HASH (db,table_name,binlog_type))
+  ENGINE=NDB PARTITION BY KEY(db,table_name);
+--enable_warnings
+--enable_query_log
+
+# gives warning when trying to create table as logging
+# may not be as intended
+create table t1 (a int key, X int) engine ndb;
+drop table t1;
+
+#
+--echo Wrong schema for the table, binlog_type is signed
+#
+
+--disable_warnings
+--disable_query_log
+drop table if exists mysql.ndb_replication;
+CREATE TABLE mysql.ndb_replication
+  (db VARBINARY(63),
+   table_name VARBINARY(63),
+   server_id INT UNSIGNED,
+   binlog_type INT,
+   conflict_fn VARBINARY(128),
    PRIMARY KEY USING HASH (db,table_name,server_id))
   ENGINE=NDB PARTITION BY KEY(db,table_name);
 --enable_warnings
 --enable_query_log
 
-# Need suppressions on all servers where warnings/errors can be seen.
+# gives warning when trying to create table as logging
+# may not be as intended
+create table t1 (a int key, X int) engine ndb;
+drop table t1;
+
+#
+--echo Wrong schema for the table, conflict_fn is too long
+#
+--disable_warnings
 --disable_query_log
---connection server1
-call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
-call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
-call mtr.add_suppression("NDB Slave: .* missing function argument .*");
-call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
---connection server2
-call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
-call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
-call mtr.add_suppression("NDB Slave: .* missing function argument .*");
-call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
---connection default
+drop table mysql.ndb_replication;
+CREATE TABLE mysql.ndb_replication
+  (db VARBINARY(63),
+   table_name VARBINARY(63),
+   server_id INT UNSIGNED,
+   binlog_type INT UNSIGNED,
+   conflict_fn VARBINARY(257),
+   PRIMARY KEY USING HASH (db,table_name,server_id))
+  ENGINE=NDB PARTITION BY KEY(db,table_name);
+--enable_warnings
+--enable_query_log
+
+# gives warning when trying to create table as logging
+# may not be as intended
+create table t1 (a int key, X int) engine ndb;
+drop table t1;
+
+#
+--echo Correct schema for the table but no conflict fn
+#
+--disable_warnings
+--disable_query_log
+drop table mysql.ndb_replication;
+CREATE TABLE mysql.ndb_replication
+  (db VARBINARY(63),
+   table_name VARBINARY(63),
+   server_id INT UNSIGNED,
+   binlog_type INT UNSIGNED,
+   PRIMARY KEY USING HASH (db,table_name,server_id))
+  ENGINE=NDB PARTITION BY KEY(db,table_name);
+--enable_warnings
+--enable_query_log
+
+insert into mysql.ndb_replication values ("test", "t1", 0, 7);
+create table test.t1 (a int primary key) engine=ndb;
+show warnings;
+drop table test.t1;
+
+--let server_num=1.1
+--let $pattern=%NDB Binlog: logging%
+--let $limit=1
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+delete from mysql.ndb_replication;
+
+--echo Check that NULL uses server defaults
+show variables like 'ndb_log_update_as_write';
+show variables like 'ndb_log_updated_only';
+insert into mysql.ndb_replication values ("test", "t1", 0, NULL);
+
+create table test.t1 (a int primary key) engine=ndb;
+show warnings;
+drop table test.t1;
+
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+
+#
+--echo Correct schema for the table but other errors
+#
+--disable_warnings
+--disable_query_log
+drop table mysql.ndb_replication;
+CREATE TABLE mysql.ndb_replication
+  (db VARBINARY(63),
+   table_name VARBINARY(63),
+   server_id INT UNSIGNED,
+   binlog_type INT UNSIGNED,
+   conflict_fn VARBINARY(128),
+   PRIMARY KEY USING HASH (db,table_name,server_id))
+  ENGINE=NDB PARTITION BY KEY(db,table_name);
+--enable_warnings
 --enable_query_log
 
 # Non existant conflict_fn
@@ -99,6 +208,246 @@ create table t1 (a int key, X int) engin
 show warnings;
 delete from mysql.ndb_replication;
 
+show variables like 'server_id';
+
+create database europenorth;
+create database europesouth;
+create database usnorth;
+create database ussouth;
+
+--echo Basic wildcard tests
+--echo Note that we put in bad conflict fn names so that the warnings generated
+--echo when the table create fails give an indication of which ndb_replication
+--echo table row was chosen.
+--echo
+--echo Wild serverid
+insert into mysql.ndb_replication values ("europenorth", "france", 1, NULL, "NDB$A()");
+insert into mysql.ndb_replication values ("europenorth", "france", 0, NULL, "NDB$B()");
+
+--echo Should match specific entry (1) with algorithm A.
+--error 1005
+create table europenorth.france (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication where server_id=1;
+
+--echo Should match generic entry (0) with algorthin B.
+--error 1005
+create table europenorth.france (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication;
+
+--echo Wild table_name
+insert into mysql.ndb_replication values ("europenorth", "fr_nce", 1, NULL, "NDB$A()");
+insert into mysql.ndb_replication values ("europenorth", "%any", 1, NULL, "NDB$B()");
+
+--echo Should match specific entry (fr_nce) with algorithm A.
+--error 1005
+create table europenorth.france (a int primary key) engine=ndb;
+show warnings;
+
+--echo Should match specific entry (%any) with algorithm B.
+--error 1005
+create table europenorth.germany (a int primary key) engine=ndb;
+show warnings;
+
+--echo Should match specific entry (%any) with algorithm B.
+--error 1005
+create table europenorth.romany (a int primary key) engine=ndb;
+show warnings;
+
+--echo No match, should be fine
+create table europenorth.uk (a int primary key) engine=ndb;
+show warnings;
+drop table europenorth.uk;
+
+delete from mysql.ndb_replication;
+
+--echo Wild db
+insert into mysql.ndb_replication values ("%north", "countries", 1, NULL, "NDB$A()");
+insert into mysql.ndb_replication values ("%south", "countries", 1, NULL, "NDB$B()");
+
+--echo Should match north with A
+--error 1005
+create table europenorth.countries (a int primary key) engine=ndb;
+show warnings;
+
+--echo Should match north with A
+--error 1005
+create table usnorth.countries (a int primary key) engine=ndb;
+show warnings;
+
+--echo Should match south with B
+--error 1005
+create table europesouth.countries (a int primary key) engine=ndb;
+show warnings;
+
+--echo Should match south with B
+--error 1005
+create table ussouth.countries (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication;
+
+--echo Now test wildcard matching precedence
+--echo Wildcards match in the following precedence (highest->lowest)
+--echo   Exact match
+--echo   Wild serverid
+--echo   Wild table_name
+--echo   Wild serverid + wild table_name
+--echo   Wild db
+--echo   Wild db + wild serverid
+--echo   Wild db + wild table_name
+--echo   All wild
+--echo
+--echo The 'wild serverid' is 0
+--echo
+--echo Multiple matches at the same precedence are ambiguous and result
+--echo in an error.
+--echo
+
+--echo Start with full set of potential matches, and chip away
+# Put in duplicate matches at every precedence
+# Delete rows to get one then no matches at each precedence.
+
+# Unique
+insert into mysql.ndb_replication values ("europenorth", "netherlands", 1, NULL, "NDB$A()");
+
+# Wild serverid
+insert into mysql.ndb_replication values ("europenorth", "netherlands", 0, NULL, "NDB$B()");
+
+# Wild table_name (two)
+insert into mysql.ndb_replication values ("europenorth", "%lands", 1, NULL, "NDB$C()");
+insert into mysql.ndb_replication values ("europenorth", "nether%s", 1, NULL, "NDB$D()");
+
+# Wild server_id and table_name
+insert into mysql.ndb_replication values ("europenorth", "%lands", 0, NULL, "NDB$E()");
+insert into mysql.ndb_replication values ("europenorth", "nether%s", 0, NULL, "NDB$F()");
+
+# Wild db
+insert into mysql.ndb_replication values ("Europe%", "netherlands", 1, NULL, "NDB$G()");
+insert into mysql.ndb_replication values ("%North", "netherlands", 1, NULL, "NDB$H()");
+
+# Wild db + Wild server_id
+
+insert into mysql.ndb_replication values ("Europe%", "netherlands", 0, NULL, "NDB$I()");
+insert into mysql.ndb_replication values ("%North", "netherlands", 0, NULL, "NDB$J()");
+
+# Wild db + Wild table_name
+
+insert into mysql.ndb_replication values ("Europe%", "%lands", 1, NULL, "NDB$K()");
+insert into mysql.ndb_replication values ("%North", "nether%s", 1, NULL, "NDB$L()");
+
+# All wild
+
+insert into mysql.ndb_replication values ("Europe%", "%lands", 0, NULL, "NDB$M()");
+insert into mysql.ndb_replication values ("%North", "nether%s", 0, NULL, "NDB$N()");
+
+
+--echo Unique match (A)
+--error 1005
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication where db="europenorth" and table_name="netherlands" and server_id=1;
+
+--echo Unique match with wild serverid (B)
+--error 1005
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication where db="europenorth" and table_name="netherlands" and server_id=0;
+
+--echo Ambiguous wild table name
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+drop table europenorth.netherlands;
+
+delete from mysql.ndb_replication where db="europenorth" and table_name="%lands" and server_id=1;
+
+--echo Wild table name (D)
+--error 1005
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication where db="europenorth" and table_name="nether%s" and server_id=1;
+
+--echo Ambiguous wild server id and table name
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+drop table europenorth.netherlands;
+
+delete from mysql.ndb_replication where db="europenorth" and table_name="nether%s" and server_id=0;
+
+--echo Wild server id and table name (E)
+--error 1005
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication where db="europenorth" and table_name="%lands" and server_id=0;
+
+--echo Amiguous wild db
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+drop table europenorth.netherlands;
+
+delete from mysql.ndb_replication where db="Europe%" and table_name="netherlands" and server_id=1;
+
+--echo Wild db (H)
+--error 1005
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication where db="%North" and table_name="netherlands" and server_id=1;
+
+--echo Ambiguous wild db + server_id
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+drop table europenorth.netherlands;
+
+delete from mysql.ndb_replication where db="Europe%" and table_name="netherlands" and server_id=0;
+
+--echo Wild db + server id (J)
+--error 1005
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication where db="%North" and table_name="netherlands" and server_id=0;
+
+--echo Ambiguous wild db + table_name
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+drop table europenorth.netherlands;
+
+delete from mysql.ndb_replication where db="Europe%" and table_name="%lands" and server_id=1;
+
+--echo Wild db + table_name (L)
+--error 1005
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication where db="%North" and table_name="nether%s" and server_id=1;
+
+--echo Ambiguous all wild
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+drop table europenorth.netherlands;
+
+delete from mysql.ndb_replication where db="Europe%" and table_name="%lands" and server_id=0;
+
+--echo All wild (N)
+--error 1005
+create table europenorth.netherlands (a int primary key) engine=ndb;
+show warnings;
+
+delete from mysql.ndb_replication;
+
+drop database europenorth;
+drop database europesouth;
+drop database usnorth;
+drop database ussouth;
+
 --disable_query_log
 --sync_slave_with_master
 stop slave;

=== modified file 'mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc'
--- a/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc	2012-01-17 15:51:25 +0000
+++ b/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc	2012-01-25 17:50:29 +0000
@@ -2,9 +2,14 @@
 --echo MySQLD error output for server $server_num matching pattern $pattern
 create table errlog (a int auto_increment primary key, txt text) engine=myisam;
 
---eval load data local infile "$MYSQLTEST_VARDIR/log/mysqld.$server_num.err" into table errlog columns terminated by '\n' (txt);
+# Avoid win path separators being interpreted as escapes
+# for next char by having no escape char
+--eval load data local infile "$MYSQLTEST_VARDIR/log/mysqld.$server_num.err" into table errlog columns terminated by '\n' escaped by '' (txt);
 
---eval select replace( lower( right(txt, length(txt) - locate('[',txt) + 1)), '\r', '') as relevant from errlog where txt like '$pattern' order by a desc limit $limit;
+--eval delete from errlog where txt not like '$pattern';
+# Trim time, win CRs and fix win path separators (or any other backslashes)
+update errlog set txt= replace(replace( lower( right(txt, length(txt) - locate('[',txt) + 1)), '\r', ''), '\\', '/');
+--eval select txt as relevant from errlog order by a desc limit $limit
 
 drop table errlog;
 --enable_query_log

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2012-01-19 13:31:51 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2012-01-26 12:07:58 +0000
@@ -63,6 +63,7 @@ void ndb_index_stat_restart();
 #include "ndb_event_data.h"
 #include "ndb_schema_object.h"
 #include "ndb_schema_dist.h"
+#include "ndb_repl_tab.h"
 
 /*
   Timeout for syncing schema events between
@@ -3893,18 +3894,6 @@ ndb_rep_event_name(String *event_name,co
 }
 
 #ifdef HAVE_NDB_BINLOG
-
-enum Ndb_binlog_type
-{
-  NBT_DEFAULT                   = 0
-  ,NBT_NO_LOGGING               = 1
-  ,NBT_UPDATED_ONLY             = 2
-  ,NBT_FULL                     = 3
-  ,NBT_USE_UPDATE               = 4 /* bit 0x4 indicates USE_UPDATE */
-  ,NBT_UPDATED_ONLY_USE_UPDATE  = NBT_UPDATED_ONLY | NBT_USE_UPDATE
-  ,NBT_FULL_USE_UPDATE          = NBT_FULL         | NBT_USE_UPDATE
-};
-
 static void 
 set_binlog_flags(NDB_SHARE *share,
                  Ndb_binlog_type ndb_binlog_type)
@@ -4800,288 +4789,6 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
   DBUG_RETURN(0);
 }
 
-static const char *ndb_rep_db= NDB_REP_DB;
-static const char *ndb_replication_table= NDB_REPLICATION_TABLE;
-static const char *nrt_db= "db";
-static const char *nrt_table_name= "table_name";
-static const char *nrt_server_id= "server_id";
-static const char *nrt_binlog_type= "binlog_type";
-static const char *nrt_conflict_fn= "conflict_fn";
-
-/*
-   ndbcluster_read_replication_table
-
-   This function reads the information for the supplied table from
-   the mysql.ndb_replication table.
-   Where there is no information (or no table), defaults are
-   returned.
-*/
-int
-ndbcluster_read_replication_table(THD *thd, Ndb *ndb,
-                                  const char* db,
-                                  const char* table_name,
-                                  uint server_id,
-                                  Uint32* binlog_flags,
-                                  char** conflict_fn_spec,
-                                  char* conflict_fn_buffer,
-                                  Uint32 conflict_fn_buffer_len)
-{
-  DBUG_ENTER("ndbcluster_read_replication_table");
-  NdbError ndberror;
-  int error= 0;
-  const char *error_str= "<none>";
-
-  ndb->setDatabaseName(ndb_rep_db);
-  NDBDICT *dict= ndb->getDictionary();
-  Ndb_table_guard ndbtab_g(dict, ndb_replication_table);
-  const NDBTAB *reptab= ndbtab_g.get_table();
-  if (reptab == NULL &&
-      (dict->getNdbError().classification == NdbError::SchemaError ||
-       dict->getNdbError().code == 4009))
-  {
-    DBUG_PRINT("info", ("No %s.%s table", ndb_rep_db, ndb_replication_table));
-    *binlog_flags= NBT_DEFAULT;
-    *conflict_fn_spec= NULL;
-    DBUG_RETURN(0);
-  }
-  const NDBCOL
-    *col_db, *col_table_name, *col_server_id, *col_binlog_type, *col_conflict_fn;
-  char tmp_buf[FN_REFLEN];
-  uint retries= 100;
-  int retry_sleep= 30; /* 30 milliseconds, transaction */
-  if (reptab == NULL)
-  {
-    ndberror= dict->getNdbError();
-    goto err;
-  }
-  if (reptab->getNoOfPrimaryKeys() != 3)
-  {
-    error= -2;
-    error_str= "Wrong number of primary key parts, expected 3";
-    goto err;
-  }
-  error= -1;
-  col_db= reptab->getColumn(error_str= nrt_db);
-  if (col_db == NULL ||
-      !col_db->getPrimaryKey() ||
-      col_db->getType() != NDBCOL::Varbinary)
-    goto err;
-  col_table_name= reptab->getColumn(error_str= nrt_table_name);
-  if (col_table_name == NULL ||
-      !col_table_name->getPrimaryKey() ||
-      col_table_name->getType() != NDBCOL::Varbinary)
-    goto err;
-  col_server_id= reptab->getColumn(error_str= nrt_server_id);
-  if (col_server_id == NULL ||
-      !col_server_id->getPrimaryKey() ||
-      col_server_id->getType() != NDBCOL::Unsigned)
-    goto err;
-  col_binlog_type= reptab->getColumn(error_str= nrt_binlog_type);
-  if (col_binlog_type == NULL ||
-      col_binlog_type->getPrimaryKey() ||
-      col_binlog_type->getType() != NDBCOL::Unsigned)
-    goto err;
-  col_conflict_fn= reptab->getColumn(error_str= nrt_conflict_fn);
-  if (col_conflict_fn == NULL)
-  {
-    col_conflict_fn= NULL;
-  }
-  else if (col_conflict_fn->getPrimaryKey() ||
-           col_conflict_fn->getType() != NDBCOL::Varbinary)
-    goto err;
-
-  error= 0;
-  for (;;)
-  {
-    NdbTransaction *trans= ndb->startTransaction();
-    if (trans == NULL)
-    {
-      ndberror= ndb->getNdbError();
-      break;
-    }
-    NdbRecAttr *col_binlog_type_rec_attr[2];
-    NdbRecAttr *col_conflict_fn_rec_attr[2]= {NULL, NULL};
-    uint32 ndb_binlog_type[2];
-    const uint sz= 256;
-    char ndb_conflict_fn_buf[2*sz];
-    char *ndb_conflict_fn[2]= {ndb_conflict_fn_buf, ndb_conflict_fn_buf+sz};
-    NdbOperation *op[2];
-    uint32 i, id= 0;
-    /* Read generic row (server_id==0) and specific row (server_id == our id)
-     * from ndb_replication.
-     * Specific overrides generic, if present
-     */
-    for (i= 0; i < 2; i++)
-    {
-      NdbOperation *_op;
-      DBUG_PRINT("info", ("reading[%u]: %s,%s,%u", i, db, table_name, id));
-      if ((_op= trans->getNdbOperation(reptab)) == NULL) abort();
-      if (_op->readTuple(NdbOperation::LM_CommittedRead)) abort();
-      ndb_pack_varchar(col_db, tmp_buf, db, (int)strlen(db));
-      if (_op->equal(col_db->getColumnNo(), tmp_buf)) abort();
-      ndb_pack_varchar(col_table_name, tmp_buf, table_name, (int)strlen(table_name));
-      if (_op->equal(col_table_name->getColumnNo(), tmp_buf)) abort();
-      if (_op->equal(col_server_id->getColumnNo(), id)) abort();
-      if ((col_binlog_type_rec_attr[i]=
-           _op->getValue(col_binlog_type, (char *)&(ndb_binlog_type[i]))) == 0) abort();
-      /* optional columns */
-      if (col_conflict_fn)
-      {
-        if ((col_conflict_fn_rec_attr[i]=
-             _op->getValue(col_conflict_fn, ndb_conflict_fn[i])) == 0) abort();
-      }
-      id= server_id;
-      op[i]= _op;
-    }
-
-    if (trans->execute(NdbTransaction::Commit,
-                       NdbOperation::AO_IgnoreError))
-    {
-      if (ndb->getNdbError().status == NdbError::TemporaryError)
-      {
-        if (retries--)
-        {
-          if (trans)
-            ndb->closeTransaction(trans);
-          do_retry_sleep(retry_sleep);
-          continue;
-        }
-      }
-      ndberror= trans->getNdbError();
-      ndb->closeTransaction(trans);
-      break;
-    }
-    for (i= 0; i < 2; i++)
-    {
-      if (op[i]->getNdbError().code)
-      {
-        if (op[i]->getNdbError().classification == NdbError::NoDataFound)
-        {
-          col_binlog_type_rec_attr[i]= NULL;
-          col_conflict_fn_rec_attr[i]= NULL;
-          DBUG_PRINT("info", ("not found row[%u]", i));
-          continue;
-        }
-        ndberror= op[i]->getNdbError();
-        break;
-      }
-      DBUG_PRINT("info", ("found row[%u]", i));
-    }
-    if (col_binlog_type_rec_attr[1] == NULL ||
-        col_binlog_type_rec_attr[1]->isNULL())
-    {
-      /* No specific value, use generic */
-      col_binlog_type_rec_attr[1]= col_binlog_type_rec_attr[0];
-      ndb_binlog_type[1]= ndb_binlog_type[0];
-    }
-    if (col_conflict_fn_rec_attr[1] == NULL ||
-        col_conflict_fn_rec_attr[1]->isNULL())
-    {
-      /* No specific value, use generic */
-      col_conflict_fn_rec_attr[1]= col_conflict_fn_rec_attr[0];
-      ndb_conflict_fn[1]= ndb_conflict_fn[0];
-    }
-
-    if (col_binlog_type_rec_attr[1] == NULL ||
-        col_binlog_type_rec_attr[1]->isNULL())
-    {
-      DBUG_PRINT("info", ("No binlog flag value, using default"));
-      /* No value */
-      *binlog_flags= NBT_DEFAULT;
-    }
-    else
-    {
-      DBUG_PRINT("info", ("Taking binlog flag value from the table"));
-      *binlog_flags= (enum Ndb_binlog_type) ndb_binlog_type[1];
-    }
-
-    if (col_conflict_fn_rec_attr[1] == NULL ||
-        col_conflict_fn_rec_attr[1]->isNULL())
-    {
-      /* No conflict function */
-      *conflict_fn_spec = NULL;
-    }
-    else
-    {
-      const char* conflict_fn = ndb_conflict_fn[1];
-      uint len= 0;
-      switch (col_conflict_fn->getArrayType())
-      {
-      case NDBCOL::ArrayTypeShortVar:
-        len= *(uchar*)conflict_fn;
-        conflict_fn++;
-        break;
-      case NDBCOL::ArrayTypeMediumVar:
-        len= uint2korr(conflict_fn);
-        conflict_fn+= 2;
-        break;
-      default:
-        abort();
-      }
-      if ((len + 1) > conflict_fn_buffer_len)
-      {
-        ndb->closeTransaction(trans);
-        error= -2;
-        error_str= "Conflict function specification too long.";
-        goto err;
-      }
-      memcpy(conflict_fn_buffer, conflict_fn, len);
-      conflict_fn_buffer[len] = '\0';
-      *conflict_fn_spec = conflict_fn_buffer;
-    }
-
-    DBUG_PRINT("info", ("Retrieved Binlog flags : %u and function spec : %s",
-                        *binlog_flags, (*conflict_fn_spec != NULL ?*conflict_fn_spec:
-                                       "NULL")));
-
-    ndb->closeTransaction(trans);
-
-    DBUG_RETURN(0);
-  }
-
-err:
-  DBUG_PRINT("info", ("error %d, error_str %s, ndberror.code %u",
-                      error, error_str, ndberror.code));
-  if (error < 0)
-  {
-    char msg[FN_REFLEN];
-    switch (error)
-    {
-      case -1:
-        my_snprintf(msg, sizeof(msg),
-                 "Missing or wrong type for column '%s'", error_str);
-        break;
-      case -2:
-        my_snprintf(msg, sizeof(msg), "%s", error_str);
-        break;
-      default:
-        abort();
-    }
-    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
-                        ER_NDB_REPLICATION_SCHEMA_ERROR,
-                        ER(ER_NDB_REPLICATION_SCHEMA_ERROR),
-                        msg);
-  }
-  else
-  {
-    char msg[FN_REFLEN];
-    my_snprintf(tmp_buf, sizeof(tmp_buf), "ndberror %u", ndberror.code);
-    my_snprintf(msg, sizeof(msg), "Unable to retrieve %s.%s, logging and "
-             "conflict resolution may not function as intended (%s)",
-             ndb_rep_db, ndb_replication_table, tmp_buf);
-    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
-                        ER_ILLEGAL_HA_CREATE_OPTION,
-                        ER(ER_ILLEGAL_HA_CREATE_OPTION),
-                        ndbcluster_hton_name, msg);  
-  }
-  *binlog_flags= NBT_DEFAULT;
-  *conflict_fn_spec= NULL;
-
-  if (ndberror.code && opt_ndb_extra_logging)
-    thd_print_warning_list(thd, "NDB");
-  DBUG_RETURN(ndberror.code);
-}
-
 /*
   ndbcluster_get_binlog_replication_info
 
@@ -5126,48 +4833,54 @@ ndbcluster_get_binlog_replication_info(T
     }
   }
 
-  const Uint32 MAX_CONFLICT_FN_SPEC_LEN = 256;
-  char conflict_fn_buffer[MAX_CONFLICT_FN_SPEC_LEN];
-  char* conflict_fn_spec;
-
-  if (ndbcluster_read_replication_table(thd,
-                                        ndb,
-                                        db,
-                                        table_name,
-                                        server_id,
-                                        binlog_flags,
-                                        &conflict_fn_spec,
-                                        conflict_fn_buffer,
-                                        MAX_CONFLICT_FN_SPEC_LEN) != 0)
+  Ndb_rep_tab_reader rep_tab_reader;
+
+  int rc = rep_tab_reader.lookup(ndb,
+                                 db,
+                                 table_name,
+                                 server_id);
+
+  const char* msg = rep_tab_reader.get_warning_message();
+  if (msg != NULL)
   {
-    DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
+    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+                        ER_NDB_REPLICATION_SCHEMA_ERROR,
+                        ER(ER_NDB_REPLICATION_SCHEMA_ERROR),
+                        msg);
+    sql_print_warning("NDB Binlog: %s",
+                      msg);
   }
 
+  if (rc != 0)
+    DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
+
+  *binlog_flags= rep_tab_reader.get_binlog_flags();
+  const char* conflict_fn_spec= rep_tab_reader.get_conflict_fn_spec();
+
   if (conflict_fn_spec != NULL)
   {
-    char tmp_buf[FN_REFLEN];
-
+    char msgbuf[ FN_REFLEN ];
     if (parse_conflict_fn_spec(conflict_fn_spec,
                                conflict_fn,
                                args,
                                num_args,
-                               tmp_buf,
-                               sizeof(tmp_buf)) != 0)
+                               msgbuf,
+                               sizeof(msgbuf)) != 0)
     {
         push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                           ER_CONFLICT_FN_PARSE_ERROR,
                           ER(ER_CONFLICT_FN_PARSE_ERROR),
-                          tmp_buf);
+                          msgbuf);
 
       /*
-         Log as well, useful for contexts where the thd's stack of
-         warnings are ignored
-       */
+        Log as well, useful for contexts where the thd's stack of
+        warnings are ignored
+      */
       if (opt_ndb_extra_logging)
       {
         sql_print_warning("NDB Slave: Table %s.%s : Parse error on conflict fn : %s",
                           db, table_name,
-                          tmp_buf);
+                          msgbuf);
       }
 
       DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);

=== modified file 'sql/ha_ndbcluster_cond.cc'
--- a/sql/ha_ndbcluster_cond.cc	2012-01-19 12:28:47 +0000
+++ b/sql/ha_ndbcluster_cond.cc	2012-01-24 14:02:05 +0000
@@ -497,7 +497,14 @@ ndb_serialize_cond(const Item *item, voi
           {
             Ndb_expect_stack* expect_next= new Ndb_expect_stack();
             DBUG_PRINT("info", ("LIKE_FUNC"));      
-            curr_cond->ndb_item= new Ndb_item(func_item->functype(),
+
+            if (((Item_func_like *)func_item)->escape_was_used_in_parsing())
+            {
+              DBUG_PRINT("info", ("LIKE expressions with ESCAPE not supported"));
+              context->supported= FALSE;
+            }
+            
+             curr_cond->ndb_item= new Ndb_item(func_item->functype(),
                                               func_item);      
 
             /*

=== modified file 'sql/item_cmpfunc.h'
--- a/sql/item_cmpfunc.h	2011-12-15 07:32:59 +0000
+++ b/sql/item_cmpfunc.h	2012-01-24 14:02:05 +0000
@@ -1435,6 +1435,7 @@ public:
   const char *func_name() const { return "like"; }
   bool fix_fields(THD *thd, Item **ref);
   void cleanup();
+  bool escape_was_used_in_parsing() { return escape_used_in_parsing; }
 };
 
 

=== added file 'sql/ndb_repl_tab.cc'
--- a/sql/ndb_repl_tab.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_repl_tab.cc	2012-01-25 17:50:29 +0000
@@ -0,0 +1,560 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ha_ndbcluster_tables.h"
+#include "ndb_repl_tab.h"
+
+#ifdef HAVE_NDB_BINLOG
+#include "ha_ndbcluster_glue.h"
+#include "ha_ndbcluster_connection.h"  /* do_retry_sleep() */
+#include "ndb_table_guard.h"
+#include "ndb_share.h"
+
+Ndb_rep_tab_key::Ndb_rep_tab_key(const char* _db,
+                                 const char* _table_name,
+                                 uint _server_id)
+{
+  uint db_len= (uint) strlen(_db);
+  uint tabname_len = (uint) strlen(_table_name);
+  assert(DB_MAXLEN < 256); /* Fits in Varchar */
+  assert(db_len <= DB_MAXLEN);
+  assert(tabname_len <= TABNAME_MAXLEN);
+
+  memcpy(&db[1], _db, db_len);
+  db[ 0 ]= db_len;
+
+  memcpy(&table_name[1], _table_name, tabname_len);
+  table_name[ 0 ]= tabname_len;
+
+  server_id= _server_id;
+
+  null_terminate_strings();
+}
+
+void Ndb_rep_tab_key::null_terminate_strings()
+{
+  assert((uint) db[0] <= DB_MAXLEN);
+  assert((uint) table_name[0] <= TABNAME_MAXLEN);
+  db[ db[0] + 1] = '\0';
+  table_name[ table_name[0] + 1] = '\0';
+}
+
+int
+Ndb_rep_tab_key::attempt_match(const char* keyptr,
+                               const uint keylen,
+                               const char* candidateptr,
+                               const uint candidatelen,
+                               const int exactmatchvalue)
+{
+  if (my_strnncoll(system_charset_info,
+                   (const uchar*) keyptr,
+                   keylen,
+                   (const uchar*) candidateptr,
+                   candidatelen) == 0)
+  {
+    /* Exact match */
+    return exactmatchvalue;
+  }
+  else if (my_wildcmp(system_charset_info,
+                      keyptr,
+                      keyptr + keylen,
+                      candidateptr,
+                      candidateptr + candidatelen,
+                      '\\', wild_one, wild_many) == 0)
+  {
+    /* Wild match */
+    return 0;
+  }
+
+  /* No match */
+  return -1;
+};
+
+int
+Ndb_rep_tab_key::get_match_quality(const Ndb_rep_tab_key* key,
+                                   const Ndb_rep_tab_key* candidate_row)
+{
+  /* 0= No match
+     1= Loosest match
+     8= Best match
+
+     Actual mapping is :
+     db    table    serverid  Quality
+     W     W        W         1
+     W     W        =         2
+     W     =        W         3
+     W     =        =         4
+     =     W        W         5
+     =     W        =         6
+     =     =        W         7
+     =     =        =         8
+  */
+  int quality = MIN_MATCH_VAL;
+
+  int rc;
+  if ((rc = attempt_match(&key->db[1],
+                          key->db[0],
+                          &candidate_row->db[1],
+                          candidate_row->db[0],
+                          EXACT_MATCH_DB)) == -1)
+  {
+    /* No match, drop out now */
+    return 0;
+  }
+  quality+= rc;
+
+  if ((rc = attempt_match(&key->table_name[1],
+                          key->table_name[0],
+                          &candidate_row->table_name[1],
+                          candidate_row->table_name[0],
+                          EXACT_MATCH_TABLE_NAME)) == -1)
+  {
+    /* No match, drop out now */
+    return 0;
+  }
+  quality+= rc;
+
+  if (candidate_row->server_id == key->server_id)
+  {
+    /* Exact match */
+    quality += EXACT_MATCH_SERVER_ID;
+  }
+  else if (candidate_row->server_id != 0)
+  {
+    /* No match */
+    return 0;
+  }
+
+  return quality;
+};
+
+Ndb_rep_tab_row::Ndb_rep_tab_row()
+  : binlog_type(0), cfs_is_null(true)
+{
+  memset(conflict_fn_spec, 0, sizeof(conflict_fn_spec));
+}
+const char* Ndb_rep_tab_reader::ndb_rep_db= NDB_REP_DB;
+const char* Ndb_rep_tab_reader::ndb_replication_table= NDB_REPLICATION_TABLE;
+const char* Ndb_rep_tab_reader::nrt_db= "db";
+const char* Ndb_rep_tab_reader::nrt_table_name= "table_name";
+const char* Ndb_rep_tab_reader::nrt_server_id= "server_id";
+const char* Ndb_rep_tab_reader::nrt_binlog_type= "binlog_type";
+const char* Ndb_rep_tab_reader::nrt_conflict_fn= "conflict_fn";
+
+Ndb_rep_tab_reader::Ndb_rep_tab_reader()
+  : binlog_flags(NBT_DEFAULT),
+    conflict_fn_spec(NULL),
+    warning_msg(NULL)
+{
+}
+
+int Ndb_rep_tab_reader::check_schema(const NdbDictionary::Table* reptab,
+                                     NdbDictionary::Dictionary* dict,
+                                     const char** error_str)
+{
+  DBUG_ENTER("check_schema");
+  *error_str= NULL;
+
+  const NdbDictionary::Column
+    *col_db, *col_table_name, *col_server_id, *col_binlog_type, *col_conflict_fn;
+  if (reptab->getNoOfPrimaryKeys() != 3)
+  {
+    *error_str= "Wrong number of primary key parts, expected 3";
+    DBUG_RETURN(-2);
+  }
+  col_db= reptab->getColumn(*error_str= nrt_db);
+  if (col_db == NULL ||
+      !col_db->getPrimaryKey() ||
+      col_db->getType() != NdbDictionary::Column::Varbinary)
+    DBUG_RETURN(-1);
+  col_table_name= reptab->getColumn(*error_str= nrt_table_name);
+  if (col_table_name == NULL ||
+      !col_table_name->getPrimaryKey() ||
+      col_table_name->getType() != NdbDictionary::Column::Varbinary)
+    DBUG_RETURN(-1);
+  col_server_id= reptab->getColumn(*error_str= nrt_server_id);
+  if (col_server_id == NULL ||
+      !col_server_id->getPrimaryKey() ||
+      col_server_id->getType() != NdbDictionary::Column::Unsigned)
+    DBUG_RETURN(-1);
+  col_binlog_type= reptab->getColumn(*error_str= nrt_binlog_type);
+  if (col_binlog_type == NULL ||
+      col_binlog_type->getPrimaryKey() ||
+      col_binlog_type->getType() != NdbDictionary::Column::Unsigned)
+    DBUG_RETURN(-1);
+  col_conflict_fn= reptab->getColumn(*error_str= nrt_conflict_fn);
+  if (col_conflict_fn != NULL)
+  {
+    if ((col_conflict_fn->getPrimaryKey()) ||
+        (col_conflict_fn->getType() != NdbDictionary::Column::Varbinary))
+      DBUG_RETURN(-1);
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+Ndb_rep_tab_reader::scan_candidates(Ndb* ndb,
+                                    const NdbDictionary::Table* reptab,
+                                    const char* db,
+                                    const char* table_name,
+                                    uint server_id,
+                                    Ndb_rep_tab_row& best_match)
+{
+  uint retries= 100;
+  int retry_sleep= 30; /* 30 milliseconds, transaction */
+  int best_match_quality= 0;
+  NdbError ok;
+  NdbError ndberror;
+
+  /* Loop to enable temporary error retries */
+  while(true)
+  {
+    ndberror = ok; /* reset */
+    NdbTransaction *trans= ndb->startTransaction();
+    if (trans == NULL)
+    {
+      ndberror= ndb->getNdbError();
+
+      if (ndberror.status == NdbError::TemporaryError)
+      {
+        if (retries--)
+        {
+          do_retry_sleep(retry_sleep);
+          continue;
+        }
+      }
+      break;
+    }
+    NdbRecAttr* ra_binlog_type= NULL;
+    NdbRecAttr* ra_conflict_fn_spec= NULL;
+    Ndb_rep_tab_row row;
+    bool have_conflict_fn_col = (reptab->getColumn(nrt_conflict_fn) != NULL);
+
+    /* Define scan op on ndb_replication */
+    NdbScanOperation* scanOp = trans->getNdbScanOperation(reptab);
+    if (scanOp == NULL) { ndberror= trans->getNdbError(); break; }
+
+    if ((scanOp->readTuples(NdbScanOperation::LM_CommittedRead) != 0) ||
+        (scanOp->getValue(nrt_db, (char*) row.key.db) == NULL) ||
+        (scanOp->getValue(nrt_table_name, (char*) row.key.table_name) == NULL) ||
+        (scanOp->getValue(nrt_server_id, (char*) &row.key.server_id) == NULL) ||
+        ((ra_binlog_type = scanOp->getValue(nrt_binlog_type, (char*) &row.binlog_type)) == NULL) ||
+        (have_conflict_fn_col &&
+         ((ra_conflict_fn_spec=
+           scanOp->getValue(nrt_conflict_fn, (char*) row.conflict_fn_spec)) == NULL)))
+    {
+      ndberror= scanOp->getNdbError();
+      break;
+    }
+
+    if (trans->execute(NdbTransaction::NoCommit,
+                       NdbOperation::AO_IgnoreError))
+    {
+      ndberror= trans->getNdbError();
+      ndb->closeTransaction(trans);
+
+      if (ndberror.status == NdbError::TemporaryError)
+      {
+        if (retries--)
+        {
+          do_retry_sleep(retry_sleep);
+          continue;
+        }
+      }
+      break;
+    }
+
+    /* Scroll through results, looking for best match */
+    DBUG_PRINT("info", ("Searching ndb_replication for %s.%s %u",
+                        db, table_name, server_id));
+
+    bool ambiguous_match = false;
+    Ndb_rep_tab_key searchkey(db, table_name, server_id);
+    int scan_rc;
+    while ((scan_rc= scanOp->nextResult(true)) == 0)
+    {
+      if (ra_binlog_type->isNULL() == 1)
+      {
+        row.binlog_type= NBT_DEFAULT;
+      }
+      if (ra_conflict_fn_spec)
+      {
+        row.set_conflict_fn_spec_null(ra_conflict_fn_spec->isNULL() == 1);
+      }
+
+      /* Compare row to searchkey to get quality of match */
+      int match_quality= Ndb_rep_tab_key::get_match_quality(&searchkey,
+                                                            &row.key);
+#ifndef DBUG_OFF
+      {
+        row.null_terminate_strings();
+
+        DBUG_PRINT("info", ("Candidate : %s.%s %u : %u %s"
+                            " Match quality : %u.",
+                            row.key.get_db(),
+                            row.key.get_table_name(),
+                            row.key.server_id,
+                            row.binlog_type,
+                            row.get_conflict_fn_spec(),
+                            match_quality));
+      }
+#endif
+
+      if (match_quality > 0)
+      {
+        if (match_quality == best_match_quality)
+        {
+          ambiguous_match = true;
+          /* Ambiguous matches...*/
+          my_snprintf(warning_msg_buffer, sizeof(warning_msg_buffer),
+                      "Ambiguous matches in %s.%s for %s.%s (%u)."
+                      "Candidates : %s.%s (%u), %s.%s (%u).",
+                      ndb_rep_db, ndb_replication_table,
+                      db, table_name, server_id,
+                      &best_match.key.db[1],
+                      &best_match.key.table_name[1],
+                      best_match.key.server_id,
+                      &row.key.db[1],
+                      &row.key.table_name[1],
+                      row.key.server_id);
+          DBUG_PRINT("info", ("%s", warning_msg_buffer));
+        }
+        if (match_quality > best_match_quality)
+        {
+          /* New best match */
+          best_match= row;
+          best_match_quality = match_quality;
+          ambiguous_match = false;
+
+          if (best_match_quality == Ndb_rep_tab_key::EXACT_MATCH_QUALITY)
+          {
+            /* We're done */
+            break;
+          }
+        }
+      } /* if (match_quality > 0) */
+    } /* while ((scan_rc= scanOp->nextResult(true)) */
+
+    if (scan_rc < 0)
+    {
+      ndberror= scanOp->getNdbError();
+      if (ndberror.status == NdbError::TemporaryError)
+      {
+        if (retries--)
+        {
+          ndb->closeTransaction(trans);
+          do_retry_sleep(retry_sleep);
+          continue;
+        }
+      }
+    }
+
+    ndb->closeTransaction(trans);
+
+    if (ambiguous_match)
+    {
+      warning_msg= warning_msg_buffer;
+      best_match_quality = -1;
+    }
+
+    break;
+  } /* while(true) */
+
+  if (ndberror.code != 0)
+  {
+    my_snprintf(warning_msg_buffer, sizeof(warning_msg_buffer),
+                "Unable to retrieve %s.%s, logging and "
+                "conflict resolution may not function "
+                "as intended (ndberror %u)",
+                ndb_rep_db, ndb_replication_table,
+                ndberror.code);
+    warning_msg= warning_msg_buffer;
+    best_match_quality = -1;
+  }
+
+  return best_match_quality;
+}
+
+int
+Ndb_rep_tab_reader::lookup(Ndb* ndb,
+                           /* Keys */
+                           const char* db,
+                           const char* table_name,
+                           uint server_id)
+{
+  DBUG_ENTER("lookup");
+  int error= 0;
+  NdbError ndberror;
+  const char *error_str= "<none>";
+
+  /* Set results to defaults */
+  binlog_flags= NBT_DEFAULT;
+  conflict_fn_spec= NULL;
+  warning_msg= NULL;
+
+  ndb->setDatabaseName(ndb_rep_db);
+  NdbDictionary::Dictionary *dict= ndb->getDictionary();
+  Ndb_table_guard ndbtab_g(dict, ndb_replication_table);
+  const NdbDictionary::Table *reptab= ndbtab_g.get_table();
+
+  do
+  {
+    if (reptab == NULL)
+    {
+      if (dict->getNdbError().classification == NdbError::SchemaError ||
+          dict->getNdbError().code == 4009)
+      {
+        DBUG_PRINT("info", ("No %s.%s table", ndb_rep_db, ndb_replication_table));
+        DBUG_RETURN(0);
+      }
+      else
+      {
+        error= 0;
+        ndberror= dict->getNdbError();
+        break;
+      }
+    }
+
+    if ((error= check_schema(reptab,
+                             dict,
+                             &error_str)) != 0)
+    {
+      DBUG_PRINT("info", ("check_schema failed : %u, error_str : %s",
+                          error, error_str));
+      break;
+    }
+
+
+    Ndb_rep_tab_row best_match_row;
+
+    int best_match_quality = scan_candidates(ndb,
+                                             reptab,
+                                             db,
+                                             table_name,
+                                             server_id,
+                                             best_match_row);
+
+    DBUG_PRINT("info", ("Best match at quality : %u", best_match_quality));
+
+    if (best_match_quality == -1)
+    {
+      /* Problem in matching, message already set */
+      assert(warning_msg != NULL);
+      error= -3;
+      break;
+    }
+    if (best_match_quality == 0)
+    {
+      /* No match : Use defaults */
+    }
+    else
+    {
+      /* Have a matching row, copy out values */
+      /* Ensure VARCHARs are usable as strings */
+      best_match_row.null_terminate_strings();
+
+      binlog_flags= (enum Ndb_binlog_type) best_match_row.binlog_type;
+
+      if (best_match_row.cfs_is_null)
+      {
+        DBUG_PRINT("info", ("Conflict FN SPEC is Null"));
+        /* No conflict fn spec */
+        conflict_fn_spec= NULL;
+      }
+      else
+      {
+        const char* conflict_fn = best_match_row.get_conflict_fn_spec();
+        uint len= (uint) strlen(conflict_fn);
+        if ((len + 1) > sizeof(conflict_fn_buffer))
+        {
+          error= -2;
+          error_str= "Conflict function specification too long.";
+          break;
+        }
+        memcpy(conflict_fn_buffer, conflict_fn, len);
+        conflict_fn_buffer[len] = '\0';
+        conflict_fn_spec = conflict_fn_buffer;
+      }
+    }
+  } while(0);
+
+  /* Error handling */
+  if (error == 0)
+  {
+    if (ndberror.code != 0)
+    {
+      my_snprintf(warning_msg_buffer, sizeof(warning_msg_buffer),
+                  "Unable to retrieve %s.%s, logging and "
+                  "conflict resolution may not function "
+                  "as intended (ndberror %u)",
+                  ndb_rep_db, ndb_replication_table,
+                  ndberror.code);
+      warning_msg= warning_msg_buffer;
+      error= -4;
+    }
+  }
+  else
+  {
+    switch (error)
+    {
+    case -1:
+      my_snprintf(warning_msg_buffer, sizeof(warning_msg_buffer),
+                  "Missing or wrong type for column '%s'", error_str);
+      break;
+    case -2:
+      my_snprintf(warning_msg_buffer, sizeof(warning_msg_buffer), "%s", error_str);
+      break;
+    case -3:
+      /* Message already set */
+      break;
+    default:
+      abort();
+    }
+    warning_msg= warning_msg_buffer;
+    error= 0; /* No real error, just use defaults */
+  }
+
+  DBUG_PRINT("info", ("Rc : %d Retrieved Binlog flags : %u and function spec : %s",
+                      error, binlog_flags, (conflict_fn_spec != NULL ?conflict_fn_spec:
+                                             "NULL")));
+
+  DBUG_RETURN(error);
+};
+
+Uint32
+Ndb_rep_tab_reader::get_binlog_flags() const
+{
+  return binlog_flags;
+}
+
+const char*
+Ndb_rep_tab_reader::get_conflict_fn_spec() const
+{
+  return conflict_fn_spec;
+}
+
+const char*
+Ndb_rep_tab_reader::get_warning_message() const
+{
+  return warning_msg;
+}
+
+
+/* #ifdef HAVE_NDB_BINLOG */
+
+#endif

=== added file 'sql/ndb_repl_tab.h'
--- a/sql/ndb_repl_tab.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_repl_tab.h	2012-01-25 17:13:13 +0000
@@ -0,0 +1,254 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_REPL_TAB_H
+#define NDB_REPL_TAB_H
+
+#include <my_global.h>
+
+#ifdef HAVE_NDB_BINLOG
+#include <mysql_com.h>  /* NAME_CHAR_LEN */
+#include <ndbapi/NdbApi.hpp>
+
+/*
+  Ndb_rep_tab_key
+
+  This class represents the key columns of the
+  mysql.ndb_replication system table
+  It is used when reading values from that table
+*/
+class Ndb_rep_tab_key
+{
+public:
+  static const uint DB_MAXLEN= NAME_CHAR_LEN - 1;
+  static const uint TABNAME_MAXLEN= NAME_CHAR_LEN - 1;
+
+  /* Char arrays in varchar format with 1 length byte and
+   * trailing 0
+   */
+  char db[ DB_MAXLEN + 2 ];
+  char table_name[ TABNAME_MAXLEN + 2 ];
+  uint server_id;
+
+  Ndb_rep_tab_key()
+  {
+    db[0] = 0;
+    table_name[0] = 0;
+    server_id = 0;
+  }
+
+  /* Constructor from normal null terminated strings */
+  Ndb_rep_tab_key(const char* _db,
+                  const char* _table_name,
+                  uint _server_id);
+
+  /* Add null terminators to VARCHAR format string values */
+  void null_terminate_strings();
+
+  const char* get_db() const
+  {
+    return &db[1];
+  };
+
+  const char* get_table_name() const
+  {
+    return &table_name[1];
+  };
+
+  static const int MIN_MATCH_VAL = 1;
+  static const int EXACT_MATCH_DB = 4;
+  static const int EXACT_MATCH_TABLE_NAME = 2;
+  static const int EXACT_MATCH_SERVER_ID = 1;
+
+  static const int EXACT_MATCH_QUALITY =
+    MIN_MATCH_VAL +
+    EXACT_MATCH_DB +
+    EXACT_MATCH_TABLE_NAME +
+    EXACT_MATCH_SERVER_ID;
+
+  /*
+    This static method attempts an exact, then a wild
+    match between the passed key (with optional wild
+    characters), and the passed candidate row
+    returns :
+     1  : Exact match
+     0  : Wild match
+     -1 : No match
+  */
+  static int attempt_match(const char* keyptr,
+                           const uint keylen,
+                           const char* candidateptr,
+                           const uint candidatelen,
+                           const int exactmatchvalue);
+
+  /* This static method compares a fixed key value with
+   * a possibly wildcard containing candidate_row.
+   * If there is no match, 0 is returned.
+   * >0 means there is a match, with larger numbers
+   * indicating a better match quality.
+   * An exact match returns EXACT_MATCH_QUALITY
+   */
+  static int get_match_quality(const Ndb_rep_tab_key* key,
+                               const Ndb_rep_tab_key* candidate_row);
+};
+
+/*
+  Ndb_rep_tab_row
+
+  This class represents a row in the mysql.ndb_replication table
+*/
+class Ndb_rep_tab_row
+{
+public:
+  static const uint MAX_CONFLICT_FN_SPEC_LEN = 255;
+  static const uint CONFLICT_FN_SPEC_BUF_LEN =
+    MAX_CONFLICT_FN_SPEC_LEN + 1; /* Trailing '\0' */
+
+  Ndb_rep_tab_key key;
+  uint binlog_type;
+  bool cfs_is_null;
+  /* Buffer has space for leading length byte */
+  char conflict_fn_spec[ CONFLICT_FN_SPEC_BUF_LEN + 1 ];
+
+  Ndb_rep_tab_row();
+
+  void null_terminate_strings()
+  {
+    key.null_terminate_strings();
+    uint speclen= 0;
+    speclen = conflict_fn_spec[0];
+
+    assert(speclen <= MAX_CONFLICT_FN_SPEC_LEN);
+    conflict_fn_spec[1 + speclen] = '\0';
+  }
+
+  const char* get_conflict_fn_spec()
+  {
+    return &conflict_fn_spec[1];
+  }
+
+  void set_conflict_fn_spec_null(bool null)
+  {
+    if (null)
+    {
+      cfs_is_null = true;
+      conflict_fn_spec[0] = 0;
+      conflict_fn_spec[1] = 0;
+    }
+    else
+    {
+      cfs_is_null = false;
+    }
+  }
+};
+
+/**
+   Ndb_rep_tab_reader
+
+   A helper class for accessing the mysql.ndb_replication
+   table
+*/
+class Ndb_rep_tab_reader
+{
+private:
+  static const char *ndb_rep_db;
+  static const char *ndb_replication_table;
+  static const char *nrt_db;
+  static const char *nrt_table_name;
+  static const char *nrt_server_id;
+  static const char *nrt_binlog_type;
+  static const char *nrt_conflict_fn;
+
+  Uint32 binlog_flags;
+  char conflict_fn_buffer[ Ndb_rep_tab_row::CONFLICT_FN_SPEC_BUF_LEN ];
+  char warning_msg_buffer[ FN_REFLEN ];
+
+  const char* conflict_fn_spec;
+  const char* warning_msg;
+
+  /**
+     check_schema
+
+     Checks that the schema of the mysql.ndb_replication table
+     is acceptable.
+     Returns
+     0 if ok
+     -1 if a column has an error.  Col name in error_str
+     -2 if there's a more general error.  Error description in
+        error_str
+  */
+  static
+  int check_schema(const NdbDictionary::Table* reptab,
+                   NdbDictionary::Dictionary* dict,
+                   const char** error_str);
+
+  /**
+     scan_candidates
+
+     Scans the ndb_replication table for rows matching the
+     passed db, table_name, server_id triple.
+     Returns the quality of the match made.
+
+     -1 = Error in processing, see msg
+     0 = No match, use defaults.
+     >0 = Use data in best_match
+
+     if msg is set on return it contains a warning.
+     Warnings may be produces in non error scenarios
+  */
+  int scan_candidates(Ndb* ndb,
+                      const NdbDictionary::Table* reptab,
+                      const char* db,
+                      const char* table_name,
+                      uint server_id,
+                      Ndb_rep_tab_row& best_match);
+public:
+  Ndb_rep_tab_reader();
+  ~Ndb_rep_tab_reader() {};
+
+  /**
+     lookup
+
+     lookup scans the mysql.ndb_replication table for
+     the best matching entry for the supplied db,
+     table_name, server_id triple.
+     A buffer for the conflict_fn spec, and for any
+     error or warning messages must be supplied.
+     The passed binlog_flags, conflict_fn_spec and
+     message may be updated as a result
+
+     Returns :
+       0  : Success.
+       <0 : Error.
+  */
+  int lookup(Ndb* ndb,
+             /* Keys */
+             const char* db,
+             const char* table_name,
+             uint server_id);
+
+  /* Following only valid after a call to lookup() */
+  Uint32 get_binlog_flags() const;
+  const char* get_conflict_fn_spec() const;
+  const char* get_warning_message() const;
+};
+
+/* #ifdef HAVE_NDB_BINLOG */
+#endif
+
+/* #ifdef NDB_REPL_TAB_H */
+#endif

=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h	2012-01-17 14:08:16 +0000
+++ b/sql/ndb_share.h	2012-01-25 17:50:29 +0000
@@ -46,6 +46,17 @@ enum enum_conflict_fn_type
 };
 
 #ifdef HAVE_NDB_BINLOG
+enum Ndb_binlog_type
+{
+  NBT_DEFAULT                   = 0
+  ,NBT_NO_LOGGING               = 1
+  ,NBT_UPDATED_ONLY             = 2
+  ,NBT_FULL                     = 3
+  ,NBT_USE_UPDATE               = 4 /* bit 0x4 indicates USE_UPDATE */
+  ,NBT_UPDATED_ONLY_USE_UPDATE  = NBT_UPDATED_ONLY | NBT_USE_UPDATE
+  ,NBT_FULL_USE_UPDATE          = NBT_FULL         | NBT_USE_UPDATE
+};
+
 static const Uint32 MAX_CONFLICT_ARGS= 8;
 
 enum enum_conflict_fn_arg_type

=== modified file 'sql/ndb_table_guard.h'
--- a/sql/ndb_table_guard.h	2011-03-08 14:40:38 +0000
+++ b/sql/ndb_table_guard.h	2012-01-25 17:50:29 +0000
@@ -1,5 +1,5 @@
 /*
-   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+   Copyright (c) 2011,2012 Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -81,5 +81,5 @@ private:
   int m_invalidate;
 };
 
+/* NDB_TABLE_GUARD_H */
 #endif
-

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2012-01-11 12:33:50 +0000
+++ b/storage/ndb/CMakeLists.txt	2012-01-26 12:07:58 +0000
@@ -88,6 +88,7 @@ SET(NDBCLUSTER_SOURCES
   ../../sql/ndb_schema_dist.cc
   ../../sql/ndb_component.cc
   ../../sql/ndb_local_schema.cc
+  ../../sql/ndb_repl_tab.cc
 )
 
 # Include directories used when building ha_ndbcluster

=== modified file 'storage/ndb/include/kernel/kernel_types.h'
--- a/storage/ndb/include/kernel/kernel_types.h	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/include/kernel/kernel_types.h	2012-01-28 10:11:10 +0000
@@ -77,6 +77,13 @@ struct Local_key
   static bool isInvalid(Uint32 lk1, Uint32 lk2) {
     return ref(lk1, lk2) == ~Uint32(0);
   }
+
+  /**
+   * Can the local key be saved in one Uint32
+   */
+  static bool isShort(Uint32 pageId) {
+    return pageId < (1 << (32 - MAX_TUPLES_BITS));
+  }
 };
 
 class NdbOut&

=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h	2012-01-19 13:31:51 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h	2012-01-31 06:37:19 +0000
@@ -203,20 +203,19 @@
 #define NDB_DEFAULT_LOG_PARTS 4
 
 #if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
-#define NDB_MAX_LOG_PARTS 4
-#define MAX_NDBMT_TC_THREADS  2
-#define MAX_NDBMT_RECEIVE_THREADS 1
-#define MAX_THREADS_IN_BLOCK 4
+#define NDB_MAX_LOG_PARTS          4
+#define MAX_NDBMT_TC_THREADS       2
+#define MAX_NDBMT_RECEIVE_THREADS  1
+#define MAX_NDBMT_SEND_THREADS     0
 #else
-#define NDB_MAX_LOG_PARTS 16
-#define MAX_NDBMT_TC_THREADS  16
-#define MAX_NDBMT_RECEIVE_THREADS 8
-#define MAX_THREADS_IN_BLOCK 16
+#define NDB_MAX_LOG_PARTS         16
+#define MAX_NDBMT_TC_THREADS      16
+#define MAX_NDBMT_RECEIVE_THREADS  8
+#define MAX_NDBMT_SEND_THREADS     8
 #endif
 
 #define MAX_NDBMT_LQH_WORKERS NDB_MAX_LOG_PARTS
 #define MAX_NDBMT_LQH_THREADS NDB_MAX_LOG_PARTS
-#define MAX_NDBMT_SEND_THREADS 8
 
 #define NDB_FILE_BUFFER_SIZE (256*1024)
 

=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2011-12-12 12:49:43 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2012-01-31 06:37:19 +0000
@@ -206,6 +206,7 @@
 #define CFG_NODE_ARBIT_RANK           200
 #define CFG_NODE_ARBIT_DELAY          201
 #define CFG_RESERVED_SEND_BUFFER_MEMORY 202
+#define CFG_EXTRA_SEND_BUFFER_MEMORY  203
 
 #define CFG_MIN_LOGLEVEL          250
 #define CFG_LOGLEVEL_STARTUP      250

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-01-20 14:30:22 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-01-31 06:37:19 +0000
@@ -270,8 +270,10 @@ public:
    *
    * Argument is the value of config parameter TotalSendBufferMemory. If 0,
    * a default will be used of sum(max send buffer) over all transporters.
+   * The second is the config parameter ExtraSendBufferMemory
    */
-  void allocate_send_buffers(Uint64 total_send_buffer);
+  void allocate_send_buffers(Uint64 total_send_buffer,
+                             Uint64 extra_send_buffer);
 
   /**
    * Get sum of max send buffer over all transporters, to be used as a default
@@ -292,6 +294,13 @@ public:
   const NodeBitmask& get_status_overloaded() const;
   
   /**
+   * Set or clear slowdown bit.
+   * Query if any slowdown bit is set.
+   */
+  void set_status_slowdown(Uint32 nodeId, bool val);
+  const NodeBitmask& get_status_slowdown() const;
+  
+  /**
    * prepareSend
    *
    * When IOState is HaltOutput or HaltIO do not send or insert any 
@@ -431,8 +440,10 @@ private:
 
   /**
    * Overloaded bits, for fast check.
+   * Similarly slowdown bits for fast check.
    */
   NodeBitmask m_status_overloaded;
+  NodeBitmask m_status_slowdown;
 
   /**
    * Unpack signal data.
@@ -591,6 +602,8 @@ TransporterRegistry::set_status_overload
   assert(nodeId < MAX_NODES);
   if (val != m_status_overloaded.get(nodeId))
     m_status_overloaded.set(nodeId, val);
+  if (val)
+    set_status_slowdown(nodeId, val);
 }
 
 inline const NodeBitmask&
@@ -599,4 +612,18 @@ TransporterRegistry::get_status_overload
   return m_status_overloaded;
 }
 
+inline void
+TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val)
+{
+  assert(nodeId < MAX_NODES);
+  if (val != m_status_slowdown.get(nodeId))
+    m_status_slowdown.set(nodeId, val);
+}
+
+inline const NodeBitmask&
+TransporterRegistry::get_status_slowdown() const
+{
+  return m_status_slowdown;
+}
+
 #endif // Define of TransporterRegistry_H

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2012-01-19 13:31:51 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2012-01-27 16:03:49 +0000
@@ -112,6 +112,10 @@ TCP_Transporter::TCP_Transporter(Transpo
   setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
 
   m_overload_limit = overload_limit(conf);
+  /**
+   * Always set slowdown limit to 60% of overload limit
+   */
+  m_slowdown_limit = m_overload_limit * 6 / 10; 
 }
 
 

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	2012-01-19 16:50:23 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	2012-01-25 10:27:10 +0000
@@ -43,7 +43,8 @@ Transporter::Transporter(TransporterRegi
   : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
     isServer(lNodeId==serverNodeId),
     m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
-    m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),
+    m_overload_limit(0xFFFFFFFF), m_slowdown_limit(0xFFFFFFFF),
+    isMgmConnection(_isMgmConnection),
     m_connected(false),
     m_type(_type),
     m_transporter_registry(t_reg)

=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp	2012-01-19 16:50:23 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp	2012-01-26 13:22:50 +0000
@@ -90,6 +90,8 @@ public:
   {
     m_transporter_registry.set_status_overloaded(remoteNodeId,
                                                  used >= m_overload_limit);
+    m_transporter_registry.set_status_slowdown(remoteNodeId,
+                                               used >= m_slowdown_limit);
   }
 
   virtual int doSend() = 0;
@@ -159,6 +161,7 @@ protected:
   Uint32 m_max_send_buffer;
   /* Overload limit, as configured with the OverloadLimit config parameter. */
   Uint32 m_overload_limit;
+  Uint32 m_slowdown_limit;
 
 private:
 

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-01-24 07:20:52 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-01-31 06:37:19 +0000
@@ -250,8 +250,11 @@ TransporterRegistry::TransporterRegistry
   DBUG_VOID_RETURN;
 }
 
+#define MIN_SEND_BUFFER_SIZE (4 * 1024 * 1024)
+
 void
-TransporterRegistry::allocate_send_buffers(Uint64 total_send_buffer)
+TransporterRegistry::allocate_send_buffers(Uint64 total_send_buffer,
+                                           Uint64 extra_send_buffer)
 {
   if (!m_use_default_send_buffer)
     return;
@@ -259,6 +262,22 @@ TransporterRegistry::allocate_send_buffe
   if (total_send_buffer == 0)
     total_send_buffer = get_total_max_send_buffer();
 
+  total_send_buffer += extra_send_buffer;
+
+  if (!extra_send_buffer)
+  {
+    /**
+     * If extra send buffer memory is 0 it means we can decide on an
+     * appropriate value for it. We select to always ensure that the
+     * minimum send buffer memory is 4M, otherwise we simply don't
+     * add any extra send buffer memory at all.
+     */
+    if (total_send_buffer < MIN_SEND_BUFFER_SIZE)
+    {
+      total_send_buffer = (Uint64)MIN_SEND_BUFFER_SIZE;
+    }
+  }
+
   if (m_send_buffers)
   {
     /* Send buffers already allocated -> resize the buffer pages */

=== modified file 'storage/ndb/src/kernel/blocks/backup/BackupInit.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/BackupInit.cpp	2012-01-30 12:31:48 +0000
@@ -206,7 +206,31 @@ Backup::execREAD_CONFIG_REQ(Signal* sign
   ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_LOG_BUFFER_MEM, &szLogBuf);
   ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_WRITE_SIZE, &szWrite);
   ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MAX_WRITE_SIZE, &maxWriteSize);
-  
+
+  if (maxWriteSize < szWrite)
+  {
+    /**
+     * max can't be lower than min
+     */
+    maxWriteSize = szWrite;
+  }
+  if ((maxWriteSize % szWrite) != 0)
+  {
+    /**
+     * max needs to be a multiple of min
+     */
+    maxWriteSize = (maxWriteSize + szWrite - 1) / szWrite;
+    maxWriteSize *= szWrite;
+  }
+
+  /**
+   * add min writesize to buffer size...and the alignment added here and there
+   */
+  Uint32 extra = szWrite + 4 * (/* align * 512b */ 128);
+
+  szDataBuf += extra;
+  szLogBuf += extra;
+
   c_defaults.m_logBufferSize = szLogBuf;
   c_defaults.m_dataBufferSize = szDataBuf;
   c_defaults.m_minWriteSize = szWrite;
@@ -215,8 +239,12 @@ Backup::execREAD_CONFIG_REQ(Signal* sign
 
   Uint32 szMem = 0;
   ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MEM, &szMem);
-  Uint32 noPages = (szMem + c_defaults.m_lcp_buffer_size + sizeof(Page32) - 1) 
-    / sizeof(Page32);
+
+  szMem += 3 * extra; // (data+log+lcp);
+  Uint32 noPages =
+    (szMem + sizeof(Page32) - 1) / sizeof(Page32) +
+    (c_defaults.m_lcp_buffer_size + sizeof(Page32) - 1) / sizeof(Page32);
+
   // We need to allocate an additional of 2 pages. 1 page because of a bug in
   // ArrayPool and another one for DICTTAINFO.
   c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2, true); 

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2011-12-14 09:34:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2012-01-26 12:07:58 +0000
@@ -574,7 +574,8 @@ public:
     Uint8 m_last_row;
     Uint8 m_reserved;
     Uint8 statScan;
-    Uint8 dummy[3]; // align?
+    Uint8 m_stop_batch;
+    Uint8 dummy[2]; // align?
   }; // Size 272 bytes
   typedef Ptr<ScanRecord> ScanRecordPtr;
 
@@ -3300,7 +3301,8 @@ Dblqh::ScanRecord::check_scan_batch_comp
   Uint32 max_rows = m_max_batch_size_rows;
   Uint32 max_bytes = m_max_batch_size_bytes;
 
-  return (max_rows > 0 && (m_curr_batch_size_rows >= max_rows))  ||
+  return m_stop_batch ||
+    (max_rows > 0 && (m_curr_batch_size_rows >= max_rows))  ||
     (max_bytes > 0 && (m_curr_batch_size_bytes >= max_bytes));
 }
 

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-01-20 14:56:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2012-01-26 12:07:58 +0000
@@ -11284,17 +11284,17 @@ void Dblqh::scanTupkeyConfLab(Signal* si
   scanptr.p->m_curr_batch_size_rows = rows + 1;
   scanptr.p->m_last_row = tdata5;
 
-  const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
+  const NodeBitmask& all = globalTransporterRegistry.get_status_slowdown();
   if (unlikely(!all.isclear()))
   {
     if (all.get(refToNode(scanptr.p->scanApiBlockref)))
     {
       /**
-       * End scan batch if transporter-buffer are overloaded
+       * End scan batch if transporter-buffer are in slowdown state
        *
        * TODO: We should have counters for this...
        */
-      tdata5 = 1;
+      scanptr.p->m_stop_batch = 1;
     }
   }
 
@@ -11604,6 +11604,7 @@ Uint32 Dblqh::initScanrec(const ScanFrag
   scanptr.p->scanTcrec = tcConnectptr.i;
   scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
 
+  scanptr.p->m_stop_batch = 0;
   scanptr.p->m_curr_batch_size_rows = 0;
   scanptr.p->m_curr_batch_size_bytes= 0;
   scanptr.p->m_max_batch_size_rows = max_rows;
@@ -12091,6 +12092,8 @@ void Dblqh::sendScanFragConf(Signal* sig
     scanptr.p->m_curr_batch_size_rows = 0;
     scanptr.p->m_curr_batch_size_bytes= 0;
   }
+
+  scanptr.p->m_stop_batch = 0;
 }//Dblqh::sendScanFragConf()
 
 /* ######################################################################### */

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2012-01-19 13:31:51 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2012-01-26 12:07:58 +0000
@@ -3069,7 +3069,16 @@ void Dbtc::execTCKEYREQ(Signal* signal)
     case ZWRITE:
     case ZREFRESH:
       jam();
-      if (unlikely((++ regApiPtr->m_write_count) > m_max_writes_per_trans))
+      regApiPtr->m_write_count++;
+      if (regApiPtr->m_flags & ApiConnectRecord::TF_DEFERRED_CONSTRAINTS)
+      {
+        /**
+         * Allow slave applier to ignore m_max_writes_per_trans
+         */
+        break;
+      }
+
+      if (unlikely(regApiPtr->m_write_count > m_max_writes_per_trans))
       {
         TCKEY_abort(signal, 65);
         return;

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2012-01-20 14:56:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	2012-01-31 06:37:19 +0000
@@ -1832,6 +1832,7 @@ int Dbtup::handleInsertReq(Signal* signa
    */
   if(mem_insert)
   {
+    terrorCode = 0;
     if (!rowid)
     {
       if (ERROR_INSERTED(4018))
@@ -1938,7 +1939,13 @@ int Dbtup::handleInsertReq(Signal* signa
       terrorCode = 1601;
       goto disk_prealloc_error;
     }
-    
+
+    if (!Local_key::isShort(frag_page_id))
+    {
+      terrorCode = 1603;
+      goto disk_prealloc_error;
+    }
+
     int ret= disk_page_prealloc(signal, fragPtr, &tmp, size);
     if (unlikely(ret < 0))
     {
@@ -2018,7 +2025,10 @@ null_check_error:
 
 mem_error:
   jam();
-  terrorCode= ZMEM_NOMEM_ERROR;
+  if (terrorCode == 0)
+  {
+    terrorCode= ZMEM_NOMEM_ERROR;
+  }
   goto update_error;
 
 log_space_error:

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp	2012-01-28 10:11:10 +0000
@@ -230,6 +230,15 @@ Dbtup::allocFragPage(Uint32 * err, Fragr
   {
     jam();
     pageId = max;
+    if (!Local_key::isShort(pageId))
+    {
+      /**
+       * TODO: remove when ACC supports 48 bit references
+       */
+      jam();
+      * err = 889;
+      return RNIL;
+    }
     Uint32 * ptr = map.set(2 * pageId);
     if (unlikely(ptr == 0))
     {

=== modified file 'storage/ndb/src/kernel/ndbd.cpp'
--- a/storage/ndb/src/kernel/ndbd.cpp	2012-01-20 14:30:22 +0000
+++ b/storage/ndb/src/kernel/ndbd.cpp	2012-01-31 06:37:19 +0000
@@ -214,17 +214,40 @@ init_global_memory_manager(EmulatorData
   Uint32 sbpages = 0;
   if (globalTransporterRegistry.get_using_default_send_buffer() == false)
   {
-    Uint64 mem = globalTransporterRegistry.get_total_max_send_buffer();
+    Uint64 mem;
+    {
+      Uint32 tot_mem = 0;
+      ndb_mgm_get_int_parameter(p, CFG_TOTAL_SEND_BUFFER_MEMORY, &tot_mem);
+      if (tot_mem)
+      {
+        mem = (Uint64)tot_mem;
+      }
+      else
+      {
+        mem = globalTransporterRegistry.get_total_max_send_buffer();
+      }
+    }
+
     sbpages = Uint32((mem + GLOBAL_PAGE_SIZE - 1) / GLOBAL_PAGE_SIZE);
+
+    /**
+     * Add extra send buffer pages for NDB multithreaded case
+     */
+    {
+      Uint64 extra_mem;
+      ndb_mgm_get_int64_parameter(p, CFG_EXTRA_SEND_BUFFER_MEMORY, &extra_mem);
+      Uint32 extra_mem_pages = Uint32((extra_mem + GLOBAL_PAGE_SIZE - 1) /
+                                      GLOBAL_PAGE_SIZE);
+      sbpages += mt_get_extra_send_buffer_pages(sbpages, extra_mem_pages);
+    }
+
     Resource_limit rl;
-    /*
-      Add one allocation of 32 pages per thread since each thread is
-      likely to seize at least one such in addition to the memory
-      needed to buffer in front of transporters
-    */
-    sbpages += 32 * get_total_number_of_block_threads();
     rl.m_min = sbpages;
-    rl.m_max = sbpages;
+    /**
+     * allow over allocation (from SharedGlobalMemory) of up to 25% of
+     *   totally allocated SendBuffer
+     */
+    rl.m_max = sbpages + (sbpages * 25) / 100;
     rl.m_resource_id = RG_TRANSPORTER_BUFFERS;
     ed.m_mem_manager->set_resource_limit(rl);
   }

=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp	2012-01-20 14:30:22 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp	2012-01-31 06:37:19 +0000
@@ -316,7 +316,10 @@ Configuration::setupConfiguration(){
 
   Uint32 total_send_buffer = 0;
   iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
-  globalTransporterRegistry.allocate_send_buffers(total_send_buffer);
+  Uint64 extra_send_buffer = 0;
+  iter.get(CFG_EXTRA_SEND_BUFFER_MEMORY, &extra_send_buffer);
+  globalTransporterRegistry.allocate_send_buffers(total_send_buffer,
+                                                  extra_send_buffer);
   
   if(iter.get(CFG_DB_NO_SAVE_MSGS, &_maxErrorLogs)){
     ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", 

=== modified file 'storage/ndb/src/kernel/vm/Emulator.hpp'
--- a/storage/ndb/src/kernel/vm/Emulator.hpp	2012-01-19 13:31:51 +0000
+++ b/storage/ndb/src/kernel/vm/Emulator.hpp	2012-01-31 06:37:19 +0000
@@ -78,9 +78,10 @@ struct EmulatorData {
 extern struct EmulatorData globalEmulatorData;
 
 /**
- * Compute total number of block threads in data node
+ * Get number of extra send buffer pages to use
  */
-Uint32 get_total_number_of_block_threads(void);
+Uint32 mt_get_extra_send_buffer_pages(Uint32 curr_num_pages,
+                                      Uint32 extra_mem_pages);
 
 /**
  * Compute no of pages to be used as job-buffer

=== modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp	2012-01-11 16:20:37 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp	2012-01-31 06:37:19 +0000
@@ -44,9 +44,13 @@ mt_get_instance_count(Uint32 block)
   return 0;
 }
 
-Uint32 get_total_number_of_block_threads(void)
+Uint32
+mt_get_extra_send_buffer_pages(Uint32 curr_num_pages,
+                               Uint32 extra_mem_pages)
 {
-  return 1;
+  (void)curr_num_pages;
+  (void)extra_mem_pages;
+  return 0;
 }
 
 Uint32
@@ -55,7 +59,6 @@ compute_jb_pages(struct EmulatorData*)
   return 0;
 }
 
-
 bool
 NdbIsMultiThreaded()
 {

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2012-01-24 07:20:52 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-01-31 06:37:19 +0000
@@ -411,49 +411,7 @@ struct thr_safe_pool
   Uint32 m_cnt;
   thr_spin_lock<NDB_CL - (sizeof(void*) + sizeof(Uint32))> m_lock;
 
-  T* seize_list(Ndbd_mem_manager *mm,
-                Uint32 rg,
-                Uint32 alloc_cnt,
-                Uint32 *alloced) {
-    T* ret = 0;
-    T* prev = 0;
-    T* next = 0;
-    Uint32 i;
-    assert(alloc_cnt > 0);
-
-    lock(&m_lock);
-    if (!m_cnt)
-    {
-      unlock(&m_lock);
-      ret = seize(mm, rg);
-      if (ret)
-      {
-        ret->m_next = 0;
-        *alloced = 1;
-      }
-      else
-        *alloced = 0;
-      return ret;
-    }
-    next = m_free_list;
-    alloc_cnt = alloc_cnt <= m_cnt ? alloc_cnt : m_cnt;
-    *alloced = alloc_cnt;
-    for (i = 0; i < alloc_cnt; i++)
-    {
-      ret = next;
-      assert(ret);
-      next = ret->m_next;
-      ret->m_next = prev;
-      prev = ret;
-    }
-    m_cnt = m_cnt - alloc_cnt;
-    m_free_list = next;
-    unlock(&m_lock);
-    return ret;
-  }
-
-  T* seize(Ndbd_mem_manager *mm, Uint32 rg)
-  {
+  T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
     T* ret = 0;
     lock(&m_lock);
     if (m_free_list)
@@ -466,8 +424,8 @@ struct thr_safe_pool
     }
     else
     {
-      Uint32 dummy;
       unlock(&m_lock);
+      Uint32 dummy;
       ret = reinterpret_cast<T*>
         (mm->alloc_page(rg, &dummy,
                         Ndbd_mem_manager::NDB_ZONE_ANY));
@@ -478,6 +436,49 @@ struct thr_safe_pool
     return ret;
   }
 
+  T* seize_list(Ndbd_mem_manager *mm, Uint32 rg,
+                Uint32 requested, Uint32 * received) {
+    lock(&m_lock);
+    if (m_cnt == 0)
+    {
+      unlock(&m_lock);
+      Uint32 dummy;
+      T* ret = reinterpret_cast<T*>
+        (mm->alloc_page(rg, &dummy,
+                        Ndbd_mem_manager::NDB_ZONE_ANY));
+
+      if (ret == 0)
+      {
+        * received = 0;
+        return 0;
+      }
+      else
+      {
+        ret->m_next = 0;
+        * received = 1;
+        return ret;
+      }
+    }
+    else
+    {
+      if (m_cnt < requested )
+        requested = m_cnt;
+
+      T* first = m_free_list;
+      T* last = first;
+      for (Uint32 i = 1; i < requested; i++)
+      {
+        last = last->m_next;
+      }
+      m_cnt -= requested;
+      m_free_list = last->m_next;
+      unlock(&m_lock);
+      last->m_next = 0;
+      * received = requested;
+      return first;
+    }
+  }
+
   void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
     lock(&m_lock);
     t->m_next = m_free_list;
@@ -504,8 +505,7 @@ class thread_local_pool
 {
 public:
   thread_local_pool(thr_safe_pool<T> *global_pool,
-                    unsigned max_free,
-                    unsigned alloc_size) :
+                    unsigned max_free, unsigned alloc_size = 1) :
     m_max_free(max_free),
     m_alloc_size(alloc_size),
     m_free(0),
@@ -515,34 +515,62 @@ public:
   }
 
   T *seize(Ndbd_mem_manager *mm, Uint32 rg) {
-    T *tmp;
+    T *tmp = m_freelist;
+    if (tmp == 0)
+    {
+      tmp = m_global_pool->seize_list(mm, rg, m_alloc_size, &m_free);
+    }
+    if (tmp)
+    {
+      m_freelist = tmp->m_next;
+      assert(m_free > 0);
+      m_free--;
+    }
+    validate();
+    return tmp;
+  }
+
+  T* find_last(T* first)
+  {
+    T* next = first;
+    if (!first)
+      return first;
+    while (next->m_next)
+    {
+      next = next->m_next;
+    }
+    return next;
+  }
+
+
+  bool fill(Ndbd_mem_manager *mm, Uint32 rg, Uint32 fill_level)
+  {
     Uint32 alloced = 0;
-    bool first = true;
-    while (1)
+    if (m_free >= fill_level)
     {
-      tmp = m_freelist;
-      if (tmp)
+      return true;
+    }
+    do
+    {
+      T *save_last = find_last(m_freelist);
+      T *new_list = m_global_pool->seize_list(mm, rg,
+                                              m_alloc_size,
+                                              &alloced);
+      m_free += alloced;
+      if (save_last)
       {
-        m_freelist = tmp->m_next;
-        assert(m_free > 0);
-        m_free--;
-        break;
+        save_last->m_next = new_list;
       }
       else
       {
-        if (!first)
-        {
-          tmp = NULL;
-          break;
-        }
-        m_freelist = m_global_pool->seize_list(mm, rg, m_alloc_size, &alloced);
-        m_free = alloced;
+        m_freelist = new_list;
       }
-      first = false;
+    } while ((m_free < fill_level) && alloced);
+    if (m_free >= fill_level)
+    {
+      return true;
     }
-
-    validate();
-    return tmp;
+    return false;
   }
 
   void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
@@ -637,6 +665,10 @@ public:
     validate();
   }
 
+  /**
+   * release everything if more than m_max_free
+   *   else do nothing
+   */
   void release_chunk(Ndbd_mem_manager *mm, Uint32 rg) {
     if (m_free > m_max_free)
       release_all(mm, rg);
@@ -813,6 +845,20 @@ struct thr_tq
 };
 
 /*
+ * THR_SEND_BUFFER_ALLOC_SIZE is the amount of 32k pages allocated
+ * when we allocate pages from the global pool of send buffers to
+ * the thread_local_pool (which is local to a thread).
+ */
+#define THR_SEND_BUFFER_ALLOC_SIZE 32
+/*
+ * THR_MINIMUM_SEND_BUFFERS is the minimum amount of send buffer pages
+ * which is kept in thread_local_pool before starting execution of a
+ * new loop in a block thread to execute signals.
+ */
+#define THR_MINIMUM_SEND_BUFFERS 32
+#define THR_SEND_BUFFER_MAX_FREE \
+  (THR_SEND_BUFFER_ALLOC_SIZE + THR_MINIMUM_SEND_BUFFERS)
+/*
  * Max number of thread-local job buffers to keep before releasing to
  * global pool.
  */
@@ -882,7 +928,9 @@ struct thr_send_queue
 struct thr_data
 {
   thr_data() : m_jba_write_lock("jbalock"),
-               m_send_buffer_pool(0, THR_FREE_BUF_MAX, THR_FREE_BUF_MAX) {}
+               m_send_buffer_pool(0,
+                                  THR_SEND_BUFFER_MAX_FREE,
+                                  THR_SEND_BUFFER_ALLOC_SIZE) {}
 
   thr_wait m_waiter;
   unsigned m_thr_no;
@@ -1039,21 +1087,36 @@ struct thr_repository
       m_sb_pool("sendbufferpool")
     {}
 
+  /**
+   * m_receive_lock, m_section_lock, m_mem_manager_lock, m_jb_pool
+   * and m_sb_pool are allvariables globally shared among the threads
+   * and also heavily updated.
+   */
   struct thr_spin_lock<64> m_receive_lock[MAX_NDBMT_RECEIVE_THREADS];
   struct thr_spin_lock<64> m_section_lock;
   struct thr_spin_lock<64> m_mem_manager_lock;
+  /* thr_safe_pool is aligned to be also 64 bytes in size */
   struct thr_safe_pool<thr_job_buffer> m_jb_pool;
   struct thr_safe_pool<thr_send_page> m_sb_pool;
+  /* m_mm and m_thread_count are globally shared and read only variables */
   Ndbd_mem_manager * m_mm;
   unsigned m_thread_count;
-  /* Protect m_mm and m_thread_count from CPU cache misses */
-  char protection_unused[NDB_CL];
+  /**
+   * Protect m_mm and m_thread_count from CPU cache misses, first
+   * part of m_thread (struct thr_data) is globally shared variables.
+   * So sharing cache line with these for these read only variables
+   * isn't a good idea
+   */
+  char protection_unused[NDB_CL - sizeof(void*) - sizeof(unsigned)];
   struct thr_data m_thread[MAX_BLOCK_THREADS];
 
   /**
    * send buffer handling
+   * Put protection cacheline to avoid sharing with last m_thread
+   * instance.
    */
 
+  char protection_unused2[NDB_CL];
   /* The buffers that are to be sent */
   struct send_buffer
   {
@@ -1104,16 +1167,15 @@ struct thr_repository
   Uint32 stopped_threads;
 };
 
-/*
-  Class to handle send threads
-  ----------------------------
-    We can have up to 8 send threads.
-
-    This class will handle when a block thread needs to send, it will
-    handle the running of the send thread and will also start the
-    send thread.
-*/
-
+/**
+ *  Class to handle send threads
+ *  ----------------------------
+ *  We can have up to 8 send threads.
+ *
+ *  This class will handle when a block thread needs to send, it will
+ *  handle the running of the send thread and will also start the
+ *  send thread.
+ */
 #define is_send_thread(thr_no) (thr_no >= num_threads)
 
 struct thr_send_thread_instance
@@ -1124,7 +1186,9 @@ struct thr_send_thread_instance
                m_awake(FALSE),
                m_thread(NULL),
                m_waiter_struct(),
-               m_send_buffer_pool(0, THR_FREE_BUF_MAX, THR_FREE_BUF_MAX)
+               m_send_buffer_pool(0,
+                                  THR_SEND_BUFFER_MAX_FREE,
+                                  THR_SEND_BUFFER_ALLOC_SIZE)
   {}
   Uint32 m_instance_no;
   Uint32 m_watchdog_counter;
@@ -1151,7 +1215,7 @@ public:
   ~thr_send_threads();
 
   /* A block thread has flushed data for a node and wants it sent */
-  void alert_send_thread(NodeId node); 
+  void alert_send_thread(NodeId node);
 
   /* Method used to run the send thread */
   void run_send_thread(Uint32 instance_no);
@@ -1175,8 +1239,7 @@ public:
   }
 
   /* Get send buffer pool for send thread */
-  thread_local_pool<thr_send_page>*
-  get_send_buffer_pool(Uint32 thr_no)
+  thread_local_pool<thr_send_page>* get_send_buffer_pool(Uint32 thr_no)
   {
     return &m_send_threads[thr_no - num_threads].m_send_buffer_pool;
   }
@@ -1212,10 +1275,19 @@ private:
   /* Is data available and next reference for each node in cluster */
   struct thr_send_nodes m_node_state[MAX_NODES];
 
+  /**
+   * Very few compiler (gcc) allow zero length arrays
+   */
+#if MAX_NDBMT_SEND_THREADS == 0
+#define _MAX_SEND_THREADS 1
+#else
+#define _MAX_SEND_THREADS MAX_NDBMT_SEND_THREADS
+#endif
+
   /* Data and state for the send threads */
-  struct thr_send_thread_instance m_send_threads[MAX_NDBMT_SEND_THREADS];
+  struct thr_send_thread_instance m_send_threads[_MAX_SEND_THREADS];
 
-  /* 
+  /**
    * Mutex protecting the linked list of nodes awaiting sending
    * and also the not_awake variable of the send thread.
    */
@@ -1236,9 +1308,8 @@ mt_send_thread_main(void *thr_arg)
 {
   struct thr_send_thread_instance *this_send_thread =
     (thr_send_thread_instance*)thr_arg;
-  Uint32 instance_no;
 
-  instance_no = this_send_thread->m_instance_no;
+  Uint32 instance_no = this_send_thread->m_instance_no;
   ndbout_c("Send thread : %u is started", instance_no);
   g_send_threads->run_send_thread(instance_no);
   return NULL;
@@ -1246,18 +1317,17 @@ mt_send_thread_main(void *thr_arg)
 
 thr_send_threads::thr_send_threads()
 {
-  Uint32 i;
   struct thr_repository *rep = &g_thr_repository;
 
   m_started_threads = FALSE;
   m_first_node = 0;
   m_last_node = 0;
-  for (i = 0; i < MAX_NODES; i++)
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_node_state); i++)
   {
     m_node_state[i].m_next = 0;
     m_node_state[i].m_data_available = FALSE;
   }
-  for (i = 0; i < globalData.ndbMtSendThreads; i++)
+  for (Uint32 i = 0; i < NDB_ARRAY_SIZE(m_send_threads); i++)
   {
     m_send_threads[i].m_waiter_struct.init();
     m_send_threads[i].m_instance_no = i;
@@ -1268,13 +1338,13 @@ thr_send_threads::thr_send_threads()
 
 thr_send_threads::~thr_send_threads()
 {
-  Uint32 i;
-  void *dummy_return_status;
-
   if (!m_started_threads)
     return;
-  for (i = 0; i < globalData.ndbMtSendThreads; i++)
+
+  for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
   {
+    void *dummy_return_status;
+
     /* Ensure thread is woken up to die */
     wakeup(&(m_send_threads[i].m_waiter_struct));
     NdbThread_WaitFor(m_send_threads[i].m_thread, &dummy_return_status);
@@ -1285,9 +1355,7 @@ thr_send_threads::~thr_send_threads()
 void
 thr_send_threads::start_send_threads()
 {
-  Uint32 i;
-
-  for (i = 0; i < globalData.ndbMtSendThreads; i++)
+  for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
   {
     ndbout_c("Start send thread: %u", i);
     m_send_threads[i].m_thread =
@@ -1352,9 +1420,8 @@ struct thr_send_thread_instance*
 thr_send_threads::get_not_awake_send_thread()
 {
   struct thr_send_thread_instance *used_send_thread;
-  Uint32 i;
 
-  for (i = 0; i < globalData.ndbMtSendThreads; i++)
+  for (Uint32 i = 0; i < globalData.ndbMtSendThreads; i++)
   {
     if (!m_send_threads[i].m_awake)
     {
@@ -1465,7 +1532,7 @@ thr_send_threads::check_and_lock_send_no
 
 int
 thr_send_threads::perform_send(NodeId node, Uint32 instance_no)
-{ 
+{
   int res;
   thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
   sb->m_send_thread = num_threads + instance_no;
@@ -1480,81 +1547,96 @@ thr_send_threads::run_send_thread(Uint32
 {
   struct thr_send_thread_instance *this_send_thread =
     &m_send_threads[instance_no];
-  Uint32 thr_no = num_threads + instance_no;
-  NodeId node;
-  int res;
-  BaseString tmp;
-  THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
+  const Uint32 thr_no = num_threads + instance_no;
 
-  /*
-    Print out information about starting thread, its number, its tid, its name,
-    the CPU it's locked into (if locked at all).
-    Also perform the locking to CPU.
-  */
-  tmp.appfmt("thr: %u ", thr_no);
   {
+    /**
+     * Wait for thread object to be visible
+     */
     while(this_send_thread->m_thread == 0)
       NdbSleep_MilliSleep(30);
   }
-  int tid = NdbThread_GetTid(this_send_thread->m_thread);
-  if (tid != -1)
-  {
-    tmp.appfmt("tid: %u ", tid);
-  }
-  conf.appendInfoSendThread(tmp, instance_no);
-  res= conf.do_bind_send(this_send_thread->m_thread, instance_no);
-  if (res < 0)
-  {
-    tmp.appfmt("err: %d ", -res);
-  }
-  else if (res > 0)
+
   {
-    tmp.appfmt("OK ");
+    /**
+     * Print out information about starting thread
+     *   (number, tid, name, the CPU it's locked into (if locked at all))
+     * Also perform the locking to CPU.
+     */
+    BaseString tmp;
+    THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
+    tmp.appfmt("thr: %u ", thr_no);
+    int tid = NdbThread_GetTid(this_send_thread->m_thread);
+    if (tid != -1)
+    {
+      tmp.appfmt("tid: %u ", tid);
+    }
+    conf.appendInfoSendThread(tmp, instance_no);
+    int res = conf.do_bind_send(this_send_thread->m_thread, instance_no);
+    if (res < 0)
+    {
+      tmp.appfmt("err: %d ", -res);
+    }
+    else if (res > 0)
+    {
+      tmp.appfmt("OK ");
+    }
+    printf("%s\n", tmp.c_str());
+    fflush(stdout);
   }
-  printf("%s\n", tmp.c_str());
-  fflush(stdout);
 
+  /**
+   * register watchdog
+   */
   globalEmulatorData.theWatchDog->
     registerWatchedThread(&this_send_thread->m_watchdog_counter, thr_no);
+
   NdbMutex_Lock(send_thread_mutex);
   this_send_thread->m_awake = FALSE;
   NdbMutex_Unlock(send_thread_mutex);
+
   while (globalData.theRestartFlag != perform_stop)
   {
     this_send_thread->m_watchdog_counter = 1;
+
     /* Yield for a maximum of 1ms */
     const Uint32 wait = 1000000;
     yield(&this_send_thread->m_waiter_struct, wait,
           check_available_send_data, NULL);
+
     NdbMutex_Lock(send_thread_mutex);
     this_send_thread->m_awake = TRUE;
 
+    NodeId node;
     while ((node = get_node()) != 0 &&
            globalData.theRestartFlag != perform_stop)
     {
       this_send_thread->m_watchdog_counter = 2;
+
       /* We enter this method with send thread mutex and come
        * back with send thread mutex released and instead owning
        * the spin lock to send to the node returned
        */
       node = check_and_lock_send_node(node);
-      res = perform_send(node, instance_no);
+
+      int res = perform_send(node, instance_no);
       /* We return with no spin locks or mutexes held */
 
       /* Release chunk-wise to decrease pressure on spin lock */
-      this_send_thread->m_send_buffer_pool.release_chunk(
-        g_thr_repository.m_mm, RG_TRANSPORTER_BUFFERS);
+      this_send_thread->m_watchdog_counter = 3;
+      this_send_thread->m_send_buffer_pool.
+        release_chunk(g_thr_repository.m_mm, RG_TRANSPORTER_BUFFERS);
 
       NdbMutex_Lock(send_thread_mutex);
       if (res && !data_available(node))
         insert_node(node);
     }
+
     /* No data to send, prepare to sleep */
     this_send_thread->m_awake = FALSE;
     NdbMutex_Unlock(send_thread_mutex);
-    /* Release send buffers to global pool */
-    this_send_thread->m_watchdog_counter = 3;
   }
+
   globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
   ndbout_c("Exit send thread %u", instance_no);
 }
@@ -2179,7 +2261,13 @@ trp_callback::reportSendLen(NodeId nodeI
   memset(&signal.header, 0, sizeof(signal.header));
 
   if (g_send_threads)
+  {
+    /**
+     * TODO: Implement this also when using send threads!!
+     */
     return;
+  }
+
   signal.header.theLength = 3;
   signal.header.theSendersSignalId = 0;
   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
@@ -2394,6 +2482,50 @@ release_list(thread_local_pool<thr_send_
   pool->release_local(tail);
 }
 
+/**
+ * pack thr_send_pages for a particular send-buffer <em>db</em>
+ *   release pages (local) to <em>pool<em>
+ *
+ * can only be called with sb->m_lock held
+ */
+static
+void
+pack_sb_pages(thread_local_pool<thr_send_page>* pool,
+              thr_repository::send_buffer* sb)
+{
+  assert(sb->m_buffer.m_first_page != 0);
+  assert(sb->m_buffer.m_last_page != 0);
+  assert(sb->m_buffer.m_last_page->m_next == 0);
+
+  thr_send_page* curr = sb->m_buffer.m_first_page;
+  Uint32 curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
+  while (curr->m_next != 0)
+  {
+    thr_send_page* next = curr->m_next;
+    assert(next->m_start == 0); // only first page should have half sent bytes
+    if (next->m_bytes <= curr_free)
+    {
+      thr_send_page * save = next;
+      memcpy(curr->m_data + (curr->m_bytes + curr->m_start),
+             next->m_data,
+             next->m_bytes);
+
+      curr_free -= next->m_bytes;
+
+      curr->m_bytes += next->m_bytes;
+      curr->m_next = next->m_next;
+
+      pool->release_local(save);
+    }
+    else
+    {
+      curr = next;
+      curr_free = curr->max_bytes() - (curr->m_bytes + curr->m_start);
+    }
+  }
+
+  sb->m_buffer.m_last_page = curr;
+}
 
 static
 Uint32
@@ -2457,6 +2589,14 @@ bytes_sent(thread_local_pool<thr_send_pa
   sb->m_buffer.m_first_page = curr;
   assert(sb->m_bytes > bytes);
   sb->m_bytes -= bytes;
+
+  /**
+   * Since not all bytes were sent...
+   *   spend the time to try to pack the pages
+   *   possibly releasing send-buffer
+   */
+  pack_sb_pages(pool, sb);
+
   return sb->m_bytes;
 }
 
@@ -2538,6 +2678,41 @@ register_pending_send(thr_data *selfptr,
   }
 }
 
+static void try_send(thr_data * selfptr, Uint32 node);
+
+void
+pack_send_buffer(thr_data *selfptr, Uint32 node)
+{
+  Uint32 thr_no = selfptr->m_thr_no;
+  thr_repository* rep = &g_thr_repository;
+  thr_repository::send_buffer* sb = rep->m_send_buffers+node;
+  thread_local_pool<thr_send_page>* pool =
+    &rep->m_thread[thr_no].m_send_buffer_pool;
+
+  lock(&sb->m_send_lock);
+  int bytes = link_thread_send_buffers(sb, node);
+  if (bytes)
+  {
+    pack_sb_pages(pool, sb);
+  }
+  unlock(&sb->m_send_lock);
+  pool->release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
+  if (sb->m_force_send)
+  {
+    try_send(selfptr, node);
+  }
+}
+
+void
+pack_send_buffers(thr_data *selfptr)
+{
+  for (Uint32 i = 1; i < NDB_ARRAY_SIZE(selfptr->m_send_buffers); i++)
+  {
+    if (globalTransporterRegistry.get_transporter(i))
+      pack_send_buffer(selfptr, i);
+  }
+}
+
 /**
  * publish thread-locally prepared send-buffer
  */
@@ -2564,9 +2739,7 @@ flush_send_buffer(thr_data* selfptr, Uin
 
   if (unlikely(next == ri))
   {
-    lock(&sb->m_send_lock);
-    link_thread_send_buffers(sb, node);
-    unlock(&sb->m_send_lock);
+    pack_send_buffer(selfptr, node);
   }
 
   dst->m_buffers[wi] = src->m_first_page;
@@ -2596,9 +2769,13 @@ mt_send_handle::forceSend(NodeId nodeId)
     globalTransporterRegistry.performSend(nodeId);
     sb->m_send_thread = NO_SEND_THREAD;
     unlock(&sb->m_send_lock);
-  } while (sb->m_force_send);
 
-  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
+    /**
+     * release buffers prior to maybe looping on sb->m_force_send
+     */
+    selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+                                               RG_TRANSPORTER_BUFFERS);
+  } while (sb->m_force_send);
 
   return true;
 }
@@ -2627,9 +2804,13 @@ try_send(thr_data * selfptr, Uint32 node
     globalTransporterRegistry.performSend(node);
     sb->m_send_thread = NO_SEND_THREAD;
     unlock(&sb->m_send_lock);
-  } while (sb->m_force_send);
 
-  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
+    /**
+     * release buffers prior to maybe looping on sb->m_force_send
+     */
+    selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+                                               RG_TRANSPORTER_BUFFERS);
+  } while (sb->m_force_send);
 }
 
 /**
@@ -2761,10 +2942,19 @@ do_send(struct thr_data* selfptr, bool m
       {
         register_pending_send(selfptr, node);
       }
+      if (sb->m_force_send)
+      {
+        /**
+         * release buffers prior to looping on sb->m_force_send
+         */
+        selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+                                                   RG_TRANSPORTER_BUFFERS);
+      }
     } while (sb->m_force_send);
   }
 
-  selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS);
+  selfptr->m_send_buffer_pool.release_global(rep->m_mm,
+                                             RG_TRANSPORTER_BUFFERS);
 
   return selfptr->m_pending_send_count;
 }
@@ -3617,6 +3807,14 @@ mt_job_thread_main(void *thr_arg)
   { 
     loops++;
 
+    watchDogCounter = 11;
+    if (!selfptr->m_send_buffer_pool.fill(g_thr_repository.m_mm,
+                                          RG_TRANSPORTER_BUFFERS,
+                                          THR_MINIMUM_SEND_BUFFERS))
+    {
+      pack_send_buffers(selfptr);
+    }
+
     watchDogCounter = 2;
     scan_time_queues(selfptr, now);
 
@@ -4044,7 +4242,7 @@ rep_init(struct thr_repository* rep, uns
 #include "ThreadConfig.hpp"
 #include <signaldata/StartOrd.hpp>
 
-Uint32
+static Uint32
 get_total_number_of_block_threads(void)
 {
   return (NUM_MAIN_THREADS +
@@ -4053,6 +4251,74 @@ get_total_number_of_block_threads(void)
           globalData.ndbMtReceiveThreads);
 }
 
+static Uint32
+get_num_nodes()
+{
+  Uint32 count = 0;
+  for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
+  {
+    if (globalTransporterRegistry.get_transporter(nodeId))
+    {
+      count++;
+    }
+  }
+  return count;
+}
+
+/**
+ * This function returns the amount of extra send buffer pages
+ * that we should allocate in addition to the amount allocated
+ * for each node send buffer.
+ */
+#define MIN_SEND_BUFFER_GENERAL (512) //16M
+#define MIN_SEND_BUFFER_PER_NODE (8) //256k
+#define MIN_SEND_BUFFER_PER_THREAD (64) //2M
+
+Uint32
+mt_get_extra_send_buffer_pages(Uint32 curr_num_pages,
+                               Uint32 extra_mem_pages)
+{
+  Uint32 num_threads = get_total_number_of_block_threads();
+  Uint32 num_nodes = get_num_nodes();
+
+  Uint32 extra_pages = extra_mem_pages;
+
+  /**
+   * Add 2M for each thread since we allocate 1M every
+   * time we allocate and also we ensure there is also a minimum
+   * of 1M of send buffer in each thread. Thus we can easily have
+   * 2M of send buffer just to keep the contention around the
+   * send buffer page spinlock small. This memory we add independent
+   * of the configuration settings since the user cannot be
+   * expected to handle this and also since we could change this
+   * behaviour at any time.
+   */
+  extra_pages += num_threads *  (THR_SEND_BUFFER_ALLOC_SIZE + THR_MINIMUM_SEND_BUFFERS);
+
+  if (extra_mem_pages == 0)
+  {
+    /**
+     * The user have set extra send buffer memory to 0 and left for us
+     * to decide on our own how much extra memory is needed.
+     *
+     * We'll make sure that we have at least a minimum of 16M +
+     * 2M per thread + 256k per node. If we have this based on
+     * curr_num_pages and our local additions we don't add
+     * anything more, if we don't come up to this level we add to
+     * reach this minimum level.
+     */
+    Uint32 min_pages = MIN_SEND_BUFFER_GENERAL +
+      (MIN_SEND_BUFFER_PER_NODE * num_nodes) +
+      (MIN_SEND_BUFFER_PER_THREAD * num_threads);
+
+    if ((curr_num_pages + extra_pages) < min_pages)
+    {
+      extra_pages = min_pages - curr_num_pages;
+    }
+  }
+  return extra_pages;
+}
+
 Uint32
 compute_jb_pages(struct EmulatorData * ed)
 {
@@ -4212,21 +4478,12 @@ ThreadConfig::ipControlLoop(NdbThread* p
   unsigned int thr_no;
   struct thr_repository* rep = &g_thr_repository;
 
-#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
-  /* No send threads started before version 7.2 */
-#else
-  /* Get configured number of send threads */
-  ndbout << "NDBMT: Number of send threads = ";
-  ndbout << globalData.ndbMtSendThreads << endl;
-
-  /* Get configured number of send threads */
-  ndbout << "NDBMT: Number of receive threads = ";
-  ndbout << globalData.ndbMtReceiveThreads << endl;
+  setcpuaffinity(rep);
 
   if (globalData.ndbMtSendThreads)
+  {
     g_send_threads = new thr_send_threads();
-#endif
-  setcpuaffinity(rep);
+  }
 
   /**
    * assign nodes to receiver threads
@@ -4235,7 +4492,9 @@ ThreadConfig::ipControlLoop(NdbThread* p
 
   /* Start the send thread(s) */
   if (g_send_threads)
+  {
     g_send_threads->start_send_threads();
+  }
 
   /*
    * Start threads for all execution threads, except for the receiver
@@ -4289,9 +4548,12 @@ ThreadConfig::ipControlLoop(NdbThread* p
     NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
     NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
   }
+
   /* Delete send threads, includes waiting for threads to shutdown */
   if (g_send_threads)
+  {
     delete g_send_threads;
+  }
 }
 
 int

=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp	2012-01-24 07:20:52 +0000
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp	2012-01-31 06:37:19 +0000
@@ -1101,7 +1101,7 @@ TAPTEST(mt_thr_config)
         "main={ keso=88, count=23},ldm,ldm",
         "main={ cpuset=1-3 }, ldm={cpuset=3-4}",
         "main={ cpuset=1-3 }, ldm={cpubind=2}",
-        "tc,tc,tc={count=5}",
+        "tc,tc,tc={count=25}",
         0
       };
 

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2012-01-24 07:20:52 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2012-01-31 06:37:19 +0000
@@ -1118,13 +1118,9 @@ const ConfigInfo::ParamInfo ConfigInfo::
     ConfigInfo::CI_USED,
     CI_RESTART_INITIAL,
     ConfigInfo::CI_INT,
+    STR_VALUE(NDB_DEFAULT_LOG_PARTS),
     "4",
-    "4",
-#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
-    "4"
-#else
-    "16"
-#endif
+    STR_VALUE(NDB_MAX_LOG_PARTS)
   },
 
   {
@@ -1692,11 +1688,24 @@ const ConfigInfo::ParamInfo ConfigInfo::
     "true"},
 
   {
+    CFG_EXTRA_SEND_BUFFER_MEMORY,
+    "ExtraSendBufferMemory",
+    DB_TOKEN,
+    "Extra send buffer memory to use for send buffers in all transporters",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT64,
+    "0",
+    "0",
+    "32G"
+  },
+
+  {
     CFG_TOTAL_SEND_BUFFER_MEMORY,
     "TotalSendBufferMemory",
     DB_TOKEN,
     "Total memory to use for send buffers in all transporters",
-    ConfigInfo::CI_USED,
+    ConfigInfo::CI_DEPRECATED,
     false,
     ConfigInfo::CI_INT,
     "0",
@@ -1711,7 +1720,7 @@ const ConfigInfo::ParamInfo ConfigInfo::
     "Amount of bytes (out of TotalSendBufferMemory) to reserve for connection\n"
     "between data nodes. This memory will not be available for connections to\n"
     "management server or API nodes.",
-    ConfigInfo::CI_USED,
+    ConfigInfo::CI_DEPRECATED,
     false,
     ConfigInfo::CI_INT,
     "0",
@@ -1742,7 +1751,7 @@ const ConfigInfo::ParamInfo ConfigInfo::
     ConfigInfo::CI_INT,
     "0",
     "2",
-    "51"
+    "8"
   },
 
   {

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	2012-01-20 14:30:22 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	2012-01-31 06:37:19 +0000
@@ -643,7 +643,10 @@ TransporterFacade::configure(NodeId node
   // Configure send buffers
   Uint32 total_send_buffer = 0;
   iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
-  theTransporterRegistry->allocate_send_buffers(total_send_buffer);
+  Uint64 extra_send_buffer = 0;
+  iter.get(CFG_EXTRA_SEND_BUFFER_MEMORY, &extra_send_buffer);
+  theTransporterRegistry->allocate_send_buffers(total_send_buffer,
+                                                extra_send_buffer);
 
   Uint32 auto_reconnect=1;
   iter.get(CFG_AUTO_RECONNECT, &auto_reconnect);

=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c	2011-12-20 08:49:07 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c	2012-01-28 10:11:10 +0000
@@ -210,16 +210,21 @@ ErrorBundle ErrorCodes[] = {
   { 623,  HA_ERR_RECORD_FILE_FULL, IS, "623" },
   { 624,  HA_ERR_RECORD_FILE_FULL, IS, "624" },
   { 625,  HA_ERR_INDEX_FILE_FULL, IS, "Out of memory in Ndb Kernel, hash index part (increase IndexMemory)" },
-  { 633,  HA_ERR_INDEX_FILE_FULL, IS, "Table fragment hash index has reached maximum possible size" },
+  { 633,  HA_ERR_INDEX_FILE_FULL, IS,
+    "Table fragment hash index has reached maximum possible size" },
   { 640,  DMEC, IS, "Too many hash indexes (should not happen)" },
   { 826,  HA_ERR_RECORD_FILE_FULL, IS, "Too many tables and attributes (increase MaxNoOfAttributes or MaxNoOfTables)" },
   { 827,  HA_ERR_RECORD_FILE_FULL, IS, "Out of memory in Ndb Kernel, table data (increase DataMemory)" },
+  { 889,  HA_ERR_RECORD_FILE_FULL, IS,
+    "Table fragment fixed data reference has reached maximum possible value (specify MAXROWS or increase no of partitions)"},
   { 902,  HA_ERR_RECORD_FILE_FULL, IS, "Out of memory in Ndb Kernel, ordered index data (increase DataMemory)" },
   { 903,  HA_ERR_INDEX_FILE_FULL, IS, "Too many ordered indexes (increase MaxNoOfOrderedIndexes)" },
   { 904,  HA_ERR_INDEX_FILE_FULL, IS, "Out of fragment records (increase MaxNoOfOrderedIndexes)" },
   { 905,  DMEC, IS, "Out of attribute records (increase MaxNoOfAttributes)" },
   { 1601, HA_ERR_RECORD_FILE_FULL, IS, "Out extents, tablespace full" },
   { 1602, DMEC, IS,"No datafile in tablespace" },
+  { 1603, HA_ERR_RECORD_FILE_FULL, IS,
+    "Table fragment fixed data reference has reached maximum possible value (specify MAXROWS or increase no of partitions)"},
 
   /**
    * TimeoutExpired 

=== modified file 'storage/ndb/src/ndbjtie/NdbApiWrapper.hpp'
--- a/storage/ndb/src/ndbjtie/NdbApiWrapper.hpp	2012-01-19 18:16:31 +0000
+++ b/storage/ndb/src/ndbjtie/NdbApiWrapper.hpp	2012-01-26 20:57:36 +0000
@@ -3192,6 +3192,13 @@ struct NdbApiWrapper {
         return obj.nextResult(p0, p1);
     }
 
+    static int
+    NdbScanOperation__nextResultCopyOut
+    ( NdbScanOperation & obj, char * p0, bool p1, bool p2 )
+    {
+        return obj.nextResultCopyOut(p0, p1, p2);
+    }
+
     static void
     NdbScanOperation__close
     ( NdbScanOperation & obj, bool p0, bool p1 )

=== modified file 'storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbScanOperation.java'
--- a/storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbScanOperation.java	2012-01-19 18:16:31 +0000
+++ b/storage/ndb/src/ndbjtie/com/mysql/ndbjtie/ndbapi/NdbScanOperation.java	2012-01-26 20:57:36 +0000
@@ -87,6 +87,7 @@ public class NdbScanOperation extends Nd
     public /*_virtual_*/ native int readTuples(int/*_LockMode_*/ lock_mode /*_= LM_Read_*/, int/*_Uint32_*/ scan_flags /*_= 0_*/, int/*_Uint32_*/ parallel /*_= 0_*/, int/*_Uint32_*/ batch /*_= 0_*/);
     public final native int nextResult(boolean fetchAllowed /*_= true_*/, boolean forceSend /*_= false_*/);
     // MMM! support <out:char *> or check if needed: public final native int nextResult(const char * * out_row_ptr, boolean fetchAllowed, boolean forceSend);
+    public final native int nextResultCopyOut(ByteBuffer/*_char *_*/ buffer, boolean fetchAllowed, boolean forceSend);
     public final native void close(boolean forceSend /*_= false_*/, boolean releaseOp /*_= false_*/);
     public final native NdbOperation/*_NdbOperation *_*/ lockCurrentTuple();
     public final native NdbOperation/*_NdbOperation *_*/ lockCurrentTuple(NdbTransaction/*_NdbTransaction *_*/ lockTrans);

=== modified file 'storage/ndb/src/ndbjtie/ndbapi_jtie.hpp'
--- a/storage/ndb/src/ndbjtie/ndbapi_jtie.hpp	2012-01-20 06:22:16 +0000
+++ b/storage/ndb/src/ndbjtie/ndbapi_jtie.hpp	2012-01-28 10:11:10 +0000
@@ -9114,6 +9114,22 @@ Java_com_mysql_ndbjtie_ndbapi_NdbScanOpe
 
 /*
  * Class:     com_mysql_ndbjtie_ndbapi_NdbScanOperation
+ * Method:    nextResultCopyOut
+ * Signature: (Ljava/nio/ByteBuffer;ZZ)I
+ */
+JNIEXPORT jint JNICALL
+Java_com_mysql_ndbjtie_ndbapi_NdbScanOperation_nextResultCopyOut(JNIEnv * env, jobject obj, jobject p0, jboolean p1, jboolean p2)
+{
+    TRACE("jint Java_com_mysql_ndbjtie_ndbapi_NdbScanOperation_nextResultCopyOut(JNIEnv *, jobject, jobject, jboolean, jboolean)");
+#ifndef NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+    return gcall_mfr< ttrait_c_m_n_n_NdbScanOperation_t, ttrait_int, ttrait_char_1p_bb, ttrait_bool, ttrait_bool, &NdbScanOperation::nextResultCopyOut >(env, obj, p0, p1, p2);
+#else
+    return gcall_fr< ttrait_int, ttrait_c_m_n_n_NdbScanOperation_r, ttrait_char_1p_bb, ttrait_bool, ttrait_bool, &NdbApiWrapper::NdbScanOperation__nextResultCopyOut >(env, NULL, obj, p0, p1, p2);
+#endif // NDBJTIE_USE_WRAPPED_VARIANT_FOR_FUNCTION
+}
+
+/*
+ * Class:     com_mysql_ndbjtie_ndbapi_NdbScanOperation
  * Method:    close
  * Signature: (ZZ)V
  */

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3724 to 3742) jonas oreland31 Jan