List: Internals « Previous Message | Next Message »
From: Mats Kindahl  Date: April 1 2005 11:26am
Subject: bk commit into 5.1 tree (mats:1.1799)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of mats. When mats does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1799 05/04/01 13:26:11 mats@stripped +15 -0
  WL#1012: row-level binlogging
    Row-writer functionality moved to THD structure.
    Added pre-processor directives to handle several configurations.
    Table-mapping functionality moved to MYSQL_LOG.

  sql/sql_select.cc
    1.303 05/04/01 13:25:14 mats@stripped +2 -2
    Using ha_write_row() instead of write_row().

  sql/sql_class.h
    1.228 05/04/01 13:25:14 mats@stripped +44 -1
    Row-writer functionality moved to THD structure.

  sql/rpl_tblmap.h
    1.2 05/04/01 13:25:14 mats@stripped +10 -0
    Using separate id for "undefined table".

  sql/rpl_tblmap.cc
    1.2 05/04/01 13:25:14 mats@stripped +7 -1
    Adding debug messages.

  sql/sql_class.cc
    1.173 05/04/01 13:25:13 mats@stripped +235 -3
    Row-writer functionality moved to THD structure.

  sql/rpl_injector.cc
    1.6 05/04/01 13:25:13 mats@stripped +27 -14
    Adding transaction destructor to free allocated memory.
    Row-writer functionality moved to THD structure.

  sql/log_event.h
    1.108 05/04/01 13:25:13 mats@stripped +19 -12
    Adding pre-processor directives to handle several configurations.
    Setting database name correctly for table map event.
    Phasing out the STMT_END_F flag and using TRANS_END_F instead.

  sql/log_event.cc
    1.169 05/04/01 13:25:13 mats@stripped +39 -28
    Adding pre-processor directives to handle several configurations.
    Setting database name correctly for table map event.
    Phasing out the STMT_END_F flag and using TRANS_END_F instead.

  sql/log.cc
    1.158 05/04/01 13:25:13 mats@stripped +34 -1
    Table mapping from table to table id is managed by MYSQL_LOG.

  sql/item_sum.cc
    1.132 05/04/01 13:25:13 mats@stripped +1 -1
    Switching to new handler interface.

  sql/handler.h
    1.132 05/04/01 13:25:13 mats@stripped +1 -101
    No replication of temporary tables.
    Adding pre-processor directives to handle several configurations.
    Row-writer functionality moved to THD class.

  sql/handler.cc
    1.150 05/04/01 13:25:12 mats@stripped +30 -224
    No replication of temporary tables.
    Adding pre-processor directives to handle several configurations.
    Row-writer functionality moved to THD class.

  mysql-test/t/rpl_row_basic.test
    1.3 05/04/01 13:25:12 mats@stripped +8 -8
    binlog position change

  mysql-test/r/rpl_row_trans.result
    1.2 05/04/01 13:25:12 mats@stripped +12 -1
    binlog position change

  mysql-test/r/rpl_row_basic.result
    1.3 05/04/01 13:25:12 mats@stripped +13 -13
    binlog position change

  sql/rpl_tblmap.cc
    1.1 05/03/21 20:11:18 mats@stripped +151 -0

  sql/rpl_tblmap.cc
    1.0 05/03/21 20:11:18 mats@stripped +0 -0
    BitKeeper file /home/bk/w1012-mysql-5.1/sql/rpl_tblmap.cc

  sql/rpl_tblmap.h
    1.1 05/03/21 20:11:16 mats@stripped +59 -0

  sql/rpl_tblmap.h
    1.0 05/03/21 20:11:16 mats@stripped +0 -0
    BitKeeper file /home/bk/w1012-mysql-5.1/sql/rpl_tblmap.h

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	mats
# Host:	romeo.kindahl.net
# Root:	/home/bk/w1012-mysql-5.1

--- 1.149/sql/handler.cc	2005-03-22 08:25:35 +01:00
+++ 1.150/sql/handler.cc	2005-04-01 13:25:12 +02:00
@@ -2404,12 +2404,18 @@
 			table->field[i]->ptr));
   }
 #endif
+  DBUG_PRINT("info", ("Writing row to actual handler"));
   if (int error = write_row(buf)) {
     DBUG_PRINT("exit", ("error = %d", error));
     DBUG_RETURN(error);
   }
-  bitvector const bv(table->s->fields, true);
-  m_row_writer.write_row(bv, buf);
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+  DBUG_PRINT("info", ("Writing row to THD"));
+  if (table->s->tmp_table == NO_TMP_TABLE) {
+    bitvector const cols(table->s->fields, true);
+    thd->write_row(table, cols, buf);
+  }
+#endif
   DBUG_PRINT("exit", ("error = %d", 0));
   DBUG_RETURN(0);
 }
@@ -2435,8 +2441,12 @@
     DBUG_PRINT("exit", ("error = %d", error));
     DBUG_RETURN(error);
   }
-  bitvector const cols(table->s->fields, true);
-  m_row_writer.update_row(cols, old_data, new_data);
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+  if (table->s->tmp_table == NO_TMP_TABLE) {
+    bitvector const cols(table->s->fields, true);
+    thd->update_row(table, cols, old_data, new_data);
+  }
+#endif
   DBUG_PRINT("exit", ("error = %d", 0));
   DBUG_RETURN(0);
 }
@@ -2462,8 +2472,12 @@
     DBUG_PRINT("exit", ("error = %d", error));
     DBUG_RETURN(error);
   }
-  bitvector const cols(table->s->fields, true);
-  m_row_writer.delete_row(cols, buf);
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+  if (table->s->tmp_table == NO_TMP_TABLE) {
+    bitvector const cols(table->s->fields, true);
+    thd->delete_row(table, cols, buf);
+  }
+#endif
   DBUG_PRINT("exit", ("error = %d", 0));
   DBUG_RETURN(0);
 }    
