List:Commits« Previous MessageNext Message »
From:ahristov Date:May 9 2007 9:30am
Subject:PHP mysqlnd svn commit: r363 - branches/threaded_fetch/mysqlnd
View as plain text  
Author: ahristov
Date: 2007-05-09 11:30:56 +0200 (Wed, 09 May 2007)
New Revision: 363

Modified:
   branches/threaded_fetch/mysqlnd/mysqlnd.c
   branches/threaded_fetch/mysqlnd/mysqlnd.h
   branches/threaded_fetch/mysqlnd/mysqlnd_charset.c
   branches/threaded_fetch/mysqlnd/mysqlnd_charset.h
   branches/threaded_fetch/mysqlnd/mysqlnd_palloc.c
   branches/threaded_fetch/mysqlnd/mysqlnd_ps.c
Log:
Threaded fetching. Not quicker, sometimes slower, and crashing,
than old-way non-parallel mysqlnd. Draft code.


Modified: branches/threaded_fetch/mysqlnd/mysqlnd.c
===================================================================
--- branches/threaded_fetch/mysqlnd/mysqlnd.c	2007-05-09 09:28:37 UTC (rev 362)
+++ branches/threaded_fetch/mysqlnd/mysqlnd.c	2007-05-09 09:30:56 UTC (rev 363)
@@ -24,10 +24,13 @@
 #include "mysqlnd_wireprotocol.h"
 #include "mysqlnd_priv.h"
 #include "mysqlnd_statistics.h"
+#include "mysqlnd_charset.h"
 #include "ext/standard/basic_functions.h"
 
 #define MYSQLND_SILENT
 
+static const char * const thread_success_finish = "success"; 
+
 /* the server doesn't support 4byte utf8, but let's make it forward compatible */
 #define MYSQLND_MAX_ALLOWED_USER_LEN	256  /* 64 char * 4byte */
 #define MYSQLND_MAX_ALLOWED_DB_LEN		256  /* 64 char * 4byte */
@@ -62,7 +65,9 @@
 MYSQLND_STATS *mysqlnd_global_stats = NULL;
 static zend_bool mysqlnd_library_initted = FALSE;
 
+void * _mysqlnd_fetch_thread(void *arg);
 
+
 /* {{{ mysqlnd_library_init */
 PHPAPI void mysqlnd_library_init()
 {
@@ -120,26 +125,26 @@
 	}
 
 	if (unbuf->last_row_data) {
-		int i;
+		unsigned int i, ctor_called_count = 0;
 		zend_bool copy_ctor_called;
+		MYSQLND_STATS *global_stats = result->conn? &result->conn->stats:NULL;
 		for (i = 0; i < result->field_count; i++) {
-			if (result->type == MYSQLND_RES_PS) {
-				/* Do nothing, before assigning we will clean */
-				zval_ptr_dtor(&unbuf->last_row_data[i]);
-			} else {
-				mysqlnd_palloc_zval_ptr_dtor(&(unbuf->last_row_data[i]),
-											 result->zval_cache, FALSE,
-											 &copy_ctor_called TSRMLS_CC);
-				if (copy_ctor_called) {
-					MYSQLND_INC_CONN_STATISTIC(&result->conn->stats,
-											   STAT_COPY_ON_WRITE_PERFORMED);
-					/* Increase global stats */
-				} else {
-					MYSQLND_INC_CONN_STATISTIC(&result->conn->stats,
-											   STAT_COPY_ON_WRITE_SAVED);
-				}
+			mysqlnd_palloc_zval_ptr_dtor(&(unbuf->last_row_data[i]),
+										 result->zval_cache,
+										 result->type == MYSQLND_RES_PS,
+										 &copy_ctor_called TSRMLS_CC);
+			if (copy_ctor_called) {
+				ctor_called_count++;
 			}
 		}
+		/* By using value3 macros we hold a mutex only once, there is no value2 */
+		MYSQLND_INC_CONN_STATISTIC_W_VALUE3(global_stats,
+											STAT_COPY_ON_WRITE_PERFORMED,
+											ctor_called_count,
+											STAT_COPY_ON_WRITE_SAVED,
+											result->field_count - ctor_called_count,
+											STAT_COPY_ON_WRITE_PERFORMED, 0);
+		
 		/* Free last row's zvals */
 		efree(unbuf->last_row_data);
 		unbuf->last_row_data = NULL;
@@ -175,11 +180,15 @@
 			MYSQLND_INC_CONN_STATISTIC(NULL, copy_ctor_called? STAT_COPY_ON_WRITE_PERFORMED:
 															   STAT_COPY_ON_WRITE_SAVED);
 		}
-		pefree(current_row, set->persistent);
-		pefree(current_buffer, set->persistent);
+//		pefree(current_row, set->persistent);
+//		pefree(current_buffer, set->persistent);
+		free(current_row);
+		free(current_buffer);
 	}
-	pefree(set->data, set->persistent);
-	pefree(set->row_buffers, set->persistent);
+//	pefree(set->data, set->persistent);
+//	pefree(set->row_buffers, set->persistent);
+	free(set->data);
+	free(set->row_buffers);
 	set->data			= NULL;
 	set->row_buffers	= NULL;
 	set->data_cursor	= NULL;
