List:Commits« Previous MessageNext Message »
From:Alfranio Correia Date:December 10 2009 2:46am
Subject:Re: bzr commit into mysql-5.1 branch (mats:3180) WL#5151
View as plain text  
Hi Mats,

Excellent and impressive work.
There are just a few details that need to be fixed before
it is ready to be pushed.

STATUS
------
  Not approved.

REQUIRED CHANGES
----------------
1 - Is there any particular reason to use the macro below?
I think the pattern is to write the function name.

DBUG_ENTER(__FUNCTION__);

2 - Address the comments in the test case.

3 - Address the comments in the code.


REQUESTS
--------
  n/a

SUGGESTIONS
-----------

1 - In can_convert_field_to:

Would it be better (e.g. ieasier to maitain and read) to have a compatibility matrix
where the safe combinations would be decided straightforward and those combinations
requiring further processing will be checked by calling compatible_field_size() and any
other auxiliar function?


DETAILS
-------


I am not the arch. reviewer but I have some doubts on the worklog.

1 - If conversion is necessary, a fake table is created and then saved to be reused.
I haven't seen any comment on how the fake table is invalidated.

2 - The definition of non-lossy and lossy for strings are hard to read:

- "Non-lossy conversions from STRING(N) to STRING'(M), where M > N or M = N and
STRING != STRING', is supported if and only if ``ALL_NON_LOSSY`` is in
``SLAVE_TYPE_CONVERSIONS``."

- "Note that if M = N and STRING = STRING' it is not considered a non-lossy
conversion, which means that if non-lossy conversion is not enabled, only
replication between identical types is permitted."

- Lossy conversion from STRING(M) to STRING'(N), where M > N, is supported if and
only if ``ALL_LOSSY`` is in ``SLAVE_TYPE_CONVERSIONS``.

- Note that the case when M = N and STRING = STRING' is not considered a lossy
conversion.

Are those definitions contradictory, aren't they?


Cheers.

Mats Kindahl wrote:
> #At file:///home/bzr/mkindahl/w5151-mysql-5.1/pipes/conversion-table/ based on
> revid:bjorn.munch@stripped
> 
>  3180 Mats Kindahl	2009-12-04
>       WL#5151: Conversion between different types when replicating
>       
>       Row-based replication requires the types of columns on the
>       master and slave to be approximately the same (some safe
>       conversions between strings are allowed), but does not
>       allow safe conversions between fields of similar types such
>       as TINYINT and INT.
>       
>       This patch implement type conversions between similar fields
>       on the master and slave.
>       
>       The conversions are controlled using a new variable
>       SLAVE_TYPE_CONVERSIONS of type SET('ALL_LOSSY','ALL_NON_LOSSY').
>       
>       Non-lossy conversions are any conversions that do not run the
>       risk of losing any information, while lossy conversions can
>       potentially truncate the value. The column definitions are
>       checked to decide if the conversion is acceptable.
>       
>       If neither conversion is enabled, it is required that the
>       definitions of the columns are identical on master and slave.
>       
>       Conversion is done by creating an internal conversion table,
>       unpacking the master data into it, and then copy the data to
>       the real table on the slave.
>      @ .bzrignore
>         New files added
>      @ client/Makefile.am
>         New files added
>      @ client/mysqlbinlog.cc
>         Functions in rpl_utility.cc is now needed by mysqlbinlog.cc.
>      @ libmysqld/Makefile.am
>         New files added
>      @ mysql-test/extra/rpl_tests/check_type.inc
>         Test include file to check a single type conversion.
>      @ mysql-test/extra/rpl_tests/type_conversions.test
>         Test file to check a set of type conversions
>         with current settings of slave_type_conversions.
>      @ mysql-test/suite/rpl/t/rpl_typeconv.test
>         Test file to test conversions from master to slave with
>         all possible values for slave_type_conversions.
>         
>         The test also checks that the slave_type_conversions
>         variable works as expected.
>      @ sql/field.cc
>         Changing definition of compatible_field_size to both check if 
>         two field with identical base types are compatible and give an
>         order between them if they are compatible.
>         
>         This only implement checking on the slave, so it will not affect
>         replication from an old master to a new slave.
>      @ sql/field.h
>         Changing prototypes for functions:
>         - compatible_field_size()
>         - init_for_tmp_table()
>         - row_pack_length()
>      @ sql/log_event.cc
>         Changing compability checks to build a conversion table if the fields
>         are compatible, but does not have the same base type.
>      @ sql/log_event_old.cc
>         Changing compability checks to build a conversion table if the fields
>         are compatible, but does not have the same base type.
>      @ sql/mysql_priv.h
>         Adding global option variable for SLAVE_TYPE_CONVERSIONS
>      @ sql/mysqld.cc
>         Adding SLAVE_TYPE_CONVERSIONS global server variable.
>      @ sql/rpl_record.cc
>         Changing unpack_row to use the conversion table if present.
>      @ sql/rpl_rli.h
>         Removing function get_tabledef and replacing it with get_table_data().
>         This function retrieve data for table opened for replication, not just
>         table definition.
>      @ sql/rpl_utility.cc
>         Function table_def::compatible_with is changed to compare table on master
>         and slave for compatibility and generate a conversions table if they are
>         compatible.
>      @ sql/rpl_utility.h
>         Changing prototypes since implementation has changed.
>      @ sql/set_var.cc
>         Adding SLAVE_TYPE_CONVERSIONS variable.
>      @ sql/set_var.h
>         Adding SLAVE_TYPE_CONVERSIONS variable.
>      @ sql/share/errmsg.txt
>         Adding error messages for slave type conversions.
>      @ sql/sql_class.h
>         Adding SLAVE_TYPE_CONVERSIONS variable.
>      @ sql/sql_select.cc
>         Correcting create_virtual_tmp_table() to compute null bit positions
>         correctly in the presence of bit fields.
> 
>     added:
>       mysql-test/extra/rpl_tests/check_type.inc
>       mysql-test/extra/rpl_tests/type_conversions.test
>       mysql-test/suite/rpl/r/rpl_typeconv.result
>       mysql-test/suite/rpl/t/rpl_typeconv.test
>     modified:
>       .bzrignore
>       client/Makefile.am
>       client/mysqlbinlog.cc
>       libmysqld/Makefile.am
>       sql/field.cc
>       sql/field.h
>       sql/log_event.cc
>       sql/log_event_old.cc
>       sql/mysql_priv.h
>       sql/mysqld.cc
>       sql/rpl_record.cc
>       sql/rpl_rli.h
>       sql/rpl_utility.cc
>       sql/rpl_utility.h
>       sql/set_var.cc
>       sql/set_var.h
>       sql/share/errmsg.txt
>       sql/sql_class.h
>       sql/sql_select.cc
> === modified file '.bzrignore'
> --- a/.bzrignore	2009-09-29 15:38:40 +0000
> +++ b/.bzrignore	2009-12-04 10:53:15 +0000
> @@ -392,6 +392,7 @@ client/rpl_record_old.h
>  client/rpl_tblmap.h
>  client/rpl_tblmap.cc
>  client/rpl_utility.h
> +client/rpl_utility.cc
>  client/select_test
>  client/sql_string.cpp
>  client/ssl_test
> @@ -1142,6 +1143,7 @@ libmysqld/rpl_filter.cc
>  libmysqld/rpl_injector.cc
>  libmysqld/rpl_record.cc
>  libmysqld/rpl_record_old.cc
> +libmysqld/rpl_utility.cc
>  libmysqld/scheduler.cc
>  libmysqld/set_var.cc
>  libmysqld/simple-test
> 
> === modified file 'client/Makefile.am'
> --- a/client/Makefile.am	2009-09-03 12:29:25 +0000
> +++ b/client/Makefile.am	2009-12-04 10:53:15 +0000
> @@ -104,9 +104,10 @@ DEFS =			-DMYSQL_CLIENT_NO_THREADS \
>  			-DMYSQL_DATADIR="\"$(localstatedir)\""
>  
>  sql_src=log_event.h mysql_priv.h rpl_constants.h \
> -	rpl_utility.h rpl_tblmap.h rpl_tblmap.cc \
> +	rpl_tblmap.h rpl_tblmap.cc \
>  	log_event.cc my_decimal.h my_decimal.cc \
>  	log_event_old.h log_event_old.cc \
> +	rpl_utility.h rpl_utility.cc \
>  	rpl_record_old.h rpl_record_old.cc
>  strings_src=decimal.c
>  
> 
> === modified file 'client/mysqlbinlog.cc'
> --- a/client/mysqlbinlog.cc	2009-09-30 02:31:25 +0000
> +++ b/client/mysqlbinlog.cc	2009-12-04 10:53:15 +0000
> @@ -2108,4 +2108,4 @@ int main(int argc, char** argv)
>  #include "my_decimal.cc"
>  #include "log_event.cc"
>  #include "log_event_old.cc"
> -
> +#include "rpl_utility.cc"
> 
> === modified file 'libmysqld/Makefile.am'
> --- a/libmysqld/Makefile.am	2009-09-29 15:38:40 +0000
> +++ b/libmysqld/Makefile.am	2009-12-04 10:53:15 +0000
> @@ -55,7 +55,7 @@ sqlsources = derror.cc field.cc field_co
>  	item_geofunc.cc item_subselect.cc item_row.cc\
>  	item_xmlfunc.cc \
>  	key.cc lock.cc log.cc sql_state.c \
> -	log_event.cc rpl_record.cc \
> +	log_event.cc rpl_record.cc rpl_utility.cc \
>  	log_event_old.cc rpl_record_old.cc \
>  	protocol.cc net_serv.cc opt_range.cc \
>  	opt_sum.cc procedure.cc records.cc sql_acl.cc \
> 
> === added file 'mysql-test/extra/rpl_tests/check_type.inc'
> --- a/mysql-test/extra/rpl_tests/check_type.inc	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/extra/rpl_tests/check_type.inc	2009-12-04 10:53:15 +0000
> @@ -0,0 +1,50 @@
> +# Helper file to perform one insert of a value into a table with
> +# different types on master and slave.  The file will insert the
> +# result into the type_conversions table *on the slave* to get a
> +# summary of failing and succeeding tests.
> +
> +# Input:
> +#    $source_type      Type on the master
> +#    $target_type      Type on the slave

> +#    $source_value     Type on the master (inserted into the table)

s/Type/Value/

> +#    $target_value     Type on the slave (expected value in the table
> +#                      on the slave)

s/Type/Value/

> +#    $can_convert      True if conversion shall work, false if it
> +#                      shall generate an error 
> +
> +
> +connection master;
> +DROP TABLE IF EXISTS t1;
> +eval CREATE TABLE t1 (a $source_type);
> +sync_slave_with_master;
> +eval ALTER TABLE t1 MODIFY a $target_type;
> +
> +connection master;
> +eval INSERT INTO t1 VALUES($source_value);
> +if ($can_convert) {
> +  sync_slave_with_master;
> +  eval SELECT a = $target_value into @compare FROM t1;
> +  eval INSERT INTO type_conversions SET
> +       Source = '$source_type',
> +       Target = '$target_type',
> +       Flags = @@slave_type_conversions,
> +       On_Master = $source_value,
> +       Expected = $target_value,
> +       Compare = @compare;
> +  UPDATE type_conversions
> +     SET On_Slave = (SELECT a FROM t1)
> +   WHERE TestNo = LAST_INSERT_ID();
> +}
> +if (!$can_convert) {
> +  connection slave;
> +  wait_for_slave_to_stop;
> +  let $error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Error, 1);
> +  eval INSERT INTO type_conversions SET
> +       Source = '$source_type',
> +       Target = '$target_type',
> +       Flags = @@slave_type_conversions,
> +       On_Master = $source_value,
> +       Error = "$error";
> +  SET GLOBAL SQL_SLAVE_SKIP_COUNTER = 1;
> +  START SLAVE;
> +}
> 
> === added file 'mysql-test/extra/rpl_tests/type_conversions.test'
> --- a/mysql-test/extra/rpl_tests/type_conversions.test	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/extra/rpl_tests/type_conversions.test	2009-12-04 10:53:15 +0000
> @@ -0,0 +1,592 @@
> +# File containing different lossy and non-lossy type conversions.
> +
> +# Integral conversion testing, we do not reduce the test using
> +# transitivity of conversions since the implementation is not using a
> +# transitivity strategy. Instead we do an exhaustive testing.
> +
> +disable_query_log;
> +connection slave;
> +--let $conv = `select @@slave_type_conversions`
> +--echo **** Running tests with @@SLAVE_TYPE_CONVERSIONS = '$conv' ****
> +
> +let $if_is_lossy = `SELECT FIND_IN_SET('ALL_LOSSY', @@SLAVE_TYPE_CONVERSIONS)`;
> +let $if_is_non_lossy = `SELECT FIND_IN_SET('ALL_NON_LOSSY',
> @@SLAVE_TYPE_CONVERSIONS)`;
> +
> +connection master;
> +let $source_type  = TINYINT;
> +let $target_type  = TINYINT;
> +let $source_value = 1;
> +let $target_value = 1;
> +let $can_convert  = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type  = TINYINT;
> +let $target_type  = SMALLINT;
> +let $source_value = 1;
> +let $target_value = 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= TINYINT;
> +let $target_type= MEDIUMINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= TINYINT;
> +let $target_type= INT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= TINYINT;
> +let $target_type= BIGINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= SMALLINT;
> +let $target_type= TINYINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= SMALLINT;
> +let $target_type= TINYINT;
> +let $source_value= 512;
> +let $target_value= 127;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= SMALLINT;
> +let $target_type= SMALLINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= SMALLINT;
> +let $target_type= MEDIUMINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= SMALLINT;
> +let $target_type= INT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= SMALLINT;
> +let $target_type= BIGINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;



> +
> +let $source_type= MEDIUMINT;
> +let $target_type= TINYINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= MEDIUMINT;
> +let $target_type= SMALLINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;


I think we need to check also the highest values.
Basicaly, any conversation that requeries the lossy option should
check the corner cases.


> +
> +let $source_type= MEDIUMINT;
> +let $target_type= MEDIUMINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= MEDIUMINT; 
> +let $target_type= INT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= MEDIUMINT;
> +let $target_type= BIGINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;

The same in here.

> +
> +let $source_type= INT;
> +let $target_type= TINYINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= INT;
> +let $target_type= SMALLINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= INT;
> +let $target_type= MEDIUMINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= INT;
> +let $target_type= INT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= INT;
> +let $target_type= BIGINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= BIGINT;
> +let $target_type= TINYINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= BIGINT;
> +let $target_type= SMALLINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= BIGINT;
> +let $target_type= MEDIUMINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= BIGINT;
> +let $target_type= INT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= BIGINT;
> +let $target_type= BIGINT;
> +let $source_value= 1;
> +let $target_value= 1;
> +let $can_convert  = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= CHAR(20);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert  = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= CHAR(30);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= CHAR(10);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnood';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= VARCHAR(20);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= VARCHAR(30);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= VARCHAR(10);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnood';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= TINYTEXT;
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= TEXT;
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= MEDIUMTEXT;
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= CHAR(20);
> +let $target_type= LONGTEXT;
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(20);
> +let $target_type= VARCHAR(20);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(20);
> +let $target_type= VARCHAR(30);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(20);
> +let $target_type= VARCHAR(10);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnood';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(20);
> +let $target_type= CHAR(30);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(20);
> +let $target_type= CHAR(10);
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnood';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(20);
> +let $target_type= TINYTEXT;
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(20);
> +let $target_type= TEXT;
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(20);
> +let $target_type= MEDIUMTEXT;
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(20);
> +let $target_type= LONGTEXT;
> +let $source_value= 'Smoothnoodlemaps';
> +let $target_value= 'Smoothnoodlemaps';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $blob = `select repeat('abcd', 125)`;
> +let $truncated_blob = `select left('$blob', 255)`;
> +
> +let $source_type= VARCHAR(500);
> +let $target_type= VARCHAR(500);
> +let $source_value= '$blob';
> +let $target_value= '$blob';
> +let $can_convert = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(500);
> +let $target_type= VARCHAR(510);
> +let $source_value= '$blob';
> +let $target_value= '$blob';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(500);
> +let $target_type= VARCHAR(255);
> +let $source_value= '$blob';
> +let $target_value= '$truncated_blob';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;


I think we are missing some combinations:

TINYTEXT/TEXT,....