@@ -2471,227 +2485,19 @@
 int handler::
 ha_stmt_begin() 
 { 
-  return m_row_writer.transaction_begin(); 
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+  return current_thd->transaction_begin(); 
+#else
+  return 0;
+#endif
 }
 
 int handler::
 ha_stmt_end()
 { 
-  return m_row_writer.transaction_end(); 
-}
-
-
-/**************************************************************************
-	Row_rw_base member functions
-**************************************************************************/
-
-Row_rw_base::
-Row_rw_base(THD *thd, TABLE *table)
-  : m_thd(thd), m_table(table)
-{ 
-}
-
-size_t Row_rw_base::
-max_row_length(const byte * const record) const
-{
-  DBUG_ENTER("Row_rw_base::max_row_length");
-  DBUG_PRINT("enter", ("record = 0x%0x", record));
-
-  size_t length = m_table->s->reclength + 2 * m_table->s->fields;
-  uint* const beg = m_table->s->blob_field;
-  uint* const end = beg + m_table->s->blob_fields;
-
-  for (uint *ptr = beg ; ptr != end ; ++ptr)
-  {
-    Field_blob* const blob = (Field_blob*) m_table->field[*ptr];
-    length += blob->get_length(record + blob->offset()) + 2;
-  }
-
-  DBUG_RETURN(length);
-}
-
-size_t Row_rw_base::
-pack_row(byte *row_data, size_t max_size, const byte *record) const
-{
-  DBUG_ENTER("Row_rw_base::pack_row");
-  DBUG_PRINT("enter", ("row_data = %p, max_size = %lu, record = %p", 
-		       row_data, max_size, record));
-
-  byte *ptr = row_data;
-
-  bzero(row_data, max_size);
-
-  memcpy(row_data, record, m_table->s->null_bytes);
-  ptr += m_table->s->null_bytes;
-
-  for ( Field **field = m_table->field ; *field ; ++field) 
-  {
-    ptrdiff_t const offset = (*field)->offset();
-    ptr = (*field)->pack(ptr, record + offset);
-    DBUG_PRINT("info", ("Packing length %d field '%s' from %p to %p + %d", 
-			(*field)->field_length, (*field)->field_name, 
-			ptr, record, offset));
-  }
-
-  // ptrdiff_t is signed, size_t is unsigned. Check that the conversion
-  // will work correctly.
-  DBUG_ASSERT(ptr - row_data >= 0);
-  size_t const size = (size_t) (ptr - row_data);
-  DBUG_PRINT("return", ("size = %lu", size));
-  DBUG_RETURN(size);
-}
-
-/**************************************************************************
-	Row_writer member functions
-**************************************************************************/
-
-
-int Row_writer::
-write_row(bitvector const& cols, byte const *record) 
-{ 
-  DBUG_ENTER("Row_writer::write_row");
-  DBUG_PRINT("enter", ("cols = { size: %d, data = %p }; record = %p", 
-		       cols.size(), cols.data(), record));
-
-  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
-    DBUG_RETURN(0);
-
-  THD* const thd = current_thd;
-
-  /* 
-     Pack records into format for transfer. We are allocating more
-     memory than needed, but that doesn't matter.
-  */
-  size_t const max_len = max_row_length(record);
-  byte* const row_data = my_malloc(max_len, MYF(MY_WME));
-  size_t const len     = pack_row(row_data, max_len, record);
-
-  Rows_log_event* const
-    ev = thd->prepare_pending<Write_rows_log_event>(m_table, m_server_id, cols, len);
-
-  if (ev == NULL)
-    DBUG_RETURN(1);
-
-  // add_row_data copies row_data to internal buffer
-  ev->add_row_data(row_data,len);
-
-  my_free(row_data, MYF(MY_WME));
-
-  DBUG_RETURN(0);
-}
-
-int Row_writer::
-update_row(bitvector const& cols,
-	   const byte *before_record, const byte *after_record)
-{ 
-  DBUG_ENTER("Row_writer::update_row");
-  DBUG_PRINT("enter", ("before: 0x%0x; after: 0x%0x", 
-		       before_record, after_record));
-
-  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
-    DBUG_RETURN(0);
-
-  THD* const thd = current_thd;
-
-  size_t const before_maxlen = max_row_length(before_record);
-  size_t const after_maxlen  = max_row_length(after_record);
-
-  byte *before_row, *after_row;
-  byte* const memory = my_multi_malloc(MYF(MY_WME),
-				       &before_row, before_maxlen,
-				       &after_row, after_maxlen,
-				       NULL);
-
-  size_t const before_size = pack_row(before_row, before_maxlen, before_record);
-  size_t const after_size = pack_row(after_row, after_maxlen, after_record);
-  
-  Rows_log_event* const
-    ev = thd->prepare_pending<Update_rows_log_event>(m_table, 
-						     m_server_id, cols, 
-						     before_size + after_size);
-
-  if (ev == NULL)
-    DBUG_RETURN(1);
-
-  ev->add_row_data(before_row, before_size);
-  ev->add_row_data(after_row, after_size);
-
-  // add_row_data copies row_data to internal buffer
-  my_free(memory, MYF(MY_WME));
-
-  
-  DBUG_RETURN(0);
-}
-
-int Row_writer::
-delete_row(bitvector const& cols, byte const *record)
-{ 
-  DBUG_ENTER("Row_writer::delete_row");
-  DBUG_PRINT("enter", ("record = 0x%0x", record));
-
-  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
-    DBUG_RETURN(0);
-
-  THD* const thd = current_thd;
-
-  /* 
-     Pack records into format for transfer. We are allocating more
-     memory than needed, but that doesn't matter.
-  */
-  size_t const max_len = max_row_length(record);
-  byte* const row_data = my_malloc(max_len, MYF(MY_WME));
-  size_t const len     = pack_row(row_data, max_len, record);
-
-  Rows_log_event* const
-    ev = thd->prepare_pending<Delete_rows_log_event>(m_table, 
-						     m_server_id, cols, len);
-
-  if (ev == NULL)
-    DBUG_RETURN(1);
-
-  ev->add_row_data(row_data, len);
-
-  // add_row_data copies row_data
-  my_free(row_data, MYF(MY_WME));
-
-  DBUG_RETURN(0);
-}
-
-int Row_writer::
-write_table_map()
-{
-  DBUG_ENTER("Row_writer::write_table_map()");
-  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
-    DBUG_RETURN(0);
-
-  THD* const thd = current_thd;
-  ulong table_id = mysql_bin_log.get_table_id(m_table);
-  Table_map_log_event the_event(thd, m_table, table_id, m_has_trans);
-  if (mysql_bin_log.write(&the_event))
-    DBUG_RETURN(1);
-  DBUG_RETURN(0);
-}
-
-
-int Row_writer::
-flush_pending_event(bool stmt_end)
-{
-  DBUG_ENTER("Row_writer::flush_pending_event(bool)");
-  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
-    DBUG_RETURN(0);
-
-  THD* const thd = current_thd;
-  /*
-    Mark the event as the last event of a statement if the stmt_end flag is
-    set. 
-  */
-  if (stmt_end) {
-    if (Rows_log_event* pending = thd->get_pending_event()) {
-      pending->set_flags(Rows_log_event::STMT_END_F);
-    }
-  }
-  
-  DBUG_PRINT("flush", ("thd = 0x%0x", thd));
-  int const error = thd->flush_and_set_pending_event(0);
-  DBUG_RETURN(error);
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+  return current_thd->transaction_end(); 
+#else
+  return 0;
+#endif
 }

--- 1.131/sql/handler.h	2005-03-22 08:25:35 +01:00
+++ 1.132/sql/handler.h	2005-04-01 13:25:13 +02:00
@@ -409,100 +409,6 @@
 } HANDLER_BUFFER;
 
 
-class Rows_log_event;
-
-/*****************************************************************************
-
- Row event reader/writer base class.
-
- The row event reader/writer base class contain support utilities to
- help the row reader and row writer to perform its tasks. 
-
- The responsibilities are:
- - Provide packing and unpacking utilities for subclasses.
- 
-*****************************************************************************/
-
-class Row_rw_base {
-public:
-  Row_rw_base(THD *thd, TABLE *table);
-
-  virtual ~Row_rw_base() { }
-
-protected:
-  size_t max_row_length(const byte *data) const;
-  size_t pack_row(byte *row_data, size_t max_len, const byte *data) const;
-
-protected:
-  // These are declared 'const' since you're not supposed to change
-  // them for the lifetime of the object.
-  THD   *const m_thd;
-  TABLE *const m_table;
-};
-
-/*****************************************************************************
-
-  Row event writer class.
-
-  The row event writer serves as a facade to the real row-level
-  events. It takes care of writing row log events at the apropriate
-  times, creating new row-level events when needed and writes them to
-  the binary log when required.
-
-  RESPONSIBILITIES
-
-    - Keeping a mapping from tables to table ids.
-    - Packing rows for transmission to the slave.
-    - Creating events and store the received rows in the event.
-    - Keeping track of the size of the event and ensure that the events
-      are within correct bounds.
-    - Sending events to the binary log.
-
-  COLLABORATION
-
-    Write_rows_log_event  
-    Delete_rows_log_event  
-    Update_rows_log_event  
-
- ****************************************************************************/
-
-class handler;
-
-class Row_writer : public Row_rw_base
-{
-public:
-  Row_writer(THD *thd, TABLE *table, bool has_trans)
-    : Row_rw_base(thd, table), m_has_trans(has_trans) 
-  { 
-  }
-
-  ~Row_writer() {}
- 
-  int write_row(bitvector const& cols, 
-		const byte *buf);
-  int delete_row(bitvector const& cols, 
-		 const byte *buf);
-  int update_row(bitvector const& cols, 
-		 const byte *old_data, const byte *new_data);
-
-  void set_server_id(uint32 sid) { m_server_id = sid; }
-
-  int transaction_begin() { 
-    return write_table_map(); 
-  }
-
-  int transaction_end() { 
-    return flush_pending_event(true); 
-  }
-
-private:
-  int flush_pending_event(bool stmt_end);
-  int write_table_map();
-
-  uint32 m_server_id;
-  bool m_has_trans;
-};
-
 
 class handler :public Sql_alloc
 {
@@ -571,8 +477,7 @@
     key_used_on_scan(MAX_KEY), active_index(MAX_KEY),
     ref_length(sizeof(my_off_t)), block_size(0),
     raid_type(0), ft_handler(0), inited(NONE), implicit_emptied(0),
-    pushed_cond(NULL),
-    m_row_writer(current_thd, table_arg, has_transactions())
+    pushed_cond(NULL)
     {}
   virtual ~handler(void) { /* TODO: DBUG_ASSERT(inited == NONE); */ }
   int ha_open(const char *name, int mode, int test_if_locked);
@@ -869,9 +774,6 @@
  */
  virtual void cond_pop() { return; };
 
-  // This is a temporary solution
-  Row_writer* row_writer() { return &m_row_writer; }
-
 private:
   /*
     Row-level primitives for storage engines. 
@@ -883,8 +785,6 @@
   { return  HA_ERR_WRONG_COMMAND; }
   virtual int delete_row(const byte * buf)
   { return  HA_ERR_WRONG_COMMAND; }
- 
-  Row_writer m_row_writer;
 };
 
 	/* Some extern variables used with handlers */

--- 1.131/sql/item_sum.cc	2005-03-22 08:44:12 +01:00
+++ 1.132/sql/item_sum.cc	2005-04-01 13:25:13 +02:00
@@ -2370,7 +2370,7 @@
     */
     return tree->unique_add(table->record[0] + table->s->null_bytes);
   }
