Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-11-20 10:49:53+01:00, tomas@stripped +6 -0
ndb handler cleanup
- move multo connect stuff into separate file
sql/Makefile.am@stripped, 2007-11-20 10:49:49+01:00, tomas@stripped +2 -0
ndb handler cleanup
- move multo connect stuff into separate file
sql/ha_ndbcluster.cc@stripped, 2007-11-20 10:49:50+01:00, tomas@stripped +22 -256
ndb handler cleanup
- move multo connect stuff into separate file
sql/ha_ndbcluster_binlog.cc@stripped, 2007-11-20 10:49:50+01:00, tomas@stripped +16 -48
ndb handler cleanup
- move multo connect stuff into separate file
sql/ha_ndbcluster_binlog.h@stripped, 2007-11-20 10:49:50+01:00, tomas@stripped +0 -2
ndb handler cleanup
- move multo connect stuff into separate file
sql/ha_ndbcluster_connection.cc@stripped, 2007-11-20 10:49:50+01:00, tomas@stripped +301 -0
New BitKeeper file ``sql/ha_ndbcluster_connection.cc''
sql/ha_ndbcluster_connection.cc@stripped, 2007-11-20 10:49:50+01:00, tomas@stripped +0 -0
sql/ha_ndbcluster_connection.h@stripped, 2007-11-20 10:49:50+01:00, tomas@stripped +37 -0
New BitKeeper file ``sql/ha_ndbcluster_connection.h''
sql/ha_ndbcluster_connection.h@stripped, 2007-11-20 10:49:50+01:00, tomas@stripped +0 -0
diff -Nrup a/sql/Makefile.am b/sql/Makefile.am
--- a/sql/Makefile.am 2007-07-04 22:06:26 +02:00
+++ b/sql/Makefile.am 2007-11-20 10:49:49 +01:00
@@ -55,6 +55,7 @@ noinst_HEADERS = item.h item_func.h item
sql_error.h field.h handler.h mysqld_suffix.h \
ha_ndbcluster.h ha_ndbcluster_cond.h \
ha_ndbcluster_binlog.h ha_ndbcluster_tables.h \
+ ha_ndbcluster_connection.h ha_ndbcluster_connection.h \
ha_partition.h rpl_constants.h \
opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
rpl_reporting.h \
@@ -122,6 +123,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.
libndb_la_CPPFLAGS= @ndbcluster_includes@
libndb_la_SOURCES= ha_ndbcluster.cc \
ha_ndbcluster_binlog.cc \
+ ha_ndbcluster_connection.cc \
ha_ndbcluster_cond.cc
gen_lex_hash_SOURCES = gen_lex_hash.cc
diff -Nrup a/sql/ha_ndbcluster.cc b/sql/ha_ndbcluster.cc
--- a/sql/ha_ndbcluster.cc 2007-11-19 14:47:14 +01:00
+++ b/sql/ha_ndbcluster.cc 2007-11-20 10:49:50 +01:00
@@ -36,6 +36,7 @@
#include "ha_ndbcluster_binlog.h"
#include "ha_ndbcluster_tables.h"
+#include "ha_ndbcluster_connection.h"
#include <mysql/plugin.h>
@@ -45,11 +46,7 @@
#endif
// options from from mysqld.cc
-extern my_bool opt_ndb_optimized_node_selection;
-extern const char *opt_ndbcluster_connectstring;
extern ulong opt_ndb_cache_check_time;
-extern ulong opt_ndb_wait_connected;
-extern ulong opt_ndb_cluster_connection_pool;
// ndb interface initialization/cleanup
#ifdef __cplusplus
@@ -132,12 +129,8 @@ static uint ndbcluster_alter_partition_f
static int ndbcluster_inited= 0;
int ndbcluster_terminating= 0;
-static Ndb* g_ndb= NULL;
-Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
-Ndb_cluster_connection **g_ndb_cluster_connection_pool= NULL;
-ulong g_ndb_cluster_connection_pool_alloc= 0;
-ulong g_ndb_cluster_connection_pool_pos= 0;
-pthread_mutex_t g_ndb_cluster_connection_pool_mutex;
+extern Ndb* g_ndb;
+
uchar g_node_id_map[max_ndb_nodes];
// Handler synchronization
@@ -168,44 +161,14 @@ ulong ndb_cache_check_time;
/* Status variables shown with 'show status like 'Ndb%' */
-struct st_ndb_status {
- st_ndb_status() { bzero(this, sizeof(struct st_ndb_status)); }
- long cluster_node_id;
- const char * connected_host;
- long connected_port;
- long number_of_replicas;
- long number_of_data_nodes;
- long number_of_ready_data_nodes;
- long connect_count;
-};
-
-static struct st_ndb_status g_ndb_status;
-
-static int update_status_variables(st_ndb_status *ns, Ndb_cluster_connection *c)
-{
- ns->connected_port= c->get_connected_port();
- ns->connected_host= c->get_connected_host();
- if (ns->cluster_node_id != (int) c->node_id())
- {
- ns->cluster_node_id= c->node_id();
- if (&g_ndb_status == ns && g_ndb_cluster_connection == c)
- sql_print_information("NDB: NodeID is %lu, management server '%s:%lu'",
- ns->cluster_node_id, ns->connected_host,
- ns->connected_port);
- }
- ns->number_of_replicas= 0;
- ns->number_of_ready_data_nodes= c->get_no_ready();
- ns->number_of_data_nodes= c->no_db_nodes();
- ns->connect_count= c->get_connect_count();
- return 0;
-}
+static struct st_ndb_connection_info g_ndb_connection_info;
SHOW_VAR ndb_status_variables[]= {
- {"cluster_node_id", (char*) &g_ndb_status.cluster_node_id, SHOW_LONG},
- {"config_from_host", (char*) &g_ndb_status.connected_host, SHOW_CHAR_PTR},
- {"config_from_port", (char*) &g_ndb_status.connected_port, SHOW_LONG},
-//{"number_of_replicas", (char*) &g_ndb_status.number_of_replicas, SHOW_LONG},
- {"number_of_data_nodes",(char*) &g_ndb_status.number_of_data_nodes, SHOW_LONG},
+ {"cluster_node_id", (char*) &g_ndb_connection_info.cluster_node_id, SHOW_LONG},
+ {"config_from_host", (char*) &g_ndb_connection_info.connected_host, SHOW_CHAR_PTR},
+ {"config_from_port", (char*) &g_ndb_connection_info.connected_port, SHOW_LONG},
+//{"number_of_replicas", (char*) &g_ndb_connection_info.number_of_replicas, SHOW_LONG},
+ {"number_of_data_nodes",(char*) &g_ndb_connection_info.number_of_data_nodes, SHOW_LONG},
{NullS, NullS, SHOW_LONG}
};
@@ -338,14 +301,7 @@ uchar *thd_ndb_share_get_key(THD_NDB_SHA
Thd_ndb::Thd_ndb()
{
- pthread_mutex_lock(&g_ndb_cluster_connection_pool_mutex);
- connection=
- g_ndb_cluster_connection_pool[g_ndb_cluster_connection_pool_pos];
- g_ndb_cluster_connection_pool_pos++;
- if (g_ndb_cluster_connection_pool_pos ==
- g_ndb_cluster_connection_pool_alloc)
- g_ndb_cluster_connection_pool_pos= 0;
- pthread_mutex_unlock(&g_ndb_cluster_connection_pool_mutex);
+ connection= ndb_get_cluster_connection();
ndb= new Ndb(connection, "");
lock_count= 0;
start_stmt_count= 0;
@@ -7758,8 +7714,8 @@ int ndbcluster_find_files(handlerton *ht
static int connect_callback()
{
pthread_mutex_lock(&LOCK_ndb_util_thread);
- update_status_variables(&g_ndb_status,
- g_ndb_cluster_connection);
+ ndb_get_connection_info(&g_ndb_connection_info,
+ g_ndb_cluster_connection, 1);
uint node_id, i= 0;
Ndb_cluster_connection_node_iter node_iter;
@@ -7777,7 +7733,6 @@ extern pthread_mutex_t LOCK_plugin;
static int ndbcluster_init(void *p)
{
- int res;
DBUG_ENTER("ndbcluster_init");
if (ndbcluster_inited)
@@ -7827,161 +7782,10 @@ static int ndbcluster_init(void *p)
// Initialize ndb interface
ndb_init_internal();
- // Set connectstring if specified
- if (opt_ndbcluster_connectstring != 0)
- DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));
- if ((g_ndb_cluster_connection=
- new Ndb_cluster_connection(opt_ndbcluster_connectstring)) == 0)
- {
- sql_print_error("NDB: failed to allocate global "
- "ndb cluster connection object");
- DBUG_PRINT("error",("Ndb_cluster_connection(%s)",
- opt_ndbcluster_connectstring));
- my_errno= HA_ERR_OUT_OF_MEM;
- goto ndbcluster_init_error;
- }
- {
- char buf[128];
- my_snprintf(buf, sizeof(buf), "mysqld --server-id=%lu", server_id);
- g_ndb_cluster_connection->set_name(buf);
- }
- g_ndb_cluster_connection->set_optimized_node_selection
- (opt_ndb_optimized_node_selection);
-
- // Create a Ndb object to open the connection to NDB
- if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
- {
- sql_print_error("NDB: failed to allocate global ndb object");
- DBUG_PRINT("error", ("failed to create global ndb object"));
- my_errno= HA_ERR_OUT_OF_MEM;
+ /* allocate connection resources and connect to cluster */
+ if (ndbcluster_connect(connect_callback))
goto ndbcluster_init_error;
- }
- if (g_ndb->init() != 0)
- {
- ERR_PRINT (g_ndb->getNdbError());
- goto ndbcluster_init_error;
- }
-
- /* Connect to management server */
-
- struct timeval end_time;
- gettimeofday(&end_time, 0);
- end_time.tv_sec+= opt_ndb_wait_connected;
-
- while ((res= g_ndb_cluster_connection->connect(0,0,0)) == 1)
- {
- struct timeval now_time;
- gettimeofday(&now_time, 0);
- if (now_time.tv_sec > end_time.tv_sec ||
- (now_time.tv_sec == end_time.tv_sec &&
- now_time.tv_usec >= end_time.tv_usec))
- break;
- sleep(1);
- }
-
- {
- g_ndb_cluster_connection_pool_alloc= opt_ndb_cluster_connection_pool;
- g_ndb_cluster_connection_pool= (Ndb_cluster_connection**)
- my_malloc(g_ndb_cluster_connection_pool_alloc *
- sizeof(Ndb_cluster_connection*),
- MYF(MY_WME | MY_ZEROFILL));
- pthread_mutex_init(&g_ndb_cluster_connection_pool_mutex,
- MY_MUTEX_INIT_FAST);
- g_ndb_cluster_connection_pool[0]= g_ndb_cluster_connection;
- for (unsigned i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
- {
- if ((g_ndb_cluster_connection_pool[i]=
- new Ndb_cluster_connection(opt_ndbcluster_connectstring,
- g_ndb_cluster_connection)) == 0)
- {
- sql_print_error("NDB[%u]: failed to allocate cluster connect object",
- i);
- DBUG_PRINT("error",("Ndb_cluster_connection[%u](%s)",
- i, opt_ndbcluster_connectstring));
- goto ndbcluster_init_error;
- }
- {
- char buf[128];
- my_snprintf(buf, sizeof(buf), "mysqld --server-id=%lu (connection %u)",
- server_id, i+1);
- g_ndb_cluster_connection_pool[i]->set_name(buf);
- }
- g_ndb_cluster_connection_pool[i]->set_optimized_node_selection
- (opt_ndb_optimized_node_selection);
- }
- }
-
- if (res == 0)
- {
- connect_callback();
- for (unsigned i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
- {
- if (g_ndb_cluster_connection_pool[i]->node_id() == 0)
- {
- // not connected to mgmd yet, try again
- g_ndb_cluster_connection_pool[i]->connect(0,0,0);
- if (g_ndb_cluster_connection_pool[i]->node_id() == 0)
- {
- sql_print_warning("NDB[%u]: starting connect thread", i);
- g_ndb_cluster_connection_pool[i]->start_connect_thread();
- continue;
- }
- }
- DBUG_PRINT("info",
- ("NDBCLUSTER storage engine (%u) at %s on port %d", i,
- g_ndb_cluster_connection_pool[i]->get_connected_host(),
- g_ndb_cluster_connection_pool[i]->get_connected_port()));
- struct timeval now_time;
- gettimeofday(&now_time, 0);
- ulong wait_until_ready_time = (end_time.tv_sec > now_time.tv_sec) ?
- end_time.tv_sec - now_time.tv_sec : 1;
- res= g_ndb_cluster_connection_pool[i]->
- wait_until_ready(wait_until_ready_time,3);
- if (res == 0)
- {
- sql_print_information("NDB[%u]: all storage nodes connected", i);
- }
- else if (res > 0)
- {
- sql_print_information("NDB[%u]: some storage nodes connected", i);
- }
- else if (res < 0)
- {
- sql_print_information("NDB[%u]: no storage nodes connected (timed out)", i);
- }
- }
- }
- else if (res == 1)
- {
- for (unsigned i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
- {
- if (g_ndb_cluster_connection_pool[i]->
- start_connect_thread(i == 0 ? connect_callback : NULL))
- {
- sql_print_error("NDB[%u]: failed to start connect thread", i);
- DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
- goto ndbcluster_init_error;
- }
- }
-#ifndef DBUG_OFF
- {
- char buf[1024];
- DBUG_PRINT("info",
- ("NDBCLUSTER storage engine not started, "
- "will connect using %s",
- g_ndb_cluster_connection->
- get_connectstring(buf,sizeof(buf))));
- }
-#endif
- }
- else
- {
- DBUG_ASSERT(res == -1);
- DBUG_PRINT("error", ("permanent error"));
- goto ndbcluster_init_error;
- }
-
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
#ifdef HAVE_NDB_BINLOG
@@ -8027,28 +7831,8 @@ static int ndbcluster_init(void *p)
DBUG_RETURN(FALSE);
ndbcluster_init_error:
- if (g_ndb)
- delete g_ndb;
- g_ndb= NULL;
- {
- if (g_ndb_cluster_connection_pool)
- {
- /* first in pool is the main one, wait with release */
- for (unsigned i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
- {
- if (g_ndb_cluster_connection_pool[i])
- delete g_ndb_cluster_connection_pool[i];
- }
- my_free((uchar*) g_ndb_cluster_connection_pool, MYF(MY_ALLOW_ZERO_PTR));
- pthread_mutex_destroy(&g_ndb_cluster_connection_pool_mutex);
- g_ndb_cluster_connection_pool= 0;
- }
- g_ndb_cluster_connection_pool_alloc= 0;
- g_ndb_cluster_connection_pool_pos= 0;
- }
- if (g_ndb_cluster_connection)
- delete g_ndb_cluster_connection;
- g_ndb_cluster_connection= NULL;
+ /* disconnect from cluster and free connection resources */
+ ndbcluster_disconnect();
ndbcluster_hton->state= SHOW_OPTION_DISABLED; // If we couldn't use handler
pthread_mutex_lock(&LOCK_plugin);
@@ -8107,27 +7891,8 @@ static int ndbcluster_end(handlerton *ht
(leaked == 1)?"has":"have");
}
#endif
- delete g_ndb;
- g_ndb= NULL;
- }
- {
- if (g_ndb_cluster_connection_pool)
- {
- /* first in pool is the main one, wait with release */
- for (unsigned i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
- {
- if (g_ndb_cluster_connection_pool[i])
- delete g_ndb_cluster_connection_pool[i];
- }
- my_free((uchar*) g_ndb_cluster_connection_pool, MYF(MY_ALLOW_ZERO_PTR));
- pthread_mutex_destroy(&g_ndb_cluster_connection_pool_mutex);
- g_ndb_cluster_connection_pool= 0;
- }
- g_ndb_cluster_connection_pool_alloc= 0;
- g_ndb_cluster_connection_pool_pos= 0;
}
- delete g_ndb_cluster_connection;
- g_ndb_cluster_connection= NULL;
+ ndbcluster_disconnect();
// cleanup ndb interface
ndb_end_internal();
@@ -9988,7 +9753,8 @@ pthread_handler_t ndb_util_thread_func(v
Wait for cluster to start
*/
pthread_mutex_lock(&LOCK_ndb_util_thread);
- while (!g_ndb_status.cluster_node_id && (ndbcluster_hton->slot != ~(uint)0))
+ while (!g_ndb_connection_info.cluster_node_id &&
+ (ndbcluster_hton->slot != ~(uint)0))
{
/* ndb not connected yet */
pthread_cond_wait(&COND_ndb_util_thread, &LOCK_ndb_util_thread);
@@ -10264,11 +10030,11 @@ ndbcluster_show_status(handlerton *hton,
}
Ndb* ndb= check_ndb_in_thd(thd);
- struct st_ndb_status ns;
+ struct st_ndb_connection_info ns;
if (ndb)
- update_status_variables(&ns, get_thd_ndb(thd)->connection);
+ ndb_get_connection_info(&ns, get_thd_ndb(thd)->connection);
else
- update_status_variables(&ns, g_ndb_cluster_connection);
+ ndb_get_connection_info(&ns, g_ndb_cluster_connection);
buflen=
my_snprintf(buf, sizeof(buf),
diff -Nrup a/sql/ha_ndbcluster_binlog.cc b/sql/ha_ndbcluster_binlog.cc
--- a/sql/ha_ndbcluster_binlog.cc 2007-11-16 08:42:46 +01:00
+++ b/sql/ha_ndbcluster_binlog.cc 2007-11-20 10:49:50 +01:00
@@ -18,14 +18,15 @@
#include "sql_show.h"
#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
#include "ha_ndbcluster.h"
+#include "ha_ndbcluster_connection.h"
#ifdef HAVE_NDB_BINLOG
#include "rpl_injector.h"
#include "rpl_filter.h"
#include "slave.h"
#include "ha_ndbcluster_binlog.h"
-#include "NdbDictionary.hpp"
-#include "ndb_cluster_connection.hpp"
+#include <ndbapi/NdbDictionary.hpp>
+#include <ndbapi/ndb_cluster_connection.hpp>
#include <util/NdbAutoPtr.hpp>
#ifdef ndb_dynamite
@@ -149,39 +150,6 @@ static TABLE_LIST binlog_tables;
Helper functions
*/
-static ulonglong get_latest_trans_gci()
-{
- unsigned i;
- ulonglong val= *g_ndb_cluster_connection->get_latest_trans_gci();
- for (i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
- {
- ulonglong tmp= *g_ndb_cluster_connection_pool[i]->get_latest_trans_gci();
- if (tmp > val)
- val= tmp;
- }
- return val;
-}
-
-static void set_latest_trans_gci(ulonglong val)
-{
- unsigned i;
- for (i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
- {
- *g_ndb_cluster_connection_pool[i]->get_latest_trans_gci()= val;
- }
-}
-
-static int has_node_id(uint id)
-{
- unsigned i;
- for (i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
- {
- if (id == g_ndb_cluster_connection_pool[i]->node_id())
- return 1;
- }
- return 0;
-}
-
#ifndef DBUG_OFF
/* purecov: begin deadcode */
static void print_records(TABLE *table, const uchar *record)
@@ -494,7 +462,7 @@ static void ndbcluster_binlog_wait(THD *
{
DBUG_ENTER("ndbcluster_binlog_wait");
const char *save_info= thd ? thd->proc_info : 0;
- ulonglong wait_epoch= get_latest_trans_gci();
+ ulonglong wait_epoch= ndb_get_latest_trans_gci();
int count= 30;
if (thd)
thd->proc_info= "Waiting for ndbcluster binlog update to "
@@ -1647,7 +1615,7 @@ ndb_handle_schema_change(THD *thd, Ndb *
const char *tabname= table_share->table_name.str;
const char *dbname= table_share->db.str;
bool do_close_cached_tables= FALSE;
- bool is_remote_change= !has_node_id(pOp->getReqNodeId());
+ bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
if (pOp->getEventType() == NDBEVENT::TE_ALTER)
{
@@ -4408,7 +4376,7 @@ restart:
(uint)(ndb_latest_handled_binlog_epoch),
(uint)(schema_gci >> 32),
(uint)(schema_gci));
- set_latest_trans_gci(0);
+ ndb_set_latest_trans_gci(0);
ndb_latest_handled_binlog_epoch= 0;
ndb_latest_applied_binlog_epoch= 0;
ndb_latest_received_binlog_epoch= 0;
@@ -4443,7 +4411,7 @@ restart:
do_ndbcluster_binlog_close_connection= BCCC_running;
for ( ; !((ndbcluster_binlog_terminating ||
do_ndbcluster_binlog_close_connection) &&
- ndb_latest_handled_binlog_epoch >= get_latest_trans_gci()) &&
+ ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci()) &&
do_ndbcluster_binlog_close_connection != BCCC_restart; )
{
#ifndef DBUG_OFF
@@ -4455,8 +4423,8 @@ restart:
do_ndbcluster_binlog_close_connection,
(uint)(ndb_latest_handled_binlog_epoch >> 32),
(uint)(ndb_latest_handled_binlog_epoch),
- (uint)(get_latest_trans_gci() >> 32),
- (uint)(get_latest_trans_gci())));
+ (uint)(ndb_get_latest_trans_gci() >> 32),
+ (uint)(ndb_get_latest_trans_gci())));
}
#endif
#ifdef RUN_NDB_BINLOG_TIMER
@@ -4508,7 +4476,7 @@ restart:
if ((ndbcluster_binlog_terminating ||
do_ndbcluster_binlog_close_connection) &&
- (ndb_latest_handled_binlog_epoch >= get_latest_trans_gci() ||
+ (ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci() ||
!ndb_binlog_running))
break; /* Shutting down server */
@@ -4558,12 +4526,12 @@ restart:
{
DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
do_ndbcluster_binlog_close_connection= BCCC_restart;
- if (ndb_latest_received_binlog_epoch < get_latest_trans_gci() && ndb_binlog_running)
+ if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
{
sql_print_error("NDB Binlog: latest transaction in epoch %u/%u not in binlog "
"as latest received epoch is %u/%u",
- (uint)(get_latest_trans_gci() >> 32),
- (uint)(get_latest_trans_gci()),
+ (uint)(ndb_get_latest_trans_gci() >> 32),
+ (uint)(ndb_get_latest_trans_gci()),
(uint)(ndb_latest_received_binlog_epoch >> 32),
(uint)(ndb_latest_received_binlog_epoch));
}
@@ -4783,11 +4751,11 @@ restart:
{
DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
do_ndbcluster_binlog_close_connection= BCCC_restart;
- if (ndb_latest_received_binlog_epoch < get_latest_trans_gci() && ndb_binlog_running)
+ if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
{
sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
"as latest received epoch is %lu",
- (ulong) get_latest_trans_gci(),
+ (ulong) ndb_get_latest_trans_gci(),
(ulong) ndb_latest_received_binlog_epoch);
}
}
@@ -4995,7 +4963,7 @@ ndbcluster_show_status_binlog(THD* thd,
"latest_handled_binlog_epoch=%s, "
"latest_applied_binlog_epoch=%s",
llstr(ndb_latest_epoch, buff1),
- llstr(get_latest_trans_gci(), buff2),
+ llstr(ndb_get_latest_trans_gci(), buff2),
llstr(ndb_latest_received_binlog_epoch, buff3),
llstr(ndb_latest_handled_binlog_epoch, buff4),
llstr(ndb_latest_applied_binlog_epoch, buff5));
diff -Nrup a/sql/ha_ndbcluster_binlog.h b/sql/ha_ndbcluster_binlog.h
--- a/sql/ha_ndbcluster_binlog.h 2007-11-16 08:42:46 +01:00
+++ b/sql/ha_ndbcluster_binlog.h 2007-11-20 10:49:50 +01:00
@@ -148,8 +148,6 @@ extern int ndbcluster_util_inited;
extern pthread_mutex_t ndbcluster_mutex;
extern HASH ndbcluster_open_tables;
extern Ndb_cluster_connection* g_ndb_cluster_connection;
-extern Ndb_cluster_connection **g_ndb_cluster_connection_pool;
-extern ulong g_ndb_cluster_connection_pool_alloc;
extern long ndb_number_of_storage_nodes;
/*
diff -Nrup a/sql/ha_ndbcluster_connection.cc b/sql/ha_ndbcluster_connection.cc
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/sql/ha_ndbcluster_connection.cc 2007-11-20 10:49:50 +01:00
@@ -0,0 +1,301 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#ifdef USE_PRAGMA_IMPLEMENTATION
+#pragma implementation // gcc: Class implementation
+#endif
+
+#include "mysql_priv.h"
+
+#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
+#include <ndbapi/NdbApi.hpp>
+#include "ha_ndbcluster_connection.h"
+
+/* options from from mysqld.cc */
+extern my_bool opt_ndb_optimized_node_selection;
+extern const char *opt_ndbcluster_connectstring;
+extern ulong opt_ndb_wait_connected;
+extern ulong opt_ndb_cluster_connection_pool;
+
+Ndb* g_ndb= NULL;
+Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+static Ndb_cluster_connection **g_ndb_cluster_connection_pool= NULL;
+static ulong g_ndb_cluster_connection_pool_alloc= 0;
+static ulong g_ndb_cluster_connection_pool_pos= 0;
+static pthread_mutex_t g_ndb_cluster_connection_pool_mutex;
+
+int ndbcluster_connect(int (*connect_callback)(void))
+{
+ int res;
+ DBUG_ENTER("ndbcluster_connect");
+ // Set connectstring if specified
+ if (opt_ndbcluster_connectstring != 0)
+ DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));
+ if ((g_ndb_cluster_connection=
+ new Ndb_cluster_connection(opt_ndbcluster_connectstring)) == 0)
+ {
+ sql_print_error("NDB: failed to allocate global "
+ "ndb cluster connection object");
+ DBUG_PRINT("error",("Ndb_cluster_connection(%s)",
+ opt_ndbcluster_connectstring));
+ my_errno= HA_ERR_OUT_OF_MEM;
+ goto ndbcluster_connect_error;
+ }
+ {
+ char buf[128];
+ my_snprintf(buf, sizeof(buf), "mysqld --server-id=%lu", server_id);
+ g_ndb_cluster_connection->set_name(buf);
+ }
+ g_ndb_cluster_connection->set_optimized_node_selection
+ (opt_ndb_optimized_node_selection);
+
+ // Create a Ndb object to open the connection to NDB
+ if ( (g_ndb= new Ndb(g_ndb_cluster_connection, "sys")) == 0 )
+ {
+ sql_print_error("NDB: failed to allocate global ndb object");
+ DBUG_PRINT("error", ("failed to create global ndb object"));
+ my_errno= HA_ERR_OUT_OF_MEM;
+ goto ndbcluster_connect_error;
+ }
+ if (g_ndb->init() != 0)
+ {
+ DBUG_PRINT("error", ("%d message: %s",
+ g_ndb->getNdbError().code,
+ g_ndb->getNdbError().message));
+ goto ndbcluster_connect_error;
+ }
+
+ /* Connect to management server */
+
+ struct timeval end_time;
+ gettimeofday(&end_time, 0);
+ end_time.tv_sec+= opt_ndb_wait_connected;
+
+ while ((res= g_ndb_cluster_connection->connect(0,0,0)) == 1)
+ {
+ struct timeval now_time;
+ gettimeofday(&now_time, 0);
+ if (now_time.tv_sec > end_time.tv_sec ||
+ (now_time.tv_sec == end_time.tv_sec &&
+ now_time.tv_usec >= end_time.tv_usec))
+ break;
+ sleep(1);
+ }
+
+ {
+ g_ndb_cluster_connection_pool_alloc= opt_ndb_cluster_connection_pool;
+ g_ndb_cluster_connection_pool= (Ndb_cluster_connection**)
+ my_malloc(g_ndb_cluster_connection_pool_alloc *
+ sizeof(Ndb_cluster_connection*),
+ MYF(MY_WME | MY_ZEROFILL));
+ pthread_mutex_init(&g_ndb_cluster_connection_pool_mutex,
+ MY_MUTEX_INIT_FAST);
+ g_ndb_cluster_connection_pool[0]= g_ndb_cluster_connection;
+ for (unsigned i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
+ {
+ if ((g_ndb_cluster_connection_pool[i]=
+ new Ndb_cluster_connection(opt_ndbcluster_connectstring,
+ g_ndb_cluster_connection)) == 0)
+ {
+ sql_print_error("NDB[%u]: failed to allocate cluster connect object",
+ i);
+ DBUG_PRINT("error",("Ndb_cluster_connection[%u](%s)",
+ i, opt_ndbcluster_connectstring));
+ goto ndbcluster_connect_error;
+ }
+ {
+ char buf[128];
+ my_snprintf(buf, sizeof(buf), "mysqld --server-id=%lu (connection %u)",
+ server_id, i+1);
+ g_ndb_cluster_connection_pool[i]->set_name(buf);
+ }
+ g_ndb_cluster_connection_pool[i]->set_optimized_node_selection
+ (opt_ndb_optimized_node_selection);
+ }
+ }
+
+ if (res == 0)
+ {
+ connect_callback();
+ for (unsigned i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
+ {
+ if (g_ndb_cluster_connection_pool[i]->node_id() == 0)
+ {
+ // not connected to mgmd yet, try again
+ g_ndb_cluster_connection_pool[i]->connect(0,0,0);
+ if (g_ndb_cluster_connection_pool[i]->node_id() == 0)
+ {
+ sql_print_warning("NDB[%u]: starting connect thread", i);
+ g_ndb_cluster_connection_pool[i]->start_connect_thread();
+ continue;
+ }
+ }
+ DBUG_PRINT("info",
+ ("NDBCLUSTER storage engine (%u) at %s on port %d", i,
+ g_ndb_cluster_connection_pool[i]->get_connected_host(),
+ g_ndb_cluster_connection_pool[i]->get_connected_port()));
+
+ struct timeval now_time;
+ gettimeofday(&now_time, 0);
+ ulong wait_until_ready_time = (end_time.tv_sec > now_time.tv_sec) ?
+ end_time.tv_sec - now_time.tv_sec : 1;
+ res= g_ndb_cluster_connection_pool[i]->
+ wait_until_ready(wait_until_ready_time,3);
+ if (res == 0)
+ {
+ sql_print_information("NDB[%u]: all storage nodes connected", i);
+ }
+ else if (res > 0)
+ {
+ sql_print_information("NDB[%u]: some storage nodes connected", i);
+ }
+ else if (res < 0)
+ {
+ sql_print_information("NDB[%u]: no storage nodes connected (timed out)", i);
+ }
+ }
+ }
+ else if (res == 1)
+ {
+ for (unsigned i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
+ {
+ if (g_ndb_cluster_connection_pool[i]->
+ start_connect_thread(i == 0 ? connect_callback : NULL))
+ {
+ sql_print_error("NDB[%u]: failed to start connect thread", i);
+ DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
+ goto ndbcluster_connect_error;
+ }
+ }
+#ifndef DBUG_OFF
+ {
+ char buf[1024];
+ DBUG_PRINT("info",
+ ("NDBCLUSTER storage engine not started, "
+ "will connect using %s",
+ g_ndb_cluster_connection->
+ get_connectstring(buf,sizeof(buf))));
+ }
+#endif
+ }
+ else
+ {
+ DBUG_ASSERT(res == -1);
+ DBUG_PRINT("error", ("permanent error"));
+ goto ndbcluster_connect_error;
+ }
+ DBUG_RETURN(0);
+ndbcluster_connect_error:
+ DBUG_RETURN(-1);
+}
+
+int ndbcluster_disconnect()
+{
+ DBUG_ENTER("ndbcluster_disconnect");
+ if (g_ndb)
+ delete g_ndb;
+ g_ndb= NULL;
+ {
+ if (g_ndb_cluster_connection_pool)
+ {
+ /* first in pool is the main one, wait with release */
+ for (unsigned i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
+ {
+ if (g_ndb_cluster_connection_pool[i])
+ delete g_ndb_cluster_connection_pool[i];
+ }
+ my_free((uchar*) g_ndb_cluster_connection_pool, MYF(MY_ALLOW_ZERO_PTR));
+ pthread_mutex_destroy(&g_ndb_cluster_connection_pool_mutex);
+ g_ndb_cluster_connection_pool= 0;
+ }
+ g_ndb_cluster_connection_pool_alloc= 0;
+ g_ndb_cluster_connection_pool_pos= 0;
+ }
+ if (g_ndb_cluster_connection)
+ delete g_ndb_cluster_connection;
+ g_ndb_cluster_connection= NULL;
+ DBUG_RETURN(0);
+}
+
+Ndb_cluster_connection *ndb_get_cluster_connection()
+{
+ pthread_mutex_lock(&g_ndb_cluster_connection_pool_mutex);
+ Ndb_cluster_connection *connection=
+ g_ndb_cluster_connection_pool[g_ndb_cluster_connection_pool_pos];
+ g_ndb_cluster_connection_pool_pos++;
+ if (g_ndb_cluster_connection_pool_pos ==
+ g_ndb_cluster_connection_pool_alloc)
+ g_ndb_cluster_connection_pool_pos= 0;
+ pthread_mutex_unlock(&g_ndb_cluster_connection_pool_mutex);
+ return connection;
+}
+
+ulonglong ndb_get_latest_trans_gci()
+{
+ unsigned i;
+ ulonglong val= *g_ndb_cluster_connection->get_latest_trans_gci();
+ for (i= 1; i < g_ndb_cluster_connection_pool_alloc; i++)
+ {
+ ulonglong tmp= *g_ndb_cluster_connection_pool[i]->get_latest_trans_gci();
+ if (tmp > val)
+ val= tmp;
+ }
+ return val;
+}
+
+void ndb_set_latest_trans_gci(ulonglong val)
+{
+ unsigned i;
+ for (i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
+ {
+ *g_ndb_cluster_connection_pool[i]->get_latest_trans_gci()= val;
+ }
+}
+
+int ndb_has_node_id(uint id)
+{
+ unsigned i;
+ for (i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
+ {
+ if (id == g_ndb_cluster_connection_pool[i]->node_id())
+ return 1;
+ }
+ return 0;
+}
+
+int ndb_get_connection_info(st_ndb_connection_info *ns,
+ Ndb_cluster_connection *c,
+ int report_info)
+{
+ ns->connected_port= c->get_connected_port();
+ ns->connected_host= c->get_connected_host();
+ if (report_info && ns->cluster_node_id != (int) c->node_id())
+ {
+ ns->cluster_node_id= c->node_id();
+ sql_print_information("NDB: NodeID is %lu, management server '%s:%lu'",
+ ns->cluster_node_id, ns->connected_host,
+ ns->connected_port);
+ }
+ else
+ ns->cluster_node_id= c->node_id();
+ ns->number_of_replicas= 0;
+ ns->number_of_ready_data_nodes= c->get_no_ready();
+ ns->number_of_data_nodes= c->no_db_nodes();
+ ns->connect_count= c->get_connect_count();
+ return 0;
+}
+
+#endif /* WITH_NDBCLUSTER_STORAGE_ENGINE */
diff -Nrup a/sql/ha_ndbcluster_connection.h b/sql/ha_ndbcluster_connection.h
--- /dev/null Wed Dec 31 16:00:00 196900
+++ b/sql/ha_ndbcluster_connection.h 2007-11-20 10:49:50 +01:00
@@ -0,0 +1,37 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+int ndbcluster_connect(int (*connect_callback)(void));
+int ndbcluster_disconnect();
+
+Ndb_cluster_connection *ndb_get_cluster_connection();
+ulonglong ndb_get_latest_trans_gci();
+void ndb_set_latest_trans_gci(ulonglong val);
+int ndb_has_node_id(uint id);
+
+struct st_ndb_connection_info {
+ st_ndb_connection_info() { bzero(this, sizeof(struct st_ndb_connection_info)); }
+ long cluster_node_id;
+ const char * connected_host;
+ long connected_port;
+ long number_of_replicas;
+ long number_of_data_nodes;
+ long number_of_ready_data_nodes;
+ long connect_count;
+};
+
+int ndb_get_connection_info(st_ndb_connection_info *ns,
+ Ndb_cluster_connection *c,
+ int report_info= 0);
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.2689) | tomas | 20 Nov |