MySQL Lists are EOL. Please join:

List:Commits« Previous MessageNext Message »
From:antony Date:June 4 2007 9:49am
Subject:bk commit into 5.1 tree (acurtis:1.2520) BUG#25513
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of antony. When antony does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2007-06-04 02:49:38-07:00, acurtis@stripped +7 -0
  Bug#25513
    "Federated transaction failure"
  
  Experimental and fully functional patch for review/comments.
  
  What this patch provides:
    * Uses only 1 federated connection per server per concurrent transaction.
      As a result, will require significantly fewer network connections.
      Network connections are bound to a transaction until commit/rollback
    * Implements much better/more complete transactions support.
    * Implements primitive per-server connecton pool.
      connection reuse is strictly LIFO so old idle connections are permitted
      to time out and reconnect later when required.
    * Supports single-statement rollback.
    * Support for savepoints.
  
  Notes:
  1) transaction support still requires that the remote server's storage engines have
  a working transaction implementation complete with savepoint support.
  2) XA 2-phase commit is not yet implemented but will be trivial to add.
  3) Trivial to extend in order to support other transports than mysql client protocol.
  4) bulk-inserts can be implemented later as a performance improvement.
  5) no limits on connection pool size - a tunable server variable may be added.
  6) at shutdown, not all memory is freed. This is trivial to resolve.
  7) Some indents have tab characters.
  
  Things to address later:
  1) We need to use the connection pool when CREATE TABLE is performed for instances
  where the new federated table is using a server for which a connection already exists.
  2) We need to resolve situation where CREATE TABLE requires a new network connection...
  Do we stall the entire server when this occurs, pick a small timeout or change LOCK_open
  3) Old federated code builds SQL inconsistantly - inconsistant quoting of identifiers 
  may open possibilities for SQL injection attack. This should be investigated.

  mysql-test/r/federated_transactions.result@stripped, 2007-06-04 02:49:33-07:00, acurtis@stripped +92 -0
    test for bug25513 and new features

  mysql-test/t/federated_transactions.test@stripped, 2007-06-04 02:49:33-07:00, acurtis@stripped +89 -0
    test for bug25513 and new features

  storage/federated/Makefile.am@stripped, 2007-06-04 02:49:33-07:00, acurtis@stripped +3 -2
    new files added to project:
      federated_io.cc
      federated_txn.cc

  storage/federated/federated_io.cc@stripped, 2007-06-04 02:49:33-07:00, acurtis@stripped +374 -0
    New BitKeeper file ``storage/federated/federated_io.cc''

  storage/federated/federated_io.cc@stripped, 2007-06-04 02:49:33-07:00, acurtis@stripped +0 -0

  storage/federated/federated_txn.cc@stripped, 2007-06-04 02:49:33-07:00, acurtis@stripped +412 -0
    New BitKeeper file ``storage/federated/federated_txn.cc''

  storage/federated/federated_txn.cc@stripped, 2007-06-04 02:49:33-07:00, acurtis@stripped +0 -0

  storage/federated/ha_federated.cc@stripped, 2007-06-04 02:49:33-07:00, acurtis@stripped +464 -207
    fix for bug#25513
    properly support transactions
    abstract out network i/o

  storage/federated/ha_federated.h@stripped, 2007-06-04 02:49:33-07:00, acurtis@stripped +136 -12
    fix for bug25513
    new data structures and classes

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	acurtis
# Host:	ltamd64.xiphis.org
# Root:	/home/antony/work2/p2-bug25513.4
--- New file ---
+++ storage/federated/federated_io.cc	07/06/04 02:49:33
/* Copyright (C) 2007 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#define MYSQL_SERVER 1
#include "mysql_priv.h"
#include <mysql/plugin.h>

#include "ha_federated.h"

#include "m_string.h"

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation                          // gcc: Class implementation
#endif


#define SAVEPOINT_REALIZED	1
#define SAVEPOINT_RESTRICT	2
#define SAVEPOINT_EMITTED	4


typedef struct federated_savepoint
{
  ulong level;
  uint  flags;
} SAVEPT;


federated_io::federated_io(FEDERATED_SERVER *aserver)
  : server(aserver),  idle_next(0), txn_next(0),
    active(FALSE), readonly(TRUE), busy(FALSE),
	requested_autocommit(TRUE), actual_autocommit(TRUE)
{
  DBUG_ENTER("federated_io::federated_io");
  DBUG_ASSERT(server);

  safe_mutex_assert_owner(&server->mutex);
  server->io_count++;

  bzero(&mysql, sizeof(MYSQL));
  bzero(&savepoints, sizeof(DYNAMIC_ARRAY));

  my_init_dynamic_array(&savepoints, sizeof(SAVEPT), 16, 16);  
  
  DBUG_VOID_RETURN;
}


federated_io::~federated_io()
{
  DBUG_ENTER("federated_io::~federated_io");

  mysql_close(&mysql);
  delete_dynamic(&savepoints);
  server->io_count--;

  DBUG_VOID_RETURN;
}


int federated_io::commit()
{
  int error= 0;
  DBUG_ENTER("federated_io::commit");
  
  if (!actual_autocommit && (error= actual_query("COMMIT", 6)))
    rollback();
  
  reset_dynamic(&savepoints);
  active= FALSE;
  
  requested_autocommit= TRUE;
  mysql.reconnect= 1;
  
  DBUG_RETURN(error);
}

int federated_io::rollback()
{
  int error= 0;
  DBUG_ENTER("federated_io::rollback");
  
  if (!actual_autocommit)
    error= actual_query("ROLLBACK", 8);
  else
    error= ER_WARNING_NOT_COMPLETE_ROLLBACK;

  reset_dynamic(&savepoints);
  active= FALSE;
  
  requested_autocommit= TRUE;
  mysql.reconnect= 1;
  
  DBUG_RETURN(error);
}


ulong federated_io::last_savepoint() const
{
  SAVEPT *savept= NULL;
  DBUG_ENTER("federated_io::last_savepoint");

  if (savepoints.elements)
    savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *);

  DBUG_RETURN(savept ? savept->level : 0);
}


ulong federated_io::actual_savepoint() const
{
  SAVEPT *savept= NULL;
  uint index= savepoints.elements;
  DBUG_ENTER("federated_io::last_savepoint");

  while (index)
  {
    savept= dynamic_element(&savepoints, --index, SAVEPT *);
    if (savept->flags & SAVEPOINT_REALIZED)
	  break;
	savept= NULL;
  }

  DBUG_RETURN(savept ? savept->level : 0);
}


int federated_io::savepoint_set(ulong sp)
{
  int error;
  SAVEPT savept;
  DBUG_ENTER("federated_io::savepoint_set");
  DBUG_PRINT("info",("savepoint=%lu", sp));
  DBUG_ASSERT(sp > last_savepoint());

  savept.level= sp;
  savept.flags= 0;

  if ((error= insert_dynamic(&savepoints, (uchar*) &savept) ? -1 : 0))
    goto err;

  active= TRUE;
  mysql.reconnect= 0;
  requested_autocommit= FALSE;

err:
  DBUG_RETURN(error);
}


ulong federated_io::savepoint_release(ulong sp)
{
  SAVEPT *savept, *last= NULL;
  DBUG_ENTER("federated_io::savepoint_release");
  DBUG_PRINT("info",("savepoint=%lu", sp));
  
  while (savepoints.elements)
  {
    savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *);
    if (savept->level < sp)
      break;
	if ((savept->flags & (SAVEPOINT_REALIZED | 
	                      SAVEPOINT_RESTRICT)) == SAVEPOINT_REALIZED)
	  last= savept;
    savepoints.elements--;
  }

  if (last)
  {
    char buffer[STRING_BUFFER_USUAL_SIZE];
	int length= my_snprintf(buffer, sizeof(buffer),
							"RELEASE SAVEPOINT save%lu", last->level);
    actual_query(buffer, length);
  }

  DBUG_RETURN(last_savepoint()); 
}


ulong federated_io::savepoint_rollback(ulong sp)
{
  SAVEPT *savept;
  uint index;
  DBUG_ENTER("federated_io::savepoint_release");
  DBUG_PRINT("info",("savepoint=%lu", sp));
  
  while (savepoints.elements)
  {
    savept= dynamic_element(&savepoints, savepoints.elements - 1, SAVEPT *);
	if (savept->level <= sp)
	  break;
    savepoints.elements--;
  }

  for (index= savepoints.elements, savept= NULL; index;)
  {
    savept= dynamic_element(&savepoints, --index, SAVEPT *);
    if (savept->flags & SAVEPOINT_REALIZED)
	  break;
	savept= NULL;
  }
  
  if (savept && !(savept->flags & SAVEPOINT_RESTRICT))
  {
    char buffer[STRING_BUFFER_USUAL_SIZE];
	int length= my_snprintf(buffer, sizeof(buffer),
							"ROLLBACK TO SAVEPOINT save%lu", savept->level);
    actual_query(buffer, length);
  }

  DBUG_RETURN(last_savepoint());
}


void federated_io::savepoint_restrict(ulong sp)
{
  SAVEPT *savept;
  uint index= savepoints.elements;
  DBUG_ENTER("federated_io::savepoint_restrict");
  
  while (index)
  {
    savept= dynamic_element(&savepoints, --index, SAVEPT *);
	if (savept->level > sp)
	  continue;
	if (savept->level < sp)
	  break;
	savept->flags|= SAVEPOINT_RESTRICT;
	break;
  }
  
  DBUG_VOID_RETURN;
}


int federated_io::simple_query(const char *fmt, ...)
{
  char buffer[STRING_BUFFER_USUAL_SIZE];
  int length, error;
  va_list arg;
  DBUG_ENTER("federated_io::simple_query");

  va_start(arg, fmt);  
  length= my_vsnprintf(buffer, sizeof(buffer), fmt, arg);
  va_end(arg);
  
  error= query(buffer, length);
  
  DBUG_RETURN(error);
}


bool federated_io::test_all_restrict() const
{
  bool result= FALSE;
  SAVEPT *savept;
  uint index= savepoints.elements;
  DBUG_ENTER("federated_io::test_all_restrict");
  
  while (index)
  {
    savept= dynamic_element(&savepoints, --index, SAVEPT *);
	if ((savept->flags & (SAVEPOINT_REALIZED | 
	                      SAVEPOINT_RESTRICT)) == SAVEPOINT_REALIZED ||
		(savept->flags & SAVEPOINT_EMITTED))
      DBUG_RETURN(FALSE);
    if (savept->flags & SAVEPOINT_RESTRICT)
	  result= TRUE;
  }
  
  DBUG_RETURN(result); 
}


int federated_io::query(const char *buffer, uint length)
{
  int error;
  bool wants_autocommit= requested_autocommit | readonly;
  DBUG_ENTER("federated_io::query");

  if (!wants_autocommit && test_all_restrict())
    wants_autocommit= TRUE;

  if (wants_autocommit != actual_autocommit)
  {
    if ((error= actual_query(wants_autocommit ? "SET AUTOCOMMIT=1"
	                                          : "SET AUTOCOMMIT=0", 16)))
	  DBUG_RETURN(error);												  
    mysql.reconnect= wants_autocommit ? 1 : 0;
    actual_autocommit= wants_autocommit;
  }
  
  if (!actual_autocommit && last_savepoint() != actual_savepoint())
  {
    SAVEPT *savept= dynamic_element(&savepoints, savepoints.elements - 1, 
	                            	SAVEPT *);
    if (!(savept->flags & SAVEPOINT_RESTRICT))
	{
      char buf[STRING_BUFFER_USUAL_SIZE];
	  int len= my_snprintf(buf, sizeof(buf),
				          "SAVEPOINT save%lu", savept->level);
      if ((error= actual_query(buf, len)))
		DBUG_RETURN(error);												  
	  active= TRUE;
	  savept->flags|= SAVEPOINT_EMITTED;
    }
    savept->flags|= SAVEPOINT_REALIZED;
  }

  if (!(error= actual_query(buffer, length)))
    active|= !actual_autocommit;

  DBUG_RETURN(error);
}


int federated_io::actual_query(const char *buffer, uint length)
{
  int error;
  DBUG_ENTER("federated_io::actual_query");

  if (!mysql.master)
  {
    if (!(mysql_init(&mysql)))
	  DBUG_RETURN(-1);
	
	/*
      BUG# 17044 Federated Storage Engine is not UTF8 clean
      Add set names to whatever charset the table is at open
      of table
	*/
	/* this sets the csname like 'set names utf8' */
	mysql_options(&mysql, MYSQL_SET_CHARSET_NAME, server->csname);

    if (!mysql_real_connect(&mysql,
                            server->hostname,
                            server->username,
                            server->password,
                            server->database,
                            server->port,
                            server->socket, 0))
      DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE);
    mysql.reconnect= 1;
  }

  error= mysql_real_query(&mysql, buffer, length);
  
  DBUG_RETURN(error);
}