-  if ((error= table->file->write_row(table->record[0])) &&
+  if ((error= table->file->ha_write_row(table->record[0])) &&
       error != HA_ERR_FOUND_DUPP_KEY &&
       error != HA_ERR_FOUND_DUPP_UNIQUE)
     return TRUE;

--- 1.157/sql/log.cc	2005-03-22 08:25:35 +01:00
+++ 1.158/sql/log.cc	2005-04-01 13:25:13 +02:00
@@ -1744,6 +1744,7 @@
 
     if (file == &log_file) // we are writing to the real log (disk)
     {
+      DBUG_PRINT("info", ("Flushing cache %p", file));
       if (flush_io_cache(file) || sync_binlog(file))
 	goto err;
     }
@@ -2303,11 +2304,14 @@
 }
 
 
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+
 ulong MYSQL_LOG::
 get_table_id(TABLE* table)
 {
   DBUG_ENTER("MYSQL_LOG::get_table_id(TABLE*)");
   DBUG_PRINT("enter", ("table=%p", table)); 
+  DBUG_ASSERT(table != NULL);
 
   // Have to create it here since it relies on my_malloc, which requires
   // my_init() to have been executed prior to this.
@@ -2315,16 +2319,45 @@
     m_table_map = new table_mapping;
 
   DBUG_ASSERT(m_table_map != NULL);
+
   ulong tid = m_table_map->get_table_id(table);
-  if (tid == m_table_map->count()) {
+  if (tid == table_mapping::NO_TABLE) {
     // We can't use the number of tables in the list since the highest table
     // id might be larger than the number of elements in the list. 
     tid = m_next_table_id++;
+
+    // There is one reserved number that cannot be used.
+    if (tid == table_mapping::NO_TABLE)
+	tid = m_next_table_id++;
+	
     m_table_map->set_table(tid, table);
   }
   DBUG_PRINT("return", ("table_id=%d", tid));
   DBUG_RETURN(tid);
 }
+
+int MYSQL_LOG::
+is_table_mapped(TABLE* table) const
+{
+  DBUG_ASSERT(table != NULL);
+
+  // Have to create it here since it relies on my_malloc, which requires
+  // my_init() to have been executed prior to this.
+  if (m_table_map == NULL)
+    m_table_map = new table_mapping;
+
+  DBUG_ASSERT(m_table_map != NULL);
+  return m_table_map->get_table_id(table) != table_mapping::NO_TABLE;
+}
+
+void MYSQL_LOG::
+clear_table_mappings()
+{
+  DBUG_ASSERT(m_table_map != NULL);
+  m_table_map->clear_tables();
+}
+
+#endif // !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 
 #ifdef __NT__
 void print_buffer_to_nt_eventlog(enum loglevel level, char *buff,

--- 1.168/sql/log_event.cc	2005-03-22 08:44:12 +01:00
+++ 1.169/sql/log_event.cc	2005-04-01 13:25:13 +02:00
@@ -4723,9 +4723,8 @@
 
 #ifndef MYSQL_CLIENT
 Rows_log_event::
-Rows_log_event(THD* thd_arg, TABLE* tbl_arg, ulong tid, 
-	       bitvector const& cols, bool using_trans)
-  : Log_event(thd_arg, 0, using_trans),
+Rows_log_event(THD* thd_arg, TABLE* tbl_arg, ulong tid, bitvector const& cols)
+  : Log_event(thd_arg, 0, tbl_arg->file->has_transactions()),
     m_dbnam(thd_arg->db), m_dblen(m_dbnam ? strlen(m_dbnam) : 0),
     m_table(tbl_arg), 
     m_tblnam(tbl_arg->s->table_name),
@@ -4833,6 +4832,7 @@
 }
 
 #ifndef MYSQL_CLIENT
+#ifdef HAVE_REPLICATION
 /*
   Unpack a row into a record. The row is assumed to only consist of the fields
   for which the bitset represented by 'arr' and 'bits'; the other parts of the
@@ -4915,7 +4915,7 @@
   }
   do_after_row_operations(table);
 
-  if (m_rows_flags & STMT_END_F) {
+  if (m_rows_flags & TRANS_END_F) {
     // This is the end of a statement, so close (and unlock) the tables we
     // opened when processing the Table_map_log_event starting the statement.
 
@@ -4928,7 +4928,8 @@
     
   DBUG_RETURN(error || Log_event::exec_event(rli));
 }
-#endif
+#endif // HAVE_REPLICATION
+#endif // MYSQL_CLIENT
 
 bool Rows_log_event::
 write_data_header(IO_CACHE* file)
@@ -4980,10 +4981,10 @@
  */
 #ifndef MYSQL_CLIENT
 Table_map_log_event::
-Table_map_log_event(THD* thd, TABLE* tbl, ulong tid, bool using_trans)
-  : Log_event(thd, 0, using_trans), 
+Table_map_log_event(THD* thd, TABLE* tbl, ulong tid)
+  : Log_event(thd, 0, tbl->file->has_transactions()), 
     m_table(tbl),
-    m_dbnam(thd->db), 
+    m_dbnam(tbl->s->db), 
     m_dblen(m_dbnam ? strlen(m_dbnam) : 0),
     m_tblnam(tbl->s->table_name), // Do I need to take a copy???
     m_tbllen(strlen(m_tblnam)),
@@ -4998,6 +4999,9 @@
   m_data_size += m_tbllen + 2;	// Include length and terminating \0
   m_data_size += 1 + m_colcnt;	// COLCNT and column types
 
+  DBUG_PRINT("info", ("m_dbnam='%s', m_tblnam='%s', m_table_id=%d", 
+		      m_dbnam, m_tblnam, m_table_id));
+
   DBUG_ASSERT(m_memory == NULL);
   if ((m_memory = my_multi_malloc(MYF(0), 
 				  &m_coltype, m_colcnt, 
@@ -5087,6 +5091,7 @@
 }
 
 #ifndef MYSQL_CLIENT
+#ifdef HAVE_REPLICATION
 int Table_map_log_event::
 exec_event(st_relay_log_info* rli)
 {
@@ -5111,7 +5116,8 @@
   int error = rli->m_table_map.set_table(m_table_id, m_table);
   DBUG_RETURN(error || Log_event::exec_event(rli));
 }
-#endif
+#endif // HAVE_REPLICATION
+#endif // !MYSQL_CLIENT
 
 #ifndef MYSQL_CLIENT
 bool Table_map_log_event::
@@ -5192,9 +5198,8 @@
 #ifndef MYSQL_CLIENT
 Write_rows_log_event::
 Write_rows_log_event(THD* thd_arg, TABLE* tbl_arg, ulong tid_arg, 
-		     bitvector const& cols, 
-		     bool using_trans)
-  : Rows_log_event(thd_arg, tbl_arg, tid_arg, cols, using_trans)
+		     bitvector const& cols)
+  : Rows_log_event(thd_arg, tbl_arg, tid_arg, cols)
 {
 }
 #endif
@@ -5211,7 +5216,7 @@
   DBUG_VOID_RETURN;
 }
 
-#ifndef MYSQL_CLIENT
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 int Write_rows_log_event::
 do_before_row_operations(TABLE* table)
 {
@@ -5240,10 +5245,11 @@
 do_exec_row(TABLE* table, st_relay_log_info* rli)
 {
   DBUG_ENTER("Write_rows_log_event::do_exec_row(TABLE*,...)");
+  DBUG_ASSERT(table != NULL);
   int error = table->file->ha_write_row(table->record[0]);
   DBUG_RETURN(error);
 }
-#endif
+#endif // !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 
 #ifdef MYSQL_CLIENT
 void Write_rows_log_event::
@@ -5283,9 +5289,13 @@
 #ifndef MYSQL_CLIENT
 Delete_rows_log_event::
 Delete_rows_log_event(THD* thd_arg, TABLE* tbl_arg, ulong tid, 
-		      bitvector const& cols, bool using_trans)
-  : Rows_log_event(thd_arg, tbl_arg, tid, cols, using_trans), 
+		      bitvector const& cols)
+#ifndef HAVE_REPLICATION
+  : Rows_log_event(thd_arg, tbl_arg, tid, cols)
+#else
+  : Rows_log_event(thd_arg, tbl_arg, tid, cols), 
     m_memory(NULL), m_key(NULL), m_search_record(NULL)
+#endif
 {
 }
 
@@ -5301,7 +5311,7 @@
 Delete_rows_log_event::
 Delete_rows_log_event(const char* buf, uint event_len,
 		      const Format_description_log_event *description_event)
-#ifdef MYSQL_CLIENT
+#if defined(MYSQL_CLIENT) || !defined(HAVE_REPLICATION)
   : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event)
 #else
   : Rows_log_event(buf, event_len, DELETE_ROWS_EVENT, description_event),
