//------------------------------------------------------------------------------
//  此代码版权(除特别声明或在XREF结尾的命名空间的代码)归作者本人若汝棋茗所有
//  源代码使用协议遵循本仓库的开源协议及附加协议,若本仓库没有设置,则按MIT开源协议授权
//  CSDN博客:https://blog.csdn.net/qq_40374647
//  哔哩哔哩视频:https://space.bilibili.com/94253567
//  Gitee源代码仓库:https://gitee.com/RRQM_Home
//  Github源代码仓库:https://github.com/RRQM
//  API首页:https://www.yuque.com/rrqm/touchsocket/index
//  交流QQ群:234762506
//  感谢您的下载和使用
//------------------------------------------------------------------------------
//------------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using TouchSocket.Core;
using TouchSocket.Resources;

namespace TouchSocket.Sockets
{
    /// <summary>
    /// Simple UDP session that forwards every received datagram to the <see cref="Received"/> delegate.
    /// </summary>
    public class UdpSession : UdpSessionBase
    {
        /// <summary>
        /// Callback invoked when data has been received. Nothing is invoked while it is <see langword="null"/>.
        /// </summary>
        public UdpReceivedEventHandler Received { get; set; }

        /// <summary>
        /// Forwards the received data to <see cref="Received"/>.
        /// </summary>
        /// <param name="remoteEndPoint">Endpoint the data came from.</param>
        /// <param name="byteBlock">Received data as a byte block.</param>
        /// <param name="requestInfo">Received data as a parsed request object.</param>
        protected override void HandleReceivedData(EndPoint remoteEndPoint, ByteBlock byteBlock, IRequestInfo requestInfo)
        {
            Received?.Invoke(remoteEndPoint, byteBlock, requestInfo);
        }
    }

    /// <summary>
    /// Base class for UDP sessions: owns the socket, runs the receive loops and routes
    /// sent/received data through a <see cref="UdpDataHandlingAdapter"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file were lost from the copy this revision was
    /// made from and have been restored. <c>ConcurrentList&lt;SocketAsyncEventArgs&gt;</c>, the two
    /// <c>Func&lt;...&gt;</c> hooks, <c>Raise&lt;TPlugin&gt;</c> and <c>GetValue&lt;T&gt;</c> follow from how
    /// the values are used here; <c>Resolve&lt;ILog&gt;</c> and <c>IList&lt;ArraySegment&lt;byte&gt;&gt;</c>
    /// are assumptions that must be confirmed against the upstream file.
    /// </remarks>
    public class UdpSessionBase : BaseSocket, IUdpSession, IPluginObject
    {
        private readonly ConcurrentList<SocketAsyncEventArgs> m_socketAsyncs;
        private TouchSocketConfig m_config;
        private UdpDataHandlingAdapter m_adapter;
        private NetworkMonitor m_monitor;
        private IPHost m_remoteIPHost;
        private ServerState m_serverState;
        private bool m_usePlugin;

        /// <summary>
        /// Creates the session with an unbound IPv4 UDP socket sized to <c>BufferLength</c>.
        /// </summary>
        public UdpSessionBase()
        {
            m_socketAsyncs = new ConcurrentList<SocketAsyncEventArgs>();
            Protocol = Protocol.UDP;

            Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
            socket.ReceiveBufferSize = BufferLength;
            socket.SendBufferSize = BufferLength;
            m_monitor = new NetworkMonitor(null, socket);
        }

        /// <summary>
        /// Hook for data that has not yet passed through the adapter.
        /// Returning <see langword="false"/> stops further processing of the datagram.
        /// </summary>
        public Func<ByteBlock, bool> OnHandleRawBuffer { get; set; }

        /// <summary>
        /// Hook for data that has passed through the adapter.
        /// Returning <see langword="false"/> stops further processing of the data.
        /// </summary>
        public Func<ByteBlock, IRequestInfo, bool> OnHandleReceivedData { get; set; }

        /// <summary>
        /// <see langword="true"/> while the session state is <see cref="ServerState.Running"/>.
        /// </summary>
        public bool CanSend => m_serverState == ServerState.Running;

        /// <summary>
        /// Whether <see cref="SetDataHandlingAdapter"/> may be called. Defaults to <see langword="true"/>.
        /// </summary>
        public virtual bool CanSetDataHandlingAdapter => true;

        /// <summary>
        /// The configuration passed to <see cref="Setup(TouchSocketConfig)"/>.
        /// </summary>
        public TouchSocketConfig Config => m_config;