MYSQL_RES *federated_io::store_result()
{
  MYSQL_RES *result;
  DBUG_ENTER("federated_io::store_result");
  
  result= mysql_store_result(&mysql);
  
  DBUG_RETURN(result);
}


--- New file ---
+++ storage/federated/federated_txn.cc	07/06/04 02:49:33
/* Copyright (C) 2003 MySQL AB

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


#define MYSQL_SERVER 1
#include "mysql_priv.h"
#include <mysql/plugin.h>

#include "ha_federated.h"

#include "m_string.h"

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation                          // gcc: Class implementation
#endif


federated_txn::federated_txn()
  : txn_list(0), savepoint_next(0), 
    savepoint_level(0), savepoint_stmt(0)
{
  DBUG_ENTER("federated_txn::federated_txn");
  DBUG_VOID_RETURN;
}

federated_txn::~federated_txn()
{
  DBUG_ENTER("federated_txn::~federated_txn");
  DBUG_ASSERT(!txn_list);
  DBUG_VOID_RETURN;
}


void federated_txn::close(FEDERATED_SERVER *server)
{
  uint count= 0;
  federated_io *io, **iop;
  DBUG_ENTER("federated_txn::close");
  
  DBUG_ASSERT(!server->use_count);
  DBUG_PRINT("info",("use count: %u  connections: %u", 
                     server->use_count, server->io_count));

  for (iop= &txn_list; (io= *iop);)
  {
    if (io->server != server)
	  iop= &io->txn_next;
	else
	{
	  *iop= io->txn_next;
	  io->txn_next= NULL;
	  io->busy= FALSE;
	  
	  io->idle_next= server->idle_list;
	  server->idle_list= io;
	}
  }

  while ((io= server->idle_list))
  {
    server->idle_list= io->idle_next;
	delete io;
	count++;
  }
  
  DBUG_PRINT("info",("closed %u connections", count));
  DBUG_VOID_RETURN;
}


int federated_txn::acquire(FEDERATED_SHARE *share, bool readonly,
						   federated_io **ioptr)
{
  federated_io *io;
  FEDERATED_SERVER *server= share->s;
  DBUG_ENTER("federated_txn::acquire");
  DBUG_ASSERT(ioptr && server);

  if (!(io= *ioptr))
  {
    /* check to see if we have an available IO connection */
	for (io= txn_list; io; io= io->txn_next)
	  if (io->server == server)
	    break;

    if (!io)
    {
	  /* check to see if there are any unowned IO connections */
      pthread_mutex_lock(&server->mutex);
	  if ((io= server->idle_list))
	  {
	    server->idle_list= io->idle_next;
		io->idle_next= NULL;
	  }
	  else
	    io= new (&server->mem_root) federated_io(server);

      io->txn_next= txn_list;
	  txn_list= io;

      pthread_mutex_unlock(&server->mutex);
    }

	if (io->busy)
	  *io->owner_ptr= NULL;
	  
    io->busy= TRUE;
	io->owner_ptr= ioptr;
  }
  
  DBUG_ASSERT(io->busy && io->server == server);
  
  io->readonly&= readonly;

  DBUG_RETURN((*ioptr= io) ? 0 : -1);
}


void federated_txn::release(federated_io **ioptr)
{
  federated_io *io;
  DBUG_ENTER("federated_txn::release");
  DBUG_ASSERT(ioptr);

  if ((io= *ioptr))
  {
	/* mark as available for reuse in this transaction */
    io->busy= FALSE;
    *ioptr= NULL;
	
	DBUG_PRINT("info", ("active=%d autocommit=%d", 
	                    io->active, io->actual_autocommit));

	if (io->actual_autocommit)
	  io->active= FALSE;
  }

  release_scan();

  DBUG_VOID_RETURN;
}


void federated_txn::release_scan()
{
  uint count= 0, returned= 0;
  federated_io *io, **pio;
  DBUG_ENTER("federated_txn::release_scan");

  /* return any inactive and idle connections to the server */	
  for (pio= &txn_list; (io= *pio); count++)
  {
	if (io->active || io->busy)
	  pio= &io->txn_next;
	else
	{
      FEDERATED_SERVER *server= io->server;

	  /* unlink from list of connections bound to the transaction */
	  *pio= io->txn_next; 
	  io->txn_next= NULL;

	  /* reset some values */
	  io->readonly= TRUE;
	  io->requested_autocommit= TRUE;

      pthread_mutex_lock(&server->mutex);
	  io->idle_next= server->idle_list;
	  server->idle_list= io;
      pthread_mutex_unlock(&server->mutex);
	  returned++;
	}
  }
  DBUG_PRINT("info",("returned %u of %u connections(s)", returned, count));

  DBUG_VOID_RETURN;
}


bool federated_txn::txn_begin()
{
  ulong level= 0;
  DBUG_ENTER("federated_txn::txn_begin");

  if (savepoint_next == 0)
  {
    savepoint_next++;
    savepoint_level= savepoint_stmt= 0;
	sp_acquire(&level);
  }

  DBUG_RETURN(level == 1);
}


