3831 John David Duncan 2012-03-06 [merge]
Merge local 7.2.4-bugs tree into local master
added:
storage/ndb/memcache/include/ndb_engine_errors.h
storage/ndb/memcache/src/ndb_engine_errors.cc
modified:
storage/ndb/memcache/CMakeLists.txt
storage/ndb/memcache/include/LookupTable.h
storage/ndb/memcache/include/NdbInstance.h
storage/ndb/memcache/include/Scheduler.h
storage/ndb/memcache/include/ndb_engine.h
storage/ndb/memcache/include/ndb_error_logger.h
storage/ndb/memcache/include/ndb_pipeline.h
storage/ndb/memcache/include/workitem.h
storage/ndb/memcache/src/Config_v1.cc
storage/ndb/memcache/src/ndb_engine.c
storage/ndb/memcache/src/ndb_engine_private.h
storage/ndb/memcache/src/ndb_error_logger.cc
storage/ndb/memcache/src/ndb_pipeline.cc
storage/ndb/memcache/src/schedulers/S_sched.cc
storage/ndb/memcache/src/schedulers/S_sched.h
storage/ndb/memcache/src/schedulers/Stockholm.cc
storage/ndb/memcache/src/schedulers/Stockholm.h
storage/ndb/memcache/src/workitem.c
3830 jonas oreland 2012-03-06 [merge]
ndb - merge 71 to 72
added:
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java
modified:
mysql-test/suite/ndb/t/clusterj.test
storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/ClusterJHelper.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainFieldHandlerImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainFieldHandlerImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerFactoryImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/InvocationHandlerImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/KeyValueHandlerImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainFieldHandler.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandlerFactory.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandler.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/ClusterConnection.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Column.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java
storage/ndb/clusterj/clusterj-jdbc/src/main/java/com/mysql/clusterj/jdbc/ValueHandlerBatchingJDBCSetImpl.java
storage/ndb/clusterj/clusterj-jdbc/src/main/java/com/mysql/clusterj/jdbc/ValueHandlerBindValuesImpl.java
storage/ndb/clusterj/clusterj-jdbc/src/main/java/com/mysql/clusterj/jdbc/ValueHandlerImpl.java
storage/ndb/clusterj/clusterj-jdbc/src/main/resources/com/mysql/clusterj/jdbc/Bundle.properties
storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAConfigurationImpl.java
storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainFieldHandlerImpl.java
storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java
storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAValueHandler.java
storage/ndb/clusterj/clusterj-openjpa/src/main/resources/com/mysql/clusterj/openjpa/Bundle.properties
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJModelTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AutoCommitTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BinaryPKTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CharsetTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/LoadTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/MultithreadedTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/StressTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/VarcharStringLengthTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java
storage/ndb/clusterj/clusterj-test/src/main/resources/schema.sql
storage/ndb/clusterj/clusterj-tie/pom.xml
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordDeleteOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java
storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties
storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/ConnectionPoolTest.java
=== modified file 'storage/ndb/memcache/CMakeLists.txt'
--- a/storage/ndb/memcache/CMakeLists.txt 2011-12-18 23:21:21 +0000
+++ b/storage/ndb/memcache/CMakeLists.txt 2012-03-03 04:28:09 +0000
@@ -92,6 +92,7 @@ set(NDB_MEMCACHE_SOURCE_FILES
src/debug.c
src/hash_item_util.c
src/ndb_configuration.cc
+ src/ndb_engine_errors.cc
src/ndb_engine_private.h
src/ndb_error_logger.cc
src/ndb_flush.cc
=== modified file 'storage/ndb/memcache/include/LookupTable.h'
--- a/storage/ndb/memcache/include/LookupTable.h 2011-12-12 05:53:36 +0000
+++ b/storage/ndb/memcache/include/LookupTable.h 2012-03-07 00:38:08 +0000
@@ -28,9 +28,11 @@
template<typename T> class LookupTable {
public:
int elements;
+ bool do_free_values;
LookupTable(int sz = 128) :
elements(0),
+ do_free_values(false),
size(sz),
symtab(new Entry *[sz])
{
@@ -43,11 +45,13 @@ public:
for(int i = 0 ; i < size ; i++) {
Entry *sym = symtab[i];
while(sym) {
+ if(do_free_values) free((void *) sym->value);
Entry *next = sym->next;
delete sym;
sym = next;
}
}
+ delete[] symtab;
}
@@ -58,7 +62,7 @@ public:
return sym->value;
return 0;
}
-
+
void insert(const char *name, T * value) {
Uint32 h = do_hash(name) % size;
Entry *sym = new Entry(name, value);
=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h 2011-09-30 17:04:30 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h 2012-03-07 01:22:53 +0000
@@ -30,11 +30,10 @@
#include "ndbmemcache_config.h"
#include "KeyPrefix.h"
#include "QueryPlan.h"
-
-struct workitem;
+#include "workitem.h"
#define VPSZ sizeof(void *)
-#define TOTAL_SZ (3 * VPSZ)
+#define TOTAL_SZ (3 * VPSZ) + sizeof(int)
#define PADDING (64 - TOTAL_SZ)
@@ -43,8 +42,11 @@ public:
/* Public Methods */
NdbInstance(Ndb_cluster_connection *, int);
~NdbInstance();
+ void link_workitem(workitem *);
+ void unlink_workitem(workitem *);
/* Public Instance Variables */
+ int id;
Ndb *db;
NdbInstance *next;
workitem *wqitem;
@@ -54,4 +56,20 @@ private:
};
+inline void NdbInstance::link_workitem(workitem *item) {
+ assert(item->ndb_instance == NULL);
+ assert(wqitem == NULL);
+
+ item->ndb_instance = this;
+ wqitem = item;
+}
+
+
+inline void NdbInstance::unlink_workitem(workitem *item) {
+ assert(wqitem == item);
+ wqitem->ndb_instance = NULL;
+ wqitem = NULL;
+}
+
+
#endif
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h 2011-10-02 22:32:09 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h 2012-03-07 01:22:53 +0000
@@ -67,9 +67,10 @@ public:
requires the scheduler to send & poll an additional operation. */
virtual void reschedule(workitem *) const = 0;
- /** io_completed() is called from the NDB Engine thread when an IO
- completion notification has been received */
- virtual void io_completed(workitem *) = 0;
+ /** release() is called from the NDB Engine thread after an operation has
+ completed. It allows the scheduler to release any resources (such as
+ the Ndb object) that were allocated in schedule(). */
+ virtual void release(workitem *) = 0;
/** add_stats() allows the engine to delegate certain statistics
to the scheduler. */
=== modified file 'storage/ndb/memcache/include/ndb_engine.h'
--- a/storage/ndb/memcache/include/ndb_engine.h 2011-12-18 23:21:21 +0000
+++ b/storage/ndb/memcache/include/ndb_engine.h 2012-03-07 01:22:53 +0000
@@ -76,7 +76,6 @@ struct ndb_engine {
void **pipelines;
void **schedulers;
- bool initialized;
bool connected;
unsigned int cas_hi;
=== added file 'storage/ndb/memcache/include/ndb_engine_errors.h'
--- a/storage/ndb/memcache/include/ndb_engine_errors.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/include/ndb_engine_errors.h 2012-03-03 04:28:09 +0000
@@ -0,0 +1,29 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+ */
+#ifndef NDBMEMCACHE_ENGINE_ERRORS_H
+#define NDBMEMCACHE_ENGINE_ERRORS_H
+
+#include <ndberror.h>
+
+extern ndberror_struct AppError9001_ReconfLock;
+extern ndberror_struct AppError9002_NoNDBs;
+
+
+#endif
=== modified file 'storage/ndb/memcache/include/ndb_error_logger.h'
--- a/storage/ndb/memcache/include/ndb_error_logger.h 2011-12-18 23:21:21 +0000
+++ b/storage/ndb/memcache/include/ndb_error_logger.h 2012-03-03 04:28:09 +0000
@@ -39,12 +39,14 @@ enum {
};
int log_ndb_error(const NdbError &);
+int log_app_error(ndberror_struct const *);
#endif
DECLARE_FUNCTIONS_WITH_C_LINKAGE
void ndb_error_logger_init(SERVER_CORE_API *, size_t log_level);
+void ndb_error_logger_stats(ADD_STAT add_stat, const void *cookie);
END_FUNCTIONS_WITH_C_LINKAGE
=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h 2011-12-18 23:26:44 +0000
+++ b/storage/ndb/memcache/include/ndb_pipeline.h 2012-03-07 01:22:53 +0000
@@ -105,20 +105,24 @@ ndb_pipeline * get_request_pipeline(int
/** call into a pipeline for its own statistics */
void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *);
+/** execute a "flush_all" operation */
+ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *);
+
+
+/***** SCHEDULER APIS *****/
+
/** Global initialization of scheduler, at startup time */
-void * initialize_scheduler(const char *name, int nthreads, int threadnum);
+void * scheduler_initialize(const char *name, int nthreads, int threadnum);
/** shutdown a scheduler */
-void shutdown_scheduler(void *);
+void scheduler_shutdown(ndb_pipeline *);
/** pass a workitem to the configured scheduler, for execution */
-ENGINE_ERROR_CODE pipeline_schedule_operation(ndb_pipeline *, struct workitem *);
+ENGINE_ERROR_CODE scheduler_schedule(ndb_pipeline *, struct workitem *);
-/** schedule a "flush_all" operation */
-ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *);
+/** release the resources that were used by a completed operation */
+void scheduler_release(ndb_pipeline *, struct workitem *);
-/** notify scheduler of IO completion */
-void pipeline_io_completed(ndb_pipeline *, struct workitem *);
/***** MEMORY MANAGEMENT APIS *****/
=== modified file 'storage/ndb/memcache/include/workitem.h'
--- a/storage/ndb/memcache/include/workitem.h 2011-12-11 07:31:26 +0000
+++ b/storage/ndb/memcache/include/workitem.h 2012-03-05 17:51:21 +0000
@@ -26,19 +26,9 @@
#include "ndb_pipeline.h"
#include "status_block.h"
-/* struct workitem is used in both C and C++ code, requiring a small hack: */
-#ifdef __cplusplus
-#include "QueryPlan.h"
-#include "NdbInstance.h"
-#define CPP_QUERYPLAN QueryPlan
-#define CPP_NDBINSTANCE NdbInstance
-#define CPP_EXTERNALVALUE ExternalValue
-#else
-#define CPP_QUERYPLAN void
-#define CPP_NDBINSTANCE void
-#define CPP_EXTERNALVALUE void
-#endif
-
+struct NdbInstance;
+struct QueryPlan;
+struct ExternalValue;
typedef struct workitem {
struct {
@@ -63,11 +53,11 @@ typedef struct workitem {
uint64_t math_value; /*! IN: incr initial value; OUT: incr result */
hash_item * cache_item; /*! used for write requests */
ndb_pipeline *pipeline; /*! pointer back to request pipeline */
- CPP_NDBINSTANCE *ndb_instance;
+ struct NdbInstance *ndb_instance;
/*! pointer to ndb instance, if applicable */
const void *cookie; /*! memcached's connection cookie */
- CPP_QUERYPLAN *plan; /*! QueryPlan for resolving this request */
- CPP_EXTERNALVALUE *ext_val; /*! ExternalValue */
+ struct QueryPlan *plan; /*! QueryPlan for resolving this request */
+ struct ExternalValue *ext_val; /*! ExternalValue */
const char *key; /*! pointer to the key */
void * next_step; /*! a worker_step function in ndb_worker.cc */
status_block *status; /*! A static status_block in ndb_worker.cc */
@@ -132,7 +122,6 @@ bool workitem_allocate_rowbuffer_1(worki
*/
bool workitem_allocate_rowbuffer_2(workitem *, size_t);
-
/*! returns the name of the memcached operation stored in the workitem.
*/
const char * workitem_get_operation(workitem *);
@@ -146,15 +135,10 @@ void workitem_free(workitem *);
*/
const char * workitem_get_key_suffix(workitem *item);
-
/*! Return the size of key buffer needed for a key of length "nkey"
*/
size_t workitem_get_key_buf_size(int nkey);
-/*! Set the workitem's NdbInstance
-*/
-void workitem_set_NdbInstance(workitem*, CPP_NDBINSTANCE *);
-
END_FUNCTIONS_WITH_C_LINKAGE
#endif
=== modified file 'storage/ndb/memcache/src/Config_v1.cc'
--- a/storage/ndb/memcache/src/Config_v1.cc 2011-12-18 23:26:44 +0000
+++ b/storage/ndb/memcache/src/Config_v1.cc 2012-03-07 00:38:08 +0000
@@ -59,6 +59,7 @@ config_v1::~config_v1() {
delete containers_map;
}
if(policies_map) {
+ policies_map->do_free_values = true;
delete policies_map;
}
}
=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c 2012-02-22 10:09:58 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c 2012-03-07 01:51:06 +0000
@@ -81,7 +81,7 @@ ENGINE_ERROR_CODE create_instance(uint64
GET_SERVER_API get_server_api,
ENGINE_HANDLE **handle ) {
- ENGINE_ERROR_CODE e;
+ ENGINE_ERROR_CODE return_status;
SERVER_HANDLE_V1 *api = get_server_api();
if (interface != 1 || api == NULL) {
@@ -96,6 +96,7 @@ ENGINE_ERROR_CODE create_instance(uint64
logger = get_stderr_logger();
ndb_eng->npipelines = 0;
+ ndb_eng->connected = false;
ndb_eng->engine.interface.interface = 1;
ndb_eng->engine.get_info = ndb_get_info;
@@ -139,15 +140,13 @@ ENGINE_ERROR_CODE create_instance(uint64
ndb_eng->info.info.features[2].description = NULL;
/* Now call create_instace() for the default engine */
- e = default_engine_create_instance(interface, get_server_api,
- & (ndb_eng->m_default_engine));
- if(e != ENGINE_SUCCESS) return e;
-
- ndb_eng->initialized = true;
- ndb_eng->connected = false;
+ return_status = default_engine_create_instance(interface, get_server_api,
+ & (ndb_eng->m_default_engine));
- *handle = (ENGINE_HANDLE*) &ndb_eng->engine;
- return ENGINE_SUCCESS;
+ if(return_status == ENGINE_SUCCESS)
+ *handle = (ENGINE_HANDLE*) &ndb_eng->engine;
+
+ return return_status;
}
@@ -159,8 +158,6 @@ static const engine_info* ndb_get_info(E
/*** initialize ***/
-
-
static ENGINE_ERROR_CODE ndb_initialize(ENGINE_HANDLE* handle,
const char* config_str)
{
@@ -230,7 +227,7 @@ static ENGINE_ERROR_CODE ndb_initialize(
for(i = 0 ; i < nthreads ; i++) {
ndb_eng->pipelines[i] = get_request_pipeline(i);
ndb_eng->schedulers[i] =
- initialize_scheduler(ndb_eng->startup_options.scheduler, nthreads, i);
+ scheduler_initialize(ndb_eng->startup_options.scheduler, nthreads, i);
if(ndb_eng->schedulers[i] == 0) {
logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n",
ndb_eng->startup_options.scheduler);
@@ -265,10 +262,10 @@ static void ndb_destroy(ENGINE_HANDLE* h
struct ndb_engine* ndb_eng = ndb_handle(handle);
struct default_engine *def_eng = default_handle(ndb_eng);
- for(unsigned i = 0 ; i < ndb_eng->npipelines; i ++) {
- void *p = ndb_eng->schedulers[i];
+ for(unsigned int i = 0 ; i < ndb_eng->npipelines; i ++) {
+ ndb_pipeline *p = ndb_eng->pipelines[i];
if(p) {
- shutdown_scheduler(p);
+ scheduler_shutdown(p);
}
}
@@ -277,6 +274,28 @@ static void ndb_destroy(ENGINE_HANDLE* h
}
+/* CALL FLOWS
+ ----------
+ GET: eng.get(), eng.get_item_info()*, eng.release()*
+ DELETE: eng.remove()
+ SET (etc): eng.allocate(), eng.item_set_cas(), eng.get_item_info(),
+ eng.store(), eng.release()*
+ INCR: eng.arithmetic()
+ FLUSH: eng.flush()
+
+ * Called only on success (ENGINE_SUCCESS or ENGINE_EWOULDBLOCK)
+*/
+
+
+
+/*** Release scheduler resources and free workitem ****/
+void release_and_free(workitem *wqitem) {
+ DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id);
+ scheduler_release(wqitem->pipeline, wqitem);
+ workitem_free(wqitem);
+}
+
+
/*** allocate ***/
/* Allocate gets a struct item from the slab allocator, and fills in
@@ -321,11 +340,9 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI
wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
if(wqitem) {
DEBUG_PRINT("Got callback: %s", wqitem->status->comment);
- ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
- return_status = wqitem->status->status;
- pipeline_io_completed(pipeline, wqitem);
- workitem_free(wqitem);
- return return_status;
+ ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); //pop
+ release_and_free(wqitem);
+ return wqitem->status->status;
}
prefix = get_prefix_info_for_key(nkey, key);
@@ -361,7 +378,10 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI
if(prefix.do_db_delete) { /* Database Delete */
wqitem = new_workitem_for_delete_op(pipeline, prefix, cookie, nkey, key, & cas);
DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
- return_status = pipeline_schedule_operation(pipeline, wqitem);
+ return_status = scheduler_schedule(pipeline, wqitem);
+ if(return_status != ENGINE_EWOULDBLOCK) {
+ release_and_free(wqitem);
+ }
}
return return_status;
@@ -375,17 +395,12 @@ static void ndb_release(ENGINE_HANDLE* h
struct ndb_engine* ndb_eng = ndb_handle(handle);
struct default_engine *def_eng = default_handle(ndb_eng);
- /* There may be a stack of workitems associated with the connection cookie;
- pop this one and free it.
- */
workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
if(wqitem) {
- /* pipeline_io_completed()? */
- DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id);
ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
- workitem_free(wqitem);
+ release_and_free(wqitem);
}
-
+
if(item && (item != wqitem)) {
DEBUG_PRINT("Releasing a hash item.");
item_release(def_eng, (hash_item *) item);
@@ -404,6 +419,7 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_
struct ndb_engine* ndb_eng = ndb_handle(handle);
ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
struct workitem *wqitem;
+ ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT;
wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
@@ -413,19 +429,22 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_
pipeline->id, wqitem->id, wqitem->status->comment);
*item = wqitem->cache_item;
wqitem->base.complete = 1;
- pipeline_io_completed(pipeline, wqitem);
- if(wqitem->status->status != ENGINE_SUCCESS) {
- DEBUG_PRINT("pop and free the workitem.");
- ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
- workitem_free(wqitem);
+ return_status = wqitem->status->status;
+
+ /* On success the workitem will be read in ndb_get_item_info, then released.
+ Otherwise: */
+ if(return_status != ENGINE_SUCCESS) {
+ ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
+ release_and_free(wqitem);
}
- /* The workitem will be read in ndb_get_item_info, then released */
- return wqitem->status->status;
+
+ return return_status;
}
prefix_info_t prefix = get_prefix_info_for_key(nkey, key);
/* Cache read */
+ /* FIXME: Use the public APIs */
if(prefix.do_mc_read) {
*item = item_get(default_handle(ndb_eng), key, nkey);
if (*item != NULL) {
@@ -439,10 +458,15 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_
if(prefix.do_db_read) {
wqitem = new_workitem_for_get_op(wqitem, pipeline, prefix, cookie, nkey, key);
DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
- return pipeline_schedule_operation(pipeline, wqitem);
+ return_status = scheduler_schedule(pipeline, wqitem);
+ if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
+ /* On error we must pop and free */
+ ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
+ release_and_free(wqitem);
+ }
}
- return ENGINE_KEY_ENOENT;
+ return return_status;
}
@@ -460,9 +484,11 @@ static ENGINE_ERROR_CODE ndb_get_stats(E
DEBUG_ENTER();
if(stat_key &&
- ((strncasecmp(stat_key, "ndb", 3) == 0) ||
+ ((strncasecmp(stat_key, "ndb", 3) == 0) ||
(strncasecmp(stat_key, "scheduler", 9) == 0) ||
- (strncasecmp(stat_key, "reconf", 6) == 0)))
+ (strncasecmp(stat_key, "reconf", 6) == 0) ||
+ (strncasecmp(stat_key, "errors", 6) == 0)
+ ))
{
/* NDB Engine stats */
pipeline_add_stats(pipeline, stat_key, add_stat, cookie);
@@ -497,17 +523,15 @@ static ENGINE_ERROR_CODE ndb_store(ENGIN
ENGINE_STORE_OPERATION op,
uint16_t vbucket __attribute__((unused)))
{
- prefix_info_t prefix;
-
struct ndb_engine* ndb_eng = ndb_handle(handle);
ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
+ ENGINE_ERROR_CODE return_status = ENGINE_NOT_STORED;
+ prefix_info_t prefix;
/* Is this a callback after completed I/O? */
workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
if(wqitem) {
- // fixme: chaining
DEBUG_PRINT("Got callback: %s", wqitem->status->comment);
- pipeline_io_completed(pipeline, wqitem);
return wqitem->status->status;
}
@@ -525,17 +549,18 @@ static ENGINE_ERROR_CODE ndb_store(ENGIN
if(prefix.do_db_write) {
wqitem = new_workitem_for_store_op(pipeline, op, prefix, cookie, item, cas);
DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
- return pipeline_schedule_operation(pipeline, wqitem);
+ return_status = scheduler_schedule(pipeline, wqitem);
+ if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) {
+ ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop
+ release_and_free(wqitem);
+ }
}
-
- /* write to cache */
- if(prefix.do_mc_write) {
+ else if(prefix.do_mc_write) {
DEBUG_PRINT(" cache-only store.");
- return store_item(default_handle(ndb_eng), item, cas, op, cookie);
+ return_status = store_item(default_handle(ndb_eng), item, cas, op, cookie);
}
- /* NOP case: db_write and mc_write are both disabled. */
- return ENGINE_SUCCESS;
+ return return_status;
}
@@ -557,20 +582,19 @@ static ENGINE_ERROR_CODE ndb_arithmetic(
struct default_engine *def_eng = default_handle(ndb_eng);
ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng);
struct workitem *wqitem;
+ ENGINE_ERROR_CODE return_status;
/* Is this a callback after completed I/O? */
wqitem = ndb_eng->server.cookie->get_engine_specific(cookie);
if(wqitem && ! wqitem->base.complete) {
DEBUG_PRINT("Got arithmetic callback: %s", wqitem->status->comment);
- ENGINE_ERROR_CODE status = wqitem->status->status;
+ return_status = wqitem->status->status;
wqitem->base.complete = 1;
*result = wqitem->math_value;
/* There will be no call to release(), so pop and free now. */
- pipeline_io_completed(pipeline, wqitem);
ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);
- workitem_free(wqitem);
-
- return status;
+ release_and_free(wqitem);
+ return return_status;
}
prefix_info_t prefix = get_prefix_info_for_key(nkey, key);
@@ -596,9 +620,14 @@ static ENGINE_ERROR_CODE ndb_arithmetic(
wqitem = new_workitem_for_arithmetic(pipeline, prefix, cookie, key, nkey,
increment, create, delta, initial, cas);
DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id);
- return pipeline_schedule_operation(pipeline, wqitem);
-}
+ return_status = scheduler_schedule(pipeline, wqitem);
+
+ if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS)))
+ release_and_free(wqitem);
+
+ return return_status;
+}
/*** flush ***/
static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle,
=== added file 'storage/ndb/memcache/src/ndb_engine_errors.cc'
--- a/storage/ndb/memcache/src/ndb_engine_errors.cc 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ndb_engine_errors.cc 2012-03-03 04:28:09 +0000
@@ -0,0 +1,32 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights
+ reserved.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA
+*/
+
+#include <ndberror.h>
+
+ndberror_struct AppError9001_ReconfLock =
+ { ndberror_st_temporary , ndberror_cl_application , 9001, -1,
+ "Could not obtain configuration read lock"
+ };
+
+ndberror_struct AppError9002_NoNDBs =
+ { ndberror_st_temporary , ndberror_cl_application , 9002, -1,
+ "No Ndb Instances in freelist"
+ };
+
=== modified file 'storage/ndb/memcache/src/ndb_engine_private.h'
--- a/storage/ndb/memcache/src/ndb_engine_private.h 2011-10-02 01:39:35 +0000
+++ b/storage/ndb/memcache/src/ndb_engine_private.h 2012-03-07 01:22:53 +0000
@@ -27,6 +27,9 @@ void read_cmdline_options(struct ndb_eng
int fetch_core_settings(struct ndb_engine *, struct default_engine *);
+void release_and_free(struct workitem *);
+
+
/*************** Declarations of functions that implement
the engine interface
*********************************************************/
=== modified file 'storage/ndb/memcache/src/ndb_error_logger.cc'
--- a/storage/ndb/memcache/src/ndb_error_logger.cc 2011-12-19 01:09:00 +0000
+++ b/storage/ndb/memcache/src/ndb_error_logger.cc 2012-03-03 04:28:09 +0000
@@ -53,7 +53,8 @@ class ErrorEntry;
ErrorEntry * error_hash_table[ERROR_HASH_TABLE_SIZE];
/* Prototypes */
-void manage_error(const NdbError &, const char * mesg, rel_time_t interval);
+void manage_error(int err_code, const char * err_mesg,
+ const char * extra_mesg, rel_time_t interval);
@@ -70,18 +71,28 @@ void ndb_error_logger_init(SERVER_CORE_A
}
+int log_app_error(ndberror_struct const * error) {
+ if(error->code < 9100)
+ manage_error(error->code, error->message, "Scheduler Error", 10);
+ else
+ manage_error(error->code, error->message, "Memcached Error", 10);
+
+ return error->status;
+}
+
+
int log_ndb_error(const NdbError &error) {
switch(error.status) {
case ndberror_st_success:
break;
case ndberror_st_temporary:
- manage_error(error, "NDB Temporary Error", 10);
+ manage_error(error.code, error.message, "NDB Temporary Error", 10);
break;
case ndberror_st_permanent:
case ndberror_st_unknown:
- manage_error(error, "NDB Error", 10);
+ manage_error(error.code, error.message, "NDB Error", 10);
break;
}
/* NDB classifies "Out Of Memory" (827) errors as permament errors, but we
@@ -147,33 +158,32 @@ ErrorEntry * error_table_lookup(int code
/* Record the error message, and possibly log it. */
-void manage_error(const NdbError & error, const char *type_mesg, rel_time_t interval) {
+void manage_error(int err_code, const char * err_mesg,
+ const char *type_mesg, rel_time_t interval) {
char note[256];
ErrorEntry *entry = 0;
bool first_ever, interval_passed, flood = false;
int current = 0, prior = 0; // array indexes
- if(verbose_logging == 0) {
- entry = error_table_lookup(error.code, core_api->get_current_time());
-
- if((entry->count | 1) == entry->count)
- current = 1; // odd count
- else
- prior = 1; // even count
+ entry = error_table_lookup(err_code, core_api->get_current_time());
- /* We have four pieces of information: the first timestamp, the two
- most recent timestamps, and the error count. When to write a log message?
- (A) On the first occurrence of an error.
- (B) If a time > interval has passed since the previous message.
- (C) At certain count numbers in error flood situations
- */
- first_ever = (entry->count == 1);
- interval_passed = (entry->time[current] - entry->time[prior] > interval);
- if(! interval_passed)
- for(Uint32 i = 10 ; i <= entry->count ; i *= 10)
- if(entry->count < (10 * i) && (entry->count % i == 0))
- { flood = true; break; }
- }
+ if((entry->count | 1) == entry->count)
+ current = 1; // odd count
+ else
+ prior = 1; // even count
+
+ /* We have four pieces of information: the first timestamp, the two
+ most recent timestamps, and the error count. When to write a log message?
+ (A) On the first occurrence of an error.
+ (B) If a time > interval has passed since the previous message.
+ (C) At certain count numbers in error flood situations
+ */
+ first_ever = (entry->count == 1);
+ interval_passed = (entry->time[current] - entry->time[prior] > interval);
+ if(! interval_passed)
+ for(Uint32 i = 10 ; i <= entry->count ; i *= 10)
+ if(entry->count < (10 * i) && (entry->count % i == 0))
+ { flood = true; break; }
if(verbose_logging || first_ever || interval_passed || flood)
{
@@ -181,8 +191,26 @@ void manage_error(const NdbError & error
snprintf(note, 256, "[occurrence %d of this error]", entry->count);
else
note[0] = '\0';
+
logger->log(LOG_WARNING, 0, "%s %d: %s %s\n",
- type_mesg, error.code, error.message, note);
+ type_mesg, err_code, err_mesg, note);
+ }
+}
+
+
+void ndb_error_logger_stats(ADD_STAT add_stat, const void *cookie) {
+ char key[128];
+ char val[128];
+ int klen, vlen;
+ unsigned int i;
+ Lock lock(& error_table_lock);
+ ErrorEntry *sym;
+
+ for(i = 0 ; i < ERROR_HASH_TABLE_SIZE; i++) {
+ for(sym = error_hash_table[i] ; sym != 0 ; sym = sym->next) {
+ klen = sprintf(key, "NDB_Error_%d", sym->error_code);
+ vlen = sprintf(val, "%lu", sym->count);
+ add_stat(key, klen, val, vlen, cookie);
+ }
}
}
-
=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc 2011-12-19 01:09:00 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc 2012-03-07 01:22:53 +0000
@@ -37,6 +37,7 @@
#include "schedulers/Stockholm.h"
#include "schedulers/S_sched.h"
+#include "ndb_error_logger.h"
#define DEFAULT_SCHEDULER S::SchedulerWorker
@@ -142,6 +143,9 @@ void pipeline_add_stats(ndb_pipeline *se
conf.getConnectionPoolById(i)->add_stats(key, add_stat, cookie);
}
}
+ else if(strncasecmp(stat_key,"errors",6) == 0) {
+ ndb_error_logger_stats(add_stat, cookie);
+ }
else if((strncasecmp(stat_key,"scheduler",9) == 0)
|| (strncasecmp(stat_key,"reconf",6) == 0)) {
self->scheduler->add_stats(stat_key, add_stat, cookie);
@@ -149,7 +153,14 @@ void pipeline_add_stats(ndb_pipeline *se
}
-void * initialize_scheduler(const char *cf, int nthreads, int athread) {
+ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) {
+ return ndb_flush_all(self);
+}
+
+
+/* The scheduler API */
+
+void * scheduler_initialize(const char *cf, int nthreads, int athread) {
Scheduler *s = 0;
const char *sched_options = 0;
@@ -175,28 +186,21 @@ void * initialize_scheduler(const char *
}
-void shutdown_scheduler(void *v) {
- Scheduler *s = (Scheduler *) v;
-
- s->shutdown();
+void scheduler_shutdown(ndb_pipeline *self) {
+ self->scheduler->shutdown();
}
-ENGINE_ERROR_CODE pipeline_schedule_operation(ndb_pipeline *self,
- struct workitem *item) {
+ENGINE_ERROR_CODE scheduler_schedule(ndb_pipeline *self, struct workitem *item) {
return self->scheduler->schedule(item);
}
-ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) {
- return ndb_flush_all(self);
+void scheduler_release(ndb_pipeline *self, struct workitem *item) {
+ self->scheduler->release(item);
}
-void pipeline_io_completed(ndb_pipeline *self, struct workitem *item) {
- self->scheduler->io_completed(item);
-}
-
/* The slab allocator API */
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-01-14 00:52:10 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-03-07 01:22:53 +0000
@@ -35,6 +35,8 @@
#include "thread_identifier.h"
#include "workitem.h"
#include "ndb_worker.h"
+#include "ndb_engine_errors.h"
+#include "ndb_error_logger.h"
#include "S_sched.h"
@@ -283,6 +285,7 @@ void S::SchedulerWorker::attach_thread(t
ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
int c = item->prefix_info.cluster_id;
+ ENGINE_ERROR_CODE response_code;
NdbInstance *inst;
S::WorkerConnection *wc;
const KeyPrefix *pfx;
@@ -296,7 +299,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
pthread_rwlock_unlock(& reconf_lock);
}
else {
- logger->log(LOG_INFO, 0, "S Scheduler could not acquire read lock");
+ log_app_error(& AppError9001_ReconfLock);
return ENGINE_TMPFAIL;
}
/* READ LOCK RELEASED */
@@ -315,12 +318,11 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
all we can do is return an error.
(Or, alternately, the scheduler may be shutting down.)
*/
- logger->log(LOG_INFO, 0, "No free NDB instances.");
+ log_app_error(& AppError9002_NoNDBs);
return ENGINE_TMPFAIL;
}
- inst->wqitem = item;
- workitem_set_NdbInstance(item, inst);
+ inst->link_workitem(item);
// Fetch the query plan for this prefix.
item->plan = wc->plan_set->getPlanForPrefix(pfx);
@@ -331,39 +333,44 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
// Build the NDB transaction
op_status_t op_status = worker_prepare_operation(item);
- ENGINE_ERROR_CODE response_code;
-
- switch(op_status) {
- case op_async_prepared:
- /* Put the prepared item onto a send queue */
- wc->sendqueue->produce(inst);
- DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id);
-
- /* This locking is explained in run_ndb_send_thread() */
- if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) { // try the lock
- wc->conn->sem.counter++; // increment
- pthread_cond_signal( & wc->conn->sem.not_zero); // signal
- pthread_mutex_unlock( & wc->conn->sem.lock); // release
- }
-
- response_code = ENGINE_EWOULDBLOCK;
- break;
- case op_not_supported:
- DEBUG_PRINT("op_status is op_not_supported");
- response_code = ENGINE_ENOTSUP;
- break;
- case op_overflow:
- DEBUG_PRINT("op_status is op_overflow");
- response_code = ENGINE_E2BIG;
- break;
- case op_async_sent:
- DEBUG_PRINT("op_async_sent could be a bug");
- response_code = ENGINE_FAILED;
- break;
- case op_failed:
- DEBUG_PRINT("op_status is op_failed");
- response_code = ENGINE_FAILED;
- break;
+
+ // Success; put the workitem on the send queue and return ENGINE_EWOULDBLOCK.
+ if(op_status == op_async_prepared) {
+ /* Put the prepared item onto a send queue */
+ wc->sendqueue->produce(inst);
+ DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id);
+
+ /* This locking is explained in run_ndb_send_thread() */
+ if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) { // try the lock
+ wc->conn->sem.counter++; // increment
+ pthread_cond_signal( & wc->conn->sem.not_zero); // signal
+ pthread_mutex_unlock( & wc->conn->sem.lock); // release
+ }
+
+ response_code = ENGINE_EWOULDBLOCK;
+ }
+ else {
+ switch(op_status) {
+ case op_not_supported:
+ DEBUG_PRINT("op_status is op_not_supported");
+ response_code = ENGINE_ENOTSUP;
+ break;
+ case op_overflow:
+ DEBUG_PRINT("op_status is op_overflow");
+ response_code = ENGINE_E2BIG;
+ break;
+ case op_async_sent:
+ DEBUG_PRINT("op_async_sent could be a bug");
+ response_code = ENGINE_FAILED;
+ break;
+ case op_failed:
+ DEBUG_PRINT("op_status is op_failed");
+ response_code = ENGINE_FAILED;
+ break;
+ default:
+ DEBUG_PRINT("UNEXPECTED: op_status is %d", op_status);
+ response_code = ENGINE_FAILED;
+ }
}
return response_code;
@@ -376,18 +383,19 @@ void S::SchedulerWorker::reschedule(work
}
-void S::SchedulerWorker::io_completed(workitem *item) {
+/* Release the resources used by an operation.
+ Unlink the NdbInstance from the workitem, and return it to the free list
+ (or free it, if the scheduler is shutting down).
+*/
+void S::SchedulerWorker::release(workitem *item) {
DEBUG_ENTER();
NdbInstance *inst = item->ndb_instance;
- item->ndb_instance = NULL;
if(inst) {
- assert(inst->wqitem == item);
+ inst->unlink_workitem(item);
int c = item->prefix_info.cluster_id;
S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c));
if(wc && ! wc->sendqueue->is_aborted()) {
- // assert(inst->sched_gen_number == s_global->generation);
- inst->wqitem = NULL;
inst->next = wc->freelist;
wc->freelist = inst;
DEBUG_PRINT("Returned NdbInstance to freelist.");
@@ -533,6 +541,7 @@ S::WorkerConnection::WorkerConnection(Sc
int my_ndb_inst = conn->nInst / global->options.n_worker_threads;
for(int j = 0 ; j < my_ndb_inst ; j++ ) {
NdbInstance *inst = new NdbInstance(conn->conn, 2);
+ inst->id = ((id.thd + 1) * 10000) + j + 1;
inst->next = freelist;
freelist = inst;
}
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.h'
--- a/storage/ndb/memcache/src/schedulers/S_sched.h 2011-10-02 02:58:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.h 2012-03-07 01:22:53 +0000
@@ -104,7 +104,7 @@ public:
ENGINE_ERROR_CODE schedule(workitem *);
void yield(workitem *) const {};
void reschedule(workitem *) const;
- void io_completed(workitem *);
+ void release(workitem *);
void add_stats(const char *, ADD_STAT, const void *);
void shutdown();
bool global_reconfigure(Configuration *);
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-12-16 10:04:43 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2012-03-07 01:22:53 +0000
@@ -184,8 +184,8 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
{
return ENGINE_TMPFAIL;
}
-
- workitem_set_NdbInstance(newitem, inst);
+
+ inst->link_workitem(newitem);
// Fetch the query plan for this prefix.
newitem->plan = cluster[c].plan_set->getPlanForPrefix(pfx);
@@ -216,12 +216,12 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s
}
-void Scheduler_stockholm::io_completed(workitem *item) {
+void Scheduler_stockholm::release(workitem *item) {
DEBUG_ENTER();
NdbInstance* inst = item->ndb_instance;
- item->ndb_instance = NULL;
if(inst) {
+ inst->unlink_workitem(item);
int c = item->prefix_info.cluster_id;
inst->next = cluster[c].nextFree;
cluster[c].nextFree = inst;
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-10-02 02:58:36 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2012-03-07 01:22:53 +0000
@@ -49,7 +49,7 @@ public:
ENGINE_ERROR_CODE schedule(workitem *);
void yield(workitem *) const; // inlined
void reschedule(workitem *) const; // inlined
- void io_completed(workitem *);
+ void release(workitem *);
void add_stats(const char *, ADD_STAT, const void *);
void shutdown();
void * run_ndb_commit_thread(int cluster_id);
=== modified file 'storage/ndb/memcache/src/workitem.c'
--- a/storage/ndb/memcache/src/workitem.c 2011-12-11 07:31:26 +0000
+++ b/storage/ndb/memcache/src/workitem.c 2012-03-07 01:22:53 +0000
@@ -182,26 +182,14 @@ const char * workitem_get_operation(work
}
-void workitem_set_NdbInstance(workitem *item, CPP_NDBINSTANCE * _Ndb_instance)
-{
- assert(item->ndb_instance == NULL);
- item->ndb_instance = _Ndb_instance;
-}
-
-
void workitem_free(workitem *item)
{
- /* It's OK to free all of these; a free() with class_id 0 is a no-op */
+ /* It's OK to free all of these; pipeline_free() with class_id 0 is a no-op */
pipeline_free(item->pipeline, item->row_buffer_1, item->rowbuf1_cls);
pipeline_free(item->pipeline, item->row_buffer_2, item->rowbuf2_cls);
pipeline_free(item->pipeline, item->ndb_key_buffer, item->keybuf1_cls);
pipeline_free(item->pipeline, item->key_buffer_2, item->keybuf2_cls);
- if (item->ndb_instance != NULL)
- {
- assert(false); // TODO Not thread safe, investigate path
- }
-
pipeline_free(item->pipeline, item, workitem_class_id);
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3830 to 3831) | John David Duncan | 8 Mar |