Barotrauma Client Doc
SteamP2POwnerPeer.cs
1 #nullable enable
2 using Barotrauma.Steam;
3 using System;
4 using System.Collections.Generic;
5 using System.Threading;
7 
8 namespace Barotrauma.Networking
9 {
10  sealed class SteamP2POwnerPeer : ClientPeer
11  {
/// <summary>Steam ID of the local user; assigned once in the constructor.</summary>
private readonly SteamId selfSteamID;

/// <summary>
/// The owner key reinterpreted as an unsigned 64-bit value, falling back to 0.
/// Used as the XOR mask applied to Steam IDs by <see cref="ReadSteamId"/> and <see cref="WriteSteamId"/>.
/// </summary>
private UInt64 ownerKey64
{
    get { return unchecked((UInt64)ownerKey.Fallback(0)); }
}

/// <summary>Reads a 64-bit value from <paramref name="inc"/> and XORs it with <see cref="ownerKey64"/> to obtain a Steam ID.</summary>
private SteamId ReadSteamId(IReadMessage inc)
{
    UInt64 maskedValue = inc.ReadUInt64();
    return new SteamId(maskedValue ^ ownerKey64);
}

/// <summary>Writes <paramref name="val"/> to <paramref name="msg"/> XORed with <see cref="ownerKey64"/>.</summary>
private void WriteSteamId(IWriteMessage msg, SteamId val)
{
    UInt64 maskedValue = val.Value ^ ownerKey64;
    msg.WriteUInt64(maskedValue);
}

/// <summary>Running byte totals, reported to the client's NetStats in Update.</summary>
private long sentBytes, receivedBytes;
19 
/// <summary>
/// State tracked for one remote Steam user that has requested a P2P session with the owner.
/// </summary>
private sealed class RemotePeer
{
    /// <summary>
    /// A raw message received from a peer while its authentication was still pending.
    /// </summary>
    public readonly struct UnauthedMessage
    {
        public readonly SteamId Sender;
        public readonly byte[] Bytes;
        /// <summary>Number of bytes to forward; taken from the length of <see cref="Bytes"/>.</summary>
        public readonly int Length;

        public UnauthedMessage(SteamId sender, byte[] bytes)
        {
            (Sender, Bytes, Length) = (sender, bytes, bytes.Length);
        }
    }

    public readonly SteamId SteamId;

    /// <summary>Owner ID reported by the auth callback; None until authentication succeeds.</summary>
    public Option<SteamId> OwnerSteamId = Option<SteamId>.None();

    /// <summary>Time after which the session is closed; null while no disconnect is scheduled.</summary>
    public double? DisconnectTime;

    public bool Authenticating;
    public bool Authenticated;

    /// <summary>Messages held back until the peer's authentication completes.</summary>
    public readonly List<UnauthedMessage> UnauthedMessages = new List<UnauthedMessage>();

    public RemotePeer(SteamId steamId)
    {
        // All other fields start from their declared initial values.
        SteamId = steamId;
    }
}
55 
/// <summary>Known remote peers; the list is created in <see cref="Start"/>.</summary>
private List<RemotePeer> remotePeers = null!;

/// <summary>
/// Creates an inactive owner peer bound to a pipe endpoint.
/// </summary>
/// <param name="callbacks">Callbacks passed on to the base peer.</param>
/// <param name="ownerKey">Owner key passed on to the base peer.</param>
/// <exception cref="InvalidOperationException">The local Steam ID cannot be obtained.</exception>
public SteamP2POwnerPeer(Callbacks callbacks, int ownerKey)
    : base(new PipeEndpoint(), callbacks, Option<int>.Some(ownerKey))
{
    ServerConnection = null;
    isActive = false;

    if (!SteamManager.GetSteamId().TryUnwrap(out var localSteamId))
    {
        throw new InvalidOperationException("Steamworks not initialized");
    }

    selfSteamID = localSteamId;
}
68 
69 
/// <summary>
/// Activates the peer: creates the pipe connection to the server, resets the peer list
/// and hooks up the Steam networking and auth callbacks. Does nothing if already active.
/// </summary>
public override void Start()
{
    if (isActive)
    {
        return;
    }

    initializationStep = ConnectionInitialization.SteamTicketAndVersion;

    var pipeConnection = new PipeConnection(selfSteamID);
    pipeConnection.Status = NetworkConnectionStatus.Connected;
    ServerConnection = pipeConnection;

    remotePeers = new List<RemotePeer>();

    // Drop any previously registered networking handlers before installing ours.
    Steamworks.SteamNetworking.ResetActions();
    Steamworks.SteamNetworking.OnP2PSessionRequest = OnIncomingConnection;
    Steamworks.SteamUser.OnValidateAuthTicketResponse += OnAuthChange;

    Steamworks.SteamNetworking.AllowP2PPacketRelay(true);

    isActive = true;
}
91 
/// <summary>
/// Steam auth-ticket validation callback. On success the peer is marked authenticated and every
/// message buffered during authentication is forwarded to the server process; on any other
/// status the peer is disconnected.
/// </summary>
private void OnAuthChange(Steamworks.SteamId steamId, Steamworks.SteamId ownerId, Steamworks.AuthResponse status)
{
    RemotePeer? peer = remotePeers.Find(p => p.SteamId.Value == steamId);
    if (peer is null)
    {
        return;
    }

    if (status != Steamworks.AuthResponse.OK)
    {
        DisconnectPeer(peer, PeerDisconnectPacket.SteamAuthError(status));
        return;
    }

    // Repeated OK responses for an already authenticated peer are ignored.
    if (peer.Authenticated)
    {
        return;
    }

    SteamId verifiedOwner = new SteamId(ownerId);
    peer.OwnerSteamId = Option<SteamId>.Some(verifiedOwner);
    peer.Authenticated = true;
    peer.Authenticating = false;

    // Flush everything that arrived while authentication was pending.
    foreach (var pending in peer.UnauthedMessages)
    {
        IWriteMessage forwarded = new WriteOnlyMessage();
        WriteSteamId(forwarded, pending.Sender);
        WriteSteamId(forwarded, verifiedOwner);
        forwarded.WriteBytes(pending.Bytes, 0, pending.Length);
        ForwardToServerProcess(forwarded);
    }

    peer.UnauthedMessages.Clear();
}
122 
/// <summary>
/// P2P session request callback: registers the requesting user as a remote peer if it is not
/// already known, then accepts the session.
/// </summary>
private void OnIncomingConnection(Steamworks.SteamId steamId)
{
    if (!isActive)
    {
        return;
    }

    bool isNewPeer = remotePeers.Find(p => p.SteamId.Value == steamId) is null;
    if (isNewPeer)
    {
        remotePeers.Add(new RemotePeer(new SteamId(steamId)));
    }

    //accept all connections, the server will figure things out later
    Steamworks.SteamNetworking.AcceptP2PSessionWithUser(steamId);
}
134 
/// <summary>
/// Entry point for a packet received from a remote Steam user. Packets from unknown peers or
/// peers with a scheduled disconnect are dropped; processing errors are reported instead of thrown.
/// </summary>
private void OnP2PData(ulong steamId, IReadMessage inc)
{
    if (!isActive)
    {
        return;
    }

    RemotePeer? sender = remotePeers.Find(p => p.SteamId.Value == steamId);
    if (sender == null || sender.DisconnectTime != null)
    {
        return;
    }

    try
    {
        ProcessP2PData(steamId, sender, inc);
    }
    catch (Exception e)
    {
        string errorMsg = $"Server failed to read an incoming P2P message. {{{e}}}\n{e.StackTrace.CleanupStackTrace()}";
        string eventId = $"SteamP2POwnerPeer.OnP2PData:OwnerReadException{e.TargetSite}";
        GameAnalyticsManager.AddErrorEventOnce(eventId, GameAnalyticsManager.ErrorSeverity.Error, errorMsg);
#if DEBUG
        DebugConsole.ThrowError(errorMsg);
#else
        // Release builds only surface the error in the console when verbose logging is on.
        if (GameSettings.CurrentConfig.VerboseLogging) { DebugConsole.ThrowError(errorMsg); }
#endif
    }
}
159 
/// <summary>
/// Handles one packet from a remote peer. A SteamTicketAndVersion initialization packet from a
/// peer that is neither authenticated nor authenticating starts a Steam auth session. While the
/// auth session is pending the raw message is buffered on the peer; otherwise it is forwarded to
/// the server process prefixed with the sender's and owner's Steam IDs.
/// </summary>
/// <param name="steamId">Steam ID of the sender.</param>
/// <param name="remotePeer">Peer entry matching <paramref name="steamId"/>.</param>
/// <param name="inc">The received message, positioned at its start.</param>
/// <exception cref="Exception">An initialization packet carries no initialization step.</exception>
private void ProcessP2PData(ulong steamId, RemotePeer remotePeer, IReadMessage inc)
{
    var (_, packetHeader, connectionInitialization) = INetSerializableStruct.Read<PeerPacketHeaders>(inc);

    if (remotePeer is { Authenticated: false, Authenticating: false } && packetHeader.IsConnectionInitializationStep())
    {
        remotePeer.DisconnectTime = null;

        ConnectionInitialization initialization = connectionInitialization ?? throw new Exception("Initialization step missing");
        if (initialization == ConnectionInitialization.SteamTicketAndVersion)
        {
            var packet = INetSerializableStruct.Read<ClientSteamTicketAndVersionPacket>(inc);

            // A peer that sends no ticket cannot be authenticated: reject it instead of
            // handing an absent ticket to Steam.
            if (!packet.SteamAuthTicket.TryUnwrap(out var ticket))
            {
                DisconnectPeer(remotePeer, PeerDisconnectPacket.SteamAuthError(Steamworks.BeginAuthResult.InvalidTicket));
                return;
            }

            // Only flag the peer once the packet has parsed successfully. If parsing throws,
            // the peer must not be left "authenticating" with no auth session, because every
            // later message would then be buffered indefinitely.
            remotePeer.Authenticating = true;

            Steamworks.BeginAuthResult authSessionStartState = SteamManager.StartAuthSession(ticket, steamId);
            if (authSessionStartState != Steamworks.BeginAuthResult.OK)
            {
                DisconnectPeer(remotePeer, PeerDisconnectPacket.SteamAuthError(authSessionStartState));
                return;
            }
        }
    }

    var steamUserId = new SteamId(steamId);
    if (remotePeer.Authenticating)
    {
        // Held back until the auth callback reports the result.
        remotePeer.UnauthedMessages.Add(new RemotePeer.UnauthedMessage(steamUserId, inc.Buffer));
    }
    else
    {
        IWriteMessage outMsg = new WriteOnlyMessage();
        WriteSteamId(outMsg, steamUserId);
        // Without a known owner ID the sender's own ID is written in its place.
        WriteSteamId(outMsg, remotePeer.OwnerSteamId.Fallback(steamUserId));
        outMsg.WriteBytes(inc.Buffer, 0, inc.LengthBytes);

        ForwardToServerProcess(outMsg);
    }
}
202 
/// <summary>
/// Per-frame pump: detects a dead server process, closes sessions whose disconnect time has
/// passed, reads pending Steam P2P packets, reports byte counters and relays messages coming
/// from the server process.
/// </summary>
public override void Update(float deltaTime)
{
    if (!isActive)
    {
        return;
    }

    bool serverProcessGone = ChildServerRelay.HasShutDown || !ChildServerRelay.IsProcessAlive;
    if (serverProcessGone)
    {
        // Grab the client reference before closing so the crash message can still be shown.
        var gameClient = GameMain.Client;
        Close(PeerDisconnectPacket.WithReason(DisconnectReason.ServerCrashed));
        gameClient?.CreateServerCrashMessage();
        return;
    }

    // Iterate backwards because ClosePeerSession removes the peer from the list.
    for (int peerIndex = remotePeers.Count - 1; peerIndex >= 0; peerIndex--)
    {
        RemotePeer peer = remotePeers[peerIndex];
        if (peer.DisconnectTime is { } disconnectTime && disconnectTime < Timing.TotalTime)
        {
            ClosePeerSession(peer);
        }
    }

    // Read at most a fixed number of packets per update.
    const int maxPacketsPerUpdate = 100;
    int packetsRead = 0;
    while (packetsRead < maxPacketsPerUpdate && Steamworks.SteamNetworking.IsP2PPacketAvailable())
    {
        packetsRead++;

        var packet = Steamworks.SteamNetworking.ReadP2PPacket();
        if (packet is { SteamId: var steamId, Data: var data })
        {
            OnP2PData(steamId, new ReadWriteMessage(data, 0, data.Length * 8, false));
            receivedBytes += data.Length;
        }
    }

    var netStats = GameMain.Client?.NetStats;
    netStats?.AddValue(NetStats.NetStatType.ReceivedBytes, receivedBytes);
    netStats?.AddValue(NetStats.NetStatType.SentBytes, sentBytes);

    while (ChildServerRelay.Read(out byte[] incBuf))
    {
        ChildServerRelay.DisposeLocalHandles();
        IReadMessage serverMessage = new ReadOnlyMessage(incBuf, false, 0, incBuf.Length, ServerConnection);
        HandleDataMessage(serverMessage);
    }
}
245 
/// <summary>
/// Routes a message received from the server process: reads the recipient Steam ID and the
/// packet headers, then dispatches to the remote-peer or owner handler.
/// </summary>
private void HandleDataMessage(IReadMessage inc)
{
    if (!isActive)
    {
        return;
    }

    SteamId recipient = ReadSteamId(inc);
    var headers = INetSerializableStruct.Read<PeerPacketHeaders>(inc);

    if (recipient != selfSteamID)
    {
        HandleMessageForRemotePeer(headers, recipient, inc);
        return;
    }

    HandleMessageForOwner(headers, inc);
}
263 
/// <summary>
/// Returns a copy of the bytes of <paramref name="msg"/> from its current byte position up to its length in bytes.
/// </summary>
private static byte[] GetRemainingBytes(IReadMessage msg)
    => msg.Buffer[msg.BytePosition..msg.LengthBytes];
268 
/// <summary>
/// Relays a server message to a remote peer. Disconnect messages are turned into a local
/// <see cref="DisconnectPeer"/> call; initialization steps are wrapped in a relay packet that
/// carries the current lobby ID; everything else is passed through unchanged after the headers.
/// </summary>
private void HandleMessageForRemotePeer(PeerPacketHeaders peerPacketHeaders, SteamId recipientSteamId, IReadMessage inc)
{
    var (deliveryMethod, packetHeader, initialization) = peerPacketHeaders;

    if (!packetHeader.IsServerMessage())
    {
        DebugConsole.ThrowError("Received non-server message meant for remote peer");
        return;
    }

    RemotePeer? recipient = remotePeers.Find(p => p.SteamId == recipientSteamId);
    if (recipient == null)
    {
        return;
    }

    if (packetHeader.IsDisconnectMessage())
    {
        DisconnectPeer(recipient, INetSerializableStruct.Read<PeerDisconnectPacket>(inc));
        return;
    }

    var relayedHeaders = new PeerPacketHeaders
    {
        DeliveryMethod = deliveryMethod,
        PacketHeader = packetHeader,
        Initialization = initialization
    };

    IWriteMessage relayMsg = new WriteOnlyMessage();
    relayMsg.WriteNetSerializableStruct(relayedHeaders);

    // Whatever follows the headers is the payload, in both branches below.
    byte[] payload = GetRemainingBytes(inc);
    if (packetHeader.IsConnectionInitializationStep())
    {
        relayMsg.WriteNetSerializableStruct(new SteamP2PInitializationRelayPacket
        {
            LobbyID = SteamManager.CurrentLobbyID,
            Message = new PeerPacketMessage { Buffer = payload }
        });
    }
    else
    {
        relayMsg.WriteBytes(payload, 0, payload.Length);
    }

    ForwardToRemotePeer(deliveryMethod, recipientSteamId, relayMsg);
}
319 
/// <summary>
/// Handles a server message addressed to the owner. Initialization steps are answered with an
/// owner initialization packet; regular messages complete initialization on first arrival and
/// are handed to the message-received callback. Disconnect, non-server and heartbeat messages
/// are not processed.
/// </summary>
private void HandleMessageForOwner(PeerPacketHeaders peerPacketHeaders, IReadMessage inc)
{
    var (_, header, _) = peerPacketHeaders;

    if (header.IsDisconnectMessage())
    {
        DebugConsole.ThrowError("Received disconnect message from owned server");
        return;
    }

    if (!header.IsServerMessage())
    {
        DebugConsole.ThrowError("Received non-server message from owned server");
        return;
    }

    //no timeout since we're using pipes, ignore this message
    if (header.IsHeartbeatMessage())
    {
        return;
    }

    if (header.IsConnectionInitializationStep())
    {
        IWriteMessage reply = new WriteOnlyMessage();
        // The owner is both the sender and the owner of its own account.
        WriteSteamId(reply, selfSteamID);
        WriteSteamId(reply, selfSteamID);
        reply.WriteNetSerializableStruct(new PeerPacketHeaders
        {
            DeliveryMethod = DeliveryMethod.Reliable,
            PacketHeader = PacketHeader.IsConnectionInitializationStep,
            Initialization = ConnectionInitialization.SteamTicketAndVersion
        });
        reply.WriteNetSerializableStruct(new SteamP2PInitializationOwnerPacket
        {
            OwnerName = GameMain.Client.Name
        });
        ForwardToServerProcess(reply);
        return;
    }

    // The first regular message marks the end of initialization.
    if (initializationStep != ConnectionInitialization.Success)
    {
        callbacks.OnInitializationComplete.Invoke();
        initializationStep = ConnectionInitialization.Success;
    }

    PeerPacketMessage body = INetSerializableStruct.Read<PeerPacketMessage>(inc);
    IReadMessage received = new ReadOnlyMessage(body.Buffer, header.IsCompressed(), 0, body.Length, ServerConnection);
    callbacks.OnMessageReceived.Invoke(received);
}
371 
/// <summary>
/// Schedules the peer's session to be closed one second from now (unless a disconnect is
/// already scheduled) and sends it a reliable disconnect message carrying
/// <paramref name="peerDisconnectPacket"/>.
/// </summary>
private void DisconnectPeer(RemotePeer peer, PeerDisconnectPacket peerDisconnectPacket)
{
    if (peer.DisconnectTime is null)
    {
        peer.DisconnectTime = Timing.TotalTime + 1.0;
    }

    var disconnectHeaders = new PeerPacketHeaders
    {
        DeliveryMethod = DeliveryMethod.Reliable,
        PacketHeader = PacketHeader.IsServerMessage | PacketHeader.IsDisconnectMessage
    };

    IWriteMessage notice = new WriteOnlyMessage();
    notice.WriteNetSerializableStruct(disconnectHeaders);
    notice.WriteNetSerializableStruct(peerDisconnectPacket);

    int noticeLength = notice.LengthBytes;
    Steamworks.SteamNetworking.SendP2PPacket(peer.SteamId.Value, notice.Buffer, noticeLength, 0, Steamworks.P2PSend.Reliable);
    sentBytes += noticeLength;
}
387 
/// <summary>Closes the Steam P2P session with <paramref name="peer"/> and removes it from the peer list.</summary>
private void ClosePeerSession(RemotePeer peer)
{
    ulong peerSteamId = peer.SteamId.Value;
    Steamworks.SteamNetworking.CloseP2PSessionWithUser(peerSteamId);

    remotePeers.Remove(peer);
}
393 
/// <summary>Intentionally a no-op: the owner doesn't send passwords.</summary>
public override void SendPassword(string password) { }
398 
/// <summary>
/// Shuts the peer down: tells every remote peer the server is shutting down, waits briefly,
/// closes all P2P sessions, invokes the disconnect callback and detaches from Steam.
/// Does nothing if the peer is not active.
/// </summary>
public override void Close(PeerDisconnectPacket peerDisconnectPacket)
{
    if (!isActive)
    {
        return;
    }

    isActive = false;

    for (int peerIndex = remotePeers.Count - 1; peerIndex >= 0; peerIndex--)
    {
        RemotePeer peer = remotePeers[peerIndex];
        DisconnectPeer(peer, PeerDisconnectPacket.WithReason(DisconnectReason.ServerShutdown));
    }

    // Short pause between sending the disconnect notices and closing the sessions.
    Thread.Sleep(100);

    // Backwards, since each closed session removes its peer from the list.
    for (int peerIndex = remotePeers.Count - 1; peerIndex >= 0; peerIndex--)
    {
        RemotePeer peer = remotePeers[peerIndex];
        ClosePeerSession(peer);
    }

    callbacks.OnDisconnect.Invoke(peerDisconnectPacket);

    SteamManager.LeaveLobby();
    Steamworks.SteamNetworking.ResetActions();
    Steamworks.SteamUser.OnValidateAuthTicketResponse -= OnAuthChange;
}
423 
/// <summary>
/// Sends a message from the owner to the server process. The payload is prepared (and possibly
/// compressed), then wrapped with the owner's Steam ID as both sender and owner, followed by
/// the packet headers.
/// </summary>
public override void Send(IWriteMessage msg, DeliveryMethod deliveryMethod, bool compressPastThreshold = true)
{
    if (!isActive)
    {
        return;
    }

    byte[] payload = msg.PrepareForSending(compressPastThreshold, out bool isCompressed, out _);

    PacketHeader headerFlags;
    if (isCompressed)
    {
        headerFlags = PacketHeader.IsCompressed;
    }
    else
    {
        headerFlags = PacketHeader.None;
    }

    IWriteMessage wrapped = new WriteOnlyMessage();
    WriteSteamId(wrapped, selfSteamID);
    WriteSteamId(wrapped, selfSteamID);
    wrapped.WriteNetSerializableStruct(new PeerPacketHeaders
    {
        DeliveryMethod = deliveryMethod,
        PacketHeader = headerFlags
    });
    wrapped.WriteNetSerializableStruct(new PeerPacketMessage
    {
        Buffer = payload
    });

    ForwardToServerProcess(wrapped);
}
443 
/// <summary>Not currently used by SteamP2POwnerPeer.</summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
protected override void SendMsgInternal(PeerPacketHeaders headers, INetSerializableStruct? body)
    => throw new NotImplementedException();
449 
/// <summary>
/// Writes the used portion of <paramref name="msg"/> (its first LengthBytes bytes) to the
/// server process through <see cref="ChildServerRelay"/>.
/// </summary>
private static void ForwardToServerProcess(IWriteMessage msg)
{
    // A range on an array already yields a new, exactly-sized copy, so no separate
    // destination buffer (and second copy) is needed.
    byte[] bufToSend = msg.Buffer[..msg.LengthBytes];
    ChildServerRelay.Write(bufToSend);
}
456 
/// <summary>
/// Sends <paramref name="outMsg"/> to a remote peer over Steam P2P. Messages close to the MTU
/// are forced to reliable delivery, and a failed unreliable send is retried reliably once.
/// The sent-byte counter only advances when Steam accepts the packet.
/// </summary>
/// <param name="deliveryMethod">Requested delivery method; may be upgraded to reliable.</param>
/// <param name="recipent">Steam ID of the receiving peer.</param>
/// <param name="outMsg">Message to send; it is prepared without compression.</param>
private void ForwardToRemotePeer(DeliveryMethod deliveryMethod, SteamId recipent, IWriteMessage outMsg)
{
    byte[] buf = outMsg.PrepareForSending(compressPastThreshold: false, out _, out int length);

    // NOTE(review): the 4-byte margin presumably accounts for per-packet overhead; confirm.
    if (length + 4 >= MsgConstants.MTU)
    {
        DebugConsole.Log($"WARNING: message length comes close to exceeding MTU, forcing reliable send ({length} bytes)");
        deliveryMethod = DeliveryMethod.Reliable;
    }

    bool successSend = Steamworks.SteamNetworking.SendP2PPacket(recipent.Value, buf, length, 0, deliveryMethod.ToSteam());

    if (!successSend && deliveryMethod is DeliveryMethod.Unreliable)
    {
        DebugConsole.Log($"WARNING: message couldn't be sent unreliably, forcing reliable send ({length} bytes)");
        successSend = Steamworks.SteamNetworking.SendP2PPacket(recipent.Value, buf, length, 0, DeliveryMethod.Reliable.ToSteam());
    }

    if (successSend)
    {
        // Count the payload once, and only when a send actually went through, so failed
        // attempts and retries do not inflate the statistics.
        sentBytes += length;
    }
    else
    {
        DebugConsole.AddWarning($"Failed to send message to remote peer! ({length} bytes)");
    }
}
484 
#if DEBUG
/// <summary>Debug-only hook; currently has no effect for this peer type.</summary>
public override void ForceTimeOut()
{
    //TODO: reimplement?
}
#endif
491  }
492 }
static GameClient Client
Definition: GameMain.cs:186
readonly NetStats NetStats
Definition: GameClient.cs:54
void AddValue(NetStatType statType, float value)
Definition: NetStats.cs:37
readonly UInt64 Value
Definition: SteamId.cs:8
override void Send(IWriteMessage msg, DeliveryMethod deliveryMethod, bool compressPastThreshold=true)
override void SendMsgInternal(PeerPacketHeaders headers, INetSerializableStruct? body)
SteamP2POwnerPeer(Callbacks callbacks, int ownerKey)
override void SendPassword(string password)
override void Close(PeerDisconnectPacket peerDisconnectPacket)
override void Update(float deltaTime)
byte[] PrepareForSending(bool compressPastThreshold, out bool isCompressed, out int outLength)
static UnspecifiedNone None
Definition: Option.cs:119