int federated_txn::txn_commit()
{
  int error= 0;
  federated_io *io;
  DBUG_ENTER("federated_txn::txn_commit");

  if (savepoint_next)
  {
	DBUG_ASSERT(savepoint_stmt != 1);

	for (io= txn_list; io; io= io->txn_next)
	{
      int rc= 0;

      if (io->active)
    	rc= io->commit();
      else
    	io->rollback();

      if (io->active && rc)
    	error= -1;

      reset_dynamic(&io->savepoints);
      io->active= FALSE;
	}

	release_scan();

	savepoint_next= savepoint_stmt= savepoint_level= 0;
  }
    
  DBUG_RETURN(error);
}


int federated_txn::txn_rollback()
{
  int error= 0;
  federated_io *io;
  DBUG_ENTER("federated_txn::txn_commit");

  if (savepoint_next)
  {
	DBUG_ASSERT(savepoint_stmt != 1);

	for (io= txn_list; io; io= io->txn_next)
	{
      int rc= io->rollback();

      if (io->active && rc)
    	error= -1;

      reset_dynamic(&io->savepoints);
      io->active= FALSE;
	}

	release_scan();

	savepoint_next= savepoint_stmt= savepoint_level= 0;
  }
    
  DBUG_RETURN(error);
}


bool federated_txn::sp_acquire(ulong *sp)
{
  bool rc= FALSE;
  federated_io *io;
  DBUG_ENTER("federated_txn::sp_acquire");
  DBUG_ASSERT(sp && savepoint_next);
  
  *sp= savepoint_level= savepoint_next++;
    
  for (io= txn_list; io; io= io->txn_next)
  {
    if (io->readonly)
      continue;

    io->savepoint_set(savepoint_level);
    rc= TRUE;
  }

  DBUG_RETURN(rc);
}


int federated_txn::sp_rollback(ulong *sp)
{
  ulong level, new_level= savepoint_level;
  federated_io *io;
  DBUG_ENTER("federated_txn::sp_rollback");
  DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level);
  
  for (io= txn_list; io; io= io->txn_next)
  {
    if (io->readonly)
      continue;

    if ((level= io->savepoint_rollback(*sp)) < new_level)
      new_level= level;
  } 
  
  savepoint_level= new_level;
  
  DBUG_RETURN(0);
}


int federated_txn::sp_release(ulong *sp)
{
  ulong level, new_level= savepoint_level;
  federated_io *io;
  DBUG_ENTER("federated_txn::sp_release");
  DBUG_ASSERT(sp && savepoint_next && *sp && *sp <= savepoint_level);
  
  for (io= txn_list; io; io= io->txn_next)
  {
    if (io->readonly)
      continue;

    if ((level= io->savepoint_release(*sp)) < new_level)
      new_level= level;
  }

  savepoint_level= new_level;
  *sp= 0;

  DBUG_RETURN(0);
}


bool federated_txn::stmt_begin()
{
  bool result= FALSE;
  DBUG_ENTER("federated_txn::stmt_begin");

  if (!savepoint_stmt)
  {
	if (!savepoint_next)
	{
      savepoint_next++;
      savepoint_level= savepoint_stmt= 0;
	}
    result= sp_acquire(&savepoint_stmt);
  }

  DBUG_RETURN(result);
}


int federated_txn::stmt_commit()
{ 
  int result= 0;
  DBUG_ENTER("federated_txn::stmt_commit");
  
  if (savepoint_stmt == 1)
  {
    savepoint_stmt= 0;
    result= txn_commit();
  }
  else  
  if (savepoint_stmt)
    result= sp_release(&savepoint_stmt);

  DBUG_RETURN(result);
}


int federated_txn::stmt_rollback()
{
  int result= 0;
  DBUG_ENTER("federated:txn::stmt_rollback");

  if (savepoint_stmt == 1)
  {
    savepoint_stmt= 0;
	result= txn_rollback();
  }
  else
  if (savepoint_stmt)
  {
    result= sp_rollback(&savepoint_stmt);
	sp_release(&savepoint_stmt);
  }
  
  DBUG_RETURN(result);
}


void federated_txn::stmt_autocommit()
{
  federated_io *io;
  DBUG_ENTER("federated_txn::stmt_autocommit");

  for (io= txn_list; savepoint_stmt && io; io= io->txn_next)
  {
    if (io->readonly)
      continue;

    io->savepoint_restrict(savepoint_stmt);
  }

  DBUG_VOID_RETURN;  
}


--- 1.4/storage/federated/Makefile.am	2007-06-04 02:49:52 -07:00
+++ 1.5/storage/federated/Makefile.am	2007-06-04 02:49:52 -07:00
@@ -30,20 +30,21 @@
 DEFS =                  @DEFS@
 
 noinst_HEADERS =	ha_federated.h