@@ -195,12 +204,29 @@
 /* {{{ mysqlnd_internal_free_result_buffers */
 void mysqlnd_internal_free_result_buffers(MYSQLND_RES *result TSRMLS_DC)
 {
-
 	if (result->unbuf) {
 		mysqlnd_unbuffered_free_last_data(result TSRMLS_CC);
 		efree(result->unbuf);
 		result->unbuf = NULL;
 	} else if (result->data) {
+		if (result->data->async) {
+			tsrm_mutex_lock(&result->conn->LOCK_async);
+#ifndef MYSQLND_SILENT
+			printf("Lock acquired in %s\n", __FUNCTION__);
+#endif
+			while (result->data->async_finished != TRUE) {
+#ifndef MYSQLND_SILENT
+				printf("Waiting on async_finished\n");
+#endif
+				pthread_cond_wait(&result->conn->COND_async_finished,
+								  &result->conn->LOCK_async);
+			}
+#ifndef MYSQLND_SILENT
+			printf("Releasing lock in %s\n", __FUNCTION__);
+#endif
+			tsrm_mutex_unlock(&result->conn->LOCK_async);
+		}
+
 		mysqlnd_free_buffered_data(result TSRMLS_CC);
 		result->data = NULL;
 	}
@@ -261,8 +287,6 @@
 		result->row_packet = NULL;
 	}
 
-	result->conn = NULL;
-
 	if (result->meta) {
 		mysqlnd_internal_free_result_metadata(result->meta, FALSE TSRMLS_CC);
 		result->meta = NULL;
@@ -279,17 +303,23 @@
 /* {{{ mysqlnd_internal_free_result */
 void mysqlnd_internal_free_result(MYSQLND_RES *result TSRMLS_DC)
 {
+#ifndef MYSQLND_SILENT
+	printf("Freeing result set. result->data=%p async=%d\n",
+		   result->data, result->data? result->data->async:-1);
+#endif
 	/*
 	  result->conn is an address if this is an unbuffered query.
 	  In this case, decrement the reference counter in the connection
 	  object and if needed free the latter. If quit_sent is no
 	*/
+
+	mysqlnd_internal_free_result_contents(result TSRMLS_CC);
+
 	if (result->conn) {
 		result->conn->m->free_reference(result->conn TSRMLS_CC);
 		result->conn = NULL;
 	}
 
-	mysqlnd_internal_free_result_contents(result TSRMLS_CC);
 	efree(result);
 }
 /* }}} */
@@ -394,6 +424,13 @@
 		pefree(conn->net.cmd_buffer.buffer, conn->persistent);
 		conn->net.cmd_buffer.buffer = NULL;
 	}
+
+#ifndef MYSQLND_SILENT
+	printf("Destroying lock in %s\n", __FUNCTION__);
+#endif
+	pthread_mutex_destroy(&conn->LOCK_async);
+	pthread_cond_destroy(&conn->COND_async_finished);
+	pthread_cond_destroy(&conn->COND_work);
 }
 /* }}} */
 
@@ -797,6 +834,24 @@
 									   (char *)&as_unicode);
 		}
 
+		{
+			pthread_t th;
+			pthread_attr_t connection_attrib;
+#ifdef ZTS
+			conn->tsrm_ls = tsrm_ls;
+#endif
+
+			pthread_attr_init(&connection_attrib);
+			pthread_attr_setdetachstate(&connection_attrib, PTHREAD_CREATE_DETACHED);
+			pthread_mutex_init(&conn->LOCK_async, NULL);
+			pthread_cond_init(&conn->COND_async_finished, NULL);
+			pthread_cond_init(&conn->COND_work, NULL);
+			conn->thread_is_running = TRUE;
+			if (pthread_create(&th, &connection_attrib, _mysqlnd_fetch_thread,
(void*)conn)) {
+				conn->thread_is_running = FALSE;
+			}
+		
+		}
 		return conn;
 	}
 err:
