CNative.MQExtend 1.7.2

dotnet add package CNative.MQExtend --version 1.7.2
NuGet\Install-Package CNative.MQExtend -Version 1.7.2
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="CNative.MQExtend" Version="1.7.2" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add CNative.MQExtend --version 1.7.2
#r "nuget: CNative.MQExtend, 1.7.2"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install CNative.MQExtend as a Cake Addin
#addin nuget:?package=CNative.MQExtend&version=1.7.2

// Install CNative.MQExtend as a Cake Tool
#tool nuget:?package=CNative.MQExtend&version=1.7.2

🏡CNative.MQExtend distributed message queue

A unified API that integrates three messaging clients (Kafka, RabbitMQ, and ActiveMQ) and implements message-bus functionality.

🎂Overall architecture

(Figure: overall architecture diagram)

📢Main features

  Message bus, distributed message processing, and related features.

Configuration files:

App.config

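<?xml version="1.0" encoding="utf-8" ?>
<configuration>
	<appSettings>
		<!-- Message queue type: None / ActiveMQ / Kafka / RabbitQM -->
		<add key="MQType" value="RabbitQM" />
		<!-- Broker address -->
		<add key="MQBrokerUri" value="127.0.0.1" />
		<!-- User name -->
		<add key="MQUserName" value="root" />
		<!-- Password -->
		<add key="MQPassword" value="123456" />
		<!-- Queue name -->
		<add key="MQQueueName" value="testQueueName" />
		<!-- Whether to write back an acknowledgement message -->
		<add key="MQIsAckBack" value="false" />

		<!-- ActiveProducerConfig / ActiveSubscribeConfig -->
		<!-- Destination mode: Queue / Topic -->
		<add key="ActiveMQType" value="Queue" />
		<!-- Queue filter field -->
		<add key="ActiveMQFilterName" value="" />

		<!-- KafkaProducerConfig / KafkaSubscribeConfig -->
		<!-- Acks: All / None / Leader -->
		<add key="KafkaAck" value="Queue" />
		<!-- Consumer group ID -->
		<add key="KafkaGroupId" value="testGroup" />
		<!-- Whether offsets are committed automatically -->
		<add key="KafkaEnableAutoCommit" value="false" />
		<!-- Whether messages that fail to be consumed are forwarded to the DLQ -->
		<add key="TransformToDLQ" value="false" />
		<add key="KafkaOffsetReset" value="Earliest" />

		<!-- RabbitProducerConfig / RabbitSubscribeConfig -->
		<!-- Port -->
		<add key="RabbitPort" value="5672" />
		<add key="RabbitVirtualHost" value="" />
		<!-- Exchange type: simple / direct / fanout / topic -->
		<add key="RabbitExchangeType" value="direct" />
		<!-- Exchange name -->
		<add key="RabbitExchangeName" value="" />
		<!-- Routing key -->
		<add key="RabbitRoutingKey" value="" />
		<!-- Heartbeat timeout -->
		<add key="RequestedHeartbeat" value="5000" />
		<!-- Whether the queue is durable -->
		<add key="RabbitDurable" value="false" />

		<add key="LoggerAssemblyName" value="" />
		<add key="LoggerAssemblyTypeName" value="" />
	</appSettings>
</configuration>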
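
To check which values an App.config like the one above actually contains, the `<add key="..." value="..."/>` entries can be listed with LINQ to XML. This is only an inspection sketch: on .NET Framework the usual accessor is `System.Configuration.ConfigurationManager.AppSettings["MQType"]`, and how CNative.MQExtend itself reads the file is not documented here. `AppConfigSketch` is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

// Hypothetical helper, not part of CNative.MQExtend.
public static class AppConfigSketch
{
    // Collects every <add key="..." value="..."/> under <appSettings> into a dictionary.
    public static Dictionary<string, string> Load(string xml)
    {
        var doc = XDocument.Parse(xml);
        var appSettings = doc.Root?.Element("appSettings");
        if (appSettings == null)
        {
            return new Dictionary<string, string>();
        }

        return appSettings
            .Elements("add")
            .Where(e => e.Attribute("key") != null)
            .ToDictionary(
                e => (string)e.Attribute("key"),
                e => (string)e.Attribute("value") ?? string.Empty);
    }
}
```

Note that an XML parser rejects the file if anything, even whitespace, precedes the `<?xml ...?>` declaration, so the declaration has to be the very first characters of App.config.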


appsettings.json
{
  "AppSettings": {
    "MQType": "RabbitQM", // message queue type [0=None; 1=ActiveMQ; 2=Kafka; 3=RabbitQM]

    "MQBrokerUri": "192.168.3.128", // broker address
    "MQUserName": "root", // user name
    "MQPassword": "123456", // password

    "MQQueueName": "testQueueName", // queue name
    "MQIsAckBack": "false", // whether to write back an acknowledgement message

    // ActiveProducerConfig / ActiveSubscribeConfig
    "ActiveMQType": "Queue", // destination mode: Queue / Topic
    "ActiveMQFilterName": "", // queue filter field

    // KafkaProducerConfig / KafkaSubscribeConfig
    "KafkaAck": "Queue", // Acks: All / None / Leader

    "KafkaGroupId": "testGroup", // consumer group ID
    "KafkaEnableAutoCommit": "false", // whether offsets are committed automatically
    "KafkaTransformToDLQ": "false", // whether messages that fail to be consumed are forwarded to the DLQ
    "KafkaOffsetReset": "Earliest",

    // RabbitProducerConfig / RabbitSubscribeConfig
    "RabbitPort": "5672", // port
    "RabbitVirtualHost": "",
    "RabbitExchangeType": "direct", // exchange type: simple / direct / fanout / topic
    "RabbitExchangeName": "", // exchange name
    "RabbitRoutingKey": "", // routing key
    "RabbitHeartbeat": "5000", // heartbeat timeout (s)
    "RabbitDurable": "false" // whether the queue is durable

  }
}
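
The `//` comments in the appsettings.json above are not valid in strict JSON, so a parser has to be told to skip them. The sketch below shows one way to inspect such a file with `System.Text.Json`; it is an illustration only, since the package presumably loads its settings on its own. `SettingsSketch` and `MqKind` are hypothetical names, and the enum values are taken from the `MQType` comment.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical enum mirroring the values listed in the MQType comment.
public enum MqKind { None = 0, ActiveMQ = 1, Kafka = 2, RabbitQM = 3 }

// Hypothetical helper, not part of CNative.MQExtend.
public static class SettingsSketch
{
    // Reads the "AppSettings" object into a dictionary, skipping // comments.
    public static Dictionary<string, string> Load(string json)
    {
        var options = new JsonDocumentOptions
        {
            CommentHandling = JsonCommentHandling.Skip,
            AllowTrailingCommas = true
        };

        var result = new Dictionary<string, string>();
        using (var doc = JsonDocument.Parse(json, options))
        {
            foreach (var prop in doc.RootElement.GetProperty("AppSettings").EnumerateObject())
            {
                // The sample stores every value as a JSON string.
                result[prop.Name] = prop.Value.ToString();
            }
        }
        return result;
    }

    // Maps the MQType setting (name or number) to the hypothetical enum;
    // anything unrecognized falls back to None.
    public static MqKind ParseKind(string value)
    {
        return Enum.TryParse<MqKind>(value, true, out var kind) && Enum.IsDefined(typeof(MqKind), kind)
            ? kind
            : MqKind.None;
    }
}
```

If the file is loaded through `Microsoft.Extensions.Configuration` instead, its JSON provider already tolerates comments, so no extra option is needed there.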

👑Examples

class Program
{
    static void Main(string[] args)
    {
        try
        {
            Console.WriteLine("Testing the message queue");
            new MQTest().Handle();

            Console.WriteLine("\nTesting the message bus");
            new EventsBusTest().Handle();
        }
        catch (Exception ex) { Console.WriteLine(ex + " Error"); }
        Console.ReadLine(); // wait for Enter before exiting
    }
}

## Message queue example:

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using CNative.MQExtend;
using CNative.MQExtend.Interface;
using CNative.MQExtend.Model.Config.Producer;

namespace ConsoleTestApp
{
    class MQTest
    {
        /// <summary>
        /// Producer channel of the message queue
        /// </summary>
        protected  IProducerChannel producerChannel { get; set; }
        /// <summary>
        /// Consumer (subscriber) channel of the message queue
        /// </summary>
        protected  ISubscribeChannel subscribeChannel { get; set; }
        protected  ProducerConfigs producerConfigs { get; set; }
        public void Handle()
        {
            try
            {
                producerConfigs = new ProducerConfigs();
                producerChannel = ChannelAdapterFactory.GetProducerChannel();
                subscribeChannel = ChannelAdapterFactory.GetSubscribeChannel();

                subscribeChannel.Subscribe(producerConfigs.QueueName, SubscribeAction);

                producerChannel.Producer("Message queue test, sent at: " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss FFFFF"));
                producerChannel.Producer("Message queue test 2, sent at: " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss FFFFF"));

                // Second subscriber, configured in code instead of through the configuration file
                var cf = new CNative.MQExtend.Model.Config.Subscribe.RabbitSubscribeConfig()
                {
                    BrokerUri = "192.168.3.128",
                    UserName = "admin",
                    Password = "123456",
                    Port = 5672,
                    ExchangeType = CNative.MQExtend.Model.Enums.RabbitExchangeType.simple,
                    QueueName = "testqu",
                    ExchangeName = "",
                    RoutingKey = "",
                    Durable = false
                };
                var subscribeChannel2 = ChannelAdapterFactory.GetSubscribeChannel(cf);
                subscribeChannel2.Subscribe("testqu", (messag)=>
                {
                    try
                    {
                        var msg = messag.ToString();
                        Console.WriteLine("test received at: " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss FFFFF") + "【" + msg + "】");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex + "SubscribeAction2 Error");
                    }
                });
                // Attach the error callback only if the channel really is a RabbitMQ consumer,
                // so a different channel type cannot cause a NullReferenceException here.
                if (subscribeChannel2 is CNative.MQExtend.Executor.Rabbit.RabbitMQConsumer rabbitConsumer)
                {
                    rabbitConsumer.ErrorNotice = (exchange, routingKey, exception, message) =>
                    {
                        Console.WriteLine(exception.ToString() + ",ErrorNotice Error");
                    };
                }
            }
            catch (Exception ex) { Console.WriteLine(ex + " Error"); }

        }

        #region SubscribeAction
        /// <summary>
        /// Handles a message received from the queue.
        /// </summary>
        /// <param name="messag">The received message.</param>
        protected  void SubscribeAction(CNative.MQExtend.Model.Message.IMessageContent messag)
        {
            try
            {
                Console.WriteLine("Received at: " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss FFFFF") + "【" + messag?.Value + "】");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex + "SubscribeAction Error");
            }
        }
        #endregion
    }
}
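
The example above needs a running broker. To exercise a subscribe callback without one, a small in-memory stand-in can help. The sketch below is not part of CNative.MQExtend and does not implement its `IProducerChannel` or `ISubscribeChannel` interfaces; `InMemoryQueueSketch` and its members are hypothetical names that only mimic the subscribe-then-produce flow, with messages simplified to plain strings.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a broker; not part of CNative.MQExtend.
public sealed class InMemoryQueueSketch
{
    private readonly Dictionary<string, List<Action<string>>> _handlers =
        new Dictionary<string, List<Action<string>>>();
    private readonly object _gate = new object();

    // Registers a callback for a queue name.
    public void Subscribe(string queueName, Action<string> handler)
    {
        lock (_gate)
        {
            if (!_handlers.TryGetValue(queueName, out var list))
            {
                list = new List<Action<string>>();
                _handlers[queueName] = list;
            }
            list.Add(handler);
        }
    }

    // Delivers a message synchronously to every subscriber of the queue and
    // returns how many handlers completed without throwing.
    public int Produce(string queueName, string message)
    {
        Action<string>[] snapshot;
        lock (_gate)
        {
            snapshot = _handlers.TryGetValue(queueName, out var list)
                ? list.ToArray()
                : new Action<string>[0];
        }

        var delivered = 0;
        foreach (var handler in snapshot)
        {
            try
            {
                handler(message);
                delivered++;
            }
            catch (Exception ex)
            {
                // One failing handler must not block the others.
                Console.WriteLine(ex + " handler error");
            }
        }
        return delivered;
    }
}
```

This stand-in fans each message out to all subscribers, which is closer to a topic than to a competing-consumer queue, and it delivers on the caller's thread. A real broker delivers asynchronously, so ordering and timing observed here say nothing about production behavior.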

## Message bus implementation: events derive from EventData, handlers implement IEventHandler

// Event types: each event derives from EventData.
public class Notice : EventData
{
    public long Id { get; set; }
    public string Name { get; set; }
    public string Msg { get; set; }
}

public class TT : EventData
{
    public long Id { get; set; }
    public string Name { get; set; }
    public string Msg { get; set; }
}

// Two handlers for Notice events.
public class MailSend : IEventHandler<Notice>
{
    public void Handler(Notice entity)
    {
        Console.WriteLine($"MailSend executed at: {DateTime.Now:yyyy-MM-dd HH:mm:ss FFFFF}:【Hello {entity.Name}, {entity.Msg}】");
    }
}

public class Mailend : IEventHandler<Notice>
{
    public void Handler(Notice entity)
    {
        Console.WriteLine($"Mailend executed at: {DateTime.Now:yyyy-MM-dd HH:mm:ss FFFFF}:【Message sent!】");
    }
}

// Two handlers for TT events.
public class TTSend : IEventHandler<TT>
{
    public void Handler(TT entity)
    {
        Console.WriteLine($"TTSend {DateTime.Now:yyyy-MM-dd HH:mm:ss FFFFF}:【Hello {entity.Name}, {entity.Msg}】");
    }
}

public class TTend : IEventHandler<TT>
{
    public void Handler(TT entity)
    {
        Console.WriteLine($"TTend {DateTime.Now:yyyy-MM-dd HH:mm:ss FFFFF}:【Message sent!】");
    }
}


class EventsBusTest
{
    public void Handle()
    {
        var bus = EventBusFactory.CreateEventBus(); // the message queue type is selected from the configuration
        Console.WriteLine("Message queue type: " + bus.MQType);
        // Register every IEventHandler<T> implementation found in this assembly.
        bus.SubscribeAll(typeof(EventsBusTest).Assembly);
        Notice notice = new Notice()
        {
            Id = 1100,
            EventSource = this,
            Name = "STONE",
            Msg = "The holiday starts the day after tomorrow. Happy holidays! Sent at: " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss FFFFF")
        };

        TT tt = new TT()
        {
            Id = 1100,
            EventSource = notice,
            Name = "STONETT",
            Msg = "TT: the holiday starts the day after tomorrow. Happy holidays! Sent at: " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss FFFFF")
        };
        bus.Publish(notice);
        bus.Publish(tt);
    }
}
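
To make the `SubscribeAll(assembly)` and `Publish(event)` pattern more concrete, here is a minimal in-process sketch of how assembly scanning and type-based dispatch can work. It is not the library's implementation, which routes events through the configured message queue; every type in the block (`SketchEvent`, `ISketchHandler<T>`, `SketchBus`, `Ping`, `PingCounter`) is a hypothetical stand-in defined only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-ins; the real EventData / IEventHandler<T> live in CNative.MQExtend.
public abstract class SketchEvent { public object EventSource { get; set; } }
public interface ISketchHandler<in T> where T : SketchEvent { void Handler(T entity); }

public sealed class SketchBus
{
    // Event type -> handler instances discovered for that event type.
    private readonly Dictionary<Type, List<object>> _handlers = new Dictionary<Type, List<object>>();

    // Scans an assembly for concrete classes that implement ISketchHandler<T>
    // and have a public parameterless constructor.
    public void SubscribeAll(Assembly assembly)
    {
        var candidates = assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract && !t.IsGenericTypeDefinition);
        foreach (var type in candidates)
        {
            if (type.GetConstructor(Type.EmptyTypes) == null) continue;
            foreach (var iface in type.GetInterfaces())
            {
                if (!iface.IsGenericType) continue;
                if (iface.GetGenericTypeDefinition() != typeof(ISketchHandler<>)) continue;

                var eventType = iface.GetGenericArguments()[0];
                if (!_handlers.TryGetValue(eventType, out var list))
                {
                    list = new List<object>();
                    _handlers[eventType] = list;
                }
                list.Add(Activator.CreateInstance(type));
            }
        }
    }

    // Invokes every handler registered for exactly T; returns the number invoked.
    public int Publish<T>(T evt) where T : SketchEvent
    {
        if (!_handlers.TryGetValue(typeof(T), out var list)) return 0;
        foreach (var handler in list)
        {
            ((ISketchHandler<T>)handler).Handler(evt);
        }
        return list.Count;
    }
}

// Demo event and handler so the scan has something to find.
public sealed class Ping : SketchEvent { public string Text { get; set; } }

public sealed class PingCounter : ISketchHandler<Ping>
{
    public static int Calls;
    public void Handler(Ping entity) { Calls++; }
}
```

Calling `new SketchBus().SubscribeAll(typeof(SketchBus).Assembly)` and then `Publish(new Ping { Text = "hi" })` runs `PingCounter.Handler` once. The sketch dispatches on the exact event type and runs handlers synchronously in the publisher's thread; a queue-backed bus would serialize the event and invoke handlers on the consumer side instead.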
Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
.NET Core netcoreapp2.0 was computed.  netcoreapp2.1 was computed.  netcoreapp2.2 was computed.  netcoreapp3.0 was computed.  netcoreapp3.1 was computed. 
.NET Standard netstandard2.0 is compatible.  netstandard2.1 is compatible. 
.NET Framework net451 is compatible.  net452 was computed.  net46 was computed.  net461 is compatible.  net462 was computed.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
MonoAndroid monoandroid was computed. 
MonoMac monomac was computed. 
MonoTouch monotouch was computed. 
Tizen tizen40 was computed.  tizen60 was computed. 
Xamarin.iOS xamarinios was computed. 
Xamarin.Mac xamarinmac was computed. 
Xamarin.TVOS xamarintvos was computed. 
Xamarin.WatchOS xamarinwatchos was computed. 

Version Downloads Last updated
1.7.2 180 12/8/2023
1.7.1.6 413 4/29/2022
1.7.1.5 384 4/29/2022
1.7.1.4 387 4/22/2022
1.7.1.3 384 4/22/2022
1.7.1.2 382 4/21/2022
1.7.1.1 376 4/21/2022
1.7.1 402 6/29/2021