+federated_sources =     ha_federated.cc federated_txn.cc federated_io.cc
 
 EXTRA_LTLIBRARIES =	ha_federated.la
 pkglib_LTLIBRARIES =	@plugin_federated_shared_target@
 ha_federated_la_LDFLAGS =	-module -rpath $(MYSQLLIBdir)
 ha_federated_la_CXXFLAGS=	$(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
 ha_federated_la_CFLAGS =	$(AM_CFLAGS) -DMYSQL_DYNAMIC_PLUGIN
-ha_federated_la_SOURCES =	ha_federated.cc
+ha_federated_la_SOURCES =	$(federated_sources)
 
 
 EXTRA_LIBRARIES =	libfederated.a
 noinst_LIBRARIES =	@plugin_federated_static_target@
 libfederated_a_CXXFLAGS =	$(AM_CFLAGS)
 libfederated_a_CFLAGS =	$(AM_CFLAGS)
-libfederated_a_SOURCES=	ha_federated.cc
+libfederated_a_SOURCES=	$(federated_sources)
 
 
 EXTRA_DIST =		CMakeLists.txt plug.in

--- 1.4/mysql-test/r/federated_transactions.result	2007-06-04 02:49:52 -07:00
+++ 1.5/mysql-test/r/federated_transactions.result	2007-06-04 02:49:52 -07:00
@@ -43,6 +43,98 @@
 5	foe
 6	fig
 DELETE FROM federated.t1;
+DROP TABLE federated.t1;
+DROP TABLE federated.t1;
+CREATE TABLE federated.t1 (`id` INT PRIMARY KEY) ENGINE=MyISAM;
+CREATE TABLE federated.t1 (`id` INT PRIMARY KEY) ENGINE=FEDERATED
+CONNECTION='mysql://root@stripped:SLAVE_PORT/federated/t1';
+INSERT INTO federated.t1 VALUES (1), (2), (1);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '1' for key 'PRIMARY'' from FEDERATED
+SELECT COUNT(*) FROM federated.t1;
+COUNT(*)
+2
+DELETE FROM federated.t1;
+ALTER TABLE federated.t1 ENGINE=InnoDB;
+INSERT INTO federated.t1 VALUES (1), (2), (1);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '1' for key 'PRIMARY'' from FEDERATED
+SELECT COUNT(*) FROM federated.t1;
+COUNT(*)
+0
+DELETE FROM federated.t1;
+drop table federated.t1;
+drop table federated.t1;
+create table federated.t1 (n int not null primary key) engine=innodb;
+create table federated.t1 (n int not null primary key) engine=federated
+CONNECTION='mysql://root@stripped:SLAVE_PORT/federated/t1';
+set autocommit=0;
+insert into federated.t1 values (4);
+rollback;
+select n, "after rollback" from federated.t1;
+n	after rollback
+insert into federated.t1 values (4);
+commit;
+select n, "after commit" from federated.t1;
+n	after commit
+4	after commit
+commit;
+insert into federated.t1 values (5);
+insert into federated.t1 values (4);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '4' for key 'PRIMARY'' from FEDERATED
+commit;
+select n, "after commit" from federated.t1;
+n	after commit
+4	after commit
+5	after commit
+set autocommit=1;
+insert into federated.t1 values (6);
+insert into federated.t1 values (4);
+ERROR HY000: Got error 10000 'Error on remote system: 1582: Duplicate entry '4' for key 'PRIMARY'' from FEDERATED
+select n from federated.t1;
+n
+4
+5
+6
+set autocommit=0;
+begin;
+savepoint `my_savepoint`;
+insert into federated.t1 values (7);
+savepoint `savept2`;
+insert into federated.t1 values (3);
+select n from federated.t1;
+n
+3
+4
+5
+6
+7
+savepoint savept3;
+rollback to savepoint savept2;
+rollback to savepoint savept3;
+ERROR 42000: SAVEPOINT savept3 does not exist
+rollback to savepoint savept2;
+release savepoint `my_savepoint`;
+select n from federated.t1;
+n
+4
+5
+6
+7
+rollback to savepoint `my_savepoint`;
+ERROR 42000: SAVEPOINT my_savepoint does not exist
+rollback to savepoint savept2;
+ERROR 42000: SAVEPOINT savept2 does not exist
+insert into federated.t1 values (8);
+savepoint sv;
+commit;
+savepoint sv;
+set autocommit=1;
+select a.n from federated.t1 a, federated.t1 b where a.n = b.n + 1;
+n
+5
+6
+7
+8
+rollback;
 DROP TABLE IF EXISTS federated.t1;
 DROP DATABASE IF EXISTS federated;
 DROP TABLE IF EXISTS federated.t1;

--- 1.7/mysql-test/t/federated_transactions.test	2007-06-04 02:49:52 -07:00
+++ 1.8/mysql-test/t/federated_transactions.test	2007-06-04 02:49:52 -07:00
@@ -37,4 +37,93 @@
 SELECT * FROM federated.t1;
 DELETE FROM federated.t1;
 
+#
+# Bug#25513
+#
+connection master;
+DROP TABLE federated.t1;
+connection slave;
+DROP TABLE federated.t1;
+CREATE TABLE federated.t1 (`id` INT PRIMARY KEY) ENGINE=MyISAM;
+connection master;
+--replace_result $SLAVE_MYPORT SLAVE_PORT
+eval CREATE TABLE federated.t1 (`id` INT PRIMARY KEY) ENGINE=FEDERATED
+  CONNECTION='mysql://root@stripped:$SLAVE_MYPORT/federated/t1';
+
+--error ER_GET_ERRMSG
+INSERT INTO federated.t1 VALUES (1), (2), (1);
+SELECT COUNT(*) FROM federated.t1;
+DELETE FROM federated.t1;
+
+connection slave;
+ALTER TABLE federated.t1 ENGINE=InnoDB;
+connection master;
+
+--error ER_GET_ERRMSG
+INSERT INTO federated.t1 VALUES (1), (2), (1);
+SELECT COUNT(*) FROM federated.t1;
+DELETE FROM federated.t1;
+
+#
+# Test rollback
+#
+drop table federated.t1;
+connection slave;
+drop table federated.t1;
+create table federated.t1 (n int not null primary key) engine=innodb;
+connection master;
+--replace_result $SLAVE_MYPORT SLAVE_PORT
+eval create table federated.t1 (n int not null primary key) engine=federated
+  CONNECTION='mysql://root@stripped:$SLAVE_MYPORT/federated/t1';
+
+set autocommit=0;
+insert into federated.t1 values (4);
+rollback;
+select n, "after rollback" from federated.t1;
+insert into federated.t1 values (4);
+commit;
+select n, "after commit" from federated.t1;
+commit;
+insert into federated.t1 values (5);
+--error ER_GET_ERRMSG
+insert into federated.t1 values (4);
+commit;
+select n, "after commit" from federated.t1;
+set autocommit=1;
+insert into federated.t1 values (6);
+--error ER_GET_ERRMSG
+insert into federated.t1 values (4);
+select n from federated.t1;
+set autocommit=0;
+#
+# savepoints
+#
+begin;
+savepoint `my_savepoint`;
+insert into federated.t1 values (7);
+savepoint `savept2`;
+insert into federated.t1 values (3);
+select n from federated.t1;
+savepoint savept3;
+rollback to savepoint savept2;
+--error ER_SP_DOES_NOT_EXIST
+rollback to savepoint savept3;
+rollback to savepoint savept2;
+release savepoint `my_savepoint`;
+select n from federated.t1;
+--error ER_SP_DOES_NOT_EXIST
+rollback to savepoint `my_savepoint`;
+--error ER_SP_DOES_NOT_EXIST
+rollback to savepoint savept2;
+insert into federated.t1 values (8);
+savepoint sv;
+commit;
+savepoint sv;
+set autocommit=1;
+# test self joins - should still only use 1 network connection
+select a.n from federated.t1 a, federated.t1 b where a.n = b.n + 1;
+# nop
+rollback;
+
+
 source include/federated_cleanup.inc;

--- 1.101/storage/federated/ha_federated.cc	2007-06-04 02:49:52 -07:00
+++ 1.102/storage/federated/ha_federated.cc	2007-06-04 02:49:52 -07:00
@@ -375,18 +375,18 @@
 #include "mysql_priv.h"
 #include <mysql/plugin.h>
 
-#ifdef USE_PRAGMA_IMPLEMENTATION
-#pragma implementation                          // gcc: Class implementation
-#endif
-
 #include "ha_federated.h"
 
 #include "m_string.h"
 
-#include <mysql/plugin.h>
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation                          // gcc: Class implementation
+#endif
+
 
 /* Variables for federated share methods */
 static HASH federated_open_tables;              // To track open tables
+static HASH federated_open_servers;             // To track open servers
 pthread_mutex_t federated_mutex;                // To init the hash
 
 /* Variables used when chopping off trailing characters */
@@ -399,8 +399,6 @@
 static handler *federated_create_handler(handlerton *hton,
                                          TABLE_SHARE *table,
                                          MEM_ROOT *mem_root);
-static int federated_commit(handlerton *hton, THD *thd, bool all);
-static int federated_rollback(handlerton *hton, THD *thd, bool all);
 
 /* Federated storage engine handlerton */
 
@@ -414,13 +412,22 @@
 
 /* Function we use in the creation of our hash to get key */
 
-static uchar *federated_get_key(FEDERATED_SHARE *share, size_t *length,
-                               my_bool not_used __attribute__ ((unused)))
+static uchar *
+federated_share_get_key(FEDERATED_SHARE *share, size_t *length,
+                        my_bool not_used __attribute__ ((unused)))
 {
   *length= share->share_key_length;
   return (uchar*) share->share_key;
 }
 
+static uchar *
+federated_server_get_key(FEDERATED_SERVER *server, size_t *length,
+                         my_bool not_used __attribute__ ((unused)))
+{
+  *length= server->key_length;
+  return server->key;
+}
+
 /*
   Initialize the federated handler.
 
@@ -439,15 +446,22 @@
   handlerton *federated_hton= (handlerton *)p;
   federated_hton->state= SHOW_OPTION_YES;
   federated_hton->db_type= DB_TYPE_FEDERATED_DB;
-  federated_hton->commit= federated_commit;
-  federated_hton->rollback= federated_rollback;
+  federated_hton->savepoint_offset= sizeof(ulong);
+  federated_hton->close_connection= ha_federated::disconnect;
+  federated_hton->savepoint_set= ha_federated::savepoint_set;
+  federated_hton->savepoint_rollback= ha_federated::savepoint_rollback;
+  federated_hton->savepoint_release= ha_federated::savepoint_release;
+  federated_hton->commit= ha_federated::commit;
+  federated_hton->rollback= ha_federated::rollback;
   federated_hton->create= federated_create_handler;
   federated_hton->flags= HTON_ALTER_NOT_SUPPORTED | HTON_NO_PARTITION;
 
   if (pthread_mutex_init(&federated_mutex, MY_MUTEX_INIT_FAST))
     goto error;
   if (!hash_init(&federated_open_tables, &my_charset_bin, 32, 0, 0,
-                    (hash_get_key) federated_get_key, 0, 0))
+                 (hash_get_key) federated_share_get_key, 0, 0) &&
+      !hash_init(&federated_open_servers, &my_charset_bin, 32, 0, 0,
+                 (hash_get_key) federated_server_get_key, 0, 0))
   {
     DBUG_RETURN(FALSE);
   }
@@ -904,9 +918,8 @@
 ha_federated::ha_federated(handlerton *hton,
                            TABLE_SHARE *table_arg)
   :handler(hton, table_arg),
-  mysql(0), stored_result(0)
+  txn(0), io(0), stored_result(0)
 {
-  trx_next= 0;
 }
 
 
@@ -1475,6 +1488,119 @@
   DBUG_RETURN(1);
 }
 
+
+static FEDERATED_SERVER *get_server(FEDERATED_SHARE *share, TABLE *table)
+{
+  FEDERATED_SERVER *server= NULL, tmp_server;
+  MEM_ROOT mem_root;
+  char buffer[STRING_BUFFER_USUAL_SIZE];
+  String key(buffer, sizeof(buffer), &my_charset_bin);  
+  String scheme(share->scheme, &my_charset_latin1);
+  String hostname(share->hostname, &my_charset_latin1);
+  String database(share->database, system_charset_info);
+  String username(share->username, system_charset_info);
+  String socket(share->socket ? share->socket : "", files_charset_info);
+  String password(share->password ? share->password : "", &my_charset_bin);
+  DBUG_ENTER("ha_federated.cc::get_server");
+
+  safe_mutex_assert_owner(&federated_mutex);
+
+  /* Do some case conversions */
+  scheme.reserve(scheme.length());
+  scheme.length(my_casedn_str(&my_charset_latin1, scheme.c_ptr_safe()));
+  
+  hostname.reserve(hostname.length());
+  hostname.length(my_casedn_str(&my_charset_latin1, hostname.c_ptr_safe()));
+  
+  if (lower_case_table_names)
+  {
+    database.reserve(database.length());
+    database.length(my_casedn_str(system_charset_info, database.c_ptr_safe()));
+  }
+  
+  if (lower_case_file_system && socket.length())
+  {
+    socket.reserve(socket.length());
+    socket.length(my_casedn_str(files_charset_info, socket.c_ptr_safe()));
+  }
+
+  /* start with all bytes zeroed */  
+  bzero(&tmp_server, sizeof(tmp_server));
+
+  key.length(0);
+  key.reserve(scheme.length() + hostname.length() + database.length() +
+              socket.length() + username.length() + password.length() +
+			  sizeof(int) + 8);
+  key.append(scheme);
+  key.q_append('\0');
+  tmp_server.hostname= (const char *) (intptr) key.length();
+  key.append(hostname);
+  key.q_append('\0');
+  tmp_server.database= (const char *) (intptr) key.length();
+  key.append(database);
+  key.q_append('\0');
+  key.q_append((uint32) share->port);
+  tmp_server.socket= (const char *) (intptr) key.length();
+  key.append(socket);
+  key.q_append('\0');
+  tmp_server.username= (const char *) (intptr) key.length();
+  key.append(username);
+  key.q_append('\0');
+  tmp_server.password= (const char *) (intptr) key.length();
+  key.append(password);
+  
+  init_alloc_root(&mem_root, 4096, 4096);
+  
+  tmp_server.key_length= key.length();
+  tmp_server.key= (uchar *)  memdup_root(&mem_root, key.ptr(), key.length());
+  
+  /* pointer magic */
+  tmp_server.scheme+= (intptr) tmp_server.key;
+  tmp_server.hostname+= (intptr) tmp_server.key;
+  tmp_server.database+= (intptr) tmp_server.key;
+  tmp_server.username+= (intptr) tmp_server.key;
+  tmp_server.password+= (intptr) tmp_server.key;
+  tmp_server.socket+= (intptr) tmp_server.key;
+  tmp_server.port= share->port;
+
+  if (!(server= (FEDERATED_SERVER *) hash_search(&federated_open_servers,
+                                                 tmp_server.key,
+                                                 tmp_server.key_length)))
+  {
+    if (!table)
+	  goto error;
+	
+    if (!share->socket)
+	  tmp_server.socket= NULL;
+    if (!share->password)
+	  tmp_server.password= NULL;
+
+    tmp_server.csname= strdup_root(&mem_root, 
+                                   table->s->table_charset->csname);
+
+    if (!(server= (FEDERATED_SERVER *) memdup_root(&mem_root, 
+												   (char *) &tmp_server,
+												   sizeof(*server))))
+      goto error;
+
+    server->mem_root= mem_root;
+
+    if (my_hash_insert(&federated_open_servers, (uchar*) server))
+      goto error;
+
+    pthread_mutex_init(&server->mutex, MY_MUTEX_INIT_FAST);
+  }
+  else
+    free_root(&mem_root, MYF(0)); /* prevents memory leak */
+
+  server->use_count++;
+  
+  DBUG_RETURN(server);
+error:
+  free_root(&mem_root, MYF(0));
+  DBUG_RETURN(NULL);
+}
+
 /*
   Example of simple lock controls. The "share" it creates is structure we will
   pass to each federated handler. Do you have to have one of these? Well, you
@@ -1496,6 +1622,7 @@
   */
   query.length(0);
 
