同步 I/O 意味着在 I/O 操作完成之前,方法被阻塞,I/O 操作完成后,方法返回其数据。使用异步 I/O,用户可以调用 BeginRead 或 BeginWrite。主线程可以继续进行其他工作,稍后,用户将能够处理数据。另外,多个 I/O 请求可以被同时挂起。
要在此数据可用时得到通知,您可以调用 EndRead 或 EndWrite,传入与您发出的 I/O 请求对应的 IAsyncResult。您还可以提供回调方法,该回调方法应调用 EndRead 或 EndWrite 以计算出读取或写入了多少字节。当许多 I/O 请求被同时挂起时,异步 I/O 可以提供较好的性能,但通常要求对您的应用程序进行一些重要的调整以使其正常工作。
Stream 类支持在同一个流上混合使用同步和异步读取及写入,而不管操作系统是否允许。Stream 根据其同步实现提供默认的异步读取和写入操作的实现,而根据其异步实现提供默认的同步读取和写入操作的实现。
当实现 Stream 的派生类时,必须为同步或异步 Read 和 Write 方法提供实现。虽然允许重写 Read 和 Write,并且异步方法(BeginRead、EndRead、BeginWrite 和 EndWrite)的默认实现将和同步方法的实现一起工作,但这不能提供最有效的性能。与之相似,如果您提供了一个异步方法的实现,同步 Read 和 Write 方法也将正常工作,但是,如果您专门实现同步方法,性能通常会更好。ReadByte 和 WriteByte 的默认实现调用带有一个元素字节数组的同步 Read 和 Write 方法。当从 Stream 派生类时,如果有内部字节缓冲区,强烈建议重写这些方法以访问内部缓冲区,这样性能将得到显著提高。
连接到后备存储器的流重写同步或异步 Read 和 Write 方法之一,以获取默认情况下另一种方法的功能。如果流不支持异步或同步操作,实施者只需让适当的方法引发异常即可。
下面的示例是一个假设的批量图像处理器的异步实现,其后是同步实现的示例。本代码用于在目录中的每个文件上执行耗费 CPU 资源的操作。有关更多信息,请参见 .NET Framework 开发人员规范中的“.NET 异步编程模型”主题。
[C#]
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;public class BulkImageProcAsync
{public const String ImageBaseName = "tmpImage-";public const int numImages = 200;public const int numPixels = 512*512;// ProcessImage has a simple O(N) loop, and you can vary the number// of times you repeat that loop to make the application more CPU-// bound or more IO-bound.public static int processImageRepeats = 20;// Threads must decrement NumImagesToFinish, and protect// their access to it through a mutex.public static int NumImagesToFinish = numImages;public static Object NumImagesMutex = new Object[0];// WaitObject is signalled when all image processing is done.public static Object WaitObject = new Object[0];public class ImageStateObject{public byte[] pixels;public int imageNum;public FileStream fs;}public static void MakeImageFiles(){int sides = (int) Math.Sqrt(numPixels);Console.Write("Making "+numImages+" "+sides+"x"+sides+" images... ");byte[] pixels = new byte[numPixels];for(int i=0; i<numPixels; i++)pixels[i] = (byte) i;for(int i=0; i<numImages; i++) {FileStream fs = new FileStream(ImageBaseName+i+".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, false);fs.Write(pixels, 0, pixels.Length);FlushFileBuffers(fs.Handle);fs.Close();}Console.WriteLine("Done.");}public static void ReadInImageCallback(IAsyncResult asyncResult){ImageStateObject state = (ImageStateObject) asyncResult.AsyncState;Stream stream = state.fs;int bytesRead = stream.EndRead(asyncResult);if (bytesRead != numPixels)throw new Exception("In ReadInImageCallback, got the wrong number of bytes from the image: {0}.", bytesRead);ProcessImage(state.pixels, state.imageNum);stream.Close();// Now write out the image. // Using asynchronous I/O here appears not to be best practice.// It ends up swamping the threadpool, because the threadpool// threads are blocked on I/O requests that were just queued to// the threadpool. FileStream fs = new FileStream(ImageBaseName+state.imageNum+".done",FileMode.Create, FileAccess.Write, FileShare.None, 4096, false);fs.Write(state.pixels, 0, numPixels);fs.Close();// This application model uses too much memory.// Releasing memory as soon as possible is a good idea, // especially global state.state.pixels = null;// Record that an image is finished now.lock(NumImagesMutex) {NumImagesToFinish--;if (NumImagesToFinish==0) {Monitor.Enter(WaitObject);Monitor.Pulse(WaitObject);Monitor.Exit(WaitObject);}}}public static void ProcessImage(byte[] pixels, int imageNum){Console.WriteLine("ProcessImage "+imageNum);// Perform some CPU-intensive operation on the image.for(int i=0; i<processImageRepeats; i++)for(int j=0; j<numPixels; j++)pixels[j] += 1;Console.WriteLine("ProcessImage "+imageNum+" done.");}public static void ProcessImagesInBulk(){Console.WriteLine("Processing images... ");long t0 = Environment.TickCount;NumImagesToFinish = numImages;AsyncCallback readImageCallback = new AsyncCallback(ReadInImageCallback);for(int i=0; i<numImages; i++) {ImageStateObject state = new ImageStateObject();state.pixels = new byte[numPixels];state.imageNum = i;// Very large items are read only once, so you can make the // buffer on the FileStream very small to save memory.FileStream fs = new FileStream(ImageBaseName+i+".tmp",FileMode.Open, FileAccess.Read, FileShare.Read, 1, true);state.fs = fs;fs.BeginRead(state.pixels, 0, numPixels, readImageCallback,state);}// Determine whether all images are done being processed. // If not, block until all are finished.bool mustBlock = false;lock (NumImagesMutex) {if (NumImagesToFinish > 0)mustBlock = true;}if (mustBlock) {Console.WriteLine("All worker threads are queued. Blockinguntil they complete. numLeft: {0}.", NumImagesToFinish);Monitor.Enter(WaitObject);Monitor.Wait(WaitObject);Monitor.Exit(WaitObject);}long t1 = Environment.TickCount;Console.WriteLine("Total time processing images: {0} ms",(t1-t0));}public static void Cleanup(){for(int i=0; i<numImages; i++) {File.Delete(ImageBaseName+i+".tmp");File.Delete(ImageBaseName+i+".done");}}public static void TryToClearDiskCache(){// Try to force all pending writes to disk, and clear the// disk cache of any data.byte[] bytes = new byte[100*(1<<20)];for(int i=0; i<bytes.Length; i++)bytes[i] = 0;bytes = null;GC.Collect();Thread.Sleep(2000);}public static void Main(String[] args){Console.WriteLine("Bulk image processing sample application, using async IO.");Console.WriteLine("Simulates applying a simple transformation to"+numImages+" /"images/"");Console.WriteLine("(Async FileStream & Threadpool benchmark)");Console.WriteLine("Warning - this test requires "+(numPixels *numImages * 2)+" bytes of temporary space");if (args.Length==1) {processImageRepeats = Int32.Parse(args[0]);Console.WriteLine("ProcessImage inner loop - {0}.",processImageRepeats);}MakeImageFiles();TryToClearDiskCache();ProcessImagesInBulk();Cleanup();}[DllImport("KERNEL32", SetLastError=true)]private static extern void FlushFileBuffers(IntPtr handle);
}
以下是同一假设的同步示例。
[C#]
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;public class BulkImageProcSync
{public const String ImageBaseName = "tmpImage-";public const int numImages = 200;public const int numPixels = 512*512;// ProcessImage has a simple O(N) loop, and you can vary the number// of times you repeat that loop to make the application more CPU-// bound or more IO-bound.public static int processImageRepeats = 20;public static void MakeImageFiles(){int sides = (int) Math.Sqrt(numPixels);Console.Write("Making "+numImages+" "+sides+"x"+sides+" images... ");byte[] pixels = new byte[numPixels];for(int i=0; i<numPixels; i++)pixels[i] = (byte) i;for(int i=0; i<numImages; i++) {FileStream fs = new FileStream(ImageBaseName+i+".tmp",FileMode.Create, FileAccess.Write, FileShare.None, 8192, false);fs.Write(pixels, 0, pixels.Length);FlushFileBuffers(fs.Handle);fs.Close();}Console.WriteLine("Done.");}public static void ProcessImage(byte[] pixels, int imageNum){Console.WriteLine("ProcessImage "+imageNum);// Perform some CPU-intensive operation on the image.for(int i=0; i<processImageRepeats; i++)for(int j=0; j<numPixels; j++)pixels[j] += 1;Console.WriteLine("ProcessImage "+imageNum+" done.");}public static void ProcessImagesInBulk(){Console.WriteLine("Processing images... ");long t0 = Environment.TickCount;byte[] pixels = new byte[numPixels];for(int i=0; i<numImages; i++) {FileStream input = new FileStream(ImageBaseName+i+".tmp",FileMode.Open, FileAccess.Read, FileShare.Read, 4196, false);input.Read(pixels, 0, numPixels);input.Close();ProcessImage(pixels, i);FileStream output = new FileStream(ImageBaseName+i+".done", FileMode.Create, FileAccess.Write, FileShare.None, 4196, false);output.Write(pixels, 0, numPixels);output.Close();}long t1 = Environment.TickCount;Console.WriteLine("Total time processing images: {0} ms", (t1-t0));}public static void Cleanup(){for(int i=0; i<numImages; i++) {File.Delete(ImageBaseName+i+".tmp");File.Delete(ImageBaseName+i+".done");}}public static void TryToClearDiskCache(){byte[] bytes = new byte[100*(1<<20)];for(int i=0; i<bytes.Length; i++)bytes[i] = 0;bytes = null;GC.Collect();Thread.Sleep(2000);}public static void Main(String[] args){Console.WriteLine("Bulk image processing sample application, using synchronous I/O");Console.WriteLine("Simulates applying a simple transformation to "+numImages+" /"images/"");Console.WriteLine("(ie, Sync FileStream benchmark)");Console.WriteLine("Warning - this test requires "+(numPixels * numImages * 2)+" bytes of tmp space");if (args.Length==1) {processImageRepeats = Int32.Parse(args[0]);Console.WriteLine("ProcessImage inner loop – "+processImageRepeats);}MakeImageFiles();TryToClearDiskCache();ProcessImagesInBulk();Cleanup();}[DllImport("KERNEL32", SetLastError=true)]private static extern void FlushFileBuffers(IntPtr handle);
}