@@ -5313,7 +5323,7 @@
 }
 
 
-#ifndef MYSQL_CLIENT
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 int Delete_rows_log_event::
 do_before_row_operations(TABLE* table)
 {
@@ -5385,6 +5395,7 @@
 do_exec_row(TABLE* table, st_relay_log_info* rli)
 {
   DBUG_ENTER("Delete_rows_log_event::do_exec_row(TABLE*,...)");
+  DBUG_ASSERT(table != NULL);
  
   if (table->s->keys > 0) {
     if (int error = table->file->index_read(m_search_record, m_key, 
@@ -5442,7 +5453,7 @@
   DBUG_PRINT("return", ("error = %d", error));
   DBUG_RETURN(error);
 }
-#endif
+#endif // !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 
 /**************************************************************************
 	Update_rows_log_event member functions
@@ -5451,16 +5462,15 @@
 /*
   Constructor used to build an event for writing to the binary log.
  */
-#ifndef MYSQL_CLIENT
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 Update_rows_log_event::
 Update_rows_log_event(THD* thd_arg, TABLE* tbl_arg, ulong tid, 
-		      bitvector const& cols,
-		      bool using_trans)
-: Rows_log_event(thd_arg, tbl_arg, tid, cols, using_trans),
+		      bitvector const& cols)
+: Rows_log_event(thd_arg, tbl_arg, tid, cols),
   m_memory(NULL), m_key(NULL)
 {
 }
-#endif
+#endif // !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 
 /*
   Constructor used by slave to read the event from the binary log.
@@ -5468,10 +5478,10 @@
 Update_rows_log_event::
 Update_rows_log_event(const char* buf, uint event_len,
 		      const Format_description_log_event *description_event)
-#ifdef MYSQL_CLIENT
+#if defined(MYSQL_CLIENT) || !defined(HAVE_REPLICATION)
   : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event)
 #else
-  : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event),
+  : Rows_log_event(buf, event_len, UPDATE_ROWS_EVENT, description_event), 
     m_memory(NULL), m_key(NULL)
 #endif
 {
@@ -5479,7 +5489,7 @@
   DBUG_VOID_RETURN;
 }
 
-#ifndef MYSQL_CLIENT
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 int Update_rows_log_event::
 do_before_row_operations(TABLE* table)
 {
@@ -5557,6 +5567,7 @@
 do_exec_row(TABLE* table, st_relay_log_info* rli)
 {
   DBUG_ENTER("Update_rows_log_event::do_exec_row(TABLE*,...)");
+  DBUG_ASSERT(table != NULL);
   if (table->s->keys > 0) {
     if (int error = table->file
 	->index_read(m_search_record, m_key, 
@@ -5614,5 +5625,5 @@
   DBUG_PRINT("return", ("error = %d", error));
   DBUG_RETURN(error);
 }
-#endif
+#endif // !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 

--- 1.107/sql/log_event.h	2005-03-22 08:44:12 +01:00
+++ 1.108/sql/log_event.h	2005-04-01 13:25:13 +02:00
@@ -641,6 +641,7 @@
   /* returns the human readable name of the event's type */
   const char* get_type_str();
 
+#ifndef MYSQL_CLIENT
   /*
     Predicate to check if this event should be written to the binary
     log.  This function is called by MYSQL_LOG to decide if this event
@@ -668,8 +669,10 @@
   bool can_be_cached() const {
       return do_can_be_cached();
   }
+#endif
 
 private:
+#ifndef MYSQL_CLIENT
   /*
     Primitive for testing if the event should be written to the binary log.
     The subclasses should override this if they do not want to be written to
@@ -688,6 +691,7 @@
   virtual bool do_can_be_cached() const {
     return true;
   }
+#endif
 };
 
 /*
@@ -1615,7 +1619,7 @@
   };
 
 #ifndef MYSQL_CLIENT
-  Table_map_log_event(THD *thd, TABLE* tbl, ulong tid, bool using_trans);
+  Table_map_log_event(THD *thd, TABLE* tbl, ulong tid);
 #endif
   Table_map_log_event(const char* buf, uint event_len, 
                       const Format_description_log_event *description_event);
@@ -1631,8 +1635,10 @@
   virtual bool write_data_body(IO_CACHE* file);
 
 #ifndef MYSQL_CLIENT
+#ifdef HAVE_REPLICATION
   virtual int exec_event(struct st_relay_log_info* rli);
 #endif
+#endif
 
 #ifdef MYSQL_CLIENT
   virtual void print(FILE* file, bool short_form = 0, 
@@ -1700,8 +1706,10 @@
   }
 
 #ifndef MYSQL_CLIENT
+#ifdef HAVE_REPLICATION
   virtual int exec_event(struct st_relay_log_info* rli);
 #endif
+#endif
 
 #ifdef MYSQL_CLIENT
   virtual void print(FILE* file, bool short_form = 0, 
@@ -1731,7 +1739,7 @@
   // this class, not create instances of this class.
 #ifndef MYSQL_CLIENT
   Rows_log_event(THD*, TABLE*, ulong table_id, 
-		 bitvector const& cols, bool using_trans);
+		 bitvector const& cols);
 #endif
   Rows_log_event(const char* row_data, uint event_len, 
 		 Log_event_type event_type,
@@ -1768,6 +1776,7 @@
     return opt_binlog_row_level && m_table->file->has_transactions();
   }
 
+#ifdef HAVE_REPLICATION
   /*
     Primitive to prepare for a sequence of row executions.
 
@@ -1829,6 +1838,7 @@
   */
   virtual int do_exec_row(TABLE* table, st_relay_log_info* rli) = 0;
 #endif
+#endif
 };
 
 
@@ -1849,8 +1859,7 @@
   };
 
 #ifndef MYSQL_CLIENT
-  Write_rows_log_event(THD*, TABLE*, ulong table_id, 
-		       bitvector const& cols, bool using_trans);
+  Write_rows_log_event(THD*, TABLE*, ulong table_id, bitvector const& cols);
 #endif
   Write_rows_log_event(const char* buf, uint event_len, 
                        const Format_description_log_event *description_event);
@@ -1863,7 +1872,7 @@
   void print(FILE* file, bool short_form, LAST_EVENT_INFO* last_event_info);
 #endif
 
-#ifndef MYSQL_CLIENT
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
   gptr  m_memory;
   byte* m_search_record;
 
@@ -1895,8 +1904,7 @@
   };
 
 #ifndef MYSQL_CLIENT
-  Update_rows_log_event(THD*, TABLE*, ulong table_id, 
-			bitvector const& cols, bool using_trans);
+  Update_rows_log_event(THD*, TABLE*, ulong table_id, bitvector const& cols);
 #endif
   Update_rows_log_event(const char* buf, uint event_len, 
 			const Format_description_log_event *description_event);
@@ -1905,7 +1913,7 @@
   virtual Log_event_type get_type_code() { return UPDATE_ROWS_EVENT; }
   virtual bool is_valid() const { return 1; }
 
-#ifndef MYSQL_CLIENT
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
   gptr  m_memory;
   byte* m_key;
   byte* m_search_record;
@@ -1914,7 +1922,7 @@
   virtual void        do_after_row_operations(TABLE* table);
   virtual char const* do_prepare_row(THD*, TABLE*, char const* row_start);
   virtual int         do_exec_row(TABLE* table, st_relay_log_info* rli);