@@ -1446,12 +1501,38 @@
 {
 	unsigned int i;
 	zval *row = (zval *) param;
+	MYSQLND_RES_BUFFERED *set = result->data;
+	MYSQLND_RES_METADATA *meta = result->meta;
 
+	if (set->async) {
+		zend_bool invalid;
+		tsrm_mutex_lock(&result->conn->LOCK_async);
+#ifndef MYSQLND_SILENT
+		printf("Lock acquired in %s\n", __FUNCTION__);
+#endif
+		while (set->async_finished != TRUE) {
+#ifndef MYSQLND_SILENT
+			printf("Waiting for async_finished\n");
+#endif
+			pthread_cond_wait(&result->conn->COND_async_finished,
+							  &result->conn->LOCK_async);
+		}
+		invalid = set->async_invalid;
+#ifndef MYSQLND_SILENT
+		printf("Releasing lock in %s\n", __FUNCTION__);
+#endif
+		tsrm_mutex_unlock(&result->conn->LOCK_async);
+		if (invalid == TRUE) {
+			*fetched_anything = FALSE;
+			return FAIL;
+		}
+	}
+
 	/* If we haven't read everything */
-	if (result->data->data_cursor &&
-		(result->data->data_cursor - result->data->data) <
result->data->row_count)
+	if (set->data_cursor &&
+		(set->data_cursor - set->data) < set->row_count)
 	{
-		zval **current_row = *result->data->data_cursor;
+		zval **current_row = *set->data_cursor;
 		for (i = 0; i < result->field_count; i++) {
 			zval *data = current_row[i];
 
@@ -1475,34 +1556,34 @@
 				  the index is a numeric and convert it to it. This however means constant
 				  hashing of the column name, which is not needed as it can be precomputed.
 				*/
-				if (result->meta->zend_hash_keys[i].is_numeric == FALSE) {
+				if (meta->zend_hash_keys[i].is_numeric == FALSE) {
 #if PHP_MAJOR_VERSION >= 6
 					if (UG(unicode)) {
 						zend_u_hash_quick_update(Z_ARRVAL_P(row), IS_UNICODE,
-												 result->meta->zend_hash_keys[i].ustr,
-												 result->meta->zend_hash_keys[i].ulen + 1,
-												 result->meta->zend_hash_keys[i].key,
+												 meta->zend_hash_keys[i].ustr,
+												 meta->zend_hash_keys[i].ulen + 1,
+												 meta->zend_hash_keys[i].key,
 												 (void *) &data, sizeof(zval *), NULL);
 					} else
 #endif
 					{
 						zend_hash_quick_update(Z_ARRVAL_P(row),
-											   result->meta->fields[i].name,
-											   result->meta->fields[i].name_length + 1,
-											   result->meta->zend_hash_keys[i].key,
+											   meta->fields[i].name,
+											   meta->fields[i].name_length + 1,
+											   meta->zend_hash_keys[i].key,
 											   (void *) &data, sizeof(zval *), NULL);
 					}
 				} else {
 					zend_hash_index_update(Z_ARRVAL_P(row),
-										   result->meta->zend_hash_keys[i].key,
+										   meta->zend_hash_keys[i].key,
 										   (void *) &data, sizeof(zval *), NULL);
 				}
 			}
 		}
-		result->data->data_cursor++;
+		set->data_cursor++;
 		*fetched_anything = TRUE;
 	} else {
-		result->data->data_cursor = NULL;
+		set->data_cursor = NULL;
 		*fetched_anything = FALSE;
 #ifndef MYSQLND_SILENT
 		php_printf("NO MORE DATA\n ");
@@ -1515,57 +1596,25 @@
 
 #define STORE_RESULT_PREALLOCATED_SET 32
 
-/* {{{ mysqlnd_store_result */
-static
-MYSQLND_RES *_mysqlnd_store_result(MYSQLND * const conn TSRMLS_DC)
+/* {{{ _mysqlnd_store_result_fetch_data */
+static enum_func_status
+_mysqlnd_store_result_fetch_data(MYSQLND * const conn, MYSQLND_RES_BUFFERED *set,
+								 MYSQLND_RES_METADATA *meta TSRMLS_DC)
 {
 	enum_func_status ret;
+	php_mysql_packet_row row_packet;
 	unsigned int next_extend = STORE_RESULT_PREALLOCATED_SET, free_rows;
-	php_mysql_packet_row row_packet;
-	MYSQLND_RES *result;
-	zend_bool to_cache = FALSE;
 
-	if (!conn->current_result) {
-		return NULL;
-	}
-
-	/* Nothing to store for UPSERT/LOAD DATA*/
-	if (conn->last_query_type != QUERY_SELECT || conn->state != CONN_FETCHING_DATA) {
-		SET_CLIENT_ERROR(conn->error_info, CR_COMMANDS_OUT_OF_SYNC, UNKNOWN_SQLSTATE,
-						 mysqlnd_out_of_sync); 
-		return NULL;
-	}
-
-	MYSQLND_INC_CONN_STATISTIC(&conn->stats, STAT_BUFFERED_SETS);
-
-	result = conn->current_result;
-	conn->current_result = NULL;
-
-	result->conn			= NULL;	/* store result does not reference  the connection */
-	result->type			= MYSQLND_RES_NORMAL;
-	result->m.fetch_row		= result->m.fetch_row_normal_buffered;
-	result->m.fetch_lengths	= mysqlnd_fetch_lengths_buffered;
-
-	conn->state = CONN_FETCHING_DATA;
-
-	result->lengths = ecalloc(result->field_count, sizeof(unsigned long));
-
-	/* Create room for 'next_extend' rows */
-
-	result->data				= pecalloc(1, sizeof(MYSQLND_RES_BUFFERED), to_cache);
-	result->data->data			= pemalloc(next_extend * sizeof(zval **), to_cache);
-	result->data->row_buffers	= pemalloc(next_extend * sizeof(zend_uchar *),
to_cache);
-	result->data->persistent	= to_cache;
-	result->data->qcache		= to_cache?
mysqlnd_qcache_get_cache_reference(conn->qcache):NULL;
-	result->data->references	= 1;
-
-
 	free_rows = next_extend;
 
 	PACKET_INIT_ALLOCA(row_packet, PROT_ROW_PACKET);
-	row_packet.field_count = result->field_count;
+	row_packet.field_count = meta->field_count;
 	row_packet.binary_protocol = FALSE;
-	row_packet.fields_metadata = result->meta->fields;
+	row_packet.fields_metadata = meta->fields;
+
+//	row_packet.persistent_alloc = FALSE;
+	row_packet.persistent_alloc = TRUE;
+
 	/* Let the row packet fill our buffer and skip additional malloc + memcpy */
 	while (FAIL != (ret = PACKET_READ_ALLOCA(row_packet, conn)) && !row_packet.eof)
{
 		int i;
@@ -1573,17 +1622,22 @@
 
 		if (!free_rows) {
 			unsigned long total_rows = free_rows = next_extend = next_extend * 5 / 3; /* extend
with 33% */
-			total_rows += result->data->row_count;
-			result->data->data = perealloc(result->data->data,
-										   total_rows * sizeof(zval **), to_cache);
+			total_rows += set->row_count;
+			set->data = realloc(set->data, total_rows * sizeof(zval **));
 
-			result->data->row_buffers = perealloc(result->data->row_buffers,
-												  total_rows * sizeof(zend_uchar *), to_cache);
+			set->row_buffers = realloc(set->row_buffers,
+										 total_rows * sizeof(zend_uchar *));
+//			set->data = perealloc(set->data, total_rows * sizeof(zval **),
+//								  set->persistent);
+
+//			set->row_buffers = perealloc(set->row_buffers,
+//										 total_rows * sizeof(zend_uchar *),
+//										 set->persistent);
 		}
 		free_rows--;
-		current_row = result->data->data[result->data->row_count] =
row_packet.fields;
-		result->data->row_buffers[result->data->row_count] = row_packet.row_buffer;
-		result->data->row_count++;
+		current_row = set->data[set->row_count] = row_packet.fields;
+		set->row_buffers[set->row_count] = row_packet.row_buffer;
+		set->row_count++;
 
 		/* So row_packet's destructor function won't efree() it */
 		row_packet.fields = NULL;
@@ -1598,9 +1652,9 @@
 			*/
 			if (Z_TYPE_P(current_row[i]) != IS_NULL &&
 				(len = Z_STRLEN_P(current_row[i])) &&
-				result->meta->fields[i].max_length < len)
+				meta->fields[i].max_length < len)
 			{
-				result->meta->fields[i].max_length = len;
+				meta->fields[i].max_length = len;
 			}
 		}
 		/*
@@ -1611,43 +1665,172 @@
 		*/
 	}
 	MYSQLND_INC_CONN_STATISTIC_W_VALUE(&conn->stats, STAT_ROWS_FETCHED_FROM_CLIENT,
-									   result->data->row_count);
+									   set->row_count);
 
 	/* Finally clean */
 	if (row_packet.eof) { 
 		conn->upsert_status.warning_count = row_packet.warning_count;
 		conn->upsert_status.server_status = row_packet.server_status;
 	}
