#At file:///home/msvensson/mysql/6.4/
3105 Magnus Svensson 2008-11-19 [merge]
Merge
modified:
include/my_socket_win32.h
mysql-test/suite/ndb/t/disabled.def
sql/ha_ndbcluster.cc
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_connection.cc
storage/ndb/include/mgmapi/mgmapi.h
storage/ndb/src/kernel/blocks/lgman.cpp
storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp
storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp
storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp
storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp
storage/ndb/src/kernel/blocks/tsman.cpp
storage/ndb/src/mgmapi/LocalConfig.cpp
storage/ndb/src/mgmapi/LocalConfig.hpp
storage/ndb/src/mgmapi/mgmapi.cpp
storage/ndb/test/include/HugoOperations.hpp
storage/ndb/test/include/HugoTransactions.hpp
storage/ndb/test/src/HugoOperations.cpp
storage/ndb/test/src/HugoTransactions.cpp
storage/ndb/test/src/NDBT_Tables.cpp
storage/ndb/test/tools/hugoPkRead.cpp
=== modified file 'include/my_socket_win32.h'
--- a/include/my_socket_win32.h 2008-08-21 12:02:12 +0000
+++ b/include/my_socket_win32.h 2008-11-17 21:00:50 +0000
@@ -53,13 +53,19 @@ static inline int my_socket_nfds(my_sock
static inline size_t my_recv(my_socket s, char* buf, size_t len, int flags)
{
- return recv(s.s, buf, len, flags);
+ int ret= recv(s.s, buf, len, flags);
+ if (ret == SOCKET_ERROR)
+ return -1;
+ return ret;
}
static inline
size_t my_send(my_socket s, const char* buf, size_t len, int flags)
{
- return send(s.s, buf, len, flags);
+ int ret= send(s.s, buf, len, flags);
+ if (ret == SOCKET_ERROR)
+ return -1;
+ return ret;
}
static inline int my_socket_reuseaddr(my_socket s, int enable)
@@ -180,7 +186,8 @@ static inline ssize_t my_socket_readv(my
int iovcnt)
{
DWORD rv=0;
- WSARecv(s.s,(LPWSABUF)iov,iovcnt,&rv,0,0,0);
+ if (WSARecv(s.s,(LPWSABUF)iov,iovcnt,&rv,0,0,0) == SOCKET_ERROR)
+ return -1;
return rv;
}
@@ -188,7 +195,8 @@ static inline ssize_t my_socket_writev(m
int iovcnt)
{
DWORD rv=0;
- WSASend(s.s,(LPWSABUF)iov,iovcnt,&rv,0,0,0);
+ if (WSASend(s.s,(LPWSABUF)iov,iovcnt,&rv,0,0,0) == SOCKET_ERROR)
+ return -1;
return rv;
}
=== modified file 'mysql-test/suite/ndb/t/disabled.def'
--- a/mysql-test/suite/ndb/t/disabled.def 2008-10-28 19:40:45 +0000
+++ b/mysql-test/suite/ndb/t/disabled.def 2008-11-18 08:11:19 +0000
@@ -9,8 +9,8 @@
# Do not use any TAB characters for whitespace.
#
##############################################################################
-partition_03ndb : BUG#16385 2006-03-24 mikael Partitions: crash when updating a range partitioned NDB table
+ndb_reconnect : disabled until reconnect patch is fixed
ndb_partition_error2 : HF is not sure if the test can work as internded on all the platforms
# the below testcase have been reworked to avoid the bug, test contains comment, keep bug open
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2008-11-17 19:03:20 +0000
+++ b/sql/ha_ndbcluster.cc 2008-11-18 10:28:03 +0000
@@ -8266,11 +8266,13 @@ Ndb* check_ndb_in_thd(THD* thd, bool val
return NULL;
set_thd_ndb(thd, thd_ndb);
}
+#ifdef NOT_YET
else if (validate_ndb && !thd_ndb->valid_ndb())
{
if (!thd_ndb->recycle_ndb(thd))
return NULL;
}
+#endif
return thd_ndb->ndb;
}
@@ -11196,7 +11198,7 @@ pthread_handler_t ndb_util_thread_func(v
have been created.
If not try to create it
*/
- if (!check_ndb_in_thd(thd, false))
+ if (!check_ndb_in_thd(thd, false))
{
set_timespec(abstime, 1);
continue;
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2008-11-17 19:03:20 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2008-11-18 19:04:49 +0000
@@ -5481,7 +5481,9 @@ pthread_handler_t ndb_binlog_thread_func
pthread_mutex_unlock(&LOCK_thread_count);
thd->lex->start_transaction_opt= 0;
+#ifdef NOT_YET
restart_cluster_failure:
+#endif
if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
s_ndb->init())
{
@@ -5736,17 +5738,6 @@ restart:
res= i_ndb->pollEvents(tot_poll_wait, &gci);
tot_poll_wait= 0;
}
- else
- {
- /*
- Just consume any events, not used if no binlogging
- e.g. node failure events
- */
- Uint64 tmp_gci;
- if (i_ndb->pollEvents(0, &tmp_gci))
- while (i_ndb->nextEvent())
- ;
- }
int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
ndb_latest_received_binlog_epoch= gci;
@@ -5794,7 +5785,7 @@ restart:
&post_epoch_log_list,
&post_epoch_unlock_list,
&mem_root);
-
+#ifdef NOT_YET
if (unlikely(pOp->getEventType() == NDBEVENT::TE_CLUSTER_FAILURE))
{
sql_print_information("NDB Binlog: cluster failure detected");
@@ -5819,7 +5810,7 @@ restart:
sql_print_information("NDB Binlog: restarting");
goto restart_cluster_failure;
}
-
+#endif
DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
"<empty>"));
@@ -5852,7 +5843,35 @@ restart:
}
}
- if (res > 0)
+ if (!ndb_binlog_running)
+ {
+ /*
+ Just consume any events, not used if no binlogging
+ e.g. node failure events
+ */
+ Uint64 tmp_gci;
+ if (i_ndb->pollEvents(0, &tmp_gci))
+ {
+ NdbEventOperation *pOp;
+ while ((pOp= i_ndb->nextEvent()))
+ {
+ if ((unsigned) pOp->getEventType() >=
+ (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
+ {
+ ndb_binlog_index_row row;
+ ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
+ }
+ }
+ if (i_ndb->getEventOperation() == NULL &&
+ s_ndb->getEventOperation() == NULL &&
+ do_ndbcluster_binlog_close_connection == BCCC_running)
+ {
+ DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
+ do_ndbcluster_binlog_close_connection= BCCC_restart;
+ }
+ }
+ }
+ else if (res > 0)
{
DBUG_PRINT("info", ("pollEvents res: %d", res));
thd->proc_info= "Processing events";
=== modified file 'sql/ha_ndbcluster_connection.cc'
--- a/sql/ha_ndbcluster_connection.cc 2008-10-28 11:01:39 +0000
+++ b/sql/ha_ndbcluster_connection.cc 2008-11-18 16:31:46 +0000
@@ -136,7 +136,8 @@ int ndbcluster_connect(int (*connect_cal
connect_callback();
for (unsigned i= 0; i < g_ndb_cluster_connection_pool_alloc; i++)
{
- if (g_ndb_cluster_connection_pool[i]->node_id() == 0)
+ int node_id= g_ndb_cluster_connection_pool[i]->node_id();
+ if (node_id == 0)
{
// not connected to mgmd yet, try again
g_ndb_cluster_connection_pool[i]->connect(0,0,0);
@@ -146,6 +147,7 @@ int ndbcluster_connect(int (*connect_cal
g_ndb_cluster_connection_pool[i]->start_connect_thread();
continue;
}
+ node_id= g_ndb_cluster_connection_pool[i]->node_id();
}
DBUG_PRINT("info",
("NDBCLUSTER storage engine (%u) at %s on port %d", i,
@@ -159,18 +161,21 @@ int ndbcluster_connect(int (*connect_cal
now_time= NdbTick_CurrentMillisecond();
} while (res != 0 && now_time < end_time);
+ const char *msg= 0;
if (res == 0)
{
- sql_print_information("NDB[%u]: all storage nodes connected", i);
+ msg= "all storage nodes connected";
}
else if (res > 0)
{
- sql_print_information("NDB[%u]: some storage nodes connected", i);
+ msg= "some storage nodes connected";
}
else if (res < 0)
{
- sql_print_information("NDB[%u]: no storage nodes connected (timed out)", i);
+ msg= "no storage nodes connected (timed out)";
}
+ sql_print_information("NDB[%u]: NodeID: %d, %s",
+ i, node_id, msg);
}
}
else if (res == 1)
=== modified file 'storage/ndb/include/mgmapi/mgmapi.h'
--- a/storage/ndb/include/mgmapi/mgmapi.h 2008-10-05 07:13:39 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi.h 2008-11-18 08:34:42 +0000
@@ -554,6 +554,15 @@ extern "C" {
*/
const char *ndb_mgm_get_connected_host(NdbMgmHandle handle);
+ /**
+ * Gets connection bind address
+ *
+ * @param handle Management handle
+ *
+ * @return hostname
+ */
+ const char *ndb_mgm_get_connected_bind_address(NdbMgmHandle handle);
+
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/** @} *********************************************************************/
/**
=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp 2008-11-16 15:29:16 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp 2008-11-18 10:28:03 +0000
@@ -697,11 +697,10 @@ Lgman::execFSWRITEREQ(Signal* signal)
FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtr();
m_file_pool.getPtr(ptr, req->userPointer);
- m_global_page_pool.getPtr(page_ptr, req->data.pageData[0]);
+ m_shared_page_pool.getPtr(page_ptr, req->data.pageData[0]);
if (req->varIndex == 0)
{
- jam();
File_formats::Undofile::Zero_page* page =
(File_formats::Undofile::Zero_page*)page_ptr.p;
page->m_page_header.init(File_formats::FT_Undofile,
@@ -715,7 +714,6 @@ Lgman::execFSWRITEREQ(Signal* signal)
}
else
{
- jam();
File_formats::Undofile::Undo_page* page =
(File_formats::Undofile::Undo_page*)page_ptr.p;
page->m_page_header.m_page_lsn_hi = 0;
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2008-11-06 10:17:49 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.hpp 2008-11-18 10:28:03 +0000
@@ -258,6 +258,8 @@ protected:
public:
SimulatedBlock& m_fs;
+
+ Uint32 m_page_cnt;
Ptr<GlobalPage> m_page_ptr;
};
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2008-11-06 16:52:59 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2008-11-18 10:28:03 +0000
@@ -194,9 +194,15 @@ Ndbfs::execFSOPENREQ(Signal* signal)
if(fsOpenReq->fileFlags & FsOpenReq::OM_INIT)
{
+ jam();
+ Uint32 cnt = 16; // 512k
Ptr<GlobalPage> page_ptr;
- if(m_global_page_pool.seize(page_ptr) == false)
+ m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &page_ptr.i, &cnt, 1);
+ if(cnt == 0)
{
+ file->m_page_ptr.setNull();
+ file->m_page_cnt = 0;
+
FsRef * const fsRef = (FsRef *)&signal->theData[0];
fsRef->userPointer = userPointer;
fsRef->setErrorCode(fsRef->errorCode, FsRef::fsErrOutOfMemory);
@@ -204,12 +210,15 @@ Ndbfs::execFSOPENREQ(Signal* signal)
sendSignal(userRef, GSN_FSOPENREF, signal, 3, JBB);
return;
}
+ m_shared_page_pool.getPtr(page_ptr);
file->m_page_ptr = page_ptr;
+ file->m_page_cnt = cnt;
}
else
{
ndbassert(file->m_page_ptr.isNull());
file->m_page_ptr.setNull();
+ file->m_page_cnt = 0;
}
if(signal->getNoOfSections() == 0){
@@ -714,8 +723,12 @@ Ndbfs::report(Request * request, Signal*
if(!request->file->m_page_ptr.isNull())
{
- m_global_page_pool.release(request->file->m_page_ptr);
+ assert(request->file->m_page_cnt > 0);
+ m_ctx.m_mm.release_pages(RT_DBTUP_PAGE,
+ request->file->m_page_ptr.i,
+ request->file->m_page_cnt);
request->file->m_page_ptr.setNull();
+ request->file->m_page_cnt = 0;
}
if (request->error) {
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-10-21 13:20:59 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/PosixAsyncFile.cpp 2008-11-18 10:28:03 +0000
@@ -32,6 +32,8 @@
#include <signaldata/FsOpenReq.hpp>
#include <signaldata/FsReadWriteReq.hpp>
+#include <NdbTick.h>
+
// use this to test broken pread code
//#define HAVE_BROKEN_PREAD
@@ -330,22 +332,40 @@ no_odirect:
posix_fallocate(theFd, 0, sz);
#endif
+#ifdef VM_TRACE
+#define TRACE_INIT
+#endif
+
+#ifdef TRACE_INIT
+ Uint32 write_cnt = 0;
+ Uint64 start = NdbTick_CurrentMillisecond();
+#endif
while(off < sz)
{
- req->filePointer = 0; // DATA 0
- req->userPointer = request->theUserPointer; // DATA 2
- req->numberOfPages = 1; // DATA 5
- req->varIndex = index++;
- req->data.pageData[0] = m_page_ptr.i;
-
- m_fs.EXECUTE_DIRECT(block, GSN_FSWRITEREQ, signal,
- FsReadWriteReq::FixedLength + 1,
- 0 // wl4391_todo This EXECUTE_DIRECT is thread safe
- );
+ off_t size = 0;
+ Uint32 cnt = 0;
+ while (cnt < m_page_cnt && (off + size) < sz)
+ {
+ req->filePointer = 0; // DATA 0
+ req->userPointer = request->theUserPointer; // DATA 2
+ req->numberOfPages = 1; // DATA 5
+ req->varIndex = index++;
+ req->data.pageData[0] = m_page_ptr.i + cnt;
+
+ m_fs.EXECUTE_DIRECT(block, GSN_FSWRITEREQ, signal,
+ FsReadWriteReq::FixedLength + 1,
+ 0);
+
+ cnt++;
+ size += request->par.open.page_size;
+ }
retry:
- Uint32 size = request->par.open.page_size;
+ off_t save_size = size;
char* buf = (char*)m_page_ptr.p;
while(size > 0){
+#ifdef TRACE_INIT
+ write_cnt++;
+#endif
int n;
if(use_gz)
n= azwrite(&azf,buf,size);
@@ -382,8 +402,22 @@ no_odirect:
request->error = err;
return;
}
- off += request->par.open.page_size;
+ off += save_size;
}
+ ::fsync(theFd);
+#ifdef TRACE_INIT
+ Uint64 stop = NdbTick_CurrentMillisecond();
+ Uint64 diff = stop - start;
+ if (diff == 0)
+ diff = 1;
+ ndbout_c("wrote %umb in %u writes %us -> %ukb/write %umb/s",
+ Uint32(sz /1024/1024),
+ write_cnt,
+ Uint32(diff / 1000),
+ Uint32(sz / 1024 / write_cnt),
+ Uint32(sz / diff));
+#endif
+
if(lseek(theFd, 0, SEEK_SET) != 0)
request->error = errno;
}
@@ -401,12 +435,6 @@ no_odirect:
if (request->error)
return;
-#elif defined HAVE_DIRECTIO && defined(DIRECTIO_ON)
- if (directio(theFd, DIRECTIO_ON) == -1)
- {
- ndbout_c("%s Failed to set DIRECTIO_ON errno: %u",
- theFileName.c_str(), errno);
- }
#endif
}
@@ -423,6 +451,7 @@ no_odirect:
#endif
}
#endif
+
if ((flags & FsOpenReq::OM_SYNC) && (flags & FsOpenReq::OM_INIT))
{
#ifdef O_SYNC
@@ -439,6 +468,24 @@ no_odirect:
}
#endif
}
+
+#if ! defined(O_DIRECT) && defined HAVE_DIRECTIO && defined(DIRECTIO_ON)
+ if (flags & FsOpenReq::OM_DIRECT)
+ {
+ if (directio(theFd, DIRECTIO_ON) == -1)
+ {
+ ndbout_c("%s Failed to set DIRECTIO_ON errno: %u",
+ theFileName.c_str(), errno);
+ }
+#ifdef VM_TRACE
+ else
+ {
+ ndbout_c("%s DIRECTIO_ON", theFileName.c_str());
+ }
+#endif
+ }
+#endif
+
if(use_gz)
{
int err;
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp 2008-10-22 10:17:12 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp 2008-11-17 20:58:11 +0000
@@ -207,22 +207,30 @@ int
Win32AsyncFile::readBuffer(Request* req, char * buf, size_t size, off_t offset){
req->par.readWrite.pages[0].size = 0;
- DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
- if(dwSFP != offset) {
- return GetLastError();
- }
-
while (size > 0) {
size_t bytes_read = 0;
+ OVERLAPPED ov;
+ bzero(&ov, sizeof(ov));
+
+ LARGE_INTEGER li;
+ li.QuadPart = offset;
+ ov.Offset = li.LowPart;
+ ov.OffsetHigh = li.HighPart;
+
DWORD dwBytesRead;
BOOL bRead = ReadFile(hFile,
buf,
size,
&dwBytesRead,
- 0);
+ &ov);
if(!bRead){
- return GetLastError();
+ int err = GetLastError();
+ if (err == ERROR_HANDLE_EOF && req->action == Request::readPartial)
+ {
+ return 0;
+ }
+ return err;
}
bytes_read = dwBytesRead;
@@ -257,12 +265,15 @@ Win32AsyncFile::writeBuffer(const char *
m_write_wo_sync += size;
- DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
- if(dwSFP != offset) {
- return GetLastError();
- }
-
while (size > 0) {
+ OVERLAPPED ov;
+ bzero(&ov, sizeof(ov));
+
+ LARGE_INTEGER li;
+ li.QuadPart = offset;
+ ov.Offset = li.LowPart;
+ ov.OffsetHigh = li.HighPart;
+
if (size < bytes_to_write){
// We are at the last chunk
bytes_to_write = size;
@@ -270,7 +281,7 @@ Win32AsyncFile::writeBuffer(const char *
size_t bytes_written = 0;
DWORD dwWritten;
- BOOL bWrite = WriteFile(hFile, buf, bytes_to_write, &dwWritten, 0);
+ BOOL bWrite = WriteFile(hFile, buf, bytes_to_write, &dwWritten, &ov);
if(!bWrite) {
return GetLastError();
}
=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp 2008-11-16 15:32:38 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp 2008-11-18 10:28:03 +0000
@@ -829,7 +829,7 @@ Tsman::execFSWRITEREQ(Signal* signal)
FsReadWriteReq* req= (FsReadWriteReq*)signal->getDataPtr();
m_file_pool.getPtr(ptr, req->userPointer);
- m_global_page_pool.getPtr(page_ptr, req->data.pageData[0]);
+ m_shared_page_pool.getPtr(page_ptr, req->data.pageData[0]);
memset(page_ptr.p, 0, File_formats::NDB_PAGE_SIZE);
Uint32 page_no = req->varIndex;
=== modified file 'storage/ndb/src/mgmapi/LocalConfig.cpp'
--- a/storage/ndb/src/mgmapi/LocalConfig.cpp 2008-11-06 16:52:59 +0000
+++ b/storage/ndb/src/mgmapi/LocalConfig.cpp 2008-11-18 16:33:59 +0000
@@ -22,6 +22,7 @@
LocalConfig::LocalConfig(){
error_line = 0; error_msg[0] = 0;
_ownNodeId= 0;
+ bind_address_port= 0;
}
bool
@@ -153,6 +154,11 @@ const char *hostNameTokens[] = {
0
};
+const char *bindAddressTokens[] = {
+ "bind-address=%[^:]:%i",
+ 0
+};
+
const char *fileNameTokens[] = {
"file://%s",
"file=%s",
@@ -179,6 +185,10 @@ LocalConfig::parseHostName(const char *
mgmtSrvrId.type = MgmId_TCP;
mgmtSrvrId.name.assign(tempString);
mgmtSrvrId.port = port;
+ /* assign default bind_address if available */
+ if (bind_address.length())
+ mgmtSrvrId.bind_address.assign(bind_address);
+ mgmtSrvrId.bind_address_port = bind_address_port;
ids.push_back(mgmtSrvrId);
return true;
}
@@ -193,6 +203,41 @@ LocalConfig::parseHostName(const char *
}
bool
+LocalConfig::parseBindAddress(const char * buf)
+{
+ char tempString[1024];
+ char tempString2[1024];
+ int port;
+ do
+ {
+ for(int i = 0; bindAddressTokens[i] != 0; i++)
+ {
+ if (sscanf(buf, bindAddressTokens[i], tempString, &port) == 2)
+ {
+ if (ids.size() == 0)
+ {
+ /* assign default bind_address */
+ bind_address.assign(tempString);
+ bind_address_port = port;
+ return true;
+ }
+ /* override bind_address on latest mgmd */
+ MgmtSrvrId &mgmtSrvrId= ids[ids.size()-1];
+ mgmtSrvrId.bind_address.assign(tempString);
+ mgmtSrvrId.bind_address_port = port;
+ return true;
+ }
+ }
+ if (buf == tempString2)
+ break;
+ // try to add port 0 to see if it works
+ BaseString::snprintf(tempString2, sizeof(tempString2),"%s:0", buf);
+ buf= tempString2;
+ } while(1);
+ return false;
+}
+
+bool
LocalConfig::parseFileName(const char * buf){
char tempString[1024];
for(int i = 0; fileNameTokens[i] != 0; i++) {
@@ -222,6 +267,8 @@ LocalConfig::parseString(const char * co
continue;
if (parseHostName(tok))
continue;
+ if (parseBindAddress(tok))
+ continue;
if (parseFileName(tok))
continue;
=== modified file 'storage/ndb/src/mgmapi/LocalConfig.hpp'
--- a/storage/ndb/src/mgmapi/LocalConfig.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/src/mgmapi/LocalConfig.hpp 2008-11-17 14:20:48 +0000
@@ -33,6 +33,8 @@ struct MgmtSrvrId {
MgmtSrvrId_Type type;
BaseString name;
unsigned int port;
+ BaseString bind_address;
+ unsigned int bind_address_port;
};
struct LocalConfig {
@@ -43,6 +45,9 @@ struct LocalConfig {
int error_line;
char error_msg[256];
+ BaseString bind_address;
+ unsigned int bind_address_port;
+
LocalConfig();
~LocalConfig();
bool init(const char *connectString = 0,
@@ -58,6 +63,7 @@ struct LocalConfig {
bool parseNodeId(const char *buf);
bool parseHostName(const char *buf);
+ bool parseBindAddress(const char *buf);
bool parseFileName(const char *buf);
bool parseString(const char *buf, BaseString &err);
char * makeConnectString(char *buf, int sz);
=== modified file 'storage/ndb/src/mgmapi/mgmapi.cpp'
--- a/storage/ndb/src/mgmapi/mgmapi.cpp 2008-11-05 07:57:18 +0000
+++ b/storage/ndb/src/mgmapi/mgmapi.cpp 2008-11-18 08:34:42 +0000
@@ -106,6 +106,7 @@ struct ndb_mgm_handle {
int mgmd_version_minor;
int mgmd_version_build;
char * m_bindaddress;
+ int m_bindaddress_port;
};
#define SET_ERROR(h, e, s) setError((h), (e), __LINE__, (s))
@@ -189,6 +190,7 @@ ndb_mgm_create_handle()
h->errstream = stdout;
h->m_name = 0;
h->m_bindaddress = 0;
+ h->m_bindaddress_port = 0;
strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE);
@@ -246,10 +248,22 @@ ndb_mgm_set_bindaddress(NdbMgmHandle han
free(handle->m_bindaddress);
if (arg)
+ {
handle->m_bindaddress = strdup(arg);
+ char *port = strchr(handle->m_bindaddress, ':');
+ if (port != 0)
+ {
+ handle->m_bindaddress_port = atoi(port+1);
+ *port = 0;
+ }
+ else
+ handle->m_bindaddress_port = 0;
+ }
else
+ {
handle->m_bindaddress = 0;
-
+ handle->m_bindaddress_port = 0;
+ }
DBUG_RETURN(0);
}
@@ -541,50 +555,6 @@ ndb_mgm_connect(NdbMgmHandle handle, int
my_socket sockfd;
my_socket_invalidate(&sockfd);
Uint32 i;
- SocketClient s(0, 0);
- s.set_connect_timeout((handle->timeout+999)/1000);
- if (!s.init())
- {
- fprintf(handle->errstream,
- "Unable to create socket, "
- "while trying to connect with connect string: %s\n",
- cfg.makeConnectString(buf,sizeof(buf)));
-
- setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
- "Unable to create socket, "
- "while trying to connect with connect string: %s\n",
- cfg.makeConnectString(buf,sizeof(buf)));
- DBUG_RETURN(-1);
- }
-
- if (handle->m_bindaddress)
- {
- BaseString::snprintf(buf, sizeof(buf), handle->m_bindaddress);
- unsigned short portno = 0;
- char * port = strchr(buf, ':');
- if (port != 0)
- {
- portno = atoi(port+1);
- * port = 0;
- }
- int err;
- if ((err = s.bind(buf, portno)) != 0)
- {
- fprintf(handle->errstream,
- "Unable to bind local address %s errno: %d, "
- "while trying to connect with connect string: %s\n",
- handle->m_bindaddress, err,
- cfg.makeConnectString(buf,sizeof(buf)));
-
- setError(handle, NDB_MGM_BIND_ADDRESS, __LINE__,
- "Unable to bind local address %s errno: %d, "
- "while trying to connect with connect string: %s\n",
- handle->m_bindaddress, err,
- cfg.makeConnectString(buf,sizeof(buf)));
- DBUG_RETURN(-1);
- }
- }
-
while (!my_socket_valid(sockfd))
{
// do all the mgmt servers
@@ -592,6 +562,58 @@ ndb_mgm_connect(NdbMgmHandle handle, int
{
if (cfg.ids[i].type != MgmId_TCP)
continue;
+
+ SocketClient s(0, 0);
+ const char *bind_address= NULL;
+ unsigned short bind_address_port= 0;
+ s.set_connect_timeout((handle->timeout+999)/1000);
+ if (!s.init())
+ {
+ fprintf(handle->errstream,
+ "Unable to create socket, "
+ "while trying to connect with connect string: %s\n",
+ cfg.makeConnectString(buf,sizeof(buf)));
+
+ setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
+ "Unable to create socket, "
+ "while trying to connect with connect string: %s\n",
+ cfg.makeConnectString(buf,sizeof(buf)));
+ DBUG_RETURN(-1);
+ }
+ if (handle->m_bindaddress)
+ {
+ bind_address= handle->m_bindaddress;
+ bind_address_port= handle->m_bindaddress_port;
+ }
+ else if (cfg.ids[i].bind_address.length())
+ {
+ bind_address= cfg.ids[i].bind_address.c_str();
+ bind_address_port= cfg.ids[i].bind_address_port;
+ }
+ if (bind_address)
+ {
+ int err;
+ if ((err = s.bind(bind_address, bind_address_port)) != 0)
+ {
+ if (!handle->m_bindaddress)
+ {
+ // retry with next mgmt server
+ continue;
+ }
+ fprintf(handle->errstream,
+ "Unable to bind local address '%s:%d' errno: %d, "
+ "while trying to connect with connect string: '%s'\n",
+ bind_address, (int)bind_address_port, err,
+ cfg.makeConnectString(buf,sizeof(buf)));
+
+ setError(handle, NDB_MGM_BIND_ADDRESS, __LINE__,
+ "Unable to bind local address '%s:%d' errno: %d, "
+ "while trying to connect with connect string: '%s'\n",
+ bind_address, (int)bind_address_port, err,
+ cfg.makeConnectString(buf,sizeof(buf)));
+ DBUG_RETURN(-1);
+ }
+ }
sockfd = s.connect(cfg.ids[i].name.c_str(), cfg.ids[i].port);
if (my_socket_valid(sockfd))
break;
@@ -1739,8 +1761,33 @@ ndb_mgm_listen_event_internal(NdbMgmHand
const char *hostname= ndb_mgm_get_connected_host(handle);
int port= ndb_mgm_get_connected_port(handle);
- SocketClient s(hostname, port);
- const NDB_SOCKET_TYPE sockfd = s.connect();
+ const char *bind_address= ndb_mgm_get_connected_bind_address(handle);
+ SocketClient s(0, 0);
+ s.set_connect_timeout((handle->timeout+999)/1000);
+ if (!s.init())
+ {
+ fprintf(handle->errstream, "Unable to create socket");
+ setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
+ "Unable to create socket");
+ return -1;
+ }
+ if (bind_address)
+ {
+ int err;
+ if ((err = s.bind(bind_address, 0)) != 0)
+ {
+ fprintf(handle->errstream,
+ "Unable to bind local address '%s:0' err: %d, errno: %d, "
+ "while trying to connect with connect string: '%s:%d'\n",
+ bind_address, err, errno, hostname, port);
+ setError(handle, NDB_MGM_BIND_ADDRESS, __LINE__,
+ "Unable to bind local address '%s:0' errno: %d, errno: %d, "
+ "while trying to connect with connect string: '%s:%d'\n",
+ bind_address, err, errno, hostname, port);
+ return -1;
+ }
+ }
+ const NDB_SOCKET_TYPE sockfd = s.connect(hostname, port);
if (!my_socket_valid(sockfd))
{
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
@@ -2367,6 +2414,19 @@ const char *ndb_mgm_get_connectstring(Nd
}
extern "C"
+const char *ndb_mgm_get_connected_bind_address(NdbMgmHandle handle)
+{
+ if (handle->cfg_i >= 0)
+ {
+ if (handle->m_bindaddress)
+ return handle->m_bindaddress;
+ if (handle->cfg.ids[handle->cfg_i].bind_address.length())
+ return handle->cfg.ids[handle->cfg_i].bind_address.c_str();
+ }
+ return 0;
+}
+
+extern "C"
int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype,
int log_event)
=== modified file 'storage/ndb/test/include/HugoOperations.hpp'
--- a/storage/ndb/test/include/HugoOperations.hpp 2008-02-19 15:00:29 +0000
+++ b/storage/ndb/test/include/HugoOperations.hpp 2008-11-17 09:26:25 +0000
@@ -51,9 +51,14 @@ public:
int numRecords = 1);
int pkReadRecord(Ndb*,
- int recordNo,
- int numRecords = 1,
- NdbOperation::LockMode lm = NdbOperation::LM_Read);
+ int record,
+ int numRecords = 1,
+ NdbOperation::LockMode lm = NdbOperation::LM_Read);
+
+ int pkReadRandRecord(Ndb*,
+ int records,
+ int numRecords = 1,
+ NdbOperation::LockMode lm = NdbOperation::LM_Read);
int pkUpdateRecord(Ndb*,
int recordNo,
=== modified file 'storage/ndb/test/include/HugoTransactions.hpp'
--- a/storage/ndb/test/include/HugoTransactions.hpp 2007-09-28 12:00:57 +0000
+++ b/storage/ndb/test/include/HugoTransactions.hpp 2008-11-17 09:28:34 +0000
@@ -64,7 +64,8 @@ public:
int pkReadRecords(Ndb*,
int records,
int batchsize = 1,
- NdbOperation::LockMode = NdbOperation::LM_Read);
+ NdbOperation::LockMode = NdbOperation::LM_Read,
+ int rand = 0);
int scanUpdateRecords(Ndb*, NdbScanOperation::ScanFlag,
int records,
=== modified file 'storage/ndb/test/src/HugoOperations.cpp'
--- a/storage/ndb/test/src/HugoOperations.cpp 2008-10-08 08:36:09 +0000
+++ b/storage/ndb/test/src/HugoOperations.cpp 2008-11-18 08:52:21 +0000
@@ -142,6 +142,82 @@ rand_lock_mode:
return NDBT_OK;
}
+int HugoOperations::pkReadRandRecord(Ndb* pNdb,
+ int records,
+ int numRecords,
+ NdbOperation::LockMode lm){
+ int a;
+ allocRows(numRecords);
+ indexScans.clear();
+ int check;
+
+ NdbOperation* pOp = 0;
+ pIndexScanOp = 0;
+
+ for(int r=0; r < numRecords; r++){
+
+ if(pOp == 0)
+ {
+ pOp = getOperation(pTrans, NdbOperation::ReadRequest);
+ }
+ if (pOp == NULL) {
+ ERR(pTrans->getNdbError());
+ return NDBT_FAILED;
+ }
+
+rand_lock_mode:
+ switch(lm){
+ case NdbOperation::LM_Read:
+ case NdbOperation::LM_Exclusive:
+ case NdbOperation::LM_CommittedRead:
+ case NdbOperation::LM_SimpleRead:
+ if(idx && idx->getType() == NdbDictionary::Index::OrderedIndex &&
+ pIndexScanOp == 0)
+ {
+ pIndexScanOp = ((NdbIndexScanOperation*)pOp);
+ check = pIndexScanOp->readTuples(lm);
+ /* Record NdbIndexScanOperation ptr for later... */
+ indexScans.push_back(pIndexScanOp);
+ }
+ else
+ check = pOp->readTuple(lm);
+ break;
+ default:
+ lm = (NdbOperation::LockMode)((rand() >> 16) & 3);
+ goto rand_lock_mode;
+ }
+
+ if( check == -1 ) {
+ ERR(pTrans->getNdbError());
+ return NDBT_FAILED;
+ }
+
+ // Define primary keys
+ if (equalForRow(pOp, rand() % records) != 0)
+ return NDBT_FAILED;
+
+ if(pIndexScanOp)
+ pIndexScanOp->end_of_bound(r);
+
+ if(r == 0 || pIndexScanOp == 0)
+ {
+ // Define attributes to read
+ for(a = 0; a<tab.getNoOfColumns(); a++){
+ if((rows[r]->attributeStore(a) =
+ pOp->getValue(tab.getColumn(a)->getName())) == 0) {
+ ERR(pTrans->getNdbError());
+ return NDBT_FAILED;
+ }
+ }
+ }
+ /* Note pIndexScanOp will point to the 'last' index scan op
+ * we used. The full list is in the indexScans vector
+ */
+ pOp = pIndexScanOp;
+ }
+ return NDBT_OK;
+}
+
int HugoOperations::pkUpdateRecord(Ndb* pNdb,
int recordNo,
int numRecords,
=== modified file 'storage/ndb/test/src/HugoTransactions.cpp'
--- a/storage/ndb/test/src/HugoTransactions.cpp 2008-02-20 11:33:23 +0000
+++ b/storage/ndb/test/src/HugoTransactions.cpp 2008-11-17 09:28:34 +0000
@@ -812,7 +812,8 @@ int
HugoTransactions::pkReadRecords(Ndb* pNdb,
int records,
int batch,
- NdbOperation::LockMode lm){
+ NdbOperation::LockMode lm,
+ int _rand){
int reads = 0;
int r = 0;
int retryAttempt = 0;
@@ -857,11 +858,23 @@ HugoTransactions::pkReadRecords(Ndb* pNd
if (timer_active)
NdbTick_getMicroTimer(&timer_start);
- if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK)
+ if (_rand == 0)
{
- ERR(pTrans->getNdbError());
- closeTransaction(pNdb);
- return NDBT_FAILED;
+ if(pkReadRecord(pNdb, r, batch, lm) != NDBT_OK)
+ {
+ ERR(pTrans->getNdbError());
+ closeTransaction(pNdb);
+ return NDBT_FAILED;
+ }
+ }
+ else
+ {
+ if(pkReadRandRecord(pNdb, records, batch, lm) != NDBT_OK)
+ {
+ ERR(pTrans->getNdbError());
+ closeTransaction(pNdb);
+ return NDBT_FAILED;
+ }
}
check = pTrans->execute(Commit, AbortOnError);
=== modified file 'storage/ndb/test/src/NDBT_Tables.cpp'
--- a/storage/ndb/test/src/NDBT_Tables.cpp 2008-11-11 12:54:17 +0000
+++ b/storage/ndb/test/src/NDBT_Tables.cpp 2008-11-18 08:52:21 +0000
@@ -1131,6 +1131,8 @@ NDBT_Tables::create_default_tablespace(N
}
Uint32 mb = 96;
+ Uint32 files = 13;
+
{
char buf[256];
if (NdbEnv_GetEnv("UNDOSIZE", buf, sizeof(buf)))
@@ -1140,7 +1142,19 @@ NDBT_Tables::create_default_tablespace(N
}
}
+ {
+ char buf[256];
+ if (NdbEnv_GetEnv("UNDOFILES", buf, sizeof(buf)))
+ {
+ files = atoi(buf);
+ ndbout_c("Using max %u dd-undo files", files);
+ }
+ }
+
Uint32 sz = 32;
+ while (mb > files * sz)
+ sz += 32;
+
for (Uint32 i = 0; i * sz < mb; i++)
{
char tmp[256];
@@ -1149,7 +1163,7 @@ NDBT_Tables::create_default_tablespace(N
if (strcmp(uf.getPath(), tmp) != 0)
{
uf.setPath(tmp);
- uf.setSize(sz*1024*1024);
+ uf.setSize(Uint64(sz)*1024*1024);
uf.setLogfileGroup("DEFAULT-LG");
res = pDict->createUndofile(uf, true);
@@ -1175,30 +1189,39 @@ NDBT_Tables::create_default_tablespace(N
return NDBT_FAILED;
}
}
+
+ mb = 128;
+ {
+ char buf[256];
+ if (NdbEnv_GetEnv("DATASIZE", buf, sizeof(buf)))
+ {
+ mb = atoi(buf);
+ ndbout_c("Using %umb dd-data", mb);
+ }
+ }
+ sz = 64;
+ files = 13;
{
- NdbDictionary::Datafile df = pDict->getDatafile(0, "datafile01.dat");
- if (strcmp(df.getPath(), "datafile01.dat") != 0)
+ char buf[256];
+ if (NdbEnv_GetEnv("DATAFILES", buf, sizeof(buf)))
{
- df.setPath("datafile01.dat");
- df.setSize(64*1024*1024);
- df.setTablespace("DEFAULT-TS");
-
- res = pDict->createDatafile(df, true);
- if(res != 0){
- g_err << "Failed to create datafile:"
- << endl << pDict->getNdbError() << endl;
- return NDBT_FAILED;
- }
+ files = atoi(buf);
+ ndbout_c("Using max %u dd-data files", files);
}
}
-
+
+ while (mb > files * sz)
+ sz += 32;
+ for (Uint32 i = 0; i * sz < mb; i++)
{
- NdbDictionary::Datafile df = pDict->getDatafile(0, "datafile02.dat");
- if (strcmp(df.getPath(), "datafile02.dat") != 0)
+ char tmp[256];
+ BaseString::snprintf(tmp, sizeof(tmp), "datafile%u.dat", i);
+ NdbDictionary::Datafile df = pDict->getDatafile(0, tmp);
+ if (strcmp(df.getPath(), tmp) != 0)
{
- df.setPath("datafile02.dat");
- df.setSize(64*1024*1024);
+ df.setPath(tmp);
+ df.setSize(Uint64(sz)*1024*1024);
df.setTablespace("DEFAULT-TS");
res = pDict->createDatafile(df, true);
@@ -1209,7 +1232,7 @@ NDBT_Tables::create_default_tablespace(N
}
}
}
-
+
return NDBT_OK;
}
=== modified file 'storage/ndb/test/tools/hugoPkRead.cpp'
--- a/storage/ndb/test/tools/hugoPkRead.cpp 2007-07-14 08:48:51 +0000
+++ b/storage/ndb/test/tools/hugoPkRead.cpp 2008-11-17 09:26:25 +0000
@@ -34,6 +34,7 @@ struct ThrInput {
int records;
int batch;
int stats;
+ int rand;
};
struct ThrOutput {
@@ -51,7 +52,8 @@ int main(int argc, const char** argv){
int _batch = 1;
const char* _tabname = NULL;
int _help = 0;
-
+ int _rand = 0;
+
struct getargs args[] = {
{ "aborts", 'a', arg_integer, &_abort, "percent of transactions that are aborted", "abort%" },
{ "loops", 'l', arg_integer, &_loops, "number of times to run this program(0=infinite loop)", "loops" },
@@ -59,6 +61,7 @@ int main(int argc, const char** argv){
{ "stats", 's', arg_flag, &_stats, "report latency per batch", "stats" },
{ "batch", 'b', arg_integer, &_batch, "batch value(not 0)", "batch" },
{ "records", 'r', arg_integer, &_records, "Number of records", "records" },
+ { "rand", 0, arg_flag, &_rand, "Read random records within range","rand"},
{ "usage", '?', arg_flag, &_help, "Print help", "" }
};
int num_args = sizeof(args) / sizeof(args[0]);
@@ -116,6 +119,7 @@ int main(int argc, const char** argv){
input.records = _records;
input.batch = _batch;
input.stats = _stats;
+ input.rand = _rand;
// output is stats
ThrOutput output;
@@ -171,7 +175,9 @@ static void hugoPkRead(NDBT_Thread& thr)
int ret;
ret = hugoTrans.pkReadRecords(thr.get_ndb(),
input->records,
- input->batch);
+ input->batch,
+ NdbOperation::LM_Read,
+ input->rand);
if (ret != 0)
thr.set_err(ret);
}
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (msvensson:3105) | Magnus Svensson | 19 Nov |