List: Commits « Previous Message | Next Message »
From: John David Duncan  Date: December 19 2011 12:29am
Subject:bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3733 to 3736)
View as plain text  
 3736 John David Duncan	2011-12-18
      Address some small logging-related issues to make server more ready for production deployments.

    modified:
      storage/ndb/memcache/include/ndb_pipeline.h
      storage/ndb/memcache/src/Config_v1.cc
      storage/ndb/memcache/src/ndb_configuration.cc
      storage/ndb/memcache/src/ndb_pipeline.cc
      storage/ndb/memcache/src/schedulers/S_sched.cc
      storage/ndb/memcache/unit/alloc.cc
 3735 John David Duncan	2011-12-18
      This patch fixes several issues with FLUSH_ALL. 
       (1) Flush large values without corrupting relationships between main table and parts table.
       (2) Flush successfully in temporary error situations (e.g. very low DataMemory) 
       (3) Abandon operation in permanent error conditions.
       (4) Avoid flooding the log file with errors.

    added:
      storage/ndb/memcache/include/ndb_error_logger.h
      storage/ndb/memcache/src/ndb_error_logger.cc
      storage/ndb/memcache/src/ndb_flush.cc
    modified:
      storage/ndb/memcache/CMakeLists.txt
      storage/ndb/memcache/include/ExternalValue.h
      storage/ndb/memcache/include/Operation.h
      storage/ndb/memcache/include/ndb_engine.h
      storage/ndb/memcache/src/ExternalValue.cc
      storage/ndb/memcache/src/ndb_engine.c
      storage/ndb/memcache/src/ndb_worker.cc
 3734 John David Duncan	2011-12-18
      Create a server role called "large" which uses large (external) values by default.

    modified:
      storage/ndb/memcache/scripts/ndb_memcache_metadata.sql
      storage/ndb/memcache/scripts/update_to_1.2.sql
 3733 John David Duncan	2011-12-16 [merge]
      mysql-test/CMakeLists.txt fix with MCP comment

    modified:
      mysql-test/CMakeLists.txt
      mysql-test/mysql-test-run.pl
      storage/ndb/memcache/CMakeLists.txt
=== modified file 'storage/ndb/memcache/CMakeLists.txt'
--- a/storage/ndb/memcache/CMakeLists.txt	2011-12-16 20:13:17 +0000
+++ b/storage/ndb/memcache/CMakeLists.txt	2011-12-18 23:21:21 +0000
@@ -93,6 +93,8 @@ set(NDB_MEMCACHE_SOURCE_FILES
     src/hash_item_util.c
     src/ndb_configuration.cc
     src/ndb_engine_private.h
+    src/ndb_error_logger.cc
+    src/ndb_flush.cc
     src/ndb_pipeline.cc
     src/ndb_worker.cc
     src/schedulers

=== modified file 'storage/ndb/memcache/include/ExternalValue.h'
--- a/storage/ndb/memcache/include/ExternalValue.h	2011-12-12 21:34:19 +0000
+++ b/storage/ndb/memcache/include/ExternalValue.h	2011-12-18 23:21:21 +0000
@@ -54,6 +54,7 @@ public:
   static TableSpec * createContainerRecord(const char *);
   static op_status_t do_write(workitem *);
   static op_status_t do_delete(workitem *);
+  static int do_delete(memory_pool *, NdbTransaction *, QueryPlan *, Operation &);
   static op_status_t do_read_header(workitem *, ndb_async_callback *, worker_step *);
   static void append_after_read(NdbTransaction *, workitem *);
   static bool setupKey(workitem *, Operation &);

=== modified file 'storage/ndb/memcache/include/Operation.h'
--- a/storage/ndb/memcache/include/Operation.h	2011-12-09 20:18:16 +0000
+++ b/storage/ndb/memcache/include/Operation.h	2011-12-18 23:21:21 +0000
@@ -107,6 +107,8 @@ public: 
   // delete
   const NdbOperation *deleteTuple(NdbTransaction *tx,
                                   NdbOperation::OperationOptions *options = 0);
+  const NdbOperation *deleteCurrentTuple(NdbScanOperation *, NdbTransaction *,
+                                         NdbOperation::OperationOptions *opts = 0);
 
   // write
   const NdbOperation *writeTuple(NdbTransaction *tx);
@@ -116,10 +118,11 @@ public: 
                                   NdbOperation::OperationOptions *options = 0);
 
   // scan
-  NdbScanOperation *scanTable(NdbTransaction *tx);
+  NdbScanOperation *scanTable(NdbTransaction *tx,
+                              NdbOperation::LockMode lmod = NdbOperation::LM_Read,
+                              NdbScanOperation::ScanOptions *options = 0);
   NdbIndexScanOperation *scanIndex(NdbTransaction *tx,
                                    NdbIndexScanOperation::IndexBound *bound);
-
  };
   
 
@@ -268,8 +271,20 @@ inline const NdbOperation * 
                          row_mask, options);
 }
 