-#endif
+#endif // !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
 };
 
 /*****************************************************************************
@@ -1945,8 +1953,7 @@
   };
 
 #ifndef MYSQL_CLIENT
-  Delete_rows_log_event(THD*, TABLE*, ulong, 
-			bitvector const& cols, bool using_trans);
+  Delete_rows_log_event(THD*, TABLE*, ulong, bitvector const& cols);
   virtual ~Delete_rows_log_event();
 #endif
   Delete_rows_log_event(const char* buf, uint event_len, 
@@ -1956,7 +1963,7 @@
   virtual Log_event_type get_type_code() { return DELETE_ROWS_EVENT; }
   virtual bool is_valid() const { return 1; }
 
-#ifndef MYSQL_CLIENT
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
   gptr  m_memory;
   byte* m_key;
   byte* m_search_record;

--- 1.172/sql/sql_class.cc	2005-03-22 08:25:36 +01:00
+++ 1.173/sql/sql_class.cc	2005-04-01 13:25:13 +02:00
@@ -1807,6 +1807,13 @@
   bzero((char*) &status_var, sizeof(status_var));
 }
 
+/*
+  Implementation of interface to write rows to the binary log through the
+  thread.  The thread is responsible for
+ */
+
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+
 int THD::
 set_pending_event(Rows_log_event* ev) 
 {
@@ -1878,11 +1885,14 @@
       || pending->get_cols() != cols) 
   {
     // If not, flush the event and create a new RowsEventT.
+    
+    // If the table does not exist, write a table map for the new table
+    if (!mysql_bin_log.is_table_mapped(table))
+      write_table_map(table);
     ulong const tid = mysql_bin_log.get_table_id(table);
 
     Rows_log_event* const 
-	ev = new RowsEventT(this, table, tid, cols, 
-			    table->file->has_transactions());
+	ev = new RowsEventT(this, table, tid, cols);
     ev->server_id = server_id; // I don't like this, it's too easy to forget. 
     if (flush_and_set_pending_event(ev))
       DBUG_RETURN(NULL);
@@ -1905,7 +1915,6 @@
 template Rows_log_event* THD::
 prepare_pending<Update_rows_log_event>(TABLE*, uint32, bitvector const&, size_t);
 
-
 int THD::
 flush_and_set_pending_event(Rows_log_event* event)
 {
@@ -1931,3 +1940,226 @@
   DBUG_PRINT("exit", ("0"));
   DBUG_RETURN(0);		// All OK
 }
+
+size_t THD::
+max_row_length(TABLE *table, const byte *record) const
+{
+  DBUG_ENTER("THD::max_row_length");
+  DBUG_PRINT("enter", ("record = 0x%0x", record));
+
+  size_t length = table->s->reclength + 2 * table->s->fields;
+  uint* const beg = table->s->blob_field;
+  uint* const end = beg + table->s->blob_fields;
+
+  for (uint *ptr = beg ; ptr != end ; ++ptr)
+  {
+    Field_blob* const blob = (Field_blob*) table->field[*ptr];
+    length += blob->get_length(record + blob->offset()) + 2;
+  }
+
+  DBUG_RETURN(length);
+}
+
+size_t THD::
+pack_row(TABLE *table, 
+	 byte *row_data, 
+	 size_t max_size, 
+	 const byte *record) const
+{
+  DBUG_ENTER("THD::pack_row");
+  DBUG_PRINT("enter", ("row_data = %p, max_size = %lu, record = %p", 
+		       row_data, max_size, record));
+
+  byte *ptr = row_data;
+
+  bzero(row_data, max_size);
+
+  memcpy(row_data, record, table->s->null_bytes);
+  ptr += table->s->null_bytes;
+
+  for ( Field **field = table->field ; *field ; ++field) 
+  {
+    ptrdiff_t const offset = (*field)->offset();
+    ptr = (*field)->pack(ptr, record + offset);
+    DBUG_PRINT("info", ("Packing length %d field '%s' from %p to %p + %d", 
+			(*field)->field_length, (*field)->field_name, 
+			ptr, record, offset));
+  }
+
+  // ptrdiff_t is signed, size_t is unsigned. Check that the conversion
+  // will work correctly.
+  DBUG_ASSERT(ptr - row_data >= 0);
+  size_t const size = (size_t) (ptr - row_data);
+  DBUG_PRINT("return", ("size = %lu", size));
+  DBUG_RETURN(size);
+}
+
+int THD::
+write_row(TABLE* table, bitvector const& cols, byte const *record) 
+{ 
+  DBUG_ENTER("THD::write_row");
+  DBUG_PRINT("enter", ("cols = { size: %d, data = %p }; "
+		       "table=%p (%s), record=%p", 
+		       cols.size(), cols.data(), 
+		       table, table->s->table_name, record));
+
+  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
+    DBUG_RETURN(0);
+
+  THD* const thd = current_thd;
+
+  /* 
+     Pack records into format for transfer. We are allocating more
+     memory than needed, but that doesn't matter.
+  */
+  size_t const max_len = max_row_length(table, record);
+  byte* const row_data = my_malloc(max_len, MYF(MY_WME));
+  size_t const len     = pack_row(table, row_data, max_len, record);
+
+  Rows_log_event* const
+    ev = thd->prepare_pending<Write_rows_log_event>(table, server_id, cols, len);
+
+  if (ev == NULL)
+    DBUG_RETURN(1);
+
+  // add_row_data copies row_data to internal buffer
+  ev->add_row_data(row_data,len);
+
+  my_free(row_data, MYF(MY_WME));
+
+  DBUG_RETURN(0);
+}
+
+int THD::
+update_row(TABLE* table, bitvector const& cols,
+	   const byte *before_record, const byte *after_record)
+{ 
+  DBUG_ENTER("THD::update_row");
+  DBUG_PRINT("enter", ("cols = { size: %d, data = %p }; "
+		       "table=%p (%s); "
+		       "before=%p; after=%p", 
+		       cols.size(), cols.data(), 
+		       table, table->s->table_name, 
+		       before_record, after_record));
+
+  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
+    DBUG_RETURN(0);
+
+  THD* const thd = current_thd;
+
+  size_t const before_maxlen = max_row_length(table, before_record);
+  size_t const after_maxlen  = max_row_length(table, after_record);
+
+  byte *before_row, *after_row;
+  byte* const memory = my_multi_malloc(MYF(MY_WME),
+				       &before_row, before_maxlen,
+				       &after_row, after_maxlen,
+				       NULL);
+
+  size_t const before_size = pack_row(table, before_row, 
+				      before_maxlen, before_record);
+  size_t const after_size = pack_row(table, after_row, 
+				     after_maxlen, after_record);
+  
+  Rows_log_event* const
+    ev = thd->prepare_pending<Update_rows_log_event>(table, 
+						     server_id, cols, 
+						     before_size + after_size);
+
+  if (ev == NULL)
+    DBUG_RETURN(1);
+
+  ev->add_row_data(before_row, before_size);
+  ev->add_row_data(after_row, after_size);
+
+  // add_row_data copies row_data to internal buffer
+  my_free(memory, MYF(MY_WME));
+
+  
+  DBUG_RETURN(0);
+}
+
+int THD::
+delete_row(TABLE* table, bitvector const& cols, byte const *record)
+{ 
+  DBUG_ENTER("THD::delete_row");
+  DBUG_PRINT("enter", ("cols = { size: %d, data = %p }; "
+		       "table=%p (%s), record=%p", 
+		       cols.size(), cols.data(), 
+		       table, table->s->table_name, record));
+
+  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
+    DBUG_RETURN(0);
+
+  /* 
+     Pack records into format for transfer. We are allocating more
+     memory than needed, but that doesn't matter.
+  */
+  size_t const max_len = max_row_length(table, record);
+  byte* const row_data = my_malloc(max_len, MYF(MY_WME));
+  size_t const len     = pack_row(table, row_data, max_len, record);
+
+  Rows_log_event* const
+    ev = prepare_pending<Delete_rows_log_event>(table, server_id, cols, len);
+
+  if (ev == NULL)
+    DBUG_RETURN(1);
+
+  ev->add_row_data(row_data, len);
+
+  // add_row_data copies row_data
+  my_free(row_data, MYF(MY_WME));
+
+  DBUG_RETURN(0);
+}
+
+int THD::
+write_table_map(TABLE* table)
+{
+  DBUG_ENTER("THD::write_table_map()");
+  DBUG_PRINT("enter", ("table=%p (%s)", table, table->s->table_name));
+
+  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
+    DBUG_RETURN(0);
+
+  ulong table_id = mysql_bin_log.get_table_id(table);
+
+  Table_map_log_event 
+    the_event(this, table, table_id);
+
+  if (mysql_bin_log.write(&the_event))
+    DBUG_RETURN(1);
+  DBUG_RETURN(0);
+}
+
+int THD::
+flush_pending_event(bool stmt_end)
+{
+  DBUG_ENTER("THD::flush_pending_event(bool)");
+  if (!opt_binlog_row_level || !mysql_bin_log.is_open())
+    DBUG_RETURN(0);
+
+  /*
+    Mark the event as the last event of a statement if the stmt_end flag is
+    set. 
+  */
+  if (stmt_end) {
+    if (Rows_log_event* pending = get_pending_event()) {
+      pending->set_flags(Rows_log_event::STMT_END_F);
+
+      // !!! This is extra right now: there has to be more advanced logic
+      // !!! to decide when the transaction actually ends.
+      pending->set_flags(Rows_log_event::TRANS_END_F);
+
+      DBUG_ASSERT(mysql_bin_log.is_open());
+      mysql_bin_log.clear_table_mappings();
+    }
+  }
+  
+  DBUG_PRINT("flush", ("thd = 0x%0x", this));
+  int const error = flush_and_set_pending_event(0);
+  DBUG_RETURN(error);
+}
+
+#endif // !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+

--- 1.227/sql/sql_class.h	2005-03-22 08:25:36 +01:00
+++ 1.228/sql/sql_class.h	2005-04-01 13:25:14 +02:00
@@ -31,6 +31,7 @@
 class Format_description_log_event;
 class sp_rcontext;
 class sp_cache;
+class Rows_log_event;
 
 enum enum_enable_or_disable { LEAVE_AS_IS, ENABLE, DISABLE };
 enum enum_ha_read_modes { RFIRST, RNEXT, RPREV, RLAST, RKEY, RNEXT_SAME };
@@ -239,7 +240,9 @@
   pthread_cond_t  COND_prep_xids;
   friend class Log_event;
 
-  table_mapping* m_table_map;
+  // 'Mutable' needed since this class can not be initialized in the
+  // constructor.  
+  mutable table_mapping* m_table_map;
   ulong m_next_table_id;
 
 public:
@@ -256,10 +259,18 @@
   void unlog(ulong cookie, my_xid xid);
   int recover(IO_CACHE *log, Format_description_log_event *fdle);
 
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
   // This will return a table id for the table. If the table is not known, a
   // new table id will be invented and returned. 
   ulong get_table_id(TABLE* table);
 
+  // This will check if the given table is mapped to any table id
+  int is_table_mapped(TABLE* table) const;
+
+  // This will clear all table mappings
+  void clear_table_mappings();
+
+#endif // !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
   /*
      These describe the log's format. This is used only for relay logs.
      _for_exec is used by the SQL thread, _for_queue by the I/O thread. It's
@@ -1090,6 +1101,28 @@
   /* container for handler's private per-connection data */
   void *ha_data[MAX_HA];
 
+#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
+
+  /*
+    Public interface to write rows to the binlog
+  */
+  int write_row(TABLE* table, bitvector const& cols, const byte *buf);
+  int delete_row(TABLE* table, bitvector const& cols, const byte *buf);
+  int update_row(TABLE* table, bitvector const& cols, 
+		 const byte *old_data, const byte *new_data);
+
+  void set_server_id(uint32 sid) { server_id = sid; }
+
+  int transaction_begin() { 
+      return 0;
+  }
+
+  int transaction_end() { 
+    
+    return flush_pending_event(true); 
+  }
+
+private:
   /*
     Member functions to handle pending event for row-level logging.
   */
@@ -1102,6 +1135,16 @@
       prepare_pending(TABLE* table, uint32 server_id, 
 		      bitvector const& cols, size_t needed);
 
+  size_t max_row_length(TABLE* table, const byte *data) const;
+  size_t pack_row(TABLE* table, byte *row_data, 
+		  size_t max_len, const byte *data) const;
+
+  int flush_pending_event(bool stmt_end);
+  int write_table_map(TABLE*);
+
+#endif
+
+public:
   struct st_transactions {
     SAVEPOINT *savepoints;
     THD_TRANS all;			// Trans since BEGIN WORK

--- 1.302/sql/sql_select.cc	2005-03-22 08:44:12 +01:00
+++ 1.303/sql/sql_select.cc	2005-04-01 13:25:14 +02:00
@@ -10145,7 +10145,7 @@
                        join->sum_funcs_end[send_group_parts]);
 	if (join->having && join->having->val_int() == 0)
           error= -1;
-        else if ((error=table->file->write_row(table->record[0])))
+        else if ((error=table->file->ha_write_row(table->record[0])))
 	{
 	  if (create_myisam_from_heap(join->thd, table,
 				      &join->tmp_table_param,
@@ -12796,7 +12796,7 @@
           item->save_in_result_field(1);
       }
       copy_sum_funcs(sum_funcs_end[i+1], sum_funcs_end[i]);
-      if ((error= table->file->write_row(table->record[0])))
+      if ((error= table->file->ha_write_row(table->record[0])))
       {
 	if (create_myisam_from_heap(thd, table, &tmp_table_param,
 				      error, 0))

--- 1.2/mysql-test/r/rpl_row_basic.result	2005-03-21 19:24:55 +01:00
+++ 1.3/mysql-test/r/rpl_row_basic.result	2005-04-01 13:25:12 +02:00
@@ -7,20 +7,20 @@
 CREATE TABLE t1 (C1 CHAR(1), C2 CHAR(1), INDEX (C1));
 SHOW BINLOG EVENTS;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
-<binlog>	<pos>	Format_desc	<server_id>	<end_log_pos>	Server ver: 5.1.0-alpha-debug-log, Binlog ver: 4
-<binlog>	<pos>	Query	<server_id>	<end_log_pos>	use `test`; CREATE TABLE t1 (C1 CHAR(1), C2 CHAR(1), INDEX (C1))
+<binlog>	<pos>	Format_desc	1	<end_log_pos>	Server ver: 5.1.0-alpha-debug-log, Binlog ver: 4
+<binlog>	<pos>	Query	1	<end_log_pos>	use `test`; CREATE TABLE t1 (C1 CHAR(1), C2 CHAR(1), INDEX (C1))
 SELECT * FROM t1;
 C1	C2
 SELECT * FROM t1;
 C1	C2
 INSERT INTO t1 VALUES ('A','B'), ('X','Y'), ('X','X');
 INSERT INTO t1 VALUES ('A','C'), ('X', 'Z'), ('A', 'A');
-SHOW BINLOG EVENTS FROM 216;
+SHOW BINLOG EVENTS FROM 218;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
-<binlog>	<pos>	Table_map	<server_id>	<end_log_pos>	
-<binlog>	<pos>	Write_rows	<server_id>	<end_log_pos>	
-<binlog>	<pos>	Table_map	<server_id>	<end_log_pos>	
-<binlog>	<pos>	Write_rows	<server_id>	<end_log_pos>	
+<binlog>	<pos>	Table_map	1	<end_log_pos>	
+<binlog>	<pos>	Write_rows	1	<end_log_pos>	
+<binlog>	<pos>	Table_map	1	<end_log_pos>	
+<binlog>	<pos>	Write_rows	1	<end_log_pos>	
 SELECT * FROM t1;
 C1	C2
 A	B
@@ -38,10 +38,10 @@
 X	Z
 A	A
 DELETE FROM t1 WHERE C1 = C2;
-SHOW BINLOG EVENTS FROM 372;
+SHOW BINLOG EVENTS FROM 374;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
-<binlog>	<pos>	Table_map	<server_id>	<end_log_pos>	
-<binlog>	<pos>	Delete_rows	<server_id>	<end_log_pos>	
+<binlog>	<pos>	Table_map	1	<end_log_pos>	
+<binlog>	<pos>	Delete_rows	1	<end_log_pos>	
 SELECT * FROM t1;
 C1	C2
 A	B
@@ -55,10 +55,10 @@
 A	C
 X	Z
 UPDATE t1 SET C2 = 'I' WHERE C1 = 'A' AND C2 = 'C';
-SHOW BINLOG EVENTS FROM 445;
+SHOW BINLOG EVENTS FROM 447;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
-<binlog>	<pos>	Table_map	<server_id>	<end_log_pos>	
-<binlog>	<pos>	Update_rows	<server_id>	<end_log_pos>	
+<binlog>	<pos>	Table_map	1	<end_log_pos>	
+<binlog>	<pos>	Update_rows	1	<end_log_pos>	
 SELECT * FROM t1;
 C1	C2
 A	B

--- 1.1/mysql-test/r/rpl_row_trans.result	2005-03-10 20:10:08 +01:00
+++ 1.2/mysql-test/r/rpl_row_trans.result	2005-04-01 13:25:12 +02:00
@@ -44,8 +44,19 @@
 X	X
 SELECT * FROM t5;
 a	b
-SHOW BINLOG EVENTS FROM 1724;
+SHOW BINLOG EVENTS;
 Log_name	Pos	Event_type	Server_id	End_log_pos	Info
+<binlog>	<pos>	Format_desc	<server_id>	<end_pos>	Server ver: 5.1.0-alpha-debug-log, Binlog ver: 4
+<binlog>	<pos>	Query	<server_id>	<end_pos>	use `test`; CREATE TABLE t4 (a CHAR(1), b CHAR(1)) engine=myisam
+<binlog>	<pos>	Query	<server_id>	<end_pos>	use `test`; CREATE TABLE t5 (a CHAR(1), b CHAR(1)) engine=innodb
+<binlog>	<pos>	Table_map	<server_id>	<end_pos>	
+<binlog>	<pos>	Write_rows	<server_id>	<end_pos>	
+<binlog>	<pos>	Table_map	<server_id>	<end_pos>	
+<binlog>	<pos>	Write_rows	<server_id>	<end_pos>	
+<binlog>	<pos>	Table_map	<server_id>	<end_pos>	
+<binlog>	<pos>	Write_rows	<server_id>	<end_pos>	
+<binlog>	<pos>	Table_map	<server_id>	<end_pos>	
+<binlog>	<pos>	Write_rows	<server_id>	<end_pos>	
 SELECT * FROM t4;
 a	b
 A	B

--- 1.2/mysql-test/t/rpl_row_basic.test	2005-03-21 19:24:55 +01:00
+++ 1.3/mysql-test/t/rpl_row_basic.test	2005-04-01 13:25:12 +02:00
@@ -5,7 +5,7 @@
 #
 
 CREATE TABLE t1 (C1 CHAR(1), C2 CHAR(1), INDEX (C1));
---replace_column 1 <binlog> 2 <pos> 4 <server_id> 5 <end_log_pos>
+--replace_column 1 <binlog> 2 <pos> 5 <end_log_pos>
 SHOW BINLOG EVENTS;
 SELECT * FROM t1;
 sync_slave_with_master;
@@ -15,19 +15,19 @@
 connection master;
 INSERT INTO t1 VALUES ('A','B'), ('X','Y'), ('X','X');
 INSERT INTO t1 VALUES ('A','C'), ('X', 'Z'), ('A', 'A');
---replace_column 1 <binlog> 2 <pos> 4 <server_id> 5 <end_log_pos>
-SHOW BINLOG EVENTS FROM 216;
+--replace_column 1 <binlog> 2 <pos> 5 <end_log_pos>
+SHOW BINLOG EVENTS FROM 218;
 SELECT * FROM t1;
 sync_slave_with_master;
 SELECT * FROM t1;
-
+ 
 # Testing delete
 # Observe that are several rows having the value for the index but only one
 # should be deleted.
 connection master;
 DELETE FROM t1 WHERE C1 = C2;
---replace_column 1 <binlog> 2 <pos> 4 <server_id> 5 <end_log_pos>
-SHOW BINLOG EVENTS FROM 372;
+--replace_column 1 <binlog> 2 <pos> 5 <end_log_pos>
+SHOW BINLOG EVENTS FROM 374;
 SELECT * FROM t1;
 sync_slave_with_master;
 SELECT * FROM t1;
@@ -39,8 +39,8 @@
 #
 connection master;
 UPDATE t1 SET C2 = 'I' WHERE C1 = 'A' AND C2 = 'C';
---replace_column 1 <binlog> 2 <pos> 4 <server_id> 5 <end_log_pos>
-SHOW BINLOG EVENTS FROM 445;
+--replace_column 1 <binlog> 2 <pos> 5 <end_log_pos>
+SHOW BINLOG EVENTS FROM 447;
 SELECT * FROM t1;
 sync_slave_with_master;
 SELECT * FROM t1;

--- 1.5/sql/rpl_injector.cc	2005-03-21 19:24:56 +01:00
+++ 1.6/sql/rpl_injector.cc	2005-04-01 13:25:13 +02:00
@@ -1,7 +1,5 @@
 // -*- Mode: C++; c-indentation-style: gnu -*-
 
-// This include cannot be here since there is no include guard on
-// 'mysql_priv.h'.
 #include "mysql_priv.h" 
 #include "rpl_injector.h"
 
@@ -27,20 +25,35 @@
 transaction(MYSQL_LOG* log, THD* thd)
   : m_thd(thd)
 {
-  // Default initialization of m_start_pos. We need to fill it in using the
-  // code below.
+  // Default initialization of m_start_pos (which initializes it to garbage).
+  // We need to fill it in using the code below.
 
   LOG_INFO log_info;
   log->get_current_log(&log_info);
-  // !!!
+  // !!! binlog_pos does not follow RAII !!!
   m_start_pos.m_file_name = my_strdup(log_info.log_file_name, MYF(0));
   m_start_pos.m_file_pos = log_info.pos;
 }
 
+injector::transaction::
+~transaction()
+{
+  // Needed since my_free expects a 'char*' (instead of 'void*').
+  char* const the_memory = const_cast<char*>(m_start_pos.m_file_name);
+
+  // We set the first character to null just to give all the copies of the
+  // start position a (minimal) chance of seeing that the memory is lost.
+  // All assuming that my_free does not overwrite the memory, of course.
+  *the_memory = '\0';
+
+  my_free(the_memory, MYF(0));
+}
+
 int injector::transaction::
 commit()
 {
    DBUG_ENTER("injector::transaction::commit()");
+   ha_commit(m_thd);
    DBUG_RETURN(0);
 }
 
@@ -51,8 +64,8 @@
 {
    DBUG_ENTER("injector::transaction::write_row(...)");
    TABLE* table = get_table(m_thd, tbl);
-   table->file->row_writer()->set_server_id(sid);
-   table->file->row_writer()->write_row(cols, record);
+   m_thd->set_server_id(sid);
+   m_thd->write_row(table, cols, record);
    DBUG_RETURN(0);
 }
 
@@ -63,8 +76,8 @@
 {
    DBUG_ENTER("injector::transaction::delete_row(...)");
    TABLE* table = get_table(m_thd, tbl);
-   table->file->row_writer()->set_server_id(sid);
-   table->file->row_writer()->delete_row(cols, record);
+   m_thd->set_server_id(sid);
+   m_thd->delete_row(table, cols, record);
    DBUG_RETURN(0);
 }
 
@@ -75,8 +88,8 @@
 {
    DBUG_ENTER("injector::transaction::update_row(...)");
    TABLE* table = get_table(m_thd, tbl);
-   table->file->row_writer()->set_server_id(sid);
-   table->file->row_writer()->update_row(cols, before, after);
+   m_thd->set_server_id(sid);
+   m_thd->update_row(table, cols, before, after);
    DBUG_RETURN(0);
 }
 
@@ -92,11 +105,10 @@
 // injector - member definitions
 ////////////////////////////////////////////////////////////////
 
-// This constructor is inline since it's called below
+// This constructor is called below
 inline injector::
 injector()
 {
-    
 }
 
 injector* injector::
@@ -112,7 +124,8 @@
 new_trans(THD* thd)
 {
    DBUG_ENTER("injector::new_trans()");
-   
+   // Currently, there is no alternative to using 'mysql_bin_log' since that
+   // is hardcoded into the way the handler is using the binary log.
    DBUG_RETURN(transaction(&mysql_bin_log, thd));
 }
 
--- New file ---
+++ sql/rpl_tblmap.cc	05/03/21 20:11:18

#include "mysql_priv.h"
#include "rpl_tblmap.h"
#include "my_pthread.h"

/*
  Create an empty table mapping.

  The mutex is initialized first and then held while room for the initial
  m_reserve (16) entries is allocated and zero-filled.
*/
table_mapping::
table_mapping()
  : m_array(0), m_count(0), m_reserve(16)
{
  pthread_mutex_init(&m_mutex, NULL);

  sentry sentry(&m_mutex);

  m_array = reinterpret_cast<entry*>(my_malloc(m_reserve * sizeof(*m_array), MYF(MY_WME)));

  // memset() takes (destination, fill byte, length).  The fill byte and the
  // length used to be swapped here, so zero bytes were cleared.  Skip the
  // clearing if the allocation did not hand back any memory.
  if (m_array != NULL)
    memset(m_array, 0, m_reserve * sizeof(*m_array));
}

/*
  Free the entry array while holding the mutex, then destroy the mutex.
*/
table_mapping::
~table_mapping()
{
  {
    // The guard has to go out of scope (releasing the mutex) before the
    // mutex itself is destroyed below.
    sentry guard(&m_mutex);
    char* const storage = reinterpret_cast<char*>(m_array);
    m_array = NULL;
    my_free(storage, MYF(MY_WME));
  }
  pthread_mutex_destroy(&m_mutex);
}



/*
  Return the index of the entry holding 'table_id', or m_count when no
  entry has that id.

  !!! Linear scan: too slow once many tables are mapped at the same time.
  !!! Switch to a binary search later.
*/
ulong table_mapping::
find_pos(ulong table_id) const
{
  ulong idx = 0;
  while (idx < m_count && m_array[idx].table_id != table_id)
    ++idx;
  return idx;
}

/*
  Return the index of the entry holding the pointer 'table', or m_count
  when no entry refers to that table.

  !!! Linear scan: too slow once many tables are mapped at the same time.
  !!! Switch to a binary search later.
*/
ulong table_mapping::
find_pos(st_table* table) const
{
  ulong idx = 0;
  while (idx < m_count && m_array[idx].table != table)
    ++idx;
  return idx;
}

/*
  Look up the table mapped to 'table_id'.

  The lookup is done with the mutex held.

  RETURN
    The table pointer stored for 'table_id', or NULL if the id is not
    mapped.
*/
st_table* table_mapping::
get_table(ulong table_id)
{
  sentry sentry(&m_mutex);

  DBUG_ENTER("table_mapping::get_table(ulong)");
  // table_id is an unsigned long, so it has to be printed with %lu.
  DBUG_PRINT("enter", ("table_id=%lu", table_id));
  pos_type const pos = find_pos(table_id);
  if (pos < m_count) {
    st_table* const table = m_array[pos].table;
    // set_table() accepts a NULL table, so do not dereference blindly.
    DBUG_PRINT("info", ("tid %lu -> table %p (%s)",
                        table_id, table,
                        table ? table->s->table_name : "<>"));
    DBUG_RETURN(table);
  }

  DBUG_PRINT("info", ("tid %lu is not mapped!", table_id));
  DBUG_RETURN(NULL);
}


/*
  Look up the table id mapped to 'table'.

  The lookup is done with the mutex held.

  RETURN
    The id stored for 'table', or NO_TABLE if the table is not mapped.
*/
ulong table_mapping::
get_table_id(st_table* table)
{
  sentry sentry(&m_mutex);

  DBUG_ENTER("table_mapping::get_table_id(st_table*)");
  DBUG_PRINT("enter", ("table=%p (%s)",
                       table, table ? table->s->table_name : "<>"));
  pos_type const pos = find_pos(table);
  if (pos < m_count) {
    // The entry at 'pos' holds exactly 'table', which may be NULL; the id is
    // an unsigned long and is printed with %lu.
    DBUG_PRINT("info", ("table %p (%s) -> table id %lu",
                        table,
                        table ? table->s->table_name : "<>",
                        m_array[pos].table_id));
    DBUG_RETURN(m_array[pos].table_id);
  }
  // 'table' may be NULL here as well (the "enter" print above allows it).
  DBUG_PRINT("info", ("table %p (%s) is not mapped!",
                      table, table ? table->s->table_name : "<>"));
  DBUG_RETURN(NO_TABLE);
}

/*
  Map 'table_id' to 'table'.

  If the id is already mapped, its entry is overwritten in place; otherwise
  a new entry is appended, doubling the array first when it is full.  The
  whole operation runs with the mutex held.

  RETURN
    0   All OK
    1   No storage available, table upper limit exceeded, or memory
        allocation failed
*/
int table_mapping::
set_table(ulong table_id, TABLE* table)
{
  sentry sentry(&m_mutex);

  DBUG_ENTER("table_mapping::set_table(ulong,TABLE*)");
  DBUG_PRINT("enter", ("table_id=%lu, table=%p (%s)",
                       table_id,
                       table, table ? table->s->table_name : "<>"));

  // Nothing to store into if the array was never allocated.
  if (m_array == NULL)
    DBUG_RETURN(1);

  pos_type const pos = find_pos(table_id);
  bool const is_new = (pos == m_count);

  // See if we need to allocate a larger array
  if (is_new && m_reserve == m_count) {
    // Make sure neither the doubled entry count nor the resulting byte size
    // can overflow an unsigned long.
    if (m_reserve > ULONG_MAX / 2 / sizeof(*m_array))
      DBUG_RETURN(1);			// Table upper limit exceeded

    // Same width as m_reserve; an 'int' here would truncate the value.
    ulong const reserve = 2 * m_reserve;
    entry* const
      array = (entry*) my_realloc((char*) m_array,
                                  reserve * sizeof(*m_array),
                                  MYF(MY_WME));
    if (array == NULL) {
      DBUG_RETURN(1);			// Memory allocation failed
    }
    m_reserve = reserve;
    m_array = array;
  }

  m_array[pos].table_id = table_id;
  m_array[pos].table = table;

  // Only a newly appended entry grows the mapping.  Re-mapping an existing
  // id must leave the count alone, otherwise the count runs ahead of the
  // entries actually stored (and eventually past the end of the array).
  if (is_new)
    ++m_count;

  DBUG_PRINT("info", ("tid %lu -> table %p (%s)",
                      table_id, table,
                      table ? table->s->table_name : "<>"));
  DBUG_RETURN(0);		// All OK
}

/*
  Remove the mapping for 'table_id', closing the gap by shifting all later
  entries one slot down.  Runs with the mutex held.

  RETURN
    0   All OK
    1   No table to remove
*/
int table_mapping::
remove_table(ulong table_id)
{
  sentry guard(&m_mutex);

  ulong const victim = find_pos(table_id);
  if (victim >= m_count)
    return 1;			// No table to remove

  for (ulong next = victim + 1 ; next < m_count ; ++next)
    m_array[next - 1] = m_array[next];
  --m_count;
  return 0;			// All OK
}

void table_mapping::
clear_tables()
{
  DBUG_ENTER("table_mapping::clear_tables()");
  m_count = 0;
  DBUG_VOID_RETURN;
}


--- New file ---
+++ sql/rpl_tblmap.h	05/03/21 20:11:16

#ifndef TABLE_MAPPING_H
#define TABLE_MAPPING_H

#include "mysql.h"
#include "my_pthread.h"

// Forward declarations
struct st_table;

/*
  CLASS table_mapping

  RESPONSIBILITIES
    The table mapping is used to map table pointers to table ids and vice
    versa.

  COLLABORATION
    MYSQL_LOG    For mapping tables to table ids before sending events.
    RELAY_LOG    For mapping table ids to tables when receiving events.
 */
class table_mapping {
public:
  typedef ulong pos_type;

  enum {
    NO_TABLE = ULONG_MAX        // Id returned when a table has no mapping
  };

  table_mapping();
  ~table_mapping();

  /*
    Return the table mapped to 'table_id', or NULL if there is no table
    mapped for the id.
  */
  st_table* get_table(ulong table_id);

  /*
    Return table id for 'table', or NO_TABLE if there is no id mapped for
    table.
  */
  ulong     get_table_id(st_table* table);

  /*
    Associate 'table_id' with 'table'.  Returns 0 if all is OK and 1 if the
    table upper limit is exceeded or memory allocation fails.
  */
  int       set_table(ulong table_id, st_table* table);

  /*
    Remove the mapping for 'table_id'.  Returns 0 if all is OK and 1 if
    there is no such mapping to remove.
  */
  int       remove_table(ulong table_id);

  /* Forget all mappings. */
  void      clear_tables();

  /* Number of mappings currently stored (read without taking the mutex). */
  ulong     count() const { return m_count; }

private:
  /*
    The object owns a heap-allocated array and a mutex, so a member-wise
    copy would free the array twice and destroy the mutex twice.  Copying is
    therefore disabled: declared private and intentionally left undefined.
  */
  table_mapping(const table_mapping&);
  table_mapping& operator=(const table_mapping&);

  /*
    Return the index of the entry matching the argument, or m_count if
    there is no such entry.
  */
  pos_type  find_pos(ulong table_id) const;
  pos_type  find_pos(st_table* table) const;

  struct entry { ulong table_id; st_table* table; } *m_array;
  ulong m_count, m_reserve;     // Entries in use / entries allocated

  /*
    Scope guard: locks the mutex on construction and unlocks it on
    destruction.
  */
  class sentry {
  public:
    explicit sentry(pthread_mutex_t *mutex) 
      : m_mptr(mutex) 
    { 
      pthread_mutex_lock(m_mptr); 
    }

    ~sentry() { pthread_mutex_unlock(m_mptr); }
  private:
    // A copied guard would unlock the same mutex twice; not copyable.
    sentry(const sentry&);
    sentry& operator=(const sentry&);

    pthread_mutex_t* m_mptr;
  };

  pthread_mutex_t m_mutex;
};

#endif

Thread
bk commit into 5.1 tree (mats:1.1799)Mats Kindahl1 Apr