+  bzero(&tmp_share, sizeof(tmp_share));
   init_alloc_root(&mem_root, 256, 0);
 
   pthread_mutex_lock(&federated_mutex);
@@ -1531,16 +1658,17 @@
         !(share->select_query= (char*) strmake_root(&mem_root, query.ptr(), query.length())))
       goto error;
 
-    share->use_count= 0;
     share->mem_root= mem_root;
 
     DBUG_PRINT("info",
                ("share->select_query %s", share->select_query));
 
+    if (!(share->s= get_server(share, table)))
+	  goto error;
+	  
     if (my_hash_insert(&federated_open_tables, (uchar*) share))
       goto error;
     thr_lock_init(&share->lock);
-    pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
   }
   else
     free_root(&mem_root, MYF(0)); /* prevents memory leak */
@@ -1557,26 +1685,62 @@
 }
 
 
+
+static int free_server(federated_txn *txn, FEDERATED_SERVER *server)
+{
+  bool destroy;
+  DBUG_ENTER("free_server");
+
+  pthread_mutex_lock(&federated_mutex);
+  if ((destroy= !--server->use_count))
+    hash_delete(&federated_open_servers, (uchar*) server);
+  pthread_mutex_unlock(&federated_mutex);
+
+  if (destroy)
+  {
+    MEM_ROOT mem_root;
+	
+    txn->close(server);
+	
+	DBUG_ASSERT(server->io_count == 0);
+
+    pthread_mutex_destroy(&server->mutex);
+	mem_root= server->mem_root;
+    free_root(&mem_root, MYF(0));
+  }
+
+  DBUG_RETURN(0);
+}
+
+
 /*
   Free lock controls. We call this whenever we close a table.
   If the table had the last reference to the share then we
   free memory associated with it.
 */
 
-static int free_share(FEDERATED_SHARE *share)
+static int free_share(federated_txn *txn, FEDERATED_SHARE *share)
 {
-  MEM_ROOT mem_root= share->mem_root;
+  bool destroy;
   DBUG_ENTER("free_share");
 
   pthread_mutex_lock(&federated_mutex);
-  if (!--share->use_count)
-  {
+  if ((destroy= !--share->use_count))
     hash_delete(&federated_open_tables, (uchar*) share);
+  pthread_mutex_unlock(&federated_mutex);
+
+  if (destroy)
+  {
+    MEM_ROOT mem_root;
+	FEDERATED_SERVER *server= share->s;
+	
     thr_lock_delete(&share->lock);
-    VOID(pthread_mutex_destroy(&share->mutex));
+
+	mem_root= share->mem_root;
     free_root(&mem_root, MYF(0));
+	
+	free_server(txn, server);
   }
-  pthread_mutex_unlock(&federated_mutex);
 
   DBUG_RETURN(0);
 }
@@ -1612,6 +1776,22 @@
 }
 
 
+federated_txn *ha_federated::get_txn(THD *thd, bool no_create)
+{
+  federated_txn **txnp= (federated_txn **) ha_data(thd);
+  if (!*txnp && !no_create)
+    *txnp= new federated_txn();
+  return *txnp;
+}
+  
+int ha_federated::disconnect(handlerton *hton, THD *thd)
+{
+  federated_txn *txn= (federated_txn *) thd->ha_data[hton->slot];
+  delete txn;
+  return 0;
+}
+ 
+
 /*
   Used for opening tables. The name will be the name of the file.
   A table is opened when it needs to be opened. For instance
@@ -1625,45 +1805,26 @@
 
 int ha_federated::open(const char *name, int mode, uint test_if_locked)
 {
+  int error;
+  THD *thd= current_thd;
   DBUG_ENTER("ha_federated::open");
 
   if (!(share= get_share(name, table)))
     DBUG_RETURN(1);
   thr_lock_data_init(&share->lock, &lock, NULL);
 
-  /* Connect to foreign database mysql_real_connect() */
-  mysql= mysql_init(0);
+  DBUG_ASSERT(io == NULL);
+  
+  txn= get_txn(thd);
 
-  /*
-    BUG# 17044 Federated Storage Engine is not UTF8 clean
-    Add set names to whatever charset the table is at open
-    of table
-  */
-  /* this sets the csname like 'set names utf8' */
-  mysql_options(mysql,MYSQL_SET_CHARSET_NAME,
-                this->table->s->table_charset->csname);
-
-  DBUG_PRINT("info", ("calling mysql_real_connect hostname %s user %s",
-			share->hostname, share->username));
-  if (!mysql || !mysql_real_connect(mysql,
-                                   share->hostname,
-                                   share->username,
-                                   share->password,
-                                   share->database,
-                                   share->port,
-                                   share->socket, 0))
+  if ((error= txn->acquire(share, TRUE, &io)))
   {
-    free_share(share);
-    DBUG_RETURN(stash_remote_error());
+    free_share(txn, share);
+    DBUG_RETURN(error);
   }
-  /*
-    Since we do not support transactions at this version, we can let the client
-    API silently reconnect. For future versions, we will need more logic to
-    deal with transactions
-  */
-
-  mysql->reconnect= 1;
-
+  
+  txn->release(&io);
+  
   ref_length= (table->s->primary_key != MAX_KEY ?
                table->key_info[table->s->primary_key].key_length :
                table->s->reclength);
@@ -1690,17 +1851,16 @@
   DBUG_ENTER("ha_federated::close");
 
   /* free the result set */
-  if (stored_result)
-  {
-    mysql_free_result(stored_result);
-    stored_result= 0;
-  }
+  mysql_free_result(stored_result);
+  stored_result= 0;
+
   /* Disconnect from mysql */
-  if (mysql)                                    // QQ is this really needed
-    mysql_close(mysql);
-  retval= free_share(share);
-  DBUG_RETURN(retval);
+  txn->release(&io);
+  
+  DBUG_ASSERT(io == NULL);
 
+  retval= free_share(txn, share);
+  DBUG_RETURN(retval);
 }
 
 /*
@@ -1817,6 +1977,7 @@
   char values_buffer[FEDERATED_QUERY_BUFFER_SIZE];
   char insert_field_value_buffer[STRING_BUFFER_USUAL_SIZE];
   Field **field;
+  int error;
 
   /* The main insert query string */
   String insert_string(insert_buffer, sizeof(insert_buffer), &my_charset_bin);
@@ -1909,7 +2070,10 @@
   /* add the values */
   insert_string.append(values_string);
 
