Below is the list of changes that have just been committed into a local
5.1 repository of lthalmann. When lthalmann does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.2245 06/07/06 00:09:12 lars@stripped +1 -0
Merge mysql.com:/users/lthalmann/bkroot/mysql-5.1-new-rpl
into mysql.com:/users/lthalmann/bk/MERGE/mysql-5.1-merge
sql/sql_insert.cc
1.209 06/07/06 00:08:57 lars@stripped +0 -0
Auto merged
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: lars
# Host: dl145j.mysql.com
# Root: /users/lthalmann/bk/MERGE/mysql-5.1-merge/RESYNC
--- 1.208/sql/sql_insert.cc 2006-07-04 09:50:37 +02:00
+++ 1.209/sql/sql_insert.cc 2006-07-06 00:08:57 +02:00
@@ -26,8 +26,8 @@
static int check_null_fields(THD *thd,TABLE *entry);
#ifndef EMBEDDED_LIBRARY
static TABLE *delayed_get_table(THD *thd,TABLE_LIST *table_list);
-static int write_delayed(THD *thd,TABLE *table, enum_duplicates dup, bool ignore,
- char *query, uint query_length, bool log_on);
+static int write_delayed(THD *thd, TABLE *table, enum_duplicates dup,
+ LEX_STRING query, bool ignore, bool log_on);
static void end_delayed_insert(THD *thd);
pthread_handler_t handle_delayed_insert(void *arg);
static void unlink_blobs(register TABLE *table);
@@ -511,7 +511,8 @@
#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
- error=write_delayed(thd, table, duplic, ignore, query, thd->query_length, log_on);
+ LEX_STRING const st_query = { query, thd->query_length };
+ error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
query=0;
}
else
@@ -1251,11 +1252,16 @@
bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query;
ulonglong last_insert_id;
timestamp_auto_set_type timestamp_field_type;
+ LEX_STRING query;
- delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg)
- :record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {}
+ delayed_row(LEX_STRING const query_arg, enum_duplicates dup_arg,
+ bool ignore_arg, bool log_query_arg)
+ : record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg),
+ query(query_arg)
+ {}
~delayed_row()
{
+ x_free(query.str);
x_free(record);
}
};
@@ -1263,9 +1269,6 @@
class delayed_insert :public ilink {
uint locks_in_memory;
- char *query;
- ulong query_length;
- ulong query_allocated;
public:
THD thd;
TABLE *table;
@@ -1279,7 +1282,7 @@
TABLE_LIST table_list; // Argument
delayed_insert()
- :locks_in_memory(0), query(0), query_length(0), query_allocated(0),
+ :locks_in_memory(0),
table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0),
group_count(0)
{
@@ -1305,7 +1308,6 @@
}
~delayed_insert()
{
- my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR));
/* The following is not really needed, but just for safety */
delayed_row *row;
while ((row=rows.get()))
@@ -1325,25 +1327,6 @@
VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */
}
- int set_query(char const *q, ulong qlen) {
- if (q && qlen > 0)
- {
- if (query_allocated < qlen + 1)
- {
- ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR);
- query= my_realloc(query, qlen + 1, MYF(flags));
- if (query == 0)
- return HA_ERR_OUT_OF_MEM;
- query_allocated= qlen;
- }
- query_length= qlen;
- memcpy(query, q, qlen + 1);
- }
- else
- query_length= 0;
- return 0;
- }
-
/* The following is for checking when we can delete ourselves */
inline void lock()
{
@@ -1608,13 +1591,14 @@
/* Put a question in queue */
-static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic,
- bool ignore, char *query, uint query_length,
- bool log_on)
+static int
+write_delayed(THD *thd,TABLE *table, enum_duplicates duplic,
+ LEX_STRING query, bool ignore, bool log_on)
{
- delayed_row *row=0;
+ delayed_row *row;
delayed_insert *di=thd->di;
DBUG_ENTER("write_delayed");
+ DBUG_PRINT("enter", ("query = '%s' length %u", query.str, query.length));
thd->proc_info="waiting for handler insert";
pthread_mutex_lock(&di->mutex);
@@ -1622,13 +1606,28 @@
pthread_cond_wait(&di->cond_client,&di->mutex);
thd->proc_info="storing row into queue";
- if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on)))
+ if (thd->killed)
goto err;
+ /*
+ Take a copy of the query string, if there is any. The string will
+ be free'ed when the row is destroyed. If there is no query string,
+ we don't do anything special.
+ */
+
+ if (query.str)
+ if (!(query.str= my_strndup(query.str, MYF(MY_WME), query.length)))
+ goto err;
+ row= new delayed_row(query, duplic, ignore, log_on);
+ if (row == NULL)
+ {
+ my_free(query.str, MYF(MY_WME));
+ goto err;
+ }
+
if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME))))
goto err;
memcpy(row->record, table->record[0], table->s->reclength);
- di->set_query(query, query_length);
row->start_time= thd->start_time;
row->query_start_used= thd->query_start_used;
row->last_insert_id_used= thd->last_insert_id_used;
@@ -1987,7 +1986,7 @@
if (thd.killed || table->s->version != refresh_version)
{
thd.killed= THD::KILL_CONNECTION;
- max_rows= ~(ulong)0; // Do as much as possible
+ max_rows= ULONG_MAX; // Do as much as possible
}
/*
@@ -2034,11 +2033,18 @@
thread_safe_increment(delayed_insert_errors,&LOCK_delayed_status);
row->log_query = 0;
}
+
if (using_ignore)
{
using_ignore=0;
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
}
+
+ if (row->log_query && row->query.str != NULL && mysql_bin_log.is_open())
+ thd.binlog_query(THD::ROW_QUERY_TYPE,
+ row->query.str, row->query.length,
+ FALSE, FALSE);
+
if (table->s->blob_fields)
free_delayed_insert_blobs(table);
thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status);
@@ -2085,13 +2091,25 @@
pthread_cond_broadcast(&cond_client); // If waiting clients
}
}
-
thd.proc_info=0;
pthread_mutex_unlock(&mutex);
- /* After releasing the mutex, to prevent deadlocks. */
- if (mysql_bin_log.is_open())
- thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE);
+#ifdef HAVE_ROW_BASED_REPLICATION
+ /*
+ We need to flush the pending event when using row-based
+ replication since the flushing normally done in binlog_query() is
+ not done last in the statement: for delayed inserts, the insert
+ statement is logged *before* all rows are inserted.
+
+ We can flush the pending event without checking the thd->lock
+ since the delayed insert *thread* is not inside a stored function
+ or trigger.
+
+ TODO: Move the logging to last in the sequence of rows.
+ */
+ if (thd.current_stmt_binlog_row_based)
+ thd.binlog_flush_pending_rows_event(TRUE);
+#endif /* HAVE_ROW_BASED_REPLICATION */
if ((error=table->file->extra(HA_EXTRA_NO_CACHE)))
{ // This shouldn't happen
| Thread |
|---|
| • bk commit into 5.1 tree (lars:1.2245) | Lars Thalmann | 6 Jul |