2026-05-23 21:03:20 +08:00

1669 lines
73 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Logic.CrashSight;
using Logic.Pool;
using MemoryPack;
using Steamworks;
using TH1_Logic.Net;
using UnityEngine;
namespace TH1_Logic.Steam
{
public class SimpleP2P
{
public static SimpleP2P Instance { get; } = new SimpleP2P();
// 推荐的虚拟端口范围
private static readonly int TargetPort = 1214;
private const int LargeMessageMagic = 0x31504854; // THP1
private const int LargeMessageVersion = 1;
private const int LargeMessageHeaderSize = 28;
private const int LargeMessageChunkPayloadSize = 64 * 1024;
private const int MaxLargeMessageBytes = 64 * 1024 * 1024;
private const int MaxLargeWireMessageBytes = MaxLargeMessageBytes + OrderedMessageHeaderSize;
private const float LargeMessageTimeout = 30f;
private const float OutgoingMessageSendInterval = 0.01f;
private const float OutgoingMessageLimitRetryDelay = 0.25f;
private const int MaxOutgoingMessagesPerUpdate = 8;
private const int MaxOutgoingBytesPerUpdate = 512 * 1024;
private const int OrderedMessageMagic = 0x31514f54; // TOQ1
private const int OrderedMessageVersion = 1;
private const int OrderedMessageHeaderSize = 20;
private const int MaxOutgoingQueuedBytesPerPeer = 96 * 1024 * 1024;
private const int MaxOutgoingQueuedBytesTotal = 256 * 1024 * 1024;
private const int MaxOutgoingMessageAttempts = 120;
private const float OrderedMessageGapTimeout = 30f;
private const int MaxIncomingLargeMessagesPerPeer = 4;
private const int MaxIncomingLargeBytesPerPeer = 96 * 1024 * 1024;
private const int MaxIncomingLargeBytesTotal = 192 * 1024 * 1024;
private const int MaxOrderedPendingMessagesPerPeer = 64;
private const int MaxOrderedPendingBytesPerPeer = 32 * 1024 * 1024;
// 连接映射表SteamID -> 连接句柄
private readonly Dictionary<CSteamID, HSteamNetConnection> _connections = new Dictionary<CSteamID, HSteamNetConnection>();
private readonly Dictionary<CSteamID, Dictionary<int, IncomingLargeMessage>> _incomingLargeMessages =
new Dictionary<CSteamID, Dictionary<int, IncomingLargeMessage>>();
private readonly Dictionary<CSteamID, PeerOutgoingQueue> _outgoingPeerQueues = new Dictionary<CSteamID, PeerOutgoingQueue>();
private readonly List<CSteamID> _outgoingPeerOrder = new List<CSteamID>();
private readonly Dictionary<CSteamID, ulong> _outgoingSequences = new Dictionary<CSteamID, ulong>();
private readonly Dictionary<CSteamID, IncomingOrderedState> _incomingOrderedStates = new Dictionary<CSteamID, IncomingOrderedState>();
private int _largeMessageSequence;
private int _outgoingPeerRoundRobinIndex;
private int _outgoingQueuedBytes;
// 连接超时追踪
private readonly Dictionary<CSteamID, float> _connectionTimeouts = new Dictionary<CSteamID, float>();
private const float ConnectionTimeout = 30f; // 30秒超时
// 重连尝试计数
private readonly Dictionary<CSteamID, int> _retryCount = new Dictionary<CSteamID, int>();
private const int MaxRetryCount = 3;
// 监听套接字
private float _socketRecord;
private HSteamListenSocket _listenSocket;
// 回调
private Callback<SteamNetConnectionStatusChangedCallback_t> _cbConnectionStatusChanged;
private Callback<SteamNetworkingMessagesSessionRequest_t> _cbSessionRequest;
// 事件委托
public event Action<CSteamID> OnPeerConnectedEvent;
public event Action<CSteamID> OnPeerDisconnectedEvent;
public event Action<CSteamID, byte[]> OnMessageReceivedEvent;
public event Action<CSteamID, string> OnMessageSendFailedEvent;
public event Action<string> OnConnectionErrorEvent;
public bool IsInitialized => _listenSocket != HSteamListenSocket.Invalid;
// 初始化
public void Initialize()
{
_socketRecord = 3;
_listenSocket = HSteamListenSocket.Invalid;
_cbConnectionStatusChanged = Callback<SteamNetConnectionStatusChangedCallback_t>.Create(OnConnectionStatusChanged);
// 添加:注册 SteamNetworkingMessages 会话请求回调
_cbSessionRequest = Callback<SteamNetworkingMessagesSessionRequest_t>.Create(OnSessionRequest);
}
// 添加:处理 SteamNetworkingMessages 的会话请求
private void OnSessionRequest(SteamNetworkingMessagesSessionRequest_t callback)
{
var remoteSteamID = callback.m_identityRemote.GetSteamID();
LogSystem.LogInfo($"收到来自 {remoteSteamID} 的 NetworkingMessages 会话请求");
// 接受会话请求(必须调用,否则对方发送会失败)
SteamNetworkingMessages.AcceptSessionWithUser(ref callback.m_identityRemote);
LogSystem.LogInfo($"已接受来自 {remoteSteamID} 的会话");
}
// 创建监听套接字
private void CreateListenSocket()
{
ForceCleanupPort(TargetPort);
LogSystem.LogInfo($"尝试虚拟端口 {TargetPort}...");
_listenSocket = SteamNetworkingSockets.CreateListenSocketP2P(TargetPort, 0, null);
if (_listenSocket != HSteamListenSocket.Invalid)
{
LogSystem.LogInfo($"成功在虚拟端口 {TargetPort} 创建监听套接字: {_listenSocket}");
}
}
// 定时刷新套接字创建
private void RefreshListenSocket()
{
_socketRecord += Time.deltaTime;
if (_listenSocket == HSteamListenSocket.Invalid && _socketRecord > 3f)
{
_socketRecord = 0;
SteamNetworkingUtils.GetRelayNetworkStatus(out var relayStatus);
if (relayStatus.m_eAvail == ESteamNetworkingAvailability.k_ESteamNetworkingAvailability_Current)
{
CreateListenSocket();
}
}
}
private void ForceCleanupPort(int port)
{
LogSystem.LogInfo($"强制清理虚拟端口 {port}");
// 关闭所有现有连接
DisconnectAll();
// 关闭当前监听套接字
SteamNetworkingSockets.CloseListenSocket(_listenSocket);
_listenSocket = HSteamListenSocket.Invalid;
LogSystem.LogInfo("已关闭现有监听套接字");
// 强制清理特定端口的所有连接
SteamNetworkingSockets.RunCallbacks();
}
// 连接到指定玩家
public bool ConnectToPeer(CSteamID steamID)
{
if (LobbyManager.Instance.Lobby.IsLobbyOwner()) return false;
if (_connections.TryGetValue(steamID, out var existingConnection))
{
if (TryGetConnectionState(existingConnection, out var state) && IsActiveConnectionState(state))
{
LogSystem.LogInfo($"Already connected to {steamID}");
return true;
}
LogSystem.LogWarning($"Removing stale connection before reconnecting to {steamID}");
HandleDisconnection(steamID, existingConnection);
}
// 先进行连接前检查
if (!PreConnectionCheck(steamID))
{
return false;
}
// 创建连接配置
var options = new SteamNetworkingConfigValue_t[1];
options[0] = new SteamNetworkingConfigValue_t();
options[0].m_eValue = ESteamNetworkingConfigValue.k_ESteamNetworkingConfig_TimeoutInitial;
options[0].m_eDataType = ESteamNetworkingConfigDataType.k_ESteamNetworkingConfig_Int32;
options[0].m_val.m_int32 = 15000; // 15秒超时
// 创建到目标玩家的连接
var identity = new SteamNetworkingIdentity();
identity.SetSteamID(steamID);
var connection = SteamNetworkingSockets.ConnectP2P(ref identity, TargetPort, options.Length, options);
if (connection == HSteamNetConnection.Invalid)
{
OnConnectionErrorEvent?.Invoke($"Failed to connect to {steamID}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PConnectionFailed);
return false;
}
_connections[steamID] = connection;
_connectionTimeouts[steamID] = Time.time + ConnectionTimeout;
LogSystem.LogInfo($"Connecting to peer: {steamID}");
DiagnoseNetworkStatus(steamID);
return true;
}
// 连接前检查
private bool PreConnectionCheck(CSteamID targetSteamID)
{
try
{
// 检查Steam是否已登录
if (!SteamUser.BLoggedOn())
{
LogSystem.LogWarning("Steam not logged in");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.SteamLoginRequired);
return false;
}
// 检查目标用户是否在线
var friendState = SteamFriends.GetFriendPersonaState(targetSteamID);
if (friendState == EPersonaState.k_EPersonaStateOffline)
{
LogSystem.LogWarning($"Target user {targetSteamID} appears offline");
}
// 检查当前lobby信息
LogSystem.LogInfo("Checking lobby status...");
return true;
}
catch (Exception ex)
{
LogSystem.LogWarning($"Steam P2P pre-check skipped: {ex.Message}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.SteamUnavailable);
return false;
}
}
private void DiagnoseNetworkStatus(CSteamID targetSteamID)
{
try
{
LogSystem.LogInfo("=== P2P 连接诊断 ===");
// Steam 状态
LogSystem.LogInfo($"Steam 登录状态: {SteamUser.BLoggedOn()}");
LogSystem.LogInfo($"本地 Steam ID: {SteamUser.GetSteamID()}");
// 目标用户状态
var friendState = SteamFriends.GetFriendPersonaState(targetSteamID);
LogSystem.LogInfo($"目标用户状态: {friendState}");
// 检查是否是好友
var relationShip = SteamFriends.GetFriendRelationship(targetSteamID);
LogSystem.LogInfo($"好友关系: {relationShip}");
// Steam 网络状态
SteamNetworkingUtils.GetRelayNetworkStatus(out var relayStatus);
LogSystem.LogInfo($"Steam 中继网络: {relayStatus.m_eAvail}");
LogSystem.LogInfo($"中继网络调试信息: {relayStatus.m_debugMsg}");
if (relayStatus.m_eAvail != ESteamNetworkingAvailability.k_ESteamNetworkingAvailability_Current)
{
LogSystem.LogWarning($"Steam 中继网络问题: {relayStatus.m_debugMsg}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PConnectionFailed);
}
}
catch (Exception ex)
{
LogSystem.LogWarning($"Steam P2P diagnose skipped: {ex.Message}");
}
}
// 更新方法 - 需要在主循环中调用
public void Update()
{
RefreshListenSocket();
CheckConnectionTimeouts();
CleanupLargeMessageTimeouts();
CleanupOrderedMessageTimeouts();
ProcessOutgoingMessageQueue();
}
// 检查连接超时
private void CheckConnectionTimeouts()
{
var currentTime = Time.time;
var timeoutList = new List<CSteamID>();
foreach (var kvp in _connectionTimeouts)
{
if (currentTime > kvp.Value)
{
timeoutList.Add(kvp.Key);
}
}
foreach (var steamID in timeoutList)
{
var reason = $"Connection to {steamID} timed out";
LogSystem.LogError(reason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PConnectionTimeout);
_connectionTimeouts.Remove(steamID);
if (_connections.TryGetValue(steamID, out var conn))
{
OnMessageSendFailedEvent?.Invoke(steamID, reason);
HandleDisconnection(steamID, conn);
}
else
{
_incomingLargeMessages.Remove(steamID);
_incomingOrderedStates.Remove(steamID);
_outgoingSequences.Remove(steamID);
RemoveOutgoingMessages(steamID);
OnMessageSendFailedEvent?.Invoke(steamID, reason);
}
OnConnectionErrorEvent?.Invoke(reason);
}
}
// 断开与指定玩家的连接
public void DisconnectFromPeer(CSteamID steamID)
{
if (!_connections.TryGetValue(steamID, out var connection))
return;
SteamNetworkingSockets.CloseConnection(connection, 0, "Disconnected by user", false);
_connections.Remove(steamID);
_connectionTimeouts.Remove(steamID);
_incomingLargeMessages.Remove(steamID);
_incomingOrderedStates.Remove(steamID);
_outgoingSequences.Remove(steamID);
RemoveOutgoingMessages(steamID);
LogSystem.LogInfo($"Disconnected from peer: {steamID}");
}
// 断开所有连接
public void DisconnectAll()
{
foreach (var kvp in _connections)
{
SteamNetworkingSockets.CloseConnection(kvp.Value, 0, "Disconnecting all", false);
}
_connections.Clear();
_connectionTimeouts.Clear();
_retryCount.Clear();
_incomingLargeMessages.Clear();
_incomingOrderedStates.Clear();
_outgoingPeerQueues.Clear();
_outgoingPeerOrder.Clear();
_outgoingSequences.Clear();
_outgoingQueuedBytes = 0;
_outgoingPeerRoundRobinIndex = 0;
LogSystem.LogInfo("Disconnected from all peers");
}
// 连接状态变化回调
private void OnConnectionStatusChanged(SteamNetConnectionStatusChangedCallback_t info)
{
var state = info.m_info.m_eState;
// 获取连接信息
SteamNetworkingSockets.GetConnectionInfo(info.m_hConn, out var connectionInfo);
var remote = connectionInfo.m_identityRemote.GetSteamID();
// 只有真正连上才清除超时计时器。FindingRoute 仍可能卡死,需要保留超时保护。
if (_connectionTimeouts.ContainsKey(remote) &&
state == ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_Connected)
{
_connectionTimeouts.Remove(remote);
}
// 记录详细的连接信息用于诊断
LogSystem.LogInfo($"Connection details - Remote: {remote}, State: {state}");
LogSystem.LogInfo($"End reason: {info.m_info.m_eEndReason}");
switch (state)
{
case ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_Connecting:
LogSystem.LogInfo($"Connecting to {remote}...");
// 检查这是否是传入连接(对方主动连接我们)
// 如果我们的连接字典中没有这个连接,说明是对方主动连接的
if (!_connections.ContainsKey(remote) && info.m_info.m_hListenSocket != HSteamListenSocket.Invalid)
{
LogSystem.LogInfo($"检测到来自 {remote} 的传入连接请求");
// 检查是否应该接受这个连接
if (ShouldAcceptIncomingConnection(remote))
{
LogSystem.LogInfo($"接受来自 {remote} 的连接");
var result = SteamNetworkingSockets.AcceptConnection(info.m_hConn);
if (result != EResult.k_EResultOK)
{
LogSystem.LogError($"无法接受连接: {result}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PConnectionFailed);
SteamNetworkingSockets.CloseConnection(info.m_hConn, 0, "Failed to accept", false);
return;
}
_connections[remote] = info.m_hConn;
}
else
{
LogSystem.LogInfo($"拒绝来自 {remote} 的连接");
SteamNetworkingSockets.CloseConnection(info.m_hConn, 0, "Connection rejected", false);
}
}
else
{
LogSystem.LogInfo($"这是我们主动发起的连接到 {remote}");
}
break;
case ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_FindingRoute:
LogSystem.LogInfo($"Finding route to {remote}...");
// 这个状态通常意味着正在通过Steam中继寻找路由
LogSystem.LogInfo("正在通过Steam中继网络寻找连接路径...");
break;
case ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_Connected:
LogSystem.LogInfo($"Successfully connected to {remote}");
if (!_connections.ContainsKey(remote))
{
_connections[remote] = info.m_hConn;
}
// 清除超时和重试计数
_connectionTimeouts.Remove(remote);
_retryCount.Remove(remote);
OnPeerConnectedEvent?.Invoke(remote);
break;
case ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_ClosedByPeer:
LogSystem.LogInfo($"Connection closed by peer: {remote}");
HandleDisconnection(remote, info.m_hConn);
break;
case ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_ProblemDetectedLocally:
LogSystem.LogWarning($"Connection problem detected locally: {remote}");
LogSystem.LogWarning($"Problem details: {info.m_info.m_szEndDebug}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PConnectionFailed);
HandleDisconnection(remote, info.m_hConn);
break;
case ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_None:
// 检查具体的失败原因
var endReason = info.m_info.m_eEndReason;
LogSystem.LogError($"Connection failed - Reason: {endReason}");
var isTimeout = endReason == (int)ESteamNetConnectionEnd.k_ESteamNetConnectionEnd_Misc_Timeout
|| endReason == (int)ESteamNetConnectionEnd.k_ESteamNetConnectionEnd_Remote_Timeout;
NetworkPlayerTipManager.Instance.Request(isTimeout
? NetworkPlayerTipType.P2PConnectionTimeout
: NetworkPlayerTipType.P2PConnectionFailed);
// 提供更详细的错误信息
switch (endReason)
{
case (int)ESteamNetConnectionEnd.k_ESteamNetConnectionEnd_Misc_Timeout:
LogSystem.LogError("连接超时 - 可能的原因1.目标用户不在线 2.网络问题 3.防火墙阻止");
break;
case (int)ESteamNetConnectionEnd.k_ESteamNetConnectionEnd_Misc_InternalError:
LogSystem.LogError("内部错误 - Steam网络服务可能有问题");
break;
case (int)ESteamNetConnectionEnd.k_ESteamNetConnectionEnd_AppException_Generic:
LogSystem.LogError("应用程序异常 - 目标应用可能没有正确处理连接");
break;
case (int)ESteamNetConnectionEnd.k_ESteamNetConnectionEnd_Remote_Timeout:
LogSystem.LogError("远程超时 - 目标用户网络问题");
break;
default:
if (endReason >= 1000 && endReason < 2000)
{
LogSystem.LogError($"应用层拒绝连接 - 错误码: {endReason}可能原因1.对方未创建监听套接字 2.对方主动拒绝 3.对方游戏未运行");
}
else
{
LogSystem.LogError($"未知连接失败原因: {endReason}");
}
break;
}
HandleDisconnection(remote, info.m_hConn);
break;
}
}
// 检查是否应该接受这个连接
private bool ShouldAcceptIncomingConnection(CSteamID steamID)
{
// 基本检查确保是有效的Steam ID
if (!steamID.IsValid())
{
LogSystem.LogWarning($"接收到无效的Steam ID连接请求: {steamID}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PConnectionFailed);
return false;
}
if (!IsLobbyPeer(steamID))
{
LogSystem.LogWarning($"拒绝非当前房间成员的连接请求: {steamID}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PConnectionFailed);
return false;
}
// 检查是否超过最大连接数
if (_connections.Count >= 10) // 假设最大10个连接
{
LogSystem.LogWarning("已达到最大连接数,拒绝新连接");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PConnectionFailed);
return false;
}
// 检查是否是好友(可选)
var relationship = SteamFriends.GetFriendRelationship(steamID);
LogSystem.LogInfo($"连接请求来自: {steamID}, 关系: {relationship}");
return true; // 默认接受所有连接
}
private bool IsLobbyPeer(CSteamID steamID)
{
var lobby = LobbyManager.Instance.Lobby;
if (lobby == null || !lobby.IsInLobby()) return true;
var memberId = steamID.m_SteamID;
if (memberId == 0 || memberId == lobby.GetSelfMemberId()) return false;
return lobby.IsMemberInLobby(memberId);
}
private bool TryGetConnectionState(HSteamNetConnection connection, out ESteamNetworkingConnectionState state)
{
state = ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_None;
if (connection == HSteamNetConnection.Invalid) return false;
if (!SteamNetworkingSockets.GetConnectionInfo(connection, out var info)) return false;
state = info.m_eState;
return true;
}
private bool IsActiveConnectionState(ESteamNetworkingConnectionState state)
{
return state == ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_Connecting
|| state == ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_FindingRoute
|| state == ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_Connected;
}
// 处理断开连接
private void HandleDisconnection(CSteamID steamID, HSteamNetConnection connection)
{
if (_connections.ContainsKey(steamID))
{
_connections.Remove(steamID);
OnPeerDisconnectedEvent?.Invoke(steamID);
}
_connectionTimeouts.Remove(steamID);
_incomingLargeMessages.Remove(steamID);
_incomingOrderedStates.Remove(steamID);
_outgoingSequences.Remove(steamID);
RemoveOutgoingMessages(steamID);
SteamNetworkingSockets.CloseConnection(connection, 0, "", false);
}
// 发送消息到指定玩家
public bool SendTo(CSteamID target, byte[] data, bool reliable = true, bool ordered = true)
{
if (data == null || data.Length == 0)
{
LogSystem.LogWarning("Trying to send null or empty data");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
return false;
}
if (data.Length > MaxLargeMessageBytes)
{
var reason = $"Trying to send oversized P2P message: target={target}, bytes={data.Length}, max={MaxLargeMessageBytes}";
LogSystem.LogError(reason);
OnMessageSendFailedEvent?.Invoke(target, reason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
return false;
}
if (!target.IsValid())
{
LogSystem.LogWarning($"Trying to send data to invalid Steam ID: {target}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
return false;
}
if (!IsLobbyPeer(target))
{
var reason = $"Skip sending message to non-lobby peer: {target}";
LogSystem.LogWarning(reason);
OnMessageSendFailedEvent?.Invoke(target, reason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
DisconnectFromPeer(target);
return false;
}
if (!_connections.TryGetValue(target, out var conn))
{
var reason = $"No connection to {target}";
LogSystem.LogWarning(reason);
OnMessageSendFailedEvent?.Invoke(target, reason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
return false;
}
if (!TryGetConnectionState(conn, out var connectionState)
|| !IsActiveConnectionState(connectionState))
{
var reason = $"Connection to {target} is not active for queueing. State: {connectionState}, bytes: {data.Length}";
LogSystem.LogWarning(reason);
OnMessageSendFailedEvent?.Invoke(target, reason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
HandleDisconnection(target, conn);
return false;
}
var sequence = GetNextOutgoingSequence(target);
var orderedData = CreateOrderedMessage(data, sequence);
bool enqueued;
if (orderedData.Length > Constants.k_cbMaxSteamNetworkingSocketsMessageSizeSend)
{
enqueued = EnqueueLargeMessage(target, orderedData);
}
else
{
enqueued = EnqueueOutgoingMessage(target, orderedData, reliable, false);
}
if (enqueued)
{
CommitOutgoingSequence(target, sequence);
}
return enqueued;
}
public bool CanQueueMessages(IReadOnlyList<CSteamID> targets, int dataLength, out CSteamID failedTarget, out string reason)
{
failedTarget = CSteamID.Nil;
reason = string.Empty;
if (targets == null)
{
reason = "Target list is null";
return false;
}
if (dataLength <= 0 || dataLength > MaxLargeMessageBytes)
{
reason = $"Invalid P2P message size for queue preflight: bytes={dataLength}, max={MaxLargeMessageBytes}";
return false;
}
var queuedBytes = EstimateOutgoingQueuedBytes(dataLength);
var additionalTotalBytes = 0;
var additionalPeerBytes = new Dictionary<CSteamID, int>();
foreach (var target in targets)
{
if (!CanQueueMessageToPeer(target, out reason))
{
failedTarget = target;
return false;
}
additionalTotalBytes += queuedBytes;
additionalPeerBytes.TryGetValue(target, out var peerAdditional);
peerAdditional += queuedBytes;
additionalPeerBytes[target] = peerAdditional;
_outgoingPeerQueues.TryGetValue(target, out var queue);
var currentPeerQueuedBytes = queue?.QueuedBytes ?? 0;
if (currentPeerQueuedBytes + peerAdditional > MaxOutgoingQueuedBytesPerPeer)
{
failedTarget = target;
reason = $"P2P outgoing queue preflight failed: target={target}, bytes={dataLength}, peerQueued={currentPeerQueuedBytes}, peerAdditional={peerAdditional}, peerMax={MaxOutgoingQueuedBytesPerPeer}";
return false;
}
}
if (_outgoingQueuedBytes + additionalTotalBytes > MaxOutgoingQueuedBytesTotal)
{
reason = $"P2P outgoing queue preflight failed: bytes={dataLength}, totalQueued={_outgoingQueuedBytes}, additional={additionalTotalBytes}, totalMax={MaxOutgoingQueuedBytesTotal}";
return false;
}
return true;
}
private bool CanQueueMessageToPeer(CSteamID target, out string reason)
{
reason = string.Empty;
if (!target.IsValid())
{
reason = $"Invalid P2P target: {target}";
return false;
}
if (!IsLobbyPeer(target))
{
reason = $"Target is not a lobby peer: {target}";
return false;
}
if (!_connections.TryGetValue(target, out var conn))
{
reason = $"No connection to {target}";
return false;
}
if (!TryGetConnectionState(conn, out var connectionState)
|| !IsActiveConnectionState(connectionState))
{
reason = $"Connection to {target} is not active for queueing. State: {connectionState}";
return false;
}
return true;
}
private int EstimateOutgoingQueuedBytes(int dataLength)
{
var orderedLength = OrderedMessageHeaderSize + dataLength;
if (orderedLength <= Constants.k_cbMaxSteamNetworkingSocketsMessageSizeSend) return orderedLength;
var chunkCount = (orderedLength + LargeMessageChunkPayloadSize - 1) / LargeMessageChunkPayloadSize;
return orderedLength + chunkCount * LargeMessageHeaderSize;
}
private EResult SendRawToResult(CSteamID target, HSteamNetConnection conn, byte[] data, bool reliable, bool useNoNagle, bool logFailure)
{
int flags = reliable ? Constants.k_nSteamNetworkingSend_Reliable : Constants.k_nSteamNetworkingSend_Unreliable;
if (useNoNagle) flags |= Constants.k_nSteamNetworkingSend_NoNagle;
IntPtr ptr = IntPtr.Zero;
try
{
ptr = Marshal.AllocHGlobal(data.Length);
Marshal.Copy(data, 0, ptr, data.Length);
var result = SteamNetworkingSockets.SendMessageToConnection(conn, ptr, (uint)data.Length, flags, out _);
if (result != EResult.k_EResultOK)
{
if (logFailure)
LogSystem.LogError($"Failed to send message to {target}: {result}, bytes: {data.Length}, flags: {flags}, reliable: {reliable}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
return result;
}
return EResult.k_EResultOK;
}
catch (Exception e)
{
LogSystem.LogError($"Exception while sending message: {e.Message}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
return EResult.k_EResultFail;
}
finally
{
if (ptr != IntPtr.Zero)
Marshal.FreeHGlobal(ptr);
}
}
// 发送消息到指定玩家
public bool SendToWithOutConnect(CSteamID target, byte[] data, bool reliable = true, bool ordered = true)
{
if (data == null || data.Length == 0)
{
LogSystem.LogWarning("Trying to send null or empty data");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
return false;
}
// 创建目标身份标识
var identity = new SteamNetworkingIdentity();
identity.SetSteamID(target);
// 可靠传输=8不可靠传输=0
int flags = reliable ? 8 : 0;
// 如果需要有序传输 k_nSteamNetworkingSend_NoDelay (确保按序处理)
if (ordered) flags |= 1;
IntPtr ptr = IntPtr.Zero;
try
{
ptr = Marshal.AllocHGlobal(data.Length);
Marshal.Copy(data, 0, ptr, data.Length);
var result = SteamNetworkingMessages.SendMessageToUser(ref identity, ptr, (uint)data.Length, flags, 0);
if (result != EResult.k_EResultOK)
{
LogSystem.LogError($"Failed to send message to {target}: {result}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
return false;
}
return true;
}
catch (Exception e)
{
LogSystem.LogError($"Exception while sending message: {e.Message}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
return false;
}
finally
{
if (ptr != IntPtr.Zero)
Marshal.FreeHGlobal(ptr);
}
}
// 广播消息给所有连接的玩家
public bool Broadcast(byte[] data, bool reliable = true)
{
if (data == null || data.Length == 0)
{
LogSystem.LogWarning("Trying to broadcast null or empty data");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PBroadcastFailed);
return false;
}
var allSucceeded = true;
using var pooled = THCollectionPool.GetListHandle<CSteamID>(out var peers);
peers.AddRange(_connections.Keys);
foreach (var steamID in peers)
{
if (!IsLobbyPeer(steamID))
{
LogSystem.LogWarning($"Remove non-lobby peer before broadcast: {steamID}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PBroadcastFailed);
DisconnectFromPeer(steamID);
allSucceeded = false;
continue;
}
if (!SendTo(steamID, data, reliable))
{
allSucceeded = false;
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PBroadcastFailed);
OnMessageSendFailedEvent?.Invoke(steamID, $"Broadcast enqueue failed: bytes={data.Length}");
}
}
return allSucceeded;
}
private bool EnqueueLargeMessage(CSteamID target, byte[] data)
{
var messageId = unchecked(++_largeMessageSequence);
if (messageId == 0) messageId = unchecked(++_largeMessageSequence);
var chunkCount = (data.Length + LargeMessageChunkPayloadSize - 1) / LargeMessageChunkPayloadSize;
LogSystem.LogInfo($"Queueing large P2P message to {target}, bytes: {data.Length}, chunks: {chunkCount}");
for (int chunkIndex = 0; chunkIndex < chunkCount; chunkIndex++)
{
var offset = chunkIndex * LargeMessageChunkPayloadSize;
var payloadLength = Math.Min(LargeMessageChunkPayloadSize, data.Length - offset);
var chunk = new byte[LargeMessageHeaderSize + payloadLength];
WriteInt32(chunk, 0, LargeMessageMagic);
WriteInt32(chunk, 4, LargeMessageVersion);
WriteInt32(chunk, 8, messageId);
WriteInt32(chunk, 12, chunkIndex);
WriteInt32(chunk, 16, chunkCount);
WriteInt32(chunk, 20, data.Length);
WriteInt32(chunk, 24, payloadLength);
Buffer.BlockCopy(data, offset, chunk, LargeMessageHeaderSize, payloadLength);
if (!EnqueueOutgoingMessage(target, chunk, true, false, messageId, chunkIndex, chunkCount))
{
RemoveOutgoingMessages(target, messageId);
return false;
}
}
return true;
}
private bool EnqueueOutgoingMessage(CSteamID target, byte[] data, bool reliable, bool useNoNagle,
int messageId = 0, int chunkIndex = 0, int chunkCount = 1)
{
if (data == null || data.Length == 0) return false;
var createdQueue = false;
if (!_outgoingPeerQueues.TryGetValue(target, out var queue))
{
queue = new PeerOutgoingQueue();
_outgoingPeerQueues[target] = queue;
_outgoingPeerOrder.Add(target);
createdQueue = true;
}
if (queue.QueuedBytes + data.Length > MaxOutgoingQueuedBytesPerPeer
|| _outgoingQueuedBytes + data.Length > MaxOutgoingQueuedBytesTotal)
{
var reason = $"P2P outgoing queue limit exceeded: target={target}, bytes={data.Length}, peerQueued={queue.QueuedBytes}, totalQueued={_outgoingQueuedBytes}";
LogSystem.LogError(reason);
OnMessageSendFailedEvent?.Invoke(target, reason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PQueueFull);
if (createdQueue)
{
RemoveOutgoingMessages(target);
}
return false;
}
queue.Messages.Enqueue(new OutgoingMessage(target, data, reliable, useNoNagle, messageId, chunkIndex, chunkCount));
queue.QueuedBytes += data.Length;
_outgoingQueuedBytes += data.Length;
return true;
}
private byte[] CreateOrderedMessage(byte[] data, ulong sequence)
{
var orderedData = new byte[OrderedMessageHeaderSize + data.Length];
WriteInt32(orderedData, 0, OrderedMessageMagic);
WriteInt32(orderedData, 4, OrderedMessageVersion);
WriteUInt64(orderedData, 8, sequence);
WriteInt32(orderedData, 16, data.Length);
Buffer.BlockCopy(data, 0, orderedData, OrderedMessageHeaderSize, data.Length);
return orderedData;
}
private ulong GetNextOutgoingSequence(CSteamID target)
{
_outgoingSequences.TryGetValue(target, out var sequence);
sequence++;
if (sequence == 0) sequence = 1;
return sequence;
}
private void CommitOutgoingSequence(CSteamID target, ulong sequence)
{
_outgoingSequences[target] = sequence;
}
// 轮询接收消息
public void PollMessages()
{
using var pooled = THCollectionPool.GetListHandle<CSteamID>(out var peers);
peers.AddRange(_connections.Keys);
// 使用快照遍历,避免处理消息时修改 _connections 导致枚举器失效
for (int i = 0; i < peers.Count; i++)
{
var steamID = peers[i];
if (!_connections.TryGetValue(steamID, out var connection))
{
continue;
}
PollMessagesForConnection(steamID, connection);
}
PollInviteMessages();
}
// 为特定连接轮询消息
private void PollMessagesForConnection(CSteamID steamID, HSteamNetConnection connection)
{
IntPtr[] messages = new IntPtr[32]; // 一次最多处理32条消息
int messageCount = SteamNetworkingSockets.ReceiveMessagesOnConnection(connection, messages, 32);
for (int i = 0; i < messageCount; i++)
{
var messagePtr = messages[i];
if (messagePtr == IntPtr.Zero) continue;
try
{
// 获取消息结构
var message = Marshal.PtrToStructure<SteamNetworkingMessage_t>(messagePtr);
// 复制数据
byte[] data = new byte[message.m_cbSize];
Marshal.Copy(message.m_pData, data, 0, message.m_cbSize);
// 触发接收事件
if (IsLargeMessageChunk(data))
{
if (TryHandleLargeMessageChunk(steamID, data, out var fullData) && fullData != null)
{
HandleReceivedConnectionPayload(steamID, fullData);
}
}
else
{
HandleReceivedConnectionPayload(steamID, data);
}
}
catch (Exception e)
{
LogSystem.LogError($"Error processing message from {steamID}: {e}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PReceiveFailed);
}
finally
{
// 释放消息 - 这是关键,必须释放消息内存
SteamNetworkingMessage_t.Release(messagePtr);
}
}
}
// 非特定连接轮询消息
public void PollInviteMessages()
{
IntPtr[] messages = new IntPtr[10];
int messageCount = SteamNetworkingMessages.ReceiveMessagesOnChannel(0, messages, 10);
for (int i = 0; i < messageCount; i++)
{
var messagePtr = messages[i];
if (messagePtr == IntPtr.Zero) continue;
try
{
// 获取消息结构
var message = Marshal.PtrToStructure<SteamNetworkingMessage_t>(messagePtr);
// 复制数据
byte[] data = new byte[message.m_cbSize];
Marshal.Copy(message.m_pData, data, 0, message.m_cbSize);
// 触发接收事件
GameNetReceiver.Instance.OnMessageReceived(data);
}
catch (Exception e)
{
LogSystem.LogError($"Error processing invite message: {e}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PReceiveFailed);
}
finally
{
// 释放消息 - 这是关键,必须释放消息内存
SteamNetworkingMessage_t.Release(messagePtr);
}
}
}
private void HandleReceivedConnectionPayload(CSteamID steamID, byte[] data)
{
if (TryHandleOrderedMessage(steamID, data)) return;
DeliverReceivedPayload(steamID, data);
}
private void DeliverReceivedPayload(CSteamID steamID, byte[] data)
{
byte[] payload;
try
{
payload = NetworkPayloadCodec.DecodeIfNeeded(data);
}
catch (Exception e)
{
LogSystem.LogError($"Decode P2P payload failed from {steamID}: bytes={data?.Length ?? 0}, error={e}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.NetworkMessageParseFailed);
return;
}
OnMessageReceivedEvent?.Invoke(steamID, payload);
GameNetReceiver.Instance.OnMessageReceived(payload);
}
private bool IsLargeMessageChunk(byte[] data)
{
if (data == null || data.Length < LargeMessageHeaderSize) return false;
return ReadInt32(data, 0) == LargeMessageMagic && ReadInt32(data, 4) == LargeMessageVersion;
}
private bool TryHandleOrderedMessage(CSteamID steamID, byte[] data)
{
if (data == null || data.Length < OrderedMessageHeaderSize) return false;
if (ReadInt32(data, 0) != OrderedMessageMagic || ReadInt32(data, 4) != OrderedMessageVersion) return false;
var sequence = ReadUInt64(data, 8);
var payloadLength = ReadInt32(data, 16);
if (sequence == 0 || payloadLength < 0 || payloadLength != data.Length - OrderedMessageHeaderSize)
{
LogSystem.LogWarning($"Invalid ordered P2P payload from {steamID}: sequence={sequence}, payload={payloadLength}, bytes={data.Length}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PReceiveFailed);
return true;
}
var payload = new byte[payloadLength];
Buffer.BlockCopy(data, OrderedMessageHeaderSize, payload, 0, payloadLength);
if (!_incomingOrderedStates.TryGetValue(steamID, out var state))
{
state = new IncomingOrderedState();
_incomingOrderedStates[steamID] = state;
}
if (sequence < state.ExpectedSequence)
{
LogSystem.LogWarning($"Drop duplicate ordered P2P message from {steamID}: sequence={sequence}, expected={state.ExpectedSequence}");
return true;
}
if (sequence > state.ExpectedSequence)
{
if (state.PendingMessages.ContainsKey(sequence)) return true;
if (state.PendingMessages.Count >= MaxOrderedPendingMessagesPerPeer
|| state.PendingBytes + payload.Length > MaxOrderedPendingBytesPerPeer)
{
LogSystem.LogError($"Ordered P2P pending buffer full from {steamID}: sequence={sequence}, expected={state.ExpectedSequence}, pending={state.PendingMessages.Count}, bytes={state.PendingBytes}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PReceiveFailed);
OnConnectionErrorEvent?.Invoke($"Ordered P2P pending buffer full from {steamID}");
if (_connections.TryGetValue(steamID, out var connection))
{
HandleDisconnection(steamID, connection);
}
return true;
}
state.PendingMessages[sequence] = payload;
state.PendingBytes += payload.Length;
if (state.GapStartTime <= 0f) state.GapStartTime = Time.time;
return true;
}
DeliverReceivedPayload(steamID, payload);
state.ExpectedSequence++;
FlushOrderedMessages(steamID, state);
state.GapStartTime = state.PendingMessages.Count > 0 ? Time.time : 0f;
return true;
}
private void FlushOrderedMessages(CSteamID steamID, IncomingOrderedState state)
{
while (state.PendingMessages.TryGetValue(state.ExpectedSequence, out var payload))
{
state.PendingMessages.Remove(state.ExpectedSequence);
state.PendingBytes -= payload.Length;
DeliverReceivedPayload(steamID, payload);
state.ExpectedSequence++;
}
}
private bool TryHandleLargeMessageChunk(CSteamID steamID, byte[] data, out byte[] fullData)
{
fullData = null;
var messageId = ReadInt32(data, 8);
var chunkIndex = ReadInt32(data, 12);
var chunkCount = ReadInt32(data, 16);
var totalLength = ReadInt32(data, 20);
var payloadLength = ReadInt32(data, 24);
var expectedChunkCount = totalLength > 0
? (totalLength + LargeMessageChunkPayloadSize - 1) / LargeMessageChunkPayloadSize
: 0;
var expectedPayloadLength = chunkIndex >= 0 && chunkIndex < expectedChunkCount
? Math.Min(LargeMessageChunkPayloadSize, totalLength - chunkIndex * LargeMessageChunkPayloadSize)
: 0;
if (messageId == 0 || chunkIndex < 0 || chunkCount <= 0 || chunkIndex >= chunkCount
|| totalLength <= 0 || totalLength > MaxLargeWireMessageBytes
|| chunkCount != expectedChunkCount
|| payloadLength <= 0
|| payloadLength != expectedPayloadLength
|| payloadLength > data.Length - LargeMessageHeaderSize)
{
return RejectIncomingLargeMessage(steamID,
$"Invalid large P2P chunk: messageId={messageId}, chunk={chunkIndex}/{chunkCount}, total={totalLength}, payload={payloadLength}");
}
if (!_incomingLargeMessages.TryGetValue(steamID, out var messages))
{
messages = new Dictionary<int, IncomingLargeMessage>();
_incomingLargeMessages[steamID] = messages;
}
if (!messages.TryGetValue(messageId, out var incoming))
{
if (messages.Count >= MaxIncomingLargeMessagesPerPeer)
{
return RejectIncomingLargeMessage(steamID,
$"Too many incoming large P2P messages, drop messageId={messageId}");
}
if (GetIncomingLargeReservedBytes(messages) + totalLength > MaxIncomingLargeBytesPerPeer)
{
return RejectIncomingLargeMessage(steamID,
$"Incoming large P2P byte budget exceeded, drop messageId={messageId}, total={totalLength}");
}
if (GetIncomingLargeReservedBytesTotal() + totalLength > MaxIncomingLargeBytesTotal)
{
return RejectIncomingLargeMessage(steamID,
$"Incoming large P2P global byte budget exceeded, drop messageId={messageId}, total={totalLength}");
}
incoming = new IncomingLargeMessage(totalLength, chunkCount);
messages[messageId] = incoming;
LogSystem.LogInfo($"Receiving large P2P message from {steamID}, bytes: {totalLength}, chunks: {chunkCount}");
}
if (!incoming.IsSameMessage(totalLength, chunkCount))
{
messages.Remove(messageId);
return RejectIncomingLargeMessage(steamID,
$"Large P2P chunk metadata changed: messageId={messageId}");
}
if (incoming.HasChunk(chunkIndex)) return true;
var payload = new byte[payloadLength];
Buffer.BlockCopy(data, LargeMessageHeaderSize, payload, 0, payloadLength);
if (!incoming.AddChunk(chunkIndex, payload, out fullData)) return true;
messages.Remove(messageId);
LogSystem.LogInfo($"Received large P2P message from {steamID}, bytes: {fullData.Length}, chunks: {chunkCount}");
return true;
}
private bool RejectIncomingLargeMessage(CSteamID steamID, string reason)
{
LogSystem.LogWarning($"Reject incoming large P2P message from {steamID}: {reason}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PReceiveFailed);
OnConnectionErrorEvent?.Invoke($"Reject incoming large P2P message from {steamID}: {reason}");
if (_connections.TryGetValue(steamID, out var connection))
{
HandleDisconnection(steamID, connection);
}
else
{
_incomingLargeMessages.Remove(steamID);
_incomingOrderedStates.Remove(steamID);
}
return false;
}
private int GetIncomingLargeReservedBytes(Dictionary<int, IncomingLargeMessage> messages)
{
var total = 0;
foreach (var kv in messages)
{
total += kv.Value.TotalLength;
}
return total;
}
private int GetIncomingLargeReservedBytesTotal()
{
var total = 0;
foreach (var peerMessages in _incomingLargeMessages)
{
total += GetIncomingLargeReservedBytes(peerMessages.Value);
}
return total;
}
private void ProcessOutgoingMessageQueue()
{
if (_outgoingPeerOrder.Count == 0) return;
var peerCount = _outgoingPeerOrder.Count;
var sentCount = 0;
var sentBytes = 0;
for (int attempt = 0; attempt < peerCount; attempt++)
{
if (_outgoingPeerOrder.Count == 0) return;
if (sentCount >= MaxOutgoingMessagesPerUpdate || sentBytes >= MaxOutgoingBytesPerUpdate) return;
if (_outgoingPeerRoundRobinIndex >= _outgoingPeerOrder.Count) _outgoingPeerRoundRobinIndex = 0;
var target = _outgoingPeerOrder[_outgoingPeerRoundRobinIndex];
_outgoingPeerRoundRobinIndex = (_outgoingPeerRoundRobinIndex + 1) % _outgoingPeerOrder.Count;
if (!_outgoingPeerQueues.TryGetValue(target, out var queue) || queue.Messages.Count == 0)
{
RemoveOutgoingMessages(target);
continue;
}
if (Time.time < queue.NextSendTime) continue;
var pending = queue.Messages.Peek();
if (!IsLobbyPeer(pending.Target))
{
var nonLobbyReason = $"Drop queued P2P messages for non-lobby peer: {pending.Target}";
LogSystem.LogWarning(nonLobbyReason);
OnMessageSendFailedEvent?.Invoke(pending.Target, nonLobbyReason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
RemoveOutgoingMessages(pending.Target);
continue;
}
if (!_connections.TryGetValue(pending.Target, out var conn))
{
var missingConnectionReason = $"Drop queued P2P messages because connection is missing: {pending.Target}";
LogSystem.LogWarning(missingConnectionReason);
OnMessageSendFailedEvent?.Invoke(pending.Target, missingConnectionReason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
RemoveOutgoingMessages(pending.Target);
continue;
}
if (!TryGetConnectionState(conn, out var connectionState)
|| connectionState != ESteamNetworkingConnectionState.k_ESteamNetworkingConnectionState_Connected)
{
LogSystem.LogWarning($"Queued P2P message waiting for connected state: {pending.Target}, state: {connectionState}");
if (!IsActiveConnectionState(connectionState))
{
OnMessageSendFailedEvent?.Invoke(pending.Target,
$"Queued P2P message dropped because connection is not active: state={connectionState}, messageId: {pending.MessageId}, chunk: {pending.ChunkIndex + 1}/{pending.ChunkCount}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
HandleDisconnection(pending.Target, conn);
}
else
{
pending.Attempts++;
if (pending.Attempts >= MaxOutgoingMessageAttempts)
{
var timeoutReason = $"Queued P2P message waited too long for connected state to {pending.Target}: state={connectionState}, messageId: {pending.MessageId}, chunk: {pending.ChunkIndex + 1}/{pending.ChunkCount}, attempts: {pending.Attempts}";
LogSystem.LogError(timeoutReason);
OnMessageSendFailedEvent?.Invoke(pending.Target, timeoutReason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PConnectionTimeout);
HandleDisconnection(pending.Target, conn);
continue;
}
queue.NextSendTime = Time.time + OutgoingMessageLimitRetryDelay;
}
continue;
}
var result = SendRawToResult(pending.Target, conn, pending.Data, pending.Reliable, pending.UseNoNagle, false);
if (result == EResult.k_EResultOK)
{
var sentLength = pending.Data.Length;
DequeueOutgoingMessage(pending.Target, queue);
if (pending.IsLastLargeChunk)
LogSystem.LogInfo($"Queued large P2P message sent to {pending.Target}, messageId: {pending.MessageId}, chunks: {pending.ChunkCount}");
sentCount++;
sentBytes += sentLength;
queue.NextSendTime = Time.time + OutgoingMessageSendInterval;
continue;
}
pending.Attempts++;
if (result == EResult.k_EResultLimitExceeded || result == EResult.k_EResultIgnored)
{
if (pending.Attempts >= MaxOutgoingMessageAttempts)
{
var timeoutReason = $"Queued P2P message exceeded retry limit to {pending.Target}: {result}, messageId: {pending.MessageId}, chunk: {pending.ChunkIndex + 1}/{pending.ChunkCount}, attempts: {pending.Attempts}";
LogSystem.LogError(timeoutReason);
OnMessageSendFailedEvent?.Invoke(pending.Target, timeoutReason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
HandleDisconnection(pending.Target, conn);
continue;
}
if (pending.Attempts == 1 || pending.Attempts % 10 == 0)
LogSystem.LogWarning($"Queued P2P message send deferred to {pending.Target}: {result}, messageId: {pending.MessageId}, chunk: {pending.ChunkIndex + 1}/{pending.ChunkCount}, attempts: {pending.Attempts}");
queue.NextSendTime = Time.time + OutgoingMessageLimitRetryDelay;
continue;
}
var reason = $"Failed to send queued P2P message to {pending.Target}: {result}, messageId: {pending.MessageId}, chunk: {pending.ChunkIndex + 1}/{pending.ChunkCount}";
LogSystem.LogError(reason);
OnMessageSendFailedEvent?.Invoke(pending.Target, reason);
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PMessageSendFailed);
HandleDisconnection(pending.Target, conn);
}
}
private void RemoveOutgoingMessages(CSteamID target)
{
if (_outgoingPeerQueues.TryGetValue(target, out var queue))
{
_outgoingQueuedBytes -= queue.QueuedBytes;
_outgoingPeerQueues.Remove(target);
}
_outgoingPeerOrder.Remove(target);
if (_outgoingPeerRoundRobinIndex >= _outgoingPeerOrder.Count)
_outgoingPeerRoundRobinIndex = 0;
}
private void RemoveOutgoingMessages(CSteamID target, int messageId)
{
if (!_outgoingPeerQueues.TryGetValue(target, out var queue)) return;
var count = queue.Messages.Count;
for (int i = 0; i < count; i++)
{
var message = queue.Messages.Dequeue();
if (message.MessageId == messageId)
{
queue.QueuedBytes -= message.Data.Length;
_outgoingQueuedBytes -= message.Data.Length;
continue;
}
queue.Messages.Enqueue(message);
}
if (queue.Messages.Count == 0) RemoveOutgoingMessages(target);
}
private void DequeueOutgoingMessage(CSteamID target, PeerOutgoingQueue queue)
{
if (queue.Messages.Count == 0) return;
var message = queue.Messages.Dequeue();
queue.QueuedBytes -= message.Data.Length;
_outgoingQueuedBytes -= message.Data.Length;
if (queue.Messages.Count == 0) RemoveOutgoingMessages(target);
}
private void CleanupOrderedMessageTimeouts()
{
if (_incomingOrderedStates.Count == 0) return;
using var disconnectPeers = THCollectionPool.GetListHandle<CSteamID>(out var peersToDisconnect);
foreach (var kv in _incomingOrderedStates)
{
var state = kv.Value;
if (state.PendingMessages.Count == 0 || state.GapStartTime <= 0f) continue;
if (Time.time - state.GapStartTime < OrderedMessageGapTimeout) continue;
peersToDisconnect.Add(kv.Key);
}
foreach (var peer in peersToDisconnect)
{
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PReceiveFailed);
OnConnectionErrorEvent?.Invoke($"Ordered P2P message gap timed out from {peer}");
if (_connections.TryGetValue(peer, out var connection))
{
HandleDisconnection(peer, connection);
}
else
{
_incomingOrderedStates.Remove(peer);
}
}
}
private void CleanupLargeMessageTimeouts()
{
if (_incomingLargeMessages.Count == 0) return;
using var removePeers = THCollectionPool.GetListHandle<CSteamID>(out var peersToRemove);
using var disconnectPeers = THCollectionPool.GetListHandle<CSteamID>(out var peersToDisconnect);
foreach (var peerMessages in _incomingLargeMessages)
{
using var removeMessages = THCollectionPool.GetListHandle<int>(out var messagesToRemove);
foreach (var kv in peerMessages.Value)
{
if (Time.time - kv.Value.LastUpdateTime < LargeMessageTimeout) continue;
messagesToRemove.Add(kv.Key);
}
foreach (var messageId in messagesToRemove)
{
peerMessages.Value.Remove(messageId);
LogSystem.LogWarning($"Large P2P message receive timed out from {peerMessages.Key}, messageId: {messageId}");
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PReceiveFailed);
}
if (messagesToRemove.Count > 0)
{
peersToDisconnect.Add(peerMessages.Key);
}
if (peerMessages.Value.Count == 0) peersToRemove.Add(peerMessages.Key);
}
foreach (var peer in peersToRemove)
{
_incomingLargeMessages.Remove(peer);
}
foreach (var peer in peersToDisconnect)
{
NetworkPlayerTipManager.Instance.Request(NetworkPlayerTipType.P2PReceiveFailed);
OnConnectionErrorEvent?.Invoke($"Large P2P message receive timed out from {peer}");
if (_connections.TryGetValue(peer, out var connection))
{
HandleDisconnection(peer, connection);
}
else
{
_incomingOrderedStates.Remove(peer);
}
}
}
private static void WriteInt32(byte[] buffer, int offset, int value)
{
buffer[offset] = (byte)value;
buffer[offset + 1] = (byte)(value >> 8);
buffer[offset + 2] = (byte)(value >> 16);
buffer[offset + 3] = (byte)(value >> 24);
}
private static int ReadInt32(byte[] buffer, int offset)
{
return buffer[offset]
| (buffer[offset + 1] << 8)
| (buffer[offset + 2] << 16)
| (buffer[offset + 3] << 24);
}
private static void WriteUInt64(byte[] buffer, int offset, ulong value)
{
buffer[offset] = (byte)value;
buffer[offset + 1] = (byte)(value >> 8);
buffer[offset + 2] = (byte)(value >> 16);
buffer[offset + 3] = (byte)(value >> 24);
buffer[offset + 4] = (byte)(value >> 32);
buffer[offset + 5] = (byte)(value >> 40);
buffer[offset + 6] = (byte)(value >> 48);
buffer[offset + 7] = (byte)(value >> 56);
}
private static ulong ReadUInt64(byte[] buffer, int offset)
{
return buffer[offset]
| ((ulong)buffer[offset + 1] << 8)
| ((ulong)buffer[offset + 2] << 16)
| ((ulong)buffer[offset + 3] << 24)
| ((ulong)buffer[offset + 4] << 32)
| ((ulong)buffer[offset + 5] << 40)
| ((ulong)buffer[offset + 6] << 48)
| ((ulong)buffer[offset + 7] << 56);
}
private class PeerOutgoingQueue
{
public readonly Queue<OutgoingMessage> Messages = new Queue<OutgoingMessage>();
public int QueuedBytes;
public float NextSendTime;
}
private class IncomingOrderedState
{
public ulong ExpectedSequence = 1;
public readonly SortedDictionary<ulong, byte[]> PendingMessages = new SortedDictionary<ulong, byte[]>();
public int PendingBytes;
public float GapStartTime;
}
private class IncomingLargeMessage
{
private readonly byte[][] _chunks;
private readonly bool[] _received;
private int _receivedCount;
private int _receivedBytes;
public int TotalLength { get; }
public int ChunkCount { get; }
public float LastUpdateTime { get; private set; }
public IncomingLargeMessage(int totalLength, int chunkCount)
{
TotalLength = totalLength;
ChunkCount = chunkCount;
_chunks = new byte[chunkCount][];
_received = new bool[chunkCount];
LastUpdateTime = Time.time;
}
public bool IsSameMessage(int totalLength, int chunkCount)
{
return TotalLength == totalLength && ChunkCount == chunkCount;
}
public bool HasChunk(int chunkIndex)
{
return chunkIndex >= 0 && chunkIndex < _received.Length && _received[chunkIndex];
}
public bool AddChunk(int chunkIndex, byte[] payload, out byte[] fullData)
{
fullData = null;
LastUpdateTime = Time.time;
if (_received[chunkIndex]) return false;
_chunks[chunkIndex] = payload;
_received[chunkIndex] = true;
_receivedCount++;
_receivedBytes += payload.Length;
if (_receivedCount < ChunkCount) return false;
if (_receivedBytes != TotalLength)
{
LogSystem.LogWarning($"Large P2P message size mismatch: received={_receivedBytes}, expected={TotalLength}");
return false;
}
fullData = new byte[TotalLength];
var offset = 0;
for (int i = 0; i < _chunks.Length; i++)
{
var chunk = _chunks[i];
if (chunk == null)
{
fullData = null;
return false;
}
Buffer.BlockCopy(chunk, 0, fullData, offset, chunk.Length);
offset += chunk.Length;
}
return true;
}
}
private class OutgoingMessage
{
public CSteamID Target { get; }
public int MessageId { get; }
public int ChunkIndex { get; }
public int ChunkCount { get; }
public byte[] Data { get; }
public bool Reliable { get; }
public bool UseNoNagle { get; }
public bool IsLastLargeChunk => MessageId != 0 && ChunkIndex == ChunkCount - 1;
public int Attempts;
public OutgoingMessage(CSteamID target, byte[] data, bool reliable, bool useNoNagle,
int messageId = 0, int chunkIndex = 0, int chunkCount = 1)
{
Target = target;
Data = data;
Reliable = reliable;
UseNoNagle = useNoNagle;
MessageId = messageId;
ChunkIndex = chunkIndex;
ChunkCount = chunkCount;
}
}
// 获取连接状态
public bool IsConnectedTo(CSteamID steamID)
{
if (!_connections.TryGetValue(steamID, out var connection)) return false;
return TryGetConnectionState(connection, out var state) && IsActiveConnectionState(state);
}
// 获取所有连接的玩家
public IEnumerable<CSteamID> GetConnectedPeers()
{
return _connections.Keys;
}
// 获取连接数量
public int GetConnectionCount()
{
return _connections.Count;
}
// 清理资源
public void Cleanup()
{
DisconnectAll();
if (_listenSocket != HSteamListenSocket.Invalid)
{
SteamNetworkingSockets.CloseListenSocket(_listenSocket);
_listenSocket = HSteamListenSocket.Invalid;
}
_cbConnectionStatusChanged?.Dispose();
_cbConnectionStatusChanged = null;
_cbSessionRequest?.Dispose();
_cbSessionRequest = null;
}
// 获取详细的连接信息用于调试
public void LogDetailedConnectionInfo(CSteamID steamID)
{
if (!_connections.TryGetValue(steamID, out var connection))
{
LogSystem.LogWarning($"No connection found for {steamID}");
return;
}
if (SteamNetworkingSockets.GetConnectionInfo(connection, out var info))
{
LogSystem.LogInfo("=== 详细连接信息 ===");
LogSystem.LogInfo($"连接状态: {info.m_eState}");
LogSystem.LogInfo($"结束原因: {info.m_eEndReason}");
LogSystem.LogInfo($"用户数据: {info.m_nUserData}");
LogSystem.LogInfo($"监听套接字: {info.m_hListenSocket}");
LogSystem.LogInfo($"远程地址: {info.m_addrRemote}");
LogSystem.LogInfo($"连接描述: {info.m_szConnectionDescription}");
LogSystem.LogInfo($"结束调试信息: {info.m_szEndDebug}");
LogSystem.LogInfo("===================");
}
}
}
}