+
 	/* save some memory */
 	if (free_rows) {
-		result->data->data = perealloc(result->data->data,
-									   result->data->row_count * sizeof(zval **),
-									   to_cache);
-		result->data->row_buffers = perealloc(result->data->row_buffers,
-											  result->data->row_count * sizeof(zend_uchar *),
-											  to_cache);
+		set->data = realloc(set->data,
+							  set->row_count * sizeof(zval **));
+
+		set->row_buffers = realloc(set->row_buffers,
+									 set->row_count * sizeof(zend_uchar *));
 	}
+#if 0
+	if (free_rows) {
+		set->data = perealloc(set->data,
+							  set->row_count * sizeof(zval **),
+							  set->persistent);
 
+		set->row_buffers = perealloc(set->row_buffers,
+									 set->row_count * sizeof(zend_uchar *),
+									 set->persistent);
+	}
+#endif
+
 	if (conn->upsert_status.server_status & SERVER_MORE_RESULTS_EXISTS) {
 		conn->state = CONN_NEXT_RESULT_PENDING;
 	} else {
 		conn->state = CONN_READY;
 	}
 
+	if (ret == FAIL) {
+		set->error_info = row_packet.error_info;
+	}
+	PACKET_FREE_ALLOCA(row_packet);
+
+	return ret;
+}
+/* }}} */
+
+
+/* {{{ _mysqlnd_fetch_thread */
+void * _mysqlnd_fetch_thread(void *arg)
+{
+	enum_func_status ret;
+	MYSQLND *conn = (MYSQLND *) arg;
+	MYSQLND_RES *result;
+	void ***tsrm_ls = conn->tsrm_ls;
+#ifndef MYSQLND_SILENT
+	printf("conn=%p tsrm_ls=%p\n", conn, conn->tsrm_ls);
+#endif
+	do {
+		tsrm_mutex_lock(&conn->LOCK_async);
+		while (!conn->shutdown_thread && !(result = conn->current_result)) {
+#ifndef MYSQLND_SILENT
+			printf("Waiting for work in %s\n", __FUNCTION__);
+#endif
+			pthread_cond_wait(&conn->COND_work, &conn->LOCK_async);
+		}
+		if (conn->shutdown_thread == TRUE) {
+			tsrm_mutex_unlock(&conn->LOCK_async);
+			break;
+		}
+		conn->current_result = NULL;
+#ifndef MYSQLND_SILENT
+		printf("Send back response that we started to fetch in %s\n", __FUNCTION__);
+#endif
+		pthread_cond_signal(&conn->COND_work);
+		tsrm_mutex_unlock(&conn->LOCK_async);
+
+		ret = _mysqlnd_store_result_fetch_data(conn, result->data,
+											   result->meta TSRMLS_CC);		
+
+		tsrm_mutex_lock(&conn->LOCK_async);
+#ifndef MYSQLND_SILENT
+		printf("Notify finished fetch in %s\n", __FUNCTION__);
+#endif
+		result->data->async_finished = TRUE;
+		pthread_cond_signal(&conn->COND_async_finished);
+		tsrm_mutex_unlock(&conn->LOCK_async);
+	} while (1);
+
+#ifndef MYSQLND_SILENT
+	printf("Exiting worker thread in %s\n", __FUNCTION__);
+#endif
+	pthread_exit(NULL);
+}
+/* }}} */
+
+
+/* {{{ _mysqlnd_store_result */
+static
+MYSQLND_RES *_mysqlnd_store_result(MYSQLND * const conn TSRMLS_DC)
+{
+	enum_func_status ret;
+	MYSQLND_RES *result;
+	zend_bool to_cache = FALSE;
+
+	if (!conn->current_result) {
+		return NULL;
+	}
+
+	/* Nothing to store for UPSERT/LOAD DATA*/
+	if (conn->last_query_type != QUERY_SELECT || conn->state != CONN_FETCHING_DATA) {
+		SET_CLIENT_ERROR(conn->error_info, CR_COMMANDS_OUT_OF_SYNC, UNKNOWN_SQLSTATE,
+						 mysqlnd_out_of_sync); 
+		return NULL;
+	}
+
+	MYSQLND_INC_CONN_STATISTIC(&conn->stats, STAT_BUFFERED_SETS);
+
+	result = conn->current_result;
+
+	result->type			= MYSQLND_RES_NORMAL;
+	result->m.fetch_row		= result->m.fetch_row_normal_buffered;
+	result->m.fetch_lengths	= mysqlnd_fetch_lengths_buffered;
+
+	conn->state = CONN_FETCHING_DATA;
+
+	result->lengths = ecalloc(result->field_count, sizeof(unsigned long));
+
+	/* Create room for 'next_extend' rows */
+
+	result->conn 				= conn->m->get_reference(conn);
+	result->data				= pecalloc(1, sizeof(MYSQLND_RES_BUFFERED), to_cache);
+//	result->data->data			= pemalloc(STORE_RESULT_PREALLOCATED_SET * sizeof(zval **),
to_cache);
+//	result->data->row_buffers	= pemalloc(STORE_RESULT_PREALLOCATED_SET *
sizeof(zend_uchar *), to_cache);
+	result->data->data			= malloc(STORE_RESULT_PREALLOCATED_SET * sizeof(zval **));
+	result->data->row_buffers	= malloc(STORE_RESULT_PREALLOCATED_SET *
sizeof(zend_uchar *));
+	result->data->persistent	= to_cache;
+	result->data->qcache		= to_cache?
mysqlnd_qcache_get_cache_reference(conn->qcache):NULL;
+	result->data->references	= 1;
+	/* Position at the first row */
+	result->data->data_cursor = result->data->data;
+
+	result->data->async = FALSE;
+	result->data->async = TRUE;
+
+	if (result->data->async) {
+		/* we need the connection during fetch */
+
+		tsrm_mutex_lock(&conn->LOCK_async);
+		pthread_cond_signal(&conn->COND_work);
+		do {
+			pthread_cond_wait(&conn->COND_work, &conn->LOCK_async);
+		} while (conn->current_result != NULL);
+		tsrm_mutex_unlock(&conn->LOCK_async);
+		ret = PASS;
+	} else {
+		ret = _mysqlnd_store_result_fetch_data(conn, result->data,
+											   result->meta TSRMLS_CC);
+		conn->current_result = NULL;
+	}
+
 	if (PASS == ret) {
-		/* Position at the first row */
-		result->data->data_cursor = result->data->data;
-
 		/* libmysql's documentation says it should be so for SELECT statements */
-		conn->upsert_status.affected_rows = result->data->row_count;
+		conn->upsert_status.affected_rows =
+				(result->data->async == FALSE)? result->data->row_count:-1;
 	} else {
+		conn->error_info = result->data->error_info;
 		mysqlnd_internal_free_result(result TSRMLS_CC);
 		result = NULL;
-		conn->error_info = row_packet.error_info;
 	}