-inline NdbScanOperation * Operation::scanTable(NdbTransaction *tx) {
-  return tx->scanTable(record->ndb_record);
+inline NdbScanOperation * 
+  Operation::scanTable(NdbTransaction *tx, NdbOperation::LockMode lmode,
+                       NdbScanOperation::ScanOptions *opts) {
+    return tx->scanTable(record->ndb_record, lmode,
+                         read_mask_ptr, opts, 0);
 }
 
+inline const NdbOperation * 
+  Operation::deleteCurrentTuple(NdbScanOperation *scanop,
+                                NdbTransaction *tx,
+                                NdbOperation::OperationOptions *opts) {
+    return scanop->deleteCurrentTuple(tx, record->ndb_record, buffer, 
+                                      read_mask_ptr, opts);
+}
+
+
 #endif

=== modified file 'storage/ndb/memcache/include/ndb_engine.h'
--- a/storage/ndb/memcache/include/ndb_engine.h	2011-12-16 10:04:43 +0000
+++ b/storage/ndb/memcache/include/ndb_engine.h	2011-12-18 23:21:21 +0000
@@ -64,6 +64,7 @@ struct ndb_engine {
   struct {
     size_t nthreads;
     bool cas_enabled;  
+    size_t verbose;
   } server_options;
   
   union {

=== added file 'storage/ndb/memcache/include/ndb_error_logger.h'
--- a/storage/ndb/memcache/include/ndb_error_logger.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/include/ndb_error_logger.h	2011-12-18 23:21:21 +0000
@@ -0,0 +1,53 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+#ifndef NDBMEMCACHE_NDB_ERROR_LOGGER_H
+#define NDBMEMCACHE_NDB_ERROR_LOGGER_H
+
+/* Log NDB error messages, 
+   but take care to prevent flooding the log file with repeated errors.
+*/
+
+
+#include "ndbmemcache_global.h"
+#include "workitem.h"
+
+#ifdef __cplusplus
+#include <NdbApi.hpp>
+
+enum {
+  ERR_SUCCESS = ndberror_st_success,  /* == 0 */
+  ERR_TEMP = ndberror_st_temporary,
+  ERR_PERM = ndberror_st_permanent,
+  ERR_UR   = ndberror_st_unknown
+};
+
+int log_ndb_error(const NdbError &);
+#endif
+
+
+DECLARE_FUNCTIONS_WITH_C_LINKAGE
+
+void ndb_error_logger_init(SERVER_CORE_API *, size_t log_level);
+
+END_FUNCTIONS_WITH_C_LINKAGE
+
+
+#endif
+

=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h	2011-12-07 00:10:27 +0000
+++ b/storage/ndb/memcache/include/ndb_pipeline.h	2011-12-18 23:26:44 +0000
@@ -100,7 +100,7 @@ DECLARE_FUNCTIONS_WITH_C_LINKAGE
 ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *);
 
 /** create a generic request pipeline */
-ndb_pipeline * get_request_pipeline();
+ndb_pipeline * get_request_pipeline(int thd_id);
 
 /** call into a pipeline for its own statistics */
 void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *);

=== modified file 'storage/ndb/memcache/scripts/ndb_memcache_metadata.sql'
--- a/storage/ndb/memcache/scripts/ndb_memcache_metadata.sql	2011-12-11 07:31:26 +0000
+++ b/storage/ndb/memcache/scripts/ndb_memcache_metadata.sql	2011-12-18 23:15:39 +0000
@@ -252,6 +252,7 @@ INSERT INTO memcache_server_roles (role_
 INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("db-only", 1);
 INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("mc-only", 2);
 INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("ndb-caching", 3);
+INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("large", 4);
 
 -- ndb_clusters table 
 -- Create an entry for the primary cluster.
@@ -303,7 +304,9 @@ INSERT INTO key_prefixes (server_role_id
 
          (3, "",    0, "caching", "demo_table"),    /* ndb-caching role */
          (3, "t:",  0, "caching", "demo_tabs"),
-         (3, "b:",  0, "caching", "demo_ext")
+         (3, "b:",  0, "caching", "demo_ext"),
+         
+         (4, ""  ,  0, "ndb-test", "demo_ext");
 ;
 
 

=== modified file 'storage/ndb/memcache/scripts/update_to_1.2.sql'
--- a/storage/ndb/memcache/scripts/update_to_1.2.sql	2011-12-11 07:31:26 +0000
+++ b/storage/ndb/memcache/scripts/update_to_1.2.sql	2011-12-18 23:15:39 +0000
@@ -48,6 +48,8 @@ CREATE TABLE IF NOT EXISTS `external_val
 
 INSERT INTO meta VALUES ("ndbmemcache", "1.2");
 
+INSERT INTO memcache_server_roles (role_name, role_id) VALUES ("large", 4);
+
 UPDATE  containers 
   SET   expire_time_column = "expire_time",
         flags = "flags" 
@@ -67,6 +69,8 @@ INSERT INTO key_prefixes (server_role_id
           (0, "b:",  0, "ndb-test", "demo_ext"),
           (1, "b:",  0, "ndb-only", "demo_ext"),
           (3, "t:",  0, "caching", "demo_tabs"),
-          (3, "b:",  0, "caching", "demo_ext")     ;
+          (3, "b:",  0, "caching", "demo_ext"),
+          (4, ""  ,  0, "ndb-test", "demo_ext");
+
  
 

=== modified file 'storage/ndb/memcache/src/Config_v1.cc'
--- a/storage/ndb/memcache/src/Config_v1.cc	2011-12-12 05:08:03 +0000
+++ b/storage/ndb/memcache/src/Config_v1.cc	2011-12-18 23:26:44 +0000
@@ -463,7 +463,6 @@ bool config_v1::store_prefix(const char 
                              TableSpec *table, 
                              int cluster_id, 
                              char *cache_policy) {
-  DEBUG_PRINT("%s", name);
   KeyPrefix prefix(name);
   prefix_info_t * info_ptr;
   

=== modified file 'storage/ndb/memcache/src/ExternalValue.cc'
--- a/storage/ndb/memcache/src/ExternalValue.cc	2011-12-16 10:04:43 +0000
+++ b/storage/ndb/memcache/src/ExternalValue.cc	2011-12-18 23:21:21 +0000
@@ -70,6 +70,42 @@ TableSpec * ExternalValue::createContain
 }
 
 
+/* This is called from FLUSH_ALL.
+   It returns the number of parts deleted.
+   It uses a memory pool, passed in, to allocate key buffers.
+*/
+int ExternalValue::do_delete(memory_pool *mpool, NdbTransaction *delTx, 
+                             QueryPlan *plan, Operation & op) {
+  Uint32 id, nparts = 0;
+  QueryPlan * extern_plan = plan->extern_store;
+  
+  if(extern_plan 
+     && ! (op.isNull(COL_STORE_EXT_SIZE) || op.isNull(COL_STORE_EXT_ID))) {
+
+    /* How many parts? */
+    Uint32 stripe_size = extern_plan->val_record->value_length;
+    Uint32 len = op.getIntValue(COL_STORE_EXT_SIZE);
+    id  = op.getIntValue(COL_STORE_EXT_ID);  
+    nparts = len / stripe_size;
+    if(len % stripe_size) nparts += 1;
+
+    /* Delete them */
+    int key_size = extern_plan->key_record->rec_size;
+    
+    for(int i = 0; i < nparts ; i++) {
+      Operation part_op(extern_plan);
+      part_op.key_buffer = (char *) memory_pool_alloc(mpool, key_size);
+      
+      part_op.clearKeyNullBits();
+      part_op.setKeyPartInt(COL_STORE_KEY + 0, id);
+      part_op.setKeyPartInt(COL_STORE_KEY + 1, i);    
+      part_op.deleteTuple(delTx);
+    }
+  }
+  return nparts;
+}
+
+
 inline bool ExternalValue::setupKey(workitem *item, Operation &op) {
   op.key_buffer = item->ndb_key_buffer;
   op.clearKeyNullBits();

=== modified file 'storage/ndb/memcache/src/ndb_configuration.cc'
--- a/storage/ndb/memcache/src/ndb_configuration.cc	2011-12-11 07:31:26 +0000
+++ b/storage/ndb/memcache/src/ndb_configuration.cc	2011-12-18 23:26:44 +0000
@@ -31,6 +31,7 @@
 #include "NdbInstance.h"
 #include "thread_identifier.h"
 #include "Scheduler.h"
+#include "ExternalValue.h"
 
 /* A static global variable */
 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
@@ -158,9 +159,10 @@ void disconnect_all() {
 /* This function has C linkage */
 void print_debug_startup_info() {
   int sz[4];
-  DEBUG_PRINT("  sizeof Ndb        : %lu", sz[0]=sizeof(Ndb));
-  DEBUG_PRINT("  sizeof NdbInstance: %lu", sz[1]=sizeof(NdbInstance));
-  DEBUG_PRINT("  sizeof workitem: %lu", sizeof(workitem));
+  DEBUG_PRINT("  sizeof Ndb           : %lu", sz[0]=sizeof(Ndb));
+  DEBUG_PRINT("  sizeof NdbInstance   : %lu", sz[1]=sizeof(NdbInstance));
+  DEBUG_PRINT("  sizeof workitem      : %lu", sizeof(workitem));
+  DEBUG_PRINT("  sizeof ExternalValue : %lu", sizeof(ExternalValue));
 }
 
 

=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c	2011-12-16 10:04:43 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c	2011-12-18 23:21:21 +0000
@@ -38,6 +38,7 @@
 #include "Scheduler.h"
 #include "thread_identifier.h"
 #include "timing.h"
+#include "ndb_error_logger.h"
 
 /* Global variables */
 EXTENSION_LOGGER_DESCRIPTOR *logger;
@@ -205,6 +206,9 @@ static ENGINE_ERROR_CODE ndb_initialize(
   fetch_core_settings(ndb_eng, def_eng);
   nthreads = ndb_eng->server_options.nthreads;
 
+  /* Initialize the error handler */
+  ndb_error_logger_init(def_eng->server.core, ndb_eng->server_options.verbose);
+
   logger->log(LOG_WARNING, NULL, "Server started with %d threads.\n", nthreads);
   logger->log(LOG_WARNING, NULL, "Priming the pump ... ");
   timing_point(& pump_time);
@@ -224,7 +228,7 @@ static ENGINE_ERROR_CODE ndb_initialize(
   ndb_eng->pipelines  = malloc(nthreads * sizeof(void *));
   ndb_eng->schedulers = malloc(nthreads * sizeof(void *));
   for(i = 0 ; i < nthreads ; i++) {
-    ndb_eng->pipelines[i] = get_request_pipeline();
+    ndb_eng->pipelines[i] = get_request_pipeline(i);
     ndb_eng->schedulers[i] = 
       initialize_scheduler(ndb_eng->startup_options.scheduler, nthreads, i);
     if(ndb_eng->schedulers[i] == 0) {
@@ -789,6 +793,9 @@ int fetch_core_settings(struct ndb_engin
     { .key = "num_threads",
       .datatype = DT_SIZE,
       .value.dt_size = &engine->server_options.nthreads },
+    { .key = "verbosity",
+      .datatype = DT_SIZE,
+      .value.dt_size = &engine->server_options.verbose },
     { .key = NULL }
   };
   

=== added file 'storage/ndb/memcache/src/ndb_error_logger.cc'
--- a/storage/ndb/memcache/src/ndb_error_logger.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ndb_error_logger.cc	2011-12-18 23:21:21 +0000
@@ -0,0 +1,188 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+/* System headers */
+#include <pthread.h>
+#include <time.h>
+#include <stdio.h>
+#include <string.h>
+#include <assert.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include "memcached/extension_loggers.h"
+#include "memcached/server_api.h"
+
+#include "ndb_error_logger.h"
+
+
+/* ***********************************************************************
+   ndb_error_logger 
+
+   Log NDB error messages, but try to avoid flooding the logfile with them. 
+   *********************************************************************** 
+*/
+
+
+/* Memcached externals */
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+SERVER_CORE_API * core_api;
+size_t verbose_logging;
+
+/* Internal Static Globals and Declarations */
+#define ERROR_HASH_TABLE_SIZE 251
+pthread_mutex_t error_table_lock;
+class ErrorEntry;
+ErrorEntry * error_hash_table[ERROR_HASH_TABLE_SIZE];
+
+/* Prototypes */
+void manage_error(const NdbError &, const char * mesg, int interval);
+
+
+
+/********* PUBLIC API *************************************/
+/* Initialize the NDB Error Logger */
+void ndb_error_logger_init(SERVER_CORE_API * api, size_t level) {
+  int r = pthread_mutex_init(& error_table_lock, NULL);
+  if(r) logger->log(LOG_WARNING,0, "CANNOT INIT ERROR MUTEX: %d\n", r);
+  core_api = api;
+  verbose_logging = level;
+  
+  for(int i = 0; i < ERROR_HASH_TABLE_SIZE; i++) 
+    error_hash_table[i] = 0;
+}
+
+
+int log_ndb_error(const NdbError &error) {
+  switch(error.status) {
+    case ndberror_st_success:
+      break;
+
+    case ndberror_st_temporary:
+      manage_error(error, "NDB Temporary Error", 10);
+      break;
+
+    case ndberror_st_permanent:
+    case ndberror_st_unknown:
+      manage_error(error, "NDB Error", 10);
+      break;
+  }
+  /* NDB classifies "Out Of Memory" (827) errors as permament errors, but we 
+     reclassify them to temporary */
+  if(error.classification == NdbError::InsufficientSpace)
+    return ERR_TEMP;
+  return error.status;
+}
+
+
+/********* IMPLEMENTATION *******************************/
+
+class ErrorEntry {
+public:
+  unsigned int error_code;
+  rel_time_t first;
+  rel_time_t time[2];   /* odd and even timestamps */
+  Uint32 count;
+  ErrorEntry *next;
+  
+  ErrorEntry(int code, rel_time_t tm) :
+    error_code(code), first(tm), count(1), next(0) 
+  { 
+    time[0] = 0;
+    time[1] = tm;
+  };
+};
+
+
+class Lock {
+public:
+  pthread_mutex_t *mutex;
+  int status;
+  Lock(pthread_mutex_t *m) : mutex(m) { status = pthread_mutex_lock(mutex); }
+  ~Lock()                             { pthread_mutex_unlock(mutex); }
+};
+
+ErrorEntry * error_table_lookup(int code, rel_time_t now);
+
+
+/* Lock the error table and look up an error. 
+   If found, increment the count and set either the odd or even timestamp.
+   If not found, create.
+*/
+ErrorEntry * error_table_lookup(int code, rel_time_t now) {
+  int hash_val = code % ERROR_HASH_TABLE_SIZE;
+  Lock lock(& error_table_lock);
+  ErrorEntry *sym;
+  
+  for(sym = error_hash_table[hash_val] ; sym != 0 ; sym = sym->next) {
+    if(sym->error_code == code) {
+      sym->time[(++(sym->count)) % 2] = now;
+      return sym;
+    }
+  }
+
+  /* Create */
+  sym = new ErrorEntry(code, now);
+  sym->next = error_hash_table[hash_val];
+  error_hash_table[hash_val] = sym;
+  return sym;
+}
+
+
+/* Record the error message, and possibly log it. */
+void manage_error(const NdbError & error, const char *type_mesg, int interval) {
+  char note[256];
+  ErrorEntry *entry = 0;
+  bool first_ever, interval_passed, flood = false;
+  int current = 0, prior = 0;  // array indexes
+
+  if(verbose_logging == 0) { 
+    entry = error_table_lookup(error.code, core_api->get_current_time());
+
+    if((entry->count | 1) == entry->count)
+      current = 1;  // odd count
+    else
+      prior   = 1;  // even count
+
+    /* We have four pieces of information: the first timestamp, the two 
+       most recent timestamps, and the error count. When to write a log message?
+       (A) On the first occurrence of an error. 
+       (B) If a time > interval has passed since the previous message.
+       (C) At certain count numbers in error flood situations
+    */
+    first_ever = (entry->count == 1);
+    interval_passed = (entry->time[current] - entry->time[prior] > interval);
+    if(! interval_passed) 
+      for(int i = 10 ; i <= entry->count ; i *= 10) 
+        if(entry->count < (10 * i) && (entry->count % i == 0))
+          { flood = true; break; }
+  }
+  
+  if(verbose_logging || first_ever || interval_passed || flood) 
+  {
+    if(flood) 
+      snprintf(note, 256, "[occurrence %d of this error]", entry->count);
+    else
+      note[0] = '\0';
+    logger->log(LOG_WARNING, 0, "%s %d: %s %s\n", 
+                type_mesg, error.code, error.message, note);
+  }
+}
+  

=== added file 'storage/ndb/memcache/src/ndb_flush.cc'
--- a/storage/ndb/memcache/src/ndb_flush.cc	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/memcache/src/ndb_flush.cc	2011-12-18 23:21:21 +0000
@@ -0,0 +1,290 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights
+ reserved.
+ 
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License
+ as published by the Free Software Foundation; version 2 of
+ the License.
+ 
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+ 
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301  USA
+ */
+
+/* System headers */
+#define __STDC_FORMAT_MACROS 
+#include <assert.h>
+#include <ctype.h>
+
+/* Memcache headers */
+#include "memcached/types.h"
+#include <memcached/extension_loggers.h>
+
+/* NDB headers */
+#include "NdbApi.hpp"
+
+/* NDB Memcache headers */
+#include "ndbmemcache_global.h"
+#include "Configuration.h"
+#include "ExternalValue.h"
+#include "debug.h"
+#include "Operation.h"
+#include "NdbInstance.h"
+#include "ndb_pipeline.h"
+#include "ndb_error_logger.h"
+#include "ndb_worker.h"
+
+/* Extern pointers */
+extern EXTENSION_LOGGER_DESCRIPTOR *logger;
+
+/* Scan helpers */
+
+// nextResult() return values:
+enum { 
+  fetchError         = -1,
+  fetchOK            =  0,
+  fetchScanFinished  =  1,
+  fetchCacheEmpty    =  2
+};
+
+enum { fetchFromThisBatch = false, fetchNewBatchFromKernel = true };
+enum { SendImmediate = true, sendDeferred = false };
+
+bool scan_delete(NdbInstance *, QueryPlan *);
+bool scan_delete_ext_val(ndb_pipeline *, NdbInstance *, QueryPlan *);
+
+
+/*************** SYNCHRONOUS IMPLEMENTATION OF "FLUSH ALL" ******************/
+
+/* Flush all is a fully synchronous operation -- 
+ the memcache server is waiting for a response, and the thread is blocked.
+ */
+ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *pipeline) {
+  DEBUG_ENTER();
+  bool r;
+  const Configuration &conf = get_Configuration();
+  
+  DEBUG_PRINT(" %d prefixes", conf.nprefixes);
+  for(unsigned int i = 0 ; i < conf.nprefixes ; i++) {
+    const KeyPrefix *pfx = conf.getPrefix(i);
+    if(pfx->info.use_ndb && pfx->info.do_db_flush) {
+      ClusterConnectionPool *pool = conf.getConnectionPoolById(pfx->info.cluster_id);
+      Ndb_cluster_connection *conn = pool->getMainConnection();
+      NdbInstance inst(conn, 128);
+      QueryPlan plan(inst.db, pfx->table);
+      if(plan.pk_access) {
+        // To flush, scan the table and delete every row
+        if(plan.canHaveExternalValue()) {
+          DEBUG_PRINT("prefix %d - doing ExternalValue delete");
+          r = scan_delete_ext_val(pipeline, &inst, &plan);
+        }
+        else {
+          DEBUG_PRINT("prefix %d - deleting from %s", i, pfx->table->table_name);
+          r = scan_delete(&inst, &plan);
+        }
+        if(! r) logger->log(LOG_WARNING, 0, "-- FLUSH_ALL Failed.\n");
+      }
+      else DEBUG_PRINT("prefix %d - not scanning table %s -- accees path "
+                       "is not primary key", i, pfx->table->table_name);
+    }
+    else DEBUG_PRINT("prefix %d - not scanning table %s -- use_ndb:%d flush:%d",
+                     i, pfx->table ? pfx->table->table_name : "",
+                     pfx->info.use_ndb, pfx->info.do_db_flush);
+  }
+  
+  return ENGINE_SUCCESS;
+}
+
+
+bool scan_delete(NdbInstance *inst, QueryPlan *plan) {
+  DEBUG_ENTER();
+  bool rescan, fetch_option;
+  int rFetch, rExec, rDel, batch_size, rows_deleted;
+  int error_status = 0;
+  const int max_errors = 100000;
+  
+  struct {
+    unsigned int errors;
+    unsigned int rows;
+    unsigned short scans;
+    unsigned short commit_batches;
+  } stats = {0, 0, 0, 0 };
+  
+  /* The outer loop performs an initial table scan and then possibly some 
+     rescans, which are triggered whenever some rows have been scanned but, 
+     due to an error condition, have not been deleted.
+   */
+  do {
+    batch_size = 1;   /* slow start. */
+    stats.scans += 1;
+    rescan = false;
+    
+    NdbTransaction *scanTx = inst->db->startTransaction();
+    NdbScanOperation *scan = scanTx->getNdbScanOperation(plan->table);
+    
+    /* Express intent to read with exclusive lock; execute NoCommit */
+    scan->readTuplesExclusive();
+    rExec = scanTx->execute(NdbTransaction::NoCommit);
+    if(rExec != 0) {
+      stats.errors++;
+      error_status = log_ndb_error(scanTx->getNdbError());
+      break;
+    }
+    
+    /* Within a scan, this loop iterates over batches.
+       Batches are committed whenever the batch_size has been reached.
+       Batch size starts at 1 and doubles when a batch is succesful, 
+       until it reaches the result cache size.
+     */
+    while(1) {
+      stats.commit_batches++;
+      NdbTransaction *delTx = inst->db->startTransaction();
+      rows_deleted = 0;
+      fetch_option = fetchNewBatchFromKernel; 
+      bool fetch_more;
+      
+      /* The inner loop iterates over rows within a batch */      
+      do {
+        fetch_more = false;
+        rFetch = scan->nextResult(fetch_option, SendImmediate);
+        switch(rFetch) {
+          case fetchError:
+            stats.errors++;
+            error_status = log_ndb_error(scan->getNdbError());
+            break; 
+            
+          case fetchOK:
+            rDel = scan->deleteCurrentTuple(delTx);
+            if(rDel == 0) {
+              fetch_more = ( ++rows_deleted < batch_size);
+              fetch_option = fetchFromThisBatch;
+            }
+            else {
+              stats.errors++;
+              error_status = log_ndb_error(delTx->getNdbError());
+            }
+            break;
+            
+          case fetchScanFinished:        
+          case fetchCacheEmpty:
+          default:
+            break;        
+        }
+      } while(fetch_more); /* break out of the inner loop to here */
+      
+      /* Quit now if errors were serious */
+      if(error_status > ERR_TEMP)
+        break;
+
+      /* Execute the batch */
+      rExec = delTx->execute(NdbTransaction::Commit, NdbOperation::AbortOnError, 1);
+      if(rExec == 0) {
+        stats.rows += rows_deleted;
+        if(rFetch != fetchCacheEmpty) 
+          batch_size *= 2;
+      }
+      else {
+        stats.errors++;
+        error_status = log_ndb_error(delTx->getNdbError());
+        if(batch_size > 1) 
+          batch_size /= 2;
+        rescan = true;
+      }
+      
+      delTx->close();
+      
+      if(rFetch == fetchScanFinished || (stats.errors > max_errors))
+        break;
+    } /* break out of the batch loop to here */
+    
+    scanTx->close();
+  } while(rescan && (error_status < ERR_PERM) && stats.errors < max_errors);
+  
+  logger->log(LOG_WARNING, 0, "Flushed rows from %s.%s: "
+              "Scans: %d  Batches: %d  Rows: %d  Errors: %d",
+              plan->spec->schema_name, plan->spec->table_name, 
+              stats.scans, stats.commit_batches, stats.rows, stats.errors);
+  
+  return (stats.rows || ! stats.errors);  
+}
+
+
+/* External Values require a different version of FLUSH_ALL, which preserves
+   the referential integrity between the main table and the parts table
+   while deleting.   This one uses the NdbRecord variant of a scan and commits
+   once for each row of the main table.
+*/
+bool scan_delete_ext_val(ndb_pipeline *pipeline, NdbInstance *inst, 
+                         QueryPlan *plan) {
+  DEBUG_ENTER();
+  int r, ext_rows, error_status = 0;
+  bool fetch_more;
+  struct {
+    Uint32 main_rows;
+    Uint32 ext_rows;
+    Uint32 errors;
+  } stats = {0, 0, 0 };
+
+  /* Need KeyInfo when performing scanning delete */
+  NdbScanOperation::ScanOptions opts;
+  opts.optionsPresent=NdbScanOperation::ScanOptions::SO_SCANFLAGS;
+  opts.scan_flags=NdbScanOperation::SF_KeyInfo;
+  
+  memory_pool * pool = pipeline_create_memory_pool(pipeline);
+  NdbTransaction *scanTx = inst->db->startTransaction();
+  Operation op(plan, OP_SCAN);  
+  op.readSelectedColumns();
+  op.readColumn(COL_STORE_EXT_SIZE);
+  op.readColumn(COL_STORE_EXT_ID);
+    
+  NdbScanOperation *scan = op.scanTable(scanTx, NdbOperation::LM_Exclusive, &opts);
+  r = scanTx->execute(NdbTransaction::NoCommit); 
+  
+  if(r == 0) {   /* Here's the scan loop */
+    do {
+      fetch_more = false;
+      r = scan->nextResult((const char **) & op.buffer, true, true);
+      if(r == fetchOK) {
+        fetch_more = true;      
+        NdbTransaction * delTx = inst->db->startTransaction();
+
+        op.deleteCurrentTuple(scan, delTx);                        // main row
+        ext_rows = ExternalValue::do_delete(pool, delTx, plan, op);  // parts
+
+        r = delTx->execute(NdbTransaction::Commit, 
+                           NdbOperation::AbortOnError, 
+                           SendImmediate);
+        if(r)
+          error_status = log_ndb_error(delTx->getNdbError()), stats.errors++;
+        else 
+          stats.main_rows++, stats.ext_rows += ext_rows;
+   
+        memory_pool_free(pool);
+        delTx->close();
+      }
+      else {
+        break;
+      }
+    } while(fetch_more && (error_status < ERR_PERM));
+  }
+  
+  memory_pool_destroy(pool);
+  scanTx->close();
+
+  logger->log(LOG_WARNING, 0, "Flushed %d rows from %s plus "
+              "%d rows from %s.  Errors: %d\n",  
+              stats.main_rows, plan->spec->table_name, 
+              stats.ext_rows, plan->extern_store->spec->table_name,
+              stats.errors);
+  
+  return (stats.main_rows || ! stats.errors);
+}
+
+

=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc	2011-12-07 00:10:27 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc	2011-12-18 23:26:44 +0000
@@ -85,15 +85,14 @@ ndb_pipeline * ndb_pipeline_initialize(s
   /* Fetch the partially initialized pipeline */
   ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id];
   
-  /* Set my id */
-  self->id = id;
-    
+  assert(self->id == id);
+  
   /* Set the pointer back to the engine */
   self->engine = engine;
   
   /* And the thread id */
   self->engine_thread_id = pthread_self(); 
-    
+
   /* Create and set a thread identity */
   tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
   tid->pipeline = self;
@@ -109,13 +108,13 @@ ndb_pipeline * ndb_pipeline_initialize(s
 
 
 /* Allocate and initialize a generic request pipeline */
-ndb_pipeline * get_request_pipeline() { 
+ndb_pipeline * get_request_pipeline(int thd_id) { 
   /* Allocate the pipeline */
   ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline)); 
   
-  /* Initialize */
+  /* Initialize */  
   self->engine = 0;
-  self->id = 0;  
+  self->id = thd_id;
   self->nworkitems = 0;
 
   /* Say hi to the alligator */  

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2011-12-16 10:04:43 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2011-12-18 23:21:21 +0000
@@ -54,6 +54,7 @@
 #include "ndb_engine.h"
 #include "hash_item_util.h"
 #include "ndb_worker.h"
+#include "ndb_error_logger.h"
 
 /**********************************************************
   Schedduler::schedule()
@@ -122,7 +123,6 @@ worker_step worker_finalize_write;
 /* Misc utility functions */
 void worker_set_cas(ndb_pipeline *, uint64_t *);
 int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val);
-bool scan_delete(NdbInstance *, QueryPlan *);
 void build_hash_item(workitem *, Operation &,  ExpireTime &);
 
 /* Extern pointers */
@@ -166,6 +166,9 @@ status_block status_block_idx_insert = 
 status_block status_block_too_big = 
   { ENGINE_E2BIG, "Value too large"                   };
 
+status_block status_block_no_mem =
+  { ENGINE_ENOMEM, "NDB out of data memory"           }; 
+
 void worker_set_cas(ndb_pipeline *p, uint64_t *cas) {  
   /* Be careful here --  ndbmc_atomic32_t might be a signed type.
      Shifting of signed types behaves differently. */
@@ -723,10 +726,21 @@ void callback_main(int, NdbTransaction *
   else if(tx->getNdbError().code == 897) {
     wqitem->status = & status_block_idx_insert;
   }
+  /* Out of memory */
+  else if(tx->getNdbError().code == 827) {
+    log_ndb_error(tx->getNdbError());
+    wqitem->status = & status_block_no_mem;
+  }
+  /* 284: Table not defined in TC (stale definition) */
+  else if(tx->getNdbError().code == 284) {
+    /* TODO: find a way to handle this error, after an ALTER TABLE */
+    log_ndb_error(tx->getNdbError());
+    wqitem->status = & status_block_misc_error;
+  }
+  
   /* Some other error */
   else  {
-    DEBUG_PRINT("[%d]: %s", 
-                       tx->getNdbError().code, tx->getNdbError().message);
+    log_ndb_error(tx->getNdbError());
     wqitem->status = & status_block_misc_error;
   }
 
@@ -1105,151 +1119,3 @@ int build_cas_routine(NdbInterpretedCode
   return r->finalise();                      // resolve the label/branch
 }
 
-
-/*************** SYNCHRONOUS IMPLEMENTATION OF "FLUSH ALL" ******************/
-
-/* Flush all is a fully synchronous operation -- 
-   the memcache server is waiting for a response, and the thread is blocked.
-*/
-ENGINE_ERROR_CODE ndb_flush_all(ndb_pipeline *pipeline) {
-  DEBUG_ENTER();
-  const Configuration &conf = get_Configuration();
-  
-  DEBUG_PRINT(" %d prefixes", conf.nprefixes);
-  for(unsigned int i = 0 ; i < conf.nprefixes ; i++) {
-    const KeyPrefix *pfx = conf.getPrefix(i);
-    if(pfx->info.use_ndb && pfx->info.do_db_flush) {
-      ClusterConnectionPool *pool = conf.getConnectionPoolById(pfx->info.cluster_id);
-      Ndb_cluster_connection *conn = pool->getMainConnection();
-      NdbInstance inst(conn, 128);
-      QueryPlan plan(inst.db, pfx->table);
-      if(plan.pk_access) {
-        // To flush, scan the table and delete every row
-        DEBUG_PRINT("prefix %d - deleting from %s", i, pfx->table->table_name);
-        scan_delete(&inst, &plan);
-        // If there is an external store, also delete from the large value table
-        if(plan.canHaveExternalValue()) {
-          DEBUG_PRINT("prefix %d - deleting from %s", i, 
-                      plan.extern_store->spec->table_name);
-          scan_delete(&inst, plan.extern_store);
-        }
-      }
-      else DEBUG_PRINT("prefix %d - not scanning table %s -- access path "
-                       "is not primary key", i, pfx->table->table_name);
-    }
-    else DEBUG_PRINT("prefix %d - not scanning table %s -- use_ndb:%d flush:%d",
-                     i, pfx->table ? pfx->table->table_name : "",
-                     pfx->info.use_ndb, pfx->info.do_db_flush);
-  }
-  
-  return ENGINE_SUCCESS;
-}
-
-
-bool scan_delete(NdbInstance *inst, QueryPlan *plan) {
-  DEBUG_ENTER();
-  int check;
-  bool rescan;
-  int res = 0;
-  const int max_batch_size = 1000;
-  int batch_size = 1;
-  int delTxRowCount = 0;
-  int force_send = 1;
-  struct {
-    unsigned short scans;
-    unsigned short errors;
-    unsigned short rows;
-    unsigned short commit_batches;
-  } stats = {0, 0, 0, 0 };
-  
-  /* To securely scan a whole table, use an outer transaction only for the scan, 
-     but take over each lock in an inner transaction (with a row count) that 
-     deletes 1000 rows per transaction 
-  */  
-  do {
-    stats.scans += 1;
-    rescan = false;
-    NdbTransaction *scanTx = inst->db->startTransaction();
-    NdbTransaction *delTx = inst->db->startTransaction();
-    NdbScanOperation *scan = scanTx->getNdbScanOperation(plan->table);
-    scan->readTuplesExclusive();
-    
-    /* execute NoCommit */
-    if((res = scanTx->execute(NdbTransaction::NoCommit)) != 0) 
-      logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", 
-                  scanTx->getNdbError().message);
-    
-    /* scan and delete.  delTx takes over the lock. */
-    while(scan->nextResult(true) == 0) {
-      do {
-        if((res = scan->deleteCurrentTuple(delTx)) == 0) {
-          delTxRowCount += 1;
-        }
-        else {      
-          logger->log(LOG_WARNING, 0, "deleteCurrentTuple(): %s\n", 
-                      scanTx->getNdbError().message);
-        }
-       } while((check = scan->nextResult(false)) == 0);
-      
-      /* execute a batch (NoCommit) */
-      if(check != -1) {
-        res = delTx->execute(NdbTransaction::NoCommit,
-                             NdbOperation::AbortOnError, force_send);
-        if(res != 0) {
-          stats.errors += 1;
-          if(delTx->getNdbError().code == 827) { 
-            /* DataMemory is full, and the kernel could not create a Copy Tuple
-               for a deleted row.  Rollback this batch, turn off force-send 
-               (for throttling), make batches smalller, and trigger a
-               rescan to clean up these rows. */
-            rescan = true;
-            delTx->execute(NdbTransaction::Rollback);
-            delTx->close();
-            delTx = inst->db->startTransaction();
-            delTxRowCount = 0;
-            if(batch_size > 1) batch_size /= 2;
-            force_send = 0;
-          }
-          else {
-            logger->log(LOG_WARNING, 0, "execute(NoCommit): %s\n", 
-                        delTx->getNdbError().message);
-          }
-        }
-      }
-      
-      /* Execute & commit a batch */
-      if(delTxRowCount >= batch_size) {
-        stats.commit_batches += 1;
-        res = delTx->execute(NdbTransaction::Commit, 
-                             NdbOperation::AbortOnError, force_send);
-        if(res != 0) {
-          stats.errors++;
-          logger->log(LOG_WARNING, 0, "execute(Commit): %s\n", 
-                      delTx->getNdbError().message);
-        }
-        stats.rows += delTxRowCount;
-        delTxRowCount = 0;
-        delTx->close();
-        delTx = inst->db->startTransaction();
-        batch_size *= 2;
-        if(batch_size > max_batch_size) {
-          batch_size = max_batch_size;
-          force_send = 1;
-        }
-      }
-    }
-    /* Final execute & commit */
-    res = delTx->execute(NdbTransaction::Commit);
-    delTx->close();
-    scanTx->close();
-
-  } while(rescan);
-  
-  logger->log(EXTENSION_LOG_INFO, 0, "Flushed all rows from %s.%s: "
-              "Scans: %d  Batches: %d  Rows: %d  Errors: %d",
-              plan->spec->schema_name, plan->spec->table_name, 
-              stats.scans, stats.commit_batches, stats.rows, stats.errors);
-
-  return (res == 0);
-}
-

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc	2011-12-16 10:04:43 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc	2011-12-18 23:26:44 +0000
@@ -125,7 +125,7 @@ void S::SchedulerGlobal::reconfigure(Con
 
 void S::SchedulerGlobal::shutdown() {
   if(running) {
-    logger->log(LOG_WARNING, 0, "Shutting down scheduler.");
+    logger->log(LOG_INFO, 0, "Shutting down scheduler.");
 
     /* First shut down each WorkerConnection */
     for(int i = 0; i < nclusters ; i++) {
@@ -249,8 +249,6 @@ void S::SchedulerGlobal::add_stats(const
 void S::SchedulerWorker::init(int my_thread, 
                               int nthreads, 
                               const char * config_string) {
-  
-  DEBUG_ENTER_METHOD("S::SchedulerWorker::init");
   /* On the first call in, initialize the SchedulerGlobal.
    * This will start the send & poll threads for each connection.
    */
@@ -298,6 +296,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
     pthread_rwlock_unlock(& reconf_lock);
   }
   else {
+    logger->log(LOG_INFO, 0, "S Scheduler could not acquire read lock");
     return ENGINE_TMPFAIL;
   }
   /* READ LOCK RELEASED */
@@ -316,6 +315,7 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
      all we can do is return an error. 
      (Or, alternately, the scheduler may be shutting down.)
      */
+    logger->log(LOG_INFO, 0, "No free NDB instances.");
     return ENGINE_TMPFAIL;
   }
   
@@ -514,7 +514,6 @@ void S::Cluster::add_stats(const char *s
 
 S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
                                       int thd_id, int cluster_id) {
-  DEBUG_ENTER_METHOD("S::WorkerConnection::WorkerConnection");
   S::Cluster *cl = global->clusters[cluster_id];  
   Configuration *conf = global->conf;
 

=== modified file 'storage/ndb/memcache/unit/alloc.cc'
--- a/storage/ndb/memcache/unit/alloc.cc	2011-09-30 16:22:30 +0000
+++ b/storage/ndb/memcache/unit/alloc.cc	2011-12-18 23:26:44 +0000
@@ -28,7 +28,7 @@
 #include "all_tests.h"
 
 int run_allocator_test(QueryPlan *, Ndb *, int v) {
-  struct request_pipeline *p = get_request_pipeline();
+  struct request_pipeline *p = get_request_pipeline(0);
   
   memory_pool *p1 = pipeline_create_memory_pool(p);
   int sz = 13;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (john.duncan:3733 to 3736) John David Duncan19 Dec