5 using System.Collections.Generic;
6 using System.Collections.Immutable;
8 using System.Threading;
9 using System.Threading.Tasks;
16 private readonly ImmutableDictionary<AuthenticationTicketKind, Authenticator> authenticators;
21 private long sentBytes, receivedBytes;
23 private sealed
class RemotePeer
25 public enum AuthenticationStatus
28 AuthenticationPending,
29 SuccessfullyAuthenticated
35 public readonly record
struct DisconnectInfo(
37 PeerDisconnectPacket Packet);
39 public Option<DisconnectInfo> PendingDisconnect;
40 public AuthenticationStatus AuthStatus;
42 public readonly record
struct UnauthedMessage(byte[] Bytes, int LengthBytes);
44 public readonly List<UnauthedMessage> UnauthedMessages;
50 PendingDisconnect = Option.
None;
51 AuthStatus = AuthenticationStatus.NotAuthenticated;
53 UnauthedMessages =
new List<UnauthedMessage>();
57 private readonly List<RemotePeer> remotePeers =
new();
59 public P2POwnerPeer(Callbacks callbacks,
int ownerKey, ImmutableArray<P2PEndpoint> allEndpoints) :
60 base(new
PipeEndpoint(), allEndpoints.Cast<
Endpoint>().ToImmutableArray(), callbacks, Option<int>.Some(ownerKey))
62 ServerConnection =
null;
66 var selfSteamEndpoint = allEndpoints.FirstOrNone(e => e is
SteamP2PEndpoint);
67 var selfEosEndpoint = allEndpoints.FirstOrNone(e => e is
EosP2PEndpoint);
68 var selfPrimaryEndpointOption = selfSteamEndpoint.Fallback(selfEosEndpoint);
69 if (!selfPrimaryEndpointOption.TryUnwrap(out var selfPrimaryEndpointNotNull))
71 throw new Exception(
"Could not determine endpoint for P2POwnerPeer");
73 selfPrimaryEndpoint = selfPrimaryEndpointNotNull;
80 if (isActive) {
return; }
82 initializationStep = ConnectionInitialization.AuthInfoAndVersion;
91 var socketCallbacks =
new P2PSocket.
Callbacks(OnIncomingConnection, OnConnectionClosed, OnP2PData);
93 socket = socketCreateResult.TryUnwrapSuccess(out var s)
95 :
throw new Exception($
"Failed to create dual-stack socket: {socketCreateResult}");
97 TaskPool.Add(
"P2POwnerPeer.GetAccountId",
GetAccountId(), t =>
99 if (t.TryGetResult(out Option<AccountId> accountIdOption) && accountIdOption.TryUnwrap(out var accountId))
101 selfAccountInfo = new AccountInfo(accountId);
104 if (selfAccountInfo.
IsNone)
106 Close(PeerDisconnectPacket.WithReason(DisconnectReason.AuthenticationFailed));
113 private bool OnIncomingConnection(
P2PEndpoint remoteEndpoint)
115 if (!isActive) {
return false; }
117 if (remotePeers.None(p => p.Endpoint == remoteEndpoint))
119 remotePeers.Add(
new RemotePeer(remoteEndpoint));
125 private void OnConnectionClosed(
P2PEndpoint remoteEndpoint, PeerDisconnectPacket disconnectPacket)
128 = remotePeers.Find(p => p.Endpoint == remoteEndpoint);
129 if (remotePeer is
null) {
return; }
130 CommunicatePeerDisconnectToServerProcess(
132 remotePeer.PendingDisconnect.Select(d => d.Packet).Fallback(disconnectPacket));
135 private void OnP2PData(
P2PEndpoint senderEndpoint, IReadMessage inc)
137 if (!isActive) {
return; }
139 receivedBytes += inc.LengthBytes;
141 var remotePeer = remotePeers.Find(p => p.Endpoint == senderEndpoint);
142 if (remotePeer is
null) {
return; }
143 if (remotePeer.PendingDisconnect.IsSome()) {
return; }
145 if (!INetSerializableStruct.TryRead(inc, remotePeer.AccountInfo, out PeerPacketHeaders peerPacketHeaders))
147 CommunicateDisconnectToRemotePeer(remotePeer, PeerDisconnectPacket.WithReason(DisconnectReason.MalformedData));
151 PacketHeader packetHeader = peerPacketHeaders.PacketHeader;
153 if (packetHeader.IsConnectionInitializationStep())
155 if (peerPacketHeaders.Initialization ==
null)
158 DebugConsole.ThrowErrorOnce(
159 $
"P2POwnerPeer.OnP2PData:{remotePeer.Endpoint.StringRepresentation}",
160 $
"Failed to initialize remote peer {remotePeer.Endpoint.StringRepresentation}: initialization step missing.");
161 CommunicateDisconnectToRemotePeer(remotePeer, PeerDisconnectPacket.WithReason(DisconnectReason.MalformedData));
164 ConnectionInitialization initialization = peerPacketHeaders.Initialization.Value;
165 if (initialization == ConnectionInitialization.AuthInfoAndVersion
166 && remotePeer.AuthStatus == RemotePeer.AuthenticationStatus.NotAuthenticated)
168 StartAuthTask(inc, remotePeer);
172 if (remotePeer.AuthStatus == RemotePeer.AuthenticationStatus.AuthenticationPending)
174 remotePeer.UnauthedMessages.Add(
new RemotePeer.UnauthedMessage(inc.Buffer, inc.LengthBytes));
178 IWriteMessage outMsg =
new WriteOnlyMessage();
179 outMsg.WriteNetSerializableStruct(
new P2POwnerToServerHeader
181 EndpointStr = remotePeer.Endpoint.StringRepresentation,
182 AccountInfo = remotePeer.AccountInfo
184 outMsg.WriteBytes(inc.Buffer, 0, inc.LengthBytes);
186 ForwardToServerProcess(outMsg);
190 private void StartAuthTask(IReadMessage inc, RemotePeer remotePeer)
192 remotePeer.AuthStatus = RemotePeer.AuthenticationStatus.AuthenticationPending;
194 if (!INetSerializableStruct.TryRead(inc, remotePeer.AccountInfo, out ClientAuthTicketAndVersionPacket packet))
199 if (!packet.AuthTicket.TryUnwrap(out var authenticationTicket))
204 if (!authenticators.TryGetValue(authenticationTicket.Kind, out var authenticator))
209 TaskPool.Add($
"P2POwnerPeer.VerifyRemotePeerAccountId",
210 authenticator.VerifyTicket(authenticationTicket),
213 if (!t.TryGetResult(out AccountInfo accountInfo)
214 || accountInfo.IsNone)
220 remotePeer.AccountInfo = accountInfo;
221 remotePeer.AuthStatus = RemotePeer.AuthenticationStatus.SuccessfullyAuthenticated;
222 foreach (var unauthedMessage in remotePeer.UnauthedMessages)
224 IWriteMessage msg = new WriteOnlyMessage();
225 msg.WriteNetSerializableStruct(new P2POwnerToServerHeader
227 EndpointStr = remotePeer.Endpoint.StringRepresentation,
228 AccountInfo = accountInfo
230 msg.WriteBytes(unauthedMessage.Bytes, 0, unauthedMessage.LengthBytes);
231 ForwardToServerProcess(msg);
233 remotePeer.UnauthedMessages.Clear();
238 CommunicateDisconnectToRemotePeer(remotePeer, PeerDisconnectPacket.WithReason(DisconnectReason.AuthenticationFailed));
242 public override void Update(
float deltaTime)
244 if (!isActive) {
return; }
246 if (ChildServerRelay.HasShutDown || ChildServerRelay.Process is not { HasExited: false })
248 Close(PeerDisconnectPacket.WithReason(DisconnectReason.ServerCrashed));
249 var msgBox =
new GUIMessageBox(TextManager.Get(
"ConnectionLost"), ChildServerRelay.CrashMessage);
250 msgBox.Buttons[0].OnClicked += (btn, obj) =>
258 if (selfAccountInfo.
IsNone) {
return; }
260 for (
int i = remotePeers.Count - 1; i >= 0; i--)
262 if (remotePeers[i].PendingDisconnect.TryUnwrap(out var pendingDisconnect) && pendingDisconnect.TimeToGiveUp < Timing.TotalTime)
264 CommunicatePeerDisconnectToServerProcess(remotePeers[i], pendingDisconnect.Packet);
268 socket?.ProcessIncomingMessages();
273 foreach (var incBuf
in ChildServerRelay.Read())
275 ChildServerRelay.DisposeLocalHandles();
276 IReadMessage inc =
new ReadOnlyMessage(incBuf,
false, 0, incBuf.Length, ServerConnection);
277 HandleServerMessage(inc);
283 if (!isActive) {
return; }
285 var recipientInfo = INetSerializableStruct.Read<P2PServerToOwnerHeader>(inc);
286 if (!recipientInfo.Endpoint.TryUnwrap(out var recipientEndpoint)) {
return; }
287 var peerPacketHeaders = INetSerializableStruct.Read<PeerPacketHeaders>(inc);
289 if (recipientEndpoint != selfPrimaryEndpoint)
291 HandleMessageForRemotePeer(peerPacketHeaders, recipientEndpoint, inc);
295 HandleMessageForOwner(peerPacketHeaders, inc);
299 private static byte[] GetRemainingBytes(IReadMessage msg)
301 return msg.Buffer[msg.BytePosition..msg.LengthBytes];
304 private void HandleMessageForRemotePeer(PeerPacketHeaders peerPacketHeaders,
P2PEndpoint recipientEndpoint, IReadMessage inc)
306 var (deliveryMethod, packetHeader, initialization) = peerPacketHeaders;
308 if (!packetHeader.IsServerMessage())
310 DebugConsole.ThrowError(
"Received non-server message meant for remote peer");
314 RemotePeer? peer = remotePeers.Find(p => p.Endpoint == recipientEndpoint);
315 if (peer is
null) {
return; }
317 if (packetHeader.IsDisconnectMessage())
319 var packet = INetSerializableStruct.Read<PeerDisconnectPacket>(inc);
320 CommunicateDisconnectToRemotePeer(peer, packet);
324 IWriteMessage outMsg =
new WriteOnlyMessage();
326 outMsg.WriteNetSerializableStruct(
new PeerPacketHeaders
328 DeliveryMethod = deliveryMethod,
329 PacketHeader = packetHeader,
330 Initialization = initialization
333 if (packetHeader.IsConnectionInitializationStep())
335 var initRelayPacket =
new P2PInitializationRelayPacket
338 Message =
new PeerPacketMessage
340 Buffer = GetRemainingBytes(inc)
344 outMsg.WriteNetSerializableStruct(initRelayPacket);
348 byte[] userMessage = GetRemainingBytes(inc);
349 outMsg.WriteBytes(userMessage, 0, userMessage.Length);
352 ForwardToRemotePeer(deliveryMethod, recipientEndpoint, outMsg);
355 private void HandleMessageForOwner(PeerPacketHeaders peerPacketHeaders, IReadMessage inc)
357 var (_, packetHeader, _) = peerPacketHeaders;
359 if (packetHeader.IsDisconnectMessage())
361 DebugConsole.ThrowError(
"Received disconnect message from owned server");
365 if (!packetHeader.IsServerMessage())
367 DebugConsole.ThrowError(
"Received non-server message from owned server");
371 if (packetHeader.IsHeartbeatMessage())
376 if (packetHeader.IsConnectionInitializationStep())
378 if (selfAccountInfo.
IsNone) {
throw new InvalidOperationException($
"Cannot initialize {nameof(P2POwnerPeer)} because {nameof(selfAccountInfo)} is not defined"); }
379 IWriteMessage outMsg =
new WriteOnlyMessage();
380 outMsg.WriteNetSerializableStruct(
new P2POwnerToServerHeader
382 EndpointStr = selfPrimaryEndpoint.StringRepresentation,
383 AccountInfo = selfAccountInfo
385 outMsg.WriteNetSerializableStruct(
new PeerPacketHeaders
387 DeliveryMethod = DeliveryMethod.Reliable,
388 PacketHeader = PacketHeader.IsConnectionInitializationStep,
389 Initialization = ConnectionInitialization.AuthInfoAndVersion
391 outMsg.WriteNetSerializableStruct(
new P2PInitializationOwnerPacket(
392 Name: GameMain.Client.Name,
393 AccountId: selfAccountInfo.
AccountId.Fallback(
default(AccountId)!)));
394 ForwardToServerProcess(outMsg);
398 OnInitializationComplete();
400 var packet = INetSerializableStruct.Read<PeerPacketMessage>(inc);
401 IReadMessage msg =
new ReadOnlyMessage(packet.Buffer, packetHeader.IsCompressed(), 0, packet.Length, ServerConnection);
402 callbacks.OnMessageReceived.Invoke(msg);
406 private void CommunicateDisconnectToRemotePeer(RemotePeer peer, PeerDisconnectPacket peerDisconnectPacket)
408 if (peer.PendingDisconnect.IsNone())
410 peer.PendingDisconnect = Option.Some(
411 new RemotePeer.DisconnectInfo(
412 Timing.TotalTime + 3f,
413 peerDisconnectPacket));
416 IWriteMessage outMsg =
new WriteOnlyMessage();
417 outMsg.WriteNetSerializableStruct(
new PeerPacketHeaders
419 DeliveryMethod = DeliveryMethod.Reliable,
420 PacketHeader = PacketHeader.IsServerMessage | PacketHeader.IsDisconnectMessage
422 outMsg.WriteNetSerializableStruct(peerDisconnectPacket);
424 ForwardToRemotePeer(DeliveryMethod.Reliable, peer.Endpoint, outMsg);
427 private void CommunicatePeerDisconnectToServerProcess(RemotePeer peer, PeerDisconnectPacket peerDisconnectPacket)
429 if (!remotePeers.Remove(peer)) {
return; }
431 IWriteMessage outMsg =
new WriteOnlyMessage();
432 outMsg.WriteNetSerializableStruct(
new P2POwnerToServerHeader
434 EndpointStr = peer.Endpoint.StringRepresentation,
435 AccountInfo = peer.AccountInfo
437 outMsg.WriteNetSerializableStruct(
new PeerPacketHeaders
439 DeliveryMethod = DeliveryMethod.Reliable,
440 PacketHeader = PacketHeader.IsDisconnectMessage
442 outMsg.WriteNetSerializableStruct(peerDisconnectPacket);
443 if (peer.AccountInfo.AccountId.TryUnwrap(out var accountId))
445 authenticators.Values.ForEach(authenticator => authenticator.EndAuthSession(accountId));
448 ForwardToServerProcess(outMsg);
450 socket?.CloseConnection(peer.Endpoint);
458 public override void Close(PeerDisconnectPacket peerDisconnectPacket)
460 if (!isActive) {
return; }
464 for (
int i = remotePeers.Count - 1; i >= 0; i--)
466 CommunicateDisconnectToRemotePeer(remotePeers[i], peerDisconnectPacket);
471 for (
int i = remotePeers.Count - 1; i >= 0; i--)
473 CommunicatePeerDisconnectToServerProcess(remotePeers[i], peerDisconnectPacket);
479 callbacks.OnDisconnect.Invoke(peerDisconnectPacket);
482 public override void Send(
IWriteMessage msg, DeliveryMethod deliveryMethod,
bool compressPastThreshold =
true)
484 if (!isActive) {
return; }
487 byte[] msgData = msg.
PrepareForSending(compressPastThreshold, out
bool isCompressed, out _);
488 msgToSend.WriteNetSerializableStruct(
new P2POwnerToServerHeader
490 EndpointStr = selfPrimaryEndpoint.StringRepresentation,
493 msgToSend.WriteNetSerializableStruct(
new PeerPacketHeaders
495 DeliveryMethod = deliveryMethod,
496 PacketHeader = isCompressed ? PacketHeader.IsCompressed : PacketHeader.None
498 msgToSend.WriteNetSerializableStruct(
new PeerPacketMessage
502 ForwardToServerProcess(msgToSend);
505 protected override void SendMsgInternal(PeerPacketHeaders headers, INetSerializableStruct? body)
508 throw new NotImplementedException();
511 private static void ForwardToServerProcess(
IWriteMessage msg)
515 ChildServerRelay.Write(bufToSend);
520 if (socket is
null) {
return; }
524 if (length + 4 >= MsgConstants.MTU)
526 DebugConsole.Log($
"WARNING: message length comes close to exceeding MTU, forcing reliable send ({length} bytes)");
527 deliveryMethod = DeliveryMethod.Reliable;
530 var success = socket.SendMessage(recipient, outMsg, deliveryMethod);
534 if (success) {
return; }
536 if (deliveryMethod is DeliveryMethod.Unreliable)
538 DebugConsole.Log($
"WARNING: message couldn't be sent unreliably, forcing reliable send ({length} bytes)");
539 success = socket.SendMessage(recipient, outMsg, DeliveryMethod.Reliable);
545 DebugConsole.AddWarning($
"Failed to send message to remote peer! ({length} bytes)");
551 if (SteamManager.IsInitialized) {
return SteamManager.GetSteamId().Select(
id => (AccountId)
id); }
553 if (EosInterface.IdQueries.GetLoggedInPuids() is not { Length: > 0 } puids)
557 var externalAccountIdsResult = await EosInterface.IdQueries.GetSelfExternalAccountIds(puids[0]);
558 if (!externalAccountIdsResult.TryUnwrapSuccess(out var externalAccountIds)
559 || externalAccountIds is not { Length: > 0 })
563 return Option.Some(externalAccountIds[0]);
567 public override void ForceTimeOut()
573 => ForwardToServerProcess(msg);
static ImmutableDictionary< AuthenticationTicketKind, Authenticator > GetAuthenticatorsForHost(Option< Endpoint > ownerEndpointOption)
static MainMenuScreen MainMenuScreen
readonly NetStats NetStats
void AddValue(NetStatType statType, float value)
override void SendPassword(string password)
override void SendMsgInternal(PeerPacketHeaders headers, INetSerializableStruct? body)
override void Send(IWriteMessage msg, DeliveryMethod deliveryMethod, bool compressPastThreshold=true)
override void Update(float deltaTime)
P2POwnerPeer(Callbacks callbacks, int ownerKey, ImmutableArray< P2PEndpoint > allEndpoints)
override async Task< Option< AccountId > > GetAccountId()
override void Close(PeerDisconnectPacket peerDisconnectPacket)
static Result< P2PSocket, Error > Create(Callbacks callbacks)
readonly record struct Callbacks(Predicate< P2PEndpoint > OnIncomingConnection, Action< P2PEndpoint, PeerDisconnectPacket > OnConnectionClosed, Action< P2PEndpoint, IReadMessage > OnData)
byte[] PrepareForSending(bool compressPastThreshold, out bool isCompressed, out int outLength)
readonly Option< AccountId > AccountId
The primary ID for a given user
static readonly AccountInfo None