> +
> +let $source_type= VARCHAR(500);
> +let $target_type= TINYTEXT;
> +let $source_value= '$blob';
> +let $target_value= '$truncated_blob';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(500);
> +let $target_type= TEXT;
> +let $source_value= '$blob';
> +let $target_value= '$blob';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(500);
> +let $target_type= MEDIUMTEXT;
> +let $source_value= '$blob';
> +let $target_value= '$blob';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= VARCHAR(500);
> +let $target_type= LONGTEXT;
> +let $source_value= '$blob';
> +let $target_value= '$blob';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $tiny_blob = `select repeat('tiny blob ', 25)`;
> +let $truncated_tiny_blob = `select left('$tiny_blob', 254)`;
> +
> +let $source_type= TINYTEXT;
> +let $target_type= VARCHAR(500);
> +let $source_value= '$tiny_blob';
> +let $target_value= '$tiny_blob';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= TEXT;
> +let $target_type= VARCHAR(500);
> +let $source_value= '$blob';
> +let $target_value= '$blob';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= MEDIUMTEXT;
> +let $target_type= VARCHAR(500);
> +let $source_value= '$blob';
> +let $target_value= '$blob';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= LONGTEXT;
> +let $target_type= VARCHAR(500);
> +let $source_value= '$blob';
> +let $target_value= '$blob';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= TINYTEXT;
> +let $target_type= CHAR(255);
> +let $source_value= '$tiny_blob';
> +let $target_value= '$tiny_blob';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= TINYTEXT;
> +let $target_type= CHAR(250);
> +let $source_value= '$tiny_blob';
> +let $target_value= left('$tiny_blob', 250);
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= TEXT;
> +let $target_type= CHAR(255);
> +let $source_value= '$blob';
> +let $target_value= left('$blob', 255);
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= MEDIUMTEXT;
> +let $target_type= CHAR(255);
> +let $source_value= '$blob';
> +let $target_value= left('$blob', 255);
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= LONGTEXT;
> +let $target_type= CHAR(255);
> +let $source_value= '$blob';
> +let $target_value= left('$blob', 255);
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DECIMAL(10,5);
> +let $target_type= DECIMAL(10,5);
> +let $source_value= 3.14159;
> +let $target_value= 3.14159;
> +let $can_convert = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DECIMAL(10,5);
> +let $target_type= DECIMAL(10,6);
> +let $source_value= 3.14159;
> +let $target_value= 3.141590;
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DECIMAL(10,5);
> +let $target_type= DECIMAL(11,5);
> +let $source_value= 3.14159;
> +let $target_value= 3.14159;
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DECIMAL(10,5);
> +let $target_type= DECIMAL(11,6);
> +let $source_value= 3.14159;
> +let $target_value= 3.141590;
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DECIMAL(10,5);
> +let $target_type= DECIMAL(10,4);
> +let $source_value= 3.14159;
> +let $target_value= 3.1416;
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DECIMAL(10,5);
> +let $target_type= DECIMAL(9,5);
> +let $source_value= 3.14159;
> +let $target_value= 3.14159;
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DECIMAL(10,5);
> +let $target_type= DECIMAL(9,4);
> +let $source_value= 3.14159;
> +let $target_value= 3.1416;
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= FLOAT;
> +let $target_type= DECIMAL(10,5);
> +let $source_value= 3.15625;
> +let $target_value= 3.15625;
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DOUBLE;
> +let $target_type= DECIMAL(10,5);
> +let $source_value= 3.15625;
> +let $target_value= 3.15625;
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DECIMAL(10,5);
> +let $target_type= FLOAT;
> +let $source_value= 3.15625;
> +let $target_value= 3.15625;
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DECIMAL(10,5);
> +let $target_type= DOUBLE;
> +let $source_value= 3.15625;
> +let $target_value= 3.15625;
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= FLOAT;
> +let $target_type= FLOAT;
> +let $source_value= 3.15625;
> +let $target_value= 3.15625;
> +let $can_convert = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DOUBLE;
> +let $target_type= DOUBLE;
> +let $source_value= 3.15625;
> +let $target_value= 3.15625;
> +let $can_convert = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= FLOAT;
> +let $target_type= DOUBLE;
> +let $source_value= 3.15625;
> +let $target_value= 3.15625;
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= DOUBLE;
> +let $target_type= FLOAT;
> +let $source_value= 3.15625;
> +let $target_value= 3.15625;
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;


I think we need to check the corner cases.
See comments above.

> +
> +let $source_type= BIT(5);
> +let $target_type= BIT(5);
> +let $source_value= b'11001';
> +let $target_value= b'11001';
> +let $can_convert = 1;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= BIT(5);
> +let $target_type= BIT(6);
> +let $source_value= b'11001';
> +let $target_value= b'11001';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= BIT(6);
> +let $target_type= BIT(5);
> +let $source_value= b'111001';
> +let $target_value= b'11111';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;
> +
> +let $source_type= BIT(5);
> +let $target_type= BIT(12);
> +let $source_value= b'11001';
> +let $target_value= b'11001';
> +let $can_convert = $if_is_non_lossy;
> +source extra/rpl_tests/check_type.inc;
> +


> +let $source_type= BIT(12);
> +let $target_type= BIT(5);
> +let $source_value= b'101100111000';
> +let $target_value= b'11111';
> +let $can_convert = $if_is_lossy;
> +source extra/rpl_tests/check_type.inc;


I did not get the result for the bit type.