-  if (mysql_real_query(mysql, insert_string.ptr(), insert_string.length()))
+  if ((error= txn->acquire(share, FALSE, &io)))
+    DBUG_RETURN(error);
+
+  if (io->query(insert_string.ptr(), insert_string.length()))
   {
     DBUG_RETURN(stash_remote_error());
   }
@@ -1937,7 +2101,7 @@
   DBUG_ENTER("ha_federated::update_auto_increment");
 
   thd->first_successful_insert_id_in_cur_stmt= 
-    mysql->last_used_con->insert_id;
+    io->last_insert_id();
   DBUG_PRINT("info",("last_insert_id: %ld", (long) stats.auto_increment_value));
 
   DBUG_VOID_RETURN;
@@ -1945,6 +2109,7 @@
 
 int ha_federated::optimize(THD* thd, HA_CHECK_OPT* check_opt)
 {
+  int error= 0;
   char query_buffer[STRING_BUFFER_USUAL_SIZE];
   String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
   DBUG_ENTER("ha_federated::optimize");
@@ -1956,17 +2121,21 @@
   query.append(share->table_name, share->table_name_length);
   query.append(STRING_WITH_LEN("`"));
 
-  if (mysql_real_query(mysql, query.ptr(), query.length()))
-  {
-    DBUG_RETURN(stash_remote_error());
-  }
+  DBUG_ASSERT(txn == get_txn(thd));
 
-  DBUG_RETURN(0);
+  if ((error= txn->acquire(share, FALSE, &io)))
+    DBUG_RETURN(error);
+
+  if (io->query(query.ptr(), query.length()))
+    error= stash_remote_error();
+
+  DBUG_RETURN(error);
 }
 
 
 int ha_federated::repair(THD* thd, HA_CHECK_OPT* check_opt)
 {
+  int error= 0;
   char query_buffer[STRING_BUFFER_USUAL_SIZE];
   String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
   DBUG_ENTER("ha_federated::repair");
@@ -1984,12 +2153,15 @@
   if (check_opt->sql_flags & TT_USEFRM)
     query.append(STRING_WITH_LEN(" USE_FRM"));
 
-  if (mysql_real_query(mysql, query.ptr(), query.length()))
-  {
-    DBUG_RETURN(stash_remote_error());
-  }
+  DBUG_ASSERT(txn == get_txn(thd));
 
-  DBUG_RETURN(0);
+  if ((error= txn->acquire(share, FALSE, &io)))
+    DBUG_RETURN(error);
+
+  if (io->query(query.ptr(), query.length()))
+    error= stash_remote_error();
+  
+  DBUG_RETURN(error);
 }
 
 
@@ -2045,6 +2217,7 @@
                       sizeof(where_buffer),
                       &my_charset_bin);
   uchar *record= table->record[0];
+  int error;
   DBUG_ENTER("ha_federated::update_row");
   /*
     set string lengths to 0 to avoid misc chars in string
@@ -2133,7 +2306,10 @@
   if (!has_a_primary_key)
     update_string.append(STRING_WITH_LEN(" LIMIT 1"));
 
-  if (mysql_real_query(mysql, update_string.ptr(), update_string.length()))
+  if ((error= txn->acquire(share, FALSE, &io)))
+    DBUG_RETURN(error);
+
+  if (io->query(update_string.ptr(), update_string.length()))
   {
     DBUG_RETURN(stash_remote_error());
   }
@@ -2162,6 +2338,7 @@
   String delete_string(delete_buffer, sizeof(delete_buffer), &my_charset_bin);
   String data_string(data_buffer, sizeof(data_buffer), &my_charset_bin);
   uint found= 0;
+  int error;
   DBUG_ENTER("ha_federated::delete_row");
 
   delete_string.length(0);
@@ -2204,15 +2381,19 @@
   delete_string.append(STRING_WITH_LEN(" LIMIT 1"));
   DBUG_PRINT("info",
              ("Delete sql: %s", delete_string.c_ptr_quick()));
-  if (mysql_real_query(mysql, delete_string.ptr(), delete_string.length()))
+
+  if ((error= txn->acquire(share, FALSE, &io)))
+    DBUG_RETURN(error);
+
+  if (io->query(delete_string.ptr(), delete_string.length()))
   {
     DBUG_RETURN(stash_remote_error());
   }
-  stats.deleted+= (ha_rows) mysql->affected_rows;
-  stats.records-= (ha_rows) mysql->affected_rows;
+  stats.deleted+= (ha_rows) io->affected_rows();
+  stats.records-= (ha_rows) io->affected_rows();
   DBUG_PRINT("info",
              ("rows deleted %ld  rows deleted for all time %ld",
-              (long) mysql->affected_rows, (long) stats.deleted));
+              (long) io->affected_rows(), (long) stats.deleted));
 
   DBUG_RETURN(0);
 }
@@ -2312,14 +2493,17 @@
                         NULL, 0, 0);
   sql_query.append(index_string);
 
-  if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length()))
+  if ((retval= txn->acquire(share, TRUE, &io)))
+    DBUG_RETURN(retval);
+
+  if (io->query(sql_query.ptr(), sql_query.length()))
   {
     my_sprintf(error_buffer, (error_buffer, "error: %d '%s'",
-                              mysql_errno(mysql), mysql_error(mysql)));
+                              io->error_code(), io->error_str()));
     retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
     goto error;
   }
-  if (!(*result= mysql_store_result(mysql)))
+  if (!(*result= io->store_result()))
   {
     retval= HA_ERR_END_OF_FILE;
     goto error;
@@ -2378,14 +2562,18 @@
     mysql_free_result(stored_result);
     stored_result= 0;
   }
-  if (mysql_real_query(mysql, sql_query.ptr(), sql_query.length()))
+
+  if ((retval= txn->acquire(share, FALSE, &io)))
+    DBUG_RETURN(retval);
+
+  if (io->query(sql_query.ptr(), sql_query.length()))
   {
     retval= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
     goto error;
   }
   sql_query.length(0);
 
-  if (!(stored_result= mysql_store_result(mysql)))
+  if (!(stored_result= io->store_result()))
   {
     retval= HA_ERR_END_OF_FILE;
     goto error;
@@ -2471,18 +2659,22 @@
 
   if (scan)
   {
+    int error;
+	
     if (stored_result)
     {
       mysql_free_result(stored_result);
       stored_result= 0;
     }
 
-    if (mysql_real_query(mysql,
-                         share->select_query,
-                         strlen(share->select_query)))
+	if ((error= txn->acquire(share, TRUE, &io)))
+      DBUG_RETURN(error);
+	  
+    if (io->query(share->select_query,
+                  strlen(share->select_query)))
       goto error;
 
-    stored_result= mysql_store_result(mysql);
+    stored_result= io->store_result();
     if (!stored_result)
       goto error;
   }
@@ -2701,13 +2893,16 @@
     status_query_string.append(escaped_table_name);
     status_query_string.append(STRING_WITH_LEN("'"));
 
-    if (mysql_real_query(mysql, status_query_string.ptr(),
-                         status_query_string.length()))
+    if ((error_code= txn->acquire(share, TRUE, &io)))
+        goto fail;
+
+    if (io->query(status_query_string.ptr(),
+                  status_query_string.length()))
       goto error;
 
     status_query_string.length(0);
 
-    result= mysql_store_result(mysql);
+    result= io->store_result();
     if (!result)
       goto error;
 
@@ -2764,8 +2959,10 @@
     mysql_free_result(result);
 
   my_sprintf(error_buffer, (error_buffer, ": %d : %s",
-                            mysql_errno(mysql), mysql_error(mysql)));
+                            io->error_code(), io->error_str()));
   my_error(error_code, MYF(0), error_buffer);
+
+fail:
   DBUG_RETURN(error_code);
 }
 
@@ -2786,6 +2983,7 @@
 {
   char query_buffer[FEDERATED_QUERY_BUFFER_SIZE];
   String query(query_buffer, sizeof(query_buffer), &my_charset_bin);
+  int error;
   DBUG_ENTER("ha_federated::delete_all_rows");
 
   query.length(0);
@@ -2795,10 +2993,18 @@
   query.append(share->table_name);
   query.append(STRING_WITH_LEN("`"));
 
+  /* no need for savepoint in autocommit mode */
+  if (!(ha_thd()->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
+    txn->stmt_autocommit();
+
   /*
     TRUNCATE won't return anything in mysql_affected_rows
   */
-  if (mysql_real_query(mysql, query.ptr(), query.length()))
+
+  if ((error= txn->acquire(share, FALSE, &io)))
+    DBUG_RETURN(error);
+
+  if (io->query(query.ptr(), query.length()))
   {
     DBUG_RETURN(stash_remote_error());
   }
@@ -2886,11 +3092,71 @@
   int retval;
   THD *thd= current_thd;
   FEDERATED_SHARE tmp_share; // Only a temporary share, to test the url
+  federated_txn *tmp_txn;
+  federated_io *tmp_io= NULL;
   DBUG_ENTER("ha_federated::create");
 
-  if (!(retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1)))
+  if ((retval= parse_url(thd->mem_root, &tmp_share, table_arg, 1)))
+    goto error;
+
+#ifdef FEDERATED_EXPERIMENTAL
+  /*
+    If possible, we try to use an existing network connection to
+	the remote server. To ensure that no new FEDERATED_SERVER
+	instance is created, we pass NULL in get_server() TABLE arg.
+	
+  */
+  pthread_mutex_lock(&federated_mutex);
+  tmp_share.s= get_server(&tmp_share, NULL);
+  pthread_mutex_unlock(&federated_mutex);
+    
+  if (tmp_share.s)
+  {
+    tmp_txn= get_txn(thd);
+	if (!(retval= tmp_txn->acquire(&tmp_share, TRUE, &tmp_io)))
+	{
+      char buffer[FEDERATED_QUERY_BUFFER_SIZE];
+	  String str(buffer, sizeof(buffer), &my_charset_bin);
+	  MYSQL_RES *resultset= NULL;
+	  bool saved_no_errors= thd->no_errors;
+	  thd->no_errors= TRUE;
+	  
+	  str.length(0);
+	  str.append(STRING_WITH_LEN("SELECT * FROM "));
+	  append_identifier(thd, &str, tmp_share.table_name, 
+	  				    tmp_share.table_name_length);
+      str.append(STRING_WITH_LEN(" WHERE 1=0"));
+	  
+	  if (tmp_io->query(str.ptr(), str.length()))
+	  {
+    	retval= tmp_io->error_code();
+	    thd->no_errors= saved_no_errors;
+		my_sprintf(buffer, (buffer,
+	               "database: '%s'  username: '%s'  hostname: '%s'",
+                   tmp_share.database, tmp_share.username, tmp_share.hostname));
+        DBUG_PRINT("info", ("error-code: %d", retval));
+    	my_error(ER_CANT_CREATE_FEDERATED_TABLE, MYF(0), buffer);
+    	//my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), buffer);
+	  }
+	  else
+	    resultset= tmp_io->store_result();
+
+	  thd->no_errors= saved_no_errors;		
+      mysql_free_result(resultset);
+
+	  tmp_txn->release(&tmp_io);	  
+	}
+	free_server(tmp_txn, tmp_share.s);
+  }
+  else
+#endif /* FEDERATED_EXPERIMENTAL */
+    /*
+	  no existing network connection exists, 
+	  so we have to do it the hard way.
+	*/
     retval= check_foreign_data_source(&tmp_share, 1);
 
+error:
   DBUG_RETURN(retval);
 
 }
