Client LuaCsForBarotrauma
P2POwnerPeer.cs
1 #nullable enable
3 using Barotrauma.Steam;
4 using System;
5 using System.Collections.Generic;
6 using System.Collections.Immutable;
7 using System.Linq;
8 using System.Threading;
9 using System.Threading.Tasks;
10 
11 namespace Barotrauma.Networking
12 {
13  sealed class P2POwnerPeer : ClientPeer<PipeEndpoint>
14  {
// Socket used to talk to remote peers; created in Start() and disposed/reset to null in Close().
15  private P2PSocket? socket;
// Ticket verifiers keyed by ticket kind, obtained once in the constructor via Authenticator.GetAuthenticatorsForHost.
16  private readonly ImmutableDictionary<AuthenticationTicketKind, Authenticator> authenticators;
17 
// Endpoint that identifies the owner itself: the Steam endpoint when one was supplied, otherwise the EOS endpoint.
18  private readonly P2PEndpoint selfPrimaryEndpoint;
// Starts out as AccountInfo.None and is filled in by the GetAccountId task queued in Start().
19  private AccountInfo selfAccountInfo;
20 
// Running byte totals; updated by ForwardToRemotePeer/OnP2PData and passed to NetStats in Update().
21  private long sentBytes, receivedBytes;
22 
/// <summary>
/// Book-keeping for one remote client connected through the owner:
/// its endpoint, account info, authentication progress, a pending
/// disconnect (if any) and the messages buffered while authentication
/// is still in flight.
/// </summary>
private sealed class RemotePeer
{
    public enum AuthenticationStatus
    {
        NotAuthenticated,
        AuthenticationPending,
        SuccessfullyAuthenticated
    }

    public readonly P2PEndpoint Endpoint;
    public AccountInfo AccountInfo;

    /// <summary>
    /// A disconnect that has been announced to the remote peer but not yet
    /// reported to the server process. <c>TimeToGiveUp</c> is compared
    /// against <c>Timing.TotalTime</c> by the owner's update loop.
    /// </summary>
    public readonly record struct DisconnectInfo(
        double TimeToGiveUp,
        PeerDisconnectPacket Packet);

    public Option<DisconnectInfo> PendingDisconnect;
    public AuthenticationStatus AuthStatus;

    /// <summary>Raw message received while authentication was pending.</summary>
    public readonly record struct UnauthedMessage(byte[] Bytes, int LengthBytes);

    public readonly List<UnauthedMessage> UnauthedMessages;

    public RemotePeer(P2PEndpoint endpoint)
    {
        Endpoint = endpoint;
        // AccountInfo is read before authentication completes (when parsing
        // incoming headers and when reporting disconnects), so it must hold
        // an explicit "no account" value rather than being left unassigned.
        AccountInfo = AccountInfo.None;
        PendingDisconnect = Option.None;
        AuthStatus = AuthenticationStatus.NotAuthenticated;

        UnauthedMessages = new List<UnauthedMessage>();
    }
}
56 
// Remote clients currently tracked by the owner: added in OnIncomingConnection, removed in
// CommunicatePeerDisconnectToServerProcess, and cleared when Start() runs.
57  private readonly List<RemotePeer> remotePeers = new();
58 
/// <summary>
/// Creates an inactive owner peer. The owner's primary endpoint is the
/// Steam endpoint when one is present in <paramref name="allEndpoints"/>,
/// otherwise the EOS endpoint.
/// </summary>
/// <exception cref="Exception">Neither a Steam nor an EOS endpoint was supplied.</exception>
public P2POwnerPeer(Callbacks callbacks, int ownerKey, ImmutableArray<P2PEndpoint> allEndpoints) :
    base(new PipeEndpoint(), allEndpoints.Cast<Endpoint>().ToImmutableArray(), callbacks, Option<int>.Some(ownerKey))
{
    ServerConnection = null;

    isActive = false;

    // Steam takes precedence; EOS is only used when no Steam endpoint exists.
    var steamCandidate = allEndpoints.FirstOrNone(e => e is SteamP2PEndpoint);
    var eosCandidate = allEndpoints.FirstOrNone(e => e is EosP2PEndpoint);
    var primaryCandidate = steamCandidate.Fallback(eosCandidate);

    selfPrimaryEndpoint = primaryCandidate.TryUnwrap(out var resolvedEndpoint)
        ? resolvedEndpoint
        : throw new Exception("Could not determine endpoint for P2POwnerPeer");

    selfAccountInfo = AccountInfo.None;
    authenticators = Authenticator.GetAuthenticatorsForHost(Option.Some<Endpoint>(selfPrimaryEndpoint));
}
77 
/// <summary>
/// Activates the peer (no-op when already active): resets the
/// initialization step, sets up the pipe connection to the server,
/// creates the dual-stack socket and queues the lookup of the owner's
/// own account id. If that lookup leaves <c>selfAccountInfo</c> empty,
/// the peer is closed with <c>AuthenticationFailed</c>.
/// </summary>
/// <exception cref="Exception">The dual-stack socket could not be created.</exception>
public override void Start()
{
    if (isActive) { return; }

    initializationStep = ConnectionInitialization.AuthInfoAndVersion;

    ServerConnection = new PipeConnection(Option.None)
    {
        Status = NetworkConnectionStatus.Connected
    };

    remotePeers.Clear();

    var createResult = DualStackP2PSocket.Create(
        new P2PSocket.Callbacks(OnIncomingConnection, OnConnectionClosed, OnP2PData));
    if (!createResult.TryUnwrapSuccess(out var createdSocket))
    {
        throw new Exception($"Failed to create dual-stack socket: {createResult}");
    }
    socket = createdSocket;

    TaskPool.Add("P2POwnerPeer.GetAccountId", GetAccountId(), task =>
    {
        if (task.TryGetResult(out Option<AccountId> maybeAccountId))
        {
            if (maybeAccountId.TryUnwrap(out var resolvedId))
            {
                selfAccountInfo = new AccountInfo(resolvedId);
            }
        }

        if (!selfAccountInfo.IsNone) { return; }

        Close(PeerDisconnectPacket.WithReason(DisconnectReason.AuthenticationFailed));
    });

    isActive = true;
}
112 
/// <summary>
/// Socket callback for a new incoming connection. Rejects it while the
/// peer is inactive; otherwise starts tracking the endpoint (unless it
/// is already tracked) and accepts.
/// </summary>
private bool OnIncomingConnection(P2PEndpoint remoteEndpoint)
{
    if (!isActive) { return false; }

    bool alreadyTracked = remotePeers.Exists(known => known.Endpoint == remoteEndpoint);
    if (!alreadyTracked)
    {
        remotePeers.Add(new RemotePeer(remoteEndpoint));
    }

    return true;
}
124 
/// <summary>
/// Socket callback for a closed connection. Reports the disconnect of a
/// tracked peer to the server process, preferring the packet of an
/// already pending disconnect over the one supplied by the socket.
/// </summary>
private void OnConnectionClosed(P2PEndpoint remoteEndpoint, PeerDisconnectPacket disconnectPacket)
{
    RemotePeer? closedPeer = remotePeers.Find(known => known.Endpoint == remoteEndpoint);
    if (closedPeer is null) { return; }

    var packetToReport = closedPeer.PendingDisconnect
        .Select(pending => pending.Packet)
        .Fallback(disconnectPacket);

    CommunicatePeerDisconnectToServerProcess(closedPeer, packetToReport);
}
134 
/// <summary>
/// Socket callback for data received from a remote peer. Malformed
/// headers get the peer disconnected; an initial auth packet from a
/// not-yet-authenticated peer starts ticket verification. While
/// verification is pending the raw message is buffered, otherwise it is
/// wrapped in an owner-to-server header and forwarded to the server process.
/// </summary>
private void OnP2PData(P2PEndpoint senderEndpoint, IReadMessage inc)
{
    if (!isActive) { return; }

    receivedBytes += inc.LengthBytes;

    RemotePeer? sender = remotePeers.Find(known => known.Endpoint == senderEndpoint);
    if (sender is null) { return; }
    // A peer that is already being disconnected is ignored.
    if (sender.PendingDisconnect.IsSome()) { return; }

    if (!INetSerializableStruct.TryRead(inc, sender.AccountInfo, out PeerPacketHeaders headers))
    {
        CommunicateDisconnectToRemotePeer(sender, PeerDisconnectPacket.WithReason(DisconnectReason.MalformedData));
        return;
    }

    if (headers.PacketHeader.IsConnectionInitializationStep())
    {
        if (headers.Initialization is not { } initStep)
        {
            //can happen if the packet is crafted in a way to leave the Initialization value as null
            DebugConsole.ThrowErrorOnce(
                $"P2POwnerPeer.OnP2PData:{sender.Endpoint.StringRepresentation}",
                $"Failed to initialize remote peer {sender.Endpoint.StringRepresentation}: initialization step missing.");
            CommunicateDisconnectToRemotePeer(sender, PeerDisconnectPacket.WithReason(DisconnectReason.MalformedData));
            return;
        }

        bool shouldStartAuth =
            initStep == ConnectionInitialization.AuthInfoAndVersion
            && sender.AuthStatus == RemotePeer.AuthenticationStatus.NotAuthenticated;
        if (shouldStartAuth)
        {
            StartAuthTask(inc, sender);
        }
    }

    if (sender.AuthStatus != RemotePeer.AuthenticationStatus.AuthenticationPending)
    {
        var relayHeader = new P2POwnerToServerHeader
        {
            EndpointStr = sender.Endpoint.StringRepresentation,
            AccountInfo = sender.AccountInfo
        };

        IWriteMessage relayMsg = new WriteOnlyMessage();
        relayMsg.WriteNetSerializableStruct(relayHeader);
        relayMsg.WriteBytes(inc.Buffer, 0, inc.LengthBytes);

        ForwardToServerProcess(relayMsg);
        return;
    }

    // Verification still running: keep the message until the result is known.
    sender.UnauthedMessages.Add(new RemotePeer.UnauthedMessage(inc.Buffer, inc.LengthBytes));
}
189 
/// <summary>
/// Marks the peer as pending authentication and queues verification of
/// the ticket contained in <paramref name="inc"/>. On success the peer's
/// account info is stored and all messages buffered in the meantime are
/// forwarded to the server process; any failure disconnects the peer
/// with <c>AuthenticationFailed</c>.
/// </summary>
private void StartAuthTask(IReadMessage inc, RemotePeer remotePeer)
{
    remotePeer.AuthStatus = RemotePeer.AuthenticationStatus.AuthenticationPending;

    // The packet must parse, carry a ticket, and the ticket kind must have an authenticator.
    if (!INetSerializableStruct.TryRead(inc, remotePeer.AccountInfo, out ClientAuthTicketAndVersionPacket authPacket)
        || !authPacket.AuthTicket.TryUnwrap(out var ticket)
        || !authenticators.TryGetValue(ticket.Kind, out var ticketVerifier))
    {
        rejectPeer();
        return;
    }

    TaskPool.Add("P2POwnerPeer.VerifyRemotePeerAccountId",
        ticketVerifier.VerifyTicket(ticket),
        verification =>
        {
            if (!verification.TryGetResult(out AccountInfo verifiedInfo)
                || verifiedInfo.IsNone)
            {
                rejectPeer();
                return;
            }

            remotePeer.AccountInfo = verifiedInfo;
            remotePeer.AuthStatus = RemotePeer.AuthenticationStatus.SuccessfullyAuthenticated;
            flushBufferedMessages(verifiedInfo);
        });

    void flushBufferedMessages(AccountInfo verifiedInfo)
    {
        var buffered = remotePeer.UnauthedMessages;
        for (int i = 0; i < buffered.Count; i++)
        {
            IWriteMessage relayMsg = new WriteOnlyMessage();
            relayMsg.WriteNetSerializableStruct(new P2POwnerToServerHeader
            {
                EndpointStr = remotePeer.Endpoint.StringRepresentation,
                AccountInfo = verifiedInfo
            });
            relayMsg.WriteBytes(buffered[i].Bytes, 0, buffered[i].LengthBytes);
            ForwardToServerProcess(relayMsg);
        }
        buffered.Clear();
    }

    void rejectPeer()
    {
        CommunicateDisconnectToRemotePeer(remotePeer, PeerDisconnectPacket.WithReason(DisconnectReason.AuthenticationFailed));
    }
}
241 
/// <summary>
/// Per-update pump of the owner peer. Detects a stopped server process,
/// finalizes pending disconnects whose deadline has passed, lets the
/// socket process incoming messages, passes the byte counters to
/// NetStats and handles every buffer read from the server process.
/// Does nothing while the peer is inactive.
/// </summary>
242  public override void Update(float deltaTime)
243  {
244  if (!isActive) { return; }
245 
// The pattern is true both when Process is null and when it has exited.
246  if (ChildServerRelay.HasShutDown || ChildServerRelay.Process is not { HasExited: false })
247  {
248  Close(PeerDisconnectPacket.WithReason(DisconnectReason.ServerCrashed));
249  var msgBox = new GUIMessageBox(TextManager.Get("ConnectionLost"), ChildServerRelay.CrashMessage);
250  msgBox.Buttons[0].OnClicked += (btn, obj) =>
251  {
// NOTE(review): the embedded line numbers jump from 251 to 253 here — confirm no statement was lost from this handler.
253  return false;
254  };
255  return;
256  }
257 
// Nothing below runs until the owner's own account info is known (it is assigned by the task queued in Start()).
258  if (selfAccountInfo.IsNone) { return; }
259 
// Iterate backwards: CommunicatePeerDisconnectToServerProcess removes the peer from remotePeers.
260  for (int i = remotePeers.Count - 1; i >= 0; i--)
261  {
262  if (remotePeers[i].PendingDisconnect.TryUnwrap(out var pendingDisconnect) && pendingDisconnect.TimeToGiveUp < Timing.TotalTime)
263  {
264  CommunicatePeerDisconnectToServerProcess(remotePeers[i], pendingDisconnect.Packet);
265  }
266  }
267 
268  socket?.ProcessIncomingMessages();
269 
// The counters are running totals that are never reset in this class.
270  GameMain.Client?.NetStats?.AddValue(NetStats.NetStatType.ReceivedBytes, receivedBytes);
271  GameMain.Client?.NetStats?.AddValue(NetStats.NetStatType.SentBytes, sentBytes);
272 
// Each buffer read from the server process is wrapped as an uncompressed message and dispatched.
273  foreach (var incBuf in ChildServerRelay.Read())
274  {
275  ChildServerRelay.DisposeLocalHandles();
276  IReadMessage inc = new ReadOnlyMessage(incBuf, false, 0, incBuf.Length, ServerConnection);
277  HandleServerMessage(inc);
278  }
279  }
280 
/// <summary>
/// Dispatches a message coming from the server process: messages
/// addressed to the owner's own endpoint are handled locally, all others
/// are relayed to the addressed remote peer. Messages without a
/// recipient endpoint are dropped.
/// </summary>
private void HandleServerMessage(IReadMessage inc)
{
    if (!isActive) { return; }

    P2PServerToOwnerHeader serverHeader = INetSerializableStruct.Read<P2PServerToOwnerHeader>(inc);
    if (!serverHeader.Endpoint.TryUnwrap(out var targetEndpoint)) { return; }

    PeerPacketHeaders headers = INetSerializableStruct.Read<PeerPacketHeaders>(inc);

    if (targetEndpoint != selfPrimaryEndpoint)
    {
        HandleMessageForRemotePeer(headers, targetEndpoint, inc);
        return;
    }

    HandleMessageForOwner(headers, inc);
}
298 
/// <summary>
/// Returns a copy of the bytes of <paramref name="msg"/> between its
/// current byte position and its length.
/// </summary>
private static byte[] GetRemainingBytes(IReadMessage msg)
{
    byte[] wholeBuffer = msg.Buffer;
    int firstUnread = msg.BytePosition;
    int end = msg.LengthBytes;
    return wholeBuffer[firstUnread..end];
}
303 
/// <summary>
/// Relays a server message to the remote peer at
/// <paramref name="recipientEndpoint"/>. Disconnect messages are turned
/// into a disconnect of that peer; initialization steps are wrapped in a
/// <c>P2PInitializationRelayPacket</c>; everything else is passed on as
/// raw bytes after the packet headers.
/// </summary>
private void HandleMessageForRemotePeer(PeerPacketHeaders peerPacketHeaders, P2PEndpoint recipientEndpoint, IReadMessage inc)
{
    var (delivery, flags, initStep) = peerPacketHeaders;

    if (!flags.IsServerMessage())
    {
        DebugConsole.ThrowError("Received non-server message meant for remote peer");
        return;
    }

    RemotePeer? recipient = remotePeers.Find(known => known.Endpoint == recipientEndpoint);
    if (recipient is null) { return; }

    if (flags.IsDisconnectMessage())
    {
        CommunicateDisconnectToRemotePeer(recipient, INetSerializableStruct.Read<PeerDisconnectPacket>(inc));
        return;
    }

    IWriteMessage relayMsg = new WriteOnlyMessage();
    relayMsg.WriteNetSerializableStruct(new PeerPacketHeaders
    {
        DeliveryMethod = delivery,
        PacketHeader = flags,
        Initialization = initStep
    });

    // Whatever follows the headers in the incoming message is the payload to relay.
    byte[] payload = GetRemainingBytes(inc);
    if (flags.IsConnectionInitializationStep())
    {
        relayMsg.WriteNetSerializableStruct(new P2PInitializationRelayPacket
        {
            LobbyID = 0,
            Message = new PeerPacketMessage
            {
                Buffer = payload
            }
        });
    }
    else
    {
        relayMsg.WriteBytes(payload, 0, payload.Length);
    }

    ForwardToRemotePeer(delivery, recipientEndpoint, relayMsg);
}
354 
/// <summary>
/// Handles a server message addressed to the owner itself. Disconnect
/// and non-server messages are reported as errors, heartbeats are
/// ignored, an initialization step is answered with the owner's name and
/// account id, and any other message completes initialization and is
/// handed to the message callback.
/// </summary>
/// <exception cref="InvalidOperationException">
/// An initialization step arrived while the owner's account info is still unknown.
/// </exception>
private void HandleMessageForOwner(PeerPacketHeaders peerPacketHeaders, IReadMessage inc)
{
    PacketHeader flags = peerPacketHeaders.PacketHeader;

    if (flags.IsDisconnectMessage())
    {
        DebugConsole.ThrowError("Received disconnect message from owned server");
        return;
    }

    if (!flags.IsServerMessage())
    {
        DebugConsole.ThrowError("Received non-server message from owned server");
        return;
    }

    //no timeout since we're using pipes, ignore this message
    if (flags.IsHeartbeatMessage()) { return; }

    if (!flags.IsConnectionInitializationStep())
    {
        OnInitializationComplete();

        PeerPacketMessage body = INetSerializableStruct.Read<PeerPacketMessage>(inc);
        IReadMessage unwrapped = new ReadOnlyMessage(body.Buffer, flags.IsCompressed(), 0, body.Length, ServerConnection);
        callbacks.OnMessageReceived.Invoke(unwrapped);
        return;
    }

    if (selfAccountInfo.IsNone)
    {
        throw new InvalidOperationException($"Cannot initialize {nameof(P2POwnerPeer)} because {nameof(selfAccountInfo)} is not defined");
    }

    IWriteMessage reply = new WriteOnlyMessage();
    reply.WriteNetSerializableStruct(new P2POwnerToServerHeader
    {
        EndpointStr = selfPrimaryEndpoint.StringRepresentation,
        AccountInfo = selfAccountInfo
    });
    reply.WriteNetSerializableStruct(new PeerPacketHeaders
    {
        DeliveryMethod = DeliveryMethod.Reliable,
        PacketHeader = PacketHeader.IsConnectionInitializationStep,
        Initialization = ConnectionInitialization.AuthInfoAndVersion
    });
    reply.WriteNetSerializableStruct(new P2PInitializationOwnerPacket(
        Name: GameMain.Client.Name,
        AccountId: selfAccountInfo.AccountId.Fallback(default(AccountId)!)));
    ForwardToServerProcess(reply);
}
405 
/// <summary>
/// Sends a disconnect message to <paramref name="peer"/>. The first call
/// for a peer also records a pending disconnect that expires three
/// seconds from now; later calls keep the existing record.
/// </summary>
private void CommunicateDisconnectToRemotePeer(RemotePeer peer, PeerDisconnectPacket peerDisconnectPacket)
{
    if (peer.PendingDisconnect.IsNone())
    {
        var pending = new RemotePeer.DisconnectInfo(
            Timing.TotalTime + 3f,
            peerDisconnectPacket);
        peer.PendingDisconnect = Option.Some(pending);
    }

    var disconnectHeaders = new PeerPacketHeaders
    {
        DeliveryMethod = DeliveryMethod.Reliable,
        PacketHeader = PacketHeader.IsServerMessage | PacketHeader.IsDisconnectMessage
    };

    IWriteMessage notice = new WriteOnlyMessage();
    notice.WriteNetSerializableStruct(disconnectHeaders);
    notice.WriteNetSerializableStruct(peerDisconnectPacket);

    ForwardToRemotePeer(DeliveryMethod.Reliable, peer.Endpoint, notice);
}
426 
/// <summary>
/// Stops tracking <paramref name="peer"/>, ends its authentication
/// sessions (when it has an account id), tells the server process that
/// it disconnected and closes the socket connection to it. Does nothing
/// when the peer is not tracked (any more).
/// </summary>
private void CommunicatePeerDisconnectToServerProcess(RemotePeer peer, PeerDisconnectPacket peerDisconnectPacket)
{
    bool wasTracked = remotePeers.Remove(peer);
    if (!wasTracked) { return; }

    IWriteMessage notice = new WriteOnlyMessage();
    notice.WriteNetSerializableStruct(new P2POwnerToServerHeader
    {
        EndpointStr = peer.Endpoint.StringRepresentation,
        AccountInfo = peer.AccountInfo
    });
    notice.WriteNetSerializableStruct(new PeerPacketHeaders
    {
        DeliveryMethod = DeliveryMethod.Reliable,
        PacketHeader = PacketHeader.IsDisconnectMessage
    });
    notice.WriteNetSerializableStruct(peerDisconnectPacket);

    if (peer.AccountInfo.AccountId.TryUnwrap(out var peerAccountId))
    {
        foreach (var authenticator in authenticators.Values)
        {
            authenticator.EndAuthSession(peerAccountId);
        }
    }

    ForwardToServerProcess(notice);

    socket?.CloseConnection(peer.Endpoint);
}
452 
/// <summary>
/// Intentionally empty: the owner doesn't send passwords.
/// </summary>
public override void SendPassword(string password) { }
457 
/// <summary>
/// Shuts the peer down (no-op when inactive): notifies every remote
/// peer, waits briefly, reports each peer's disconnect to the server
/// process, disposes the socket and finally invokes the disconnect callback.
/// </summary>
public override void Close(PeerDisconnectPacket peerDisconnectPacket)
{
    if (!isActive) { return; }

    isActive = false;

    int index = remotePeers.Count - 1;
    while (index >= 0)
    {
        CommunicateDisconnectToRemotePeer(remotePeers[index], peerDisconnectPacket);
        index--;
    }

    // Short blocking pause between notifying the peers and tearing the connections down.
    Thread.Sleep(100);

    // Walk backwards: reporting a disconnect removes the peer from the list.
    index = remotePeers.Count - 1;
    while (index >= 0)
    {
        CommunicatePeerDisconnectToServerProcess(remotePeers[index], peerDisconnectPacket);
        index--;
    }

    socket?.Dispose();
    socket = null;

    callbacks.OnDisconnect.Invoke(peerDisconnectPacket);
}
481 
/// <summary>
/// Sends a message from the owner to the server process, prefixed with
/// the owner's endpoint/account header and the packet headers. Does
/// nothing while the peer is inactive.
/// </summary>
public override void Send(IWriteMessage msg, DeliveryMethod deliveryMethod, bool compressPastThreshold = true)
{
    if (!isActive) { return; }

    IWriteMessage envelope = new WriteOnlyMessage();
    byte[] payload = msg.PrepareForSending(compressPastThreshold, out bool wasCompressed, out _);

    PacketHeader flags = PacketHeader.None;
    if (wasCompressed)
    {
        flags = PacketHeader.IsCompressed;
    }

    envelope.WriteNetSerializableStruct(new P2POwnerToServerHeader
    {
        EndpointStr = selfPrimaryEndpoint.StringRepresentation,
        AccountInfo = selfAccountInfo
    });
    envelope.WriteNetSerializableStruct(new PeerPacketHeaders
    {
        DeliveryMethod = deliveryMethod,
        PacketHeader = flags
    });
    envelope.WriteNetSerializableStruct(new PeerPacketMessage
    {
        Buffer = payload
    });

    ForwardToServerProcess(envelope);
}
504 
/// <summary>
/// Not currently used by P2POwnerPeer; always throws.
/// </summary>
/// <exception cref="NotImplementedException">Always.</exception>
protected override void SendMsgInternal(PeerPacketHeaders headers, INetSerializableStruct? body)
    => throw new NotImplementedException();
510 
/// <summary>
/// Writes the bytes written so far to <paramref name="msg"/> (its first
/// <c>LengthBytes</c> bytes) to the server process via <c>ChildServerRelay</c>.
/// </summary>
private static void ForwardToServerProcess(IWriteMessage msg)
{
    // Copy the written part of the buffer exactly once. Slicing the array
    // with a range would first allocate a temporary array that then had to
    // be copied again into the array that is actually sent.
    byte[] bufToSend = msg.Buffer.AsSpan(0, msg.LengthBytes).ToArray();
    ChildServerRelay.Write(bufToSend);
}
517 
/// <summary>
/// Sends <paramref name="outMsg"/> to <paramref name="recipient"/> through
/// the socket. Messages close to the MTU are forced to reliable delivery,
/// and an unreliable send that fails is retried reliably. Does nothing
/// when there is no socket.
/// </summary>
/// <remarks>
/// <c>sentBytes</c> is only increased when a send actually succeeded, and
/// only once per message, so failed sends and the reliable retry no longer
/// inflate the counter.
/// </remarks>
private void ForwardToRemotePeer(DeliveryMethod deliveryMethod, P2PEndpoint recipient, IWriteMessage outMsg)
{
    if (socket is null) { return; }

    int length = outMsg.LengthBytes;

    if (length + 4 >= MsgConstants.MTU)
    {
        DebugConsole.Log($"WARNING: message length comes close to exceeding MTU, forcing reliable send ({length} bytes)");
        deliveryMethod = DeliveryMethod.Reliable;
    }

    var success = socket.SendMessage(recipient, outMsg, deliveryMethod);

    if (!success && deliveryMethod is DeliveryMethod.Unreliable)
    {
        DebugConsole.Log($"WARNING: message couldn't be sent unreliably, forcing reliable send ({length} bytes)");
        success = socket.SendMessage(recipient, outMsg, DeliveryMethod.Reliable);
    }

    if (success)
    {
        sentBytes += length;
    }
    else
    {
        DebugConsole.AddWarning($"Failed to send message to remote peer! ({length} bytes)");
    }
}
548 
/// <summary>
/// Resolves the owner's own account id: the Steam id when Steam is
/// initialized, otherwise the first external account id of the first
/// logged-in EOS product user. Yields none when no id can be determined.
/// </summary>
protected override async Task<Option<AccountId>> GetAccountId()
{
    if (SteamManager.IsInitialized)
    {
        return SteamManager.GetSteamId().Select(id => (AccountId)id);
    }

    var loggedInPuids = EosInterface.IdQueries.GetLoggedInPuids();
    if (loggedInPuids is not { Length: > 0 })
    {
        return Option.None;
    }

    var lookupResult = await EosInterface.IdQueries.GetSelfExternalAccountIds(loggedInPuids[0]);
    if (lookupResult.TryUnwrapSuccess(out var externalIds)
        && externalIds is { Length: > 0 })
    {
        return Option.Some(externalIds[0]);
    }

    return Option.None;
}
565 
566 #if DEBUG
/// <summary>
/// Debug-only hook; currently does nothing.
/// </summary>
public override void ForceTimeOut()
{
    //TODO: reimplement?
}
571 
/// <summary>
/// Debug-only: forwards <paramref name="msg"/> to the server process
/// as-is, without adding any headers.
/// </summary>
public override void DebugSendRawMessage(IWriteMessage msg)
{
    ForwardToServerProcess(msg);
}
574 #endif
575  }
576 }
static ImmutableDictionary< AuthenticationTicketKind, Authenticator > GetAuthenticatorsForHost(Option< Endpoint > ownerEndpointOption)
static MainMenuScreen MainMenuScreen
Definition: GameMain.cs:53
static GameClient Client
Definition: GameMain.cs:188
readonly NetStats NetStats
Definition: GameClient.cs:61
void AddValue(NetStatType statType, float value)
Definition: NetStats.cs:37
override void SendPassword(string password)
override void SendMsgInternal(PeerPacketHeaders headers, INetSerializableStruct? body)
override void Send(IWriteMessage msg, DeliveryMethod deliveryMethod, bool compressPastThreshold=true)
override void Update(float deltaTime)
P2POwnerPeer(Callbacks callbacks, int ownerKey, ImmutableArray< P2PEndpoint > allEndpoints)
Definition: P2POwnerPeer.cs:59
override async Task< Option< AccountId > > GetAccountId()
override void Close(PeerDisconnectPacket peerDisconnectPacket)
static Result< P2PSocket, Error > Create(Callbacks callbacks)
readonly record struct Callbacks(Predicate< P2PEndpoint > OnIncomingConnection, Action< P2PEndpoint, PeerDisconnectPacket > OnConnectionClosed, Action< P2PEndpoint, IReadMessage > OnData)
byte[] PrepareForSending(bool compressPastThreshold, out bool isCompressed, out int outLength)
readonly Option< AccountId > AccountId
The primary ID for a given user
Definition: AccountInfo.cs:15
static readonly AccountInfo None
Definition: AccountInfo.cs:10