> +
> +disable_warnings;
> +source include/reset_master_and_slave.inc;
> +enable_warnings;
> +enable_query_log;
> \ No newline at end of file
> 
> === added file 'mysql-test/suite/rpl/r/rpl_typeconv.result'
> --- a/mysql-test/suite/rpl/r/rpl_typeconv.result	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/suite/rpl/r/rpl_typeconv.result	2009-12-04 10:53:15 +0000
> @@ -0,0 +1,396 @@
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> +set @saved_slave_type_conversions = @@global.slave_type_conversions;
> +CREATE TABLE type_conversions (
> +TestNo INT AUTO_INCREMENT PRIMARY KEY,
> +Source TEXT,
> +Target TEXT,
> +Flags TEXT,
> +On_Master TEXT,
> +On_Slave TEXT,
> +Expected TEXT,
> +Compare INT,
> +Error TEXT);
> +SELECT @@global.slave_type_conversions;
> +@@global.slave_type_conversions
> +ALL_LOSSY
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='';
> +SELECT @@global.slave_type_conversions;
> +@@global.slave_type_conversions
> +
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_NON_LOSSY';
> +SELECT @@global.slave_type_conversions;
> +@@global.slave_type_conversions
> +ALL_NON_LOSSY
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY';
> +SELECT @@global.slave_type_conversions;
> +@@global.slave_type_conversions
> +ALL_LOSSY
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY,ALL_NON_LOSSY';
> +SELECT @@global.slave_type_conversions;
> +@@global.slave_type_conversions
> +ALL_LOSSY,ALL_NON_LOSSY
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY,ALL_NON_LOSSY,NONEXISTING_BIT';
> +ERROR 42000: Variable 'slave_type_conversions' can't be set to the value of
> 'NONEXISTING_BIT'
> +SELECT @@global.slave_type_conversions;
> +@@global.slave_type_conversions
> +ALL_LOSSY,ALL_NON_LOSSY
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='';
> +**** Running tests with @@SLAVE_TYPE_CONVERSIONS = '' ****
> +Warnings:
> +Note	1051	Unknown table 't1'
> +include/stop_slave.inc
> +RESET SLAVE;
> +RESET MASTER;
> +include/start_slave.inc
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_NON_LOSSY';
> +**** Running tests with @@SLAVE_TYPE_CONVERSIONS = 'ALL_NON_LOSSY' ****
> +include/stop_slave.inc
> +RESET SLAVE;
> +RESET MASTER;
> +include/start_slave.inc
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY';
> +**** Running tests with @@SLAVE_TYPE_CONVERSIONS = 'ALL_LOSSY' ****
> +include/stop_slave.inc
> +RESET SLAVE;
> +RESET MASTER;
> +include/start_slave.inc
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY,ALL_NON_LOSSY';
> +**** Running tests with @@SLAVE_TYPE_CONVERSIONS = 'ALL_LOSSY,ALL_NON_LOSSY' ****
> +include/stop_slave.inc
> +RESET SLAVE;
> +RESET MASTER;
> +include/start_slave.inc
> +**** Result of conversions ****
> +Source_Type	Target_Type	All_Type_Conversion_Flags	Value_On_Slave
> +TINYINT        	TINYINT        	                         	<Correct value>
> +TINYINT        	SMALLINT       	                         	<Correct error>
> +TINYINT        	MEDIUMINT      	                         	<Correct error>
> +TINYINT        	INT            	                         	<Correct error>
> +TINYINT        	BIGINT         	                         	<Correct error>
> +SMALLINT       	TINYINT        	                         	<Correct error>
> +SMALLINT       	TINYINT        	                         	<Correct error>
> +SMALLINT       	SMALLINT       	                         	<Correct value>
> +SMALLINT       	MEDIUMINT      	                         	<Correct error>
> +SMALLINT       	INT            	                         	<Correct error>
> +SMALLINT       	BIGINT         	                         	<Correct error>
> +MEDIUMINT      	TINYINT        	                         	<Correct error>
> +MEDIUMINT      	SMALLINT       	                         	<Correct error>
> +MEDIUMINT      	MEDIUMINT      	                         	<Correct value>
> +MEDIUMINT      	INT            	                         	<Correct error>
> +MEDIUMINT      	BIGINT         	                         	<Correct error>
> +INT            	TINYINT        	                         	<Correct error>
> +INT            	SMALLINT       	                         	<Correct error>
> +INT            	MEDIUMINT      	                         	<Correct error>
> +INT            	INT            	                         	<Correct value>
> +INT            	BIGINT         	                         	<Correct error>
> +BIGINT         	TINYINT        	                         	<Correct error>
> +BIGINT         	SMALLINT       	                         	<Correct error>
> +BIGINT         	MEDIUMINT      	                         	<Correct error>
> +BIGINT         	INT            	                         	<Correct error>
> +BIGINT         	BIGINT         	                         	<Correct value>
> +CHAR(20)       	CHAR(20)       	                         	<Correct value>
> +CHAR(20)       	CHAR(30)       	                         	<Correct error>
> +CHAR(20)       	CHAR(10)       	                         	<Correct error>
> +CHAR(20)       	VARCHAR(20)    	                         	<Correct error>
> +CHAR(20)       	VARCHAR(30)    	                         	<Correct error>
> +CHAR(20)       	VARCHAR(10)    	                         	<Correct error>
> +CHAR(20)       	TINYTEXT       	                         	<Correct error>
> +CHAR(20)       	TEXT           	                         	<Correct error>
> +CHAR(20)       	MEDIUMTEXT     	                         	<Correct error>
> +CHAR(20)       	LONGTEXT       	                         	<Correct error>
> +VARCHAR(20)    	VARCHAR(20)    	                         	<Correct value>
> +VARCHAR(20)    	VARCHAR(30)    	                         	<Correct error>
> +VARCHAR(20)    	VARCHAR(10)    	                         	<Correct error>
> +VARCHAR(20)    	CHAR(30)       	                         	<Correct error>
> +VARCHAR(20)    	CHAR(10)       	                         	<Correct error>
> +VARCHAR(20)    	TINYTEXT       	                         	<Correct error>
> +VARCHAR(20)    	TEXT           	                         	<Correct error>
> +VARCHAR(20)    	MEDIUMTEXT     	                         	<Correct error>
> +VARCHAR(20)    	LONGTEXT       	                         	<Correct error>
> +VARCHAR(500)   	VARCHAR(500)   	                         	<Correct value>
> +VARCHAR(500)   	VARCHAR(510)   	                         	<Correct error>
> +VARCHAR(500)   	VARCHAR(255)   	                         	<Correct error>
> +VARCHAR(500)   	TINYTEXT       	                         	<Correct error>
> +VARCHAR(500)   	TEXT           	                         	<Correct error>
> +VARCHAR(500)   	MEDIUMTEXT     	                         	<Correct error>
> +VARCHAR(500)   	LONGTEXT       	                         	<Correct error>
> +TINYTEXT       	VARCHAR(500)   	                         	<Correct error>
> +TEXT           	VARCHAR(500)   	                         	<Correct error>
> +MEDIUMTEXT     	VARCHAR(500)   	                         	<Correct error>
> +LONGTEXT       	VARCHAR(500)   	                         	<Correct error>
> +TINYTEXT       	CHAR(255)      	                         	<Correct error>
> +TINYTEXT       	CHAR(250)      	                         	<Correct error>
> +TEXT           	CHAR(255)      	                         	<Correct error>
> +MEDIUMTEXT     	CHAR(255)      	                         	<Correct error>
> +LONGTEXT       	CHAR(255)      	                         	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(10,5)  	                         	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(10,6)  	                         	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(11,5)  	                         	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(11,6)  	                         	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(10,4)  	                         	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(9,5)   	                         	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(9,4)   	                         	<Correct error>
> +FLOAT          	DECIMAL(10,5)  	                         	<Correct error>
> +DOUBLE         	DECIMAL(10,5)  	                         	<Correct error>
> +DECIMAL(10,5)  	FLOAT          	                         	<Correct error>
> +DECIMAL(10,5)  	DOUBLE         	                         	<Correct error>
> +FLOAT          	FLOAT          	                         	<Correct value>
> +DOUBLE         	DOUBLE         	                         	<Correct value>
> +FLOAT          	DOUBLE         	                         	<Correct error>
> +DOUBLE         	FLOAT          	                         	<Correct error>
> +BIT(5)         	BIT(5)         	                         	<Correct value>
> +BIT(5)         	BIT(6)         	                         	<Correct error>
> +BIT(6)         	BIT(5)         	                         	<Correct error>
> +BIT(5)         	BIT(12)        	                         	<Correct error>
> +BIT(12)        	BIT(5)         	                         	<Correct error>
> +TINYINT        	TINYINT        	ALL_NON_LOSSY            	<Correct value>
> +TINYINT        	SMALLINT       	ALL_NON_LOSSY            	<Correct value>
> +TINYINT        	MEDIUMINT      	ALL_NON_LOSSY            	<Correct value>
> +TINYINT        	INT            	ALL_NON_LOSSY            	<Correct value>
> +TINYINT        	BIGINT         	ALL_NON_LOSSY            	<Correct value>
> +SMALLINT       	TINYINT        	ALL_NON_LOSSY            	<Correct error>
> +SMALLINT       	TINYINT        	ALL_NON_LOSSY            	<Correct error>
> +SMALLINT       	SMALLINT       	ALL_NON_LOSSY            	<Correct value>
> +SMALLINT       	MEDIUMINT      	ALL_NON_LOSSY            	<Correct value>
> +SMALLINT       	INT            	ALL_NON_LOSSY            	<Correct value>
> +SMALLINT       	BIGINT         	ALL_NON_LOSSY            	<Correct value>
> +MEDIUMINT      	TINYINT        	ALL_NON_LOSSY            	<Correct error>
> +MEDIUMINT      	SMALLINT       	ALL_NON_LOSSY            	<Correct error>
> +MEDIUMINT      	MEDIUMINT      	ALL_NON_LOSSY            	<Correct value>
> +MEDIUMINT      	INT            	ALL_NON_LOSSY            	<Correct value>
> +MEDIUMINT      	BIGINT         	ALL_NON_LOSSY            	<Correct value>
> +INT            	TINYINT        	ALL_NON_LOSSY            	<Correct error>
> +INT            	SMALLINT       	ALL_NON_LOSSY            	<Correct error>
> +INT            	MEDIUMINT      	ALL_NON_LOSSY            	<Correct error>
> +INT            	INT            	ALL_NON_LOSSY            	<Correct value>
> +INT            	BIGINT         	ALL_NON_LOSSY            	<Correct value>
> +BIGINT         	TINYINT        	ALL_NON_LOSSY            	<Correct error>
> +BIGINT         	SMALLINT       	ALL_NON_LOSSY            	<Correct error>
> +BIGINT         	MEDIUMINT      	ALL_NON_LOSSY            	<Correct error>
> +BIGINT         	INT            	ALL_NON_LOSSY            	<Correct error>
> +BIGINT         	BIGINT         	ALL_NON_LOSSY            	<Correct value>
> +CHAR(20)       	CHAR(20)       	ALL_NON_LOSSY            	<Correct value>
> +CHAR(20)       	CHAR(30)       	ALL_NON_LOSSY            	<Correct value>
> +CHAR(20)       	CHAR(10)       	ALL_NON_LOSSY            	<Correct error>
> +CHAR(20)       	VARCHAR(20)    	ALL_NON_LOSSY            	<Correct value>
> +CHAR(20)       	VARCHAR(30)    	ALL_NON_LOSSY            	<Correct value>
> +CHAR(20)       	VARCHAR(10)    	ALL_NON_LOSSY            	<Correct error>
> +CHAR(20)       	TINYTEXT       	ALL_NON_LOSSY            	<Correct value>
> +CHAR(20)       	TEXT           	ALL_NON_LOSSY            	<Correct value>
> +CHAR(20)       	MEDIUMTEXT     	ALL_NON_LOSSY            	<Correct value>
> +CHAR(20)       	LONGTEXT       	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(20)    	VARCHAR(20)    	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(20)    	VARCHAR(30)    	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(20)    	VARCHAR(10)    	ALL_NON_LOSSY            	<Correct error>
> +VARCHAR(20)    	CHAR(30)       	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(20)    	CHAR(10)       	ALL_NON_LOSSY            	<Correct error>
> +VARCHAR(20)    	TINYTEXT       	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(20)    	TEXT           	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(20)    	MEDIUMTEXT     	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(20)    	LONGTEXT       	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(500)   	VARCHAR(500)   	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(500)   	VARCHAR(510)   	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(500)   	VARCHAR(255)   	ALL_NON_LOSSY            	<Correct error>
> +VARCHAR(500)   	TINYTEXT       	ALL_NON_LOSSY            	<Correct error>
> +VARCHAR(500)   	TEXT           	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(500)   	MEDIUMTEXT     	ALL_NON_LOSSY            	<Correct value>
> +VARCHAR(500)   	LONGTEXT       	ALL_NON_LOSSY            	<Correct value>
> +TINYTEXT       	VARCHAR(500)   	ALL_NON_LOSSY            	<Correct value>
> +TEXT           	VARCHAR(500)   	ALL_NON_LOSSY            	<Correct error>
> +MEDIUMTEXT     	VARCHAR(500)   	ALL_NON_LOSSY            	<Correct error>
> +LONGTEXT       	VARCHAR(500)   	ALL_NON_LOSSY            	<Correct error>
> +TINYTEXT       	CHAR(255)      	ALL_NON_LOSSY            	<Correct value>
> +TINYTEXT       	CHAR(250)      	ALL_NON_LOSSY            	<Correct error>
> +TEXT           	CHAR(255)      	ALL_NON_LOSSY            	<Correct error>
> +MEDIUMTEXT     	CHAR(255)      	ALL_NON_LOSSY            	<Correct error>
> +LONGTEXT       	CHAR(255)      	ALL_NON_LOSSY            	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(10,5)  	ALL_NON_LOSSY            	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(10,6)  	ALL_NON_LOSSY            	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(11,5)  	ALL_NON_LOSSY            	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(11,6)  	ALL_NON_LOSSY            	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(10,4)  	ALL_NON_LOSSY            	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(9,5)   	ALL_NON_LOSSY            	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(9,4)   	ALL_NON_LOSSY            	<Correct error>
> +FLOAT          	DECIMAL(10,5)  	ALL_NON_LOSSY            	<Correct error>
> +DOUBLE         	DECIMAL(10,5)  	ALL_NON_LOSSY            	<Correct error>
> +DECIMAL(10,5)  	FLOAT          	ALL_NON_LOSSY            	<Correct error>
> +DECIMAL(10,5)  	DOUBLE         	ALL_NON_LOSSY            	<Correct error>
> +FLOAT          	FLOAT          	ALL_NON_LOSSY            	<Correct value>
> +DOUBLE         	DOUBLE         	ALL_NON_LOSSY            	<Correct value>
> +FLOAT          	DOUBLE         	ALL_NON_LOSSY            	<Correct value>
> +DOUBLE         	FLOAT          	ALL_NON_LOSSY            	<Correct error>
> +BIT(5)         	BIT(5)         	ALL_NON_LOSSY            	<Correct value>
> +BIT(5)         	BIT(6)         	ALL_NON_LOSSY            	<Correct value>
> +BIT(6)         	BIT(5)         	ALL_NON_LOSSY            	<Correct error>
> +BIT(5)         	BIT(12)        	ALL_NON_LOSSY            	<Correct value>
> +BIT(12)        	BIT(5)         	ALL_NON_LOSSY            	<Correct error>
> +TINYINT        	TINYINT        	ALL_LOSSY                	<Correct value>
> +TINYINT        	SMALLINT       	ALL_LOSSY                	<Correct error>
> +TINYINT        	MEDIUMINT      	ALL_LOSSY                	<Correct error>
> +TINYINT        	INT            	ALL_LOSSY                	<Correct error>
> +TINYINT        	BIGINT         	ALL_LOSSY                	<Correct error>
> +SMALLINT       	TINYINT        	ALL_LOSSY                	<Correct value>
> +SMALLINT       	TINYINT        	ALL_LOSSY                	<Correct value>
> +SMALLINT       	SMALLINT       	ALL_LOSSY                	<Correct value>
> +SMALLINT       	MEDIUMINT      	ALL_LOSSY                	<Correct error>
> +SMALLINT       	INT            	ALL_LOSSY                	<Correct error>
> +SMALLINT       	BIGINT         	ALL_LOSSY                	<Correct error>
> +MEDIUMINT      	TINYINT        	ALL_LOSSY                	<Correct value>
> +MEDIUMINT      	SMALLINT       	ALL_LOSSY                	<Correct value>
> +MEDIUMINT      	MEDIUMINT      	ALL_LOSSY                	<Correct value>
> +MEDIUMINT      	INT            	ALL_LOSSY                	<Correct error>
> +MEDIUMINT      	BIGINT         	ALL_LOSSY                	<Correct error>
> +INT            	TINYINT        	ALL_LOSSY                	<Correct value>
> +INT            	SMALLINT       	ALL_LOSSY                	<Correct value>
> +INT            	MEDIUMINT      	ALL_LOSSY                	<Correct value>
> +INT            	INT            	ALL_LOSSY                	<Correct value>
> +INT            	BIGINT         	ALL_LOSSY                	<Correct error>
> +BIGINT         	TINYINT        	ALL_LOSSY                	<Correct value>
> +BIGINT         	SMALLINT       	ALL_LOSSY                	<Correct value>
> +BIGINT         	MEDIUMINT      	ALL_LOSSY                	<Correct value>
> +BIGINT         	INT            	ALL_LOSSY                	<Correct value>
> +BIGINT         	BIGINT         	ALL_LOSSY                	<Correct value>
> +CHAR(20)       	CHAR(20)       	ALL_LOSSY                	<Correct value>
> +CHAR(20)       	CHAR(30)       	ALL_LOSSY                	<Correct error>
> +CHAR(20)       	CHAR(10)       	ALL_LOSSY                	<Correct value>
> +CHAR(20)       	VARCHAR(20)    	ALL_LOSSY                	<Correct error>
> +CHAR(20)       	VARCHAR(30)    	ALL_LOSSY                	<Correct error>
> +CHAR(20)       	VARCHAR(10)    	ALL_LOSSY                	<Correct value>
> +CHAR(20)       	TINYTEXT       	ALL_LOSSY                	<Correct error>
> +CHAR(20)       	TEXT           	ALL_LOSSY                	<Correct error>
> +CHAR(20)       	MEDIUMTEXT     	ALL_LOSSY                	<Correct error>
> +CHAR(20)       	LONGTEXT       	ALL_LOSSY                	<Correct error>
> +VARCHAR(20)    	VARCHAR(20)    	ALL_LOSSY                	<Correct value>
> +VARCHAR(20)    	VARCHAR(30)    	ALL_LOSSY                	<Correct error>
> +VARCHAR(20)    	VARCHAR(10)    	ALL_LOSSY                	<Correct value>
> +VARCHAR(20)    	CHAR(30)       	ALL_LOSSY                	<Correct error>
> +VARCHAR(20)    	CHAR(10)       	ALL_LOSSY                	<Correct value>
> +VARCHAR(20)    	TINYTEXT       	ALL_LOSSY                	<Correct error>
> +VARCHAR(20)    	TEXT           	ALL_LOSSY                	<Correct error>
> +VARCHAR(20)    	MEDIUMTEXT     	ALL_LOSSY                	<Correct error>
> +VARCHAR(20)    	LONGTEXT       	ALL_LOSSY                	<Correct error>
> +VARCHAR(500)   	VARCHAR(500)   	ALL_LOSSY                	<Correct value>
> +VARCHAR(500)   	VARCHAR(510)   	ALL_LOSSY                	<Correct error>
> +VARCHAR(500)   	VARCHAR(255)   	ALL_LOSSY                	<Correct value>
> +VARCHAR(500)   	TINYTEXT       	ALL_LOSSY                	<Correct value>
> +VARCHAR(500)   	TEXT           	ALL_LOSSY                	<Correct error>
> +VARCHAR(500)   	MEDIUMTEXT     	ALL_LOSSY                	<Correct error>
> +VARCHAR(500)   	LONGTEXT       	ALL_LOSSY                	<Correct error>
> +TINYTEXT       	VARCHAR(500)   	ALL_LOSSY                	<Correct error>
> +TEXT           	VARCHAR(500)   	ALL_LOSSY                	<Correct value>
> +MEDIUMTEXT     	VARCHAR(500)   	ALL_LOSSY                	<Correct value>
> +LONGTEXT       	VARCHAR(500)   	ALL_LOSSY                	<Correct value>
> +TINYTEXT       	CHAR(255)      	ALL_LOSSY                	<Correct error>
> +TINYTEXT       	CHAR(250)      	ALL_LOSSY                	<Correct value>
> +TEXT           	CHAR(255)      	ALL_LOSSY                	<Correct value>
> +MEDIUMTEXT     	CHAR(255)      	ALL_LOSSY                	<Correct value>
> +LONGTEXT       	CHAR(255)      	ALL_LOSSY                	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(10,5)  	ALL_LOSSY                	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(10,6)  	ALL_LOSSY                	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(11,5)  	ALL_LOSSY                	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(11,6)  	ALL_LOSSY                	<Correct error>
> +DECIMAL(10,5)  	DECIMAL(10,4)  	ALL_LOSSY                	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(9,5)   	ALL_LOSSY                	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(9,4)   	ALL_LOSSY                	<Correct value>
> +FLOAT          	DECIMAL(10,5)  	ALL_LOSSY                	<Correct value>
> +DOUBLE         	DECIMAL(10,5)  	ALL_LOSSY                	<Correct value>
> +DECIMAL(10,5)  	FLOAT          	ALL_LOSSY                	<Correct value>
> +DECIMAL(10,5)  	DOUBLE         	ALL_LOSSY                	<Correct value>
> +FLOAT          	FLOAT          	ALL_LOSSY                	<Correct value>
> +DOUBLE         	DOUBLE         	ALL_LOSSY                	<Correct value>
> +FLOAT          	DOUBLE         	ALL_LOSSY                	<Correct error>
> +DOUBLE         	FLOAT          	ALL_LOSSY                	<Correct value>
> +BIT(5)         	BIT(5)         	ALL_LOSSY                	<Correct value>
> +BIT(5)         	BIT(6)         	ALL_LOSSY                	<Correct error>
> +BIT(6)         	BIT(5)         	ALL_LOSSY                	<Correct value>
> +BIT(5)         	BIT(12)        	ALL_LOSSY                	<Correct error>
> +BIT(12)        	BIT(5)         	ALL_LOSSY                	<Correct value>
> +TINYINT        	TINYINT        	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +TINYINT        	SMALLINT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +TINYINT        	MEDIUMINT      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +TINYINT        	INT            	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +TINYINT        	BIGINT         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +SMALLINT       	TINYINT        	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +SMALLINT       	TINYINT        	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +SMALLINT       	SMALLINT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +SMALLINT       	MEDIUMINT      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +SMALLINT       	INT            	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +SMALLINT       	BIGINT         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +MEDIUMINT      	TINYINT        	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +MEDIUMINT      	SMALLINT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +MEDIUMINT      	MEDIUMINT      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +MEDIUMINT      	INT            	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +MEDIUMINT      	BIGINT         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +INT            	TINYINT        	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +INT            	SMALLINT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +INT            	MEDIUMINT      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +INT            	INT            	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +INT            	BIGINT         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIGINT         	TINYINT        	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIGINT         	SMALLINT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIGINT         	MEDIUMINT      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIGINT         	INT            	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIGINT         	BIGINT         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	CHAR(20)       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	CHAR(30)       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	CHAR(10)       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	VARCHAR(20)    	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	VARCHAR(30)    	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	VARCHAR(10)    	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	TINYTEXT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	TEXT           	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	MEDIUMTEXT     	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +CHAR(20)       	LONGTEXT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(20)    	VARCHAR(20)    	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(20)    	VARCHAR(30)    	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(20)    	VARCHAR(10)    	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(20)    	CHAR(30)       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(20)    	CHAR(10)       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(20)    	TINYTEXT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(20)    	TEXT           	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(20)    	MEDIUMTEXT     	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(20)    	LONGTEXT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(500)   	VARCHAR(500)   	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(500)   	VARCHAR(510)   	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(500)   	VARCHAR(255)   	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(500)   	TINYTEXT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(500)   	TEXT           	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(500)   	MEDIUMTEXT     	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +VARCHAR(500)   	LONGTEXT       	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +TINYTEXT       	VARCHAR(500)   	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +TEXT           	VARCHAR(500)   	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +MEDIUMTEXT     	VARCHAR(500)   	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +LONGTEXT       	VARCHAR(500)   	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +TINYTEXT       	CHAR(255)      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +TINYTEXT       	CHAR(250)      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +TEXT           	CHAR(255)      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +MEDIUMTEXT     	CHAR(255)      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +LONGTEXT       	CHAR(255)      	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(10,5)  	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(10,6)  	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(11,5)  	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(11,6)  	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(10,4)  	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(9,5)   	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DECIMAL(10,5)  	DECIMAL(9,4)   	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +FLOAT          	DECIMAL(10,5)  	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DOUBLE         	DECIMAL(10,5)  	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DECIMAL(10,5)  	FLOAT          	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DECIMAL(10,5)  	DOUBLE         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +FLOAT          	FLOAT          	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DOUBLE         	DOUBLE         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +FLOAT          	DOUBLE         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DOUBLE         	FLOAT          	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIT(5)         	BIT(5)         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIT(5)         	BIT(6)         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIT(6)         	BIT(5)         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIT(5)         	BIT(12)        	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +BIT(12)        	BIT(5)         	ALL_LOSSY,ALL_NON_LOSSY  	<Correct value>
> +DROP TABLE type_conversions;
> +DROP TABLE t1;
> +set global slave_type_conversions = @saved_slave_type_conversions;
> 
> === added file 'mysql-test/suite/rpl/t/rpl_typeconv.test'
> --- a/mysql-test/suite/rpl/t/rpl_typeconv.test	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/suite/rpl/t/rpl_typeconv.test	2009-12-04 10:53:15 +0000
> @@ -0,0 +1,69 @@
> +--source include/have_binlog_format_row.inc
> +--source include/master-slave.inc
> +
> +connection slave;
> +set @saved_slave_type_conversions = @@global.slave_type_conversions;
> +CREATE TABLE type_conversions (
> +       TestNo INT AUTO_INCREMENT PRIMARY KEY,
> +       Source TEXT,
> +       Target TEXT,
> +       Flags TEXT,
> +       On_Master TEXT,
> +       On_Slave TEXT,
> +       Expected TEXT,
> +       Compare INT,
> +       Error TEXT);
> +
> +SELECT @@global.slave_type_conversions;
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='';
> +SELECT @@global.slave_type_conversions;
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_NON_LOSSY';
> +SELECT @@global.slave_type_conversions;
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY';
> +SELECT @@global.slave_type_conversions;
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY,ALL_NON_LOSSY';
> +SELECT @@global.slave_type_conversions;
> +--error ER_WRONG_VALUE_FOR_VAR
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY,ALL_NON_LOSSY,NONEXISTING_BIT';
> +SELECT @@global.slave_type_conversions;
> +
> +# Checking strict interpretation of type conversions
> +connection slave;
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='';
> +source extra/rpl_tests/type_conversions.test;
> +
> +# Checking lossy integer type conversions
> +connection slave;
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_NON_LOSSY';
> +source extra/rpl_tests/type_conversions.test;
> +
> +# Checking non-lossy integer type conversions
> +connection slave;
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY';
> +source extra/rpl_tests/type_conversions.test;
> +
> +# Checking all type conversions
> +connection slave;
> +SET GLOBAL SLAVE_TYPE_CONVERSIONS='ALL_LOSSY,ALL_NON_LOSSY';
> +source extra/rpl_tests/type_conversions.test;
> +
> +connection slave;
> +--echo **** Result of conversions ****
> +disable_query_log;
> +SELECT RPAD(Source, 15, ' ') AS Source_Type,
> +       RPAD(Target, 15, ' ') AS Target_Type,
> +       RPAD(Flags, 25, ' ') AS All_Type_Conversion_Flags,
> +       IF(Compare IS NULL AND Error IS NOT NULL, '<Correct error>',
> +          IF(Compare, '<Correct value>',
> +             CONCAT("'", On_Slave, "' != '", Expected, "'")))
> +         AS Value_On_Slave
> +  FROM type_conversions;
> +enable_query_log;
> +DROP TABLE type_conversions;
> +
> +connection master;
> +DROP TABLE t1;
> +sync_slave_with_master;
> +
> +set global slave_type_conversions = @saved_slave_type_conversions;
> +
> 
> === modified file 'sql/field.cc'
> --- a/sql/field.cc	2009-09-29 10:58:42 +0000
> +++ b/sql/field.cc	2009-12-04 10:53:15 +0000
> @@ -59,6 +59,8 @@ const char field_separator=',';
>  #define ASSERT_COLUMN_MARKED_FOR_READ DBUG_ASSERT(!table || (!table->read_set ||
> bitmap_is_set(table->read_set, field_index)))
>  #define ASSERT_COLUMN_MARKED_FOR_WRITE DBUG_ASSERT(!table || (!table->write_set
> || bitmap_is_set(table->write_set, field_index)))
>  
> +#define FLAGSTR(S,F) ((S) & (F) ? #F " " : "")
> +
>  /*
>    Rules for merging different types of fields in UNION
>  
> @@ -997,6 +999,21 @@ test_if_important_data(CHARSET_INFO *cs,
>  
>  
>  /**
> +   Template function to compare two objects.
> + */
> +namespace {
> +  template <class A_type, class B_type>
> +  int compare(A_type a, B_type b)
> +  {
> +    if (a < b)
> +      return -1;
> +    if (b < a)
> +      return 1;
> +    return 0;
> +  }
> +}
> +
> +/**
>    Detect Item_result by given field type of UNION merge result.
>  
>    @param field_type  given field type
> @@ -1367,22 +1384,46 @@ bool Field::send_binary(Protocol *protoc
>  /**
>     Check to see if field size is compatible with destination.
>  
> -   This method is used in row-based replication to verify that the slave's
> -   field size is less than or equal to the master's field size. The 
> -   encoded field metadata (from the master or source) is decoded and compared
> -   to the size of this field (the slave or destination). 
> +   This method is used in row-based replication to verify that the
> +   slave's field size is less than or equal to the master's field
> +   size. The encoded field metadata (from the master or source) is
> +   decoded and compared to the size of this field (the slave or
> +   destination).
>  
> -   @param   field_metadata   Encoded size in field metadata
> +   @note
>  
> -   @retval 0 if this field's size is < the source field's size
> -   @retval 1 if this field's size is >= the source field's size
> -*/
> -int Field::compatible_field_size(uint field_metadata,
> -                                 const Relay_log_info *rli_arg
> __attribute__((unused)))
> +   The comparison is made so that if the source data (from the master)
> +   is less than the target data (on the slave), -1 is returned in @c
> +   <code>*order_var</code>. This implies that a conversion is
> +   necessary, but that it is lossy and can result in truncation of the
> +   value.
> +
> +   If the source data is strictly greater than the target data, 1 is
> +   returned in <code>*order_var</code>. This implies that the source
> +   type can is contained in the target type and that a conversion is
> +   necessary but is non-lossy.
> +
> +   If no conversion is required to fit the source type in the target
> +   type, 0 is returned in <code>*order_var</code>.
> +
> +   @param   field_metadata   Encoded size in field metadata
> +   @param   order_var        Pointer to variable where the order
> +                             between the source field and this field
> +                             will be returned.
> +
> +   @return @c true if this field's size is compatible with the
> +   master's field size, @c false otherwise.
> +*/
> +bool Field::compatible_field_size(uint field_metadata,
> +                                  Relay_log_info *rli_arg __attribute__((unused)),
> +                                  int *order_var)
>  {
>    uint const source_size= pack_length_from_metadata(field_metadata);
>    uint const destination_size= row_pack_length();
> -  return (source_size <= destination_size);
> +  DBUG_PRINT("debug", ("real_type: %d, source_size: %u, destination_size: %u",
> +                       real_type(), source_size, destination_size));
> +  *order_var = compare(source_size, destination_size);
> +  return true;


--->


>  }
>  
>  
> @@ -2907,33 +2948,15 @@ uint Field_new_decimal::pack_length_from
>  }
>  
>  
> -/**
> -   Check to see if field size is compatible with destination.
> -
> -   This method is used in row-based replication to verify that the slave's
> -   field size is less than or equal to the master's field size. The 
> -   encoded field metadata (from the master or source) is decoded and compared
> -   to the size of this field (the slave or destination). 
> -
> -   @param   field_metadata   Encoded size in field metadata
> -
> -   @retval 0 if this field's size is < the source field's size
> -   @retval 1 if this field's size is >= the source field's size
> -*/
> -int Field_new_decimal::compatible_field_size(uint field_metadata,
> -                                             const Relay_log_info *
> __attribute__((unused)))
> +bool Field_new_decimal::compatible_field_size(uint field_metadata,
> +                                              Relay_log_info *
> __attribute__((unused)),
> +                                              int *order_var)
>  {
> -  int compatible= 0;
>    uint const source_precision= (field_metadata >> 8U) & 0x00ff;
>    uint const source_decimal= field_metadata & 0x00ff; 
> -  uint const source_size= my_decimal_get_binary_size(source_precision, 
> -                                                     source_decimal);
> -  uint const destination_size= row_pack_length();
> -  compatible= (source_size <= destination_size);
> -  if (compatible)
> -    compatible= (source_precision <= precision) &&
> -                (source_decimal <= decimals());
> -  return (compatible);
> +  int order= compare(source_precision, precision);
> +  *order_var= order != 0 ? order : compare(source_decimal, dec);
> +  return true;
>  }
>  
>  
> @@ -6707,8 +6730,10 @@ check_field_for_37426(const void *param_
>  }
>  #endif
>  
> -int Field_string::compatible_field_size(uint field_metadata,
> -                                        const Relay_log_info *rli_arg)
> +bool
> +Field_string::compatible_field_size(uint field_metadata,
> +                                    Relay_log_info *rli_arg,
> +                                    int *order_var)
>  {
>  #ifdef HAVE_REPLICATION
>    const Check_field_param check_param = { this };
> @@ -6716,7 +6741,7 @@ int Field_string::compatible_field_size(
>                           check_field_for_37426, &check_param))
>      return FALSE;                        // Not compatible field sizes
>  #endif
> -  return Field::compatible_field_size(field_metadata, rli_arg);
> +  return Field::compatible_field_size(field_metadata, rli_arg, order_var);
>  }
>  
>  