-
-	PACKET_FREE_ALLOCA(row_packet);
-
 	return result;
 }
 /* }}} */
@@ -1748,49 +1931,7 @@
 static
 ulong _mysqlnd_real_escape_quotes(const MYSQLND * const conn, char *newstr, const char
*escapestr, int escapestr_len)
 {
-	const char 	*newstr_s = newstr;
-	const char 	*newstr_e = newstr + 2 * escapestr_len;
-	const char 	*end = escapestr + escapestr_len;
-	zend_bool	escape_overflow = FALSE;
-
-	for (;escapestr < end; escapestr++) {
-		/* check unicode characters */
-		if (conn->charset->char_maxlen > 1 &&
conn->charset->mb_charlen(*escapestr) > 1) {
-			uint len = conn->charset->mb_valid(escapestr, end);
-
-			/* check possible overflow */
-			if ((newstr + len) > newstr_e) {
-				escape_overflow = TRUE;
-				break;
-			}
-			/* copy mb char without escaping it */
-			while (len--) {
-				*newstr++ = *escapestr++;
-			}
-			escapestr--;
-			continue;
-		}
-		if (*newstr == '\'') {
-			if (newstr + 2 > newstr_e) {
-				escape_overflow = TRUE;
-				break;
-			}
-			*newstr++ = '\'';
-			*newstr++ = '\'';
-		} else {
-			if (newstr + 1 > newstr_e) {
-				escape_overflow = TRUE;
-				break;
-			}
-			*newstr++= *escapestr;
-		}
-	}
-	*newstr = '\0';
-
-	if (escape_overflow) {
-		return (ulong)~0;
-	}
-	return (ulong)(newstr - newstr_s);
+	return mysqlnd_cset_escape_quotes(conn->charset, newstr, escapestr, escapestr_len);
 }
 /* }}} */
 
