Redis 消息中间件 ServiceStack.Redis 轻量级

black2bi 2020/5/22 22:36:29

问题:公司开了个新项目,算上我一共个人。车间里机台通过流水线连通联动的玩意。一个管理控制系统连接各个机台和硬件。专机类型就有种,个数差不多个左右。软件规划的时候采用总分的结构,管理控制系统和专机子系统之间通过消息中间件通讯。本来也想连接来着,但是开发时间不允许。而且每个系统都得写一

问题:

  • 公司开了个新项目,算上我一共3个人。车间里机台通过流水线连通联动的玩意。一个管理控制系统连接各个机台和硬件。专机类型就有5种,个数差不多20个左右。

  • 软件规划的时候采用总分的结构,管理控制系统和专机子系统之间通过消息中间件通讯。本来也想TCP连接来着,但是开发时间不允许。而且每个系统都得写一遍这个玩意。

  • 消息中间件有很多个,比如 Kafka、RabbitMQ、RocketMQ等国内外的消息中间件。这些中间件无论宣称的多么轻量级都要啃一下,更要命的是就他娘三个人。而且后面还要这个鸡儿系统可复制。

  • 考虑到消息及时性、开发难易程度、维护简便性等因素后决定用Redis的pub/sub功能来实现.软件结构大概如类似结构。

 

可用性:

  • 作为消息通知属于安装了Redis就有的功能,因为Redis是用在系统中存储一些热数据,不用单独维护,在Windows中属于服务直接就开了。

  • 作为可以分布式集群使用的数据库,消息传递应该比较OK了。虽然使用的client-server,但是server-server已经很好了。料想client-server也不会差

  • 试验消息内容发送订阅的情况下,速度在30毫秒内,貌似可以。看其他博主说大于10K入队比较慢,但是可以不用入消息队列啊,用发布订阅。

  • .net 下一般使用ServiceStack.Redis,要命的是4.0以后收费,可以破解的但是不支持List<T>型的数据直接存取,想用只能变成JSON字符串存着。

  • 如果只是用订阅发布功能,不存储热数据或者不使用List<T>的数据可以使用4.0以上的版本。文末会贴上两个类型的下载包。想用其他的包也可以,我这里只说一种思路。

实现:

模块结构图展示如下

 public static class MSServer
    {
        // 定义一个object对象
        private static object objinstance = new object();
    </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span> ServerState CurState =<span style="color: #000000;"> ServerState.Free;

    </span><span style="color: #0000ff;">static</span><span style="color: #000000;"> PooledRedisClientManager prcm;

    </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">string</span> clientmake = <span style="color: #0000ff;">string</span><span style="color: #000000;">.Empty;

    </span><span style="color: #808080;">///</span> <span style="color: #808080;">&lt;summary&gt;</span>
    <span style="color: #808080;">///</span><span style="color: #008000;"> 连接的地址
    </span><span style="color: #808080;">///</span> <span style="color: #808080;">&lt;/summary&gt;</span>
    <span style="color: #808080;">///</span> <span style="color: #808080;">&lt;param name="IP"&gt;</span><span style="color: #008000;">地址127.0.0.1:6379</span><span style="color: #808080;">&lt;/param&gt;</span>
    <span style="color: #808080;">///</span> <span style="color: #808080;">&lt;param name="rechannels"&gt;</span><span style="color: #008000;">接收通道 {"channel:1-13","channel:1-5"}</span><span style="color: #808080;">&lt;/param&gt;</span>
    <span style="color: #808080;">///</span> <span style="color: #808080;">&lt;returns&gt;&lt;/returns&gt;</span>
    <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">int</span> OpenServer(<span style="color: #0000ff;">string</span> IP ,<span style="color: #0000ff;">string</span><span style="color: #000000;">[] rechannels)
    {
        </span><span style="color: #0000ff;">try</span><span style="color: #000000;">
        {
            </span><span style="color: #0000ff;">if</span> (prcm == <span style="color: #0000ff;">null</span><span style="color: #000000;">)
            {
                </span><span style="color: #0000ff;">lock</span><span style="color: #000000;"> (objinstance)
                {
                    </span><span style="color: #0000ff;">if</span> (prcm == <span style="color: #0000ff;">null</span><span style="color: #000000;">)
                    {
                        prcm </span>=<span style="color: #000000;"> CreateManager(IP, IP);
                        CurState </span>=<span style="color: #000000;"> ServerState.Init;
                        </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> CreateLink(rechannels);
                    }
                }
            }
        }
        </span><span style="color: #0000ff;">catch</span><span style="color: #000000;">
        {
            prcm </span>= <span style="color: #0000ff;">null</span><span style="color: #000000;">;
            CurState </span>=<span style="color: #000000;"> ServerState.Free;
            </span><span style="color: #0000ff;">return</span> -<span style="color: #800080;">1</span><span style="color: #000000;">;
        }
        </span><span style="color: #0000ff;">return</span> <span style="color: #800080;">1</span><span style="color: #000000;">;
    }

    </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">int</span> CreateLink(<span style="color: #0000ff;">string</span><span style="color: #000000;">[] SourceID)
    {
        </span><span style="color: #0000ff;">if</span> (CurState == ServerState.Init &amp;&amp; SourceID.Length &gt; <span style="color: #800080;">0</span><span style="color: #000000;">)
        {
            </span><span style="color: #0000ff;">try</span><span style="color: #000000;">
            {
                </span><span style="color: #0000ff;">using</span> (IRedisClient Redis =<span style="color: #000000;"> prcm.GetReadOnlyClient())
                {
                    clientmake </span>= SourceID[<span style="color: #800080;">0</span><span style="color: #000000;">];
                    </span><span style="color: #0000ff;">var</span> info = Redis.GetClientsInfo().Where(i =&gt; i[<span style="color: #800000;">"</span><span style="color: #800000;">name</span><span style="color: #800000;">"</span>] ==<span style="color: #000000;"> clientmake).ToList();
                    info.ForEach(i </span>=&gt;<span style="color: #000000;">
                    {
                        Redis.KillClient(i[</span><span style="color: #800000;">"</span><span style="color: #800000;">addr</span><span style="color: #800000;">"</span><span style="color: #000000;">]);
                    });
                    Redis.SetClient(clientmake);
                    IRedisSubscription sc </span>=<span style="color: #000000;"> Redis.CreateSubscription();
                    Task.Run(() </span>=&gt;<span style="color: #000000;">
                    {
                        </span><span style="color: #0000ff;">try</span><span style="color: #000000;">
                        {
                            sc.SubscribeToChannels(SourceID);
                        }
                        </span><span style="color: #0000ff;">catch</span><span style="color: #000000;"> { }
                    });
                    sc.OnMessage </span>+= <span style="color: #0000ff;">new</span> Action&lt;<span style="color: #0000ff;">string</span>, <span style="color: #0000ff;">string</span>&gt;<span style="color: #000000;">(showpub);
                }
                CurState </span>=<span style="color: #000000;"> ServerState.Work;
            }
            </span><span style="color: #0000ff;">catch</span><span style="color: #000000;">
            {
                </span><span style="color: #0000ff;">string</span> message = <span style="color: #0000ff;">string</span><span style="color: #000000;">.Empty;
                prcm </span>= <span style="color: #0000ff;">null</span><span style="color: #000000;">;
                CurState </span>=<span style="color: #000000;"> ServerState.Free;
                </span><span style="color: #0000ff;">return</span> -<span style="color: #800080;">1</span><span style="color: #000000;">;
            }
            </span><span style="color: #0000ff;">return</span> <span style="color: #800080;">1</span><span style="color: #000000;">;
        }
        </span><span style="color: #0000ff;">else</span><span style="color: #000000;">
        {
            </span><span style="color: #0000ff;">return</span> <span style="color: #800080;">0</span><span style="color: #000000;">;
        }
    }


    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> Action&lt;<span style="color: #0000ff;">string</span>, <span style="color: #0000ff;">string</span>&gt;<span style="color: #000000;"> ReceiveMessage;
    </span><span style="color: #0000ff;">static</span> <span style="color: #0000ff;">void</span> showpub(<span style="color: #0000ff;">string</span> channel, <span style="color: #0000ff;">string</span><span style="color: #000000;"> message)
    {
        </span><span style="color: #0000ff;">if</span> (ReceiveMessage != <span style="color: #0000ff;">null</span><span style="color: #000000;">)
        {
            ReceiveMessage(channel, message);
        }
    }

    </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">static</span> PooledRedisClientManager CreateManager(<span style="color: #0000ff;">string</span> writeHost, <span style="color: #0000ff;">string</span><span style="color: #000000;"> readHost)
    {
        </span><span style="color: #0000ff;">var</span> redisClientConfig = <span style="color: #0000ff;">new</span><span style="color: #000000;"> RedisClientManagerConfig
        {
            MaxWritePoolSize </span>= <span style="color: #800080;">1</span>,<span style="color: #008000;">//</span><span style="color: #008000;">&ldquo;写&rdquo;链接池链接数</span>
            MaxReadPoolSize = <span style="color: #800080;">1</span>,<span style="color: #008000;">//</span><span style="color: #008000;">&ldquo;读&rdquo;链接池链接数</span>
            DefaultDb = <span style="color: #800080;">0</span><span style="color: #000000;">,
            AutoStart </span>= <span style="color: #0000ff;">true</span><span style="color: #000000;">,
        };
        </span><span style="color: #008000;">//</span><span style="color: #008000;">读的客户端只能接受特定的命令,不能用于发送信息</span>
        <span style="color: #0000ff;">var</span> RedisClientManager = <span style="color: #0000ff;">new</span><span style="color: #000000;"> PooledRedisClientManager(
            </span><span style="color: #0000ff;">new</span> <span style="color: #0000ff;">string</span>[] { writeHost }<span style="color: #008000;">//</span><span style="color: #008000;">用于写</span>
            , <span style="color: #0000ff;">new</span> <span style="color: #0000ff;">string</span>[] { readHost }<span style="color: #008000;">//</span><span style="color: #008000;">用于读</span>

                , redisClientConfig);
CurState = ServerState.Init;
        </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> RedisClientManager;
    }
    </span><span style="color: #808080;">///</span> <span style="color: #808080;">&lt;summary&gt;</span>
    <span style="color: #808080;">///</span><span style="color: #008000;"> 发送信息
    </span><span style="color: #808080;">///</span> <span style="color: #808080;">&lt;/summary&gt;</span>
    <span style="color: #808080;">///</span> <span style="color: #808080;">&lt;param name="channel"&gt;</span><span style="color: #008000;">通讯对象 "channel:1-13"</span><span style="color: #808080;">&lt;/param&gt;</span>
    <span style="color: #808080;">///</span> <span style="color: #808080;">&lt;param name="meesage"&gt;</span><span style="color: #008000;">发送信息 "test send "</span><span style="color: #808080;">&lt;/param&gt;</span>
    <span style="color: #808080;">///</span> <span style="color: #808080;">&lt;returns&gt;</span><span style="color: #008000;">0 发送失败 1 发送成功 -1 连接损毁 检查网络后重建</span><span style="color: #808080;">&lt;/returns&gt;</span>
    <span style="color: #0000ff;">public</span> <span style="color: #0000ff;">static</span> <span style="color: #0000ff;">long</span> PubMessage(<span style="color: #0000ff;">string</span> channel, <span style="color: #0000ff;">string</span><span style="color: #000000;"> meesage)
    {
        </span><span style="color: #0000ff;">if</span> (CurState ==<span style="color: #000000;"> ServerState.Work)
        {
            </span><span style="color: #0000ff;">if</span> (!<span style="color: #0000ff;">string</span>.IsNullOrEmpty(channel) &amp;&amp; !<span style="color: #0000ff;">string</span><span style="color: #000000;">.IsNullOrEmpty(meesage))
            {
                </span><span style="color: #0000ff;">try</span><span style="color: #000000;">
                {
                    </span><span style="color: #0000ff;">using</span> (IRedisClient Redis =<span style="color: #000000;"> prcm.GetClient())
                    {
                        Redis.SetClient(clientmake);
                        </span><span style="color: #0000ff;">return</span><span style="color: #000000;"> Redis.PublishMessage(channel, meesage);
                    }
                }
                </span><span style="color: #0000ff;">catch</span><span style="color: #000000;">
                {
                    prcm </span>= <span style="color: #0000ff;">null</span><span style="color: #000000;">;
                    CurState </span>=<span style="color: #000000;"> ServerState.Free;
                    </span><span style="color: #0000ff;">return</span> -<span style="color: #800080;">1</span><span style="color: #000000;">;
                }
            }
            </span><span style="color: #0000ff;">else</span><span style="color: #000000;">
            {
                </span><span style="color: #0000ff;">return</span> <span style="color: #800080;">0</span><span style="color: #000000;">;
            }
        }
        </span><span style="color: #0000ff;">else</span><span style="color: #000000;">
        {
            </span><span style="color: #0000ff;">return</span> -<span style="color: #800080;">1</span><span style="color: #000000;">;
        }
    }
}

public enum ServerState
{
Free,
Init,
Work,
Del
}
 

 

 

有一个问题,就是连接远程的服务器时如果网络断开再重连,会残留没用的client ,这样如果网络断断续续的话,会留好多没有清除的客户端。

这个在3.0.504版本中Redis 中也有这个问题,不知道是基于什么考虑的。所以需要建立连接的时候,给个客户端名称,再初始化的时候删掉所有同类型的名称。

 

使用的时候大概类似操作 textbox2.text = "channel:1-5" .为了简便发布的和监听的都是本地的一个通道。

 private void button1_Click(object sender, EventArgs e)
        {
        </span><span style="color: #008000;">//</span><span style="color: #008000;">11.1.7.152   192.168.12.173</span>
        <span style="color: #0000ff;">int</span> result = ServerMS.MSServer.OpenServer(<span style="color: #800000;">"</span><span style="color: #800000;">127.0.0.1:6379</span><span style="color: #800000;">"</span>, <span style="color: #0000ff;">new</span> <span style="color: #0000ff;">string</span><span style="color: #000000;">[] { textBox2.Text });
        label1.Text </span>=<span style="color: #000000;"> result.ToString();
        </span><span style="color: #008000;">//</span><span style="color: #008000;">1匿名事件</span>
        ServerMS.MSServer.ReceiveMessage += <span style="color: #0000ff;">new</span> Action&lt;<span style="color: #0000ff;">string</span>, <span style="color: #0000ff;">string</span>&gt;<span style="color: #000000;">(fuck);

        </span><span style="color: #0000ff;">if</span> (result == <span style="color: #800080;">0</span><span style="color: #000000;">)
        {
            </span><span style="color: #008000;">//</span><span style="color: #008000;">发送失败重新发送 检查 通道和字符串后重新发送</span>

            }
else if (result == 1)
{
//发送成功
            }
else if (result == -1)
{
//连接错误 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
            }
    }

    </span><span style="color: #0000ff;">void</span> fuck(<span style="color: #0000ff;">string</span> channel, <span style="color: #0000ff;">string</span><span style="color: #000000;"> message)
    {
        </span><span style="color: #0000ff;">this</span>.BeginInvoke(<span style="color: #0000ff;">new</span> Action(() =&gt;<span style="color: #000000;">
        {
            textBox4.Text </span>= channel +<span style="color: #000000;"> message;
        }));
    }
    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">bool</span> sdfsd = <span style="color: #0000ff;">true</span><span style="color: #000000;">;

    </span><span style="color: #0000ff;">private</span> <span style="color: #0000ff;">void</span> button3_Click(<span style="color: #0000ff;">object</span><span style="color: #000000;"> sender, EventArgs e)
    {</span><span style="color: #0000ff;">long</span> result = ServerMS.MSServer.PubMessage(textBox2.Text, DateTime.Now.ToString(<span style="color: #800000;">"</span><span style="color: #800000;">yyyyyMMddhhmmssfff</span><span style="color: #800000;">"</span><span style="color: #000000;">));


            if (result == 0)
                {
                    //发送失败重新发送
                }
            else if (result == 1)
                {
                            //发送成功
                }
            else if (result == -1)
                 {
                  //连接错误 需要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
                 }
        } 

 

为了简便channel:是通道的固定命令 ,可以自定义channel:后面的内容,发送就有反馈。确保所有机台都接收到。

如果有断线的需要程序自己重连,接收通道的客户端不可以再给其他的使用,Redis上说Redis client 进入订阅模式时只能接受订阅发布等命令指令,不接受普通的存取和其他命令

所以如果需要在读取、写入、发布、执行其他的指令需要使用其他客户端,否则就出错了。跑了几天了上亿次的测试貌似没有出现什么问题。

 

 

发布订阅消息不会走AOF RDB只存在于内存中,即发即用,用完就没了。没在线就没了。需要考虑使用环境。

还用ping pong来确定连接状态,也可以自定义数据,使用场景要自己开发,要适合自己的才是好的。

下载:

4.0 dll   

链接:https://pan.baidu.com/s/1966t0pduHxQXcxcxV3ZTeQ
提取码:js8p

 

5.8 dll不可以使用List<T>类型

链接:https://pan.baidu.com/s/1RFgY4V0ZO78Wvd7LOxr97g
提取码:bxh2

随时随地学软件编程-关注百度小程序和微信小程序
关于找一找教程网

本站文章仅代表作者观点,不代表本站立场,所有文章非营利性免费分享。
本站提供了软件编程、网站开发技术、服务器运维、人工智能等等IT技术文章,希望广大程序员努力学习,让我们用科技改变世界。
[Redis 消息中间件 ServiceStack.Redis 轻量级]http://www.zyiz.net/tech/detail-136735.html

上一篇:服务器因为redis未设置密码被挖矿分析

下一篇:对比Memcached和Redis,谁才是适合你的缓存?

赞(0)

共有 条评论 网友评论

验证码: 看不清楚?
    关注微信小程序
    程序员编程王-随时随地学编程

    扫描二维码或查找【程序员编程王】

    可以随时随地学编程啦!

    技术文章导航 更多>
    扫一扫关注最新编程教程