@@ -2899,8 +3165,8 @@
 int ha_federated::stash_remote_error()
 {
   DBUG_ENTER("ha_federated::stash_remote_error()");
-  remote_error_number= mysql_errno(mysql);
-  strmake(remote_error_buf, mysql_error(mysql), sizeof(remote_error_buf)-1);
+  remote_error_number= io->error_code();
+  strmake(remote_error_buf, io->error_str(), sizeof(remote_error_buf)-1);
   DBUG_RETURN(HA_FEDERATED_ERROR_WITH_REMOTE_SYSTEM);
 }
 
@@ -2923,153 +3189,144 @@
   DBUG_RETURN(FALSE);
 }
 
-int ha_federated::external_lock(THD *thd, int lock_type)
-{
-  int error= 0;
-  ha_federated *trx= (ha_federated *)thd->ha_data[ht->slot];
-  DBUG_ENTER("ha_federated::external_lock");
 
-  if (lock_type != F_UNLCK)
+int ha_federated::start_stmt(THD *thd, thr_lock_type lock_type)
+{
+  DBUG_ENTER("ha_federated::start_stmt");
+  DBUG_ASSERT(txn == get_txn(thd));
+  
+  if (!txn->in_transaction())
   {
-    DBUG_PRINT("info",("federated not lock F_UNLCK"));
-    if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) 
-    {
-      DBUG_PRINT("info",("federated autocommit"));
-      /* 
-        This means we are doing an autocommit
-      */
-      error= connection_autocommit(TRUE);
-      if (error)
-      {
-        DBUG_PRINT("info", ("error setting autocommit TRUE: %d", error));
-        DBUG_RETURN(error);
-      }
-      trans_register_ha(thd, FALSE, ht);
-    }
-    else 
-    { 
-      DBUG_PRINT("info",("not autocommit"));
-      if (!trx)
-      {
-        /* 
-          This is where a transaction gets its start
-        */
-        error= connection_autocommit(FALSE);
-        if (error)
-        { 
-          DBUG_PRINT("info", ("error setting autocommit FALSE: %d", error));
-          DBUG_RETURN(error);
-        }
-        thd->ha_data[ht->slot]= this;
-        trans_register_ha(thd, TRUE, ht);
-        /*
-          Send a lock table to the remote end.
-          We do not support this at the moment
-        */
-        if (thd->options & (OPTION_TABLE_LOCK))
-        {
-          DBUG_PRINT("info", ("We do not support lock table yet"));
-        }
-      }
-      else
-      {
-        ha_federated *ptr;
-        for (ptr= trx; ptr; ptr= ptr->trx_next)
-          if (ptr == this)
-            break;
-          else if (!ptr->trx_next)
-            ptr->trx_next= this;
-      }
-    }
+    txn->stmt_begin();
+    trans_register_ha(thd, FALSE, ht);
   }
+
   DBUG_RETURN(0);
 }
 
 
-static int federated_commit(handlerton *hton, THD *thd, bool all)
+int ha_federated::external_lock(THD *thd, int lock_type)
 {
-  int return_val= 0;
-  ha_federated *trx= (ha_federated *)thd->ha_data[hton->slot];
-  DBUG_ENTER("federated_commit");
+  int error= 0;
+  DBUG_ENTER("ha_federated::external_lock");
 
-  if (all)
+  if (lock_type == F_UNLCK)
+    txn->release(&io);
+  else
   {
-    int error= 0;
-    ha_federated *ptr, *old= NULL;
-    for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
-    {
-      if (old)
-        old->trx_next= NULL;
-      error= ptr->connection_commit();
-      if (error && !return_val)
-        return_val= error;
-    }
-    thd->ha_data[hton->slot]= NULL;
-  }
+	txn= get_txn(thd);	
+    if (!(error= txn->acquire(share, lock_type == F_RDLCK, &io)) &&
+        (lock_type == F_WRLCK || !io->is_autocommit()))
+	{
+      if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
+	  {
+	    txn->stmt_begin();
+    	trans_register_ha(thd, FALSE, ht);
+	  }
+	  else
+	  {
+    	txn->txn_begin();
+    	trans_register_ha(thd, TRUE, ht);
+	  }
+	}
+  }
+
+  DBUG_RETURN(error);
+}
 
