#At file:///home/tomas/mysql_src/mysql-6.0-ndb/
2691 Tomas Ulin 2008-12-10 [merge]
merge and use new Ed_connection in run_query
modified:
sql/ha_ndbcluster_binlog.cc
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2008-12-09 18:59:54 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2008-12-10 13:28:23 +0000
@@ -24,6 +24,7 @@
#include "rpl_injector.h"
#include "rpl_filter.h"
#include "slave.h"
+#include "sql_prepare.h"
#include "ha_ndbcluster_binlog.h"
#include <ndbapi/NdbDictionary.hpp>
#include <ndbapi/ndb_cluster_connection.hpp>
@@ -245,78 +246,58 @@ static void dbug_print_table(const char
- purging the ndb_binlog_index
- creating the ndb_apply_status table
*/
+static void copy_warnings(THD *thd, List<MYSQL_ERROR> *src)
+{
+ List_iterator_fast<MYSQL_ERROR> err_it(*src);
+ MYSQL_ERROR *err;
+
+ while ((err= err_it++))
+ push_warning(thd, err->level, err->code, err->msg);
+}
static void run_query(THD *thd, char *buf, char *end,
- const int *no_print_error, my_bool disable_binlog)
+ const int *no_print_error, my_bool disable_binlog,
+ my_bool reset_error)
{
- ulong save_thd_query_length= thd->query_length;
- char *save_thd_query= thd->query;
- ulong save_thread_id= thd->variables.pseudo_thread_id;
- struct system_status_var save_thd_status_var= thd->status_var;
- THD_TRANS save_thd_transaction_all= thd->transaction.all;
- THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt;
ulonglong save_thd_options= thd->options;
DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->options));
- NET save_thd_net= thd->net;
- const char* found_semicolon= NULL;
- bzero((char*) &thd->net, sizeof(NET));
- thd->query_length= end - buf;
- thd->query= buf;
- thd->variables.pseudo_thread_id= thread_id;
- thd->transaction.stmt.modified_non_trans_table= FALSE;
+ LEX_STRING query= {buf, end - buf};
if (disable_binlog)
thd->options&= ~OPTION_BIN_LOG;
-
- DBUG_PRINT("query", ("%s", thd->query));
+
+ DBUG_PRINT("query", ("%s", query.str));
DBUG_ASSERT(!thd->in_sub_stmt);
DBUG_ASSERT(!thd->locked_tables_mode);
- mysql_parse(thd, thd->query, thd->query_length, &found_semicolon);
-
- if (no_print_error && thd->is_slave_error)
{
- int i;
- Thd_ndb *thd_ndb= get_thd_ndb(thd);
- for (i= 0; no_print_error[i]; i++)
- if ((thd_ndb->m_error_code == no_print_error[i]) ||
- (thd->stmt_da->sql_errno() == (unsigned) no_print_error[i]))
- break;
- if (!no_print_error[i])
- sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d",
- buf,
- thd->stmt_da->message(),
- thd->stmt_da->sql_errno(),
- thd_ndb->m_error_code,
- (int) thd->is_error(), thd->is_slave_error);
- }
+ Ed_connection con(thd);
+ bool res= con.execute_direct(query);
- /*
- After executing statement we should unlock and close tables open
- by it as well as release meta-data locks obtained by it.
- */
- close_thread_tables(thd);
+ if (no_print_error && res)
+ {
+ int i;
+ Thd_ndb *thd_ndb= get_thd_ndb(thd);
+ for (i= 0; no_print_error[i]; i++)
+ if ((thd_ndb->m_error_code == no_print_error[i]) ||
+ (con.get_last_errno() == (unsigned)no_print_error[i]))
+ break;
+ if (!no_print_error[i])
+ sql_print_error("NDB: %s: error %s %d(ndb: %d)",
+ buf,
+ con.get_last_error(),
+ con.get_last_errno(),
+ thd_ndb->m_error_code);
+ }
- /*
- XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command()
- can not be called from within a statement, and
- run_query() can be called from anywhere, including from within
- a sub-statement.
- This particular reset is a temporary hack to avoid an assert
- for double assignment of the diagnostics area when run_query()
- is called from ndbcluster_reset_logs(), which is called from
- mysql_flush().
- */
- thd->stmt_da->reset_diagnostics_area();
+ if (res && !reset_error)
+ {
+ copy_warnings(thd, con.get_warn_list());
+ my_message(con.get_last_errno(), con.get_last_error(), MYF(ME_NO_WARNING_FOR_ERROR));
+ }
+ }
thd->options= save_thd_options;
- thd->query_length= save_thd_query_length;
- thd->query= save_thd_query;
- thd->variables.pseudo_thread_id= save_thread_id;
- thd->status_var= save_thd_status_var;
- thd->transaction.all= save_thd_transaction_all;
- thd->transaction.stmt= save_thd_transaction_stmt;
- thd->net= save_thd_net;
}
static void
@@ -531,7 +512,26 @@ static int ndbcluster_reset_logs(THD *th
char buf[1024];
char *end= strmov(buf, "TRUNCATE " NDB_REP_DB "." NDB_REP_TABLE);
- run_query(thd, buf, end, NULL, TRUE);
+ run_query(thd, buf, end, NULL, TRUE, FALSE);
+
+ /*
+ Calling function only expects and handles error cases,
+ so reset state if not an error as not to hit asserts
+ in upper layers
+ */
+ while (thd->stmt_da->is_error())
+ {
+ if (thd->stmt_da->sql_errno() == ER_NO_SUCH_TABLE)
+ {
+ /*
+ If table does not exist ignore the error as it
+ is a consistant behavior
+ */
+ break;
+ }
+ DBUG_RETURN(1);
+ }
+ thd->stmt_da->reset_diagnostics_area();
DBUG_RETURN(0);
}
@@ -556,7 +556,16 @@ ndbcluster_binlog_index_purge_file(THD *
NDB_REP_DB "." NDB_REP_TABLE
" WHERE File='"), file), "'");
- run_query(thd, buf, end, NULL, TRUE);
+ run_query(thd, buf, end, NULL, TRUE, FALSE);
+ if (thd->stmt_da->is_error() &&
+ thd->stmt_da->sql_errno() == ER_NO_SUCH_TABLE)
+ {
+ /*
+ If table does not exist ignore the error as it
+ is a consistant behavior
+ */
+ thd->stmt_da->reset_diagnostics_area();
+ }
DBUG_RETURN(0);
}
@@ -673,16 +682,16 @@ static void ndbcluster_reset_slave(THD *
DBUG_ENTER("ndbcluster_reset_slave");
char buf[1024];
char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
- run_query(thd, buf, end, NULL, TRUE);
- if (thd->main_da.is_error() &&
- ((thd->main_da.sql_errno() == ER_NO_SUCH_TABLE) ||
- (thd->main_da.sql_errno() == ER_OPEN_AS_READONLY && ndbcluster_silent)))
+ run_query(thd, buf, end, NULL, TRUE, FALSE);
+ if (thd->stmt_da->is_error() &&
+ ((thd->stmt_da->sql_errno() == ER_NO_SUCH_TABLE) ||
+ (thd->stmt_da->sql_errno() == ER_OPEN_AS_READONLY && ndbcluster_silent)))
{
/*
If table does not exist ignore the error as it
is a consistant behavior
*/
- thd->main_da.reset_diagnostics_area();
+ thd->stmt_da->reset_diagnostics_area();
/*
ndbcluster_silent
- avoid "no table mysql.ndb_apply_status" warning - ER_NO_SUCH_TABLE
@@ -1052,7 +1061,7 @@ static int ndbcluster_create_ndb_apply_s
end= strmov(buf, "FLUSH TABLE " NDB_REP_DB "." NDB_APPLY_TABLE);
const int no_print_error[1]= {0};
- run_query(thd, buf, end, no_print_error, TRUE);
+ run_query(thd, buf, end, no_print_error, TRUE, TRUE);
}
}
@@ -1074,7 +1083,7 @@ static int ndbcluster_create_ndb_apply_s
721, // Table already exist
4009,
0}; // do not print error 701 etc
- run_query(thd, buf, end, no_print_error, TRUE);
+ run_query(thd, buf, end, no_print_error, TRUE, TRUE);
DBUG_RETURN(0);
}
@@ -1125,7 +1134,7 @@ static int ndbcluster_create_schema_tabl
end= strmov(buf, "FLUSH TABLE " NDB_REP_DB "." NDB_SCHEMA_TABLE);
const int no_print_error[1]= {0};
- run_query(thd, buf, end, no_print_error, TRUE);
+ run_query(thd, buf, end, no_print_error, TRUE, TRUE);
}
}
@@ -1151,7 +1160,7 @@ static int ndbcluster_create_schema_tabl
721, // Table already exist
4009,
0}; // do not print error 701 etc
- run_query(thd, buf, end, no_print_error, TRUE);
+ run_query(thd, buf, end, no_print_error, TRUE, TRUE);
DBUG_RETURN(0);
}
@@ -1268,9 +1277,8 @@ static int ndbcluster_find_all_databases
const int no_print_error[1]= {0};
run_query(thd, query, query + query_length,
no_print_error, /* print error */
- TRUE); /* don't binlog the query */
- /* always reset here */
- thd->main_da.reset_diagnostics_area();
+ TRUE, /* don't binlog the query */
+ TRUE); /* reset error */
}
}
else if (strncasecmp("ALTER", query, 5) == 0)
@@ -1284,13 +1292,12 @@ static int ndbcluster_find_all_databases
name_len= my_snprintf(name, sizeof(name), "CREATE DATABASE %s", db);
run_query(thd, name, name + name_len,
no_print_error, /* print error */
- TRUE); /* don't binlog the query */
- thd->main_da.reset_diagnostics_area();
+ TRUE, /* don't binlog the query */
+ TRUE); /* reset error */
run_query(thd, query, query + query_length,
no_print_error, /* print error */
- TRUE); /* don't binlog the query */
- /* always reset here */
- thd->main_da.reset_diagnostics_area();
+ TRUE, /* don't binlog the query */
+ TRUE); /* reset error */
}
}
else if (strncasecmp("DROP", query, 4) == 0)
@@ -2330,13 +2337,13 @@ ndb_binlog_thread_handle_schema_event(TH
if (! ndbcluster_check_if_local_table(schema->db, schema->name))
{
thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
- const int no_print_error[1]=
- {ER_BAD_TABLE_ERROR}; /* ignore missing table */
+ const int no_print_error[2]=
+ {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
run_query(thd, schema->query,
schema->query + schema->query_length,
no_print_error, // /* don't print error */
- TRUE); // /* don't binlog the query */
-
+ TRUE, /* don't binlog the query */
+ TRUE); /* reset error */
/* binlog dropping table after any table operations */
post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
@@ -2424,7 +2431,8 @@ ndb_binlog_thread_handle_schema_event(TH
run_query(thd, schema->query,
schema->query + schema->query_length,
no_print_error, /* print error */
- TRUE); /* don't binlog the query */
+ TRUE, /* don't binlog the query */
+ TRUE); /* reset error */
/* binlog dropping database after any table operations */
post_epoch_log_list->push_back(schema, mem_root);
/* acknowledge this query _after_ epoch completion */
@@ -2452,7 +2460,8 @@ ndb_binlog_thread_handle_schema_event(TH
run_query(thd, schema->query,
schema->query + schema->query_length,
no_print_error, /* print error */
- TRUE); /* don't binlog the query */
+ TRUE, /* don't binlog the query */
+ TRUE); /* reset error */
log_query= 1;
break;
}