From: John David Duncan Date: March 7 2012 1:53am Subject: bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3830 to 3831) List-Archive: http://lists.mysql.com/commits/143113 Message-Id: <201203070153.q271rcc9011221@acsmt358.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3831 John David Duncan 2012-03-06 [merge] Merge local 7.2.4-bugs tree into local master added: storage/ndb/memcache/include/ndb_engine_errors.h storage/ndb/memcache/src/ndb_engine_errors.cc modified: storage/ndb/memcache/CMakeLists.txt storage/ndb/memcache/include/LookupTable.h storage/ndb/memcache/include/NdbInstance.h storage/ndb/memcache/include/Scheduler.h storage/ndb/memcache/include/ndb_engine.h storage/ndb/memcache/include/ndb_error_logger.h storage/ndb/memcache/include/ndb_pipeline.h storage/ndb/memcache/include/workitem.h storage/ndb/memcache/src/Config_v1.cc storage/ndb/memcache/src/ndb_engine.c storage/ndb/memcache/src/ndb_engine_private.h storage/ndb/memcache/src/ndb_error_logger.cc storage/ndb/memcache/src/ndb_pipeline.cc storage/ndb/memcache/src/schedulers/S_sched.cc storage/ndb/memcache/src/schedulers/S_sched.h storage/ndb/memcache/src/schedulers/Stockholm.cc storage/ndb/memcache/src/schedulers/Stockholm.h storage/ndb/memcache/src/workitem.c 3830 jonas oreland 2012-03-06 [merge] ndb - merge 71 to 72 added: storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java modified: mysql-test/suite/ndb/t/clusterj.test storage/ndb/clusterj/clusterj-api/src/main/java/com/mysql/clusterj/ClusterJHelper.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainFieldHandlerImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainFieldHandlerImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerFactoryImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/InvocationHandlerImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/KeyValueHandlerImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainFieldHandler.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandlerFactory.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandler.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/ClusterConnection.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Column.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java storage/ndb/clusterj/clusterj-jdbc/src/main/java/com/mysql/clusterj/jdbc/ValueHandlerBatchingJDBCSetImpl.java storage/ndb/clusterj/clusterj-jdbc/src/main/java/com/mysql/clusterj/jdbc/ValueHandlerBindValuesImpl.java storage/ndb/clusterj/clusterj-jdbc/src/main/java/com/mysql/clusterj/jdbc/ValueHandlerImpl.java storage/ndb/clusterj/clusterj-jdbc/src/main/resources/com/mysql/clusterj/jdbc/Bundle.properties storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAConfigurationImpl.java storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainFieldHandlerImpl.java storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAValueHandler.java storage/ndb/clusterj/clusterj-openjpa/src/main/resources/com/mysql/clusterj/openjpa/Bundle.properties storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJModelTest.java storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AutoCommitTest.java storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BinaryPKTest.java storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CharsetTest.java storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/LoadTest.java storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/MultithreadedTest.java storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/StressTest.java storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/VarcharStringLengthTest.java storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java storage/ndb/clusterj/clusterj-test/src/main/resources/schema.sql storage/ndb/clusterj/clusterj-tie/pom.xml storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordDeleteOperationImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/ConnectionPoolTest.java === modified file 'storage/ndb/memcache/CMakeLists.txt' --- a/storage/ndb/memcache/CMakeLists.txt 2011-12-18 23:21:21 +0000 +++ b/storage/ndb/memcache/CMakeLists.txt 2012-03-03 04:28:09 +0000 @@ -92,6 +92,7 @@ set(NDB_MEMCACHE_SOURCE_FILES src/debug.c src/hash_item_util.c src/ndb_configuration.cc + src/ndb_engine_errors.cc src/ndb_engine_private.h src/ndb_error_logger.cc src/ndb_flush.cc === modified file 'storage/ndb/memcache/include/LookupTable.h' --- a/storage/ndb/memcache/include/LookupTable.h 2011-12-12 05:53:36 +0000 +++ b/storage/ndb/memcache/include/LookupTable.h 2012-03-07 00:38:08 +0000 @@ -28,9 +28,11 @@ template class LookupTable { public: int elements; + bool do_free_values; LookupTable(int sz = 128) : elements(0), + do_free_values(false), size(sz), symtab(new Entry *[sz]) { @@ -43,11 +45,13 @@ public: for(int i = 0 ; i < size ; i++) { Entry *sym = symtab[i]; while(sym) { + if(do_free_values) free((void *) sym->value); Entry *next = sym->next; delete sym; sym = next; } } + delete[] symtab; } @@ -58,7 +62,7 @@ public: return sym->value; return 0; } - + void insert(const char *name, T * value) { Uint32 h = do_hash(name) % size; Entry *sym = new Entry(name, value); === modified file 'storage/ndb/memcache/include/NdbInstance.h' --- a/storage/ndb/memcache/include/NdbInstance.h 2011-09-30 17:04:30 +0000 +++ b/storage/ndb/memcache/include/NdbInstance.h 2012-03-07 01:22:53 +0000 @@ -30,11 +30,10 @@ #include "ndbmemcache_config.h" #include "KeyPrefix.h" #include "QueryPlan.h" - -struct workitem; +#include "workitem.h" #define VPSZ sizeof(void *) -#define TOTAL_SZ (3 * VPSZ) +#define TOTAL_SZ (3 * VPSZ) + sizeof(int) #define PADDING (64 - TOTAL_SZ) @@ -43,8 +42,11 @@ public: /* Public Methods */ NdbInstance(Ndb_cluster_connection *, int); ~NdbInstance(); + void link_workitem(workitem *); + void unlink_workitem(workitem *); /* Public Instance Variables */ + int id; Ndb *db; NdbInstance *next; workitem *wqitem; @@ -54,4 +56,20 @@ private: }; +inline void NdbInstance::link_workitem(workitem *item) { + assert(item->ndb_instance == NULL); + assert(wqitem == NULL); + + item->ndb_instance = this; + wqitem = item; +} + + +inline void NdbInstance::unlink_workitem(workitem *item) { + assert(wqitem == item); + wqitem->ndb_instance = NULL; + wqitem = NULL; +} + + #endif === modified file 'storage/ndb/memcache/include/Scheduler.h' --- a/storage/ndb/memcache/include/Scheduler.h 2011-10-02 22:32:09 +0000 +++ b/storage/ndb/memcache/include/Scheduler.h 2012-03-07 01:22:53 +0000 @@ -67,9 +67,10 @@ public: requires the scheduler to send & poll an additional operation. */ virtual void reschedule(workitem *) const = 0; - /** io_completed() is called from the NDB Engine thread when an IO - completion notification has been received */ - virtual void io_completed(workitem *) = 0; + /** release() is called from the NDB Engine thread after an operation has + completed. It allows the scheduler to release any resources (such as + the Ndb object) that were allocated in schedule(). */ + virtual void release(workitem *) = 0; /** add_stats() allows the engine to delegate certain statistics to the scheduler. */ === modified file 'storage/ndb/memcache/include/ndb_engine.h' --- a/storage/ndb/memcache/include/ndb_engine.h 2011-12-18 23:21:21 +0000 +++ b/storage/ndb/memcache/include/ndb_engine.h 2012-03-07 01:22:53 +0000 @@ -76,7 +76,6 @@ struct ndb_engine { void **pipelines; void **schedulers; - bool initialized; bool connected; unsigned int cas_hi; === added file 'storage/ndb/memcache/include/ndb_engine_errors.h' --- a/storage/ndb/memcache/include/ndb_engine_errors.h 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/include/ndb_engine_errors.h 2012-03-03 04:28:09 +0000 @@ -0,0 +1,29 @@ +/* + Copyright (c) 2012, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA + */ +#ifndef NDBMEMCACHE_ENGINE_ERRORS_H +#define NDBMEMCACHE_ENGINE_ERRORS_H + +#include + +extern ndberror_struct AppError9001_ReconfLock; +extern ndberror_struct AppError9002_NoNDBs; + + +#endif === modified file 'storage/ndb/memcache/include/ndb_error_logger.h' --- a/storage/ndb/memcache/include/ndb_error_logger.h 2011-12-18 23:21:21 +0000 +++ b/storage/ndb/memcache/include/ndb_error_logger.h 2012-03-03 04:28:09 +0000 @@ -39,12 +39,14 @@ enum { }; int log_ndb_error(const NdbError &); +int log_app_error(ndberror_struct const *); #endif DECLARE_FUNCTIONS_WITH_C_LINKAGE void ndb_error_logger_init(SERVER_CORE_API *, size_t log_level); +void ndb_error_logger_stats(ADD_STAT add_stat, const void *cookie); END_FUNCTIONS_WITH_C_LINKAGE === modified file 'storage/ndb/memcache/include/ndb_pipeline.h' --- a/storage/ndb/memcache/include/ndb_pipeline.h 2011-12-18 23:26:44 +0000 +++ b/storage/ndb/memcache/include/ndb_pipeline.h 2012-03-07 01:22:53 +0000 @@ -105,20 +105,24 @@ ndb_pipeline * get_request_pipeline(int /** call into a pipeline for its own statistics */ void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *); +/** execute a "flush_all" operation */ +ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *); + + +/***** SCHEDULER APIS *****/ + /** Global initialization of scheduler, at startup time */ -void * initialize_scheduler(const char *name, int nthreads, int threadnum); +void * scheduler_initialize(const char *name, int nthreads, int threadnum); /** shutdown a scheduler */ -void shutdown_scheduler(void *); +void scheduler_shutdown(ndb_pipeline *); /** pass a workitem to the configured scheduler, for execution */ -ENGINE_ERROR_CODE pipeline_schedule_operation(ndb_pipeline *, struct workitem *); +ENGINE_ERROR_CODE scheduler_schedule(ndb_pipeline *, struct workitem *); -/** schedule a "flush_all" operation */ -ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *); +/** release the resources that were used by a completed operation */ +void scheduler_release(ndb_pipeline *, struct workitem *); -/** notify scheduler of IO completion */ -void pipeline_io_completed(ndb_pipeline *, struct workitem *); /***** MEMORY MANAGEMENT APIS *****/ === modified file 'storage/ndb/memcache/include/workitem.h' --- a/storage/ndb/memcache/include/workitem.h 2011-12-11 07:31:26 +0000 +++ b/storage/ndb/memcache/include/workitem.h 2012-03-05 17:51:21 +0000 @@ -26,19 +26,9 @@ #include "ndb_pipeline.h" #include "status_block.h" -/* struct workitem is used in both C and C++ code, requiring a small hack: */ -#ifdef __cplusplus -#include "QueryPlan.h" -#include "NdbInstance.h" -#define CPP_QUERYPLAN QueryPlan -#define CPP_NDBINSTANCE NdbInstance -#define CPP_EXTERNALVALUE ExternalValue -#else -#define CPP_QUERYPLAN void -#define CPP_NDBINSTANCE void -#define CPP_EXTERNALVALUE void -#endif - +struct NdbInstance; +struct QueryPlan; +struct ExternalValue; typedef struct workitem { struct { @@ -63,11 +53,11 @@ typedef struct workitem { uint64_t math_value; /*! IN: incr initial value; OUT: incr result */ hash_item * cache_item; /*! used for write requests */ ndb_pipeline *pipeline; /*! pointer back to request pipeline */ - CPP_NDBINSTANCE *ndb_instance; + struct NdbInstance *ndb_instance; /*! pointer to ndb instance, if applicable */ const void *cookie; /*! memcached's connection cookie */ - CPP_QUERYPLAN *plan; /*! QueryPlan for resolving this request */ - CPP_EXTERNALVALUE *ext_val; /*! ExternalValue */ + struct QueryPlan *plan; /*! QueryPlan for resolving this request */ + struct ExternalValue *ext_val; /*! ExternalValue */ const char *key; /*! pointer to the key */ void * next_step; /*! a worker_step function in ndb_worker.cc */ status_block *status; /*! A static status_block in ndb_worker.cc */ @@ -132,7 +122,6 @@ bool workitem_allocate_rowbuffer_1(worki */ bool workitem_allocate_rowbuffer_2(workitem *, size_t); - /*! returns the name of the memcached operation stored in the workitem. */ const char * workitem_get_operation(workitem *); @@ -146,15 +135,10 @@ void workitem_free(workitem *); */ const char * workitem_get_key_suffix(workitem *item); - /*! Return the size of key buffer needed for a key of length "nkey" */ size_t workitem_get_key_buf_size(int nkey); -/*! Set the workitem's NdbInstance -*/ -void workitem_set_NdbInstance(workitem*, CPP_NDBINSTANCE *); - END_FUNCTIONS_WITH_C_LINKAGE #endif === modified file 'storage/ndb/memcache/src/Config_v1.cc' --- a/storage/ndb/memcache/src/Config_v1.cc 2011-12-18 23:26:44 +0000 +++ b/storage/ndb/memcache/src/Config_v1.cc 2012-03-07 00:38:08 +0000 @@ -59,6 +59,7 @@ config_v1::~config_v1() { delete containers_map; } if(policies_map) { + policies_map->do_free_values = true; delete policies_map; } } === modified file 'storage/ndb/memcache/src/ndb_engine.c' --- a/storage/ndb/memcache/src/ndb_engine.c 2012-02-22 10:09:58 +0000 +++ b/storage/ndb/memcache/src/ndb_engine.c 2012-03-07 01:51:06 +0000 @@ -81,7 +81,7 @@ ENGINE_ERROR_CODE create_instance(uint64 GET_SERVER_API get_server_api, ENGINE_HANDLE **handle ) { - ENGINE_ERROR_CODE e; + ENGINE_ERROR_CODE return_status; SERVER_HANDLE_V1 *api = get_server_api(); if (interface != 1 || api == NULL) { @@ -96,6 +96,7 @@ ENGINE_ERROR_CODE create_instance(uint64 logger = get_stderr_logger(); ndb_eng->npipelines = 0; + ndb_eng->connected = false; ndb_eng->engine.interface.interface = 1; ndb_eng->engine.get_info = ndb_get_info; @@ -139,15 +140,13 @@ ENGINE_ERROR_CODE create_instance(uint64 ndb_eng->info.info.features[2].description = NULL; /* Now call create_instace() for the default engine */ - e = default_engine_create_instance(interface, get_server_api, - & (ndb_eng->m_default_engine)); - if(e != ENGINE_SUCCESS) return e; - - ndb_eng->initialized = true; - ndb_eng->connected = false; + return_status = default_engine_create_instance(interface, get_server_api, + & (ndb_eng->m_default_engine)); - *handle = (ENGINE_HANDLE*) &ndb_eng->engine; - return ENGINE_SUCCESS; + if(return_status == ENGINE_SUCCESS) + *handle = (ENGINE_HANDLE*) &ndb_eng->engine; + + return return_status; } @@ -159,8 +158,6 @@ static const engine_info* ndb_get_info(E /*** initialize ***/ - - static ENGINE_ERROR_CODE ndb_initialize(ENGINE_HANDLE* handle, const char* config_str) { @@ -230,7 +227,7 @@ static ENGINE_ERROR_CODE ndb_initialize( for(i = 0 ; i < nthreads ; i++) { ndb_eng->pipelines[i] = get_request_pipeline(i); ndb_eng->schedulers[i] = - initialize_scheduler(ndb_eng->startup_options.scheduler, nthreads, i); + scheduler_initialize(ndb_eng->startup_options.scheduler, nthreads, i); if(ndb_eng->schedulers[i] == 0) { logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n", ndb_eng->startup_options.scheduler); @@ -265,10 +262,10 @@ static void ndb_destroy(ENGINE_HANDLE* h struct ndb_engine* ndb_eng = ndb_handle(handle); struct default_engine *def_eng = default_handle(ndb_eng); - for(unsigned i = 0 ; i < ndb_eng->npipelines; i ++) { - void *p = ndb_eng->schedulers[i]; + for(unsigned int i = 0 ; i < ndb_eng->npipelines; i ++) { + ndb_pipeline *p = ndb_eng->pipelines[i]; if(p) { - shutdown_scheduler(p); + scheduler_shutdown(p); } } @@ -277,6 +274,28 @@ static void ndb_destroy(ENGINE_HANDLE* h } +/* CALL FLOWS + ---------- + GET: eng.get(), eng.get_item_info()*, eng.release()* + DELETE: eng.remove() + SET (etc): eng.allocate(), eng.item_set_cas(), eng.get_item_info(), + eng.store(), eng.release()* + INCR: eng.arithmetic() + FLUSH: eng.flush() + + * Called only on success (ENGINE_SUCCESS or ENGINE_EWOULDBLOCK) +*/ + + + +/*** Release scheduler resources and free workitem ****/ +void release_and_free(workitem *wqitem) { + DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id); + scheduler_release(wqitem->pipeline, wqitem); + workitem_free(wqitem); +} + + /*** allocate ***/ /* Allocate gets a struct item from the slab allocator, and fills in @@ -321,11 +340,9 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI wqitem = ndb_eng->server.cookie->get_engine_specific(cookie); if(wqitem) { DEBUG_PRINT("Got callback: %s", wqitem->status->comment); - ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); - return_status = wqitem->status->status; - pipeline_io_completed(pipeline, wqitem); - workitem_free(wqitem); - return return_status; + ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); //pop + release_and_free(wqitem); + return wqitem->status->status; } prefix = get_prefix_info_for_key(nkey, key); @@ -361,7 +378,10 @@ static ENGINE_ERROR_CODE ndb_remove(ENGI if(prefix.do_db_delete) { /* Database Delete */ wqitem = new_workitem_for_delete_op(pipeline, prefix, cookie, nkey, key, & cas); DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id); - return_status = pipeline_schedule_operation(pipeline, wqitem); + return_status = scheduler_schedule(pipeline, wqitem); + if(return_status != ENGINE_EWOULDBLOCK) { + release_and_free(wqitem); + } } return return_status; @@ -375,17 +395,12 @@ static void ndb_release(ENGINE_HANDLE* h struct ndb_engine* ndb_eng = ndb_handle(handle); struct default_engine *def_eng = default_handle(ndb_eng); - /* There may be a stack of workitems associated with the connection cookie; - pop this one and free it. - */ workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie); if(wqitem) { - /* pipeline_io_completed()? */ - DEBUG_PRINT("Releasing workitem %d.%d.", wqitem->pipeline->id, wqitem->id); ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); - workitem_free(wqitem); + release_and_free(wqitem); } - + if(item && (item != wqitem)) { DEBUG_PRINT("Releasing a hash item."); item_release(def_eng, (hash_item *) item); @@ -404,6 +419,7 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_ struct ndb_engine* ndb_eng = ndb_handle(handle); ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); struct workitem *wqitem; + ENGINE_ERROR_CODE return_status = ENGINE_KEY_ENOENT; wqitem = ndb_eng->server.cookie->get_engine_specific(cookie); @@ -413,19 +429,22 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_ pipeline->id, wqitem->id, wqitem->status->comment); *item = wqitem->cache_item; wqitem->base.complete = 1; - pipeline_io_completed(pipeline, wqitem); - if(wqitem->status->status != ENGINE_SUCCESS) { - DEBUG_PRINT("pop and free the workitem."); - ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); - workitem_free(wqitem); + return_status = wqitem->status->status; + + /* On success the workitem will be read in ndb_get_item_info, then released. + Otherwise: */ + if(return_status != ENGINE_SUCCESS) { + ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop + release_and_free(wqitem); } - /* The workitem will be read in ndb_get_item_info, then released */ - return wqitem->status->status; + + return return_status; } prefix_info_t prefix = get_prefix_info_for_key(nkey, key); /* Cache read */ + /* FIXME: Use the public APIs */ if(prefix.do_mc_read) { *item = item_get(default_handle(ndb_eng), key, nkey); if (*item != NULL) { @@ -439,10 +458,15 @@ static ENGINE_ERROR_CODE ndb_get(ENGINE_ if(prefix.do_db_read) { wqitem = new_workitem_for_get_op(wqitem, pipeline, prefix, cookie, nkey, key); DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id); - return pipeline_schedule_operation(pipeline, wqitem); + return_status = scheduler_schedule(pipeline, wqitem); + if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) { + /* On error we must pop and free */ + ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); + release_and_free(wqitem); + } } - return ENGINE_KEY_ENOENT; + return return_status; } @@ -460,9 +484,11 @@ static ENGINE_ERROR_CODE ndb_get_stats(E DEBUG_ENTER(); if(stat_key && - ((strncasecmp(stat_key, "ndb", 3) == 0) || + ((strncasecmp(stat_key, "ndb", 3) == 0) || (strncasecmp(stat_key, "scheduler", 9) == 0) || - (strncasecmp(stat_key, "reconf", 6) == 0))) + (strncasecmp(stat_key, "reconf", 6) == 0) || + (strncasecmp(stat_key, "errors", 6) == 0) + )) { /* NDB Engine stats */ pipeline_add_stats(pipeline, stat_key, add_stat, cookie); @@ -497,17 +523,15 @@ static ENGINE_ERROR_CODE ndb_store(ENGIN ENGINE_STORE_OPERATION op, uint16_t vbucket __attribute__((unused))) { - prefix_info_t prefix; - struct ndb_engine* ndb_eng = ndb_handle(handle); ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); + ENGINE_ERROR_CODE return_status = ENGINE_NOT_STORED; + prefix_info_t prefix; /* Is this a callback after completed I/O? */ workitem *wqitem = ndb_eng->server.cookie->get_engine_specific(cookie); if(wqitem) { - // fixme: chaining DEBUG_PRINT("Got callback: %s", wqitem->status->comment); - pipeline_io_completed(pipeline, wqitem); return wqitem->status->status; } @@ -525,17 +549,18 @@ static ENGINE_ERROR_CODE ndb_store(ENGIN if(prefix.do_db_write) { wqitem = new_workitem_for_store_op(pipeline, op, prefix, cookie, item, cas); DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id); - return pipeline_schedule_operation(pipeline, wqitem); + return_status = scheduler_schedule(pipeline, wqitem); + if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) { + ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous);//pop + release_and_free(wqitem); + } } - - /* write to cache */ - if(prefix.do_mc_write) { + else if(prefix.do_mc_write) { DEBUG_PRINT(" cache-only store."); - return store_item(default_handle(ndb_eng), item, cas, op, cookie); + return_status = store_item(default_handle(ndb_eng), item, cas, op, cookie); } - /* NOP case: db_write and mc_write are both disabled. */ - return ENGINE_SUCCESS; + return return_status; } @@ -557,20 +582,19 @@ static ENGINE_ERROR_CODE ndb_arithmetic( struct default_engine *def_eng = default_handle(ndb_eng); ndb_pipeline *pipeline = get_my_pipeline_config(ndb_eng); struct workitem *wqitem; + ENGINE_ERROR_CODE return_status; /* Is this a callback after completed I/O? */ wqitem = ndb_eng->server.cookie->get_engine_specific(cookie); if(wqitem && ! wqitem->base.complete) { DEBUG_PRINT("Got arithmetic callback: %s", wqitem->status->comment); - ENGINE_ERROR_CODE status = wqitem->status->status; + return_status = wqitem->status->status; wqitem->base.complete = 1; *result = wqitem->math_value; /* There will be no call to release(), so pop and free now. */ - pipeline_io_completed(pipeline, wqitem); ndb_eng->server.cookie->store_engine_specific(cookie, wqitem->previous); - workitem_free(wqitem); - - return status; + release_and_free(wqitem); + return return_status; } prefix_info_t prefix = get_prefix_info_for_key(nkey, key); @@ -596,9 +620,14 @@ static ENGINE_ERROR_CODE ndb_arithmetic( wqitem = new_workitem_for_arithmetic(pipeline, prefix, cookie, key, nkey, increment, create, delta, initial, cas); DEBUG_PRINT("creating workitem %d.%d", pipeline->id, wqitem->id); - return pipeline_schedule_operation(pipeline, wqitem); -} + return_status = scheduler_schedule(pipeline, wqitem); + + if(! ((return_status == ENGINE_EWOULDBLOCK) || (return_status == ENGINE_SUCCESS))) + release_and_free(wqitem); + + return return_status; +} /*** flush ***/ static ENGINE_ERROR_CODE ndb_flush(ENGINE_HANDLE* handle, === added file 'storage/ndb/memcache/src/ndb_engine_errors.cc' --- a/storage/ndb/memcache/src/ndb_engine_errors.cc 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/memcache/src/ndb_engine_errors.cc 2012-03-03 04:28:09 +0000 @@ -0,0 +1,32 @@ +/* + Copyright (c) 2012, Oracle and/or its affiliates. All rights + reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA +*/ + +#include + +ndberror_struct AppError9001_ReconfLock = + { ndberror_st_temporary , ndberror_cl_application , 9001, -1, + "Could not obtain configuration read lock" + }; + +ndberror_struct AppError9002_NoNDBs = + { ndberror_st_temporary , ndberror_cl_application , 9002, -1, + "No Ndb Instances in freelist" + }; + === modified file 'storage/ndb/memcache/src/ndb_engine_private.h' --- a/storage/ndb/memcache/src/ndb_engine_private.h 2011-10-02 01:39:35 +0000 +++ b/storage/ndb/memcache/src/ndb_engine_private.h 2012-03-07 01:22:53 +0000 @@ -27,6 +27,9 @@ void read_cmdline_options(struct ndb_eng int fetch_core_settings(struct ndb_engine *, struct default_engine *); +void release_and_free(struct workitem *); + + /*************** Declarations of functions that implement the engine interface *********************************************************/ === modified file 'storage/ndb/memcache/src/ndb_error_logger.cc' --- a/storage/ndb/memcache/src/ndb_error_logger.cc 2011-12-19 01:09:00 +0000 +++ b/storage/ndb/memcache/src/ndb_error_logger.cc 2012-03-03 04:28:09 +0000 @@ -53,7 +53,8 @@ class ErrorEntry; ErrorEntry * error_hash_table[ERROR_HASH_TABLE_SIZE]; /* Prototypes */ -void manage_error(const NdbError &, const char * mesg, rel_time_t interval); +void manage_error(int err_code, const char * err_mesg, + const char * extra_mesg, rel_time_t interval); @@ -70,18 +71,28 @@ void ndb_error_logger_init(SERVER_CORE_A } +int log_app_error(ndberror_struct const * error) { + if(error->code < 9100) + manage_error(error->code, error->message, "Scheduler Error", 10); + else + manage_error(error->code, error->message, "Memcached Error", 10); + + return error->status; +} + + int log_ndb_error(const NdbError &error) { switch(error.status) { case ndberror_st_success: break; case ndberror_st_temporary: - manage_error(error, "NDB Temporary Error", 10); + manage_error(error.code, error.message, "NDB Temporary Error", 10); break; case ndberror_st_permanent: case ndberror_st_unknown: - manage_error(error, "NDB Error", 10); + manage_error(error.code, error.message, "NDB Error", 10); break; } /* NDB classifies "Out Of Memory" (827) errors as permament errors, but we @@ -147,33 +158,32 @@ ErrorEntry * error_table_lookup(int code /* Record the error message, and possibly log it. */ -void manage_error(const NdbError & error, const char *type_mesg, rel_time_t interval) { +void manage_error(int err_code, const char * err_mesg, + const char *type_mesg, rel_time_t interval) { char note[256]; ErrorEntry *entry = 0; bool first_ever, interval_passed, flood = false; int current = 0, prior = 0; // array indexes - if(verbose_logging == 0) { - entry = error_table_lookup(error.code, core_api->get_current_time()); - - if((entry->count | 1) == entry->count) - current = 1; // odd count - else - prior = 1; // even count + entry = error_table_lookup(err_code, core_api->get_current_time()); - /* We have four pieces of information: the first timestamp, the two - most recent timestamps, and the error count. When to write a log message? - (A) On the first occurrence of an error. - (B) If a time > interval has passed since the previous message. - (C) At certain count numbers in error flood situations - */ - first_ever = (entry->count == 1); - interval_passed = (entry->time[current] - entry->time[prior] > interval); - if(! interval_passed) - for(Uint32 i = 10 ; i <= entry->count ; i *= 10) - if(entry->count < (10 * i) && (entry->count % i == 0)) - { flood = true; break; } - } + if((entry->count | 1) == entry->count) + current = 1; // odd count + else + prior = 1; // even count + + /* We have four pieces of information: the first timestamp, the two + most recent timestamps, and the error count. When to write a log message? + (A) On the first occurrence of an error. + (B) If a time > interval has passed since the previous message. + (C) At certain count numbers in error flood situations + */ + first_ever = (entry->count == 1); + interval_passed = (entry->time[current] - entry->time[prior] > interval); + if(! interval_passed) + for(Uint32 i = 10 ; i <= entry->count ; i *= 10) + if(entry->count < (10 * i) && (entry->count % i == 0)) + { flood = true; break; } if(verbose_logging || first_ever || interval_passed || flood) { @@ -181,8 +191,26 @@ void manage_error(const NdbError & error snprintf(note, 256, "[occurrence %d of this error]", entry->count); else note[0] = '\0'; + logger->log(LOG_WARNING, 0, "%s %d: %s %s\n", - type_mesg, error.code, error.message, note); + type_mesg, err_code, err_mesg, note); + } +} + + +void ndb_error_logger_stats(ADD_STAT add_stat, const void *cookie) { + char key[128]; + char val[128]; + int klen, vlen; + unsigned int i; + Lock lock(& error_table_lock); + ErrorEntry *sym; + + for(i = 0 ; i < ERROR_HASH_TABLE_SIZE; i++) { + for(sym = error_hash_table[i] ; sym != 0 ; sym = sym->next) { + klen = sprintf(key, "NDB_Error_%d", sym->error_code); + vlen = sprintf(val, "%lu", sym->count); + add_stat(key, klen, val, vlen, cookie); + } } } - === modified file 'storage/ndb/memcache/src/ndb_pipeline.cc' --- a/storage/ndb/memcache/src/ndb_pipeline.cc 2011-12-19 01:09:00 +0000 +++ b/storage/ndb/memcache/src/ndb_pipeline.cc 2012-03-07 01:22:53 +0000 @@ -37,6 +37,7 @@ #include "schedulers/Stockholm.h" #include "schedulers/S_sched.h" +#include "ndb_error_logger.h" #define DEFAULT_SCHEDULER S::SchedulerWorker @@ -142,6 +143,9 @@ void pipeline_add_stats(ndb_pipeline *se conf.getConnectionPoolById(i)->add_stats(key, add_stat, cookie); } } + else if(strncasecmp(stat_key,"errors",6) == 0) { + ndb_error_logger_stats(add_stat, cookie); + } else if((strncasecmp(stat_key,"scheduler",9) == 0) || (strncasecmp(stat_key,"reconf",6) == 0)) { self->scheduler->add_stats(stat_key, add_stat, cookie); @@ -149,7 +153,14 @@ void pipeline_add_stats(ndb_pipeline *se } -void * initialize_scheduler(const char *cf, int nthreads, int athread) { +ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) { + return ndb_flush_all(self); +} + + +/* The scheduler API */ + +void * scheduler_initialize(const char *cf, int nthreads, int athread) { Scheduler *s = 0; const char *sched_options = 0; @@ -175,28 +186,21 @@ void * initialize_scheduler(const char * } -void shutdown_scheduler(void *v) { - Scheduler *s = (Scheduler *) v; - - s->shutdown(); +void scheduler_shutdown(ndb_pipeline *self) { + self->scheduler->shutdown(); } -ENGINE_ERROR_CODE pipeline_schedule_operation(ndb_pipeline *self, - struct workitem *item) { +ENGINE_ERROR_CODE scheduler_schedule(ndb_pipeline *self, struct workitem *item) { return self->scheduler->schedule(item); } -ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) { - return ndb_flush_all(self); +void scheduler_release(ndb_pipeline *self, struct workitem *item) { + self->scheduler->release(item); } -void pipeline_io_completed(ndb_pipeline *self, struct workitem *item) { - self->scheduler->io_completed(item); -} - /* The slab allocator API */ === modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc' --- a/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-01-14 00:52:10 +0000 +++ b/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-03-07 01:22:53 +0000 @@ -35,6 +35,8 @@ #include "thread_identifier.h" #include "workitem.h" #include "ndb_worker.h" +#include "ndb_engine_errors.h" +#include "ndb_error_logger.h" #include "S_sched.h" @@ -283,6 +285,7 @@ void S::SchedulerWorker::attach_thread(t ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) { int c = item->prefix_info.cluster_id; + ENGINE_ERROR_CODE response_code; NdbInstance *inst; S::WorkerConnection *wc; const KeyPrefix *pfx; @@ -296,7 +299,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc pthread_rwlock_unlock(& reconf_lock); } else { - logger->log(LOG_INFO, 0, "S Scheduler could not acquire read lock"); + log_app_error(& AppError9001_ReconfLock); return ENGINE_TMPFAIL; } /* READ LOCK RELEASED */ @@ -315,12 +318,11 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc all we can do is return an error. (Or, alternately, the scheduler may be shutting down.) */ - logger->log(LOG_INFO, 0, "No free NDB instances."); + log_app_error(& AppError9002_NoNDBs); return ENGINE_TMPFAIL; } - inst->wqitem = item; - workitem_set_NdbInstance(item, inst); + inst->link_workitem(item); // Fetch the query plan for this prefix. item->plan = wc->plan_set->getPlanForPrefix(pfx); @@ -331,39 +333,44 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc // Build the NDB transaction op_status_t op_status = worker_prepare_operation(item); - ENGINE_ERROR_CODE response_code; - - switch(op_status) { - case op_async_prepared: - /* Put the prepared item onto a send queue */ - wc->sendqueue->produce(inst); - DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id); - - /* This locking is explained in run_ndb_send_thread() */ - if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) { // try the lock - wc->conn->sem.counter++; // increment - pthread_cond_signal( & wc->conn->sem.not_zero); // signal - pthread_mutex_unlock( & wc->conn->sem.lock); // release - } - - response_code = ENGINE_EWOULDBLOCK; - break; - case op_not_supported: - DEBUG_PRINT("op_status is op_not_supported"); - response_code = ENGINE_ENOTSUP; - break; - case op_overflow: - DEBUG_PRINT("op_status is op_overflow"); - response_code = ENGINE_E2BIG; - break; - case op_async_sent: - DEBUG_PRINT("op_async_sent could be a bug"); - response_code = ENGINE_FAILED; - break; - case op_failed: - DEBUG_PRINT("op_status is op_failed"); - response_code = ENGINE_FAILED; - break; + + // Success; put the workitem on the send queue and return ENGINE_EWOULDBLOCK. + if(op_status == op_async_prepared) { + /* Put the prepared item onto a send queue */ + wc->sendqueue->produce(inst); + DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id); + + /* This locking is explained in run_ndb_send_thread() */ + if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) { // try the lock + wc->conn->sem.counter++; // increment + pthread_cond_signal( & wc->conn->sem.not_zero); // signal + pthread_mutex_unlock( & wc->conn->sem.lock); // release + } + + response_code = ENGINE_EWOULDBLOCK; + } + else { + switch(op_status) { + case op_not_supported: + DEBUG_PRINT("op_status is op_not_supported"); + response_code = ENGINE_ENOTSUP; + break; + case op_overflow: + DEBUG_PRINT("op_status is op_overflow"); + response_code = ENGINE_E2BIG; + break; + case op_async_sent: + DEBUG_PRINT("op_async_sent could be a bug"); + response_code = ENGINE_FAILED; + break; + case op_failed: + DEBUG_PRINT("op_status is op_failed"); + response_code = ENGINE_FAILED; + break; + default: + DEBUG_PRINT("UNEXPECTED: op_status is %d", op_status); + response_code = ENGINE_FAILED; + } } return response_code; @@ -376,18 +383,19 @@ void S::SchedulerWorker::reschedule(work } -void S::SchedulerWorker::io_completed(workitem *item) { +/* Release the resources used by an operation. + Unlink the NdbInstance from the workitem, and return it to the free list + (or free it, if the scheduler is shutting down). +*/ +void S::SchedulerWorker::release(workitem *item) { DEBUG_ENTER(); NdbInstance *inst = item->ndb_instance; - item->ndb_instance = NULL; if(inst) { - assert(inst->wqitem == item); + inst->unlink_workitem(item); int c = item->prefix_info.cluster_id; S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c)); if(wc && ! wc->sendqueue->is_aborted()) { - // assert(inst->sched_gen_number == s_global->generation); - inst->wqitem = NULL; inst->next = wc->freelist; wc->freelist = inst; DEBUG_PRINT("Returned NdbInstance to freelist."); @@ -533,6 +541,7 @@ S::WorkerConnection::WorkerConnection(Sc int my_ndb_inst = conn->nInst / global->options.n_worker_threads; for(int j = 0 ; j < my_ndb_inst ; j++ ) { NdbInstance *inst = new NdbInstance(conn->conn, 2); + inst->id = ((id.thd + 1) * 10000) + j + 1; inst->next = freelist; freelist = inst; } === modified file 'storage/ndb/memcache/src/schedulers/S_sched.h' --- a/storage/ndb/memcache/src/schedulers/S_sched.h 2011-10-02 02:58:36 +0000 +++ b/storage/ndb/memcache/src/schedulers/S_sched.h 2012-03-07 01:22:53 +0000 @@ -104,7 +104,7 @@ public: ENGINE_ERROR_CODE schedule(workitem *); void yield(workitem *) const {}; void reschedule(workitem *) const; - void io_completed(workitem *); + void release(workitem *); void add_stats(const char *, ADD_STAT, const void *); void shutdown(); bool global_reconfigure(Configuration *); === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc' --- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2011-12-16 10:04:43 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2012-03-07 01:22:53 +0000 @@ -184,8 +184,8 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s { return ENGINE_TMPFAIL; } - - workitem_set_NdbInstance(newitem, inst); + + inst->link_workitem(newitem); // Fetch the query plan for this prefix. newitem->plan = cluster[c].plan_set->getPlanForPrefix(pfx); @@ -216,12 +216,12 @@ ENGINE_ERROR_CODE Scheduler_stockholm::s } -void Scheduler_stockholm::io_completed(workitem *item) { +void Scheduler_stockholm::release(workitem *item) { DEBUG_ENTER(); NdbInstance* inst = item->ndb_instance; - item->ndb_instance = NULL; if(inst) { + inst->unlink_workitem(item); int c = item->prefix_info.cluster_id; inst->next = cluster[c].nextFree; cluster[c].nextFree = inst; === modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h' --- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2011-10-02 02:58:36 +0000 +++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2012-03-07 01:22:53 +0000 @@ -49,7 +49,7 @@ public: ENGINE_ERROR_CODE schedule(workitem *); void yield(workitem *) const; // inlined void reschedule(workitem *) const; // inlined - void io_completed(workitem *); + void release(workitem *); void add_stats(const char *, ADD_STAT, const void *); void shutdown(); void * run_ndb_commit_thread(int cluster_id); === modified file 'storage/ndb/memcache/src/workitem.c' --- a/storage/ndb/memcache/src/workitem.c 2011-12-11 07:31:26 +0000 +++ b/storage/ndb/memcache/src/workitem.c 2012-03-07 01:22:53 +0000 @@ -182,26 +182,14 @@ const char * workitem_get_operation(work } -void workitem_set_NdbInstance(workitem *item, CPP_NDBINSTANCE * _Ndb_instance) -{ - assert(item->ndb_instance == NULL); - item->ndb_instance = _Ndb_instance; -} - - void workitem_free(workitem *item) { - /* It's OK to free all of these; a free() with class_id 0 is a no-op */ + /* It's OK to free all of these; pipeline_free() with class_id 0 is a no-op */ pipeline_free(item->pipeline, item->row_buffer_1, item->rowbuf1_cls); pipeline_free(item->pipeline, item->row_buffer_2, item->rowbuf2_cls); pipeline_free(item->pipeline, item->ndb_key_buffer, item->keybuf1_cls); pipeline_free(item->pipeline, item->key_buffer_2, item->keybuf2_cls); - if (item->ndb_instance != NULL) - { - assert(false); // TODO Not thread safe, investigate path - } - pipeline_free(item->pipeline, item, workitem_class_id); } No bundle (reason: useless for push emails).