NetMQ是一个封装了Socket队列的开源库,它是ZeroMQ的.NET移植版,而ZeroMQ是用C++写成的。有人测试过它的性能,几乎可以秒杀其他所有的MQ(MSMQ,RabbitMQ等等,都不是它的对手)。不过它也有一个弱点:消息不支持持久化!
定义要发送到消息里的对象
using System;
using ProtoBuf;

namespace Model
{
    /// <summary>
    /// Message payload exchanged between the ventilator and the workers.
    /// </summary>
    /// <remarks>
    /// The type carries both <c>[Serializable]</c> and <c>[ProtoContract]</c>,
    /// so it is annotated for .NET binary serialization as well as protobuf-net.
    /// The <c>ProtoMember</c> numbers are the protobuf field tags and must stay
    /// stable once messages have been exchanged.
    /// </remarks>
    [Serializable]
    [ProtoContract]
    public class Person
    {
        /// <summary>Numeric identifier of the person.</summary>
        [ProtoMember(1)]
        public int Id { get; set; }

        /// <summary>Display name of the person.</summary>
        [ProtoMember(2)]
        public string Name { get; set; }

        /// <summary>Date of birth.</summary>
        [ProtoMember(3)]
        public DateTime BirthDay { get; set; }

        /// <summary>Postal address of the person; may be <c>null</c> if never assigned.</summary>
        [ProtoMember(4)]
        public Address Address { get; set; }
    }
}
using System;
using ProtoBuf;

namespace Model
{
    /// <summary>
    /// Two-line postal address, annotated for both .NET binary serialization
    /// (<c>[Serializable]</c>) and protobuf-net (<c>[ProtoContract]</c>).
    /// </summary>
    [Serializable]
    [ProtoContract]
    public class Address
    {
        /// <summary>First address line (protobuf field 1).</summary>
        [ProtoMember(1)]
        public string Line1 { get; set; }

        /// <summary>Second address line (protobuf field 2).</summary>
        [ProtoMember(2)]
        public string Line2 { get; set; }
    }
}
消息的发送者
using System;
using System.IO;
using System.Runtime.Remoting.Channels;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using System.Threading.Tasks;
using Model;
using NetMQ;
using ProtoBuf;
using ProtoBuf.Meta;

namespace Ventilator
{
    /// <summary>
    /// Produces the workload: binds a PUSH socket on port 5557 and pushes a
    /// batch of serialized <see cref="Person"/> messages to whatever workers
    /// are connected to it.
    /// </summary>
    sealed class Ventilator
    {
        /// <summary>Number of <see cref="Person"/> messages sent per batch.</summary>
        private const int TaskCount = 10000;

