ETJava Beta | Java    注册   登录
  • 搜索:
  • 基于surging的木舟IOT平台如何添加网络组件

    发表于      阅读(1)     博客类别:Crawler     转自:https://www.cnblogs.com/fanliang11/p/18384041
    如有侵权 请联系我们删除  (页面底部联系我们)  

      一 、 概述

              为了弥补代码的遗失,木舟IOT平台正在加班加点进行研发,后面不只是针对于IOT设备接入上报,告警,视频管理,组态数据可视化大屏,后面还会有快速搭建微服务平台,利用surging.cli工具根据数据库表生成微服务,中间服务,能让程序员快速完成BOSS交给的任务,从而在这个内卷的社会能占有一席之地。这些都是没有完成任务的空话,现在发此篇的目的是作者有能力开发出优秀的IOT平台,先介绍一个比较突出的功能,就是可以基于共享或者独立配置添加网络组件, 下面来介绍一下如何添加网络组件。

     

        一键运行打包成品下载:https://pan.baidu.com/s/11hcf9ieCkJxlGrzvIuxeQA?pwd=ajsr

        测试用户:fanly

        测试密码:123456

          为了让大家节约时间,能尽快运行产品看到效果,上面有 一键运行打包成品可以进行下载测试运行。

      二、如何测试运行

    以下是目录结构,

    IDE:consul 注册中心

    kayak.client: 网关

    kayak.server:微服务

    apache-skywalking-apm:skywalking链路跟踪

     以上是目录结构,大家不需要一个个运行,只需要打开运行startup.bat,如果需要测试skywalking ,只需要apache-skywalking-apm\bin\startup.bat  文件就可以了,以下是运行的界面

    三、如何添加组件

    1.添加http服务组件,

    打开平台界面,然后点击设备接入->网络组件,然后可以看到如下界面

     再点击新增组件或者编辑组件,完成后注意启动状态是关闭状态,此时并不能对于该组件功能进行访问调用,只有把启动状态打开,才能访问调用

     以上是http服务组件,启动完成后,如果设置了webservice和swagger,你可以访问webservice和swagger,看是否可以访问

     

    2.添加/编辑Tcp服务组件

    当添加/编辑Tcp组件时,设置Host:127.0.0.1 ,port:248并且还有解析方式选项,选项里面有不处理,固定长度,分隔符,自定义脚本,下面我们就来看自定义脚本

     添加脚本如下:

    parser.Fixed(4).Handler(
                      function(buffer){
                        var buf = BytesUtils.Slice(buffer,1,4);
                        parser.Fixed(buffer.ReadableBytes).Result(buf);
                 }).Handler(
                        function(buffer){parser.Fixed(8).Result(buffer);}
                ).Handler(function(buffer){
                        parser.Result('处理完成','gb2312').Complete();
                     }
                 )

    而基于TCP服务代码如下,需要继承于TcpBehavior

        internal class TcpDeviceDataService : TcpBehavior, ITcpDeviceDataService
        {
            private readonly IDeviceProvider _deviceProvider;
            public TcpDeviceDataService(IDeviceProvider deviceProvider)
            {
                _deviceProvider = deviceProvider;
            }
    
            public override void Load(string clientId, NetworkProperties tcpServerProperties)
            {
                var deviceStatus = _deviceProvider.IsConnected(clientId);
                this.Parser.HandlePayload().Subscribe(async buffer => await ParserBuffer(buffer));
            }
    
            public override void DeviceStatusProcess(DeviceStatus status, string clientId, NetworkProperties tcpServerProperties)
            {
                //throw new NotImplementedException();
            }
    
            public async Task ParserBuffer(IByteBuffer buffer)
            {
                List<string> result = new List<string>();
                while (buffer.ReadableBytes > 0)
                {
                    result.Add(buffer.ReadString(this.Parser.GetNextFixedRecordLength(),
                        Encoding.GetEncoding("gb2312")));
                }
    
                // var str= buffer.ReadString(buffer.ReadableBytes, Encoding.UTF8);
    
                var byteBuffer = Unpooled.Buffer();
                byteBuffer.WriteString("\r\n", Encoding.UTF8);
                byteBuffer.WriteString("处理完成", Encoding.GetEncoding("gb2312"));
                await Sender.SendAndFlushAsync(byteBuffer); 
                //  await Sender.SendAndFlushAsync("消息已接收",Encoding.GetEncoding("gb2312"));
                this.Parser.Close();
            }
    
            public Task<bool> ChangeDeviceStage(string deviceId)
            {
                throw new NotImplementedException();
            }
        }

    用测试Tcp调试工具结果如下

     

    3.添加/编辑UDP服务组件

    当添加/编辑UDP组件时, 设置Host:127.0.0.1 ,port:267 并且可以是否开启组播

     而基于udp服务代码如下,需要继承于UdpBehavior

      internal class UdpDeviceDataService : UdpBehavior, IUdpDeviceDataService
      {
          public Task<bool> ChangeDeviceStage(string deviceId)
          {
              throw new NotImplementedException();
          }
    
          public override async Task Dispatch(IEnumerable<byte> bytes)
          {
              await Sender.SendAndFlushAsync("\r\n", Encoding.UTF8);
              await Sender.SendAndFlushAsync("处理完成", Encoding.GetEncoding("gb2312"));
          }
      }

    测试结果如下:

    4.添加/编辑WebSocket服务组件

    当添加/编辑WebSocket组件时, 设置Host:127.0.0.1 ,port:55

      而基于websocket服务代码如下,需要继承于WSBehavior

    internal class WSDeviceDataService : WSBehavior, IWSDeviceDataService
    {
        protected override void OnMessage(MessageEventArgs e)
        {
            this.Client.Value.SendTo($"send:{e.Data},\r\n reply:hello,welcome to you!",ID);
        }
    
        protected override void OnOpen()
        {
         
        }
    }

    测试结果如下:

    5.添加/编辑UDP服务组件

    当添加/编辑WebSocket组件时, 设置Host:127.0.0.1 ,port:345

     添加greet.proto文件,脚本如下:

    syntax = "proto3";
     
    package Greet;
     
    service Greeter {
      // Sends a greeting
      rpc ChangeDeviceStage (DeviceRequest) returns (DeviceReply) {}
    }
     
    message DeviceRequest {
      string deviceId = 1;
    }
     
    message  DeviceReply {
     bool message = 1;
    }

    然后再创建GreeterBehavior,继承Greeter.GreeterBase, IServiceBehavior,代码如下

    public partial class GreeterBehavior : Greeter.GreeterBase, IServiceBehavior
    {
        private ServerReceivedDelegate received;
        public event ServerReceivedDelegate Received
        {
            add
            {
                if (value == null)
                {
                    received += value;
                }
            }
            remove
            {
                received -= value;
            }
        }
    
        public string MessageId { get; } = Guid.NewGuid().ToString("N");
        public async Task Write(object result, int statusCode = 200, string exceptionMessage = "")
        {
            if (received == null)
                return;
            var message = new TransportMessage(MessageId, new ReactiveResultMessage
            {
                ExceptionMessage = exceptionMessage,
                StatusCode = statusCode,
                Result = result
    
            });
            await received(message);
        }
    
        public T CreateProxy<T>(string key) where T : class
        {
            return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key);
        }
    
        public object CreateProxy(Type type)
        {
            return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type);
        }
    
        public object CreateProxy(string key, Type type)
        {
            return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type);
        }
    
        public T CreateProxy<T>() where T : class
        {
            return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>();
        }
    
        public T GetService<T>(string key) where T : class
        {
            if (ServiceLocator.Current.IsRegisteredWithKey<T>(key))
                return ServiceLocator.GetService<T>(key);
            else
                return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key);
        }
    
        public T GetService<T>() where T : class
        {
            if (ServiceLocator.Current.IsRegistered<T>())
                return ServiceLocator.GetService<T>();
            else
                return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>();
    
        }
    
        public object GetService(Type type)
        {
            if (ServiceLocator.Current.IsRegistered(type))
                return ServiceLocator.GetService(type);
            else
                return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type);
        }
    
        public object GetService(string key, Type type)
        {
            if (ServiceLocator.Current.IsRegisteredWithKey(key, type))
                return ServiceLocator.GetService(key, type);
            else
                return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type);
    
        } 
        public void Publish(IntegrationEvent @event)
        {
            GetService<IEventBus>().Publish(@event);
        }
    
    }

     而基于grpc服务代码如下,需要继承于刚刚创建的GreeterBehavior

        public class GrpcDeviceDataService : GreeterBehavior, IGrpcDeviceDataService
        {
            public override Task<DeviceReply> ChangeDeviceStage(DeviceRequest request, ServerCallContext context)
            {
                return Task.FromResult(new DeviceReply
                {
                    Message = true
                }) ;
            }
        }

    以下是测试结果:

     6.添加/编辑MQTT服务组件

    当添加/编辑MQTT组件时, 设置Host:127.0.0.1 ,port:425

       而基于mqtt服务代码如下,需要继承于MqttBehavior

     public class MQTTDeviceDataService : MqttBehavior, IMQTTDeviceDataService
     {
         public override async Task<bool> Authorized(string username, string password)
         {
             bool result = false;
             if (username == "admin" && password == "123456")
                 result = true;
             return await Task.FromResult(result);
         }
    
         public async Task<bool> IsOnline(string deviceId)
         {
             return await base.GetDeviceIsOnine(deviceId);
         }
    
         public async Task Publish(string deviceId, WillMessage message)
         {
             var willMessage = new MqttWillMessage
             {
                 WillMessage = message.Message,
                 Qos = message.Qos,
                 Topic = message.Topic,
                 WillRetain = message.WillRetain
             };
             await Publish(deviceId, willMessage);
             await RemotePublish(deviceId, willMessage);
         }
     }

    以下是测试结果:

    三、总结

       木舟IOT平台会在github开源社区版本,可以自由更改代码,用于商业项目,但不能自营平台,如低代码平台,IOT平台等,如有违反,后果自负,还有最好不要更改命名空间,然后跟公司说是自己研发的,如果知道后,我在博客全网通报此人,以前surging相关的事件就算了,就当没发生过。,如果碰到困难,比较紧急的话,可以联系作者,加群:744677125