@@ -1799,76 +1940,7 @@
 static
 ulong _mysqlnd_real_escape_slashes(const MYSQLND * const conn, char *newstr, const char
*escapestr, int escapestr_len)
 {
-	const char 	*newstr_s = newstr;
-	const char 	*newstr_e = newstr + 2 * escapestr_len;
-	const char 	*end = escapestr + escapestr_len;
-	zend_bool	escape_overflow = FALSE;
-	
-	for (;escapestr < end; escapestr++) {
-		char	esc = '\0';
-
-		/* check unicode characters */
-		if (conn->charset->char_maxlen > 1 &&
conn->charset->mb_charlen(*escapestr) > 1) {
-			uint len = conn->charset->mb_valid(escapestr, end);
-
-			/* check possible overflow */
-			if ((newstr + len) > newstr_e) {
-				escape_overflow = TRUE;
-				break;
-			}
-			/* copy mb char without escaping it */
-			while (len--) {
-				*newstr++ = *escapestr++;
-			}
-			escapestr--;
-			continue;
-		}
-		if (conn->charset->char_maxlen > 1 &&
conn->charset->mb_charlen(*escapestr) > 1) {
-			esc = *escapestr;
-		} else {
-			switch (*escapestr) {
-				case 0:
-					esc = '0';
-					break;
-				case '\n':
-					esc = 'n';
-					break;
-				case '\r':
-					esc = 'r';
-					break;
-				case '\\':
-				case '\'':
-				case '"':
-					esc = *escapestr;
-					break;
-				case '\032':
-					esc = 'Z';
-					break;
-			}
-		}
-		if (esc) {
-			if (newstr + 2 > newstr_e) {
-				escape_overflow = TRUE;
-				break;
-			}
-			/* copy escaped character */
-			*newstr++ = '\\';
-			*newstr++ = esc;
-		} else {
-			if (newstr + 1 > newstr_e) {
-				escape_overflow = TRUE;
-				break;
-			}
-			/* copy non escaped character */
-			*newstr++ = *escapestr;
-		}
-	}
-	*newstr = '\0';
-
-	if (escape_overflow) {
-		return (ulong)~0;
-	}
-	return (ulong)(newstr - newstr_s);
+	return mysqlnd_cset_escape_slashes(conn->charset, newstr, escapestr, escapestr_len);
 }
 /* }}} */
 

Modified: branches/threaded_fetch/mysqlnd/mysqlnd.h
===================================================================
--- branches/threaded_fetch/mysqlnd/mysqlnd.h	2007-05-09 09:28:37 UTC (rev 362)
+++ branches/threaded_fetch/mysqlnd/mysqlnd.h	2007-05-09 09:30:56 UTC (rev 363)
@@ -700,6 +700,13 @@
 	MYSQLND_STATS	stats;
 
 	struct st_mysqlnd_connection_methods *m;
+
+	zend_bool			thread_is_running;
+	zend_bool			shutdown_thread;
+	pthread_mutex_t		LOCK_async;
+	pthread_cond_t		COND_async_finished;
+	pthread_cond_t		COND_work;
+	void ***			tsrm_ls;
 };
 
 typedef struct st_php_mysql_packet_row php_mysql_packet_row;
@@ -732,6 +739,12 @@
 
 	MYSQLND_QCACHE		*qcache;
 	unsigned int		references;
+
+	zend_bool			async;
+	zend_bool			async_finished;
+	zend_bool			async_invalid;
+	mysqlnd_error_info	error_info;
+	mynd_ulonglong		async_row_count;
 };
 
 