        /// <summary>
        /// Starts the batch on a background task and returns immediately.
        /// The task is not awaited, so exceptions thrown while sending are
        /// not surfaced to the caller.
        /// </summary>
        public void Run()
        {
            Task.Run(() =>
            {
                using (var ctx = NetMQContext.Create())
                using (var sender = ctx.CreatePushSocket())
                using (var sink = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");
                    sink.Connect("tcp://localhost:5558");

                    // "0" is sent to the sink first; per the ventilator program's
                    // comments it signals the start of a batch.
                    sink.Send("0");

                    Console.WriteLine("Sending tasks to workers");

                    // Raises protobuf-net's metadata inspection timeout.
                    // NOTE(review): the BinaryFormatter path below does not go through
                    // protobuf-net, so this presumably only matters if the protobuf
                    // serialization is re-enabled - confirm before removing.
                    RuntimeTypeModel.Default.MetadataTimeoutMilliseconds = 300000;

                    // Loop-invariant values are built once. The date is constructed
                    // directly instead of DateTime.Parse("1981-11-15") so the result
                    // does not depend on the current culture/calendar.
                    var birthDay = new DateTime(1981, 11, 15);

                    // SECURITY NOTE: BinaryFormatter is obsolete and unsafe when the
                    // receiving side deserializes untrusted data. It is kept here
                    // because the worker deserializes with the same formatter; both
                    // sides must be migrated together.
                    var binaryFormatter = new BinaryFormatter();

                    // Send TaskCount tasks; each task is one serialized Person whose
                    // Id is the task number.
                    for (int taskNumber = 0; taskNumber < TaskCount; taskNumber++)
                    {
                        Console.WriteLine("Workload : {0}", taskNumber);

                        var person = new Person
                        {
                            Id = taskNumber,
                            Name = "First",
                            BirthDay = birthDay,
                            Address = new Address { Line1 = "Line1", Line2 = "Line2" }
                        };

                        using (var sm = new MemoryStream())
                        {
                            // protobuf-net alternative, currently disabled:
                            //   Serializer.PrepareSerializer<Person>();
                            //   Serializer.Serialize(sm, person);
                            binaryFormatter.Serialize(sm, person);
                            sender.Send(sm.ToArray());
                        }
                    }
                }
            });
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace Ventilator
{
    /// <summary>
    /// Console entry point of the task ventilator: waits for the operator to
    /// confirm that the workers are up, starts the batch, then waits again
    /// before exiting.
    /// </summary>
    public class Program
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("====== VENTILATOR ======");

            // Block until the operator confirms the workers have been started.
            PromptAndWait("Press enter when worker are ready");

            // Ventilator.Run sends the "0" start-of-batch message to the sink
            // and then pushes the tasks to the workers on a background task.
            Console.WriteLine("Sending start of batch to Sink");
            new Ventilator().Run();

            // Keep the process alive while the background task is sending.
            PromptAndWait("Press Enter to quit");
        }

        // Prints a prompt and blocks until a line is read from the console.
        private static void PromptAndWait(string prompt)
        {
            Console.WriteLine(prompt);
            Console.ReadLine();
        }
    }
}
消息的处理者
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using System.Threading.Tasks;
using Model;
using NetMQ;
using ProtoBuf;

namespace Worker
{
    /// <summary>
    /// Consumes tasks: pulls serialized <see cref="Person"/> messages from
    /// tcp://localhost:5557, prints them, and pushes each person's Id to
    /// tcp://localhost:5558.
    /// </summary>
    sealed class Worker
    {
        /// <summary>
        /// Starts the receive loop on a background task and returns immediately.
        /// The loop never exits on its own; the task is not awaited, so
        /// exceptions thrown inside it are not surfaced to the caller.
        /// </summary>
        public void Run()
        {
            Task.Run(() =>
            {
                using (NetMQContext ctx = NetMQContext.Create())
                using (var receiver = ctx.CreatePullSocket()) // socket to receive messages on
                using (var sender = ctx.CreatePushSocket())   // socket to send messages on
                {
                    receiver.Connect("tcp://localhost:5557");
                    sender.Connect("tcp://localhost:5558");

                    // protobuf-net deserialization (Serializer.Deserialize<Person>(sm))
                    // was abandoned because it failed when used from multiple threads:
                    //   "Timeout while inspecting metadata; this may indicate a deadlock.
                    //    This can often be avoided by preparing necessary serializers during
                    //    application initialization, rather than allowing multiple threads to
                    //    perform the initial metadata inspection; please also see the
                    //    LockContended event"
                    // Binary serialization is used instead.
                    //
                    // SECURITY NOTE: BinaryFormatter must not be used on untrusted input;
                    // it is kept because the ventilator serializes with the same formatter,
                    // and both sides must be migrated together.
                    var binaryFormatter = new BinaryFormatter();

                    // Process tasks forever.
                    while (true)
                    {
                        var receivedBytes = receiver.Receive();
                        using (var sm = new MemoryStream(receivedBytes))
                        {
                            var person = binaryFormatter.Deserialize(sm) as Person;
                            if (person == null)
                            {
                                // The payload deserialized to something other than a Person;
                                // skip it instead of failing with a NullReferenceException.
                                Console.WriteLine("Skipping message: payload is not a Person");
                                continue;
                            }

                            // An absent address is printed as empty lines rather than crashing.
                            var address = person.Address ?? new Address();

                            Console.WriteLine("Person {Id:" + person.Id + ",Name:" + person.Name + ",BirthDay:" +
                                              person.BirthDay + ",Address:{Line1:" + address.Line1 +
                                              ",Line2:" + address.Line2 + "}}");

                            Console.WriteLine("Sending to Sink:" + person.Id);
                            sender.Send(person.Id.ToString());
                        }
                    }
                }
            });
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Worker
{
    /// <summary>
    /// Console entry point of the task worker process. Creates 50
    /// <see cref="Worker"/> instances, invokes their <c>Run</c> methods through
    /// <see cref="Parallel.Invoke(ParallelOptions, Action[])"/>, then keeps the
    /// process alive until a line is read from the console.
    /// </summary>
    public class Program
    {
        public static void Main(string[] args)
        {
            Console.WriteLine("====== WORKER ======");

            // Alternative that was tried earlier: start the workers one by one
            // (Task-based multithreading).
            //foreach (Worker client in Enumerable.Range(0, 1000).Select(
            //    x => new Worker()))
            //{
            //    client.Run();
            //}

            // Current approach: launch the workers through Parallel.Invoke, with
            // the degree of parallelism capped at the number of processors.
            const int workerCount = 50;
            var launchers = new Action[workerCount];
            for (int index = 0; index < workerCount; index++)
            {
                launchers[index] = new Worker().Run;
            }

            var options = new ParallelOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            };
            Parallel.Invoke(options, launchers);

            // Block so the process does not exit right after the launch.
            Console.ReadLine();
        }
    }
}