-  DBUG_PRINT("info", ("error val: %d", return_val));
-  DBUG_RETURN(return_val);
+
+void ha_federated::start_bulk_insert(ha_rows rows)
+{
+  DBUG_ENTER("ha_federated::start_bulk_insert");
+
+  /* no need for savepoint when only 1 row and in autocommit mode */
+  if (rows == 1 && 
+      !(ha_thd()->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
+    txn->stmt_autocommit();
+
+  DBUG_VOID_RETURN;
 }
 
 
-static int federated_rollback(handlerton *hton, THD *thd, bool all)
-{
-  int return_val= 0;
-  ha_federated *trx= (ha_federated *)thd->ha_data[hton->slot];
-  DBUG_ENTER("federated_rollback");
 
-  if (all)
+int ha_federated::savepoint_set(handlerton *hton, THD *thd, void *sv)
+{
+  int error= 0;
+  federated_txn *txn= (federated_txn *) thd->ha_data[hton->slot];
+  DBUG_ENTER("ha_federated::savepoint_set");
+  
+  if (txn && txn->has_connections())
   {
-    int error= 0;
-    ha_federated *ptr, *old= NULL;
-    for (ptr= trx; ptr; old= ptr, ptr= ptr->trx_next)
-    {
-      if (old)
-        old->trx_next= NULL;
-      error= ptr->connection_rollback();
-      if (error && !return_val)
-        return_val= error;
-    }
-    thd->ha_data[hton->slot]= NULL;
+    if (txn->txn_begin())
+      trans_register_ha(thd, TRUE, hton);
+    
+    txn->sp_acquire((ulong *) sv);
+
+    DBUG_ASSERT(1 < *(ulong *) sv);
   }
 
-  DBUG_PRINT("info", ("error val: %d", return_val));
-  DBUG_RETURN(return_val);
+  DBUG_RETURN(error);
 }
 
-int ha_federated::connection_commit()
+
+
+int ha_federated::savepoint_rollback(handlerton *hton, THD *thd, void *sv)
 {
-  DBUG_ENTER("ha_federated::connection_commit");
-  DBUG_RETURN(execute_simple_query("COMMIT", 6));
+  int error= 0;
+  federated_txn *txn= (federated_txn *) thd->ha_data[hton->slot];
+  DBUG_ENTER("ha_federated::savepoint_rollback");
+  
+  if (txn)
+    error= txn->sp_rollback((ulong *) sv);
+
+  DBUG_RETURN(error);
 }
 
 
-int ha_federated::connection_rollback()
+int ha_federated::savepoint_release(handlerton *hton, THD *thd, void *sv)
 {
-  DBUG_ENTER("ha_federated::connection_rollback");
-  DBUG_RETURN(execute_simple_query("ROLLBACK", 8));
+  int error= 0;
+  federated_txn *txn= (federated_txn *) thd->ha_data[hton->slot];
+  DBUG_ENTER("ha_federated::savepoint_release");
+  
+  if (txn)
+    error= txn->sp_release((ulong *) sv);
+
+  DBUG_RETURN(error);
 }
 
 
-int ha_federated::connection_autocommit(bool state)
+int ha_federated::commit(handlerton *hton, THD *thd, bool all)
 {
-  const char *text;
-  DBUG_ENTER("ha_federated::connection_autocommit");
-  text= (state == TRUE) ? "SET AUTOCOMMIT=1" : "SET AUTOCOMMIT=0";
-  DBUG_RETURN(execute_simple_query(text, 16));
+  int return_val;
+  federated_txn *txn= (federated_txn *) thd->ha_data[hton->slot];
+  DBUG_ENTER("ha_federated::commit");
+
+  if (all)
+    return_val= txn->txn_commit();
+  else
+    return_val= txn->stmt_commit();    
+  
+  DBUG_PRINT("info", ("error val: %d", return_val));
+  DBUG_RETURN(return_val);
 }
 
 
-int ha_federated::execute_simple_query(const char *query, int len)
+int ha_federated::rollback(handlerton *hton, THD *thd, bool all)
 {
-  DBUG_ENTER("ha_federated::execute_simple_query");
+  int return_val;
+  federated_txn *txn= (federated_txn *) thd->ha_data[hton->slot];
+  DBUG_ENTER("ha_federated::rollback");
 
-  if (mysql_real_query(mysql, query, len))
-  {
-    DBUG_RETURN(stash_remote_error());
-  }
-  DBUG_RETURN(0);
+  if (all)
+    return_val= txn->txn_rollback();
+  else
+    return_val= txn->stmt_rollback();
+
+  DBUG_PRINT("info", ("error val: %d", return_val));
+  DBUG_RETURN(return_val);
 }
+
 
 struct st_mysql_storage_engine federated_storage_engine=
 { MYSQL_HANDLERTON_INTERFACE_VERSION };

--- 1.46/storage/federated/ha_federated.h	2007-06-04 02:49:52 -07:00
+++ 1.47/storage/federated/ha_federated.h	2007-06-04 02:49:52 -07:00
@@ -38,6 +38,35 @@
 #define FEDERATED_RECORDS_IN_RANGE 2
 #define FEDERATED_MAX_KEY_LENGTH 3500 // Same as innodb
 
+class federated_io;
+
+/*
+  FEDERATED_SERVER will eventually be a structure that will be shared among
+  all FEDERATED_SHARE instances so that the federated server can minimise
+  the number of open connections. This will eventually lead to the support
+  of reliable XA federated tables.
+*/
+typedef struct st_fedrated_server {
+  MEM_ROOT mem_root;
+  uint use_count, io_count;
+  
+  uchar *key;
+  uint key_length;
+
+  const char *scheme;
+  const char *hostname;
+  const char *username;
+  const char *password;
+  const char *database;
+  const char *socket;
+  ushort port;
+
+  const char *csname;
+
+  pthread_mutex_t mutex;
+  federated_io *idle_list;
+} FEDERATED_SERVER;
+
 /*
   FEDERATED_SHARE is a structure that will be shared amoung all open handlers
   The example implements the minimum of what you will probably need.
@@ -58,7 +87,6 @@
   char *server_name;
   char *connection_string;
   char *scheme;
-  char *connect_string;
   char *hostname;
   char *username;
   char *password;
@@ -70,19 +98,112 @@
   int share_key_length;
   ushort port;
 
-  uint table_name_length, server_name_length, connect_string_length, use_count;
-  pthread_mutex_t mutex;
+  uint table_name_length, server_name_length, connect_string_length;
+  uint use_count;
   THR_LOCK lock;
+  FEDERATED_SERVER *s;
 } FEDERATED_SHARE;
 
+
+class federated_io
+{
+  friend class federated_txn;
+  MYSQL mysql;	/* MySQL connection */
+  DYNAMIC_ARRAY savepoints;
+  FEDERATED_SERVER * const server;
+  federated_io **owner_ptr;
+  federated_io *txn_next;
+  federated_io *idle_next;
+  bool active;  /* currently participating in a transaction */
+  bool busy;    /* in use by a ha_federated instance */
+  bool readonly;/* indicates that no updates have occurred */
+  bool requested_autocommit;
+  bool actual_autocommit;
+
+  int actual_query(const char *buffer, uint length);
+  bool test_all_restrict() const;
+public:
+  federated_io(FEDERATED_SERVER *);
+  ~federated_io();
+
+  static void *operator new(size_t size, MEM_ROOT *mem_root) throw ()
+  { return alloc_root(mem_root, size); }
+  static void operator delete(void *ptr, size_t size)
+  { TRASH(ptr, size); }
+    
+  int simple_query(const char *fmt, ...);
+  int query(const char *buffer, uint length);
+  MYSQL_RES *store_result();
+
+  my_ulonglong affected_rows() const
+  { return mysql.affected_rows; }
+  my_ulonglong last_insert_id() const
+  { return mysql.last_used_con->insert_id; }
+
+  int error_code()
+  { return mysql_errno(&mysql); }
+  const char *error_str()
+  { return mysql_error(&mysql); }
+  
+  int commit();
+  int rollback();
+  
+  int savepoint_set(ulong sp);
+  ulong savepoint_release(ulong sp);
+  ulong savepoint_rollback(ulong sp);
+  void savepoint_restrict(ulong sp);
+  
+  ulong last_savepoint() const;
+  ulong actual_savepoint() const;
+  bool is_autocommit() const { return actual_autocommit; }
+};
+
+
+class federated_txn
+{
+  federated_io *txn_list;
+  ulong savepoint_level;
+  ulong savepoint_stmt;
+  ulong savepoint_next;
+  
+  void release_scan();
+public:
+  federated_txn();
+  ~federated_txn();
+  
+  bool has_connections() const { return txn_list != NULL; }
+  bool in_transaction() const { return savepoint_next != 0; }
+  int acquire(FEDERATED_SHARE *share, bool readonly, federated_io **io);
+  void release(federated_io **io);
+  void close(FEDERATED_SERVER *);
+
+  
+  bool txn_begin();
+  int txn_commit();
+  int txn_rollback();
+
+  bool sp_acquire(ulong *save);
+  int sp_rollback(ulong *save);
+  int sp_release(ulong *save);
+
+  bool stmt_begin();
+  int stmt_commit();
+  int stmt_rollback();
+  void stmt_autocommit();
+};
+
+
 /*
   Class definition for the storage engine
 */
 class ha_federated: public handler
 {
+  friend int federated_db_init(void *p);
+
   THR_LOCK_DATA lock;      /* MySQL lock */
   FEDERATED_SHARE *share;    /* Shared lock info */
-  MYSQL *mysql; /* MySQL connection */
+  federated_txn *txn;
+  federated_io *io;
   MYSQL_RES *stored_result;
   uint fetch_num; // stores the fetch num
   MYSQL_ROW_OFFSET current_position;  // Current position used by ::position()
@@ -102,16 +223,21 @@
                              bool records_in_range, bool eq_range);
   int stash_remote_error();
 
+  federated_txn *get_txn(THD *thd, bool no_create= FALSE);
+
+  static int disconnect(handlerton *hton, THD *thd);
+  static int savepoint_set(handlerton *hton, THD *thd, void *sv);
+  static int savepoint_rollback(handlerton *hton, THD *thd, void *sv);
+  static int savepoint_release(handlerton *hton, THD *thd, void *sv);
+  static int commit(handlerton *hton, THD *thd, bool all);
+  static int rollback(handlerton *hton, THD *thd, bool all);
+
 public:
   ha_federated(handlerton *hton, TABLE_SHARE *table_arg);
   ~ha_federated() {}
   /* The name that will be used for display purposes */
   const char *table_type() const { return "FEDERATED"; }
   /*
-    Next pointer used in transaction
-  */
-  ha_federated *trx_next;
-  /*
     The name of the index type that will be used for display
     don't implement this method unless you really have indexes
    */
@@ -188,6 +314,7 @@
   int open(const char *name, int mode, uint test_if_locked);    // required
   int close(void);                                              // required
 
+  void start_bulk_insert(ha_rows rows);	
   int write_row(uchar *buf);
   int update_row(const uchar *old_data, uchar *new_data);
   int delete_row(const uchar *buf);
@@ -231,11 +358,8 @@
   THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
                              enum thr_lock_type lock_type);     //required
   virtual bool get_error_message(int error, String *buf);
+  int start_stmt(THD *thd, thr_lock_type lock_type);
   int external_lock(THD *thd, int lock_type);
-  int connection_commit();
-  int connection_rollback();
-  int connection_autocommit(bool state);
-  int execute_simple_query(const char *query, int len);
 
   int read_next(uchar *buf, MYSQL_RES *result);
   int index_read_idx_with_result_set(uchar *buf, uint index,
Thread
bk commit into 5.1 tree (acurtis:1.2520) BUG#25513antony4 Jun