Modified: branches/threaded_fetch/mysqlnd/mysqlnd_charset.c
===================================================================
--- branches/threaded_fetch/mysqlnd/mysqlnd_charset.c	2007-05-09 09:28:37 UTC (rev 362)
+++ branches/threaded_fetch/mysqlnd/mysqlnd_charset.c	2007-05-09 09:30:56 UTC (rev 363)
@@ -465,3 +465,130 @@
 	return NULL;
 }
 /* }}} */
+
+
+/* {{{ mysqlnd_cset_escape_quotes */
+ulong mysqlnd_cset_escape_quotes(const MYSQLND_CHARSET * const charset, char *newstr,
const char *escapestr, int escapestr_len)
+{
+	const char 	*newstr_s = newstr;
+	const char 	*newstr_e = newstr + 2 * escapestr_len;
+	const char 	*end = escapestr + escapestr_len;
+	zend_bool	escape_overflow = FALSE;
+
+	for (;escapestr < end; escapestr++) {
+		/* check unicode characters */
+		if (charset->char_maxlen > 1 && charset->mb_charlen(*escapestr) >
1) {
+			uint len = charset->mb_valid(escapestr, end);
+
+			/* check possible overflow */
+			if ((newstr + len) > newstr_e) {
+				escape_overflow = TRUE;
+				break;
+			}
+			/* copy mb char without escaping it */
+			while (len--) {
+				*newstr++ = *escapestr++;
+			}
+			escapestr--;
+			continue;
+		}
+		if (*newstr == '\'') {
+			if (newstr + 2 > newstr_e) {
+				escape_overflow = TRUE;
+				break;
+			}
+			*newstr++ = '\'';
+			*newstr++ = '\'';
+		} else {
+			if (newstr + 1 > newstr_e) {
+				escape_overflow = TRUE;
+				break;
+			}
+			*newstr++= *escapestr;
+		}
+	}
+	*newstr = '\0';
+
+	if (escape_overflow) {
+		return (ulong)~0;
+	}
+	return (ulong)(newstr - newstr_s);
+}
+/* }}} */
+
+
+/* {{{ mysqlnd_cset_escape_slashes */
+ulong mysqlnd_cset_escape_slashes(const MYSQLND_CHARSET * const cset, char *newstr, const
char *escapestr, int escapestr_len)
+{
+	const char 	*newstr_s = newstr;
+	const char 	*newstr_e = newstr + 2 * escapestr_len;
+	const char 	*end = escapestr + escapestr_len;
+	zend_bool	escape_overflow = FALSE;
+	
+	for (;escapestr < end; escapestr++) {
+		char	esc = '\0';
+
+		/* check unicode characters */
+		if (cset->char_maxlen > 1 && cset->mb_charlen(*escapestr) > 1) {
+			uint len = cset->mb_valid(escapestr, end);
+
+			/* check possible overflow */
+			if ((newstr + len) > newstr_e) {
+				escape_overflow = TRUE;
+				break;
+			}
+			/* copy mb char without escaping it */
+			while (len--) {
+				*newstr++ = *escapestr++;
+			}
+			escapestr--;
+			continue;
+		}
+		if (cset->char_maxlen > 1 && cset->mb_charlen(*escapestr) > 1) {
+			esc = *escapestr;
+		} else {
+			switch (*escapestr) {
+				case 0:
+					esc = '0';
+					break;
+				case '\n':
+					esc = 'n';
+					break;
+				case '\r':
+					esc = 'r';
+					break;
+				case '\\':
+				case '\'':
+				case '"':
+					esc = *escapestr;
+					break;
+				case '\032':
+					esc = 'Z';
+					break;
+			}
+		}
+		if (esc) {
+			if (newstr + 2 > newstr_e) {
+				escape_overflow = TRUE;
+				break;
+			}
+			/* copy escaped character */
+			*newstr++ = '\\';
+			*newstr++ = esc;
+		} else {
+			if (newstr + 1 > newstr_e) {
+				escape_overflow = TRUE;
+				break;
+			}
+			/* copy non escaped character */
+			*newstr++ = *escapestr;
+		}
+	}
+	*newstr = '\0';
+
+	if (escape_overflow) {
+		return (ulong)~0;
+	}
+	return (ulong)(newstr - newstr_s);
+}
+/* }}} */

Modified: branches/threaded_fetch/mysqlnd/mysqlnd_charset.h
===================================================================
--- branches/threaded_fetch/mysqlnd/mysqlnd_charset.h	2007-05-09 09:28:37 UTC (rev 362)
+++ branches/threaded_fetch/mysqlnd/mysqlnd_charset.h	2007-05-09 09:30:56 UTC (rev 363)
@@ -20,3 +20,5 @@
 #include "php.h"
 #include "php_globals.h"
 
+ulong mysqlnd_cset_escape_quotes(const MYSQLND_CHARSET * const charset, char *newstr,
const char *escapestr, int escapestr_len);
+ulong mysqlnd_cset_escape_slashes(const MYSQLND_CHARSET * const cset, char *newstr, const
char *escapestr, int escapestr_len);

