Below is the list of changes that have just been committed into a local
5.1 repository of knielsen. When knielsen does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2007-02-08 12:56:43+01:00, knielsen@ymer.(none) +20 -0
WL#2223: NdbRecord.
Clean up NdbRecord implementation for PK operations.
Implement scan operation using NdbRecord.
storage/ndb/include/ndbapi/NdbDictionary.hpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +23 -30
Clean up NdbRecord, include index scan support.
storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +34 -1
NdbRecord scan support.
storage/ndb/include/ndbapi/NdbOperation.hpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +26 -12
Clean up NdbRecord implementation.
storage/ndb/include/ndbapi/NdbReceiver.hpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +95 -12
NdbRecord scan support.
storage/ndb/include/ndbapi/NdbScanOperation.hpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +104 -1
NdbRecord scan support.
storage/ndb/include/ndbapi/NdbTransaction.hpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +97 -97
Clean up NdbRecord implementation.
NdbRecord scan support.
storage/ndb/include/util/Bitmask.hpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +1 -1
Comment clarification.
storage/ndb/src/ndbapi/NdbDictionary.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +7 -34
Clean up NdbRecord implementation.
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +145 -54
Clean up NdbRecord, include index scan support.
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +91 -41
Clean up NdbRecord, include index scan support.
storage/ndb/src/ndbapi/NdbOperation.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +1 -0
NdbRecord cleanup.
storage/ndb/src/ndbapi/NdbOperationDefine.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +1 -1
Comment clarification.
storage/ndb/src/ndbapi/NdbOperationExec.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +89 -72
NdbRecord scan support.
storage/ndb/src/ndbapi/NdbOperationInt.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +6 -6
NdbRecord scan support.
storage/ndb/src/ndbapi/NdbReceiver.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +152 -46
NdbRecord scan support.
storage/ndb/src/ndbapi/NdbScanOperation.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +568 -18
NdbRecord scan support.
storage/ndb/src/ndbapi/NdbTransaction.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +292 -355
NdbRecord scan support.
Clean up NdbRecord implementation.
storage/ndb/src/ndbapi/ndberror.c@stripped, 2007-02-08 12:56:38+01:00, knielsen@ymer.(none)
+7 -1
NdbRecord scan support.
storage/ndb/test/ndbapi/flexBench.cpp@stripped, 2007-02-08 12:56:38+01:00,
knielsen@ymer.(none) +38 -66
Clean up NdbRecord implementation.
support-files/build-tags@stripped, 2007-02-08 12:56:38+01:00, knielsen@ymer.(none) +1 -1
Add TAGS support for NDB files.
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: knielsen
# Host: ymer.(none)
# Root: /usr/local/mysql/mysql-5.1-wl2223
--- 1.28/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp 2007-02-08 12:56:51 +01:00
+++ 1.29/storage/ndb/include/ndbapi/NdbIndexScanOperation.hpp 2007-02-08 12:56:51 +01:00
@@ -162,6 +162,29 @@ public:
*/
int get_range_no();
+ /* Structure used to describe index scan bounds, for NdbRecord scans. */
+ struct IndexBound {
+ /* Row containing lower bound, or NULL for scan from the start. */
+ const char *low_key;
+ /* Number of columns in lower bound, for bounding by partial prefix. */
+ Uint32 low_key_count;
+ /* True for less-than-or-equal, false for strictly less-than. */
+ bool low_inclusive;
+ /* Row containing upper bound, or NULL for scan to the end. */
+ const char * high_key;
+ /* Number of columns in upper bound, for bounding by partial prefix. */
+ Uint32 high_key_count;
+ /* True for greater-than-or-equal, false for strictly greater-than. */
+ bool high_inclusive;
+ /*
+ Value to identify this bound, may be read with get_range_no().
+ Must be < 8192 (set to zero if not using range_no).
+ Note that for ordered scans, the range_no must be strictly increasing
+ for each range, or the result set will not be sorted correctly.
+ */
+ Uint32 range_no;
+ };
+
/**
* Is current scan sorted
*/
@@ -178,6 +201,10 @@ private:
int setBound(const NdbColumnImpl*, int type, const void* aValue);
int insertBOUNDS(Uint32 * data, Uint32 sz);
+ int ndbrecord_insert_bound(const NdbRecord *key_record,
+ Uint32 column_index,
+ const char *row,
+ Uint32 bound_type);
Uint32 getKeyFromSCANTABREQ(Uint32* data, Uint32 size);
virtual int equal_impl(const NdbColumnImpl*, const char*);
@@ -185,9 +212,15 @@ private:
void fix_get_values();
int next_result_ordered(bool fetchAllowed, bool forceSend = false);
+ int next_result_ordered_ndbrecord(const char * & out_row,
+ bool fetchAllowed,
+ bool forceSend);
+ void ordered_insert_receiver(Uint32 start, NdbReceiver *receiver);
+ int ordered_send_scan_wait_for_all(bool forceSend);
int send_next_scan_ordered(Uint32 idx);
int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);
-
+ int NdbIndexScanOperation::compare_ndbrecord(const NdbReceiver *r1,
+ const NdbReceiver *r2) const;
Uint32 m_sort_columns;
Uint32 m_this_bound_start;
Uint32 * m_first_bound_word;
--- 1.56/storage/ndb/include/ndbapi/NdbTransaction.hpp 2007-02-08 12:56:51 +01:00
+++ 1.57/storage/ndb/include/ndbapi/NdbTransaction.hpp 2007-02-08 12:56:51 +01:00
@@ -20,9 +20,10 @@
#include "NdbError.hpp"
#include "NdbDictionary.hpp"
#include "Ndb.hpp"
+#include "NdbOperation.hpp"
+#include <NdbIndexScanOperation.hpp>
class NdbTransaction;
-class NdbOperation;
class NdbScanOperation;
class NdbIndexScanOperation;
class NdbIndexOperation;
@@ -571,72 +572,104 @@ public:
#endif
- NdbOperation *readTuple(const NdbDictionary::Table *table,
- const NdbRecord *key_rec, const char *key_row,
+ NdbOperation *readTuple(const NdbRecord *key_rec, const char *key_row,
const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask= 0);
- NdbOperation *readTuple(const char *tableName,
- const NdbRecord *key_rec, const char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask= 0);
- NdbOperation *insertTuple(const NdbDictionary::Table *table,
- const NdbRecord *rec, const char *row,
- const Uint32 *mask= 0);
- NdbOperation *insertTuple(const char *tableName,
- const NdbRecord *rec, const char *row,
- const Uint32 *mask= 0);
- NdbOperation *updateTuple(const NdbDictionary::Table *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask= 0);
- NdbOperation *updateTuple(const char *tableName,
- const NdbRecord *pk_rec, const char *pk_row,
+ NdbOperation::LockMode lock_mode= NdbOperation::LM_Read,
+ const unsigned char *result_mask= 0);
+ NdbOperation *insertTuple(const NdbRecord *rec, const char *row,
+ const unsigned char *mask= 0);
+ NdbOperation *updateTuple(const NdbRecord *key_rec, const char *key_row,
const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask= 0);
- NdbOperation *deleteTuple(const NdbDictionary::Table *table,
- const NdbRecord *rec, const char *row= 0);
- NdbOperation *deleteTuple(const char *tableName,
- const NdbRecord *rec, const char *row= 0);
- NdbOperation *dirtyWriteTuple(const NdbDictionary::Table *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask= 0);
- NdbOperation *dirtyWriteTuple(const char *tableName,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask= 0);
- NdbOperation *writeTuple(const NdbDictionary::Table *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask= 0);
- NdbOperation *writeTuple(const char *tableName,
- const NdbRecord *pk_rec, const char *pk_row,
+ const unsigned char *mask= 0);
+ NdbOperation *writeTuple(const NdbRecord *key_rec, const char *key_row,
const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask= 0);
- NdbOperation *simpleReadTuple(const NdbDictionary::Table *table,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask= 0);
- NdbOperation *simpleReadTuple(const char *tableName,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask= 0);
- NdbOperation *dirtyReadTuple(const NdbDictionary::Table *table,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask= 0);
- NdbOperation *dirtyReadTuple(const char *tableName,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask= 0);
- NdbOperation *dirtyUpdateTuple(const NdbDictionary::Table *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask= 0);
- NdbOperation *dirtyUpdateTuple(const char *tableName,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask= 0);
+ const unsigned char *mask= 0);
+ NdbOperation *deleteTuple(const NdbRecord *key_rec, const char *key_row);
+
+ /*
+ Scan a table, using NdbRecord to read out column data.
+
+ The result_record pointer must remain valid until after the call to
+ execute().
+
+ The result_mask pointer is optional, if present only columns for which
+ the corresponding bit in result_mask is set will be retrieved in the
+ scan. The result_mask is copied internally, so in contrast to
+ result_record need not be valid at execute().
+
+ The parallel argument is the desired parallelism, or 0 for maximum
+ parallelism (receiving rows from all fragments in parallel).
+ */
+ NdbScanOperation *
+ scanTable(const NdbRecord *result_record,
+ NdbOperation::LockMode lock_mode= NdbOperation::LM_Read,
+ const unsigned char *result_mask= 0,
+ Uint32 scan_flags= 0,
+ Uint32 parallel= 0,
+ Uint32 batch= 0);
+
+//private:
+ /*
+ Do an index range scan (optionally ordered) of a table.
+
+ The key_record describes the index to be scanned. It must be a
+ primary key record for the index, ie. it must specify exactly the
+ key columns of the index.
+
+ The result_record describes the rows to be returned from the scan. For an
+ ordered index scan, result_record must be a key record for the index to
+ be scanned, that is it must include at least all of the column in the
+ index.
+
+ Both the key_record and the result_record must be created from the Index
+ to be scanned, not from the underlying table.
+
+ The call uses a callback function as a flexible way of specifying multiple
+ range bounds. The callback will be called once for each bound to define
+ lower and upper key value etc.
+
+ The callback received a private callback_data void *, and the index of the
+ bound (0 .. num_key_bounds). However, it is guaranteed that it will be
+ called in ordered sequence, so it is permissible to ignore the passed
+ bound_index and just return the values for the next bound (for example
+ if data is kept in a linked list).
+
+ The callback can return 0 to denote success, and -1 to denote error (the
+ latter causing the creation of the NdbIndexScanOperation to fail).
+
+ This multi-range method is only for use in mysqld code.
+ */
+ NdbIndexScanOperation *
+ scanIndex(const NdbRecord *key_record,
+ int (*get_bound_callback)(void *callback_data,
+ Uint32 bound_index,
+ NdbIndexScanOperation::IndexBound & bound),
+ void *callback_data,
+ Uint32 num_key_bounds,
+ const NdbRecord *result_record,
+ NdbOperation::LockMode lock_mode= NdbOperation::LM_Read,
+ const unsigned char *result_mask= 0,
+ Uint32 scan_flags= 0,
+ Uint32 parallel= 0,
+ Uint32 batch= 0);
+
+public:
+
+ /* A convenience wrapper for simpler specification of a single bound. */
+ NdbIndexScanOperation *
+ scanIndex(const NdbRecord *key_record,
+ const char *low_key,
+ Uint32 low_key_count,
+ bool low_inclusive,
+ const char * high_key,
+ Uint32 high_key_count,
+ bool high_inclusive,
+ const NdbRecord *result_record,
+ NdbOperation::LockMode lock_mode= NdbOperation::LM_Read,
+ const unsigned char *result_mask= 0,
+ Uint32 scan_flags= 0,
+ Uint32 parallel= 0,
+ Uint32 batch= 0);
private:
/**
@@ -748,39 +781,6 @@ private:
NdbOperation* getNdbOperation(const class NdbTableImpl* aTable,
NdbOperation* aNextOp = 0,
bool useRec= false);
- NdbOperation *readTuple(const NdbTableImpl *table,
- const NdbRecord *key_rec, const char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask);
- NdbOperation *insertTuple(const NdbTableImpl *table,
- const NdbRecord *rec, const char *row,
- const Uint32 *mask);
- NdbOperation *updateTuple(const NdbTableImpl *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask);
- NdbOperation *deleteTuple(const NdbTableImpl *table,
- const NdbRecord *rec, const char *row);
- NdbOperation *dirtyWriteTuple(const NdbTableImpl *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask);
- NdbOperation *writeTuple(const NdbTableImpl *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask);
- NdbOperation *simpleReadTuple(const NdbTableImpl *table,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask);
- NdbOperation *dirtyReadTuple(const NdbTableImpl *table,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask);
- NdbOperation *dirtyUpdateTuple(const NdbTableImpl *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask);
NdbIndexScanOperation* getNdbScanOperation(const class NdbTableImpl* aTable);
NdbIndexOperation* getNdbIndexOperation(const class NdbIndexImpl* anIndex,
--- 1.89/storage/ndb/include/ndbapi/NdbDictionary.hpp 2007-02-08 12:56:51 +01:00
+++ 1.90/storage/ndb/include/ndbapi/NdbDictionary.hpp 2007-02-08 12:56:51 +01:00
@@ -1414,29 +1414,23 @@ public:
};
struct RecordSpecification {
- enum RecTypes {
- AttrOffsetNotNULL= 1,
- AttrOffsetNULL= 2
- };
-
- enum RecTypes type;
/*
- Column is given by NdbDictionary::Column pointer, if not NULL, else by
- name, if not NULL, else by id.
+ Column described by this entry (the column maximum size defines field
+ size in row).
+ Note that even when creating an NdbRecord for an index, the column
+ pointers must be to columns obtained from the underlying table, not
+ from the index itself.
*/
- // ToDo: Hm, a bit clumsy interface that one needs to init colPtr and
- // colName to NULL to use colNumber...
- const Column *colPtr;
- const char *colName;
- Uint32 colNumber;
-
+ const Column *column;
/* Offset of data from start of a row. */
- Uint32 dataOffset;
-
- /* Offset from start of row of byte containing NULL bit. */
- Uint32 bmOffset;
- /* NULL bit, 0-7. */
- Uint32 bmBit;
+ Uint32 offset;
+ /*
+ Offset from start of row of byte containing NULL bit.
+ Not used for columns that are not NULLable.
+ */
+ Uint32 nullbit_byte_offset;
+ /* NULL bit, 0-7. Not used for columns that are not NULLable. */
+ Uint32 nullbit_bit_in_byte;
};
struct AutoGrowSpecification {
@@ -1939,23 +1933,22 @@ public:
int removeTableGlobal(const Table &ndbtab, int invalidate) const;
#endif
- NdbRecord *createRecord(const char *tableName,
+ /*
+ Create an NdbRecord for use in table operations.
+ */
+ NdbRecord *createRecord(const Table *table,
const RecordSpecification *recSpec,
Uint32 length,
Uint32 elemSize);
- NdbRecord *createRecord(const Table *table,
+
+ /*
+ Create an NdbRecord for use in index operations.
+ */
+ NdbRecord *createRecord(const Index *index,
const RecordSpecification *recSpec,
Uint32 length,
Uint32 elemSize);
void releaseRecord(NdbRecord *rec);
-
- Uint32 *getRecAttrSet(const NdbRecord *rec);
- void releaseRecAttrSet(Uint32 *attrSet);
-
- void recAttrSetEnable(Uint32 *attrSet, Uint32 attrId);
- void recAttrSetEnable(Uint32 *attrSet, const char *tableName,
- const char *colName);
-
};
};
--- 1.42/storage/ndb/include/ndbapi/NdbOperation.hpp 2007-02-08 12:56:51 +01:00
+++ 1.43/storage/ndb/include/ndbapi/NdbOperation.hpp 2007-02-08 12:56:51 +01:00
@@ -740,11 +740,13 @@ public:
/**
* Get table name of this operation.
+ * Not supported for NdbRecord operation.
*/
const char* getTableName() const;
/**
* Get table object for this operation
+ * Not supported for NdbRecord operation.
*/
const NdbDictionary::Table * getTable() const;
@@ -987,9 +989,6 @@ protected:
virtual void setErrorCode(int aErrorCode);
virtual void setErrorCodeAbort(int aErrorCode);
- void handleFailedAI_ElemLen(); // When not all attribute data
- // were received
-
int incCheck(const NdbColumnImpl* anAttrObject);
int initial_interpreterCheck();
int intermediate_interpreterCheck();
@@ -1104,16 +1103,31 @@ protected:
sure if it is worth the loss of code clarity though.
*/
- /* NdbRecord describing the placement of Primary key in row. */
- const NdbRecord *thePKRec;
- /* Row containing the primary key to operate on. */
- const char *thePKRow;
- /* NdbRecord describing attributes to update. */
- const NdbRecord *theUpdRec;
+ /*
+ NdbRecord describing the placement of Primary key in row.
+ As a special case, we set this to NULL for scan lock take-over operations,
+ in which case the m_key_row points to keyinfo obtained from the KEYINFO20
+ signal.
+ */
+ const NdbRecord *m_key_record;
+ /* Row containing the primary key to operate on, or KEYINFO20 data. */
+ const char *m_key_row;
+ /* Size in words of keyinfo in m_key_row. */
+ Uint32 m_keyinfo_length;
+ /*
+ NdbRecord describing attributes to update (or read for scans).
+ We also use m_attribute_record!=NULL to indicate that the operation is
+ using the NdbRecord interface (as opposed to NdbRecAttr).
+ */
+ const NdbRecord *m_attribute_record;
/* Row containing the update values. */
- const char *theUpdRow;
- /* Optional bitmask to disable selected columns. */
- const Uint32 *theReadMask;
+ const char *m_attribute_row;
+ /*
+ Bitmask to disable selected columns.
+ Do not use clas Bitmask/BitmaskPOD here, to avoid having to
+ #include <Bitmask.hpp> in application code.
+ */
+ Uint32 m_read_mask[(NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5];
// Blobs in this operation
NdbBlob* theBlobList;
--- 1.19/storage/ndb/include/ndbapi/NdbReceiver.hpp 2007-02-08 12:56:51 +01:00
+++ 1.20/storage/ndb/include/ndbapi/NdbReceiver.hpp 2007-02-08 12:56:51 +01:00
@@ -67,7 +67,6 @@ private:
Ndb* m_ndb;
Uint32 m_id;
Uint32 m_tcPtrI;
- Uint32 m_hidden_count;
ReceiverType m_type;
void* m_owner;
NdbReceiver* m_next;
@@ -80,6 +79,12 @@ private:
void do_get_value(NdbReceiver*, Uint32 rows, Uint32 key_size, Uint32 range);
void prepareSend();
void calculate_batch_size(Uint32, Uint32, Uint32&, Uint32&, Uint32&);
+ /*
+ Set up buffers for receiving TRANSID_AI and KEYINFO20 signals
+ during a scan using NdbRecord.
+ */
+ int do_setup_ndbrecord(const NdbRecord *ndb_record, Uint32 batch_size,
+ Uint32 key_size, Uint32 read_range_no);
int execKEYINFO20(Uint32 info, const Uint32* ptr, Uint32 len);
int execTRANSID_AI(const Uint32* ptr, Uint32 len);
@@ -89,19 +94,49 @@ private:
/*
We need to keep different state for old NdbRecAttr based operation and for
new NdbRecord style operation.
- ToDo: Could save a little memory by overlapping old and new style state
- using anonymous unions.
*/
- bool usingNdbRecord;
- const NdbRecord *theNdbRecord;
- char *theRow;
- class NdbRecAttr* theFirstRecAttr;
- class NdbRecAttr* theCurrentRecAttr;
+ bool m_using_ndb_record;
+ union {
+ /* members used for NdbRecAttr operation. */
+ struct {
+ class NdbRecAttr* theFirstRecAttr;
+ class NdbRecAttr* theCurrentRecAttr;
+ Uint32 m_hidden_count;
+ };
+
+ /* members used for NdbRecord operation. */
+ struct {
+ const NdbRecord *m_ndb_record;
+ char *m_row;
+ /* Block of memory used to receive all rows in a batch during scan. */
+ char *m_row_buffer;
+ /*
+ Offsets between two rows in m_row_buffer.
+ This can be different from m_ndb_record->m_row_size, as we sometimes
+ store extra information after each row (range_no and keyinfo).
+ For non-scan operations, this is set to zero.
+ */
+ Uint32 m_row_offset;
+ /*
+ m_read_range_no is true if we are storing the range_no at the end of
+ each row during scans.
+ */
+ bool m_read_range_no;
+ };
+ };
+ /*
+ m_rows is only used in NdbRecAttr mode, but is kept during NdbRecord mode
+ operation to avoid the need for re-allocation.
+ */
class NdbRecAttr** m_rows;
- /* Index into theNdbRecord of next col to receive in TRANSID_AI. */
- Uint32 m_RecPos;
- Uint32 m_list_index; // When using multiple
+ /*
+ When an NdbReceiver is sitting in the NdbScanOperation::m_sent_receivers
+ array, waiting to receive TRANSID_AI data from the kernel, its index into
+ m_sent_receivers is stored in m_list_index, so that we can remove it when
+ done without having to search for it.
+ */
+ Uint32 m_list_index;
/*
m_current_row serves two purposes, both used during scans:
@@ -114,16 +149,40 @@ private:
2. While rows are being delivered to the application (and the receiver is
sitting in the NdbScanOperation::m_api_receivers array), it holds the
next row to be delivered to the application.
+
+ For NdbRecord operation, it works similarly, but instead indexes rows in
+ the RdbRecord m_row_buffer.
*/
Uint32 m_current_row;
+ /* m_result_rows: Total number of rows contained in this batch. */
Uint32 m_result_rows;
+ /* m_defined_rows: One less that the allocated length of the m_rows array. */
Uint32 m_defined_rows;
+ /*
+ m_expected_result_length: Total number of 32-bit words of TRANSID_AI and
+ KEYINFO20 data to receive. This is set to zero until SCAN_TABCONF has
+ been received.
+ */
Uint32 m_expected_result_length;
Uint32 m_received_result_length;
bool nextResult() const { return m_current_row < m_result_rows; }
NdbRecAttr* copyout(NdbReceiver&);
+ /* get_row() returns the next available row during NdbRecord scans. */
+ const char *get_row();
+ /*
+ peek_row() returns the row pointer that get_row() will return on next call,
+ without advancing the internal pointer.
+ So two successive calls to peek_row() will return the same pointer, whereas
+ two successive calls to get_row would return different pointers.
+ */
+ const char *peek_row() const;
+ /* get_range_no() returns the range_no from the last returned row. */
+ int NdbReceiver::get_range_no() const;
+ /* get_keyinfo20)_ returns keyinfo from KEYINFO20 signal. */
+ int get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
+ const char * & data_ptr) const;
};
#ifdef NDB_NO_DROPPED_SIGNAL
@@ -145,10 +204,19 @@ NdbReceiver::checkMagicNumber() const {
inline
void
NdbReceiver::prepareSend(){
+ /* Set pointers etc. to prepare for receiving the first row of the batch. */
m_current_row = 0;
m_received_result_length = 0;
m_expected_result_length = 0;
- theCurrentRecAttr = theFirstRecAttr;
+ if (m_using_ndb_record)
+ {
+ if (m_type==NDB_SCANRECEIVER)
+ m_row= m_row_buffer;
+ }
+ else
+ {
+ theCurrentRecAttr = theFirstRecAttr;
+ }
}
inline
@@ -171,6 +239,21 @@ NdbReceiver::execSCANOPCONF(Uint32 tcPtr
m_expected_result_length = len;
return (tmp == len ? 1 : 0);
}
+
+inline
+const char *
+NdbReceiver::get_row()
+{
+ return m_row_buffer + m_current_row++ * m_row_offset;
+}
+
+inline
+const char *
+NdbReceiver::peek_row() const
+{
+ return m_row_buffer + m_current_row * m_row_offset;
+}
+
#endif // DOXYGEN_SHOULD_SKIP_INTERNAL
#endif
--- 1.45/storage/ndb/include/ndbapi/NdbScanOperation.hpp 2007-02-08 12:56:51 +01:00
+++ 1.46/storage/ndb/include/ndbapi/NdbScanOperation.hpp 2007-02-08 12:56:51 +01:00
@@ -16,6 +16,7 @@
#ifndef NdbScanOperation_H
#define NdbScanOperation_H
+#include <strings.h>
#include <NdbOperation.hpp>
class NdbBlob;
@@ -171,6 +172,28 @@ public:
*/
int nextResult(bool fetchAllowed = true, bool forceSend = false);
+ /*
+ NdbRecord version of nextResult.
+ This sets a pointer to the next row in out_row (if returning 0). This
+ pointer is valid (only) until the next call to nextResult() with
+ fetchAllowed==true.
+ The NdbRecord object defining the row format was specified in the
+ NdbTransaction::scanTable (or scanIndex) call.
+ */
+ int nextResult(const char * & out_row,
+ bool fetchAllowed = true, bool forceSend = false);
+
+ /*
+ Fetch the next row, copying out the row from internal buffers into a
+ user-supplied row and NdbRecord describing row format.
+ This makes it possible to receive the rows in a different NdbRecord format
+ from the one used internally to buffer rows or specified in
+ NdbTransaction::scanTable/scanIndex.
+ */
+ int nextResult(NdbRecord *record,
+ char * out_row,
+ bool fetchAllowed = true, bool forceSend = false);
+
/**
* Close scan
*/
@@ -219,6 +242,38 @@ public:
*/
int deleteCurrentTuple(NdbTransaction* takeOverTransaction);
+ /*
+ NdbRecord versions of scan lock take-over operations.
+
+ Note that calling NdbRecord scan lock take-over on an NdbRecAttr-style
+ scan is not valid, nor is calling NdbRecAttr-style scan lock take-over
+ on an NdbRecord-style scan.
+ */
+
+ /*
+ Take over the lock without changing the row.
+ Optionally also read from the row (call with default value NULL for row
+ to not read any attributes.).
+ The NdbRecord * is required even when not reading any attributes.
+ */
+ NdbOperation *lockCurrentTuple(NdbTransaction *takeOverTrans,
+ const NdbRecord *record,
+ char *row= 0,
+ const unsigned char *mask= 0);
+
+ /*
+ Update the current tuple, NdbRecord version.
+ Values to update with are contained in the passed-in row.
+ */
+ NdbOperation *updateCurrentTuple(NdbTransaction *takeOverTrans,
+ const NdbRecord *record,
+ const char *row,
+ const unsigned char *mask= 0);
+
+ /* Delete the current tuple. */
+ NdbOperation *deleteCurrentTuple(NdbTransaction *takeOverTrans,
+ const NdbRecord *record);
+
/**
* Restart scan with exactly the same
* getValues and search conditions
@@ -322,7 +377,11 @@ protected:
int getKeyFromKEYINFO20(Uint32* data, Uint32 & size);
NdbOperation* takeOverScanOp(OperationType opType, NdbTransaction*);
-
+ NdbOperation* takeOverScanOpNdbRecord(OperationType opType,
+ NdbTransaction* pTrans,
+ const NdbRecord *record,
+ char *row,
+ const unsigned char *mask);
bool m_ordered;
bool m_descending;
Uint32 m_read_range_no;
@@ -379,6 +438,50 @@ NdbScanOperation::deleteCurrentTuple(Ndb
if(res == 0)
return -1;
return 0;
+}
+
+inline
+NdbOperation *
+NdbScanOperation::lockCurrentTuple(NdbTransaction *takeOverTrans,
+ const NdbRecord *record,
+ char *row,
+ const unsigned char *mask)
+{
+ unsigned char empty_mask[NDB_MAX_ATTRIBUTES_IN_TABLE>>3];
+ /* Default is to not read any attributes, just take over the lock. */
+ if (!row)
+ {
+ bzero(empty_mask, sizeof(empty_mask));
+ mask= &empty_mask[0];
+ }
+ return takeOverScanOpNdbRecord(NdbOperation::ReadRequest, takeOverTrans,
+ record, row, mask);
+}
+
+inline
+NdbOperation *
+NdbScanOperation::updateCurrentTuple(NdbTransaction *takeOverTrans,
+ const NdbRecord *record,
+ const char *row,
+ const unsigned char *mask)
+{
+ /*
+ We share the code implementing lockCurrentTuple() and updateCurrentTuple().
+ For lock the row may be updated, for update it is const.
+ Therefore we need to cast away const here, though we won't actually change
+ the row since we pass type 'UpdateRequest'.
+ */
+ return takeOverScanOpNdbRecord(NdbOperation::UpdateRequest, takeOverTrans,
+ record, (char *)row, mask);
+}
+
+inline
+NdbOperation *
+NdbScanOperation::deleteCurrentTuple(NdbTransaction *takeOverTrans,
+ const NdbRecord *record)
+{
+ return takeOverScanOpNdbRecord(NdbOperation::DeleteRequest, takeOverTrans,
+ record, 0, 0);
}
#endif
--- 1.21/storage/ndb/include/util/Bitmask.hpp 2007-02-08 12:56:51 +01:00
+++ 1.22/storage/ndb/include/util/Bitmask.hpp 2007-02-08 12:56:51 +01:00
@@ -19,7 +19,7 @@
#include <ndb_global.h>
/**
- * Bitmask implementation. Size is given explicitly
+ * Bitmask implementation. Size (in 32-bit words) is given explicitly
* (as first argument). All methods are static.
*/
class BitmaskImpl {
--- 1.72/storage/ndb/src/ndbapi/NdbTransaction.cpp 2007-02-08 12:56:51 +01:00
+++ 1.73/storage/ndb/src/ndbapi/NdbTransaction.cpp 2007-02-08 12:56:51 +01:00
@@ -26,6 +26,7 @@
#include "API.hpp"
#include "NdbBlob.hpp"
+#include <AttributeHeader.hpp>
#include <signaldata/TcKeyConf.hpp>
#include <signaldata/TcIndx.hpp>
#include <signaldata/TcCommit.hpp>
@@ -1174,227 +1175,6 @@ NdbOperation* NdbTransaction::getNdbOper
return NULL;
}//NdbTransaction::getNdbOperation()
-NdbOperation *
-NdbTransaction::readTuple(const NdbDictionary::Table *table,
- const NdbRecord *key_rec, const char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask)
-{
- return readTuple(&NdbTableImpl::getImpl(*table),
- key_rec, key_row, result_rec, result_row, result_mask);
-}
-
-NdbOperation *
-NdbTransaction::readTuple(const char *tableName,
- const NdbRecord *key_rec, const char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask)
-{
- const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
- if (table){
- return readTuple(table, key_rec, key_row, result_rec, result_row,
- result_mask);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }
-}
-
-NdbOperation *
-NdbTransaction::insertTuple(const NdbDictionary::Table *table,
- const NdbRecord *rec, const char *row,
- const Uint32 *mask)
-{
- return insertTuple(&NdbTableImpl::getImpl(*table),
- rec, row, mask);
-}
-
-NdbOperation *
-NdbTransaction::insertTuple(const char *tableName,
- const NdbRecord *rec, const char *row,
- const Uint32 *mask)
-{
- const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
- if (table){
- return insertTuple(table, rec, row, mask);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }
-}
-
-NdbOperation *
-NdbTransaction::updateTuple(const NdbDictionary::Table *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
-{
- return updateTuple(&NdbTableImpl::getImpl(*table),
- pk_rec, pk_row, attr_rec, attr_row, mask);
-}
-
-NdbOperation *
-NdbTransaction::updateTuple(const char *tableName,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
-{
- const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
- if (table){
- return updateTuple(table, pk_rec, pk_row, attr_rec, attr_row, mask);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }
-}
-
-NdbOperation *
-NdbTransaction::deleteTuple(const NdbDictionary::Table *table,
- const NdbRecord *rec, const char *row)
-{
- return deleteTuple(&NdbTableImpl::getImpl(*table), rec, row);
-}
-
-NdbOperation *
-NdbTransaction::deleteTuple(const char *tableName,
- const NdbRecord *rec, const char *row)
-{
- const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
- if (table){
- return deleteTuple(table, rec, row);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }
-}
-
-NdbOperation *
-NdbTransaction::dirtyWriteTuple(const NdbDictionary::Table *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
-{
- return dirtyWriteTuple(&NdbTableImpl::getImpl(*table),
- pk_rec, pk_row, attr_rec, attr_row, mask);
-}
-
-NdbOperation *
-NdbTransaction::dirtyWriteTuple(const char *tableName,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
-{
- const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
- if (table){
- return dirtyWriteTuple(table, pk_rec, pk_row, attr_rec, attr_row, mask);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }
-}
-
-NdbOperation *
-NdbTransaction::writeTuple(const NdbDictionary::Table *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
-{
- return writeTuple(&NdbTableImpl::getImpl(*table),
- pk_rec, pk_row, attr_rec, attr_row, mask);
-}
-
-NdbOperation *
-NdbTransaction::writeTuple(const char *tableName,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
-{
- const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
- if (table){
- return writeTuple(table, pk_rec, pk_row, attr_rec, attr_row, mask);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }
-}
-
-NdbOperation *
-NdbTransaction::simpleReadTuple(const NdbDictionary::Table *table,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask)
-{
- return simpleReadTuple(&NdbTableImpl::getImpl(*table),
- key_rec, key_row, result_rec, result_row, result_mask);
-}
-
-NdbOperation *
-NdbTransaction::simpleReadTuple(const char *tableName,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask)
-{
- const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
- if (table){
- return simpleReadTuple(table, key_rec, key_row, result_rec, result_row,
- result_mask);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }
-}
-
-NdbOperation *
-NdbTransaction::dirtyReadTuple(const NdbDictionary::Table *table,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask)
-{
- return dirtyReadTuple(&NdbTableImpl::getImpl(*table),
- key_rec, key_row, result_rec, result_row, result_mask);
-}
-
-NdbOperation *
-NdbTransaction::dirtyReadTuple(const char *tableName,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask)
-{
- const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
- if (table){
- return dirtyReadTuple(table, key_rec, key_row, result_rec, result_row,
- result_mask);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }
-}
-
-NdbOperation *
-NdbTransaction::dirtyUpdateTuple(const NdbDictionary::Table *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
-{
- return dirtyUpdateTuple(&NdbTableImpl::getImpl(*table),
- pk_rec, pk_row, attr_rec, attr_row, mask);
-}
-
-NdbOperation *
-NdbTransaction::dirtyUpdateTuple(const char *tableName,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
-{
- const NdbTableImpl* table= theNdb->theDictionary->getTable(tableName);
- if (table){
- return dirtyUpdateTuple(table, pk_rec, pk_row, attr_rec, attr_row, mask);
- } else {
- setErrorCode(theNdb->theDictionary->getNdbError().code);
- return NULL;
- }
-}
-
// NdbScanOperation
/*****************************************************************************
@@ -2396,19 +2176,19 @@ NdbTransaction::getNextCompletedOperatio
}
NdbOperation *
-NdbTransaction::readTuple(const NdbTableImpl *table,
- const NdbRecord *key_rec, const char *key_row,
+NdbTransaction::readTuple(const NdbRecord *key_rec, const char *key_row,
const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask)
+ NdbOperation::LockMode lock_mode,
+ const unsigned char *result_mask)
{
/* Check that the NdbRecord specifies the full primary key. */
- if (!(key_rec->flags & NdbRecord::RecIsPKRecord))
+ if (!(key_rec->flags & NdbRecord::RecIsKeyRecord))
{
setOperationErrorCodeAbort(4279);
return NULL;
}
- NdbOperation *op= getNdbOperation(table, NULL, true);
+ NdbOperation *op= getNdbOperation(key_rec->table, NULL, true);
if(!op)
return op;
@@ -2420,9 +2200,9 @@ NdbTransaction::readTuple(const NdbTable
theSimpleState= 0;
/* Setup the record/row for sending the primary key. */
- op->thePKRec= key_rec;
- op->thePKRow= key_row;
- op->theReadMask= result_mask;
+ op->m_key_record= key_rec;
+ op->m_key_row= key_row;
+ result_rec->copyMask(op->m_read_mask, result_mask);
/* Setup the record/row for receiving the results. */
op->theReceiver.getValues(result_rec, result_row);
@@ -2431,18 +2211,17 @@ NdbTransaction::readTuple(const NdbTable
}
NdbOperation *
-NdbTransaction::insertTuple(const NdbTableImpl *table,
- const NdbRecord *rec, const char *row,
- const Uint32 *mask)
+NdbTransaction::insertTuple(const NdbRecord *rec, const char *row,
+ const unsigned char *mask)
{
/* Check that the NdbRecord specifies the full primary key. */
- if (!(rec->flags & NdbRecord::RecHasAllPKs))
+ if (!(rec->flags & NdbRecord::RecHasAllKeys))
{
setOperationErrorCodeAbort(4279);
return NULL;
}
- NdbOperation *op= getNdbOperation(table, NULL, true);
+ NdbOperation *op= getNdbOperation(rec->table, NULL, true);
if(!op)
return op;
@@ -2454,29 +2233,28 @@ NdbTransaction::insertTuple(const NdbTab
theSimpleState= 0;
/* Setup the record/row for sending the primary key. */
- op->thePKRec= rec;
- op->thePKRow= row;
- op->theUpdRec= rec;
- op->theUpdRow= row;
- op->theReadMask= mask;
+ op->m_key_record= rec;
+ op->m_key_row= row;
+ op->m_attribute_record= rec;
+ op->m_attribute_row= row;
+ rec->copyMask(op->m_read_mask, mask);
return op;
}
NdbOperation *
-NdbTransaction::updateTuple(const NdbTableImpl *table,
- const NdbRecord *pk_rec, const char *pk_row,
+NdbTransaction::updateTuple(const NdbRecord *key_rec, const char *key_row,
const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
+ const unsigned char *mask)
{
/* Check that the NdbRecord specifies the full primary key. */
- if (!(pk_rec->flags & NdbRecord::RecIsPKRecord))
+ if (!(key_rec->flags & NdbRecord::RecIsKeyRecord))
{
setOperationErrorCodeAbort(4279);
return NULL;
}
- NdbOperation *op= getNdbOperation(table, NULL, true);
+ NdbOperation *op= getNdbOperation(key_rec->table, NULL, true);
if(!op)
return op;
@@ -2488,27 +2266,26 @@ NdbTransaction::updateTuple(const NdbTab
theSimpleState= 0;
/* Setup the record/row for sending the primary key. */
- op->thePKRec= pk_rec;
- op->thePKRow= pk_row;
- op->theUpdRec= attr_rec;
- op->theUpdRow= attr_row;
- op->theReadMask= mask;
+ op->m_key_record= key_rec;
+ op->m_key_row= key_row;
+ op->m_attribute_record= attr_rec;
+ op->m_attribute_row= attr_row;
+ attr_rec->copyMask(op->m_read_mask, mask);
return op;
}
NdbOperation *
-NdbTransaction::deleteTuple(const NdbTableImpl *table,
- const NdbRecord *rec, const char *row)
+NdbTransaction::deleteTuple(const NdbRecord *key_rec, const char *key_row)
{
/* Check that the NdbRecord specifies the full primary key. */
- if (!(rec->flags & NdbRecord::RecIsPKRecord))
+ if (!(key_rec->flags & NdbRecord::RecIsKeyRecord))
{
setOperationErrorCodeAbort(4279);
return NULL;
}
- NdbOperation *op= getNdbOperation(table, NULL, true);
+ NdbOperation *op= getNdbOperation(key_rec->table, NULL, true);
if(!op)
return op;
@@ -2520,165 +2297,325 @@ NdbTransaction::deleteTuple(const NdbTab
theSimpleState= 0;
/* Setup the record/row for sending the primary key. */
- op->thePKRec= rec;
- op->thePKRow= row;
+ op->m_key_record= key_rec;
+ op->m_key_row= key_row;
+ /* Set the m_attribute_record so we know this is an NdbRecord operation. */
+ op->m_attribute_record= key_rec;
return op;
}
NdbOperation *
-NdbTransaction::dirtyWriteTuple(const NdbTableImpl *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
+NdbTransaction::writeTuple(const NdbRecord *key_rec, const char *key_row,
+ const NdbRecord *attr_rec, const char *attr_row,
+ const unsigned char *mask)
{
/* Check that the NdbRecord specifies the full primary key. */
- if (!(pk_rec->flags & NdbRecord::RecIsPKRecord))
+ if (!(key_rec->flags & NdbRecord::RecIsKeyRecord))
{
setOperationErrorCodeAbort(4279);
return NULL;
}
- NdbOperation *op= getNdbOperation(table, NULL, true);
+ NdbOperation *op= getNdbOperation(key_rec->table, NULL, true);
if(!op)
return op;
op->theStatus= NdbOperation::UseNdbRecord;
op->theOperationType= NdbOperation::WriteRequest;
op->theErrorLine++;
- op->theLockMode= NdbOperation::LM_CommittedRead;
- op->theSimpleIndicator = 1;
- op->theDirtyIndicator = 1;
+ op->theLockMode= NdbOperation::LM_Exclusive;
theSimpleState= 0;
- /* Setup the record/row for sending the primary key. */
- op->thePKRec= pk_rec;
- op->thePKRow= pk_row;
- op->theUpdRec= attr_rec;
- op->theUpdRow= attr_row;
- op->theReadMask= mask;
+ op->m_key_record= key_rec;
+ op->m_key_row= key_row;
+ op->m_attribute_record= attr_rec;
+ op->m_attribute_row= attr_row;
+ attr_rec->copyMask(op->m_read_mask, mask);
return op;
}
-NdbOperation *
-NdbTransaction::writeTuple(const NdbTableImpl *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
+NdbScanOperation *
+NdbTransaction::scanTable(const NdbRecord *result_record,
+ NdbOperation::LockMode lock_mode,
+ const unsigned char *result_mask,
+ Uint32 scan_flags,
+ Uint32 parallel,
+ Uint32 batch)
{
- /* Check that the NdbRecord specifies the full primary key. */
- if (!(pk_rec->flags & NdbRecord::RecIsPKRecord))
+ /*
+ For some reason, normal scan operations are created as index scan
+ operations ... :-(
+ */
+ NdbIndexScanOperation *op_idx;
+ NdbScanOperation *op;
+ Uint32 fragCount;
+ int res;
+
+ op_idx= getNdbScanOperation(result_record->table);
+ if (op_idx==NULL)
{
- setOperationErrorCodeAbort(4279);
+ setOperationErrorCodeAbort(4000);
return NULL;
}
+ op= op_idx;
- NdbOperation *op= getNdbOperation(table, NULL, true);
- if(!op)
- return op;
+ res= op->readTuples(lock_mode, scan_flags, parallel, batch);
+ if (res==-1)
+ goto giveup_err;
- op->theStatus= NdbOperation::UseNdbRecord;
- op->theOperationType= NdbOperation::WriteRequest;
- op->theErrorLine++;
- op->theLockMode= NdbOperation::LM_Exclusive;
+ for (Uint32 i= 0; i<result_record->noOfColumns; i++)
+ {
+ const NdbRecord::Attr *col;
+ Uint32 ah;
+ Uint32 attrId;
- theSimpleState= 0;
+ col= &result_record->columns[i];
- op->thePKRec= pk_rec;
- op->thePKRow= pk_row;
- op->theUpdRec= attr_rec;
- op->theUpdRow= attr_row;
- op->theReadMask= mask;
+ /* Skip column if result_mask says so. But cannot mask pseudo columns. */
+ attrId= col->attrId;
+ if ( result_mask &&
+ !(attrId & AttributeHeader::PSEUDO) &&
+ !(result_mask[attrId>>3] & (1<<(attrId & 7))) )
+ {
+ continue;
+ }
- return op;
-}
+ AttributeHeader::init(&ah, attrId, 0);
+ res= op->insertATTRINFO(ah);
+ if (res==-1)
+ goto giveup_err;
-NdbOperation *
-NdbTransaction::simpleReadTuple(const NdbTableImpl *table,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask)
-{
- /**
- * Currently/still disabled
- */
- return readTuple(table, key_rec, key_row, result_rec, result_row, result_mask);
+ if (col->flags & NdbRecord::IsDisk)
+ op->m_no_disk_flag= false;
+ }
+
+ /*
+ We set theStatus=UseNdbRecord to allow the addition of an interpreted
+ program (NdbScanFilter ...), but nothing else.
+ */
+ op->theInitialReadSize= op->theTotalCurrAI_Len - 5;
+ op->theStatus= NdbOperation::UseNdbRecord;
+ op->m_attribute_record= result_record;
+
+ return op_idx;
+
+ giveup_err:
+ theNdb->releaseScanOperation(op_idx);
+ return NULL;
}
-NdbOperation *
-NdbTransaction::dirtyReadTuple(const NdbTableImpl *table,
- const NdbRecord *key_rec, char *key_row,
- const NdbRecord *result_rec, char *result_row,
- const Uint32 *result_mask)
-{
- /* Check that the NdbRecord specifies the full primary key. */
- if (!(key_rec->flags & NdbRecord::RecIsPKRecord))
+NdbIndexScanOperation *
+NdbTransaction::scanIndex(
+ const NdbRecord *key_record,
+ int (*get_key_bound_callback)(void *callback_data,
+ Uint32 bound_index,
+ NdbIndexScanOperation::IndexBound & bound),
+ void *callback_data,
+ Uint32 num_key_bounds,
+ const NdbRecord *result_record,
+ NdbOperation::LockMode lock_mode,
+ const unsigned char *result_mask,
+ Uint32 scan_flags,
+ Uint32 parallel,
+ Uint32 batch)
+{
+ NdbIndexScanOperation *op;
+ const NdbTableImpl *index_table_impl; // The index schema object
+ const NdbTableImpl *table_impl; // The table schema object
+ int res;
+ Uint32 i,j;
+ Uint32 previous_range_no;
+
+ if (!(key_record->flags & NdbRecord::RecIsKeyRecord))
{
setOperationErrorCodeAbort(4279);
return NULL;
}
+ if ((scan_flags & NdbScanOperation::SF_OrderBy) &&
+ !(result_record->flags & NdbRecord::RecHasAllKeys))
+ {
+ /* For ordering, we need all keys in the result row. */
+ setOperationErrorCodeAbort(4279);
+ return NULL;
+ }
+ if (key_record->tableId != result_record->tableId)
+ {
+ setOperationErrorCodeAbort(4283);
+ return NULL;
+ }
- NdbOperation *op= getNdbOperation(table, NULL, true);
- if(!op)
- return op;
+ index_table_impl= key_record->table;
+ table_impl= result_record->table;
+ op= getNdbScanOperation(index_table_impl);
+ if (op==NULL)
+ {
+ setOperationErrorCodeAbort(4000);
+ return NULL;
+ }
+ op->m_type= NdbOperation::OrderedIndexScan;
+ op->m_currentTable= table_impl;
+
+ op->m_attribute_record= result_record; // Mark using NdbRecord for readTuples
+ res= op->readTuples(lock_mode, scan_flags, parallel, batch);
+ if (res==-1)
+ goto giveup_err;
+
+ /* Fix theStatus as set in readTuples(). */
op->theStatus= NdbOperation::UseNdbRecord;
- op->theOperationType= NdbOperation::ReadRequest;
- op->theErrorLine++;
- op->theLockMode= NdbOperation::LM_CommittedRead;
- op->theSimpleIndicator = 1;
- op->theDirtyIndicator = 1;
- theSimpleState= 0;
+ for (i= 0; i<result_record->noOfColumns; i++)
+ {
+ const NdbRecord::Attr *col;
+ Uint32 ah;
+ Uint32 attrId;
- /* Setup the record/row for sending the primary key. */
- op->thePKRec= key_rec;
- op->thePKRow= key_row;
- op->theReadMask= result_mask;
+ col= &result_record->columns[i];
- /* Setup the record/row for receiving the results. */
- op->theReceiver.getValues(result_rec, result_row);
+ /*
+ Skip column if result_mask says so.
+ But cannot mask pseudo columns, nor key columns in ordered scans.
+ */
+ attrId= col->attrId;
+ if ( result_mask &&
+ !(attrId & AttributeHeader::PSEUDO) &&
+ !( (scan_flags & NdbScanOperation::SF_OrderBy) &&
+ (col->flags & NdbRecord::IsKey) ) &&
+ !(result_mask[attrId>>3] & (1<<(attrId & 7))) )
+ {
+ continue;
+ }
- return op;
-}
+ AttributeHeader::init(&ah, attrId, 0);
+ res= op->insertATTRINFO(ah);
+ if (res==-1)
+ goto giveup_err;
-NdbOperation *
-NdbTransaction::dirtyUpdateTuple(const NdbTableImpl *table,
- const NdbRecord *pk_rec, const char *pk_row,
- const NdbRecord *attr_rec, const char *attr_row,
- const Uint32 *mask)
-{
- /* Check that the NdbRecord specifies the full primary key. */
- if (!(pk_rec->flags & NdbRecord::RecIsPKRecord))
- {
- setOperationErrorCodeAbort(4279);
- return NULL;
+ if (col->flags & NdbRecord::IsDisk)
+ op->m_no_disk_flag= false;
}
- NdbOperation *op= getNdbOperation(table, NULL, true);
- if(!op)
- return op;
+ op->theInitialReadSize= op->theTotalCurrAI_Len - 5;
- op->theStatus= NdbOperation::UseNdbRecord;
- op->theOperationType= NdbOperation::UpdateRequest;
- op->theErrorLine++;
- op->theLockMode= NdbOperation::LM_CommittedRead;
- op->theSimpleIndicator = 1;
- op->theDirtyIndicator = 1;
+ /*
+ Set up index range bounds, write into keyinfo.
- theSimpleState= 0;
+ ToDo: We need to compare each attribute lower with upper bound, and use an
+ x=a condition instead of a<=x<=b as appropriate.
- /* Setup the record/row for sending the primary key. */
- op->thePKRec= pk_rec;
- op->thePKRow= pk_row;
- op->theUpdRec= attr_rec;
- op->theUpdRow= attr_row;
- op->theReadMask= mask;
+ ToDo: We also need to send distribution key, but _only_ if distribution
+ key is scanned equal to the same value for all ranges (see BUG#25821).
+ */
+
+ for (i= 0; i<num_key_bounds; i++)
+ {
+ NdbIndexScanOperation::IndexBound bound;
+ int res;
+ Uint32 key_count;
+
+ res= get_key_bound_callback(callback_data, i, bound);
+ if (res==-1)
+ {
+ setOperationErrorCodeAbort(4280);
+ goto giveup_err;
+ }
+ if ( (scan_flags & NdbScanOperation::SF_ReadRangeNo) &&
+ (scan_flags & NdbScanOperation::SF_OrderBy) )
+ {
+ if (i>0 && previous_range_no >= bound.range_no)
+ {
+ setOperationErrorCodeAbort(4282);
+ goto giveup_err;
+ }
+ previous_range_no= bound.range_no;
+ }
+
+ key_count= bound.low_key_count;
+ if (key_count < bound.high_key_count)
+ key_count= bound.high_key_count;
+
+ if (key_count > key_record->key_index_length)
+ {
+ /* Too many keys specified for key bound. */
+ setOperationErrorCodeAbort(4281);
+ goto giveup_err;
+ }
+
+ for (j= 0; j<key_count; j++)
+ {
+ if (bound.low_key && j<bound.low_key_count)
+ {
+ op->ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
+ bound.low_key,
+ bound.low_inclusive ?
+ NdbIndexScanOperation::BoundLE :
+ NdbIndexScanOperation::BoundLT);
+ }
+ if (bound.high_key && j<bound.high_key_count)
+ {
+ op->ndbrecord_insert_bound(key_record, key_record->key_indexes[j],
+ bound.high_key,
+ bound.high_inclusive ?
+ NdbIndexScanOperation::BoundGE :
+ NdbIndexScanOperation::BoundGT);
+ }
+ }
+ op->end_of_bound(bound.range_no);
+ /*
+ ToDo: If both lower and upper, non-strict bounds, compare for
+ equality, to transform into a single 'equal' constraint, and possibly
+ set distributionKey.
+ */
+ }
return op;
+
+ giveup_err:
+ theNdb->releaseScanOperation(op);
+ return NULL;
}
+
+static int scanIndexCallback(void *callback_data,
+ Uint32 bound_index,
+ NdbIndexScanOperation::IndexBound & bound)
+{
+ const NdbIndexScanOperation::IndexBound *s=
+ (NdbIndexScanOperation::IndexBound *)callback_data;
+ bound= *s;
+ return 0;
+}
+
+NdbIndexScanOperation *
+NdbTransaction::scanIndex(const NdbRecord *key_record,
+ const char *low_key,
+ Uint32 low_key_count,
+ bool low_inclusive,
+ const char * high_key,
+ Uint32 high_key_count,
+ bool high_inclusive,
+ const NdbRecord *result_record,
+ NdbOperation::LockMode lock_mode,
+ const unsigned char *result_mask,
+ Uint32 scan_flags,
+ Uint32 parallel,
+ Uint32 batch)
+{
+ NdbIndexScanOperation::IndexBound s;
+ s.low_key= low_key;
+ s.low_key_count= low_key_count;
+ s.low_inclusive= low_inclusive;
+ s.high_key= high_key;
+ s.high_key_count= high_key_count;
+ s.high_inclusive= high_inclusive;
+ s.range_no= 0;
+ return scanIndex(key_record, scanIndexCallback, &s, 1,
+ result_record, lock_mode, result_mask,
+ scan_flags, parallel, batch);
+}
+
#ifdef VM_TRACE
#define CASE(x) case x: ndbout << " " << #x; break
--- 1.69/storage/ndb/src/ndbapi/NdbDictionary.cpp 2007-02-08 12:56:51 +01:00
+++ 1.70/storage/ndb/src/ndbapi/NdbDictionary.cpp 2007-02-08 12:56:51 +01:00
@@ -1480,25 +1480,24 @@ NdbDictionary::Dictionary::removeTableGl
}
NdbRecord *
-NdbDictionary::Dictionary::createRecord(const char *tableName,
+NdbDictionary::Dictionary::createRecord(const Table *table,
const RecordSpecification *recSpec,
Uint32 length,
Uint32 elemSize)
{
- const NdbTableImpl *table= m_impl.getTable(tableName);
- if(table)
- return createRecord(table, recSpec, length, elemSize);
- else
- return NULL;
+ return m_impl.createRecord(&NdbTableImpl::getImpl(*table),
+ recSpec,
+ length,
+ elemSize);
}
NdbRecord *
-NdbDictionary::Dictionary::createRecord(const Table *table,
+NdbDictionary::Dictionary::createRecord(const Index *index,
const RecordSpecification *recSpec,
Uint32 length,
Uint32 elemSize)
{
- return m_impl.createRecord(&NdbTableImpl::getImpl(*table),
+ return m_impl.createRecord(&NdbIndexImpl::getImpl(*index),
recSpec,
length,
elemSize);
@@ -1508,32 +1507,6 @@ void
NdbDictionary::Dictionary::releaseRecord(NdbRecord *rec)
{
m_impl.releaseRecord_impl(rec);
-}
-
-Uint32 *
-NdbDictionary::Dictionary::getRecAttrSet(const NdbRecord *rec)
-{
- return m_impl.getRecAttrSet(rec);
-}
-
-void
-NdbDictionary::Dictionary::releaseRecAttrSet(Uint32 *attrSet)
-{
- m_impl.releaseRecAttrSet(attrSet);
-}
-
-void
-NdbDictionary::Dictionary::recAttrSetEnable(Uint32 *attrSet, Uint32 attrId)
-{
- m_impl.recAttrSetEnable(attrSet, attrId);
-}
-
-void
-NdbDictionary::Dictionary::recAttrSetEnable(Uint32 *attrSet,
- const char *tableName,
- const char *colName)
-{
- m_impl.recAttrSetEnable(attrSet, tableName, colName);
}
void NdbDictionary::Dictionary::putTable(const NdbDictionary::Table * table)
--- 1.158/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2007-02-08 12:56:51 +01:00
+++ 1.159/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2007-02-08 12:56:51 +01:00
@@ -3067,12 +3067,12 @@ NdbDictInterface::create_index_obj_from_
NdbDictionary::Object::Type type = idx->m_type = tab->m_indexType;
idx->m_logging = tab->m_logging;
idx->m_temporary = tab->m_temporary;
- // skip last attribute (NDB$PK or NDB$TNODE)
const Uint32 distKeys = prim->m_noOfDistributionKeys;
Uint32 keyCount = (distKeys ? distKeys : prim->m_noOfKeys);
unsigned i;
+ // skip last attribute (NDB$PK or NDB$TNODE)
for(i = 0; i+1<tab->m_columns.size(); i++){
NdbColumnImpl* org = tab->m_columns[i];
@@ -4536,9 +4536,10 @@ NdbDictionaryImpl::createRecord(const Nd
Uint32 elemSize)
{
NdbRecord *rec= NULL;
- Uint32 tableNumPK;
+ Uint32 numKeys, tableNumKeys;
Uint32 oldAttrId;
- Uint32 numPK;
+ bool isIndex;
+ Uint32 i;
/*
In later versions we can use elemSize to provide backwards
@@ -4550,69 +4551,105 @@ NdbDictionaryImpl::createRecord(const Nd
return NULL;
}
- rec= (NdbRecord *)
- calloc(1, sizeof(NdbRecord) + (length-1)*elemSize);
+ isIndex= table->m_indexType==NdbDictionary::Object::OrderedIndex;
+
+ /* Count the number of key columns in the table or index. */
+ if (isIndex)
+ {
+ assert(table->m_index);
+ /* Ignore the extra NDB$TNODE column at the end. */
+ tableNumKeys= table->m_columns.size() - 1;
+ }
+ else
+ {
+ tableNumKeys= 0;
+ for (i= 0; i<table->m_columns.size(); i++)
+ {
+ if (table->m_columns[i]->m_pk)
+ tableNumKeys++;
+ }
+ }
+ /*
+ We need to allocate space for
+ 1. The struct itself.
+ 2. The columns[] array at the end of struct (length #columns).
+ 3. An extra Uint32 array key_indexes (length #key columns).
+ */
+ rec= (NdbRecord *)calloc(1, sizeof(NdbRecord) +
+ (length-1)*sizeof(NdbRecord::Attr) +
+ tableNumKeys*sizeof(Uint32));
if (!rec)
{
m_error.code= 4000;
return NULL;
}
+ Uint32 *key_indexes= (Uint32 *)((unsigned char *)rec + sizeof(NdbRecord) +
+ (length-1)*sizeof(NdbRecord::Attr));
+ rec->table= table;
rec->tableId= table->m_id;
rec->tableVersion= table->m_version;
rec->flags= 0;
- rec->totalTableColumns= table->m_columns.size();
rec->noOfColumns= length;
- for (Uint32 i= 0; i<length; i++)
+ Uint32 max_offset= 0;
+ for (i= 0; i<length; i++)
{
const NdbDictionary::RecordSpecification *rs= &recSpec[i];
const NdbColumnImpl *col;
- if (rs->colPtr)
- col= &NdbColumnImpl::getImpl(*(rs->colPtr));
- else if (rs->colName)
- col= table->getColumn(rs->colName);
- else
- col= table->getColumn(rs->colNumber);
- if(!col)
+ col= &NdbColumnImpl::getImpl(*(rs->column));
+ if (!col)
{
m_error.code= 4277;
goto err;
}
+ if (col->getBlobType())
+ {
+ /* Blobs are not yet supported for NdbRecord. */
+ m_error.code= 4275;
+ goto err;
+ }
NdbRecord::Attr *recCol= &rec->columns[i];
- bool isVarCol= (col->m_arrayType==NDB_ARRAYTYPE_SHORT_VAR ||
- col->m_arrayType==NDB_ARRAYTYPE_MEDIUM_VAR);
-
recCol->attrId= col->m_attrId;
- recCol->offset= rs->dataOffset;
+ recCol->column_no= col->m_column_no;
+ recCol->index_attrId= ~0;
+ recCol->offset= rs->offset;
recCol->maxSize= col->m_attrSize*col->m_arraySize;
+ if (recCol->offset+recCol->maxSize > max_offset)
+ max_offset= recCol->offset+recCol->maxSize;
+ recCol->charset_info= col->m_cs;
+ recCol->compare_function= NdbSqlUtil::getType(col->m_type).m_cmp;
recCol->flags= 0;
- if(col->m_pk)
- recCol->flags|= NdbRecord::IsPK;
-
- switch(rs->type)
+ if (!isIndex && col->m_pk)
+ recCol->flags|= NdbRecord::IsKey;
+ /* For indexes, we set key membership below. */
+ if (col->m_storageType == NDB_STORAGETYPE_DISK)
+ recCol->flags|= NdbRecord::IsDisk;
+ if (col->m_nullable)
+ {
+ recCol->flags|= NdbRecord::IsNullable;
+ recCol->nullbit_byte_offset= rs->nullbit_byte_offset;
+ recCol->nullbit_bit_in_byte= rs->nullbit_bit_in_byte;
+ }
+ bool isVarCol;
+ if (col->m_arrayType==NDB_ARRAYTYPE_SHORT_VAR)
+ {
+ recCol->flags|= NdbRecord::IsVar1ByteLen;
+ isVarCol= true;
+ }
+ else if (col->m_arrayType==NDB_ARRAYTYPE_MEDIUM_VAR)
{
- case NdbDictionary::RecordSpecification::AttrOffsetNotNULL:
- if (!isVarCol)
- {
- recCol->type= NdbRecord::AttrNotNULL;
- }
- else
- assert(0); // ToDo
- break;
-
- case NdbDictionary::RecordSpecification::AttrOffsetNULL:
- assert(0); // ToDo
- break;
-
- default:
- /* Wrong type supplied by caller. */
- m_error.code= 4118;
- goto err;
+ recCol->flags|= NdbRecord::IsVar2ByteLen;
+ isVarCol= true;
+ }
+ else
+ {
+ isVarCol= false;
}
}
+ rec->m_row_size= max_offset;
/* Now we sort the array in attrId order. */
qsort(rec->columns,
@@ -4626,11 +4663,12 @@ NdbDictionaryImpl::createRecord(const Nd
read/update.
Also test for duplicate columns, easy now that they are sorted.
+ Also set up key_indexes array.
*/
oldAttrId= ~0;
- numPK= 0;
- for (Uint32 i= 0; i<rec->noOfColumns; i++)
+ numKeys= 0;
+ for (i= 0; i<rec->noOfColumns; i++)
{
NdbRecord::Attr *recCol= &rec->columns[i];
if (i > 0 && oldAttrId==recCol->attrId)
@@ -4640,26 +4678,43 @@ NdbDictionaryImpl::createRecord(const Nd
}
oldAttrId= recCol->attrId;
- if (recCol->flags & NdbRecord::IsPK)
- numPK++;
+ if (isIndex)
+ {
+ Uint32 colNo= recCol->column_no;
+ int key_idx;
+ if (colNo < table->m_index->m_key_ids.size() &&
+ (key_idx= table->m_index->m_key_ids[colNo]) != -1)
+ {
+ assert((Uint32)key_idx < tableNumKeys);
+ recCol->flags|= NdbRecord::IsKey;
+ key_indexes[key_idx]= i;
+ recCol->index_attrId= table->m_columns[key_idx]->m_attrId;
+ numKeys++;
+ }
+ }
+ else
+ {
+ if (recCol->flags & NdbRecord::IsKey)
+ {
+ key_indexes[numKeys]= i;
+ numKeys++;
+ }
+ }
}
-
+ rec->key_indexes= key_indexes;
+ rec->key_index_length= tableNumKeys;
/*
Since we checked for duplicates, we can check for primary key completeness
simply by counting.
*/
- tableNumPK= 0;
- for (Uint32 i= 0; i<table->m_columns.size(); i++)
- {
- if (table->m_columns[i]->m_pk)
- tableNumPK++;
- }
- if (numPK >= tableNumPK)
+ if (numKeys == tableNumKeys)
{
- rec->flags|= NdbRecord::RecHasAllPKs;
- if (numPK == tableNumPK)
- rec->flags|= NdbRecord::RecIsPKRecord;
+ rec->flags|= NdbRecord::RecHasAllKeys;
+ if (rec->noOfColumns == tableNumKeys)
+ rec->flags|= NdbRecord::RecIsKeyRecord;
}
+ if (isIndex)
+ rec->flags|= NdbRecord::RecIsIndex;
return rec;
@@ -4667,6 +4722,42 @@ NdbDictionaryImpl::createRecord(const Nd
if (rec)
free(rec);
return NULL;
+}
+
+NdbRecord *
+NdbDictionaryImpl::createRecord(const NdbIndexImpl *index_impl,
+ const NdbDictionary::RecordSpecification *recSpec,
+ Uint32 length,
+ Uint32 elemSize)
+{
+ return createRecord(index_impl->getIndexTable(), recSpec, length, elemSize);
+}
+
+void
+NdbRecord::copyMask(Uint32 *dst, const unsigned char *src) const
+{
+ Uint32 i;
+
+ BitmaskImpl::clear((NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5, dst);
+ if (src)
+ {
+ for (i= 0; i<noOfColumns; i++)
+ {
+ Uint32 attrId= columns[i].attrId;
+ if (!(attrId & AttributeHeader::PSEUDO) &&
+ src[attrId>>3] & (1 << (attrId&7)))
+ BitmaskImpl::set((NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5, dst, attrId);
+ }
+ }
+ else
+ {
+ for (i= 0; i<noOfColumns; i++)
+ {
+ Uint32 attrId= columns[i].attrId;
+ if (!(attrId & AttributeHeader::PSEUDO))
+ BitmaskImpl::set((NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5, dst, attrId);
+ }
+ }
}
void NdbDictionaryImpl::releaseRecord_impl(NdbRecord *rec)
--- 1.74/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2007-02-08 12:56:51 +01:00
+++ 1.75/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2007-02-08 12:56:51 +01:00
@@ -22,6 +22,7 @@
#include <BaseString.hpp>
#include <Vector.hpp>
#include <UtilBuffer.hpp>
+#include <NdbSqlUtil.hpp>
#include <NdbDictionary.hpp>
#include <Bitmask.hpp>
#include <AttributeList.hpp>
@@ -82,7 +83,7 @@ public:
int m_length;
int m_column_no;
CHARSET_INFO * m_cs; // not const in MySQL
-
+
bool m_pk;
bool m_distributionKey;
bool m_nullable;
@@ -180,6 +181,12 @@ public:
*/
Uint32 m_columnHashMask;
Vector<Uint32> m_columnHash;
+ /*
+ List of all columns in the table.
+ Note that for index table objects, there is one additional column at the
+ end, NDB$TNODE (ordered index) or NDB$PK. This must be taken into account
+ if iterating over columns.
+ */
Vector<NdbColumnImpl *> m_columns;
void computeAggregates();
void buildColumnHash();
@@ -586,13 +593,16 @@ public:
ie. that it describes _exactly_ the primary key attributes, no more and
no less. This is a requirement for the PK record used in read/update.
*/
- RecIsPKRecord= 0x1,
+ RecIsKeyRecord= 0x1,
/*
This flag tells whether this NdbRecord includes _at least_ all PK columns
(and possibly other columns), which is a requirement for insert.
*/
- RecHasAllPKs= 0x2
+ RecHasAllKeys= 0x2,
+
+ /* This NdbRecord is for an ordered index, not a table. */
+ RecIsIndex= 0x4
};
/* Flag bits for individual columns in the NdbRecord. */
@@ -602,22 +612,31 @@ public:
This flag tells whether the column is part of the primary key, used
for insert.
*/
- IsPK= 0x1
- };
-
- enum RecAttrTypes
- {
- AttrNotNULL
+ IsKey= 0x1,
+ /* This flag is true if column is disk based. */
+ IsDisk= 0x2,
+ /* True if column can be NULL and has a NULL bit. */
+ IsNullable= 0x04,
+ /*
+ Flags for determining the actual length of data (which for varsize
+ columns is different from the maximum size.
+ The flags are mutually exclusive.
+ */
+ IsVar1ByteLen= 0x08,
+ IsVar2ByteLen= 0x10
};
struct Attr
{
+ Uint32 attrId;
+ Uint32 column_no;
/*
- Type of this record attribute, determines which other members in the
- struct are valid.
+ The index_attrId member is the attribute id in the index table object,
+ which is used to specify ordered index bounds in KEYINFO signal.
+ Note that this is different from the normal attribute id in the main
+ table, unless the ordered index is on columns (0..N).
*/
- enum RecAttrTypes type;
- Uint32 attrId;
+ Uint32 index_attrId;
/* Offset of data from the start of a row. */
Uint32 offset;
/*
@@ -628,18 +647,71 @@ public:
/* Flags, or-ed from enum ColFlags. */
Uint32 flags;
+
+ /* Character set information, for ordered index merge sort. */
+ CHARSET_INFO *charset_info;
+ /* Function used to compare attributes during merge sort. */
+ NdbSqlUtil::Cmp *compare_function;
+
+
+ /* NULL bit location (only for nullable columns, ie. flags&IsNullable). */
+ Uint32 nullbit_byte_offset;
+ Uint32 nullbit_bit_in_byte;
+
+ bool get_var_length(const char *row, Uint32& len) const
+ {
+ if (flags & IsVar1ByteLen)
+ len= 1 + *((Uint8*)(row+offset));
+ else if (flags & IsVar2ByteLen)
+ len= 2 + uint2korr(row+offset);
+ else
+ len= maxSize;
+ return len <= maxSize;
+ }
+ bool is_null(const char *row) const
+ {
+ return (flags & IsNullable) &&
+ (row[nullbit_byte_offset] & (1 << nullbit_bit_in_byte));
+ }
};
+ /*
+ ToDo: For now we need to hang on to the Table *, since lots of the
+ existing code (class NdbOperation*, class NdbScanFilter) depends
+ on having access to it.
+ Long-term, we want to eliminate it (instead relying only on copying
+ tableId, fragmentCount etc. into the NdbRecord.
+ */
+ const NdbTableImpl *table;
+
Uint32 tableId;
Uint32 tableVersion;
/* Flags, or-ed from enum RecFlags. */
Uint32 flags;
- /* Total number of attributes in table. */
- Uint32 totalTableColumns;
+ /* Size of row (really end of right-most defined attribute in row). */
+ Uint32 m_row_size;
+
+ /*
+ Array of index (into columns[]) of primary key columns, in order.
+ Physical storage for these is after columns[] array.
+ This array is only fully initialised if flags&RecHasAllKeys.
+ */
+ const Uint32 *key_indexes;
+ /* Length of key_indexes array. */
+ Uint32 key_index_length;
/* The real size of the array at the end of this struct. */
Uint32 noOfColumns;
struct Attr columns[1];
+
+ /* Copy a user-supplied mask to internal mask. */
+ void copyMask(Uint32 *dst, const unsigned char *src) const;
+
+ /* Clear internal mask. */
+ void clearMask(Uint32 *dst) const
+ {
+ BitmaskImpl::clear((NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5, dst);
+ }
};
@@ -743,33 +815,11 @@ public:
const NdbDictionary::RecordSpecification *recSpec,
Uint32 length,
Uint32 elemSize);
+ NdbRecord *createRecord(const NdbIndexImpl *index,
+ const NdbDictionary::RecordSpecification *recSpec,
+ Uint32 length,
+ Uint32 elemSize);
void releaseRecord_impl(NdbRecord *rec);
-
- Uint32 *getRecAttrSet(const NdbRecord *rec)
- {
- Uint32 *attrSet;
-
- attrSet= (Uint32 *)calloc((rec->totalTableColumns+31)>>5, sizeof(*attrSet));
- return attrSet;
- }
-
- void releaseRecAttrSet(Uint32 *attrSet)
- {
- free(attrSet);
- }
-
- void recAttrSetEnable(Uint32 *attrSet, Uint32 attrId)
- {
- attrSet[attrId>>5]|= 1<<(attrId&31);
- }
-
- void recAttrSetEnable(Uint32 *attrSet, const char *tableName, const char *colName)
- {
- // ToDo: check table/column not found ...
- const NdbTableImpl *table= getTable(tableName);
- const NdbDictionary::Column *col= table->getColumn(colName);
- recAttrSetEnable(attrSet, col->getAttrId());
- }
private:
NdbTableImpl * fetchGlobalTableImplRef(const GlobalCacheInitObject &obj);
--- 1.21/storage/ndb/src/ndbapi/NdbOperation.cpp 2007-02-08 12:56:51 +01:00
+++ 1.22/storage/ndb/src/ndbapi/NdbOperation.cpp 2007-02-08 12:56:51 +01:00
@@ -160,6 +160,7 @@ NdbOperation::init(const NdbTableImpl* t
theScanInfo = 0;
theTotalNrOfKeyWordInSignal = 8;
theMagicNumber = 0xABCDEF01;
+ m_attribute_record= NULL;
theBlobList = NULL;
m_abortOption = -1;
m_no_disk_flag = 1;
--- 1.25/storage/ndb/src/ndbapi/NdbOperationDefine.cpp 2007-02-08 12:56:51 +01:00
+++ 1.26/storage/ndb/src/ndbapi/NdbOperationDefine.cpp 2007-02-08 12:56:51 +01:00
@@ -356,7 +356,7 @@ NdbOperation::getValue_impl(const NdbCol
setErrorCodeAbort(4230);
return NULL;
}//if
- // MASV - How would execution come here?
+ /* Final read, after running interpreted instructions. */
theStatus = FinalGetValue;
} else {
setErrorCodeAbort(4230);
--- 1.26/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2007-02-08 12:56:51 +01:00
+++ 1.27/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2007-02-08 12:56:51 +01:00
@@ -359,6 +359,21 @@ NdbOperation::prepareSendInterpreted()
} else {
return -1;
}//if
+ } else if (theStatus == UseNdbRecord &&
+ (theOperationType == OpenScanRequest ||
+ theOperationType == OpenRangeScanRequest)) {
+ /*
+ With NdbRecord scans, we set up the initial read section when the
+ operation was created, and we only allow the addition of an interpreted
+ program.
+ */
+ if (tTotalCurrAI_Len > tInitReadSize + 5)
+ {
+ if (insertATTRINFO(Interpreter::EXIT_OK) != -1)
+ theInterpretedSize = (tTotalCurrAI_Len + 1) - (tInitReadSize + 5);
+ else
+ return -1;
+ }
} else if (theStatus == FinalGetValue) {
theFinalReadSize = tTotalCurrAI_Len -
@@ -515,48 +530,54 @@ NdbOperation::prepareSendNdbRecord(Uint3
Uint32 *keyInfoPtr, *attrInfoPtr;
Uint32 remain;
int res;
+ Uint32 no_disk_flag;
assert(theStatus==UseNdbRecord);
/* Not yet support for NdbRecord with interpreted operations. */
assert(!theInterpretIndicator);
- const NdbRecord *key_rec= thePKRec;
- const char *key_row= thePKRow;
+ const NdbRecord *key_rec= m_key_record;
+ const char *key_row= m_key_row;
const NdbRecord *result_rec, *upd_rec;
const char *updRow;
- const Uint32 *result_mask= theReadMask;
TcKeyReq *tcKeyReq= CAST_PTR(TcKeyReq, theTCREQ->getDataPtrSend());
- Uint32 hdrSize= fillTcKeyReqHdr(tcKeyReq, aTC_ConnectPtr, aTransId, key_rec);
+ Uint32 hdrSize= fillTcKeyReqHdr(tcKeyReq, aTC_ConnectPtr, aTransId,
+ m_attribute_record);
keyInfoPtr= theTCREQ->getDataPtrSend() + hdrSize;
-
- // Fill in keyinfo (in TCKEYREQ signal, spilling into KEYINFO signals)
remain= TcKeyReq::MaxKeyInfo;
- theTotalNrOfKeyWordInSignal= 0;
- for (Uint32 i= 0; i<key_rec->noOfColumns; i++)
- {
- const NdbRecord::Attr *col;
- col= &key_rec->columns[i];
- /*
- This is a special case for insert, which allows extra columns in the key
- NdbRecord, since it uses only a single record.
- */
- if(!(col->flags&NdbRecord::IsPK))
- continue;
-
- switch (col->type)
+ /* Fill in keyinfo (in TCKEYREQ signal, spilling into KEYINFO signals). */
+ if (!key_rec)
+ {
+ /* This means that key_row contains the KEYINFO20 data. */
+ res= insertKEYINFO_NdbRecord(aTC_ConnectPtr, aTransId, key_row,
+ m_keyinfo_length*4, &keyInfoPtr, &remain);
+ if (res)
+ return res;
+ }
+ else
+ {
+ theTotalNrOfKeyWordInSignal= 0;
+ for (Uint32 i= 0; i<key_rec->key_index_length; i++)
{
- case NdbRecord::AttrNotNULL:
- res= insertKEYINFO_NdbRecord(aTC_ConnectPtr, aTransId,
- &key_row[col->offset],
- col->maxSize, &keyInfoPtr, &remain);
- if(res)
- return res;
- break;
+ const NdbRecord::Attr *col;
+
+ col= &key_rec->columns[key_rec->key_indexes[i]];
- default:
- assert(false);
+ assert(!(col->flags & NdbRecord::IsNullable));
+ Uint32 length;
+ if (!col->get_var_length(key_row, length))
+ {
+ /* Hm, corrupt varchar length. */
+ setErrorCodeAbort(4209);
+ return -1;
+ }
+ res= insertKEYINFO_NdbRecord(aTC_ConnectPtr, aTransId,
+ &key_row[col->offset],
+ length, &keyInfoPtr, &remain);
+ if (res)
+ return res;
}
}
@@ -571,12 +592,14 @@ NdbOperation::prepareSendNdbRecord(Uint3
attrInfoPtr= theTCREQ->getDataPtrSend() + hdrSize +
(theTupKeyLen > TcKeyReq::MaxKeyInfo ? TcKeyReq::MaxKeyInfo : theTupKeyLen);
+ no_disk_flag= m_no_disk_flag;
+
OperationType tOpType= theOperationType;
if ((tOpType == InsertRequest) || (tOpType == WriteRequest) ||
(tOpType == UpdateRequest))
{
- upd_rec= theUpdRec;
- updRow= theUpdRow;
+ upd_rec= m_attribute_record;
+ updRow= m_attribute_row;
for (Uint32 i= 0; i<upd_rec->noOfColumns; i++)
{
const NdbRecord::Attr *col;
@@ -584,35 +607,41 @@ NdbOperation::prepareSendNdbRecord(Uint3
col= &upd_rec->columns[i];
Uint32 attrId= col->attrId;
- if (result_mask)
+ if (!(attrId & AttributeHeader::PSEUDO) &&
+ !BitmaskImpl::get((NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5,
+ m_read_mask, attrId))
+ continue;
+
+ if (col->flags & NdbRecord::IsDisk)
+ no_disk_flag= 0;
+
+ Uint32 length;
+ if (col->is_null(updRow))
+ length= 0;
+ else if (!col->get_var_length(key_row, length))
{
- if (!(result_mask[attrId>>5] & (1<<(attrId&31))))
- continue;
+ /* Hm, corrupt varchar length. */
+ setErrorCodeAbort(4209);
+ return -1;
}
-
- switch (col->type)
+ res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
+ attrId, length,
+ &attrInfoPtr, &remain);
+ if(res)
+ return res;
+ if (length > 0)
{
- case NdbRecord::AttrNotNULL:
- res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
- attrId, col->maxSize,
- &attrInfoPtr, &remain);
- if(res)
- return res;
- res= insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
- &updRow[col->offset], col->maxSize,
- &attrInfoPtr, &remain);
- if(res)
- return res;
- break;
-
- default:
- assert(false);
+ res= insertATTRINFOData_NdbRecord(aTC_ConnectPtr, aTransId,
+ &updRow[col->offset], col->maxSize,
+ &attrInfoPtr, &remain);
+ if(res)
+ return res;
}
}
}
else if (tOpType == ReadRequest)
{
- result_rec= theReceiver.theNdbRecord;
+ result_rec= theReceiver.m_ndb_record;
for (Uint32 i= 0; i<result_rec->noOfColumns; i++)
{
const NdbRecord::Attr *col;
@@ -620,11 +649,13 @@ NdbOperation::prepareSendNdbRecord(Uint3
col= &result_rec->columns[i];
Uint32 attrId= col->attrId;
- if (result_mask)
- {
- if (!(result_mask[attrId>>5] & (1<<(attrId&31))))
- continue;
- }
+ if (!(attrId & AttributeHeader::PSEUDO) &&
+ !BitmaskImpl::get((NDB_MAX_ATTRIBUTES_IN_TABLE+31)>>5,
+ m_read_mask, attrId))
+ continue;
+
+ if (col->flags & NdbRecord::IsDisk)
+ no_disk_flag= 0;
res= insertATTRINFOHdr_NdbRecord(aTC_ConnectPtr, aTransId,
attrId, 0,
@@ -646,6 +677,7 @@ NdbOperation::prepareSendNdbRecord(Uint3
setErrorCodeAbort(4257);
return -1;
}
+ TcKeyReq::setNoDiskFlag(tcKeyReq->requestInfo, no_disk_flag);
TcKeyReq::setAttrinfoLen(tcKeyReq->attrLen, theTotalCurrAI_Len);
TcKeyReq::setAIInTcKeyReq(tcKeyReq->requestInfo,
theTotalCurrAI_Len < TcKeyReq::MaxAttrInfo ?
@@ -687,7 +719,7 @@ NdbOperation::fillTcKeyReqHdr(TcKeyReq *
TcKeyReq::setCommitFlag(reqInfo, theCommitIndicator);
TcKeyReq::setStartFlag(reqInfo, theStartIndicator);
TcKeyReq::setInterpretedFlag(reqInfo, theInterpretIndicator);
- TcKeyReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
+ /* We will setNoDiskFlag() later when we have checked all columns. */
TcKeyReq::setDirtyFlag(reqInfo, theDirtyIndicator);
TcKeyReq::setOperationType(reqInfo, theOperationType);
Uint8 abortOption=
@@ -972,18 +1004,3 @@ NdbOperation::receiveTCKEYREF( NdbApiSig
return -1;
}
-
-
-void
-NdbOperation::handleFailedAI_ElemLen()
-{
- NdbRecAttr* tRecAttr = theReceiver.theFirstRecAttr;
- while (tRecAttr != NULL) {
- tRecAttr->setNULL();
- tRecAttr = tRecAttr->next();
- }//while
-}//NdbOperation::handleFailedAI_ElemLen()
-
-
-
-
--- 1.20/storage/ndb/src/ndbapi/NdbOperationInt.cpp 2007-02-08 12:56:51 +01:00
+++ 1.21/storage/ndb/src/ndbapi/NdbOperationInt.cpp 2007-02-08 12:56:51 +01:00
@@ -69,7 +69,7 @@ NdbOperation::incCheck(const NdbColumnIm
(tNdbColumnImpl->m_pk != false) ||
(tNdbColumnImpl->m_nullable))
goto inc_check_error2;
- if (theStatus == ExecInterpretedValue) {
+ if (theStatus == ExecInterpretedValue || theStatus == UseNdbRecord) {
; // Simply continue with interpretation
} else if (theStatus == GetValue) {
theInitialReadSize = theTotalCurrAI_Len - 5;
@@ -126,7 +126,7 @@ NdbOperation::write_attrCheck(const NdbC
if ((tNdbColumnImpl->getInterpretableType() == false) ||
(tNdbColumnImpl->m_pk))
goto write_attr_check_error2;
- if (theStatus == ExecInterpretedValue) {
+ if (theStatus == ExecInterpretedValue || theStatus == UseNdbRecord) {
; // Simply continue with interpretation
} else if (theStatus == SubroutineExec) {
; // Simply continue with interpretation
@@ -173,7 +173,7 @@ NdbOperation::read_attrCheck(const NdbCo
goto read_attr_check_error1;
if (tNdbColumnImpl->getInterpretableType() == false)
goto read_attr_check_error2;
- if (theStatus == ExecInterpretedValue) {
+ if (theStatus == ExecInterpretedValue || theStatus == UseNdbRecord) {
; // Simply continue with interpretation
} else if (theStatus == GetValue) {
theInitialReadSize = theTotalCurrAI_Len - 5;
@@ -210,7 +210,7 @@ int
NdbOperation::initial_interpreterCheck()
{
if ((theInterpretIndicator == 1)) {
- if (theStatus == ExecInterpretedValue) {
+ if (theStatus == ExecInterpretedValue || theStatus == UseNdbRecord) {
return 0; // Simply continue with interpretation
} else if (theStatus == GetValue) {
theInitialReadSize = theTotalCurrAI_Len - 5;
@@ -234,7 +234,7 @@ int
NdbOperation::labelCheck()
{
if ((theInterpretIndicator == 1)) {
- if (theStatus == ExecInterpretedValue) {
+ if (theStatus == ExecInterpretedValue || theStatus == UseNdbRecord) {
return 0; // Simply continue with interpretation
} else if (theStatus == GetValue) {
theInitialReadSize = theTotalCurrAI_Len - 5;
@@ -260,7 +260,7 @@ int
NdbOperation::intermediate_interpreterCheck()
{
if ((theInterpretIndicator == 1)) {
- if (theStatus == ExecInterpretedValue) {
+ if (theStatus == ExecInterpretedValue || theStatus == UseNdbRecord) {
return 0; // Simply continue with interpretation
} else if (theStatus == SubroutineExec) {
return 0; // Simply continue with interpretation
--- 1.22/storage/ndb/src/ndbapi/NdbReceiver.cpp 2007-02-08 12:56:51 +01:00
+++ 1.23/storage/ndb/src/ndbapi/NdbReceiver.cpp 2007-02-08 12:56:51 +01:00
@@ -29,7 +29,7 @@ NdbReceiver::NdbReceiver(Ndb *aNdb) :
m_id(NdbObjectIdMap::InvalidId),
m_type(NDB_UNINITIALIZED),
m_owner(0),
- usingNdbRecord(false)
+ m_using_ndb_record(false)
{
theCurrentRecAttr = theFirstRecAttr = 0;
m_defined_rows = 0;
@@ -51,20 +51,31 @@ NdbReceiver::init(ReceiverType type, boo
{
theMagicNumber = 0x11223344;
m_type = type;
- usingNdbRecord= useRec;
+ m_using_ndb_record= useRec;
m_owner = owner;
if (m_id == NdbObjectIdMap::InvalidId) {
if (m_ndb)
m_id = m_ndb->theImpl->theNdbObjectIdMap.map(this);
}
- theFirstRecAttr = NULL;
- theCurrentRecAttr = NULL;
+ if (useRec)
+ {
+ m_ndb_record= NULL;
+ m_row= NULL;
+ m_row_buffer= NULL;
+ m_row_offset= 0;
+ m_read_range_no= false;
+ }
+ else
+ {
+ theFirstRecAttr = NULL;
+ theCurrentRecAttr = NULL;
+ }
}
void
NdbReceiver::release(){
- if (!usingNdbRecord)
+ if (!m_using_ndb_record)
{
NdbRecAttr* tRecAttr = theFirstRecAttr;
while (tRecAttr != NULL)
@@ -76,11 +87,16 @@ NdbReceiver::release(){
theFirstRecAttr = NULL;
theCurrentRecAttr = NULL;
}
+ else
+ {
+ delete[] m_row_buffer;
+ m_row_buffer= NULL;
+ }
}
NdbRecAttr *
NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
- assert(!usingNdbRecord);
+ assert(!m_using_ndb_record);
NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
@@ -101,11 +117,10 @@ NdbReceiver::getValue(const NdbColumnImp
void
NdbReceiver::getValues(const NdbRecord* rec, char *row_ptr)
{
- assert(usingNdbRecord);
+ assert(m_using_ndb_record);
- theNdbRecord= rec;
- m_RecPos= 0;
- theRow= row_ptr;
+ m_ndb_record= rec;
+ m_row= row_ptr;
}
#define KEY_ATTR_ID (~(Uint32)0)
@@ -129,6 +144,7 @@ NdbReceiver::calculate_batch_size(Uint32
Uint32 max_batch_byte_size= tp->get_batch_byte_size();
Uint32 max_batch_size= tp->get_batch_size();
Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
+ /* ToDo: Use the NdbRecord to calculate size here instead for NdbRecord operation. */
NdbRecAttr *rec_attr= theFirstRecAttr;
while (rec_attr != NULL) {
Uint32 attr_size= rec_attr->getColumn()->getSizeInBytes();
@@ -179,6 +195,7 @@ NdbReceiver::do_get_value(NdbReceiver *
Uint32 batch_size,
Uint32 key_size,
Uint32 range_no){
+ assert(!m_using_ndb_record);
if(batch_size > m_defined_rows){
delete[] m_rows;
m_defined_rows = batch_size;
@@ -239,8 +256,40 @@ NdbReceiver::do_get_value(NdbReceiver *
return;
}
+int
+NdbReceiver::do_setup_ndbrecord(const NdbRecord *ndb_record, Uint32 batch_size,
+ Uint32 key_size, Uint32 read_range_no)
+{
+ Uint32 rowsize= ndb_record->m_row_size;
+ /* Room for range_no. */
+ if (read_range_no)
+ rowsize+= 4;
+ /*
+ If keyinfo, need room for max. key + 4 bytes of actual key length + 4
+ bytes of scan info (all from KEYINFO20 signal).
+ */
+ if (key_size)
+ rowsize+= 8 + key_size*4;
+ /* Ensure 4-byte alignment. */
+ rowsize= (rowsize+3) & 0xfffffffc;
+
+ char *row_buffer= new char[batch_size*rowsize];
+ if (!row_buffer)
+ return -1;
+
+ m_using_ndb_record= true;
+ m_ndb_record= ndb_record;
+ m_row= row_buffer;
+ m_row_buffer= row_buffer;
+ m_row_offset= rowsize;
+ m_read_range_no= read_range_no;
+
+ return 0;
+}
+
NdbRecAttr*
NdbReceiver::copyout(NdbReceiver & dstRec){
+ assert(!m_using_ndb_record);
NdbRecAttr *src = m_rows[m_current_row++];
NdbRecAttr *dst = dstRec.theFirstRecAttr;
NdbRecAttr *start = src;
@@ -258,75 +307,107 @@ NdbReceiver::copyout(NdbReceiver & dstRe
return start;
}
+int
+NdbReceiver::get_range_no() const
+{
+ int range_no;
+ assert(m_using_ndb_record);
+ Uint32 idx= m_current_row;
+ if (idx == 0 || !m_read_range_no)
+ return -1;
+ memcpy(&range_no,
+ m_row_buffer + (idx-1)*m_row_offset + m_ndb_record->m_row_size,
+ 4);
+ return range_no;
+}
+
+int
+NdbReceiver::get_keyinfo20(Uint32 & scaninfo, Uint32 & length,
+ const char * & data_ptr) const
+{
+ assert(m_using_ndb_record);
+ Uint32 idx= m_current_row;
+ if (idx == 0)
+ return -1; // No rows fetched yet
+ const char *p= m_row_buffer + (idx-1)*m_row_offset + m_ndb_record->m_row_size;
+ if (m_read_range_no)
+ p+= 4;
+ scaninfo= uint4korr(p);
+ p+= 4;
+ length= uint4korr(p);
+ p+= 4;
+ data_ptr= p;
+ return 0;
+}
+
+/* Set NdbRecord field to non-NULL value. */
static void assignToRec(const NdbRecord::Attr *col,
char *row,
const Uint32 *src,
Uint32 byteSize)
{
- switch (col->type)
- {
- case NdbRecord::AttrNotNULL:
- memcpy(&row[col->offset], src, byteSize);
- break;
+ /* Set NULLable attribute to "not NULL". */
+ if (col->flags & NdbRecord::IsNullable)
+ row[col->nullbit_byte_offset]&= ~(1 << col->nullbit_bit_in_byte);
- default:
- assert(false);
- }
+ memcpy(&row[col->offset], src, byteSize);
}
+/* Set NdbRecord field to NULL. */
static void setRecToNULL(const NdbRecord::Attr *col,
char *row)
{
- switch (col->type)
- {
- case NdbRecord::AttrNotNULL:
- assert(false);
- break;
-
- default:
- assert(false);
- }
+ assert(col->flags & NdbRecord::IsNullable);
+ row[col->nullbit_byte_offset]|= 1 << col->nullbit_bit_in_byte;
}
int
NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
{
- if (usingNdbRecord)
+ if (m_using_ndb_record)
{
Uint32 exp= m_expected_result_length;
Uint32 tmp= m_received_result_length + aLength;
- const NdbRecord *rec= theNdbRecord;
+ const NdbRecord *rec= m_ndb_record;
+ Uint32 rec_pos= 0;
while (aLength > 0)
{
AttributeHeader ah(* aDataPtr++);
const Uint32 attrId= ah.getAttributeId();
+ Uint32 attrSize= ah.getByteSize();
aLength--;
+ /* Special case for RANGE_NO, which is stored just after the row. */
+ if (attrId==AttributeHeader::RANGE_NO)
+ {
+ assert(m_read_range_no);
+ assert(attrSize==4);
+ memcpy(m_row+m_ndb_record->m_row_size, aDataPtr++, 4);
+ aLength--;
+ continue;
+ }
+
/*
- Set all not returned columns to NULL.
+ Skip all not returned columns.
The rows should be returned in the same order as we requested them
(which is in any case in attribute ID order).
*/
- while (m_RecPos < rec->noOfColumns &&
- rec->columns[m_RecPos].attrId < attrId)
- {
- setRecToNULL(&rec->columns[m_RecPos], theRow);
- m_RecPos++;
- }
+ while (rec_pos < rec->noOfColumns &&
+ rec->columns[rec_pos].attrId < attrId)
+ rec_pos++;
/* We should never get back an attribute not originally requested. */
- assert(m_RecPos < rec->noOfColumns &&
- rec->columns[m_RecPos].attrId == attrId);
+ assert(rec_pos < rec->noOfColumns &&
+ rec->columns[rec_pos].attrId == attrId);
- Uint32 attrSize= ah.getByteSize();
if (attrSize == 0)
{
- setRecToNULL(&rec->columns[m_RecPos], theRow);
+ setRecToNULL(&rec->columns[rec_pos], m_row);
}
else
{
- assert(attrSize <= rec->columns[m_RecPos].maxSize);
+ assert(attrSize <= rec->columns[rec_pos].maxSize);
Uint32 sizeInWords= (attrSize+3)>>2;
/* Not sure how to deal with this, shouldn't happen. */
if (unlikely(sizeInWords > aLength))
@@ -335,14 +416,15 @@ NdbReceiver::execTRANSID_AI(const Uint32
attrSize= 4*aLength;
}
- assignToRec(&rec->columns[m_RecPos], theRow, aDataPtr, attrSize);
+ assignToRec(&rec->columns[rec_pos], m_row, aDataPtr, attrSize);
aDataPtr+= sizeInWords;
aLength-= sizeInWords;
}
- m_RecPos++;
+ rec_pos++;
}
m_received_result_length = tmp;
+ m_row+= m_row_offset;
return (tmp == exp || (exp > TcKeyConf::SimpleReadBit) ? 1 : 0);
}
@@ -374,9 +456,10 @@ NdbReceiver::execTRANSID_AI(const Uint32
back attributes in the wrong order).
So dump some info for debugging, and abort.
*/
- ndbout_c("%p: tAttrId: %d currRecAttr: %p tAttrSize: %d %d", this,
- tAttrId, currRecAttr,
- tAttrSize, currRecAttr->get_size_in_bytes());
+ ndbout_c("this=%p: tAttrId: %d currRecAttr: %p theCurrentRecAttr: %p "
+ "tAttrSize: %d %d", this,
+ tAttrId, currRecAttr, theCurrentRecAttr, tAttrSize,
+ currRecAttr ? currRecAttr->get_size_in_bytes() : 0);
currRecAttr = theCurrentRecAttr;
while(currRecAttr != 0){
ndbout_c("%d ", currRecAttr->attrId());
@@ -402,6 +485,29 @@ NdbReceiver::execTRANSID_AI(const Uint32
int
NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
{
+ if (m_using_ndb_record)
+ {
+ /* Copy in the keyinfo after the user row and any range_no value. */
+
+ char *keyinfo_ptr= m_row_buffer +
+ m_current_row++ * m_row_offset +
+ m_ndb_record->m_row_size;
+ if (m_read_range_no)
+ keyinfo_ptr+= 4;
+
+ int4store(keyinfo_ptr, info);
+ keyinfo_ptr+= 4;
+ int4store(keyinfo_ptr, aLength);
+ keyinfo_ptr+= 4;
+ memcpy(keyinfo_ptr, aDataPtr, 4*aLength);
+
+ Uint32 tmp= m_received_result_length + aLength;
+ m_received_result_length = tmp;
+
+ return (tmp == m_expected_result_length ? 1 : 0);
+ }
+
+ /* The old method, using NdbRecAttr. */
NdbRecAttr* currRecAttr = m_rows[m_current_row++];
assert(currRecAttr->attrId() == KEY_ATTR_ID);
/*
--- 1.106/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2007-02-08 12:56:51 +01:00
+++ 1.107/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2007-02-08 12:56:51 +01:00
@@ -308,7 +308,6 @@ NdbScanOperation::receiver_delivered(Ndb
last = m_conf_receivers_count;
m_conf_receivers[last] = tRec;
m_conf_receivers_count = last + 1;
- tRec->m_list_index = last;
tRec->m_current_row = 0;
}
}
@@ -356,6 +355,7 @@ NdbScanOperation::getFirstATTRINFOScan()
return -1;
}
tSignal->setSignal(m_attrInfoGSN);
+ /* The offset 8 is for 3 words of header + 5 words of section sizes. */
theAI_LenInCurrAI = 8;
theATTRINFOptr = &tSignal->getDataPtrSend()[8];
theFirstATTRINFO = tSignal;
@@ -440,6 +440,151 @@ int NdbScanOperation::nextResult(bool fe
return res;
}
+/* nextResult() for NdbRecord operation. */
+int
+NdbScanOperation::nextResult(const char * & out_row,
+ bool fetchAllowed, bool forceSend)
+{
+ /* ToDo: Also handle blobs, like in NdbRecAttr nextResult(). */
+
+ if (m_ordered)
+ return ((NdbIndexScanOperation*)this)->next_result_ordered_ndbrecord
+ (out_row, fetchAllowed, forceSend);
+
+ /* Return a row immediately if any is available. */
+ while (m_current_api_receiver < m_api_receivers_count)
+ {
+ NdbReceiver *tRec= m_api_receivers[m_current_api_receiver];
+ if (tRec->nextResult())
+ {
+ out_row= tRec->get_row();
+ return 0;
+ }
+ m_current_api_receiver++;
+ }
+
+ if (!fetchAllowed)
+ {
+ /*
+ Application wants to be informed that no more rows are available
+ immediately.
+ */
+ return 2;
+ }
+
+ /* Now we have to wait for more rows (or end-of-file on all receivers). */
+ Uint32 nodeId = theNdbCon->theDBnode;
+ TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+ int retVal= 2;
+ Uint32 idx, last;
+ /*
+ The rest needs to be done under mutex due to synchronization with receiver
+ thread.
+ */
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
+
+ const Uint32 seq= theNdbCon->theNodeSequence;
+
+ if(theError.code)
+ {
+ goto err4;
+ }
+
+ if(seq == tp->getNodeSequence(nodeId) &&
+ send_next_scan(m_current_api_receiver, false) == 0)
+ {
+ idx= m_current_api_receiver;
+ last= m_api_receivers_count;
+ Uint32 timeout= tp->m_waitfor_timeout;
+
+ do {
+ if (theError.code){
+ setErrorCode(theError.code);
+ return -1;
+ }
+
+ Uint32 cnt= m_conf_receivers_count;
+ Uint32 sent= m_sent_receivers_count;
+
+ if (cnt > 0)
+ {
+ /* New receivers with completed batches available. */
+ memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
+ last+= cnt;
+ m_conf_receivers_count= 0;
+ }
+ else if (retVal == 2 && sent > 0)
+ {
+ /* No completed... */
+ int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
+ if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ continue;
+ } else if(ret_code == -1){
+ retVal= -1;
+ } else {
+ idx= last;
+ retVal= -2; //return_code;
+ }
+ }
+ else if (retVal == 2)
+ {
+ /**
+ * No completed & no sent -> EndOfData
+ */
+ theError.code= -1; // make sure user gets error if he tries again
+ return 1;
+ }
+
+ if (retVal == 0)
+ break;
+
+ while (idx < last)
+ {
+ NdbReceiver* tRec= m_api_receivers[idx];
+ if (tRec->nextResult())
+ {
+ out_row= tRec->get_row();
+ retVal= 0;
+ break;
+ }
+ idx++;
+ }
+ } while(retVal == 2);
+ } else {
+ retVal = -3;
+ }
+
+ m_api_receivers_count= last;
+ m_current_api_receiver= idx;
+
+ switch(retVal)
+ {
+ case 0:
+ case 1:
+ case 2:
+ return retVal;
+ case -1:
+ setErrorCode(4008); // Timeout
+ break;
+ case -2:
+ setErrorCode(4028); // Node fail
+ break;
+ case -3: // send_next_scan -> return fail (set error-code self)
+ if(theError.code == 0)
+ setErrorCode(4028); // seq changed = Node fail
+ break;
+ case -4:
+err4:
+ setErrorCode(theError.code);
+ break;
+ }
+
+ theNdbCon->theTransactionIsStarted= false;
+ theNdbCon->theReleaseOnClose= true;
+ return -1;
+}
+
int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
{
if(m_ordered)
@@ -767,6 +912,7 @@ Remark: Puts the the final data
***************************************************************************/
int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
Uint64 aTransactionId){
+ int res;
if (theInterpretIndicator != 1 ||
(theOperationType != OpenScanRequest &&
@@ -781,8 +927,16 @@ int NdbScanOperation::prepareSendScan(Ui
// first ATTRINFO signal.
if (prepareSendInterpreted() == -1)
return -1;
-
- if(m_ordered){
+
+ /*
+ When using getValue() in ordered scans, we need to request "behind the
+ scenes" any part of the primary key that is not request explicitly by the
+ application, so that we will be able to perform the necessary merge sort.
+
+ When using NdbRecord, this is not needed (as the NdbRecord used in ordered
+ scans is required to include the full primary key).
+ */
+ if(!m_attribute_record && m_ordered){
((NdbIndexScanOperation*)this)->fix_get_values();
}
@@ -819,14 +973,37 @@ int NdbScanOperation::prepareSendScan(Ui
ScanTabReq::setNoDiskFlag(reqInfo, m_no_disk_flag);
req->requestInfo = reqInfo;
- for(Uint32 i = 0; i<theParallelism; i++){
- m_receivers[i]->do_get_value(&theReceiver, batch_size,
- key_size,
- m_read_range_no);
+ if (theStatus == UseNdbRecord)
+ {
+ for (Uint32 i = 0; i<theParallelism; i++)
+ {
+ /*
+ ToDo: Allocate receive buffers here in one big chunk, and hand it over
+ to receivers in pieces.
+ Needs some delicate handling of the way the memory is freed to avoid
+ dangling pointers and leaks in error case.
+ */
+ res= m_receivers[i]->do_setup_ndbrecord(m_attribute_record, batch_size,
+ key_size, m_read_range_no);
+ if (res==-1)
+ {
+ setErrorCodeAbort(4000); // "Memory allocation error"
+ return res;
+ }
+ }
+ }
+ else
+ {
+ for(Uint32 i = 0; i<theParallelism; i++){
+ m_receivers[i]->do_get_value(&theReceiver, batch_size,
+ key_size,
+ m_read_range_no);
+ }
}
return 0;
}
+
/*****************************************************************************
int doSend()
@@ -973,6 +1150,12 @@ NdbScanOperation::getKeyFromKEYINFO20(Ui
NdbOperation*
NdbScanOperation::takeOverScanOp(OperationType opType, NdbTransaction* pTrans)
{
+ if (m_attribute_record)
+ {
+ setErrorCodeAbort(4284);
+ return NULL;
+ }
+
/*
Get the first NdbRecAttr object of the row, which contains the 'KeyInfo'
data from KEYINFO20, with the scanInfo_Node value from KEYINFO20 appended
@@ -1068,6 +1251,93 @@ NdbScanOperation::takeOverScanOp(Operati
return 0;
}
+NdbOperation*
+NdbScanOperation::takeOverScanOpNdbRecord(OperationType opType,
+ NdbTransaction* pTrans,
+ const NdbRecord *record,
+ char *row,
+ const unsigned char *mask)
+{
+ int res;
+
+ if (!m_attribute_record)
+ {
+ setErrorCodeAbort(4284);
+ return NULL;
+ }
+ if (!record)
+ {
+ setErrorCodeAbort(4285);
+ return NULL;
+ }
+ if (!m_keyInfo)
+ {
+ // Cannot take over lock if no keyinfo was requested
+ setErrorCodeAbort(4604);
+ return NULL;
+ }
+
+ NdbOperation *op= pTrans->getNdbOperation(record->table, NULL, true);
+ if (!op)
+ return NULL;
+
+ pTrans->theSimpleState= 0;
+ op->theStatus= NdbOperation::UseNdbRecord;
+ op->theOperationType= opType;
+ op->m_key_record= NULL; // This means m_key_row has KEYINFO20 data
+ op->m_attribute_record= record;
+ /*
+ The m_key_row pointer is only valid until next call of
+ nextResult(fetchAllowed=true). But that is ok, since the lock is also
+ only valid until that time, so the application must execute() the new
+ operation before then.
+ */
+
+ /* Now find the current row, and extract keyinfo. */
+ Uint32 idx= m_current_api_receiver;
+ if (idx >= m_api_receivers_count)
+ return NULL;
+ const NdbReceiver *receiver= m_api_receivers[m_current_api_receiver];
+ Uint32 infoword;
+ res= receiver->get_keyinfo20(infoword, op->m_keyinfo_length, op->m_key_row);
+ if (res==-1)
+ return NULL;
+ Uint32 scanInfo= 0;
+ TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
+ Uint32 fragment= infoword >> 20;
+ TcKeyReq::setTakeOverScanFragment(scanInfo, fragment);
+ TcKeyReq::setTakeOverScanInfo(scanInfo, infoword & 0x3FFFF);
+ op->theScanInfo= scanInfo;
+ op->theDistrKeyIndicator_= 1;
+ op->theDistributionKey= fragment;
+
+ switch (opType)
+ {
+ case ReadRequest:
+ op->theLockMode= theLockMode;
+ /*
+ Apart from taking over the row lock, we also support reading again,
+ though typical usage will probably use an empty mask to read nothing.
+ */
+ op->m_attribute_row= row;
+ record->copyMask(op->m_read_mask, mask);
+ op->theReceiver.getValues(record, row);
+ break;
+ case UpdateRequest:
+ op->m_attribute_row= row;
+ record->copyMask(op->m_read_mask, mask);
+ break;
+ case DeleteRequest:
+ break;
+ default:
+ assert(false);
+ return NULL;
+ }
+
+ /* ToDo: Handle blobs, like in takeOverScanOp(). */
+ return op;
+}
+
NdbBlob*
NdbScanOperation::getBlobHandle(const char* anAttrName)
{
@@ -1279,6 +1549,67 @@ error:
return -1;
}
+int
+NdbIndexScanOperation::ndbrecord_insert_bound(const NdbRecord *key_record,
+ Uint32 column_index,
+ const char *row,
+ Uint32 bound_type)
+{
+ Uint32 currLen= theTotalNrOfKeyWordInSignal;
+ Uint32 remaining= KeyInfo::DataLength - currLen;
+ const NdbRecord::Attr *column= &key_record->columns[column_index];
+
+ bool is_null= column->is_null(row);
+ Uint32 len= 0;
+
+ if (!is_null)
+ if (!column->get_var_length(row, len)) {
+ setErrorCodeAbort(4209);
+ return -1;
+ }
+
+ /* Insert attribute header. */
+ Uint32 tIndexAttrId= column->index_attrId;
+ Uint32 sizeInWords= (len + 3) / 4;
+ AttributeHeader ah(tIndexAttrId, sizeInWords << 2);
+ const Uint32 ahValue= ah.m_value;
+ const void *aValue= row+column->offset;
+ const bool aligned= (UintPtr(aValue) & 3) == 0;
+
+ /*
+ The nobytes flag is false if there are extra padding bytes at the end,
+ which we need to zero out.
+ */
+ const bool nobytes= (len & 0x3) == 0;
+ const Uint32 totalLen= 2 + sizeInWords;
+ Uint32 tupKeyLen= theTupKeyLen;
+ if (remaining > totalLen && aligned && nobytes){
+ Uint32 * dst= theKEYINFOptr + currLen;
+ * dst ++ = bound_type;
+ * dst ++ = ahValue;
+ memcpy(dst, aValue, 4 * sizeInWords);
+ theTotalNrOfKeyWordInSignal= currLen + totalLen;
+ } else {
+ if(!aligned || !nobytes){
+ Uint32 tempData[2000];
+ if (len > sizeof(tempData))
+ len= sizeof(tempData);
+ tempData[0] = bound_type;
+ tempData[1] = ahValue;
+ tempData[2 + (len >> 2)] = 0;
+ memcpy(tempData+2, aValue, len);
+ insertBOUNDS(tempData, 2+sizeInWords);
+ } else {
+ Uint32 buf[2] = { bound_type, ahValue };
+ insertBOUNDS(buf, 2);
+ insertBOUNDS((Uint32*)aValue, sizeInWords);
+ }
+ }
+ theTupKeyLen= tupKeyLen + totalLen;
+
+ return 0;
+}
+
Uint32
NdbIndexScanOperation::getKeyFromSCANTABREQ(Uint32* data, Uint32 size)
{
@@ -1335,17 +1666,19 @@ NdbIndexScanOperation::readTuples(LockMo
m_current_api_receiver = m_sent_receivers_count;
m_api_receivers_count = m_sent_receivers_count;
- m_sort_columns = cnt;
- for(Uint32 i = 0; i<cnt; i++){
- const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
- const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
- NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
- UintPtr newVal = UintPtr(tmp);
- theTupleKeyDefined[i][0] = FAKE_PTR;
- theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
+ if (!m_attribute_record)
+ {
+ for(Uint32 i = 0; i<cnt; i++){
+ const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
+ const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
+ NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
+ UintPtr newVal = UintPtr(tmp);
+ theTupleKeyDefined[i][0] = FAKE_PTR;
+ theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
#if (SIZEOF_CHARP == 8)
- theTupleKeyDefined[i][2] = (newVal >> 32);
+ theTupleKeyDefined[i][2] = (newVal >> 32);
#endif
+ }
}
}
m_this_bound_start = 0;
@@ -1363,8 +1696,6 @@ NdbIndexScanOperation::fix_get_values(){
Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
assert(cnt < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
- const NdbIndexImpl * idx = m_accessTable->m_index;
- const NdbTableImpl * tab = m_currentTable;
for(Uint32 i = 0; i<cnt; i++){
Uint32 val = theTupleKeyDefined[i][0];
switch(val){
@@ -1420,6 +1751,63 @@ NdbIndexScanOperation::compare(Uint32 sk
return 0;
}
+int
+NdbIndexScanOperation::compare_ndbrecord(const NdbReceiver *r1,
+ const NdbReceiver *r2) const
+{
+ Uint32 i;
+ int jdir= 1 - 2 * (int)m_descending;
+ const NdbRecord *record= m_attribute_record;
+
+ assert(record->flags & NdbRecord::RecHasAllKeys);
+ assert(jdir == 1 || jdir == -1);
+
+ const char *a_row= r1->peek_row();
+ const char *b_row= r2->peek_row();
+
+ /* First compare range_no if needed. */
+ if (m_read_range_no)
+ {
+ Uint32 a_range_no= uint4korr(a_row+record->m_row_size);
+ Uint32 b_range_no= uint4korr(b_row+record->m_row_size);
+ if (a_range_no != b_range_no)
+ return (a_range_no < b_range_no ? -1 : 1);
+ }
+
+ for (i= 0; i<record->key_index_length; i++)
+ {
+ const NdbRecord::Attr *col= &record->columns[record->key_indexes[i]];
+
+ bool a_is_null= col->is_null(a_row);
+ bool b_is_null= col->is_null(b_row);
+ if (a_is_null)
+ {
+ if (!b_is_null)
+ return -1 * jdir;
+ }
+ else
+ {
+ if (b_is_null)
+ return 1 * jdir;
+
+ Uint32 offset= col->offset;
+ Uint32 maxSize= col->maxSize;
+ const char *a_ptr= a_row + offset;
+ const char *b_ptr= b_row + offset;
+ void *info= col->charset_info;
+ int res=
+ (*col->compare_function)(info, a_ptr, maxSize, b_ptr, maxSize, true);
+ if (res)
+ {
+ assert(res != NdbSqlUtil::CmpUnknown);
+ return res * jdir;
+ }
+ }
+ }
+
+ return 0;
+}
+
/*
This function does the merge-sort of the parallel ordered index scans, needed
to return a single sorted stream of rows to the application.
@@ -1593,6 +1981,158 @@ NdbIndexScanOperation::next_result_order
return 1;
}
+/* NdbRecord version of next_result_ordered. */
+int
+NdbIndexScanOperation::next_result_ordered_ndbrecord(const char * & out_row,
+ bool fetchAllowed,
+ bool forceSend)
+{
+ Uint32 current;
+
+ /*
+ Retrieve more rows if necessary, then sort the array of receivers.
+
+ The special case m_current_api_receiver==theParallelism is for the
+ initial call, where we need to wait for and sort all receviers.
+ */
+ if (m_current_api_receiver==theParallelism ||
+ !m_api_receivers[m_current_api_receiver]->nextResult())
+ {
+ if (!fetchAllowed)
+ return 2; // No more data available now
+
+ /* Wait for all receivers to be retrieved. */
+ int count= ordered_send_scan_wait_for_all(forceSend);
+ if (count == -1)
+ return -1;
+
+ /*
+ Insert all newly retrieved receivers in sorted array.
+ The receivers are left in m_conf_receivers for us to move into place.
+ */
+ current= m_current_api_receiver;
+ for (int i= 0; i < count; i++)
+ ordered_insert_receiver(current--, m_conf_receivers[i]);
+ m_current_api_receiver= current;
+ }
+ else
+ {
+ /*
+ Just make sure the first receiver (from which we just returned a row, so
+ it may no longer be in the correct sort position) is placed correctly.
+ */
+ current= m_current_api_receiver;
+ ordered_insert_receiver(current + 1, m_api_receivers[current]);
+ }
+
+ /* Now just return the next row (if any). */
+ if (current < theParallelism && m_api_receivers[current]->nextResult())
+ {
+ out_row= m_api_receivers[current]->get_row();
+ return 0;
+ }
+ else
+ {
+ theError.code= -1;
+ return 1; // End-of-file
+ }
+}
+
+/* Insert a newly fully-retrieved receiver in the correct sorted place. */
+void
+NdbIndexScanOperation::ordered_insert_receiver(Uint32 start,
+ NdbReceiver *receiver)
+{
+ /*
+ Binary search to find the position of the first receiver with no rows
+ smaller than the first row for this receiver. We need to insert this
+ receiver just before that position.
+ */
+ Uint32 first= start;
+ Uint32 last= theParallelism;
+ while (first < last)
+ {
+ Uint32 idx= (first+last)/2;
+ int res= compare_ndbrecord(receiver, m_api_receivers[idx]);
+ if (res <= 0)
+ last= idx;
+ else
+ first= idx+1;
+ }
+
+ /* Move down any receivers that go before this one, then insert it. */
+ if (last > start)
+ memmove(&m_api_receivers[start-1],
+ &m_api_receivers[start],
+ (last - start) * sizeof(m_api_receivers[0]));
+ m_api_receivers[last-1]= receiver;
+}
+
+/*
+ This method is called during (NdbRecord) ordered index scans when all rows
+ from one batch of one fragment scan are exhausted (identified by
+ m_current_api_receiver).
+
+ It sends a SCAN_NEXTREQ signal for the fragment and waits for the batch to
+ be fully received.
+
+ As a special case, it is also called at the start of the scan. In this case,
+ no signal is sent, it just waits for the initial batch to be fully received
+ from all fragments.
+
+ The method returns -1 for error, and otherwise the number of fragments that
+ were received (this will be 0 or 1, except for the initial call where it
+ will be equal to theParallelism).
+
+ The NdbReceiver object(s) are left in the m_conf_receivers array. Note that
+ it is safe to read from m_conf_receivers without mutex protection immediately
+ after return from this method; as all fragments are fully received no new
+ receivers can enter that array until the next call to this method.
+*/
+int
+NdbIndexScanOperation::ordered_send_scan_wait_for_all(bool forceSend)
+{
+ TransporterFacade* tp= theNdb->theImpl->m_transporter_facade;
+
+ PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
+ theNdb->theNdbBlockNumber);
+ if(theError.code)
+ return -1;
+
+ Uint32 seq= theNdbCon->theNodeSequence;
+ Uint32 nodeId= theNdbCon->theDBnode;
+ Uint32 timeout= tp->m_waitfor_timeout;
+ if (seq == tp->getNodeSequence(nodeId) &&
+ !send_next_scan_ordered(m_current_api_receiver))
+ {
+ while (m_sent_receivers_count > 0 && !theError.code)
+ {
+ int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
+ if (ret_code == 0 && seq == tp->getNodeSequence(nodeId))
+ continue;
+ if(ret_code == -1){
+ setErrorCode(4008);
+ } else {
+ setErrorCode(4028);
+ }
+ return -1;
+ }
+
+ if(theError.code){
+ setErrorCode(theError.code);
+ return -1;
+ }
+
+ Uint32 new_receivers= m_conf_receivers_count;
+ m_conf_receivers_count= 0;
+ assert(new_receivers<=1 || new_receivers==theParallelism);
+ return new_receivers;
+ } else {
+ setErrorCode(4028);
+ return -1;
+ }
+}
+
/*
This method is used in ordered index scan to acknowledge the reception of
one batch of fragment scan rows and request the sending of another batch (it
@@ -1862,6 +2402,16 @@ NdbIndexScanOperation::end_of_bound(Uint
int
NdbIndexScanOperation::get_range_no()
{
+ if (m_attribute_record)
+ {
+ Uint32 idx= m_current_api_receiver;
+ if (idx >= m_api_receivers_count)
+ return -1;
+
+ const NdbReceiver *tRec= m_api_receivers[m_current_api_receiver];
+ return tRec->get_range_no();
+ }
+
NdbRecAttr* tRecAttr = m_curr_row;
if(m_read_range_no && tRecAttr)
{
--- 1.17/storage/ndb/test/ndbapi/flexBench.cpp 2007-02-08 12:56:51 +01:00
+++ 1.18/storage/ndb/test/ndbapi/flexBench.cpp 2007-02-08 12:56:51 +01:00
@@ -29,7 +29,6 @@ Arguments:
-lkn Number of long primary keys, default 1
-lks Size of each long primary key, default 1
-simple Use simple read to read from database
- -dirty Use dirty read to read from database
-write Use writeTuple in insert and update
-stdtables Use standard table names
-no_table_create Don't create tables in db
@@ -112,7 +111,6 @@ static unsigned int tSizeOfLongP
//Program Flags
static int theSimpleFlag = 0;
-static int theDirtyFlag = 0;
static int theWriteFlag = 0;
static int theStdTableNameFlag = 0;
static int theTableCreateFlag = 0;
@@ -609,7 +607,7 @@ static void* flexBenchThread(void* pArg)
tNoOfOperations * tNoOfAttributes * sizeof(int) * tAttributeSize ;
unsigned** longKeyAttrValue;
NdbRecord** pRec= NULL;
- Uint32** pAttrSet= NULL;
+ unsigned char** pAttrSet= NULL;
int nRefOpOffset= 0;
NdbDictionary::Dictionary *dict= NULL;
NdbDictionary::RecordSpecification recSpec[MAXATTR+MAXNOLONGKEY];
@@ -625,7 +623,7 @@ static void* flexBenchThread(void* pArg)
pOps = (NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ;
pNdb = new Ndb(g_cluster_connection, "TEST_DB" );
pRec= (NdbRecord **)calloc(tNoOfTables*3, sizeof(*pRec));
- pAttrSet= (Uint32 **)calloc(tNoOfTables, sizeof(*pAttrSet));
+ pAttrSet= (unsigned char **)calloc(tNoOfTables, sizeof(*pAttrSet));
if (!attrValue || !attrRefValue || !pOps || !pNdb || !pRec || !pAttrSet)
{
@@ -648,17 +646,15 @@ static void* flexBenchThread(void* pArg)
dict= pNdb->getDictionary();
for (int tab= 0; tab<tNoOfTables; tab++)
{
+ const NdbDictionary::Table *table= dict->getTable(tableName[tab]);
int numPKs= (useLongKeys ? tNoOfLongPK : 1);
/* First create NdbRecord for just the primary key(s). */
if (!useLongKeys)
{
- recSpec[0].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
- recSpec[0].colPtr= NULL;
- recSpec[0].colName= NULL;
- recSpec[0].colNumber= 0;
- recSpec[0].dataOffset= 0;
- pRec[tab]= dict->createRecord(tableName[tab],
+ recSpec[0].column= table->getColumn(0);;
+ recSpec[0].offset= 0;
+ pRec[tab]= dict->createRecord(table,
recSpec,
1,
sizeof(recSpec[0]));
@@ -667,12 +663,10 @@ static void* flexBenchThread(void* pArg)
{
for (Uint32 i= 0; i<tNoOfLongPK; i++)
{
- recSpec[i].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
- recSpec[i].colPtr= NULL;
- recSpec[i].colName= longKeyAttrName[i];
- recSpec[i].dataOffset= sizeof(unsigned)*tSizeOfLongPK*i;
+ recSpec[i].column= table->getColumn(longKeyAttrName[i]);
+ recSpec[i].offset= sizeof(unsigned)*tSizeOfLongPK*i;
}
- pRec[tab]= dict->createRecord(tableName[tab],
+ pRec[tab]= dict->createRecord(table,
recSpec,
tNoOfLongPK,
sizeof(recSpec[0]));
@@ -682,14 +676,11 @@ static void* flexBenchThread(void* pArg)
Uint32 count= 0;
for (Uint32 i= 1; i<tNoOfAttributes; i++)
{
- recSpec[count].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
- recSpec[count].colPtr= NULL;
- recSpec[count].colName= NULL;
- recSpec[count].colNumber= i+numPKs-1;
- recSpec[count].dataOffset= sizeof(int)*tAttributeSize*i;
+ recSpec[count].column= table->getColumn(i+numPKs-1);
+ recSpec[count].offset= sizeof(int)*tAttributeSize*i;
count++;
}
- pRec[tab+tNoOfTables]= dict->createRecord(tableName[tab],
+ pRec[tab+tNoOfTables]= dict->createRecord(table,
recSpec,
count,
sizeof(recSpec[0]));
@@ -699,26 +690,21 @@ static void* flexBenchThread(void* pArg)
count= 0;
for (Uint32 i= (useLongKeys?1:0); i<tNoOfAttributes; i++)
{
- recSpec[count].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
- recSpec[count].colPtr= NULL;
- recSpec[count].colName= NULL;
- recSpec[count].colNumber= i-1+numPKs;
- recSpec[count].dataOffset= sizeof(int)*tAttributeSize*i;
+ recSpec[count].column= table->getColumn(i-1+numPKs);
+ recSpec[count].offset= sizeof(int)*tAttributeSize*i;
count++;
}
if (useLongKeys)
{
for (Uint32 i= 0; i<tNoOfLongPK; i++)
{
- recSpec[count].type= NdbDictionary::RecordSpecification::AttrOffsetNotNULL;
- recSpec[count].colPtr= NULL;
- recSpec[count].colName= longKeyAttrName[i];
- recSpec[count].dataOffset= sizeof(int)*tAttributeSize*tNoOfAttributes +
- sizeof(unsigned)*tSizeOfLongPK*i;
+ recSpec[count].column= table->getColumn(longKeyAttrName[i]);
+ recSpec[count].offset= sizeof(int)*tAttributeSize*tNoOfAttributes +
+ sizeof(unsigned)*tSizeOfLongPK*i;
count++;
}
}
- pRec[tab+2*tNoOfTables]= dict->createRecord(tableName[tab],
+ pRec[tab+2*tNoOfTables]= dict->createRecord(table,
recSpec,
count,
sizeof(recSpec[0]));
@@ -734,7 +720,8 @@ static void* flexBenchThread(void* pArg)
}
/* Attribute set for reading just one attribute, when verifying delete. */
- pAttrSet[tab]= dict->getRecAttrSet(pRec[tab]);
+ pAttrSet[tab]=
+ (unsigned char *)calloc(tNoOfAttributes-1+numPKs, sizeof(char));
if (pAttrSet[tab]==NULL) {
// This is a fatal error, abort program
ndbout << "Failed to allocate NdbRecAttrSet in thread" << threadNo;
@@ -742,10 +729,7 @@ static void* flexBenchThread(void* pArg)
tResult = 13;
goto end;
}
- if (useLongKeys)
- dict->recAttrSetEnable(pAttrSet[tab], tableName[tab], longKeyAttrName[0]);
- else
- dict->recAttrSetEnable(pAttrSet[tab], (Uint32)0);
+ pAttrSet[tab][0]|= 1; // Set bit for attrId 0
}
if(useLongKeys){
@@ -878,50 +862,41 @@ static void* flexBenchThread(void* pArg)
switch (tType) {
case stInsert: // Insert case
- if (theWriteFlag == 1 && theDirtyFlag == 1)
- pOps[countTables]= pTrans->dirtyWriteTuple(tabName, pk_record, pRowPK,
- attr_record, pRowAttr);
- else if (theWriteFlag == 1)
- pOps[countTables]= pTrans->writeTuple(tabName, pk_record, pRowPK,
+ if (theWriteFlag == 1)
+ pOps[countTables]= pTrans->writeTuple(pk_record, pRowPK,
attr_record, pRowAttr);
else
- pOps[countTables]= pTrans->insertTuple(tabName, all_record, (char *)pRow);
+ pOps[countTables]= pTrans->insertTuple(all_record, (char *)pRow);
break;
case stRead: // Read Case
if (theSimpleFlag == 1)
- pOps[countTables]= pTrans->simpleReadTuple(tabName, pk_record, pRowPK,
- attr_record, (char *)pRow);
- else if (theDirtyFlag == 1)
- pOps[countTables]= pTrans->dirtyReadTuple(tabName, pk_record, pRowPK,
- attr_record, (char *)pRow);
+ /* Apparently simpleRead is identical to normal read currently. */
+ pOps[countTables]= pTrans->readTuple(pk_record, pRowPK,
+ attr_record, (char *)pRow,
+ NdbOperation::LM_Read);
else
- pOps[countTables]= pTrans->readTuple(tabName, pk_record, pRowPK,
+ pOps[countTables]= pTrans->readTuple(pk_record, pRowPK,
attr_record, (char *)pRow);
break;
case stUpdate: // Update Case
- if (theWriteFlag == 1 && theDirtyFlag == 1)
- pOps[countTables]= pTrans->dirtyWriteTuple(tabName, pk_record, pRowPK,
- attr_record, pRowAttr);
- else if (theWriteFlag == 1)
- pOps[countTables]= pTrans->writeTuple(tabName, pk_record, pRowPK,
+ if (theWriteFlag == 1)
+ pOps[countTables]= pTrans->writeTuple(pk_record, pRowPK,
attr_record, pRowAttr);
- else if (theDirtyFlag == 1)
- pOps[countTables]= pTrans->dirtyUpdateTuple(tabName, pk_record, pRowPK,
- attr_record, pRowAttr);
else
- pOps[countTables]= pTrans->updateTuple(tabName, pk_record, pRowPK,
- attr_record, pRowAttr);
+ pOps[countTables]= pTrans->updateTuple(pk_record, pRowPK,
+ attr_record, pRowAttr);
break;
case stDelete: // Delete Case
- pOps[countTables]= pTrans->deleteTuple(tabName, pk_record, pRowPK);
+ pOps[countTables]= pTrans->deleteTuple(pk_record, pRowPK);
break;
case stVerify:
- pOps[countTables]= pTrans->readTuple(tabName, pk_record, pRowPK,
+ pOps[countTables]= pTrans->readTuple(pk_record, pRowPK,
attr_record, (char *)pRow);
break;
case stVerifyDelete:
- pOps[countTables]= pTrans->readTuple(tabName, pk_record, pRowPK,
+ pOps[countTables]= pTrans->readTuple(pk_record, pRowPK,
pk_record, (char *)pRow,
+ NdbOperation::LM_Read,
pAttrSet[countTables]);
break;
default:
@@ -1063,7 +1038,7 @@ static void* flexBenchThread(void* pArg)
{
for (Uint32 i= 0; i<tNoOfTables; i++)
if (pAttrSet[i])
- dict->releaseRecAttrSet(pAttrSet[i]);
+ free(pAttrSet[i]);
free(pAttrSet);
}
if(pRec)
@@ -1167,8 +1142,6 @@ static int readArguments(int argc, const
theSimpleFlag = 1;
}else if (strcmp(argv[i], "-write") == 0){
theWriteFlag = 1;
- }else if (strcmp(argv[i], "-dirty") == 0){
- theDirtyFlag = 1;
}else if (strcmp(argv[i], "-no_table_create") == 0){
theTableCreateFlag = 1;
}else if (strcmp(argv[i], "-temp") == 0){
@@ -1293,7 +1266,6 @@ static void input_error(){
ndbout << " -lks Size of each long primary key, default 1" << endl;
ndbout << " -simple Use simple read to read from database" << endl;
- ndbout << " -dirty Use dirty read to read from database" << endl;
ndbout << " -write Use writeTuple in insert and update" << endl;
ndbout << " -stdtables Use standard table names" << endl;
ndbout << " -no_table_create Don't create tables in db" << endl;
--- 1.1/support-files/build-tags 2007-02-08 12:56:51 +01:00
+++ 1.2/support-files/build-tags 2007-02-08 12:56:51 +01:00
@@ -1,7 +1,7 @@
#! /bin/sh
rm -f TAGS
-filter='\.cc$\|\.c$\|\.h$\|\.yy$'
+filter='\.cc$\|\.c$\|\.h$\|\.yy\|\.[ch]pp$'
files=`bk -r sfiles -gU | grep $filter `
for f in $files ;
do
--- 1.82/storage/ndb/src/ndbapi/ndberror.c 2007-02-08 12:56:51 +01:00
+++ 1.83/storage/ndb/src/ndbapi/ndberror.c 2007-02-08 12:56:51 +01:00
@@ -618,7 +618,13 @@ ErrorBundle ErrorCodes[] = {
{ 4276, DMEC, AE, "API version mismatch or wrong
sizeof(NdbDictionary::RecordSpecification)" },
{ 4277, DMEC, AE, "Missing column specification in NdbDictionary::RecordSpecification"
},
{ 4278, DMEC, AE, "Duplicate column specification in
NdbDictionary::RecordSpecification" },
- { 4279, DMEC, AE, "NdbRecord for tuple access is not a primary key NdbRecord" },
+ { 4279, DMEC, AE, "NdbRecord for tuple access is not an index key NdbRecord" },
+ { 4280, DMEC, AE, "Error returned from application scanIndex() callback" },
+ { 4281, DMEC, AE, "Too many keys specified for key bound in scanIndex" },
+ { 4282, DMEC, AE, "range_no not strictly increasing in ordered multi-range index scan"
},
+ { 4283, DMEC, AE, "key_record and result_record in index scan are not for the same
index" },
+ { 4284, DMEC, AE, "Cannot mix NdbRecAttr and NdbRecord operations for scan take-over"
},
+ { 4285, DMEC, AE, "NULL NdbRecord pointer" }
};
static
| Thread |
|---|
| • bk commit into 5.1 tree (knielsen:1.2388) | knielsen | 8 Feb |