        /// <summary>
        /// The container of the current configuration, or <see langword="null"/> before setup.
        /// </summary>
        public IContainer Container => m_config?.Container;

        /// <summary>
        /// Local time at which the last datagram was handed to the receive pipeline.
        /// </summary>
        public DateTime LastReceivedTime { get; private set; }

        /// <summary>
        /// Local time of the last call to <see cref="DefaultSend(EndPoint, byte[], int, int)"/> that was
        /// allowed by <see cref="HandleSendingData"/>.
        /// </summary>
        public DateTime LastSendTime { get; private set; }

        /// <summary>
        /// The data handling adapter currently in use.
        /// </summary>
        public UdpDataHandlingAdapter DataHandlingAdapter => m_adapter;

        /// <summary>
        /// The monitor wrapping the current socket. It is <see langword="null"/> after
        /// <see cref="Stop"/> or disposal.
        /// </summary>
        public NetworkMonitor Monitor => m_monitor;

        /// <summary>
        /// The plugin manager of the current configuration, or <see langword="null"/> before setup.
        /// </summary>
        public IPluginsManager PluginsManager => m_config?.PluginsManager;

        /// <summary>
        /// The protocol of this session; the constructor sets it to <c>Protocol.UDP</c>.
        /// </summary>
        public virtual Protocol Protocol { get; set; }

        /// <summary>
        /// The default remote host used by the send overloads that take no endpoint.
        /// </summary>
        public IPHost RemoteIPHost => m_remoteIPHost;

        /// <summary>
        /// The server name read from the configuration, or <see langword="null"/> before setup.
        /// </summary>
        public string ServerName => Config?.GetValue<string>(TouchSocketConfigExtension.ServerNameProperty);

        /// <summary>
        /// The current state of the session.
        /// </summary>
        public ServerState ServerState => m_serverState;

        /// <summary>
        /// Whether plugins are enabled, as read from the configuration in <see cref="LoadConfig"/>.
        /// </summary>
        public bool UsePlugin => m_usePlugin;

        /// <summary>
        /// Leaves a multicast group.
        /// </summary>
        /// <param name="multicastAddr">Address of the multicast group to leave.</param>
        /// <exception cref="ObjectDisposedException">The session has been disposed.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="multicastAddr"/> is <see langword="null"/>.</exception>
        public void DropMulticastGroup(IPAddress multicastAddr)
        {
            if (DisposedValue)
            {
                throw new ObjectDisposedException(GetType().FullName);
            }

            if (multicastAddr is null)
            {
                throw new ArgumentNullException(nameof(multicastAddr));
            }

            SetMulticastMembership(multicastAddr, SocketOptionName.DropMembership);
        }

        /// <summary>
        /// Joins a multicast group.
        /// <para>
        /// Multicast addresses range from 224.0.0.0 to 239.255.255.255. The range
        /// 224.0.0.0 to 224.255.255.255 is not recommended for user programs because those
        /// addresses generally have special purposes.
        /// </para>
        /// </summary>
        /// <param name="multicastAddr">Address of the multicast group to join.</param>
        /// <exception cref="ArgumentNullException"><paramref name="multicastAddr"/> is <see langword="null"/>.</exception>
        /// <exception cref="ObjectDisposedException">The session has been disposed.</exception>
        public void JoinMulticastGroup(IPAddress multicastAddr)
        {
            if (multicastAddr is null)
            {
                throw new ArgumentNullException(nameof(multicastAddr));
            }

            if (DisposedValue)
            {
                throw new ObjectDisposedException(GetType().FullName);
            }

            SetMulticastMembership(multicastAddr, SocketOptionName.AddMembership);
        }

        /// <summary>
        /// Sets the data handling adapter.
        /// </summary>
        /// <param name="adapter">The adapter to install.</param>
        /// <exception cref="Exception"><see cref="CanSetDataHandlingAdapter"/> is <see langword="false"/>.</exception>
        public virtual void SetDataHandlingAdapter(UdpDataHandlingAdapter adapter)
        {
            if (!CanSetDataHandlingAdapter)
            {
                throw new Exception($"不允许自由调用{nameof(SetDataHandlingAdapter)}进行赋值。");
            }

            SetAdapter(adapter);
        }

