Below is the list of changes that have just been committed into tomas's
local 5.1 repository. When tomas does a push, these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository,
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1931 05/11/06 00:20:37 tomas@stripped[tomas] +13 -0
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
sql/mysqld.cc
1.485 05/11/06 00:20:24 tomas@stripped[tomas] +11 -0
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
sql/mysql_priv.h
1.340 05/11/06 00:20:23 tomas@stripped[tomas] +3 -0
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
sql/ha_ndbcluster.h
1.97 05/11/06 00:20:23 tomas@stripped[tomas] +30 -4
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
sql/ha_ndbcluster.cc
1.214 05/11/06 00:20:23 tomas@stripped[tomas] +673 -212
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
mysql-test/t/ndb_read_multi_range.test
1.5 05/11/06 00:20:23 tomas@stripped[tomas] +1 -1
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
mysql-test/t/ndb_partition_error.test
1.3 05/11/06 00:20:22 tomas@stripped[tomas] +1 -0
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
mysql-test/t/ndb_multi.test
1.6 05/11/06 00:20:22 tomas@stripped[tomas] +2 -1
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
mysql-test/t/ndb_bitfield.test
1.5 05/11/06 00:20:22 tomas@stripped[tomas] +2 -1
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
mysql-test/r/ndb_read_multi_range.result
1.4 05/11/06 00:20:22 tomas@stripped[tomas] +1 -1
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
mysql-test/r/ndb_partition_error.result
1.6 05/11/06 00:20:22 tomas@stripped[tomas] +5 -1
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
mysql-test/r/ndb_multi.result
1.5 05/11/06 00:20:22 tomas@stripped[tomas] +6 -1
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
mysql-test/r/ndb_bitfield.result
1.6 05/11/06 00:20:22 tomas@stripped[tomas] +10 -2
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
mysql-test/r/ndb_autodiscover2.result
1.5 05/11/06 00:20:22 tomas@stripped[tomas] +1 -1
Ndb handler cleanup:
- removed some returns on ndb internal error codes, return ndb cause in warnings
- moved all errorcode mapping mysqld-ndb to ndberror.c
- ndb util thread to discover all ndb tables at startup
- ndb util thread to wait for mysqld startup
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: poseidon.ndb.mysql.com
# Root: /home/tomas/mysql-5.1-new
--- 1.339/sql/mysql_priv.h 2005-11-05 12:20:31 +01:00
+++ 1.340/sql/mysql_priv.h 2005-11-06 00:20:23 +01:00
@@ -1224,6 +1224,9 @@
#ifdef HAVE_OPENSSL
extern pthread_mutex_t LOCK_des_key_file;
#endif
+extern pthread_mutex_t LOCK_server_started;
+extern pthread_cond_t COND_server_started;
+extern int mysqld_server_started;
extern rw_lock_t LOCK_grant, LOCK_sys_init_connect, LOCK_sys_init_slave;
extern pthread_cond_t COND_refresh, COND_thread_count, COND_manager;
extern pthread_attr_t connection_attrib;
--- 1.484/sql/mysqld.cc 2005-11-04 21:09:55 +01:00
+++ 1.485/sql/mysqld.cc 2005-11-06 00:20:24 +01:00
@@ -501,6 +501,10 @@
pthread_cond_t COND_refresh,COND_thread_count;
pthread_t signal_thread;
pthread_attr_t connection_attrib;
+pthread_mutex_t LOCK_server_started;
+pthread_cond_t COND_server_started;
+
+int mysqld_server_started= 0;
/* replication parameters, if master_host is not NULL, we are a slave */
uint master_port= MYSQL_PORT, master_connect_retry = 60;
@@ -2765,6 +2769,8 @@
(void) pthread_mutex_init(&LOCK_rpl_status, MY_MUTEX_INIT_FAST);
(void) pthread_cond_init(&COND_rpl_status, NULL);
#endif
+ (void) pthread_mutex_init(&LOCK_server_started, MY_MUTEX_INIT_FAST);
+ (void) pthread_cond_init(&COND_server_started,NULL);
sp_cache_init();
/* Parameter for threads created for connections */
(void) pthread_attr_init(&connection_attrib);
@@ -3450,6 +3456,10 @@
mysqld_port,
MYSQL_COMPILATION_COMMENT);
+ // Signal threads waiting for server to be started
+ mysqld_server_started= 1;
+ pthread_cond_signal(&COND_server_started);
+
#if defined(__NT__) || defined(HAVE_SMEM)
handle_connections_methods();
#else
@@ -3497,6 +3507,7 @@
CloseHandle(hEventShutdown);
}
#endif
+ clean_up(1);
wait_for_signal_thread_to_end();
clean_up_mutexes();
my_end(opt_endinfo ? MY_CHECK_ERROR | MY_GIVE_INFO : 0);
--- 1.4/mysql-test/r/ndb_autodiscover2.result 2005-09-21 04:16:18 +02:00
+++ 1.5/mysql-test/r/ndb_autodiscover2.result 2005-11-06 00:20:22 +01:00
@@ -6,7 +6,7 @@
4 5
show status like 'handler_discover%';
Variable_name Value
-Handler_discover 1
+Handler_discover 0
drop table t9;
select * from t10;
ERROR HY000: Got error 4263 'Invalid blob attributes or invalid blob parts table' from
NDBCLUSTER
--- 1.213/sql/ha_ndbcluster.cc 2005-11-04 21:09:54 +01:00
+++ 1.214/sql/ha_ndbcluster.cc 2005-11-06 00:20:23 +01:00
@@ -46,6 +46,7 @@
static const int max_transactions= 3; // should really be 2 but there is a transaction to
much allocated when loch table is used
static const char *ha_ndb_ext=".ndb";
+static const char share_prefix[]= "./";
static int ndbcluster_close_connection(THD *thd);
static int ndbcluster_commit(THD *thd, bool all);
@@ -99,12 +100,15 @@
}
// Typedefs for long names
+typedef NdbDictionary::Object NDBOBJ;
typedef NdbDictionary::Column NDBCOL;
typedef NdbDictionary::Table NDBTAB;
typedef NdbDictionary::Index NDBINDEX;
typedef NdbDictionary::Dictionary NDBDICT;
+typedef NdbDictionary::Event NDBEVENT;
-bool ndbcluster_inited= FALSE;
+static int ndbcluster_inited= 0;
+static int ndbcluster_util_inited= 0;
static Ndb* g_ndb= NULL;
static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
@@ -117,9 +121,12 @@
static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
my_bool not_used __attribute__((unused)));
-static void ndb_set_fragmentation(NDBTAB & tab, TABLE *table, uint pk_len);
-static NDB_SHARE *get_share(const char *table_name);
-static void free_share(NDB_SHARE *share);
+static NDB_SHARE *get_share(const char *key,
+ bool create_if_not_exists= TRUE,
+ bool have_lock= FALSE);
+static void free_share(NDB_SHARE **share, bool have_lock= FALSE);
+static void real_free_share(NDB_SHARE **share);
+static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
static int unpackfrm(const void **data, uint *len,
@@ -128,6 +135,33 @@
static int ndb_get_table_statistics(Ndb*, const char *,
struct Ndb_statistics *);
+#ifndef DBUG_OFF
+void print_records(TABLE *table, const char *record)
+{
+ if (_db_on_)
+ {
+ for (uint j= 0; j < table->s->fields; j++)
+ {
+ char buf[40];
+ int pos= 0;
+ Field *field= table->field[j];
+ const byte* field_ptr= field->ptr - table->record[0] + record;
+ int pack_len= field->pack_length();
+ int n= pack_len < 10 ? pack_len : 10;
+
+ for (int i= 0; i < n && pos < 20; i++)
+ {
+ pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
+ }
+ buf[pos]= 0;
+ DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
+ }
+ }
+}
+#else
+#define print_records(a,b)
+#endif
+
// Util thread variables
static pthread_t ndb_util_thread;
pthread_mutex_t LOCK_ndb_util_thread;
@@ -179,65 +213,70 @@
{NullS, NullS, SHOW_LONG}
};
+/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
+extern Uint64 g_latest_trans_gci;
+
/*
Error handling functions
*/
-struct err_code_mapping
-{
- int ndb_err;
- int my_err;
- int show_warning;
-};
+/* Note for merge: old mapping table, moved to storage/ndb/ndberror.c */
-static const err_code_mapping err_map[]=
+static int ndb_to_mysql_error(const NdbError *ndberr)
{
- { 626, HA_ERR_KEY_NOT_FOUND, 0 },
- { 630, HA_ERR_FOUND_DUPP_KEY, 0 },
- { 893, HA_ERR_FOUND_DUPP_KEY, 0 },
- { 721, HA_ERR_TABLE_EXIST, 1 },
- { 4244, HA_ERR_TABLE_EXIST, 1 },
-
- { 709, HA_ERR_NO_SUCH_TABLE, 0 },
-
- { 266, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 274, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 296, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 297, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 237, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
-
- { 623, HA_ERR_RECORD_FILE_FULL, 1 },
- { 624, HA_ERR_RECORD_FILE_FULL, 1 },
- { 625, HA_ERR_RECORD_FILE_FULL, 1 },
- { 826, HA_ERR_RECORD_FILE_FULL, 1 },
- { 827, HA_ERR_RECORD_FILE_FULL, 1 },
- { 832, HA_ERR_RECORD_FILE_FULL, 1 },
+ /* read the mysql mapped error code */
+ int error= ndberr->mysql_code;
- { 284, HA_ERR_TABLE_DEF_CHANGED, 0 },
-
- { 0, 1, 0 },
-
- { -1, -1, 1 }
-};
+ switch (error)
+ {
+ /* errors for which we do not add warnings, just return mapped error code
+ */
+ case HA_ERR_NO_SUCH_TABLE:
+ case HA_ERR_KEY_NOT_FOUND:
+ case HA_ERR_FOUND_DUPP_KEY:
+ return error;
+
+ /* Mapping missing, go with the ndb error code*/
+ case -1:
+ error= ndberr->code;
+ break;
+ /* Mapping exists, go with the mapped code */
+ default:
+ break;
+ }
-static int ndb_to_mysql_error(const NdbError *err)
-{
- uint i;
- for (i=0; err_map[i].ndb_err != err->code && err_map[i].my_err != -1; i++);
- if (err_map[i].show_warning)
- {
- // Push the NDB error message as warning
+ /*
+ Push the NDB error message as a warning
+ - Lets the user run SHOW WARNINGS to get more info on what the error is
+ - Used by replication to see if the error was temporary
+ */
+ if (ndberr->status == NdbError::TemporaryError)
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- err->code, err->message, "NDB");
- }
- if (err_map[i].my_err == -1)
- return err->code;
- return err_map[i].my_err;
+ ER_GET_TEMPORARY_ERRMSG, ER(ER_GET_TEMPORARY_ERRMSG),
+ ndberr->code, ndberr->message, "NDB");
+ else
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ ndberr->code, ndberr->message, "NDB");
+ return error;
}
+int execute_no_commit_ignore_no_key(ha_ndbcluster *h, NdbTransaction *trans)
+{
+ int res= trans->execute(NdbTransaction::NoCommit,
+ NdbTransaction::AO_IgnoreError,
+ h->m_force_send);
+ if (res == 0)
+ return 0;
+
+ const NdbError &err= trans->getNdbError();
+ if (err.classification != NdbError::ConstraintViolation &&
+ err.classification != NdbError::NoDataFound)
+ return res;
+ return 0;
+}
inline
int execute_no_commit(ha_ndbcluster *h, NdbTransaction *trans)
@@ -247,9 +286,11 @@
if (m_batch_execute)
return 0;
#endif
- return trans->execute(NdbTransaction::NoCommit,
- NdbTransaction::AbortOnError,
- h->m_force_send);
+ return h->m_ignore_no_key ?
+ execute_no_commit_ignore_no_key(h,trans) :
+ trans->execute(NdbTransaction::NoCommit,
+ NdbTransaction::AbortOnError,
+ h->m_force_send);
}
inline
@@ -437,29 +478,38 @@
# The mapped error code
*/
-void ha_ndbcluster::invalidate_dictionary_cache(bool global)
+void
+ha_ndbcluster::invalidate_dictionary_cache(TABLE *table, Ndb *ndb,
+ const char *tabname, bool global)
{
- NDBDICT *dict= get_ndb()->getDictionary();
+ NDBDICT *dict= ndb->getDictionary();
DBUG_ENTER("invalidate_dictionary_cache");
- DBUG_PRINT("info", ("invalidating %s", m_tabname));
+ DBUG_PRINT("info", ("invalidating %s", tabname));
if (global)
{
- const NDBTAB *tab= dict->getTable(m_tabname);
+ const NDBTAB *tab= dict->getTable(tabname);
if (!tab)
DBUG_VOID_RETURN;
if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
{
// Global cache has already been invalidated
- dict->removeCachedTable(m_tabname);
+ dict->removeCachedTable(tabname);
global= FALSE;
}
else
- dict->invalidateTable(m_tabname);
+ dict->invalidateTable(tabname);
}
else
- dict->removeCachedTable(m_tabname);
+ dict->removeCachedTable(tabname);
table->s->version=0L; /* Free when thread is ready */
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::invalidate_dictionary_cache(bool global)
+{
+ NDBDICT *dict= get_ndb()->getDictionary();
+ invalidate_dictionary_cache(table, get_ndb(), m_tabname, global);
/* Invalidate indexes */
for (uint i= 0; i < table->s->keys; i++)
{
@@ -491,7 +541,6 @@
break;
}
}
- DBUG_VOID_RETURN;
}
int ha_ndbcluster::ndb_err(NdbTransaction *trans)
@@ -648,14 +697,15 @@
*/
int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
- uint fieldnr, bool *set_blob_value)
+ uint fieldnr, int row_offset,
+ bool *set_blob_value)
{
- const byte* field_ptr= field->ptr;
- uint32 pack_len= field->pack_length();
+ const byte* field_ptr= field->ptr + row_offset;
+ uint32 pack_len= field->pack_length();
DBUG_ENTER("set_ndb_value");
- DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s",
+ DBUG_PRINT("enter", ("%d: %s type: %u len=%d is_null=%s",
fieldnr, field->field_name, field->type(),
- pack_len, field->is_null()?"Y":"N"));
+ pack_len, field->is_null(row_offset) ? "Y" : "N"));
DBUG_DUMP("value", (char*) field_ptr, pack_len);
DBUG_ASSERT(ndb_supported_type(field->type()));
@@ -666,7 +716,7 @@
{
pack_len= sizeof(empty_field);
field_ptr= (byte *)&empty_field;
- if (field->is_null())
+ if (field->is_null(row_offset))
empty_field= 0;
else
empty_field= 1;
@@ -675,12 +725,15 @@
{
if (field->type() != MYSQL_TYPE_BIT)
{
- if (field->is_null())
+ if (field->is_null(row_offset))
+ {
+ DBUG_PRINT("info", ("field is NULL"));
// Set value to NULL
DBUG_RETURN((ndb_op->setValue(fieldnr,
(char*)NULL, pack_len) != 0));
+ }
// Common implementation for most field types
- DBUG_RETURN(ndb_op->setValue(fieldnr,
+ DBUG_RETURN(ndb_op->setValue(fieldnr,
(char*)field_ptr, pack_len) != 0);
}
else // if (field->type() == MYSQL_TYPE_BIT)
@@ -690,7 +743,7 @@
// Round up bit field length to nearest word boundry
pack_len= ((pack_len + 3) >> 2) << 2;
DBUG_ASSERT(pack_len <= 8);
- if (field->is_null())
+ if (field->is_null(row_offset))
// Set value to NULL
DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
DBUG_PRINT("info", ("bit field"));
@@ -709,7 +762,7 @@
NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
if (ndb_blob != NULL)
{
- if (field->is_null())
+ if (field->is_null(row_offset))
DBUG_RETURN(ndb_blob->setNull() != 0);
Field_blob *field_blob= (Field_blob*)field;
@@ -790,8 +843,8 @@
{
char *buf= m_blobs_buffer + offset;
uint32 len= 0xffffffff; // Max uint32
- DBUG_PRINT("value", ("read blob ptr=%x len=%u",
- (UintPtr)buf, (uint)blob_len));
+ DBUG_PRINT("value", ("read blob ptr=%lx len=%u",
+ buf, (uint) blob_len));
if (ndb_blob->readData(buf, len) != 0)
DBUG_RETURN(-1);
DBUG_ASSERT(len == blob_len);
@@ -902,6 +955,19 @@
of table accessed in NDB
*/
+static int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
+ uint pack_length)
+{
+ DBUG_ENTER("cmp_frm");
+ /*
+ Compare FrmData in NDB with frm file from disk.
+ */
+ if ((pack_length != ndbtab->getFrmLength()) ||
+ (memcmp(pack_data, ndbtab->getFrmData(), pack_length)))
+ DBUG_RETURN(1);
+ DBUG_RETURN(0);
+}
+
int ha_ndbcluster::get_metadata(const char *path)
{
Ndb *ndb= get_ndb();
@@ -939,8 +1005,7 @@
DBUG_RETURN(1);
}
- if ((pack_length != tab->getFrmLength()) ||
- (memcmp(pack_data, tab->getFrmData(), pack_length)))
+ if (cmp_frm(tab, pack_data, pack_length))
{
if (!invalidating_ndb_table)
{
@@ -951,9 +1016,9 @@
else
{
DBUG_PRINT("error",
- ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
+ ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
pack_length, tab->getFrmLength(),
- memcmp(pack_data, tab->getFrmData(), pack_length)));
+ memcmp(pack_data, tab->getFrmData(), pack_length)));
DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
error= 3;
@@ -1980,7 +2045,7 @@
DBUG_ENTER("write_row");
- if (m_ignore_dup_key && table->s->primary_key != MAX_KEY)
+ if (!m_use_write && m_ignore_dup_key && table->s->primary_key !=
MAX_KEY)
{
int peek_res= peek_row(record);
@@ -2057,7 +2122,8 @@
{
Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) &&
- set_ndb_value(op, field, i, &set_blob_value))
+ (ha_get_bit_in_write_set(i + 1) || !m_use_write) &&
+ set_ndb_value(op, field, i, record-table->record[0], &set_blob_value))
{
m_skip_auto_increment= TRUE;
ERR_RETURN(op->getNdbError());
@@ -2299,7 +2365,7 @@
Field *field= table->field[i];
if (ha_get_bit_in_write_set(i+1) &&
(!(field->flags & PRI_KEY_FLAG)) &&
- set_ndb_value(op, field, i))
+ set_ndb_value(op, field, i, new_data - table->record[0]))
ERR_RETURN(op->getNdbError());
}
@@ -2414,51 +2480,73 @@
set to null.
*/
-void ha_ndbcluster::unpack_record(byte* buf)
+static void ndb_unpack_record(TABLE *table, NdbValue *value,
+ MY_BITMAP *defined, byte *buf)
{
+ Field **p_field= table->field, *field= *p_field;
uint row_offset= (uint) (buf - table->record[0]);
- Field **field, **end;
- NdbValue *value= m_value;
- DBUG_ENTER("unpack_record");
+ DBUG_ENTER("ndb_unpack_record");
- end= table->field + table->s->fields;
-
// Set null flag(s)
bzero(buf, table->s->null_bytes);
- for (field= table->field;
- field < end;
- field++, value++)
+ for ( ; field;
+ p_field++, value++, field= *p_field)
{
if ((*value).ptr)
{
- if (! ((*field)->flags & BLOB_FLAG))
+ if (!(field->flags & BLOB_FLAG))
{
- if ((*value).rec->isNULL())
- (*field)->set_null(row_offset);
- else if ((*field)->type() == MYSQL_TYPE_BIT)
+ int is_null= (*value).rec->isNULL();
+ if (is_null)
+ {
+ if (is_null > 0)
+ {
+ DBUG_PRINT("info",("[%u] NULL",
+ (*value).rec->getColumn()->getColumnNo()));
+ field->set_null(row_offset);
+ }
+ else
+ {
+ DBUG_PRINT("info",("[%u] UNDEFINED",
+ (*value).rec->getColumn()->getColumnNo()));
+ bitmap_clear_bit(defined,
+ (*value).rec->getColumn()->getColumnNo());
+ }
+ }
+ else if (field->type() == MYSQL_TYPE_BIT)
{
- uint pack_len= (*field)->pack_length();
- if (pack_len < 5)
+ byte *save_field_ptr= field->ptr;
+ field->ptr= save_field_ptr + row_offset;
+ if (field->pack_length() < 5)
{
DBUG_PRINT("info", ("bit field H'%.8X",
(*value).rec->u_32_value()));
- ((Field_bit *) *field)->store((longlong)
- (*value).rec->u_32_value(),
- FALSE);
+ ((Field_bit*) field)->store((longlong)
+ (*value).rec->u_32_value(), FALSE);
}
else
{
DBUG_PRINT("info", ("bit field H'%.8X%.8X",
- *(Uint32 *)(*value).rec->aRef(),
- *((Uint32 *)(*value).rec->aRef()+1)));
- ((Field_bit *) *field)->store((longlong)
- (*value).rec->u_64_value(), TRUE);
+ *(Uint32*) (*value).rec->aRef(),
+ *((Uint32*) (*value).rec->aRef()+1)));
+ ((Field_bit*) field)->store((longlong)
+ (*value).rec->u_64_value(),TRUE);
}
+ field->ptr= save_field_ptr;
+ DBUG_PRINT("info",("[%u] SET",
+ (*value).rec->getColumn()->getColumnNo()));
+ DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
+ }
+ else
+ {
+ DBUG_PRINT("info",("[%u] SET",
+ (*value).rec->getColumn()->getColumnNo()));
+ DBUG_DUMP("info", (const char*) field->ptr, field->field_length);
}
}
else
{
- NdbBlob* ndb_blob= (*value).blob;
+ NdbBlob *ndb_blob= (*value).blob;
bool isNull= TRUE;
#ifndef DBUG_OFF
int ret=
@@ -2466,11 +2554,16 @@
ndb_blob->getNull(isNull);
DBUG_ASSERT(ret == 0);
if (isNull)
- (*field)->set_null(row_offset);
+ field->set_null(row_offset);
}
}
}
-
+ DBUG_VOID_RETURN;
+}
+
+void ha_ndbcluster::unpack_record(byte *buf)
+{
+ ndb_unpack_record(table, m_value, 0, buf);
#ifndef DBUG_OFF
// Read and print all values that was fetched
if (table->s->primary_key == MAX_KEY)
@@ -2486,7 +2579,6 @@
}
//print_results();
#endif
- DBUG_VOID_RETURN;
}
/*
@@ -3207,8 +3299,9 @@
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
- DBUG_PRINT("enter", ("thd: %x, thd_ndb: %x, thd_ndb->lock_count: %d",
- thd, thd_ndb, thd_ndb->lock_count));
+ DBUG_PRINT("enter", ("this: %x thd: %lx thd_ndb: %lx "
+ "thd_ndb->lock_count: %d",
+ this, thd, thd_ndb, thd_ndb->lock_count));
if (lock_type != F_UNLCK)
{
@@ -3461,7 +3554,8 @@
while ((share= it++))
{
pthread_mutex_lock(&share->mutex);
- DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ",
share->table_name, share->commit_count));
+ DBUG_PRINT("info", ("Invalidate commit_count for %s, share->commit_count: %d ",
+ share->key, share->commit_count));
share->commit_count= 0;
share->commit_count_lock++;
pthread_mutex_unlock(&share->mutex);
@@ -3790,6 +3884,10 @@
return 0;
}
+/*
+ Create a table in NDB Cluster
+*/
+
int ha_ndbcluster::create(const char *name,
TABLE *form,
HA_CREATE_INFO *info)
@@ -3815,7 +3913,8 @@
caller.
Do Ndb specific stuff, such as create a .ndb file
*/
- my_errno= write_ndb_file();
+ if ((my_errno= write_ndb_file()))
+ DBUG_RETURN(my_errno);
DBUG_RETURN(my_errno);
}
@@ -3829,7 +3928,7 @@
if (packfrm(data, length, &pack_data, &pack_length))
DBUG_RETURN(2);
- DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
+ DBUG_PRINT("info", ("setFrm data=%lx len=%d", pack_data, pack_length));
tab.setFrm(pack_data, pack_length);
my_free((char*)data, MYF(0));
my_free((char*)pack_data, MYF(0));
@@ -4027,14 +4126,22 @@
if (!(orig_tab= dict->getTable(m_tabname)))
ERR_RETURN(dict->getNdbError());
}
+
m_table= (void *)orig_tab;
// Change current database to that of target table
set_dbname(to);
ndb->setDatabaseName(m_dbname);
- if (!(result= alter_table_name(new_tabname)))
+
+ if ((result= alter_table_name(new_tabname)))
{
- // Rename .ndb file
- result= handler::rename_table(from, to);
+ DBUG_RETURN(result);
+ }
+
+ // Rename .ndb file
+ if ((result= handler::rename_table(from, to)))
+ {
+ // ToDo in 4.1 should rollback alter table...
+ DBUG_RETURN(result);
}
DBUG_RETURN(result);
@@ -4050,7 +4157,7 @@
Ndb *ndb= get_ndb();
NDBDICT *dict= ndb->getDictionary();
const NDBTAB *orig_tab= (const NDBTAB *) m_table;
- DBUG_ENTER("alter_table_name_table");
+ DBUG_ENTER("alter_table_name");
NdbDictionary::Table new_tab= *orig_tab;
new_tab.setName(to);
@@ -4069,6 +4176,38 @@
*/
+/* static version which does not need a handler */
+
+int
+ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
+ const char *path,
+ const char *db,
+ const char *table_name)
+{
+ DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
+ NDBDICT *dict= ndb->getDictionary();
+
+ /* Drop the table from NDB */
+
+ int res;
+ if (h)
+ {
+ res= h->drop_table();
+ }
+ else
+ {
+ ndb->setDatabaseName(db);
+ res= dict->dropTable(table_name);
+ }
+
+ if (res)
+ {
+ DBUG_RETURN(res);
+ }
+
+ DBUG_RETURN(0);
+}
+
int ha_ndbcluster::delete_table(const char *name)
{
DBUG_ENTER("ha_ndbcluster::delete_table");
@@ -4081,9 +4220,8 @@
/* Call ancestor function to delete .ndb file */
handler::delete_table(name);
-
- /* Drop the table from NDB */
- DBUG_RETURN(drop_table());
+
+ DBUG_RETURN(delete_table(this, get_ndb(),name, m_dbname, m_tabname));
}
@@ -4149,6 +4287,15 @@
Constructor for the NDB Cluster table handler
*/
+#define HA_NDBCLUSTER_TABLE_FLAGS \
+ HA_REC_NOT_IN_SEQ | \
+ HA_NULL_IN_KEY | \
+ HA_AUTO_PART_KEY | \
+ HA_NO_PREFIX_CHAR_KEYS | \
+ HA_NEED_READ_RANGE_BUFFER | \
+ HA_CAN_GEOMETRY | \
+ HA_CAN_BIT_FIELD
+
ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
handler(&ndbcluster_hton, table_arg),
m_active_trans(NULL),
@@ -4156,13 +4303,7 @@
m_table(NULL),
m_table_version(-1),
m_table_info(NULL),
- m_table_flags(HA_REC_NOT_IN_SEQ |
- HA_NULL_IN_KEY |
- HA_AUTO_PART_KEY |
- HA_NO_PREFIX_CHAR_KEYS |
- HA_NEED_READ_RANGE_BUFFER |
- HA_CAN_GEOMETRY |
- HA_CAN_BIT_FIELD),
+ m_table_flags(HA_NDBCLUSTER_TABLE_FLAGS),
m_share(0),
m_part_info(NULL),
m_use_partition_function(FALSE),
@@ -4170,6 +4311,7 @@
m_use_write(FALSE),
m_ignore_dup_key(FALSE),
m_primary_key_update(FALSE),
+ m_ignore_no_key(FALSE),
m_rows_to_insert((ha_rows) 1),
m_rows_inserted((ha_rows) 0),
m_bulk_insert_rows((ha_rows) 1024),
@@ -4223,7 +4365,9 @@
DBUG_ENTER("~ha_ndbcluster");
if (m_share)
- free_share(m_share);
+ {
+ free_share(&m_share);
+ }
release_metadata();
my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
m_blobs_buffer= 0;
@@ -4256,8 +4400,8 @@
int res;
KEY *key;
DBUG_ENTER("open");
- DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
- name, mode, test_if_locked));
+ DBUG_PRINT("enter", ("this: %d name: %s mode: %d test_if_locked: %d",
+ this, name, mode, test_if_locked));
// Setup ref_length to make room for the whole
// primary key to be written in the ref variable
@@ -4277,7 +4421,8 @@
set_tabname(name);
if (check_ndb_connection()) {
- free_share(m_share); m_share= 0;
+ free_share(&m_share);
+ m_share= 0;
DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
@@ -4306,7 +4451,8 @@
int ha_ndbcluster::close(void)
{
DBUG_ENTER("close");
- free_share(m_share); m_share= 0;
+ free_share(&m_share);
+ m_share= 0;
release_metadata();
DBUG_RETURN(0);
}
@@ -4509,21 +4655,24 @@
ERR_RETURN(dict->getNdbError());
for (i= 0 ; i < list.count ; i++)
{
- NdbDictionary::Dictionary::List::Element& t= list.elements[i];
- DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
+ NdbDictionary::Dictionary::List::Element& elmt= list.elements[i];
+ DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name));
// Add only tables that belongs to db
- if (my_strcasecmp(system_charset_info, t.database, dbname))
+ if (my_strcasecmp(system_charset_info, elmt.database, dbname))
continue;
- DBUG_PRINT("info", ("%s must be dropped", t.name));
- drop_list.push_back(thd->strdup(t.name));
+ DBUG_PRINT("info", ("%s must be dropped", elmt.name));
+ drop_list.push_back(thd->strdup(elmt.name));
}
// Drop any tables belonging to database
+ char full_path[FN_REFLEN];
+ char *tmp= strxnmov(full_path, FN_REFLEN, share_prefix, dbname, "/", NullS);
ndb->setDatabaseName(dbname);
List_iterator_fast<char> it(drop_list);
while ((tabname=it++))
{
- if (dict->dropTable(tabname))
+ strxnmov(tmp, FN_REFLEN - (tmp - full_path), tabname, NullS);
+ if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname))
{
const NdbError err= dict->getNdbError();
if (err.code != 709)
@@ -4536,6 +4685,92 @@
DBUG_RETURN(ret);
}
+/*
+ find all tables in ndb and discover those needed
+*/
+static int ndbcluster_find_all_files(THD *thd)
+{
+ DBUG_ENTER("ndbcluster_find_all_files");
+ Ndb* ndb;
+ char key[FN_REFLEN];
+ NdbDictionary::Dictionary::List list;
+
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ NDBDICT *dict= ndb->getDictionary();
+
+ int unhandled, retries= 5;
+ do
+ {
+ if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
+ ERR_RETURN(dict->getNdbError());
+ unhandled= 0;
+ for (uint i= 0 ; i < list.count ; i++)
+ {
+ NDBDICT::List::Element& elmt= list.elements[i];
+ DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
+ if (!(elmt.state == NDBOBJ::StateBuilding ||
+ elmt.state == NDBOBJ::StateOnline))
+ {
+ sql_print_information("NDB: skipping setup table %s.%s, in state %d",
+ elmt.database, elmt.name, elmt.state);
+ continue;
+ }
+
+ ndb->setDatabaseName(elmt.database);
+ const NDBTAB *ndbtab;
+
+ if (!(ndbtab= dict->getTable(elmt.name)))
+ {
+ sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+ elmt.database, elmt.name,
+ dict->getNdbError().code,
+ dict->getNdbError().message);
+ unhandled++;
+ continue;
+ }
+
+ if (ndbtab->getFrmLength() == 0)
+ continue;
+
+ strxnmov(key, FN_LEN, mysql_data_home, "/",
+ elmt.database, "/", elmt.name, NullS);
+ const void *data= 0, *pack_data= 0;
+ uint length, pack_length;
+ int discover= 0;
+ if (readfrm(key, &data, &length) ||
+ packfrm(data, length, &pack_data, &pack_length))
+ {
+ discover= 1;
+ sql_print_information("NDB: missing frm for %s.%s, discovering...",
+ elmt.database, elmt.name);
+ }
+ else if (cmp_frm(ndbtab, pack_data, pack_length))
+ {
+ discover= 1;
+ sql_print_information("NDB: mismatch in frm for %s.%s, discovering...",
+ elmt.database, elmt.name);
+ }
+ my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
+
+ if (discover)
+ {
+ /* ToDo 4.1 database needs to be created if missing */
+ pthread_mutex_lock(&LOCK_open);
+ if (ha_create_table_from_engine(thd, elmt.database, elmt.name))
+ {
+ /* ToDo 4.1 handle error */
+ }
+ pthread_mutex_unlock(&LOCK_open);
+ }
+ }
+ }
+ while (unhandled && retries--);
+
+ DBUG_RETURN(0);
+}
int ndbcluster_find_files(THD *thd,const char *db,const char *path,
const char *wild, bool dir, List<char> *files)
@@ -4547,7 +4782,7 @@
Ndb* ndb;
char name[FN_REFLEN];
HASH ndb_tables, ok_tables;
- NdbDictionary::Dictionary::List list;
+ NDBDICT::List list;
if (!(ndb= check_ndb_in_thd(thd)))
DBUG_RETURN(HA_ERR_NO_CONNECTION);
@@ -4578,11 +4813,11 @@
for (i= 0 ; i < list.count ; i++)
{
- NdbDictionary::Dictionary::List::Element& t= list.elements[i];
- DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
+ NDBDICT::List::Element& elmt= list.elements[i];
+ DBUG_PRINT("info", ("Found %s/%s in NDB", elmt.database, elmt.name));
// Add only tables that belongs to db
- if (my_strcasecmp(system_charset_info, t.database, db))
+ if (my_strcasecmp(system_charset_info, elmt.database, db))
continue;
// Apply wildcard to list of tables in NDB
@@ -4590,14 +4825,14 @@
{
if (lower_case_table_names)
{
- if (wild_case_compare(files_charset_info, t.name, wild))
+ if (wild_case_compare(files_charset_info, elmt.name, wild))
continue;
}
- else if (wild_compare(t.name,wild,0))
+ else if (wild_compare(elmt.name,wild,0))
continue;
}
- DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));
- my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
+ DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", elmt.name));
+ my_hash_insert(&ndb_tables, (byte*)thd->strdup(elmt.name));
}
char *file_name;
@@ -4645,10 +4880,15 @@
file_name= hash_element(&ndb_tables, i);
if (!hash_search(&ok_tables, file_name, strlen(file_name)))
{
- DBUG_PRINT("info", ("%s must be discovered", file_name));
- // File is in list of ndb tables and not in ok_tables
- // This table need to be created
- create_list.push_back(thd->strdup(file_name));
+ strxnmov(name, sizeof(name),
+ mysql_data_home, "/", db, "/", file_name, reg_ext, NullS);
+ if (access(name, F_OK))
+ {
+ DBUG_PRINT("info", ("%s must be discovered", file_name));
+ // File is in list of ndb tables and not in ok_tables
+ // This table need to be created
+ create_list.push_back(thd->strdup(file_name));
+ }
}
}
@@ -4704,6 +4944,7 @@
static int connect_callback()
{
update_status_variables(g_ndb_cluster_connection);
+ pthread_cond_signal(&COND_ndb_util_thread);
return 0;
}
@@ -4778,6 +5019,7 @@
(void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
(hash_get_key) ndbcluster_get_key,0,0);
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+
pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_ndb_util_thread, NULL);
@@ -4840,6 +5082,7 @@
pthread_mutex_destroy(&LOCK_ndb_util_thread);
pthread_cond_destroy(&COND_ndb_util_thread);
ndbcluster_inited= 0;
+ ndbcluster_util_inited= 0;
DBUG_RETURN(0);
}
@@ -5112,7 +5355,7 @@
char name[FN_REFLEN];
NDB_SHARE *share;
- (void)strxnmov(name, FN_REFLEN, "./",dbname,"/",tabname,NullS);
+ (void)strxnmov(name, FN_REFLEN, share_prefix, dbname, "/", tabname, NullS);
DBUG_PRINT("enter", ("name: %s", name));
pthread_mutex_lock(&ndbcluster_mutex);
if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
@@ -5120,8 +5363,7 @@
strlen(name))))
{
pthread_mutex_unlock(&ndbcluster_mutex);
- DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables",
- name));
+ DBUG_PRINT("info", ("Table %s not found in ndbcluster_open_tables", name));
DBUG_RETURN(1);
}
share->use_count++;
@@ -5136,7 +5378,7 @@
DBUG_PRINT("info", ("Getting commit_count: %llu from share",
share->commit_count));
pthread_mutex_unlock(&share->mutex);
- free_share(share);
+ free_share(&share);
DBUG_RETURN(0);
}
}
@@ -5151,7 +5393,7 @@
struct Ndb_statistics stat;
if (ndb_get_table_statistics(ndb, tabname, &stat))
{
- free_share(share);
+ free_share(&share);
DBUG_RETURN(1);
}
@@ -5168,7 +5410,7 @@
*commit_count= 0;
}
pthread_mutex_unlock(&share->mutex);
- free_share(share);
+ free_share(&share);
DBUG_RETURN(0);
}
@@ -5305,6 +5547,60 @@
}
+#ifndef DBUG_OFF
+/*
+ Debug-only helper: write a description of a TABLE to the DBUG trace.
+
+ info label that prefixes the first trace line
+ table table to describe; a null pointer is reported as "(null)"
+
+ Emits one summary line for the table, one line per field and, for
+ MYSQL_TYPE_BIT fields, one extra line with the bit storage details.
+
+ Pointers are cast to long for the %lx conversions and pointer
+ differences to int for the %d conversions, so that every variadic
+ argument has the width its conversion expects.
+*/
+static void dbug_print_table(const char *info, TABLE *table)
+{
+ if (table == 0)
+ {
+ DBUG_PRINT("info",("%s: (null)", info));
+ return;
+ }
+ DBUG_PRINT("info",
+ ("%s: %s.%s s->fields: %d "
+ "reclength: %d rec_buff_length: %d record[0]: %lx "
+ "record[1]: %lx",
+ info,
+ table->s->db,
+ table->s->table_name,
+ table->s->fields,
+ table->s->reclength,
+ table->s->rec_buff_length,
+ (long) table->record[0],
+ (long) table->record[1]));
+
+ for (unsigned int i= 0; i < table->s->fields; i++)
+ {
+ Field *f= table->field[i];
+ /* the [+n] values are byte offsets relative to record[0] */
+ DBUG_PRINT("info",
+ ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
+ "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
+ i,
+ f->field_name,
+ (long) f->flags,
+ (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
+ (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
+ (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
+ (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
+ (f->flags & BLOB_FLAG) ? ",blob" : "",
+ (f->flags & BINARY_FLAG) ? ",binary" : "",
+ f->real_type(),
+ f->pack_length(),
+ (long) f->ptr, (int) (f->ptr - table->record[0]),
+ f->null_bit,
+ (long) f->null_ptr,
+ (int) ((byte*) f->null_ptr - table->record[0])));
+ if (f->type() == MYSQL_TYPE_BIT)
+ {
+ Field_bit *g= (Field_bit*) f;
+ DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
+ "bit_ofs: %u bit_len: %u",
+ g->field_length, (long) g->bit_ptr,
+ (int) ((byte*) g->bit_ptr - table->record[0]),
+ g->bit_ofs, g->bit_len));
+ }
+ }
+}
+#endif
+
/*
Handling the shared NDB_SHARE structure that is needed to
provide table locking.
@@ -5313,68 +5609,195 @@
data we want to or can share.
*/
-static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
+static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length, /* key callback given to hash_init() for ndbcluster_open_tables */
my_bool not_used __attribute__((unused)))
{
- *length=share->table_name_length;
- return (byte*) share->table_name;
+ *length= share->key_length; /* out: key length, set to strlen(key) when the share is created */
+ return (byte*) share->key; /* the share is hashed on the key string handed to get_share() */
}
-static NDB_SHARE* get_share(const char *table_name)
+#ifndef DBUG_OFF
+/*
+ Debug-only helper: write every NDB_SHARE currently stored in
+ ndbcluster_open_tables to the DBUG trace, two lines per share.
+
+ Takes no lock itself; the callers visible in this file invoke it
+ while ndbcluster_mutex is held.
+
+ When DBUG_OFF is defined the name expands to nothing.
+*/
+static void dbug_print_open_tables()
+{
+ DBUG_ENTER("dbug_print_open_tables");
+ for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+ {
+ NDB_SHARE *share= (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
+ DBUG_PRINT("share",
+ ("[%d] 0x%lx key: %s key_length: %d",
+ i, (long) share, share->key, share->key_length));
+ /* commit_count is ulonglong, so it needs %llu rather than %d */
+ DBUG_PRINT("share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %llu",
+ share->db, share->table_name,
+ share->use_count, share->commit_count));
+ }
+ DBUG_VOID_RETURN;
+}
+#else
+#define dbug_print_open_tables()
+#endif
+
+/*
+ Increase refcount on existing share.
+ Always returns share and cannot fail.
+
+ Takes ndbcluster_mutex around the increment and the debug trace, so
+ the caller must not already hold that mutex.
+*/
+static NDB_SHARE *get_share(NDB_SHARE *share)
{
- NDB_SHARE *share;
pthread_mutex_lock(&ndbcluster_mutex);
- uint length=(uint) strlen(table_name);
- if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
- (byte*) table_name,
- length)))
+ share->use_count++;
+
+ dbug_print_open_tables();
+
+ DBUG_PRINT("get_share",
+ ("0x%lx key: %s key_length: %d",
+ (long) share, share->key, share->key_length));
+ /* commit_count is ulonglong, so it needs %llu rather than %d */
+ DBUG_PRINT("get_share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %llu",
+ share->db, share->table_name,
+ share->use_count, share->commit_count));
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return share;
+}
+
+/*
+ Get a share object for key
+
+ Returns share for key, and increases the refcount on the share.
+
+ create_if_not_exists == TRUE:
+ creates share if it does not already exist
+ returns 0 only due to out of memory, and then sets my_error
+
+ create_if_not_exists == FALSE:
+ returns 0 if share does not exist
+
+ have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
+
+ A newly created share owns a MEM_ROOT with one buffer that holds the
+ key followed by the db name and the table name.
+
+ NOTE(review): my_error is raised only when my_malloc of the share
+ fails; a failing my_hash_insert returns 0 without it, and the result
+ of alloc_root is used unchecked - confirm whether both should report
+ out of memory as described above.
+*/
+static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
+ bool have_lock)
+{
+ NDB_SHARE *share;
+ if (!have_lock)
+ pthread_mutex_lock(&ndbcluster_mutex);
+ uint length= (uint) strlen(key);
+ if (!(share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) key,
+ length)))
{
- if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
+ if (!create_if_not_exists)
+ {
+ DBUG_PRINT("error", ("get_share: %s does not exist", key));
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ return 0;
+ }
+ if ((share= (NDB_SHARE*) my_malloc(sizeof(*share),
MYF(MY_WME | MY_ZEROFILL))))
{
- share->table_name_length=length;
- share->table_name=(char*) (share+1);
- strmov(share->table_name,table_name);
+ MEM_ROOT **root_ptr=
+ my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
+ MEM_ROOT *old_root= *root_ptr;
+ init_sql_alloc(&share->mem_root, 1024, 0);
+ *root_ptr= &share->mem_root; // remember to reset before return
+
+ /* enough space for key, db, and table_name */
+ share->key= alloc_root(*root_ptr, 2 * (length + 1));
+ share->key_length= length;
+ strmov(share->key, key);
if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
{
- pthread_mutex_unlock(&ndbcluster_mutex);
- my_free((gptr) share,0);
+ free_root(&share->mem_root, MYF(0));
+ my_free((gptr) share, 0);
+ *root_ptr= old_root;
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
return 0;
}
thr_lock_init(&share->lock);
- pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
+ pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST);
share->commit_count= 0;
share->commit_count_lock= 0;
+ /* db and table_name are stored after the key in the same buffer */
+ share->db= share->key + length + 1;
+ ha_ndbcluster::set_dbname(key, share->db);
+ share->table_name= share->db + strlen(share->db) + 1;
+ ha_ndbcluster::set_tabname(key, share->table_name);
+ *root_ptr= old_root;
}
else
{
- DBUG_PRINT("error", ("Failed to alloc share"));
- pthread_mutex_unlock(&ndbcluster_mutex);
+ DBUG_PRINT("error", ("get_share: failed to alloc share"));
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ my_error(ER_OUTOFMEMORY, MYF(0), sizeof(*share));
return 0;
}
}
share->use_count++;
- DBUG_PRINT("share",
- ("table_name: %s, length: %d, use_count: %d, commit_count: %d",
- share->table_name, share->table_name_length, share->use_count,
- share->commit_count));
- pthread_mutex_unlock(&ndbcluster_mutex);
+ dbug_print_open_tables();
+
+ DBUG_PRINT("get_share",
+ ("0x%lx key: %s key_length: %d key: %s",
+ (long) share, share->key, share->key_length, key));
+ /* commit_count is ulonglong, so it needs %llu rather than %d */
+ DBUG_PRINT("get_share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %llu",
+ share->db, share->table_name,
+ share->use_count, share->commit_count));
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
return share;
}
+/*
+ Destroy a share unconditionally.
+
+ Removes *share from ndbcluster_open_tables, deletes its THR_LOCK,
+ destroys its mutex, frees its MEM_ROOT and the struct itself, and
+ finally sets *share to 0 so the caller cannot use the stale pointer.
+
+ Does not look at use_count and takes no lock; free_share() calls it
+ once the refcount has reached 0.
+*/
+static void real_free_share(NDB_SHARE **share)
+{
+ DBUG_PRINT("real_free_share",
+ ("0x%lx key: %s key_length: %d",
+ (long) (*share), (*share)->key, (*share)->key_length));
+ /* commit_count is ulonglong, so it needs %llu rather than %d */
+ DBUG_PRINT("real_free_share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %llu",
+ (*share)->db, (*share)->table_name,
+ (*share)->use_count, (*share)->commit_count));
+
+ hash_delete(&ndbcluster_open_tables, (byte*) *share);
+ thr_lock_delete(&(*share)->lock);
+ pthread_mutex_destroy(&(*share)->mutex);
+ /* key, db and table_name were allocated from mem_root */
+ free_root(&(*share)->mem_root, MYF(0));
+
+ my_free((gptr) *share, MYF(0));
+ *share= 0;
+
+ dbug_print_open_tables();
+}
-static void free_share(NDB_SHARE *share)
+/*
+ decrease refcount of share
+ calls real_free_share when refcount reaches 0
+
+ have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
+
+ share is passed by address because real_free_share() sets *share to 0
+ once the share has been destroyed.
+*/
+static void free_share(NDB_SHARE **share, bool have_lock)
{
- pthread_mutex_lock(&ndbcluster_mutex);
- if (!--share->use_count)
+ if (!have_lock)
+ pthread_mutex_lock(&ndbcluster_mutex);
+ /* clear util_lock if it refers to the current thread */
+ if ((*share)->util_lock == current_thd)
+ (*share)->util_lock= 0;
+ if (!--(*share)->use_count)
{
- hash_delete(&ndbcluster_open_tables, (byte*) share);
- thr_lock_delete(&share->lock);
- pthread_mutex_destroy(&share->mutex);
- my_free((gptr) share, MYF(0));
+ real_free_share(share);
}
- pthread_mutex_unlock(&ndbcluster_mutex);
+ else
+ {
+ dbug_print_open_tables();
+ DBUG_PRINT("free_share",
+ ("0x%lx key: %s key_length: %d",
+ (long) *share, (*share)->key, (*share)->key_length));
+ /* commit_count is ulonglong, so it needs %llu rather than %d */
+ DBUG_PRINT("free_share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %llu",
+ (*share)->db, (*share)->table_name,
+ (*share)->use_count, (*share)->commit_count));
+ }
+ if (!have_lock)
+ pthread_mutex_unlock(&ndbcluster_mutex);
}
@@ -5405,14 +5828,14 @@
uint blob_len;
frm_blob_struct* blob;
DBUG_ENTER("packfrm");
- DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
+ DBUG_PRINT("enter", ("data: 0x%lx len: %d", data, len));
error= 1;
org_len= len;
if (my_compress((byte*)data, &org_len, &comp_len))
goto err;
- DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
+ DBUG_PRINT("info", ("org_len: %d comp_len: %d", org_len, comp_len));
DBUG_DUMP("compressed", (char*)data, org_len);
error= 2;
@@ -5432,7 +5855,8 @@
*pack_len= blob_len;
error= 0;
- DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
+ DBUG_PRINT("exit",
+ ("pack_data: 0x%lx pack_len: %d", *pack_data, *pack_len));
err:
DBUG_RETURN(error);
@@ -5446,7 +5870,7 @@
byte *data;
ulong complen, orglen, ver;
DBUG_ENTER("unpackfrm");
- DBUG_PRINT("enter", ("pack_data: %x", pack_data));
+ DBUG_PRINT("enter", ("pack_data: 0x%lx", pack_data));
complen= uint4korr((char*)&blob->head.complen);
orglen= uint4korr((char*)&blob->head.orglen);
@@ -5471,7 +5895,7 @@
*unpack_data= data;
*unpack_len= complen;
- DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
+ DBUG_PRINT("exit", ("frmdata: 0x%lx, len: %d", *unpack_data, *unpack_len));
DBUG_RETURN(0);
}
@@ -5484,11 +5908,10 @@
DBUG_ENTER("ndb_get_table_statistics");
DBUG_PRINT("enter", ("table: %s", table));
NdbTransaction* pTrans= ndb->startTransaction();
+ if (pTrans == NULL)
+ ERR_RETURN(ndb->getNdbError());
do
{
- if (pTrans == NULL)
- break;
-
NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
if (pOp == NULL)
break;
@@ -6010,6 +6433,7 @@
THD *thd; /* needs to be first for thread_stack */
Ndb* ndb;
struct timespec abstime;
+ List<NDB_SHARE> util_open_tables;
my_thread_init();
DBUG_ENTER("ndb_util_thread");
@@ -6030,10 +6454,51 @@
delete ndb;
DBUG_RETURN(NULL);
}
+ thd->init_for_queries();
+ thd->version=refresh_version;
+ thd->set_time();
+ thd->main_security_ctx.host_or_ip= "";
+ thd->client_capabilities = 0;
+ my_net_init(&thd->net, 0);
+ thd->main_security_ctx.master_access= ~0;
+ thd->main_security_ctx.priv_user = 0;
+
+ /*
+ wait for mysql server to start
+ */
+ pthread_mutex_lock(&LOCK_server_started);
+ while (!mysqld_server_started)
+ pthread_cond_wait(&COND_server_started, &LOCK_server_started);
+ pthread_mutex_unlock(&LOCK_server_started);
+
+ /*
+ Wait for cluster to start
+ */
+ pthread_mutex_lock(&LOCK_ndb_util_thread);
+ while (!ndb_cluster_node_id)
+ {
+ /* ndb not connected yet */
+ set_timespec(abstime, 1);
+ pthread_cond_timedwait(&COND_ndb_util_thread,
+ &LOCK_ndb_util_thread,
+ &abstime);
+ if (abort_loop)
+ {
+ pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ goto ndb_util_thread_end;
+ }
+ }
+ pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
+ /*
+ Get all table definitions from the storage node
+ */
+ ndbcluster_find_all_files(thd);
+
+ ndbcluster_util_inited= 1;
- List<NDB_SHARE> util_open_tables;
set_timespec(abstime, 0);
- for (;;)
+ for (;!abort_loop;)
{
pthread_mutex_lock(&LOCK_ndb_util_thread);
@@ -6041,10 +6506,10 @@
&LOCK_ndb_util_thread,
&abstime);
pthread_mutex_unlock(&LOCK_ndb_util_thread);
-
+#ifdef NDB_EXTRA_DEBUG_UTIL_THREAD
DBUG_PRINT("ndb_util_thread", ("Started, ndb_cache_check_time: %d",
ndb_cache_check_time));
-
+#endif
if (abort_loop)
break; /* Shutting down server */
@@ -6075,20 +6540,12 @@
List_iterator_fast<NDB_SHARE> it(util_open_tables);
while ((share= it++))
{
- /* Split tab- and dbname */
- char buf[FN_REFLEN];
- char *tabname, *db;
- uint length= dirname_length(share->table_name);
- tabname= share->table_name+length;
- memcpy(buf, share->table_name, length-1);
- buf[length-1]= 0;
- db= buf+dirname_length(buf);
DBUG_PRINT("ndb_util_thread",
("Fetching commit count for: %s",
- share->table_name));
+ share->key));
/* Contact NDB to get commit count for table */
- ndb->setDatabaseName(db);
+ ndb->setDatabaseName(share->db);
struct Ndb_statistics stat;
uint lock;
@@ -6096,17 +6553,17 @@
lock= share->commit_count_lock;
pthread_mutex_unlock(&share->mutex);
- if (ndb_get_table_statistics(ndb, tabname, &stat) == 0)
+ if (ndb_get_table_statistics(ndb, share->table_name, &stat) == 0)
{
DBUG_PRINT("ndb_util_thread",
("Table: %s, commit_count: %llu, rows: %llu",
- share->table_name, stat.commit_count, stat.row_count));
+ share->key, stat.commit_count, stat.row_count));
}
else
{
DBUG_PRINT("ndb_util_thread",
("Error: Could not get commit count for table %s",
- share->table_name));
+ share->key));
stat.commit_count= 0;
}
@@ -6116,7 +6573,7 @@
pthread_mutex_unlock(&share->mutex);
/* Decrease the use count and possibly free share */
- free_share(share);
+ free_share(&share);
}
/* Clear the list of open tables */
@@ -6143,7 +6600,8 @@
abstime.tv_nsec-= 1000000000;
}
}
-
+ndb_util_thread_end:
+ net_end(&thd->net);
thd->cleanup();
delete thd;
delete ndb;
@@ -7480,6 +7938,9 @@
DBUG_RETURN(0);
}
+/*
+ Implements the SHOW NDB STATUS command.
+*/
int
ndbcluster_show_status(THD* thd)
{
--- 1.96/sql/ha_ndbcluster.h 2005-10-06 10:25:58 +02:00
+++ 1.97/sql/ha_ndbcluster.h 2005-11-06 00:20:23 +01:00
@@ -25,6 +25,7 @@
#pragma interface /* gcc class implementation */
#endif
+#include <ndbapi/NdbApi.hpp>
#include <ndbapi_limits.h>
class Ndb; // Forward declaration
@@ -36,10 +37,13 @@
class NdbIndexScanOperation;
class NdbBlob;
class NdbIndexStat;
+class NdbEventOperation;
// connectstring to cluster if given by mysqld
extern const char *ndbcluster_connectstring;
extern ulong ndb_cache_check_time;
+extern ulong ndb_report_thresh_binlog_epoch_slip;
+extern ulong ndb_report_thresh_binlog_mem_usage;
typedef enum ndb_index_type {
UNDEFINED_INDEX = 0,
@@ -63,13 +67,25 @@
uint index_stat_query_count;
} NDB_INDEX_DATA;
+typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
+
+typedef enum {
+ NSS_INITIAL= 0,
+ NSS_DROPPED
+} NDB_SHARE_STATE;
+
typedef struct st_ndbcluster_share { /* one entry of the ndbcluster_open_tables hash, shared by reference count */
+ MEM_ROOT mem_root; /* set up in get_share(), freed in real_free_share(); the key buffer is allocated from it */
THR_LOCK lock; /* initialised in get_share(), deleted in real_free_share() */
pthread_mutex_t mutex; /* locked around commit_count / commit_count_lock accesses in the code visible here */
- char *table_name;
- uint table_name_length,use_count;
+ char *key; /* hash key: copy of the key string passed to get_share() */
+ uint key_length; /* strlen(key) */
+ THD *util_lock; /* free_share() resets this to 0 when it equals current_thd; NOTE(review): who sets it is not visible in this diff */
+ uint use_count; /* reference count: raised by get_share(), lowered by free_share(), share destroyed at 0 */
uint commit_count_lock; /* NOTE(review): sampled under mutex by the util thread; exact protocol is outside this hunk */
ulonglong commit_count; /* commit count kept for the table; reset to 0 when the share is created */
+ char *db; /* points into the key buffer just after key; filled by ha_ndbcluster::set_dbname() */
+ char *table_name; /* follows db in the same buffer; filled by ha_ndbcluster::set_tabname() */
} NDB_SHARE;
typedef enum ndb_item_type {
@@ -595,9 +611,16 @@
bool check_if_incompatible_data(HA_CREATE_INFO *info,
uint table_changes);
+ static void invalidate_dictionary_cache(TABLE *table, Ndb *ndb,
+ const char *tabname, bool global);
private:
+ friend int ndbcluster_drop_database(const char *path);
int alter_table_name(const char *to);
+ static int delete_table(ha_ndbcluster *h, Ndb *ndb,
+ const char *path,
+ const char *db,
+ const char *table_name);
int drop_table();
int create_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
@@ -643,7 +666,8 @@
uint fieldnr, const byte* field_ptr);
int set_ndb_key(NdbOperation*, Field *field,
uint fieldnr, const byte* field_ptr);
- int set_ndb_value(NdbOperation*, Field *field, uint fieldnr, bool *set_blob_value= 0);
+ int set_ndb_value(NdbOperation*, Field *field, uint fieldnr,
+ int row_offset= 0, bool *set_blob_value= 0);
int get_ndb_value(NdbOperation*, Field *field, uint fieldnr, byte*);
friend int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg);
int get_ndb_blobs_value(NdbBlob *last_ndb_blob);
@@ -688,6 +712,7 @@
NdbScanOperation* op);
friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
+ friend int execute_no_commit_ignore_no_key(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*);
friend int execute_no_commit_ie(ha_ndbcluster*, NdbTransaction*);
@@ -704,7 +729,6 @@
NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY];
// NdbRecAttr has no reference to blob
- typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
partition_info *m_part_info;
byte *m_rec0;
@@ -715,6 +739,7 @@
bool m_ignore_dup_key;
bool m_primary_key_update;
bool m_write_op;
+ bool m_ignore_no_key;
ha_rows m_rows_to_insert;
ha_rows m_rows_inserted;
ha_rows m_bulk_insert_rows;
@@ -760,3 +785,4 @@
void ndbcluster_print_error(int error, const NdbOperation *error_op);
int ndbcluster_show_status(THD*);
+
--- 1.5/mysql-test/r/ndb_bitfield.result 2005-09-21 04:16:19 +02:00
+++ 1.6/mysql-test/r/ndb_bitfield.result 2005-11-06 00:20:22 +01:00
@@ -201,10 +201,18 @@
pk1 bit(9) not null primary key,
b int
) engine=ndbcluster;
-ERROR HY000: Can't create table './test/t1.frm' (errno: 739)
+ERROR HY000: Can't create table './test/t1.frm' (errno: 140)
+show warnings;
+Level Code Message
+Error 1296 Got error 739 'Unsupported primary key length' from NDB
+Error 1005 Can't create table './test/t1.frm' (errno: 140)
create table t1 (
pk1 int not null primary key,
b bit(9),
key(b)
) engine=ndbcluster;
-ERROR HY000: Can't create table './test/t1.frm' (errno: 743)
+ERROR HY000: Can't create table './test/t1.frm' (errno: 140)
+show warnings;
+Level Code Message
+Error 1296 Got error 743 'Unsupported character set in table or index' from NDB
+Error 1005 Can't create table './test/t1.frm' (errno: 140)
--- 1.4/mysql-test/r/ndb_multi.result 2005-09-21 04:16:19 +02:00
+++ 1.5/mysql-test/r/ndb_multi.result 2005-11-06 00:20:22 +01:00
@@ -29,7 +29,12 @@
create table t1 (a int) engine=ndbcluster;
insert into t1 value (2);
select * from t1;
-ERROR HY000: Got error 241 'Invalid schema object version' from NDBCLUSTER
+ERROR HY000: Table definition has changed, please retry transaction
+show warnings;
+Level Code Message
+Error 1296 Got error 241 'Invalid schema object version' from NDB
+Error 1412 Table definition has changed, please retry transaction
+Error 1105 Unknown error
select * from t1;
a
2
--- 1.4/mysql-test/t/ndb_bitfield.test 2005-09-14 15:46:28 +02:00
+++ 1.5/mysql-test/t/ndb_bitfield.test 2005-11-06 00:20:22 +01:00
@@ -104,6 +104,7 @@
pk1 bit(9) not null primary key,
b int
) engine=ndbcluster;
+show warnings;
--error 1005
create table t1 (
@@ -111,4 +112,4 @@
b bit(9),
key(b)
) engine=ndbcluster;
-
+show warnings;
--- 1.5/mysql-test/t/ndb_multi.test 2005-07-28 02:21:45 +02:00
+++ 1.6/mysql-test/t/ndb_multi.test 2005-11-06 00:20:22 +01:00
@@ -38,8 +38,9 @@
insert into t1 value (2);
connection server1;
# Currently a retry is required remotely
---error 1296
+--error 1412
select * from t1;
+show warnings;
select * from t1;
# Connect to server2 and use the tables from there
--- 1.5/mysql-test/r/ndb_partition_error.result 2005-11-05 22:56:56 +01:00
+++ 1.6/mysql-test/r/ndb_partition_error.result 2005-11-06 00:20:22 +01:00
@@ -11,7 +11,11 @@
(partition x1 values less than (5) nodegroup 12,
partition x2 values less than (10) nodegroup 13,
partition x3 values less than (20) nodegroup 14);
-ERROR HY000: Can't create table './test/t1.frm' (errno: 771)
+ERROR HY000: Can't create table './test/t1.frm' (errno: 140)
+show warnings;
+Level Code Message
+Error 1296 Got error 771 'Given NODEGROUP doesn't exist in this cluster' from NDB
+Error 1005 Can't create table './test/t1.frm' (errno: 140)
CREATE TABLE t1 (
a int not null,
b int not null,
--- 1.2/mysql-test/t/ndb_partition_error.test 2005-07-22 22:39:01 +02:00
+++ 1.3/mysql-test/t/ndb_partition_error.test 2005-11-06 00:20:22 +01:00
@@ -26,6 +26,7 @@
(partition x1 values less than (5) nodegroup 12,
partition x2 values less than (10) nodegroup 13,
partition x3 values less than (20) nodegroup 14);
+show warnings;
#
# Partition by range, create normal valid table
--- 1.3/mysql-test/r/ndb_read_multi_range.result 2004-12-01 12:43:28 +01:00
+++ 1.4/mysql-test/r/ndb_read_multi_range.result 2005-11-06 00:20:22 +01:00
@@ -1,4 +1,4 @@
-DROP TABLE IF EXISTS t1, r1;
+DROP TABLE IF EXISTS t1, t2, r1;
create table t1 (
a int primary key,
b int not null,
--- 1.4/mysql-test/t/ndb_read_multi_range.test 2005-04-21 11:11:17 +02:00
+++ 1.5/mysql-test/t/ndb_read_multi_range.test 2005-11-06 00:20:23 +01:00
@@ -2,7 +2,7 @@
-- source include/not_embedded.inc
--disable_warnings
-DROP TABLE IF EXISTS t1, r1;
+DROP TABLE IF EXISTS t1, t2, r1;
--enable_warnings
#
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.1931) | tomas | 5 Nov |