Modified: branches/threaded_fetch/mysqlnd/mysqlnd_palloc.c
===================================================================
--- branches/threaded_fetch/mysqlnd/mysqlnd_palloc.c	2007-05-09 09:28:37 UTC (rev 362)
+++ branches/threaded_fetch/mysqlnd/mysqlnd_palloc.c	2007-05-09 09:30:56 UTC (rev 363)
@@ -291,7 +291,8 @@
 
 #ifndef MYSQLND_SILENT
 	php_printf("[mysqlnd_palloc_get_zval %p] *last_added=%p free_items=%d\n",
-				cache, cache? cache->free_list.last_added:NULL, cache->free_items);
+				thd_cache, thd_cache? thd_cache->parent->free_list.last_added:NULL,
+				thd_cache->parent->free_items);
 #endif
 
 	if (thd_cache) {
@@ -338,7 +339,11 @@
 {
 	MYSQLND_ZVAL_PCACHE *cache;
 #ifndef MYSQLND_SILENT
-	php_printf("[mysqlnd_palloc_zval_ptr_dtor %p] *zv=%p ps=%d refc=%d\n", cache, *zv, ps,
ZVAL_REFCOUNT(*zv));
+	php_printf("[mysqlnd_palloc_zval_ptr_dtor %p] parent_block=%p last_in_block=%p *zv=%p
ps=%d refc=%d\n",
+				thd_cache,
+				thd_cache->parent? thd_cache->parent->block:NULL,
+				thd_cache->parent? thd_cache->parent->last_in_block:NULL,
+				*zv, ps, ZVAL_REFCOUNT(*zv));
 #endif
 	*copy_ctor_called = FALSE;
 	/* Check whether cache is used and the zval is from the cache */
@@ -349,7 +354,9 @@
 		  Thus the refcount is -1 than of a zval from the cache,
 		  because the zvals from the cache are owned by it.
 		*/
-		if (ZVAL_REFCOUNT(*zv) > 1) {
+		if (ps == TRUE) {
+			ZVAL_REFCOUNT(*zv) = 1;		
+		} else if (ZVAL_REFCOUNT(*zv) > 1) {
 			if (ps == FALSE) {
 				/*
 				  Not a prepared statement, then we have to
@@ -465,7 +472,7 @@
 	MYSQLND_ZVAL_PCACHE *cache;
 	mysqlnd_zval **p;
 #ifndef MYSQLND_SILENT
-	php_printf("[mysqlnd_palloc_rshutdown %p]\n", cache);
+	php_printf("[mysqlnd_palloc_rshutdown %p]\n", thd_cache);
 #endif
 	if (!thd_cache || !(cache = thd_cache->parent)) {
 		return;

Modified: branches/threaded_fetch/mysqlnd/mysqlnd_ps.c
===================================================================
--- branches/threaded_fetch/mysqlnd/mysqlnd_ps.c	2007-05-09 09:28:37 UTC (rev 362)
+++ branches/threaded_fetch/mysqlnd/mysqlnd_ps.c	2007-05-09 09:30:56 UTC (rev 363)
@@ -92,7 +92,7 @@
 	result->type			= MYSQLND_RES_PS;
 	result->m.fetch_row		= mysqlnd_fetch_stmt_row_buffered;
 	result->m.fetch_lengths	= NULL;/* makes no sense */
-	result->zval_cache		= NULL;
+	result->zval_cache		= mysqlnd_palloc_get_thd_cache_reference(conn->zval_cache);
 
 	/* Create room for 'next_extend' rows */
 
@@ -666,7 +666,7 @@
 	result->m.fetch_row		= stmt->cursor_exists? mysqlnd_fetch_stmt_row_cursor:
 													mysqlnd_stmt_fetch_row_unbuffered;
 	result->m.fetch_lengths	= NULL; /* makes no sense */
-	result->zval_cache		= NULL;
+	result->zval_cache		= mysqlnd_palloc_get_thd_cache_reference(conn->zval_cache);
 
 	result->unbuf	= ecalloc(1, sizeof(MYSQLND_RES_UNBUFFERED));
 
@@ -1160,7 +1160,6 @@
 MYSQLND_RES * _mysqlnd_stmt_result_metadata(MYSQLND_STMT * const stmt)
 {
 	MYSQLND_RES *result;
-	MYSQLND_THD_ZVAL_PCACHE * cache;
 
 	if (!stmt->field_count || !stmt->conn || !stmt->result) {
 		return NULL;
@@ -1171,10 +1170,11 @@
 			find a better way to do it. In different functions I have put
 			fuses to check for result->m.fetch_row() being NULL. This should
 			be handled in a better way.
-	
+
+	  In the meantime we don't need a zval cache reference for this fake
+	  result set, so we don't get one.
 	*/
-	cache = mysqlnd_palloc_get_thd_cache_reference(stmt->conn->zval_cache);
-	result = mysqlnd_result_init(stmt->field_count, cache);
+	result = mysqlnd_result_init(stmt->field_count, NULL);
 	result->type = MYSQLND_RES_NORMAL;
 	result->m.fetch_row = result->m.fetch_row_normal_unbuffered;
 	result->unbuf = ecalloc(1, sizeof(MYSQLND_RES_UNBUFFERED));

Thread
PHP mysqlnd svn commit: r363 - branches/threaded_fetch/mysqlndahristov9 May