Added:
trunk/mysqlclient/common/SocketStream.cs
Modified:
trunk/TestSuite/AsyncTests.cs
trunk/mysqlclient/command.cs
Log:
Finished the implemenation of Async queries.
Added ExecuteNonQuery that takes a commandbehavior and
added ExecuteReader methods.
Also, added SocketStream back in.
Modified: trunk/TestSuite/AsyncTests.cs
===================================================================
--- trunk/TestSuite/AsyncTests.cs 2006-08-07 03:06:43 UTC (rev 298)
+++ trunk/TestSuite/AsyncTests.cs 2006-08-08 06:41:36 UTC (rev 299)
@@ -43,17 +43,81 @@
[Test]
public void ExecuteNonQuery()
{
+ execSQL("DROP TABLE IF EXISTS test");
+ execSQL("CREATE TABLE test (id int)");
+ execSQL("CREATE PROCEDURE spTest() BEGIN SET @x=0; REPEAT INSERT INTO test VALUES(@x); " +
+ "SET @x=@x+1; UNTIL @x = 300 END REPEAT; END");
+ try
+ {
+ MySqlCommand proc = new MySqlCommand("spTest", conn);
+ proc.CommandType = CommandType.StoredProcedure;
+ IAsyncResult iar = proc.BeginExecuteNonQuery();
+ int count = 0;
+ while (!iar.IsCompleted)
+ {
+ count++;
+ System.Threading.Thread.Sleep(20);
+ }
+ int updated = proc.EndExecuteNonQuery(iar);
+ Assert.IsTrue(count > 0);
+
+ proc.CommandType = CommandType.Text;
+ proc.CommandText = "SELECT COUNT(*) FROM test";
+ object cnt = proc.ExecuteScalar();
+ Assert.AreEqual(300, cnt);
+ }
+ catch (Exception ex)
+ {
+ Assert.Fail(ex.Message);
+ }
+ finally
+ {
+ }
}
[Test]
public void ExecuteReader()
{
- }
+ execSQL("DROP TABLE IF EXISTS test");
+ execSQL("CREATE TABLE test (id int)");
+ execSQL("CREATE PROCEDURE spTest() BEGIN SET @x=0; REPEAT INSERT INTO test VALUES(@x); " +
+ "SET @x=@x+1; UNTIL @x = 300 END REPEAT; SELECT 'done'; END");
- [Test]
- public void ExecuteScalar()
- {
+ MySqlDataReader reader = null;
+ try
+ {
+ MySqlCommand proc = new MySqlCommand("spTest", conn);
+ proc.CommandType = CommandType.StoredProcedure;
+ IAsyncResult iar = proc.BeginExecuteReader();
+ int count = 0;
+ while (!iar.IsCompleted)
+ {
+ count++;
+ System.Threading.Thread.Sleep(20);
+ }
+
+ reader = proc.EndExecuteReader(iar);
+ Assert.IsNotNull(reader);
+ Assert.IsTrue(count > 0);
+ Assert.IsTrue(reader.Read());
+ Assert.AreEqual("done", reader.GetString(0));
+ reader.Close();
+
+ proc.CommandType = CommandType.Text;
+ proc.CommandText = "SELECT COUNT(*) FROM test";
+ object cnt = proc.ExecuteScalar();
+ Assert.AreEqual(300, cnt);
+ }
+ catch (Exception ex)
+ {
+ Assert.Fail(ex.Message);
+ }
+ finally
+ {
+ if (reader != null)
+ reader.Close();
+ }
}
}
}
Modified: trunk/mysqlclient/command.cs
===================================================================
--- trunk/mysqlclient/command.cs 2006-08-07 03:06:43 UTC (rev 298)
+++ trunk/mysqlclient/command.cs 2006-08-08 06:41:36 UTC (rev 299)
@@ -350,77 +350,6 @@
throw new MySqlException( "Stored procedures are not supported on this version of MySQL" );
}
-/* internal bool ExecuteInternal()
- {
- if (statements.Count == 0)
- return false;
-
- Statement st = statements[0];
- st.Execute(parameters);
- statements.Remove(st);*/
-
-/* if (preparedStatement != null)
- {
- if (!preparedStatement.ExecutionCount > 0)
- return false;
- preparedStatement.Execute(parameters, cursorPageSize);
- }
- else
- {
- if (sqlBuffers.Count == 0) return false;
- MemoryStream sqlStream = (MemoryStream)sqlBuffers[0];
-
- using (sqlStream)
- {
- connection.driver.SendQuery(sqlStream.GetBuffer());
- sqlBuffers.RemoveAt(0);
- }
- }*/
-// return true;
- // }
-
-/* internal Statement GetNextResultset()
- {
- // execute the statement
-// ulong affectedRows;
- // long fieldCount;
-
- if (Driver.HasMoreResults(StatementId))
- return true;
-
- while (statements.Count > 0)
- {
- Statement statement = (Statement)statements[0];
- // if this is a prepared statement and we have already executed it,
- // then break out of the loop
- if (statement is PreparedStatement &&
- (statement as PreparedStatement).ExecutionCount > 0) break;
- statement.Execute(parameters);
- if (statement.HasRows)
- return statement;
- RecordsAffected += statement.RecordsAffected;
- }
- return null;*/
-/*
- while (true)
- {
- bool hasResults = connection.driver.ReadResult(ref fieldCount, ref affectedRows, ref lastInsertedId);
- if (hasResults)
- {
- if (fieldCount > 0)
- return fieldCount;
- if (updatedRowCount == -1)
- updatedRowCount = 0;
- updatedRowCount += affectedRows;
- }
- else
- {
- if (!ExecuteInternal())
- return 0;
- }
- }*/
-// }
-
/// <include file='docs/mysqlcommand.xml' path='docs/ExecuteNonQuery/*'/>
public override int ExecuteNonQuery()
{
@@ -428,29 +357,8 @@
MySqlDataReader reader = ExecuteReader();
reader.Close();
lastInsertedId = reader.InsertedId;
+ this.updatedRowCount = reader.RecordsAffected;
return reader.RecordsAffected;
-/* CheckState();
-
-// updatedRowsCount = -1;
-
-// if (preparedStatement == null)
-// sqlBuffers = PrepareSqlBuffers(CommandText);
-
- try
- {
-// ExecuteInternal();
- MySqlDataReader reader = new MySqlDataReader(this, statement, CommandBehavior.Default);
- reader.NextResult();
- reader.Close();
- }
- catch (MySqlException ex)
- {
- //TODO: fix this
- //connection.Abort();
- throw;
- }
-
- return (int)this.updatedRowCount;*/
}
internal void Close()
@@ -546,7 +454,8 @@
#region Async Methods
- internal delegate void AsyncExecuteNonQueryDelegate();
+ internal delegate int AsyncExecuteNonQueryDelegate();
+ internal delegate MySqlDataReader AsyncExecuteReaderDelegate(CommandBehavior behavior);
private string TrimSemicolons(string sql)
{
@@ -560,23 +469,44 @@
end--;
return sb.ToString(start, end-start+1);
}
- private void AsyncExecuteNonQuery()
- {
- ExecuteNonQuery();
- }
+ public IAsyncResult BeginExecuteReader()
+ {
+ return BeginExecuteReader(CommandBehavior.Default);
+ }
+
+ public IAsyncResult BeginExecuteReader(CommandBehavior behavior)
+ {
+ AsyncExecuteReaderDelegate del = new AsyncExecuteReaderDelegate(ExecuteReader);
+ asyncResult = del.BeginInvoke(behavior, null, null);
+ return asyncResult;
+ }
+
+ public MySqlDataReader EndExecuteReader(IAsyncResult result)
+ {
+ result.AsyncWaitHandle.WaitOne();
+ return connection.Reader;
+ }
+
+ public IAsyncResult BeginExecuteNonQuery(AsyncCallback callback, object stateObject)
+ {
+ AsyncExecuteNonQueryDelegate del =
+ new AsyncExecuteNonQueryDelegate(ExecuteNonQuery);
+ asyncResult = del.BeginInvoke(callback, stateObject);
+ return asyncResult;
+ }
+
public IAsyncResult BeginExecuteNonQuery()
{
AsyncExecuteNonQueryDelegate del =
- new AsyncExecuteNonQueryDelegate(AsyncExecuteNonQuery);
+ new AsyncExecuteNonQueryDelegate(ExecuteNonQuery);
asyncResult = del.BeginInvoke(null, null);
return asyncResult;
}
public int EndExecuteNonQuery(IAsyncResult result)
{
- while (! result.IsCompleted)
- System.Threading.Thread.Sleep(100);
+ result.AsyncWaitHandle.WaitOne();
return (int)updatedRowCount;
}
Added: trunk/mysqlclient/common/SocketStream.cs
===================================================================
--- trunk/mysqlclient/common/SocketStream.cs 2006-08-07 03:06:43 UTC (rev 298)
+++ trunk/mysqlclient/common/SocketStream.cs 2006-08-08 06:41:36 UTC (rev 299)
@@ -0,0 +1,118 @@
+using System;
+using System.IO;
+using System.Net;
+using System.Net.Sockets;
+using System.Collections;
+
+namespace MySql.Data.Common
+{
+ /// <summary>
+ /// Summary description for MySqlSocket.
+ /// </summary>
+ internal sealed class SocketStream : Stream
+ {
+ private Socket socket;
+
+ public SocketStream(AddressFamily addressFamily, SocketType socketType, ProtocolType protocol)
+ : base()
+ {
+ socket = new Socket(addressFamily, socketType, protocol);
+ }
+
+ #region Properties
+
+ public Socket Socket
+ {
+ get { return socket; }
+ }
+
+ public override bool CanRead
+ {
+ get { return true; }
+ }
+
+ public override bool CanSeek
+ {
+ get { return false; }
+ }
+
+ public override bool CanWrite
+ {
+ get { return true; }
+ }
+
+ public override long Length
+ {
+ get { return 0; }
+ }
+
+ public override long Position
+ {
+ get { return 0; }
+ set { throw new NotSupportedException("SocketStream does not support seek"); }
+ }
+
+ #endregion
+
+ #region Stream Implementation
+
+ public override void Flush()
+ {
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ return socket.Receive(buffer, offset, count, SocketFlags.None);
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ throw new NotSupportedException("SocketStream does not support seek");
+ }
+
+ public override void SetLength(long value)
+ {
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ socket.Send(buffer, offset, count, SocketFlags.None);
+ }
+
+
+ #endregion
+
+ public bool Connect(EndPoint remoteEP, int timeout)
+ {
+ // set the socket to non blocking
+ socket.Blocking = false;
+
+ // then we star the connect
+ SocketAddress addr = remoteEP.Serialize();
+ byte[] buff = new byte[addr.Size];
+ for (int i=0; i<addr.Size; i++)
+ buff[i] = addr[i];
+
+ int result = NativeMethods.connect(socket.Handle, buff, addr.Size);
+ int wsaerror = NativeMethods.WSAGetLastError();
+ if (wsaerror != 10035)
+ throw new Exception("Error creating MySQLSocket");
+
+ // next we wait for our connect timeout or until the socket is connected
+ ArrayList write = new ArrayList();
+ write.Add(socket);
+ ArrayList error = new ArrayList();
+ error.Add(socket);
+
+ Socket.Select(null, write, error, timeout*1000*1000);
+
+ if (write.Count == 0) return false;
+
+ // set socket back to blocking mode
+ socket.Blocking = true;
+ return true;
+ }
+
+
+ }
+}
Thread |
---|
• Connector/NET commit: r299 - in trunk: TestSuite mysqlclient mysqlclient/common | rburnett | 8 Aug |