> @@ -7625,6 +7650,7 @@ Field_blob::Field_blob(uchar *ptr_arg, u
>                   cs),
>     packlength(blob_pack_length)
>  {
> +  DBUG_ASSERT(blob_pack_length <= 4); // Only pack lengths 1-4 supported
> currently


Can you explain this?


>    flags|= BLOB_FLAG;
>    share->blob_fields++;
>    /* TODO: why do not fill table->s->blob_field array here? */
> @@ -8035,8 +8061,10 @@ int Field_blob::key_cmp(const uchar *a,c
>  */
>  int Field_blob::do_save_field_metadata(uchar *metadata_ptr)
>  {
> +  DBUG_ENTER("Field_blob::do_save_field_metadata");
>    *metadata_ptr= pack_length_no_ptr();
> -  return 1;
> +  DBUG_PRINT("debug", ("metadata: %u (pack_length_no_ptr)", *metadata_ptr));
> +  DBUG_RETURN(1);
>  }
>  
>  
> @@ -8977,6 +9005,9 @@ Field_bit::Field_bit(uchar *ptr_arg, uin
>      bit_ptr(bit_ptr_arg), bit_ofs(bit_ofs_arg), bit_len(len_arg & 7),
>      bytes_in_rec(len_arg / 8)
>  {
> +  DBUG_ENTER("Field_bit::Field_bit");
> +  DBUG_PRINT("enter", ("ptr_arg: %p, null_ptr_arg: %p, len_arg: %u, bit_len: %u,
> bytes_in_rec: %u",
> +                       ptr_arg, null_ptr_arg, len_arg, bit_len, bytes_in_rec));
>    flags|= UNSIGNED_FLAG;
>    /*
>      Ensure that Field::eq() can distinguish between two different bit fields.
> @@ -8984,6 +9015,7 @@ Field_bit::Field_bit(uchar *ptr_arg, uin
>    */
>    if (!null_ptr_arg)
>      null_bit= bit_ofs_arg;
> +  DBUG_VOID_RETURN;
>  }
>  
>  
> @@ -9268,9 +9300,12 @@ uint Field_bit::get_key_image(uchar *buf
>  */
>  int Field_bit::do_save_field_metadata(uchar *metadata_ptr)
>  {
> +  DBUG_ENTER("Field_bit::do_save_field_metadata");
> +  DBUG_PRINT("debug", ("bit_len: %d, bytes_in_rec: %d",
> +                       bit_len, bytes_in_rec));
>    *metadata_ptr= bit_len;
>    *(metadata_ptr + 1)= bytes_in_rec;
> -  return 2;
> +  DBUG_RETURN(2);
>  }
>  
>  
> @@ -9295,34 +9330,20 @@ uint Field_bit::pack_length_from_metadat
>  }
>  
>  
> -/**
> -   Check to see if field size is compatible with destination.
> -
> -   This method is used in row-based replication to verify that the slave's
> -   field size is less than or equal to the master's field size. The 
> -   encoded field metadata (from the master or source) is decoded and compared
> -   to the size of this field (the slave or destination). 
> -
> -   @param   field_metadata   Encoded size in field metadata
> -
> -   @retval 0 if this field's size is < the source field's size
> -   @retval 1 if this field's size is >= the source field's size
> -*/
> -int Field_bit::compatible_field_size(uint field_metadata,
> -                                     const Relay_log_info *
> __attribute__((unused)))
> -{
> -  int compatible= 0;
> -  uint const source_size= pack_length_from_metadata(field_metadata);
> -  uint const destination_size= row_pack_length();
> -  uint const from_bit_len= field_metadata & 0x00ff;
> -  uint const from_len= (field_metadata >> 8U) & 0x00ff;
> -  if ((bit_len == 0) || (from_bit_len == 0))
> -    compatible= (source_size <= destination_size);
> -  else if (from_bit_len > bit_len)
> -    compatible= (from_len < bytes_in_rec);
> -  else
> -    compatible= ((from_bit_len <= bit_len) && (from_len <=
> bytes_in_rec));
> -  return (compatible);
> +bool
> +Field_bit::compatible_field_size(uint field_metadata,
> +                                 Relay_log_info * __attribute__((unused)),
> +                                 int *order_var)
> +{
> +  DBUG_ENTER("Field_bit::compatible_field_size");
> +  DBUG_ASSERT((field_metadata >> 16) == 0);
> +  uint const from_bit_len=
> +    8 * (field_metadata >> 8) + (field_metadata & 0xff);
> +  uint const to_bit_len= max_display_length();
> +  DBUG_PRINT("debug", ("from_bit_len: %u, to_bit_len: %u",
> +                       from_bit_len, to_bit_len));
> +  *order_var= compare(from_bit_len, to_bit_len);
> +  DBUG_RETURN(TRUE);
>  }
>  
>  
> @@ -9388,8 +9409,15 @@ const uchar *
>  Field_bit::unpack(uchar *to, const uchar *from, uint param_data,
>                    bool low_byte_first __attribute__((unused)))
>  {
> +  DBUG_ENTER("Field_bit::unpack");
> +  DBUG_PRINT("enter", ("to: %p, from: %p, param_data: 0x%x",
> +                       to, from, param_data));
> +  DBUG_PRINT("debug", ("bit_ptr: %p, bit_len: %u, bit_ofs: %u",
> +                       bit_ptr, bit_len, bit_ofs));
>    uint const from_len= (param_data >> 8U) & 0x00ff;
>    uint const from_bit_len= param_data & 0x00ff;
> +  DBUG_PRINT("debug", ("from_len: %u, from_bit_len: %u",
> +                       from_len, from_bit_len));
>    /*
>      If the parameter data is zero (i.e., undefined), or if the master
>      and slave have the same sizes, then use the old unpack() method.
> @@ -9410,7 +9438,7 @@ Field_bit::unpack(uchar *to, const uchar
>        from++;
>      }
>      memcpy(to, from, bytes_in_rec);
> -    return from + bytes_in_rec;
> +    DBUG_RETURN(from + bytes_in_rec);
>    }
>  
>    /*
> @@ -9436,7 +9464,7 @@ Field_bit::unpack(uchar *to, const uchar
>    bitmap_set_bit(table->write_set,field_index);
>    store(value, new_len, system_charset_info);
>    my_afree(value);
> -  return from + len;
> +  DBUG_RETURN(from + len);
>  }
>  
>  
> @@ -9564,8 +9592,11 @@ void Create_field::create_length_to_inte
>  */
>  void Create_field::init_for_tmp_table(enum_field_types sql_type_arg,
>                                        uint32 length_arg, uint32 decimals_arg,
> -                                      bool maybe_null, bool is_unsigned)
> +                                      bool maybe_null, bool is_unsigned,
> +                                      uint pack_length)
>  {
> +  DBUG_ENTER("Create_field::init_for_tmp_table");
> +
>    field_name= "";
>    sql_type= sql_type_arg;
>    char_length= length= length_arg;;
> @@ -9573,10 +9604,78 @@ void Create_field::init_for_tmp_table(en
>    interval= 0;
>    charset= &my_charset_bin;
>    geom_type= Field::GEOM_GEOMETRY;
> -  pack_flag= (FIELDFLAG_NUMBER |
> -              ((decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT)
> |
> -              (maybe_null ? FIELDFLAG_MAYBE_NULL : 0) |
> -              (is_unsigned ? 0 : FIELDFLAG_DECIMAL));
> +
> +  /*
> +    These pack flags are crafted to get it correctly through the
> +    branches of make_field().
> +   */
> +  switch (sql_type_arg)
> +  {
> +  case MYSQL_TYPE_VARCHAR:
> +  case MYSQL_TYPE_VAR_STRING:
> +  case MYSQL_TYPE_STRING:
> +    pack_flag= 0;
> +    break;
> +
> +  case MYSQL_TYPE_GEOMETRY:
> +    pack_flag= FIELDFLAG_GEOM;
> +    break;
> +
> +  case MYSQL_TYPE_ENUM:
> +    pack_flag= FIELDFLAG_INTERVAL;
> +  case MYSQL_TYPE_SET:
> +    break;
> +
> +  case MYSQL_TYPE_TINY_BLOB:
> +  case MYSQL_TYPE_MEDIUM_BLOB:
> +  case MYSQL_TYPE_LONG_BLOB:
> +  case MYSQL_TYPE_BLOB:
> +    pack_flag= FIELDFLAG_BLOB;
> +    break;
> +
> +  default:
> +    pack_flag= FIELDFLAG_NUMBER;
> +    break;
> +  }
> +
> +  /*
> +    Set the pack flag correctly for the blob-like types. This sets the
> +    packtype to something that make_field can use. If the pack type is
> +    not set correctly, the packlength will be reeeeally wierd (like
> +    129 or so).
> +   */
> +  switch (sql_type_arg)
> +  {
> +  case MYSQL_TYPE_ENUM:
> +  case MYSQL_TYPE_SET:
> +  case MYSQL_TYPE_TINY_BLOB:
> +  case MYSQL_TYPE_MEDIUM_BLOB:
> +  case MYSQL_TYPE_LONG_BLOB:
> +  case MYSQL_TYPE_BLOB:
> +  case MYSQL_TYPE_GEOMETRY:
> +    // If you are going to use the above types, you have to pass a
> +    // pack_length as parameter. Assert that is really done.
> +    DBUG_ASSERT(pack_length != ~0U);
> +    pack_flag|= pack_length_to_packflag(pack_length);
> +    break;
> +  default:
> +    /* Nothing */
> +    break;
> +  }
> +
> +  pack_flag|=
> +    ((decimals_arg & FIELDFLAG_MAX_DEC) << FIELDFLAG_DEC_SHIFT) |
> +    (maybe_null ? FIELDFLAG_MAYBE_NULL : 0) |
> +    (is_unsigned ? 0 : FIELDFLAG_DECIMAL);
> +
> +  DBUG_PRINT("debug", ("pack_flag: %s%s%s%s%s, pack_type: %d",
> +                       FLAGSTR(pack_flag, FIELDFLAG_BINARY),
> +                       FLAGSTR(pack_flag, FIELDFLAG_NUMBER),
> +                       FLAGSTR(pack_flag, FIELDFLAG_INTERVAL),
> +                       FLAGSTR(pack_flag, FIELDFLAG_GEOM),
> +                       FLAGSTR(pack_flag, FIELDFLAG_BLOB),
> +                       f_packtype(pack_flag)));
> +  DBUG_VOID_RETURN;
>  }
>  
>  
> @@ -10073,6 +10172,12 @@ Field *make_field(TABLE_SHARE *share, uc
>    default: break;
>    }
>  
> +  DBUG_PRINT("debug", ("pack_flag: %s%s%s%s",
> +                       FLAGSTR(pack_flag, FIELDFLAG_BINARY),
> +                       FLAGSTR(pack_flag, FIELDFLAG_NUMBER),
> +                       FLAGSTR(pack_flag, FIELDFLAG_PACK),
> +                       FLAGSTR(pack_flag, FIELDFLAG_BLOB)));
> +
>    if (f_is_alpha(pack_flag))
>    {
>      if (!f_is_packed(pack_flag))
> 
> === modified file 'sql/field.h'
> --- a/sql/field.h	2009-09-29 10:58:42 +0000
> +++ b/sql/field.h	2009-12-04 10:53:15 +0000
> @@ -164,22 +164,13 @@ public:
>      table, which is located on disk).
>    */
>    virtual uint32 pack_length_in_rec() const { return pack_length(); }
> -  virtual int compatible_field_size(uint field_metadata,
> -                                    const Relay_log_info *);
> +  virtual bool compatible_field_size(uint metadata, Relay_log_info *rli,
> +                                     int *order);
>    virtual uint pack_length_from_metadata(uint field_metadata)
> -  { return field_metadata; }
> -  /*
> -    This method is used to return the size of the data in a row-based
> -    replication row record. The default implementation of returning 0 is
> -    designed to allow fields that do not use metadata to return TRUE (1)
> -    from compatible_field_size() which uses this function in the comparison.
> -    The default value for field metadata for fields that do not have 
> -    metadata is 0. Thus, 0 == 0 means the fields are compatible in size.
> -
> -    Note: While most classes that override this method return pack_length(),
> -    the classes Field_string, Field_varstring, and Field_blob return 
> -    field_length + 1, field_length, and pack_length_no_ptr() respectfully.
> -  */
> +  {
> +    DBUG_ENTER("Field::pack_length_from_metadata");
> +    DBUG_RETURN(field_metadata);
> +  }
>    virtual uint row_pack_length() { return 0; }
>    virtual int save_field_metadata(uchar *first_byte)
>    { return do_save_field_metadata(first_byte); }
> @@ -636,6 +627,13 @@ public:
>    int store_decimal(const my_decimal *);
>    my_decimal *val_decimal(my_decimal *);
>    uint is_equal(Create_field *new_field);
> +  uint row_pack_length() { return pack_length(); }
> +  uint32 pack_length_from_metadata(uint field_metadata) {
> +    uint32 length= pack_length();
> +    DBUG_PRINT("result", ("pack_length_from_metadata(%d): %u",
> +                          field_metadata, length));
> +    return length;
> +  }

Why do you need a different method? Is this just for debugging?


>    int check_int(CHARSET_INFO *cs, const char *str, int length,
>                  const char *int_end, int error);
>    bool get_int(CHARSET_INFO *cs, const char *from, uint len, 
> @@ -806,8 +804,8 @@ public:
>    uint32 pack_length() const { return (uint32) bin_size; }
>    uint pack_length_from_metadata(uint field_metadata);
>    uint row_pack_length() { return pack_length(); }
> -  int compatible_field_size(uint field_metadata,
> -                            const Relay_log_info *rli);
> +  bool compatible_field_size(uint field_metadata, Relay_log_info *rli,
> +                             int *order_var);
>    uint is_equal(Create_field *new_field);
>    virtual const uchar *unpack(uchar* to, const uchar *from,
>                                uint param_data, bool low_byte_first);
> @@ -1501,9 +1499,9 @@ public:
>        return row_pack_length();
>      return (((field_metadata >> 4) & 0x300) ^ 0x300) + (field_metadata
> & 0x00ff);
>    }
> -  int compatible_field_size(uint field_metadata,
> -                            const Relay_log_info *rli);
> -  uint row_pack_length() { return (field_length + 1); }
> +  bool compatible_field_size(uint field_metadata, Relay_log_info *rli,
> +                             int *order_var);
> +  uint row_pack_length() { return field_length; }
>    int pack_cmp(const uchar *a,const uchar *b,uint key_length,
>                 my_bool insert_or_update);
>    int pack_cmp(const uchar *b,uint key_length,my_bool insert_or_update);
> @@ -1965,8 +1963,8 @@ public:
>    uint pack_length_from_metadata(uint field_metadata);
>    uint row_pack_length()
>    { return (bytes_in_rec + ((bit_len > 0) ? 1 : 0)); }
> -  int compatible_field_size(uint field_metadata,
> -                            const Relay_log_info *rli);
> +  bool compatible_field_size(uint metadata, Relay_log_info *rli,
> +                             int *order_var);
>    void sql_type(String &str) const;
>    virtual uchar *pack(uchar *to, const uchar *from,
>                        uint max_length, bool low_byte_first);
> @@ -2069,7 +2067,8 @@ public:
>    /* Init for a tmp table field. To be extended if need be. */
>    void init_for_tmp_table(enum_field_types sql_type_arg,
>                            uint32 max_length, uint32 decimals,
> -                          bool maybe_null, bool is_unsigned);
> +                          bool maybe_null, bool is_unsigned,
> +                          uint pack_length = ~0U);
>  
>    bool init(THD *thd, char *field_name, enum_field_types type, char *length,
>              char *decimals, uint type_modifier, Item *default_value,
> 
> === modified file 'sql/log_event.cc'
> --- a/sql/log_event.cc	2009-10-14 01:39:05 +0000
> +++ b/sql/log_event.cc	2009-12-04 10:53:15 +0000
> @@ -29,7 +29,6 @@
>  #include "rpl_rli.h"
>  #include "rpl_mi.h"
>  #include "rpl_filter.h"
> -#include "rpl_utility.h"
>  #include "rpl_record.h"
>  #include <my_dir.h>
>  
> @@ -37,6 +36,7 @@
>  
>  #include <base64.h>
>  #include <my_bitmap.h>
> +#include "rpl_utility.h"
>  
>  #define log_cs	&my_charset_latin1
>  
> @@ -7309,11 +7309,18 @@ int Rows_log_event::do_apply_event(Relay
>      */
>  
>      {
> +      DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock:
> %p",
> +                           rli->tables_to_lock));
>        RPL_TABLE_LIST *ptr= rli->tables_to_lock;
>        for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
>        {
> -        if (ptr->m_tabledef.compatible_with(rli, ptr->table))
> +        TABLE *conv_table;
> +        if (!ptr->m_tabledef.compatible_with(thd,
> const_cast<Relay_log_info*>(rli),
> +                                             ptr->table, &conv_table))
>          {
> +          DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
> +                               ptr->table->s->db.str,
> +                               ptr->table->s->table_name.str));
>            /*
>              We should not honour --slave-skip-errors at this point as we are
>              having severe errors which should not be skiped.
> @@ -7324,12 +7331,17 @@ int Rows_log_event::do_apply_event(Relay
>            const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
>            DBUG_RETURN(ERR_BAD_TABLE_DEF);
>          }
> +        DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
> +                             " - conv_table: %p",
> +                             ptr->table->s->db.str,
> +                             ptr->table->s->table_name.str, conv_table));
> +        ptr->m_conv_table= conv_table;
>        }
>      }
>  
>      /*
> -      ... and then we add all the tables to the table map and remove
> -      them from tables to lock.
> +      ... and then we add all the tables to the table map and but keep
> +      them in the tables to lock list.
>  
>        We also invalidate the query cache for all the tables, since
>        they will now be changed.
> @@ -7825,7 +7837,10 @@ int Table_map_log_event::save_field_meta
>    DBUG_ENTER("Table_map_log_event::save_field_metadata");
>    int index= 0;
>    for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
> +  {
> +    DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
>      index+=
> m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
> +  }
>    DBUG_RETURN(index);
>  }
>  #endif /* !defined(MYSQL_CLIENT) */
> 
> === modified file 'sql/log_event_old.cc'
> --- a/sql/log_event_old.cc	2009-10-14 01:39:05 +0000
> +++ b/sql/log_event_old.cc	2009-12-04 10:53:15 +0000
> @@ -107,7 +107,9 @@ Old_rows_log_event::do_apply_event(Old_r
>        RPL_TABLE_LIST *ptr= rli->tables_to_lock;
>        for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
>        {
> -        if (ptr->m_tabledef.compatible_with(rli, ptr->table))
> +        TABLE *conv_table;
> +        if (ptr->m_tabledef.compatible_with(thd,
> const_cast<Relay_log_info*>(rli),
> +                                            ptr->table, &conv_table))
>          {
>            mysql_unlock_tables(thd, thd->lock);
>            thd->lock= 0;
> @@ -115,6 +117,7 @@ Old_rows_log_event::do_apply_event(Old_r
>            const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
>            DBUG_RETURN(Old_rows_log_event::ERR_BAD_TABLE_DEF);
>          }
> +        ptr->m_conv_table= conv_table;
>        }
>      }
>  
> @@ -1578,7 +1581,9 @@ int Old_rows_log_event::do_apply_event(R
>        RPL_TABLE_LIST *ptr= rli->tables_to_lock;
>        for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
>        {
> -        if (ptr->m_tabledef.compatible_with(rli, ptr->table))
> +        TABLE *conv_table;
> +        if (ptr->m_tabledef.compatible_with(thd,
> const_cast<Relay_log_info*>(rli),
> +                                            ptr->table, &conv_table))
>          {
>            mysql_unlock_tables(thd, thd->lock);
>            thd->lock= 0;
> @@ -1586,12 +1591,14 @@ int Old_rows_log_event::do_apply_event(R
>            const_cast<Relay_log_info*>(rli)->clear_tables_to_lock();
>            DBUG_RETURN(ERR_BAD_TABLE_DEF);
>          }
> +        ptr->m_conv_table= conv_table;
>        }
>      }
>  
>      /*
> -      ... and then we add all the tables to the table map and remove
> -      them from tables to lock.
> +      ... and then we add all the tables to the table map but keep
> +      them in the tables to lock list.
> +
>  
>        We also invalidate the query cache for all the tables, since
>        they will now be changed.
> 
> === modified file 'sql/mysql_priv.h'
> --- a/sql/mysql_priv.h	2009-10-08 13:21:07 +0000
> +++ b/sql/mysql_priv.h	2009-12-04 10:53:15 +0000
> @@ -1998,6 +1998,7 @@ extern my_bool opt_sql_bin_update, opt_s
>  extern my_bool opt_safe_show_db, opt_local_infile, opt_myisam_use_mmap;
>  extern my_bool opt_slave_compressed_protocol, use_temp_pool;
>  extern ulong slave_exec_mode_options;
> +extern ulong slave_type_conversions_options;
>  extern my_bool opt_readonly, lower_case_file_system;
>  extern my_bool opt_enable_named_pipe, opt_sync_frm, opt_allow_suspicious_udfs;
>  extern my_bool opt_secure_auth;
> 
> === modified file 'sql/mysqld.cc'
> --- a/sql/mysqld.cc	2009-09-29 15:38:40 +0000
> +++ b/sql/mysqld.cc	2009-12-04 10:53:15 +0000
> @@ -525,6 +525,8 @@ ulong open_files_limit, max_binlog_size,
>  ulong slave_net_timeout, slave_trans_retries;
>  ulong slave_exec_mode_options;
>  const char *slave_exec_mode_str= "STRICT";
> +ulong slave_type_conversions_options;
> +const char *slave_type_conversions_default= "ALL_LOSSY";
>  ulong thread_cache_size=0, thread_pool_size= 0;
>  ulong binlog_cache_size=0;
>  ulonglong  max_binlog_cache_size=0;
> @@ -5690,6 +5692,7 @@ enum options_mysqld
>  #endif /* defined(ENABLED_DEBUG_SYNC) */
>    OPT_OLD_MODE,
>    OPT_SLAVE_EXEC_MODE,
> +  OPT_SLAVE_TYPE_CONVERSIONS,
>    OPT_GENERAL_LOG_FILE,
>    OPT_SLOW_QUERY_LOG_FILE,
>    OPT_IGNORE_BUILTIN_INNODB
> @@ -6413,6 +6416,13 @@ replicating a LOAD DATA INFILE command."
>    {"slave-exec-mode", OPT_SLAVE_EXEC_MODE,
>     "Modes for how replication events should be executed.  Legal values are STRICT
> (default) and IDEMPOTENT. In IDEMPOTENT mode, replication will not stop for operations
> that are idempotent. In STRICT mode, replication will stop on any unexpected difference
> between the master and the slave.",
>     (uchar**) &slave_exec_mode_str, (uchar**) &slave_exec_mode_str, 0,
> GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
> +  {"slave-type-conversions", OPT_SLAVE_TYPE_CONVERSIONS,
> +   "Slave type conversions that are enabled. Legal values are"
> +   " ALL_LOSSY to enable lossy conversions and"
> +   " ALL_NON_LOSSY to enable non-lossy conversions.",
> +   (uchar**) &slave_type_conversions_default,
> +   (uchar**) &slave_type_conversions_default,
> +   0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},


I think you should add the case that the option is empty in order
to have a description on how to force the slave to be strictly equal
to the master.


>  #endif
>    {"slow-query-log", OPT_SLOW_LOG,
>     "Enable|disable slow query log", (uchar**) &opt_slow_log,
> @@ -7671,6 +7681,11 @@ static int mysql_init_variables(void)
>    slave_exec_mode_options= (uint)
>      find_bit_type_or_exit(slave_exec_mode_str, &slave_exec_mode_typelib, NULL,
>                            &error);
> +  /* Slave type conversions */
> +  slave_type_conversions_options= 0;
> +  slave_type_conversions_options=
> +    find_bit_type_or_exit(slave_type_conversions_default,
> &slave_type_conversions_typelib,
> +                          NULL, &error);
>    if (error)
>      return 1;
>    opt_specialflag= SPECIAL_ENGLISH;
> @@ -7900,6 +7915,12 @@ mysqld_get_one_option(int optid,
>      if (error)
>        return 1;
>      break;
> +  case OPT_SLAVE_TYPE_CONVERSIONS:
> +    slave_type_conversions_options= (uint)
> +      find_bit_type_or_exit(argument, &slave_type_conversions_typelib, "",
> &error);
> +    if (error)
> +      return 1;
> +    break;
>  #endif
>    case OPT_SAFEMALLOC_MEM_LIMIT:
>  #if !defined(DBUG_OFF) && defined(SAFEMALLOC)
> 
> === modified file 'sql/rpl_record.cc'
> --- a/sql/rpl_record.cc	2009-03-05 19:54:53 +0000
> +++ b/sql/rpl_record.cc	2009-12-04 10:53:15 +0000
> @@ -201,10 +201,29 @@ unpack_row(Relay_log_info const *rli,
>    // The "current" null bits
>    unsigned int null_bits= *null_ptr++;
>    uint i= 0;
> -  table_def *tabledef= ((Relay_log_info*)rli)->get_tabledef(table);
> +  table_def *tabledef;
> +  TABLE *conv_table;
> +  bool table_found= rli->get_table_data(table, &tabledef, &conv_table);
> +  DBUG_PRINT("debug", ("Table data: table_found: %d, tabldef: %p, conv_table: %p",
> +                       table_found, tabledef, conv_table));
> +  DBUG_ASSERT(table_found);
> +  if (!table_found)
> +    return HA_ERR_GENERIC;
> +
>    for (field_ptr= begin_ptr ; field_ptr < end_ptr && *field_ptr ;
> ++field_ptr)
>    {
> -    Field *const f= *field_ptr;
> +    /*
> +      If there is a conversion table, we pick up the field pointer to
> +      the conversion table.  If the conversion table or the field
> +      pointer is NULL, no conversions are necessary.
> +     */
> +    Field *conv_field=
> +      conv_table ? conv_table->field[field_ptr - begin_ptr] : NULL;
> +    Field *const f=
> +      conv_field ? conv_field : *field_ptr;
> +    DBUG_PRINT("debug", ("Conversion %srequired for field %d",
> +                         conv_field ? "" : "not ", field_ptr - begin_ptr));
> +    DBUG_ASSERT(f != NULL);
>  
>      /*
>        No need to bother about columns that does not exist: they have
> @@ -247,6 +266,39 @@ unpack_row(Relay_log_info const *rli,
>                               (int) (pack_ptr - old_pack_ptr)));
>        }
>  
> +      /*
> +        If conv_field is set, then we are doing a conversion. In this
> +        case, we have unpacked the master data to the conversion
> +        table, so we need to copy the value stored in the conversion
> +        table into the final table and do the conversion at the same time.
> +      */
> +      if (conv_field)
> +      {
> +        Copy_field copy;
> +#ifndef DBUG_OFF
> +        char source_buf[MAX_FIELD_WIDTH];
> +        char value_buf[MAX_FIELD_WIDTH];
> +        String source_type(source_buf, sizeof(source_buf), system_charset_info);
> +        String value_string(value_buf, sizeof(value_buf), system_charset_info);
> +        conv_field->sql_type(source_type);
> +        conv_field->val_str(&value_string);
> +        DBUG_PRINT("debug", ("Copying field '%s' of type '%s' with value '%s'",
> +                             (*field_ptr)->field_name,
> +                             source_type.c_ptr(), value_string.c_ptr()));
> +#endif
> +        copy.set(*field_ptr, f, TRUE);
> +        (*copy.do_copy)(&copy);
> +#ifndef DBUG_OFF
> +        char target_buf[MAX_FIELD_WIDTH];
> +        String target_type(target_buf, sizeof(target_buf), system_charset_info);
> +        (*field_ptr)->sql_type(target_type);
> +        (*field_ptr)->val_str(&value_string);
> +        DBUG_PRINT("debug", ("Value of field '%s' of type '%s' is now '%s'",
> +                             (*field_ptr)->field_name,
> +                             target_type.c_ptr(), value_string.c_ptr()));
> +#endif
> +      }
> +
>        null_mask <<= 1;
>      }
>      i++;
> 
> === modified file 'sql/rpl_rli.h'
> --- a/sql/rpl_rli.h	2009-02-21 09:36:07 +0000
> +++ b/sql/rpl_rli.h	2009-12-04 10:53:15 +0000
> @@ -314,13 +314,21 @@ public:
>    uint tables_to_lock_count;        /* RBR: Count of tables to lock */
>    table_mapping m_table_map;      /* RBR: Mapping table-id to table */
>  
> -  inline table_def *get_tabledef(TABLE *tbl)
> +  bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE
> **conv_table_var) const
>    {
> -    table_def *td= 0;
> -    for (TABLE_LIST *ptr= tables_to_lock; ptr && !td; ptr=
> ptr->next_global)
> -      if (ptr->table == tbl)
> -        td= &((RPL_TABLE_LIST *)ptr)->m_tabledef;
> -    return (td);
> +    DBUG_ASSERT(tabledef_var && conv_table_var);
> +    for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
> +      if (ptr->table == table_arg)
> +      {
> +        *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;
> +        *conv_table_var= static_cast<RPL_TABLE_LIST*>(ptr)->m_conv_table;
> +        DBUG_PRINT("debug", ("Fetching table data for table %s.%s:"
> +                             " tabledef: %p, conv_table: %p",
> +                             table_arg->s->db.str,
> table_arg->s->table_name.str,
> +                             *tabledef_var, *conv_table_var));
> +        return true;
> +      }
> +    return false;
>    }
>  
>    /*
> 
> === modified file 'sql/rpl_utility.cc'
> --- a/sql/rpl_utility.cc	2008-06-30 20:11:18 +0000
> +++ b/sql/rpl_utility.cc	2009-12-04 10:53:15 +0000
> @@ -14,8 +14,178 @@
>     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA */
>  
>  #include "rpl_utility.h"
> +
> +#ifndef MYSQL_CLIENT
>  #include "rpl_rli.h"
>  
> +/**
> +   Template function to compare two objects.
> + */
> +namespace {
> +  template <class Type>
> +  int compare(Type a, Type b)
> +  {
> +    if (a < b)
> +      return -1;
> +    if (b < a)
> +      return 1;
> +    return 0;
> +  }
> +}

Why don't you move the other compare template to this utility file?


> +
> +
> +/**
> +   Max value for an unsigned integer of 'bits' bits.
> +
> +   The somewhat contorted expression is to avoid overflow.
> + */
> +uint32 uint_max(int bits) {
> +  return ((1UL << (bits - 1)) - 1) << 1 | 1;
> +}


Please, do what follows:

(((1UL << (bits - 1)) - 1) << 1) | 1;


> +
> +
> +/**
> +   Compute the maximum display of a field.
> +
> +   @param sql_type Type of the field
> +   @param metadata The metadata from the master for the field.
> +   @return Maximum length of the field in bytes.
> + */
> +static uint32
> +max_display_length_for_field(enum_field_types sql_type, unsigned int metadata)
> +{
> +  DBUG_PRINT("debug", ("sql_type: %d, metadata: 0x%x", sql_type, metadata));
> +  DBUG_ASSERT(metadata >> 16 == 0);
> +
> +  switch (sql_type) {
> +  case MYSQL_TYPE_NEWDECIMAL:
> +    return metadata >> 8;
> +
> +  case MYSQL_TYPE_FLOAT:
> +    return 12;
> +
> +  case MYSQL_TYPE_DOUBLE:
> +    return 22;
> +
> +  case MYSQL_TYPE_SET:
> +  case MYSQL_TYPE_ENUM:
> +      return metadata & 0x00ff;
> +
> +  case MYSQL_TYPE_STRING:
> +    /*
> +      This is taken from Field_string::unpack.
> +    */
> +    return (((metadata >> 4) & 0x300) ^ 0x300) + (metadata & 0x00ff);
> +
> +  case MYSQL_TYPE_YEAR:
> +  case MYSQL_TYPE_TINY:
> +    return 4;
> +
> +  case MYSQL_TYPE_SHORT:
> +    return 6;
> +
> +  case MYSQL_TYPE_INT24:
> +    return 9;
> +
> +  case MYSQL_TYPE_LONG:
> +    return 11;
> +
> +#ifdef HAVE_LONG_LONG
> +  case MYSQL_TYPE_LONGLONG:
> +    return 20;
> +
> +#endif
> +  case MYSQL_TYPE_NULL:
> +    return 0;
> +
> +  case MYSQL_TYPE_NEWDATE:
> +    return 3;
> +
> +  case MYSQL_TYPE_DATE:
> +  case MYSQL_TYPE_TIME:
> +    return 3;
> +
> +  case MYSQL_TYPE_TIMESTAMP:
> +    return 4;
> +
> +  case MYSQL_TYPE_DATETIME:
> +    return 8;
> +
> +  case MYSQL_TYPE_BIT:
> +  {
> +    /*
> +      Decode the size of the bit field from the master.
> +        from_len is the length in bytes from the master
> +        from_bit_len is the number of extra bits stored in the master record
> +    */
> +    uint from_len= (metadata >> 8U) & 0x00ff;
> +    uint from_bit_len= metadata & 0x00ff;
> +    DBUG_ASSERT(from_bit_len <= 7);
> +    return 8 * from_len + from_bit_len;
> +  }
> +  case MYSQL_TYPE_VARCHAR:
> +    return metadata;
> +
> +    /*
> +      The actual length for these types does not really matter since
> +      they are used to calc_pack_length, which ignores the given
> +      length for these types.
> +
> +      Since we want this to be accurate for other uses, we return the
> +      maximum size in bytes of these BLOBs.
> +    */
> +
> +  case MYSQL_TYPE_TINY_BLOB:
> +    return uint_max(1 * 8);
> +
> +  case MYSQL_TYPE_MEDIUM_BLOB:
> +    return uint_max(3 * 8);
> +
> +  case MYSQL_TYPE_BLOB:
> +    /*
> +      For the blob type, Field::real_type() lies and say that all
> +      blobs are of type MYSQL_TYPE_BLOB. In that case, we have to look
> +      at the length instead to decide what the max display size is.
> +     */
> +    return uint_max(metadata * 8);
> +
> +  case MYSQL_TYPE_LONG_BLOB:
> +  case MYSQL_TYPE_GEOMETRY:
> +    return uint_max(4 * 8);
> +
> +  default:
> +    return ~(uint32) 0;
> +  }
> +}
> +
> +
> +/*
> +  Compare the pack lengths of a source field (on the master) and a
> +  target field (on the slave).
> +
> +  @param field    Target field.
> +  @param type     Source field type.
> +  @param metadata Source field metadata.
> +
> +  @retval -1 The length of the source field is smaller than the target field.
> +  @retval  0 The length of the source and target fields are the same.
> +  @retval  1 The length of the source field is greater than the target field.
> + */
> +int compare_lengths(Field *field, enum_field_types source_type, uint16 metadata)
> +{
> +  DBUG_ENTER(__FUNCTION__);
> +  size_t const source_length=
> +    max_display_length_for_field(source_type, metadata);
> +  size_t const target_length= field->max_display_length();
> +  DBUG_PRINT("debug", ("source_length: %lu, source_type: %u,"
> +                       " target_length: %lu, target_type: %u",
> +                       (unsigned long) source_length, source_type,
> +                       (unsigned long) target_length, field->real_type()));
> +  int result= compare(source_length, target_length);
> +  DBUG_PRINT("result", ("%d", result));
> +  DBUG_RETURN(result);
> +}
> +
>  /*********************************************************************
>   *                   table_def member definitions                    *
>   *********************************************************************/
> @@ -169,58 +339,683 @@ uint32 table_def::calc_field_size(uint c
>    return length;
>  }
>  
> -/*
> +
> +/**
> + */
> +void show_sql_type(enum_field_types type, uint16 metadata, String *str)
> +{
> +  DBUG_ENTER(__FUNCTION__);
> +  DBUG_PRINT("enter", ("type: %d, metadata: 0x%x", type, metadata));
> +
> +  switch (type)
> +  {
> +  case MYSQL_TYPE_TINY:
> +    str->set_ascii(STRING_WITH_LEN("tinyint"));
> +    break;
> +
> +  case MYSQL_TYPE_SHORT:
> +    str->set_ascii(STRING_WITH_LEN("smallint"));
> +    break;
> +
> +  case MYSQL_TYPE_LONG:
> +    str->set_ascii(STRING_WITH_LEN("int"));
> +    break;
> +
> +  case MYSQL_TYPE_FLOAT:
> +    str->set_ascii(STRING_WITH_LEN("float"));
> +    break;
> +
> +  case MYSQL_TYPE_DOUBLE:
> +    str->set_ascii(STRING_WITH_LEN("double"));
> +    break;
> +
> +  case MYSQL_TYPE_NULL:
> +    str->set_ascii(STRING_WITH_LEN("null"));
> +    break;
> +
> +  case MYSQL_TYPE_TIMESTAMP:
> +    str->set_ascii(STRING_WITH_LEN("timestamp"));
> +    break;
> +
> +  case MYSQL_TYPE_LONGLONG:
> +    str->set_ascii(STRING_WITH_LEN("bigint"));
> +    break;
> +
> +  case MYSQL_TYPE_INT24:
> +    str->set_ascii(STRING_WITH_LEN("mediumint"));
> +    break;
> +
> +  case MYSQL_TYPE_NEWDATE:
> +  case MYSQL_TYPE_DATE:
> +    str->set_ascii(STRING_WITH_LEN("date"));
> +    break;
> +
> +  case MYSQL_TYPE_TIME:
> +    str->set_ascii(STRING_WITH_LEN("time"));
> +    break;
> +
> +  case MYSQL_TYPE_DATETIME:
> +    str->set_ascii(STRING_WITH_LEN("datetime"));
> +    break;
> +
> +  case MYSQL_TYPE_YEAR:
> +    str->set_ascii(STRING_WITH_LEN("year"));
> +    break;
> +
> +  case MYSQL_TYPE_VARCHAR:
> +    {
> +      CHARSET_INFO *cs= str->charset();
> +      uint32 length=
> +        cs->cset->snprintf(cs, (char*) str->ptr(),
> str->alloced_length(),
> +                           "varchar(%u)", metadata);
> +      str->length(length);
> +    }
> +    break;
> +
> +  case MYSQL_TYPE_BIT:
> +    {
> +      CHARSET_INFO *cs= str->charset();
> +      int bit_length= (metadata >> 8) + (metadata & 0xFF);
> +      uint32 length=
> +        cs->cset->snprintf(cs, (char*) str->ptr(),
> str->alloced_length(),
> +                           "bit(%d)", bit_length);
> +      str->length(length);
> +    }
> +    break;
> +
> +  case MYSQL_TYPE_NEWDECIMAL:
> +    {
> +      CHARSET_INFO *cs= str->charset();
> +      uint32 length=
> +        cs->cset->snprintf(cs, (char*) str->ptr(),
> str->alloced_length(),
> +                           "decimal(%d,%d)", metadata >> 8, metadata &
> 0xff);
> +      str->length(length);
> +    }
> +    break;
> +
> +  case MYSQL_TYPE_ENUM:
> +    str->set_ascii(STRING_WITH_LEN("enum"));
> +    break;
> +
> +  case MYSQL_TYPE_SET:
> +    str->set_ascii(STRING_WITH_LEN("set"));
> +    break;
> +
> +  case MYSQL_TYPE_BLOB:
> +    /*
> +      Field::real_type() lies regarding the actual type of a BLOB, so
> +      it is necessary to check the pack length to figure out what kind
> +      of blob it really is.
> +     */
> +    switch (get_blob_type_from_length(metadata))
> +    {
> +    case MYSQL_TYPE_TINY_BLOB:
> +      str->set_ascii(STRING_WITH_LEN("tinyblob"));
> +      break;
> +
> +    case MYSQL_TYPE_MEDIUM_BLOB:
> +      str->set_ascii(STRING_WITH_LEN("mediumblob"));
> +      break;
> +
> +    case MYSQL_TYPE_LONG_BLOB:
> +      str->set_ascii(STRING_WITH_LEN("longblob"));
> +      break;
> +
> +    case MYSQL_TYPE_BLOB:
> +      str->set_ascii(STRING_WITH_LEN("blob"));
> +      break;
> +
> +    default:
> +      DBUG_ASSERT(0);
> +      break;
> +    }
> +    break;
> +
> +  case MYSQL_TYPE_STRING:
> +    {
> +      /*
> +        This is taken from Field_string::unpack.
> +      */
> +      CHARSET_INFO *cs= str->charset();
> +      uint bytes= (((metadata >> 4) & 0x300) ^ 0x300) + (metadata &
> 0x00ff);
> +      uint32 length=
> +        cs->cset->snprintf(cs, (char*) str->ptr(),
> str->alloced_length(),
> +                           "char(%d)", bytes / cs->mbmaxlen);
> +      str->length(length);
> +    }
> +    break;
> +
> +  case MYSQL_TYPE_GEOMETRY:
> +    str->set_ascii(STRING_WITH_LEN("geometry"));
> +    break;
> +
> +  case MYSQL_TYPE_DECIMAL:
> +  case MYSQL_TYPE_VAR_STRING:
> +    DBUG_ASSERT(0);
> +
> +  default:
> +    str->set_ascii(STRING_WITH_LEN("<unknown type>"));
> +  }
> +  DBUG_VOID_RETURN;
> +}
> +
> +
> +/**
> +   Check the order variable and print errors if the order is not
> +   acceptable according to the current settings.
> + */
> +bool is_conversion_ok(int order, Relay_log_info *rli)
> +{
> +  DBUG_ENTER(__FUNCTION__);
> +  bool allow_non_lossy=
> +    bit_is_set(slave_type_conversions_options,
> SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY);
> +  bool allow_lossy=
> +    bit_is_set(slave_type_conversions_options, SLAVE_TYPE_CONVERSIONS_ALL_LOSSY);
> +
> +  DBUG_PRINT("enter", ("order: %d, flags:%s%s", order,
> +                       allow_non_lossy ? " ALL_NON_LOSSY" : "",
> +                       allow_lossy ? " ALL_LOSSY" : ""));
> +  if (order < 0 && !allow_non_lossy)
> +  {
> +    /* !!! Add error message saying that non-lossy conversions need to be allowed.
> */
> +    DBUG_RETURN(false);
> +  }
> +
> +  if (order > 0 && !allow_lossy)
> +  {
> +    /* !!! Add error message saying that lossy conversions need to be allowed. */
> +    DBUG_RETURN(false);
> +  }
> +
> +  DBUG_RETURN(true);
> +}
> +
> +
> +/**
> +   Can a type potentially be converted to another type?
> +
> +   This function check if the types are convertible and what
> +   conversion is required.
> +
> +   If conversion is not possible, and error is printed.
> +
> +   If conversion is possible:
> +
> +   - *order will be set to -1 if source type is smaller than target
> +     type and a non-lossy conversion can be required. This includes
> +     the case where the field types are different but types could
> +     actually be converted in either direction.
> +
> +   - *order will be set to 0 if no conversion is required.
> +
> +   - *order will be set to 1 if the source type is strictly larger
> +      than the target type and that conversion is potentially lossy.
> +
> +   @param[in] field    Target field
> +   @param[in] type     Source field type
> +   @param[in] metadata Source field metadata
> +   @param[in] rli      Relay log info (for error reporting)
> +   @param[out] order   Order between source field and target field
> +
> +   @return @c true if conversion is possible according to the current
> +   settings, @c false if conversion is not possible according to the
> +   current setting.
> + */
> +static bool
> +can_convert_field_to(Field *field, enum_field_types source_type, uint16 metadata,
> +                     Relay_log_info *rli, int *order_var)
> +{
> +  DBUG_ENTER(__FUNCTION__);
> +#ifndef DBUG_OFF
> +  char field_type_buf[MAX_FIELD_WIDTH];
> +  String field_type(field_type_buf, sizeof(field_type_buf), field->charset());
> +  field->sql_type(field_type);
> +  DBUG_PRINT("enter", ("field_type: %s, source_type: %d, source_metadata: 0x%x",
> +                       field_type.c_ptr(), source_type, metadata));
> +#endif
> +  /*
> +    If the real type is the same, we need to check the metadata to
> +    decide if conversions are allowed.
> +   */
> +  if (field->real_type() == source_type)
> +  {
> +    DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
> +    if (const_cast<Field*>(field)->compatible_field_size(metadata, rli,
> order_var))
> +      DBUG_RETURN(is_conversion_ok(*order_var, rli));
> +    else
> +      DBUG_RETURN(false);
> +  }
> +
> +  /*
> +    Here, from and to will always be different. Since the types are
> +    different, we cannot use the compatible_field_size() function, but
> +    have to rely on hard-coded max-sizes for fields.
> +  */
> +
> +  DBUG_PRINT("debug", ("Base types are different, checking conversion"));
> +  switch (source_type)                      // Source type (on master)
> +  {
> +  case MYSQL_TYPE_NEWDECIMAL:
> +  case MYSQL_TYPE_FLOAT:
> +  case MYSQL_TYPE_DOUBLE:
> +    switch (field->real_type())
> +    {
> +    case MYSQL_TYPE_NEWDECIMAL:
> +      /*
> +        Then the other type is either FLOAT or DOUBLE, so we require
> +        lossy conversion.
> +      */
> +      *order_var= 1;
> +      DBUG_RETURN(is_conversion_ok(*order_var, rli));
> +      
> +    case MYSQL_TYPE_FLOAT:
> +    case MYSQL_TYPE_DOUBLE:
> +    {
> +      if (source_type == MYSQL_TYPE_NEWDECIMAL)
> +        *order_var = 1;                         // Always require lossy conversions
> +      else
> +        *order_var= compare_lengths(field, source_type, metadata);
> +      DBUG_ASSERT(*order_var != 0);
> +      DBUG_RETURN(is_conversion_ok(*order_var, rli));
> +    }
> +
> +    default:
> +      DBUG_RETURN(false);
> +    }
> +    break;
> +
> +  /*
> +    The length comparison check will do the correct job of comparing
> +    the field lengths (in bytes) of two integer types.
> +  */
> +  case MYSQL_TYPE_TINY:
> +  case MYSQL_TYPE_SHORT:
> +  case MYSQL_TYPE_INT24:
> +  case MYSQL_TYPE_LONG:
> +  case MYSQL_TYPE_LONGLONG:
> +    switch (field->real_type())
> +    {
> +    case MYSQL_TYPE_TINY:
> +    case MYSQL_TYPE_SHORT:
> +    case MYSQL_TYPE_INT24:
> +    case MYSQL_TYPE_LONG:
> +    case MYSQL_TYPE_LONGLONG:
> +      *order_var= compare_lengths(field, source_type, metadata);
> +      DBUG_ASSERT(*order_var != 0);
> +      DBUG_RETURN(is_conversion_ok(*order_var, rli));
> +
> +    default:
> +      DBUG_RETURN(false);
> +    }
> +    break;
> +
> +  /*
> +    Since source and target type is different, and it is not possible
> +    to convert bit types to anything else, this will return false.
> +   */
> +  case MYSQL_TYPE_BIT:
> +    DBUG_RETURN(false);
> +
> +  /*
> +    If all conversions are disabled, it is not allowed to convert
> +    between these types. Since the TEXT vs. BINARY is distinguished by
> +    the charset, and the charset is not replication, we cannot
> +    currently distinguish between , e.g., TEXT and BLOB.

s/is not replication/is not replicated/

> +   */
> +  case MYSQL_TYPE_TINY_BLOB:
> +  case MYSQL_TYPE_MEDIUM_BLOB:
> +  case MYSQL_TYPE_LONG_BLOB:
> +  case MYSQL_TYPE_BLOB:
> +  case MYSQL_TYPE_STRING:
> +  case MYSQL_TYPE_VARCHAR:
> +    switch (field->real_type())
> +    {
> +    case MYSQL_TYPE_TINY_BLOB:
> +    case MYSQL_TYPE_MEDIUM_BLOB:
> +    case MYSQL_TYPE_LONG_BLOB:
> +    case MYSQL_TYPE_BLOB:
> +    case MYSQL_TYPE_STRING:
> +    case MYSQL_TYPE_VARCHAR:
> +      *order_var= compare_lengths(field, source_type, metadata);
> +      /*
> +        Here we know that the types are different, so if the order
> +        gives that they do not require any conversion, we still need
> +        to have non-lossy conversion enabled to allow conversion
> +        between different (string) types of the same length.
> +       */
> +      if (*order_var == 0)
> +        *order_var= -1;
> +      DBUG_RETURN(is_conversion_ok(*order_var, rli));
> +
> +    default:
> +      DBUG_RETURN(false);
> +    }
> +    break;
> +
> +  case MYSQL_TYPE_GEOMETRY:
> +  case MYSQL_TYPE_TIMESTAMP:
> +  case MYSQL_TYPE_DATE:
> +  case MYSQL_TYPE_TIME:
> +  case MYSQL_TYPE_DATETIME:
> +  case MYSQL_TYPE_YEAR:
> +  case MYSQL_TYPE_NEWDATE:
> +  case MYSQL_TYPE_NULL:
> +  case MYSQL_TYPE_ENUM:
> +  case MYSQL_TYPE_SET:
> +    DBUG_RETURN(false);
> +
> +  /*
> +    The types MYSQL_TYPE_DECIMAL and MYSQL_TYPE_VAR_STRING should not
> +    appear inside the server at all.
> +  */


Can you elaborate on this?


> +  case MYSQL_TYPE_DECIMAL:
> +  case MYSQL_TYPE_VAR_STRING:
> +    DBUG_ASSERT(0);
> +  }
> +  DBUG_RETURN(false);                                 // To keep GCC happy
> +}
> +
> +
> +/**
>    Is the definition compatible with a table?
>  
> +  This function will compare the master table with an existing table
> +  on the slave and see if they are compatible with respect to the
> +  current settings of @c SLAVE_TYPE_CONVERSIONS.
> +
> +  If the tables are compatible and conversions are required, @c
> +  *tmp_table_var will be set to a virtual temporary table with field
> +  pointers for the fields that require conversions.  This allow simple
> +  checking of whether a conversion are to be applied or not.
> +
> +  If tables are compatible, but no conversions are necessary, @c
> +  *tmp_table_var will be set to NULL.
> +
> +  @param rli_arg[in]
> +  Relay log info, for error reporting.
> +
> +  @param table[in]
> +  Table to compare with
> +
> +  @param tmp_table_var[out]
> +  Virtual temporary table for performing conversions, if necessary.
> +
> +  @retval true Master table is compatible with slave table.
> +  @retval false Master table is not compatible with slave table.
>  */
> -int
> -table_def::compatible_with(Relay_log_info const *rli_arg, TABLE *table)
> +bool
> +table_def::compatible_with(THD *thd, Relay_log_info *rli,
> +                           TABLE *table, TABLE **conv_table_var)
>    const
>  {
>    /*
>      We only check the initial columns for the tables.
>    */
>    uint const cols_to_check= min(table->s->fields, size());
> -  int error= 0;
> -  Relay_log_info const *rli= const_cast<Relay_log_info*>(rli_arg);
> -
> -  TABLE_SHARE const *const tsh= table->s;
> +  TABLE *tmp_table= NULL;
>  
>    for (uint col= 0 ; col < cols_to_check ; ++col)
>    {
>      Field *const field= table->field[col];
> -    if (field->type() != type(col))
> +    int order;
> +    if (can_convert_field_to(field, type(col), field_metadata(col), rli,
> &order))
>      {
> -      DBUG_ASSERT(col < size() && col < tsh->fields);
> -      DBUG_ASSERT(tsh->db.str && tsh->table_name.str);
> -      error= 1;
> -      char buf[256];
> -      my_snprintf(buf, sizeof(buf), "Column %d type mismatch - "
> -                  "received type %d, %s.%s has type %d",
> -                  col, type(col), tsh->db.str, tsh->table_name.str,
> -                  field->type());
> -      rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF,
> -                  ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf);
> -    }
> -    /*
> -      Check the slave's field size against that of the master.
> -    */
> -    if (!error &&
> -        !field->compatible_field_size(field_metadata(col), rli_arg))
> -    {
> -      error= 1;
> -      char buf[256];
> -      my_snprintf(buf, sizeof(buf), "Column %d size mismatch - "
> -                  "master has size %d, %s.%s on slave has size %d."
> -                  " Master's column size should be <= the slave's "
> -                  "column size.", col,
> -                  field->pack_length_from_metadata(m_field_metadata[col]),
> -                  tsh->db.str, tsh->table_name.str,
> -                  field->row_pack_length());
> -      rli->report(ERROR_LEVEL, ER_BINLOG_ROW_WRONG_TABLE_DEF,
> -                  ER(ER_BINLOG_ROW_WRONG_TABLE_DEF), buf);
> +      DBUG_PRINT("debug", ("Checking column %d -"
> +                           " field '%s' can be converted - order: %d",
> +                           col, field->field_name, order));
> +      DBUG_ASSERT(order >= -1 && order <= 1);
> +
> +      /*
> +        If order is not 0, a conversion is required, so we need to set
> +        up the conversion table.
> +       */
> +      if (order != 0 && tmp_table == NULL)
> +      {
> +        /*
> +          This will create the full table with all fields. This is
> +          necessary to ge the correct field lengths for the record.
> +        */
> +        tmp_table= create_conversion_table(thd, rli, table);
> +        if (tmp_table == NULL)
> +            return false;
> +        /*
> +          Clear all fields up to, but not including, this column.
> +        */
> +        for (unsigned int i= 0; i < col; ++i)
> +          tmp_table->field[i]= NULL;
> +      }
> +
> +      if (order == 0 && tmp_table != NULL)
> +        tmp_table->field[col]= NULL;
> +    }
> +    else
> +    {
> +      DBUG_PRINT("debug", ("Checking column %d -"
> +                           " field '%s' can not be converted",
> +                           col, field->field_name));
> +      DBUG_ASSERT(col < size() && col < table->s->fields);
> +      DBUG_ASSERT(table->s->db.str &&
> table->s->table_name.str);
> +      const char *db_name= table->s->db.str;
> +      const char *tbl_name= table->s->table_name.str;
> +      char source_buf[MAX_FIELD_WIDTH];
> +      char target_buf[MAX_FIELD_WIDTH];
> +      String source_type(source_buf, sizeof(source_buf), field->charset());
> +      String target_type(target_buf, sizeof(target_buf), field->charset());
> +      show_sql_type(type(col), field_metadata(col), &source_type);
> +      field->sql_type(target_type);
> +      rli->report(ERROR_LEVEL, ER_SLAVE_CONVERSION_FAILED,
> +                  ER(ER_SLAVE_CONVERSION_FAILED),
> +                  col, db_name, tbl_name,
> +                  source_type.c_ptr(), target_type.c_ptr());
> +      return false;
>      }
>    }
>  
> -  return error;
> +#ifndef DBUG_OFF
> +  if (tmp_table)
> +  {
> +    for (unsigned int col= 0; col < tmp_table->s->fields; ++col)
> +      if (tmp_table->field[col])
> +      {
> +        char source_buf[MAX_FIELD_WIDTH];
> +        char target_buf[MAX_FIELD_WIDTH];
> +        String source_type(source_buf, sizeof(source_buf),
> table->field[col]->charset());
> +        String target_type(target_buf, sizeof(target_buf),
> table->field[col]->charset());
> +        tmp_table->field[col]->sql_type(source_type);
> +        table->field[col]->sql_type(target_type);
> +        DBUG_PRINT("debug", ("Field %s - conversion required."
> +                             " Source type: '%s', Target type: '%s'",
> +                             tmp_table->field[col]->field_name,
> +                             source_type.c_ptr(), target_type.c_ptr()));
> +      }
> +  }
> +#endif
> +
> +  *conv_table_var= tmp_table;
> +  return true;
>  }
> +
> +/**
> +  Create a conversion table.
> +
> +  If the function is unable to create the conversion table, an error
> +  will be printed and NULL will be returned.
> +
> +  @return Pointer to conversion table, or NULL if unable to create
> +  conversion table.
> + */
> +
> +TABLE *table_def::create_conversion_table(THD *thd, Relay_log_info *rli, TABLE
> *target_table) const
> +{
> +  DBUG_ENTER("table_def::create_conversion_table");
> +
> +  List<Create_field> field_list;
> +
> +  for (uint col= 0 ; col < size() ; ++col)
> +  {
> +    Create_field *field_def=
> +      (Create_field*) alloc_root(thd->mem_root, sizeof(Create_field));
> +    if (field_list.push_back(field_def))
> +      DBUG_RETURN(NULL);
> +
> +    uint decimals= 0;
> +    TYPELIB* interval= NULL;
> +    uint pack_length= 0;
> +    uint32 max_length=
> +      max_display_length_for_field(type(col), field_metadata(col));
> +
> +    switch(type(col))
> +    {
> +      int precision;
> +    case MYSQL_TYPE_ENUM:
> +    case MYSQL_TYPE_SET:
> +      interval=
> static_cast<Field_enum*>(target_table->field[col])->typelib;
> +      break;
> +
> +    case MYSQL_TYPE_NEWDECIMAL:
> +      /*
> +        The display length of a DECIMAL type is not the same as the
> +        length that should be supplied to make_field, so we correct
> +        the length here.
> +       */
> +      precision = field_metadata(col) >> 8;
> +      decimals= field_metadata(col) & 0x00ff;
> +      max_length=
> +        my_decimal_precision_to_length(precision, decimals, FALSE);
> +      break;
> +
> +    case MYSQL_TYPE_TINY_BLOB:
> +    case MYSQL_TYPE_MEDIUM_BLOB:
> +    case MYSQL_TYPE_LONG_BLOB:
> +    case MYSQL_TYPE_BLOB:
> +    case MYSQL_TYPE_GEOMETRY:
> +      pack_length= field_metadata(col);
> +      break;
> +
> +    default:
> +      break;
> +    }
> +
> +    DBUG_PRINT("debug", ("sql_type: %d, max_length: %d, decimals: %d,"
> +                         " maybe_null: %d, unsigned_flag: %d, pack_length: %u",
> +                         type(col), max_length, decimals, TRUE, FALSE,
> pack_length));
> +    field_def->init_for_tmp_table(type(col),
> +                                  max_length,
> +                                  decimals,
> +                                  TRUE,         // maybe_null
> +                                  FALSE,        // unsigned_flag
> +                                  pack_length);


Shouldn't you check if the field is unsigned? Null?


> +    field_def->charset= target_table->field[col]->charset();
> +    field_def->interval= interval;
> +  }
> +
> +  TABLE *conv_table= create_virtual_tmp_table(thd, field_list);
> +  if (conv_table == NULL)
> +    rli->report(ERROR_LEVEL, ER_SLAVE_CANT_CREATE_CONVERSION,
> +                ER(ER_SLAVE_CANT_CREATE_CONVERSION),
> +                target_table->s->db.str,
> +                target_table->s->table_name.str);
> +  DBUG_RETURN(conv_table);
> +}
> +
> +#endif /* MYSQL_CLIENT */
> +
> +table_def::table_def(unsigned char *types, ulong size,
> +                     uchar *field_metadata, int metadata_size,
> +                     uchar *null_bitmap)

> +  : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
> +    m_field_metadata(0), m_null_bits(0), m_memory(NULL)

Why do you initialize some fields in here if you do it explicetly below?

> +{
> +  m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
> +                                     &m_type, size,
> +                                     &m_field_metadata,
> +                                     size * sizeof(uint16),
> +                                     &m_null_bits, (size + 7) / 8,
> +                                     NULL);



> +
> +  bzero(m_field_metadata, size * sizeof(uint16));
> +
> +  if (m_type)
> +    memcpy(m_type, types, size);
> +  else
> +    m_size= 0;
> +  /*
> +    Extract the data from the table map into the field metadata array
> +    iff there is field metadata. The variable metadata_size will be
> +    0 if we are replicating from an older version server since no field
> +    metadata was written to the table map. This can also happen if 
> +    there were no fields in the master that needed extra metadata.
> +  */
> +  if (m_size && metadata_size)
> +  { 
> +    int index= 0;
> +    for (unsigned int i= 0; i < m_size; i++)
> +    {
> +      switch (m_type[i]) {
> +      case MYSQL_TYPE_TINY_BLOB:
> +      case MYSQL_TYPE_BLOB:
> +      case MYSQL_TYPE_MEDIUM_BLOB:
> +      case MYSQL_TYPE_LONG_BLOB:
> +      case MYSQL_TYPE_DOUBLE:
> +      case MYSQL_TYPE_FLOAT:
> +      {
> +        /*
> +          These types store a single byte.
> +        */
> +        m_field_metadata[i]= field_metadata[index];
> +        index++;
> +        break;
> +      }
> +      case MYSQL_TYPE_SET:
> +      case MYSQL_TYPE_ENUM:
> +      case MYSQL_TYPE_STRING:
> +      {
> +        uint16 x= field_metadata[index++] << 8U; // real_type
> +        x+= field_metadata[index++];            // pack or field length
> +        m_field_metadata[i]= x;
> +        break;
> +      }
> +      case MYSQL_TYPE_BIT:
> +      {


> +        uint16 x= field_metadata[index++]; 
> +        x = x + (field_metadata[index++] << 8U);


Is this a mistake? Note that the most significative byte is in the second part of the
array.
In the other cases, it is always in the first part.


> +        m_field_metadata[i]= x;
> +        break;
> +      }
> +      case MYSQL_TYPE_VARCHAR:
> +      {


> +        /*
> +          These types store two bytes.
> +        */
> +        char *ptr= (char *)&field_metadata[index];
> +        m_field_metadata[i]= uint2korr(ptr);
> +        index= index + 2;


I was wondering if this may not generate any architecture issue.



> +        break;
> +      }
> +      case MYSQL_TYPE_NEWDECIMAL:
> +      {
> +        uint16 x= field_metadata[index++] << 8U; // precision
> +        x+= field_metadata[index++];            // decimals
> +        m_field_metadata[i]= x;
> +        break;
> +      }
> +      default:
> +        m_field_metadata[i]= 0;
> +        break;
> +      }
> +    }
> +  }
> +  if (m_size && null_bitmap)
> +    memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
> +}
> +


> +
> +table_def::~table_def()
> +{
> +  my_free(m_memory, MYF(0));

> +#ifndef DBUG_OFF
> +  m_type= 0;
> +  m_size= 0;
> +#endif

What are you trying to prevent with this?

> +}
> +




> 
> === modified file 'sql/rpl_utility.h'
> --- a/sql/rpl_utility.h	2009-03-25 10:53:56 +0000
> +++ b/sql/rpl_utility.h	2009-12-04 10:53:15 +0000
> @@ -21,6 +21,7 @@
>  #endif
>  
>  #include "mysql_priv.h"
> +#include "mysql_com.h"
>  
>  class Relay_log_info;
>  
> @@ -44,115 +45,18 @@ class table_def
>  {
>  public:
>    /**
> -    Convenience declaration of the type of the field type data in a
> -    table map event.
> -  */
> -  typedef unsigned char field_type;
> -
> -  /**
>      Constructor.
>  
> -    @param types Array of types
> +    @param types Array of types, each stored as a byte
>      @param size  Number of elements in array 'types'
>      @param field_metadata Array of extra information about fields
>      @param metadata_size Size of the field_metadata array
>      @param null_bitmap The bitmap of fields that can be null
>     */
> -  table_def(field_type *types, ulong size, uchar *field_metadata, 
> -      int metadata_size, uchar *null_bitmap)
> -    : m_size(size), m_type(0), m_field_metadata_size(metadata_size),
> -      m_field_metadata(0), m_null_bits(0), m_memory(NULL)
> -  {
> -    m_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
> -                                       &m_type, size,
> -                                       &m_field_metadata,
> -                                       size * sizeof(uint16),
> -                                       &m_null_bits, (size + 7) / 8,
> -                                       NULL);
> -
> -    bzero(m_field_metadata, size * sizeof(uint16));
> +  table_def(unsigned char *types, ulong size, uchar *field_metadata,
> +            int metadata_size, uchar *null_bitmap);
>  
> -    if (m_type)
> -      memcpy(m_type, types, size);
> -    else
> -      m_size= 0;
> -    /*
> -      Extract the data from the table map into the field metadata array
> -      iff there is field metadata. The variable metadata_size will be
> -      0 if we are replicating from an older version server since no field
> -      metadata was written to the table map. This can also happen if 
> -      there were no fields in the master that needed extra metadata.
> -    */
> -    if (m_size && metadata_size)
> -    { 
> -      int index= 0;
> -      for (unsigned int i= 0; i < m_size; i++)
> -      {
> -        switch (m_type[i]) {
> -        case MYSQL_TYPE_TINY_BLOB:
> -        case MYSQL_TYPE_BLOB:
> -        case MYSQL_TYPE_MEDIUM_BLOB:
> -        case MYSQL_TYPE_LONG_BLOB:
> -        case MYSQL_TYPE_DOUBLE:
> -        case MYSQL_TYPE_FLOAT:
> -        {
> -          /*
> -            These types store a single byte.
> -          */
> -          m_field_metadata[i]= field_metadata[index];
> -          index++;
> -          break;
> -        }
> -        case MYSQL_TYPE_SET:
> -        case MYSQL_TYPE_ENUM:
> -        case MYSQL_TYPE_STRING:
> -        {
> -          uint16 x= field_metadata[index++] << 8U; // real_type
> -          x+= field_metadata[index++];            // pack or field length
> -          m_field_metadata[i]= x;
> -          break;
> -        }
> -        case MYSQL_TYPE_BIT:
> -        {
> -          uint16 x= field_metadata[index++]; 
> -          x = x + (field_metadata[index++] << 8U);
> -          m_field_metadata[i]= x;
> -          break;
> -        }
> -        case MYSQL_TYPE_VARCHAR:
> -        {
> -          /*
> -            These types store two bytes.
> -          */
> -          char *ptr= (char *)&field_metadata[index];
> -          m_field_metadata[i]= uint2korr(ptr);
> -          index= index + 2;
> -          break;
> -        }
> -        case MYSQL_TYPE_NEWDECIMAL:
> -        {
> -          uint16 x= field_metadata[index++] << 8U; // precision
> -          x+= field_metadata[index++];            // decimals
> -          m_field_metadata[i]= x;
> -          break;
> -        }
> -        default:
> -          m_field_metadata[i]= 0;
> -          break;
> -        }
> -      }
> -    }
> -    if (m_size && null_bitmap)
> -       memcpy(m_null_bits, null_bitmap, (m_size + 7) / 8);
> -  }
> -
> -  ~table_def() {
> -    my_free(m_memory, MYF(0));
> -#ifndef DBUG_OFF
> -    m_type= 0;
> -    m_size= 0;
> -#endif
> -  }
> +  ~table_def();
>  
>    /**
>      Return the number of fields there is type data for.
> @@ -171,10 +75,10 @@ public:
>      <code>index</code>. Currently, only the type identifier is
>      returned.
>     */
> -  field_type type(ulong index) const
> +  enum_field_types type(ulong index) const
>    {
>      DBUG_ASSERT(index < m_size);
> -    return m_type[index];
> +    return static_cast<enum_field_types>(m_type[index]);
>    }
>  
>  
> @@ -226,23 +130,58 @@ public:
>      with it.
>  
>      A table definition is compatible with a table if:
> -      - the columns types of the table definition is a (not
> -        necessarily proper) prefix of the column type of the table, or
> -      - the other way around
> +      - The columns types of the table definition is a (not
> +        necessarily proper) prefix of the column type of the table.
>  
> +      - The other way around.
> +
> +      - Each column on the master that also exists on the slave can be
> +        converted according to the current settings of @c
> +        SLAVE_TYPE_CONVERSIONS.
> +
> +    @param thd
>      @param rli   Pointer to relay log info
>      @param table Pointer to table to compare with.
>  
> +    @param[out] tmp_table_var Pointer to temporary table for holding
> +    conversion table.
> +
>      @retval 1  if the table definition is not compatible with @c table
>      @retval 0  if the table definition is compatible with @c table
>    */
>  #ifndef MYSQL_CLIENT
> -  int compatible_with(Relay_log_info const *rli, TABLE *table) const;
> +  bool compatible_with(THD *thd, Relay_log_info *rli, TABLE *table,
> +                      TABLE **conv_table_var) const;
> +
> +  /**
> +   Create a virtual in-memory temporary table structure.
> +
> +   The table structure has records and field array so that a row can
> +   be unpacked into the record for further processing.
> +
> +   In the virtual table, each field that requires conversion will
> +   have a non-NULL value, while fields that do not require
> +   conversion will have a NULL value.
> +
> +   Some information that is missing in the events, such as the
> +   character set for string types, are taken from the table that the
> +   field is going to be pushed into, so the target table that the data
> +   eventually need to be pushed into need to be supplied.
> +
> +   @param thd Thread to allocate memory from.
> +   @param rli Relay log info structure, for error reporting.
> +   @param target_table Target table for fields.
> +
> +   @return A pointer to a temporary table with memory allocated in the
> +   thread's memroot, NULL if the table could not be created
> +   */
> +  TABLE *create_conversion_table(THD *thd, Relay_log_info *rli, TABLE *target_table)
> const;
>  #endif
>  
> +
>  private:
>    ulong m_size;           // Number of elements in the types array
> -  field_type *m_type;                     // Array of type descriptors
> +  unsigned char *m_type;  // Array of type descriptors
>    uint m_field_metadata_size;
>    uint16 *m_field_metadata;
>    uchar *m_null_bits;
> @@ -260,6 +199,7 @@ struct RPL_TABLE_LIST
>  {
>    bool m_tabledef_valid;
>    table_def m_tabledef;
> +  TABLE *m_conv_table;
>  };
>  
>  
> 
> === modified file 'sql/set_var.cc'
> --- a/sql/set_var.cc	2009-09-29 15:38:40 +0000
> +++ b/sql/set_var.cc	2009-12-04 10:53:15 +0000
> @@ -92,6 +92,33 @@ TYPELIB delay_key_write_typelib=
>    delay_key_write_type_names, NULL
>  };
>  
> +/**
> +  SLAVE_TYPE_CONVERSIONS variable.
> +
> +  Definition is equivalent to
> +  @code
> +  SET('ALL_NON_LOSSY', 'ALL_LOSSY')
> +  @endcode
> + */
> +const char *slave_type_conversions_type_name[]= {
> +  "ALL_LOSSY",
> +  "ALL_NON_LOSSY",
> +  NullS
> +};
> +
> +size_t slave_type_conversions_type_length[]= {
> +  sizeof("ALL_LOSSY")-1,
> +  sizeof("ALL_NON_LOSSY")-1,
> +  0
> +};
> +
> +TYPELIB slave_type_conversions_typelib=
> +{
> +  array_elements(slave_type_conversions_type_name)-1, "",
> +  slave_type_conversions_type_name,
> +  slave_type_conversions_type_length
> +};
> +
>  const char *slave_exec_mode_names[]=
>  { "STRICT", "IDEMPOTENT", NullS };
>  static const unsigned int slave_exec_mode_names_len[]=
> @@ -580,6 +607,12 @@ static sys_var_set_slave_mode slave_exec
>                                                &slave_exec_mode_options,
>                                                &slave_exec_mode_typelib,
>                                                0);
> +static sys_var_set slave_type_conversions(&vars,
> +                                          "slave_type_conversions",
> +                                          &slave_type_conversions_options,
> +                                          &slave_type_conversions_typelib,
> +                                          0);
> +
>  static sys_var_long_ptr	sys_slow_launch_time(&vars, "slow_launch_time",
>  					     &slow_launch_time);
>  static sys_var_thd_ulong	sys_sort_buffer(&vars, "sort_buffer_size",


> @@ -1261,7 +1294,6 @@ bool sys_var_thd_binlog_format::check(TH
>    return result;
>  }
>  
> -
>  bool sys_var_thd_binlog_format::is_readonly() const
>  {
>    /*
> 


Please, remove this hunk.



> === modified file 'sql/set_var.h'
> --- a/sql/set_var.h	2009-09-29 15:38:40 +0000
> +++ b/sql/set_var.h	2009-12-04 10:53:15 +0000
> @@ -32,6 +32,7 @@ typedef struct my_locale_st MY_LOCALE;
>  
>  extern TYPELIB bool_typelib, delay_key_write_typelib, sql_mode_typelib,
>    optimizer_switch_typelib, slave_exec_mode_typelib;
> +extern TYPELIB slave_type_conversions_typelib;
>  
>  typedef int (*sys_check_func)(THD *,  set_var *);
>  typedef bool (*sys_update_func)(THD *, set_var *);
> 
> === modified file 'sql/share/errmsg.txt'
> --- a/sql/share/errmsg.txt	2009-09-29 15:38:40 +0000
> +++ b/sql/share/errmsg.txt	2009-12-04 10:53:15 +0000
> @@ -6213,3 +6213,8 @@ ER_DEBUG_SYNC_TIMEOUT
>  ER_DEBUG_SYNC_HIT_LIMIT
>    eng "debug sync point hit limit reached"
>    ger "Debug Sync Point Hit Limit erreicht"
> +
> +ER_SLAVE_CONVERSION_FAILED
> +  eng "Column %d of table '%-.192s.%-.192s' cannot be converted from type '%-.32s'
> to type '%-.32s'"
> +ER_SLAVE_CANT_CREATE_CONVERSION
> +  eng "Can't create conversion table for table '%-.192s.%-.192s'"
> 
> === modified file 'sql/sql_class.h'
> --- a/sql/sql_class.h	2009-10-14 08:46:50 +0000
> +++ b/sql/sql_class.h	2009-12-04 10:53:15 +0000
> @@ -86,6 +86,12 @@ enum enum_delay_key_write { DELAY_KEY_WR
>  enum enum_slave_exec_mode { SLAVE_EXEC_MODE_STRICT,
>                              SLAVE_EXEC_MODE_IDEMPOTENT,
>                              SLAVE_EXEC_MODE_LAST_BIT};
> +enum enum_slave_type_conversions {
> +  SLAVE_TYPE_CONVERSIONS_ALL_LOSSY,
> +  SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY,
> +  SLAVE_TYPE_CONVERSIONS_COUNT
> +};
> +
>  enum enum_mark_columns
>  { MARK_COLUMNS_NONE, MARK_COLUMNS_READ, MARK_COLUMNS_WRITE};
>  
> 
> === modified file 'sql/sql_select.cc'
> --- a/sql/sql_select.cc	2009-10-14 16:24:29 +0000
> +++ b/sql/sql_select.cc	2009-12-04 10:53:15 +0000
> @@ -10462,6 +10462,16 @@ TABLE *create_virtual_tmp_table(THD *thd
>            null_bit= 1;
>          }
>        }
> +      if (cur_field->type() == MYSQL_TYPE_BIT)
> +      {
> +        static_cast<Field_bit*>(cur_field)->set_bit_ptr(null_pos,
> null_bit);
> +        null_bit+= cur_field->field_length & 7;
> +        if (null_bit > 7)
> +        {
> +          null_pos++;
> +          null_bit-= 8;
> +        }
> +      }
>        cur_field->reset();
>  
>        field_pos+= cur_field->pack_length();
> 
> 
> ------------------------------------------------------------------------
> 
> This body part will be downloaded on demand.
Thread
bzr commit into mysql-5.1 branch (mats:3180) WL#5151Mats Kindahl4 Dec
  • Re: bzr commit into mysql-5.1 branch (mats:3180) WL#5151Alfranio Correia10 Dec
    • Re: bzr commit into mysql-5.1 branch (mats:3180) WL#5151Sergei Golubchik10 Dec
    • Re: bzr commit into mysql-5.1 branch (mats:3180) WL#5151Mats Kindahl10 Dec
      • Re: bzr commit into mysql-5.1 branch (mats:3180) WL#5151Alfranio Correia10 Dec
        • Re: bzr commit into mysql-5.1 branch (mats:3180) WL#5151Mats Kindahl10 Dec
  • Re: bzr commit into mysql-5.1 branch (mats:3180) WL#5151Luís Soares10 Dec
    • Re: bzr commit into mysql-5.1 branch (mats:3180) WL#5151Mats Kindahl10 Dec