        /// <summary>
        /// Applies a configuration: stores it, loads it through <see cref="LoadConfig"/> and raises
        /// the <c>IConfigPlugin</c> loading/loaded notifications when plugins are enabled.
        /// </summary>
        /// <param name="config">The configuration to apply.</param>
        /// <returns>This instance.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="config"/> is <see langword="null"/>.</exception>
        public IService Setup(TouchSocketConfig config)
        {
            // Fail with a descriptive exception instead of a NullReferenceException on config.IsUsePlugin.
            if (config is null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            m_config = config;

            if (config.IsUsePlugin)
            {
                PluginsManager.Raise<IConfigPlugin>(nameof(IConfigPlugin.OnLoadingConfig), this, new ConfigEventArgs(config));
            }

            LoadConfig(m_config);

            if (UsePlugin)
            {
                PluginsManager.Raise<IConfigPlugin>(nameof(IConfigPlugin.OnLoadedConfig), this, new ConfigEventArgs(config));
            }

            return this;
        }

        /// <summary>
        /// Applies a new configuration whose bind host is built from the given port.
        /// </summary>
        /// <param name="port">Port used to build the bind host.</param>
        /// <returns>This instance.</returns>
        public IService Setup(int port)
        {
            TouchSocketConfig serverConfig = new TouchSocketConfig();
            serverConfig.SetBindIPHost(new IPHost(port));
            return Setup(serverConfig);
        }

        /// <summary>
        /// Starts the session. When the configuration contains a bind host the socket is bound to it
        /// and receiving begins; afterwards the state becomes <see cref="ServerState.Running"/>.
        /// </summary>
        /// <returns>This instance.</returns>
        /// <remarks>
        /// Any exception sets the state to <see cref="ServerState.Exception"/>, is reported to the
        /// plugins (when enabled) and is then rethrown.
        /// </remarks>
        public IService Start()
        {
            try
            {
                switch (m_serverState)
                {
                    case ServerState.Disposed:
                        throw new Exception("无法重新利用已释放对象");

                    case ServerState.Running:
                        return this;

                    case ServerState.None:
                    case ServerState.Stopped:
                        if (m_config.GetValue<IPHost>(TouchSocketConfigExtension.BindIPHostProperty) is IPHost iPHost)
                        {
                            BeginReceive(iPHost);
                        }

                        break;

                    // NOTE(review): any other state (e.g. ServerState.Exception after a failed start)
                    // falls through and is marked Running without binding - confirm this is intended.
                }

                m_serverState = ServerState.Running;

                if (UsePlugin)
                {
                    PluginsManager.Raise<IServicePlugin>(nameof(IServicePlugin.OnStarted), this, new ServiceStateEventArgs(this.m_serverState, default));
                }

                return this;
            }
            catch (Exception ex)
            {
                m_serverState = ServerState.Exception;

                if (UsePlugin)
                {
                    PluginsManager.Raise<IServicePlugin>(nameof(IServicePlugin.OnStarted), this, new ServiceStateEventArgs(this.m_serverState, ex) { Message = ex.Message });
                }

                throw;
            }
        }

        /// <summary>
        /// Stops the session: disposes the socket and all pending receive event args and sets the
        /// state to <see cref="ServerState.Stopped"/>.
        /// </summary>
        /// <returns>This instance.</returns>
        public IService Stop()
        {
            m_monitor?.Socket.Dispose();
            m_monitor = null;
            m_serverState = ServerState.Stopped;

            foreach (var item in m_socketAsyncs)
            {
                item.SafeDispose();
            }

            m_socketAsyncs.Clear();

            if (UsePlugin)
            {
                // NOTE(review): the stop notification is raised through OnStarted - confirm whether a
                // dedicated "stopped" plugin method should be used instead.
                PluginsManager.Raise<IServicePlugin>(nameof(IServicePlugin.OnStarted), this, new ServiceStateEventArgs(this.m_serverState, default));
            }

            return this;
        }

        /// <summary>
        /// Releases the socket and pending receive event args and sets the state to
        /// <see cref="ServerState.Disposed"/>.
        /// </summary>
        /// <param name="disposing"><see langword="true"/> when called from <c>Dispose()</c>.</param>
        protected override void Dispose(bool disposing)
        {
            if (!this.DisposedValue)
            {
                if (disposing)
                {
                    m_monitor?.Socket.Dispose();
                    m_monitor = null;
                    m_serverState = ServerState.Disposed;

                    foreach (var item in m_socketAsyncs)
                    {
                        item.SafeDispose();
                    }

                    m_socketAsyncs.Clear();

                    if (UsePlugin)
                    {
                        // NOTE(review): as in Stop, the notification is raised through OnStarted.
                        PluginsManager.Raise<IServicePlugin>(nameof(IServicePlugin.OnStarted), this, new ServiceStateEventArgs(this.m_serverState, default));
                    }
                }
            }

            base.Dispose(disposing);
        }

        /// <summary>
        /// Handles received data. The base implementation does nothing.
        /// </summary>
        /// <param name="remoteEndPoint">Endpoint the data came from.</param>
        /// <param name="byteBlock">Received data as a byte block.</param>
        /// <param name="requestInfo">Received data as a parsed request object.</param>
        protected virtual void HandleReceivedData(EndPoint remoteEndPoint, ByteBlock byteBlock, IRequestInfo requestInfo)
        {
        }

        /// <summary>
        /// Called by <see cref="DefaultSend(EndPoint, byte[], int, int)"/> just before data is sent.
        /// The base implementation always allows the send.
        /// </summary>
        /// <param name="endPoint">Destination endpoint.</param>
        /// <param name="buffer">Buffer holding the data.</param>
        /// <param name="offset">Offset of the data in <paramref name="buffer"/>.</param>
        /// <param name="length">Length of the data.</param>
        /// <returns><see langword="true"/> to allow the send; <see langword="false"/> to suppress it.</returns>
        protected virtual bool HandleSendingData(EndPoint endPoint, byte[] buffer, int offset, int length)
        {
            return true;
        }

        /// <summary>
        /// Loads the logger, default remote host, buffer length, plugin switch and (when allowed)
        /// the data handling adapter from the configuration.
        /// </summary>
        /// <param name="config">The configuration to load.</param>
        /// <exception cref="Exception"><paramref name="config"/> is <see langword="null"/>.</exception>
        protected virtual void LoadConfig(TouchSocketConfig config)
        {
            if (config == null)
            {
                throw new Exception("配置文件为空");
            }

            Logger = Container.Resolve<ILog>();
            m_remoteIPHost = config.GetValue<IPHost>(TouchSocketConfigExtension.RemoteIPHostProperty);
            BufferLength = config.GetValue<int>(TouchSocketConfigExtension.BufferLengthProperty);
            m_usePlugin = config.IsUsePlugin;

            if (CanSetDataHandlingAdapter)
            {
                SetDataHandlingAdapter(Config.GetValue<Func<UdpDataHandlingAdapter>>(TouchSocketConfigExtension.UdpDataHandlingAdapterProperty).Invoke());
            }
        }

        /// <summary>
        /// Called after the socket has been created and configured and before it is bound.
        /// Override to set additional socket options. The base implementation does nothing.
        /// </summary>
        /// <param name="socket">The socket about to be bound.</param>
        protected virtual void PreviewBind(Socket socket)
        {
        }

        /// <summary>
        /// Installs the adapter without checking <see cref="CanSetDataHandlingAdapter"/>.
        /// </summary>
        /// <param name="adapter">The adapter to install.</param>
        /// <exception cref="ArgumentNullException"><paramref name="adapter"/> is <see langword="null"/>.</exception>
        /// <exception cref="Exception">The adapter already has an owner.</exception>
        protected void SetAdapter(UdpDataHandlingAdapter adapter)
        {
            if (adapter is null)
            {
                throw new ArgumentNullException(nameof(adapter));
            }

            if (adapter.m_owner != null)
            {
                throw new Exception("此适配器已被其他终端使用,请重新创建对象。");
            }

            if (Config != null)
            {
                if (Config.GetValue<int>(TouchSocketConfigExtension.MaxPackageSizeProperty) is int v1)
                {
                    adapter.MaxPackageSize = v1;
                }
            }

            adapter.m_owner = this;
            adapter.ReceivedCallBack = PrivateHandleReceivedData;
            adapter.SendCallBack = DefaultSend;
            m_adapter = adapter;
        }

        /// <summary>
        /// Applies the add/drop membership option for the address family of the current socket.
        /// </summary>
        private void SetMulticastMembership(IPAddress multicastAddr, SocketOptionName optionName)
        {
            Socket socket = m_monitor.Socket;

            if (socket.AddressFamily == AddressFamily.InterNetwork)
            {
                socket.SetSocketOption(SocketOptionLevel.IP, optionName, new MulticastOption(multicastAddr));
            }
            else
            {
                socket.SetSocketOption(SocketOptionLevel.IPv6, optionName, new IPv6MulticastOption(multicastAddr));
            }
        }

        /// <summary>
        /// Creates and binds a new socket for <paramref name="iPHost"/> and starts receiving on it.
        /// </summary>
        private void BeginReceive(IPHost iPHost)
        {
            int threadCount = Config.GetValue<int>(TouchSocketConfigExtension.ThreadCountProperty);

            // NOTE(review): only negative values are replaced, so a configured value of 0 starts no
            // asynchronous receiver at all - confirm whether "<= 0" was intended.
            threadCount = threadCount < 0 ? 1 : threadCount;

            Socket socket = new Socket(iPHost.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
            try
            {
                socket.ReceiveBufferSize = BufferLength;
                socket.SendBufferSize = BufferLength;
                socket.EnableBroadcast = m_config.GetValue<bool>(TouchSocketConfigExtension.EnableBroadcastProperty);

                if (m_config.GetValue<bool>(TouchSocketConfigExtension.ReuseAddressProperty))
                {
                    socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
                }

                PreviewBind(socket);
                DisableUdpConnectionReset(socket);
                socket.Bind(iPHost.EndPoint);
            }
            catch
            {
                // Do not leak the new socket when configuring or binding it fails.
                socket.Dispose();
                throw;
            }

            m_monitor = new NetworkMonitor(iPHost, socket);

            switch (m_config.GetValue<ReceiveType>(TouchSocketConfigExtension.ReceiveTypeProperty))
            {
                case ReceiveType.Auto:
                    {
                        // ProcessReceive only keeps a receive chain alive while the state is Running,
                        // so the state must be set before the first receive can complete; otherwise
                        // an early datagram is dropped and its chain is never re-armed.
                        m_serverState = ServerState.Running;

#if NET45_OR_GREATER||NET5_0_OR_GREATER
                        StartAsyncReceive(socket, iPHost, threadCount);
#else
                        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
                        {
                            StartAsyncReceive(socket, iPHost, threadCount);
                        }
                        else
                        {
                            Thread thread = new Thread(Received);
                            thread.IsBackground = true;
                            thread.Start();
                        }
#endif
                        break;
                    }

                default:
                    throw new Exception("UDP中只支持Auto模式");
            }
        }

        /// <summary>
        /// Issues the SIO_UDP_CONNRESET control code with a value of FALSE so that, on Windows, a
        /// receive does not fail with "connection reset" (error 10054).
        /// </summary>
        private static void DisableUdpConnectionReset(Socket socket)
        {
            // 0x9800000C, the Winsock SIO_UDP_CONNRESET control code.
            const int SioUdpConnReset = -1744830452;

#if NET45_OR_GREATER
            socket.IOControl(SioUdpConnReset, new byte[] { 0, 0, 0, 0 }, null);
#else
            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            {
                socket.IOControl(SioUdpConnReset, new byte[] { 0, 0, 0, 0 }, null);
            }
#endif
        }

        /// <summary>
        /// Arms <paramref name="receiverCount"/> asynchronous receive operations on the socket.
        /// </summary>
        private void StartAsyncReceive(Socket socket, IPHost iPHost, int receiverCount)
        {
            for (int i = 0; i < receiverCount; i++)
            {
                SocketAsyncEventArgs eventArg = new SocketAsyncEventArgs();
                m_socketAsyncs.Add(eventArg);
                eventArg.Completed += IO_Completed;

                ByteBlock byteBlock = new ByteBlock(BufferLength);
                eventArg.UserToken = byteBlock;
                eventArg.SetBuffer(byteBlock.Buffer, 0, byteBlock.Capacity);
                eventArg.RemoteEndPoint = iPHost.EndPoint;

                // false means the operation completed synchronously and Completed will not be raised.
                if (!socket.ReceiveFromAsync(eventArg))
                {
                    ProcessReceive(socket, eventArg);
                }
            }
        }

        /// <summary>
        /// Blocking receive loop run on a dedicated background thread. The loop ends on the first
        /// exception, which is logged.
        /// </summary>
        private void Received()
        {
            while (true)
            {
                ByteBlock byteBlock = new ByteBlock();
                try
                {
                    EndPoint endPoint = m_monitor.IPHost.EndPoint;
                    int r = m_monitor.Socket.ReceiveFrom(byteBlock.Buffer, ref endPoint);
                    byteBlock.SetLength(r);
                    HandleBuffer(endPoint, byteBlock);
                }
                catch (Exception ex)
                {
                    byteBlock.Dispose();
                    Logger.Log(LogType.Error, this, ex.Message, ex);
                    break;
                }
            }
        }

        /// <summary>
        /// Runs one received datagram through <see cref="OnHandleRawBuffer"/> and the adapter.
        /// Exceptions are logged, and <paramref name="byteBlock"/> is always disposed.
        /// </summary>
        private void HandleBuffer(EndPoint endPoint, ByteBlock byteBlock)
        {
            try
            {
                LastReceivedTime = DateTime.Now;

                if (OnHandleRawBuffer?.Invoke(byteBlock) == false)
                {
                    return;
                }

                if (DisposedValue)
                {
                    return;
                }

                if (m_adapter == null)
                {
                    Logger.Error(this, TouchSocketStatus.NullDataAdapter.GetDescription());
                    return;
                }

                m_adapter.ReceivedInput(endPoint, byteBlock);
            }
            catch (Exception ex)
            {
                Logger.Log(LogType.Error, this, "在处理数据时发生错误", ex);
            }
            finally
            {
                byteBlock.Dispose();
            }
        }

        /// <summary>
        /// Completion callback for asynchronous receive operations.
        /// </summary>
        private void IO_Completed(object sender, SocketAsyncEventArgs e)
        {
            ProcessReceive((Socket)sender, e);
        }

        /// <summary>
        /// Adapter callback: offers the data to <see cref="OnHandleReceivedData"/>, then to the plugins
        /// (when enabled) and finally to <see cref="HandleReceivedData"/>.
        /// </summary>
        private void PrivateHandleReceivedData(EndPoint remoteEndPoint, ByteBlock byteBlock, IRequestInfo requestInfo)
        {
            if (OnHandleReceivedData?.Invoke(byteBlock, requestInfo) == false)
            {
                return;
            }

            if (m_usePlugin)
            {
                UdpReceivedDataEventArgs args = new UdpReceivedDataEventArgs(remoteEndPoint, byteBlock, requestInfo);
                PluginsManager.Raise<IUdpSessionPlugin>(nameof(IUdpSessionPlugin.OnReceivedData), this, args);
                if (args.Handled)
                {
                    return;
                }
            }

            HandleReceivedData(remoteEndPoint, byteBlock, requestInfo);
        }

        #region Synchronous send to the default remote endpoint

        /// <summary>
        /// Sends data to the default remote endpoint.
        /// </summary>
        /// <param name="buffer">Buffer holding the data.</param>
        /// <param name="offset">Offset of the data in <paramref name="buffer"/>.</param>
        /// <param name="length">Length of the data.</param>
        /// <exception cref="Exception">No default remote endpoint is configured.</exception>
        public virtual void Send(byte[] buffer, int offset, int length)
        {
            if (m_remoteIPHost == null)
            {
                throw new Exception("默认终结点为空");
            }

            Send(m_remoteIPHost.EndPoint, buffer, offset, length);
        }

        /// <summary>
        /// Sends a request object through the adapter. Does nothing when the session is disposed.
        /// </summary>
        /// <param name="requestInfo">The request object to send.</param>
        /// <exception cref="ArgumentNullException">No adapter has been set.</exception>
        /// <exception cref="NotSupportedException">The adapter cannot send request objects.</exception>
        public virtual void Send(IRequestInfo requestInfo)
        {
            if (DisposedValue)
            {
                return;
            }

            if (m_adapter == null)
            {
                throw new ArgumentNullException(nameof(DataHandlingAdapter), TouchSocketStatus.NullDataAdapter.GetDescription());
            }

            if (!m_adapter.CanSendRequestInfo)
            {
                throw new NotSupportedException($"当前适配器不支持对象发送。");
            }

            m_adapter.SendInput(requestInfo);
        }

        #endregion Synchronous send to the default remote endpoint

        #region Asynchronous send to the default remote endpoint

        /// <summary>
        /// Runs <see cref="Send(byte[], int, int)"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="buffer">Buffer holding the data.</param>
        /// <param name="offset">Offset of the data in <paramref name="buffer"/>.</param>
        /// <param name="length">Length of the data.</param>
        /// <returns>The task returned by <c>EasyTask.Run</c>.</returns>
        public virtual Task SendAsync(byte[] buffer, int offset, int length)
        {
            return EasyTask.Run(() =>
            {
                Send(buffer, offset, length);
            });
        }

        /// <summary>
        /// Runs <see cref="Send(IRequestInfo)"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="requestInfo">The request object to send.</param>
        /// <returns>The task returned by <c>EasyTask.Run</c>.</returns>
        public virtual Task SendAsync(IRequestInfo requestInfo)
        {
            return EasyTask.Run(() =>
            {
                Send(requestInfo);
            });
        }

        #endregion Asynchronous send to the default remote endpoint

        #region Synchronous send to a given remote endpoint

        /// <summary>
        /// Sends data to the given remote endpoint through the adapter.
        /// </summary>
        /// <param name="remoteEP">Destination endpoint.</param>
        /// <param name="buffer">Buffer holding the data.</param>
        /// <param name="offset">Offset of the data in <paramref name="buffer"/>.</param>
        /// <param name="length">Length of the data.</param>
        /// <exception cref="ArgumentNullException">No adapter has been set.</exception>
        public virtual void Send(EndPoint remoteEP, byte[] buffer, int offset, int length)
        {
            // Same guard as the other adapter-based Send overloads, instead of a NullReferenceException.
            if (m_adapter == null)
            {
                throw new ArgumentNullException(nameof(DataHandlingAdapter), TouchSocketStatus.NullDataAdapter.GetDescription());
            }

            m_adapter.SendInput(remoteEP, buffer, offset, length);
        }

        #endregion Synchronous send to a given remote endpoint

        #region Asynchronous send to a given remote endpoint

        /// <summary>
        /// Runs <see cref="Send(EndPoint, byte[], int, int)"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="remoteEP">Destination endpoint.</param>
        /// <param name="buffer">Buffer holding the data.</param>
        /// <param name="offset">Offset of the data in <paramref name="buffer"/>.</param>
        /// <param name="length">Length of the data.</param>
        /// <returns>The task returned by <c>EasyTask.Run</c>.</returns>
        public virtual Task SendAsync(EndPoint remoteEP, byte[] buffer, int offset, int length)
        {
            return EasyTask.Run(() =>
            {
                Send(remoteEP, buffer, offset, length);
            });
        }

        #endregion Asynchronous send to a given remote endpoint

        /// <summary>
        /// Processes a completed receive and re-arms the operation for as long as the session is
        /// running and the receive succeeded.
        /// </summary>
        private void ProcessReceive(Socket socket, SocketAsyncEventArgs e)
        {
            // Iterate instead of recursing: each synchronous completion would otherwise add a stack frame.
            while (true)
            {
                if (m_serverState != ServerState.Running || e.SocketError != SocketError.Success)
                {
                    // The chain ends here, so release the buffer that was waiting for data.
                    (e.UserToken as ByteBlock)?.Dispose();
                    e.UserToken = null;

                    if (e.SocketError != SocketError.Success)
                    {
                        Logger?.Error(this, $"接收出现错误:{e.SocketError},错误代码:{(int)e.SocketError}");
                        e.Dispose();
                    }

                    return;
                }

                ByteBlock byteBlock = (ByteBlock)e.UserToken;
                byteBlock.SetLength(e.BytesTransferred);
                HandleBuffer(e.RemoteEndPoint, byteBlock); // disposes byteBlock

                ByteBlock newByteBlock = new ByteBlock(BufferLength);
                e.UserToken = newByteBlock;

                try
                {
                    // SetBuffer is inside the try because the event args may already have been
                    // disposed by Stop/Dispose while the previous datagram was being handled.
                    e.SetBuffer(newByteBlock.Buffer, 0, newByteBlock.Buffer.Length);

                    if (socket.ReceiveFromAsync(e))
                    {
                        // Pending: IO_Completed will call back into this method.
                        return;
                    }
                }
                catch (System.Exception ex)
                {
                    newByteBlock.Dispose();
                    e.UserToken = null;
                    Logger.Log(LogType.Error, this, ex.Message, ex);
                    return;
                }
            }
        }

        #region DefaultSend

        /// <summary>
        /// Sends data to the default remote endpoint, bypassing the adapter.
        /// </summary>
        /// <param name="buffer">Buffer holding the data.</param>
        /// <param name="offset">Offset of the data in <paramref name="buffer"/>.</param>
        /// <param name="length">Length of the data.</param>
        /// <exception cref="Exception">No default remote endpoint is configured.</exception>
        public void DefaultSend(byte[] buffer, int offset, int length)
        {
            if (m_remoteIPHost == null)
            {
                throw new Exception("默认终结点为空");
            }

            DefaultSend(m_remoteIPHost.EndPoint, buffer, offset, length);
        }

        /// <summary>
        /// Sends data to the given endpoint, bypassing the adapter.
        /// </summary>
        /// <remarks>
        /// Nothing happens when <see cref="HandleSendingData"/> returns <see langword="false"/>.
        /// When it returns <see langword="true"/> but <see cref="CanSend"/> is <see langword="false"/>,
        /// the data is not sent, yet <see cref="LastSendTime"/> is still updated.
        /// </remarks>
        /// <param name="endPoint">Destination endpoint.</param>
        /// <param name="buffer">Buffer holding the data.</param>
        /// <param name="offset">Offset of the data in <paramref name="buffer"/>.</param>
        /// <param name="length">Length of the data.</param>
        public void DefaultSend(EndPoint endPoint, byte[] buffer, int offset, int length)
        {
            if (HandleSendingData(endPoint, buffer, offset, length))
            {
                if (CanSend)
                {
                    m_monitor.Socket.SendTo(buffer, offset, length, SocketFlags.None, endPoint);
                }

                LastSendTime = DateTime.Now;
            }
        }

        #endregion DefaultSend

        #region DefaultSendAsync

        /// <summary>
        /// Runs <see cref="DefaultSend(byte[], int, int)"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="buffer">Buffer holding the data.</param>
        /// <param name="offset">Offset of the data in <paramref name="buffer"/>.</param>
        /// <param name="length">Length of the data.</param>
        /// <returns>The task returned by <c>EasyTask.Run</c>.</returns>
        public Task DefaultSendAsync(byte[] buffer, int offset, int length)
        {
            return EasyTask.Run(() =>
            {
                DefaultSend(buffer, offset, length);
            });
        }

        /// <summary>
        /// Runs <see cref="DefaultSend(EndPoint, byte[], int, int)"/> through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="endPoint">Destination endpoint.</param>
        /// <param name="buffer">Buffer holding the data.</param>
        /// <param name="offset">Offset of the data in <paramref name="buffer"/>.</param>
        /// <param name="length">Length of the data.</param>
        /// <returns>The task returned by <c>EasyTask.Run</c>.</returns>
        public Task DefaultSendAsync(EndPoint endPoint, byte[] buffer, int offset, int length)
        {
            return EasyTask.Run(() =>
            {
                // Send to the endpoint the caller asked for rather than the default remote host.
                DefaultSend(endPoint, buffer, offset, length);
            });
        }

        #endregion DefaultSendAsync

        #region Spliced send

        /// <summary>
        /// Sends several buffers as one spliced send to the default remote endpoint.
        /// </summary>
        /// <param name="transferBytes">The buffers to send.</param>
        /// <exception cref="Exception">No default remote endpoint is configured.</exception>
        public void Send(IList<ArraySegment<byte>> transferBytes)
        {
            if (m_remoteIPHost == null)
            {
                throw new Exception("默认终结点为空");
            }

            Send(m_remoteIPHost.EndPoint, transferBytes);
        }

        /// <summary>
        /// Sends several buffers as one spliced send to the given endpoint through the adapter.
        /// </summary>
        /// <param name="endPoint">Destination endpoint.</param>
        /// <param name="transferBytes">The buffers to send.</param>
        /// <exception cref="ArgumentNullException">No adapter has been set.</exception>
        /// <exception cref="NotSupportedException">The adapter does not support spliced sends.</exception>
        public void Send(EndPoint endPoint, IList<ArraySegment<byte>> transferBytes)
        {
            if (m_adapter == null)
            {
                throw new ArgumentNullException(nameof(DataHandlingAdapter), TouchSocketStatus.NullDataAdapter.GetDescription());
            }

            if (!m_adapter.CanSplicingSend)
            {
                throw new NotSupportedException("该适配器不支持拼接发送");
            }

            m_adapter.SendInput(endPoint, transferBytes);
        }

        /// <summary>
        /// Runs the spliced send to the default remote endpoint through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="transferBytes">The buffers to send.</param>
        /// <returns>The task returned by <c>EasyTask.Run</c>.</returns>
        public Task SendAsync(IList<ArraySegment<byte>> transferBytes)
        {
            return EasyTask.Run(() =>
            {
                Send(transferBytes);
            });
        }

        /// <summary>
        /// Runs the spliced send to the given endpoint through <c>EasyTask.Run</c>.
        /// </summary>
        /// <param name="endPoint">Destination endpoint.</param>
        /// <param name="transferBytes">The buffers to send.</param>
        /// <returns>The task returned by <c>EasyTask.Run</c>.</returns>
        public Task SendAsync(EndPoint endPoint, IList<ArraySegment<byte>> transferBytes)
        {
            return EasyTask.Run(() =>
            {
                Send(endPoint, transferBytes);
            });
        }

        #endregion Spliced send
    }
}