Client LuaCsForBarotrauma
BarotraumaShared/SharedSource/Networking/ChildServerRelay.cs
1 using System;
2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.Diagnostics;
5 using System.Text;
6 using System.Threading;
7 using System.Threading.Tasks;
8 
9 #if SERVER
10 using PipeType = System.IO.Pipes.AnonymousPipeClientStream;
11 #else
12 using PipeType = System.IO.Pipes.AnonymousPipeServerStream;
13 #endif
14 
15 namespace Barotrauma.Networking
16 {
17  static partial class ChildServerRelay
18  {
/// <summary>
/// Tag byte stored in the fifth byte of every frame header, right after the
/// 4-byte little-endian payload length.
/// </summary>
private enum WriteStatus : byte
{
    /// <summary>Regular payload; the reader enqueues it for <see cref="Read"/>.</summary>
    Success = 0x00,
    /// <summary>Keep-alive frame; the reader ignores it.</summary>
    Heartbeat = 0x01,
    /// <summary>The reader switches to <see cref="StatusEnum.ShutDown"/> when it receives this.</summary>
    RequestShutdown = 0xCC,
    /// <summary>Payload is a UTF-8 error string; the reader passes it to HandleCrashString and shuts down.</summary>
    Crash = 0xFF
}

/// <summary>Lifecycle state of the relay.</summary>
private enum StatusEnum
{
    /// <summary>Initial value; PrivateStart has not run yet.</summary>
    NeverStarted,
    /// <summary>Set by PrivateStart; both worker threads are expected to be running.</summary>
    Active,
    /// <summary>When the write thread observes this state it sends a RequestShutdown frame and moves to <see cref="ShutDown"/>.</summary>
    RequestedShutDown,
    /// <summary>Terminal state; worker loops exit and Read/Write become no-ops.</summary>
    ShutDown
}

// Written and read from several threads, hence volatile.
private static volatile StatusEnum status = StatusEnum.NeverStarted;

/// <summary>True once the relay has reached <see cref="StatusEnum.ShutDown"/>.</summary>
public static bool HasShutDown => status == StatusEnum.ShutDown;

// Pipes used by the write thread and the read thread respectively.
private static PipeType writeStream;
private static PipeType readStream;

// Worker threads created by PrivateStart.
private static Thread readThread;
private static Thread writeThread;

// Signalled to wake the write thread before its idle timeout elapses.
private static ManualResetEvent writeManualResetEvent;

// Cancels the pending asynchronous pipe read.
private static CancellationTokenSource readCancellationToken;

// Scratch buffer filled by each pipe read, plus the cursor into it:
// readIncOffset is the next unread index, readIncTotal the number of valid bytes.
private const int ReadBufferSize = 2 * MsgConstants.MTU;
private static byte[] readTempBytes;
private static int readIncOffset;
private static int readIncTotal;

// Outgoing payloads and crash strings, drained by the write thread.
private static ConcurrentQueue<byte[]> msgsToWrite;
private static ConcurrentQueue<string> errorsToWrite;

// Incoming payloads, filled by the read thread and drained by Read().
private static ConcurrentQueue<byte[]> msgsToRead;
57 
/// <summary>
/// Marks the relay as active, recreates all per-session state (buffer, queues,
/// cancellation source, wake-up event) and launches the background read and
/// write threads.
/// </summary>
private static void PrivateStart()
{
    status = StatusEnum.Active;

    // Fresh, empty read buffer and cursor.
    readTempBytes = new byte[ReadBufferSize];
    readIncTotal = 0;
    readIncOffset = 0;

    // Fresh queues so nothing from a previous session leaks into this one.
    msgsToRead = new ConcurrentQueue<byte[]>();
    msgsToWrite = new ConcurrentQueue<byte[]>();
    errorsToWrite = new ConcurrentQueue<string>();

    writeManualResetEvent = new ManualResetEvent(initialState: false);
    readCancellationToken = new CancellationTokenSource();

    var reader = new Thread(UpdateRead);
    reader.Name = "ChildServerRelay.ReadThread";
    reader.IsBackground = true;

    var writer = new Thread(UpdateWrite);
    writer.Name = "ChildServerRelay.WriteThread";
    writer.IsBackground = true;

    readThread = reader;
    writeThread = writer;

    reader.Start();
    writer.Start();
}
89 
/// <summary>
/// Stops both worker threads and releases the pipes. Does nothing if the relay
/// was never started.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when called from any thread other than <c>GameMain.MainThread</c>.
/// </exception>
private static void PrivateShutDown()
{
    bool onMainThread = Thread.CurrentThread == GameMain.MainThread;
    if (!onMainThread)
    {
        throw new InvalidOperationException(
            $"Cannot call {nameof(ChildServerRelay)}.{nameof(PrivateShutDown)} from a thread other than the main one");
    }

    if (status == StatusEnum.NeverStarted)
    {
        return;
    }

    // Flip the state first so both worker loops see it, then wake them up:
    // the write thread via its event, the read thread via cancellation.
    status = StatusEnum.ShutDown;
    writeManualResetEvent?.Set();
    readCancellationToken?.Cancel();

    // Wait for the workers before tearing down anything they might still touch.
    readThread?.Join();
    readThread = null;
    writeThread?.Join();
    writeThread = null;

    readCancellationToken?.Dispose();
    readCancellationToken = null;

    readStream?.Dispose();
    readStream = null;
    writeStream?.Dispose();
    writeStream = null;

    msgsToRead?.Clear();
    msgsToWrite?.Clear();
}
109 
110 
/// <summary>
/// Starts one asynchronous read from <c>readStream</c> into <c>readTempBytes</c> and
/// waits for it in slices of 100 ms, for at most 150 slices, re-checking the
/// relay status between slices.
/// </summary>
/// <returns>
/// <c>Some(n)</c> with the number of bytes the read produced, or <c>None</c> when
/// there is no read stream, the relay has shut down, the read was cancelled, or
/// the read failed with a disposed/broken pipe while the relay was not active.
/// </returns>
/// <exception cref="Exception">
/// Thrown when the read does not complete within the wait budget, or fails for
/// a reason that is not an expected part of shutting down.
/// </exception>
private static Option<int> ReadIncomingMsgs()
{
    const int waitSliceMilliseconds = 100;
    const int maxWaitSlices = 150;

    Task<int> readTask = readStream?.ReadAsync(readTempBytes, 0, readTempBytes.Length, readCancellationToken.Token);
    if (readTask is null) { return Option<int>.None(); }

    // A disposed or broken pipe is an expected failure once the relay is no longer active.
    bool isExpectedShutdownFailure(Exception exception)
    {
        return status is not StatusEnum.Active
            && exception is (ObjectDisposedException or System.IO.IOException);
    }

    for (int slice = 0; slice < maxWaitSlices; slice++)
    {
        if (status is StatusEnum.ShutDown)
        {
            readCancellationToken?.Cancel();
            return Option<int>.None();
        }

        try
        {
            if (readTask.IsCompleted || readTask.Wait(waitSliceMilliseconds, readCancellationToken.Token))
            {
                break;
            }
        }
        catch (AggregateException aggregateException)
        {
            if (aggregateException.InnerException is OperationCanceledException) { return Option<int>.None(); }
            CheckPipeConnected(nameof(readStream), readStream);
            // Apply the same rule as the post-loop path below: a fault observed while
            // waiting must not escape the thread if it is just a side effect of shutdown.
            if (isExpectedShutdownFailure(aggregateException.InnerException))
            {
                readCancellationToken?.Cancel();
                return Option<int>.None();
            }
            throw;
        }
        catch (OperationCanceledException)
        {
            return Option<int>.None();
        }
    }

    if (readTask.Status == TaskStatus.RanToCompletion)
    {
        return Option<int>.Some(readTask.Result);
    }

    // The task may have been cancelled between the status check at the top of the
    // loop and the IsCompleted check; treat that like every other cancellation path.
    if (readTask.IsCanceled)
    {
        return Option<int>.None();
    }

    if (isExpectedShutdownFailure(readTask.Exception?.InnerException))
    {
        readCancellationToken?.Cancel();
        return Option<int>.None();
    }

    throw new Exception(
        $"ChildServerRelay readTask did not run to completion: status was {readTask.Status}.",
        readTask.Exception);
}
161 
/// <summary>
/// Throws if the relay is active but the given pipe is null or no longer connected.
/// On the client the message also reports whether the child process has exited
/// and, if so, its exit code in hexadecimal.
/// </summary>
/// <param name="name">Name of the pipe, used as the start of the exception message.</param>
/// <param name="pipe">Pipe to check; may be null.</param>
/// <exception cref="Exception">The pipe is unusable while the relay is active.</exception>
private static void CheckPipeConnected(string name, PipeType pipe)
{
    // Only an active relay treats a dead pipe as an error.
    if (status is not StatusEnum.Active) { return; }
    if (pipe is { IsConnected: true }) { return; }

    string childProcessInfo = string.Empty;
#if CLIENT
    if (Process is { HasExited: true, ExitCode: var exitCode })
    {
        childProcessInfo = $" Child process exit code was {(uint)exitCode:X8}.";
    }
    else if (Process is { HasExited: false })
    {
        childProcessInfo = " Child process has not exited.";
    }
#endif
    throw new Exception($"{name} was disconnected unexpectedly." + childProcessInfo);
}
180 
/// <summary>
/// Partial hook called by the read thread with the UTF-8 decoded payload of a
/// <see cref="WriteStatus.Crash"/> frame. Its implementation, if any, is not in this part of the class.
/// </summary>
static partial void HandleCrashString(string str);

/// <summary>
/// Body of the read thread. Repeatedly reads one frame from the pipe (a 5-byte
/// header made of a little-endian 32-bit payload length and a
/// <see cref="WriteStatus"/> byte, followed by the payload) and dispatches it,
/// until the relay has shut down.
/// </summary>
private static void UpdateRead()
{
    Span<byte> header = stackalloc byte[4 + 1];

    // Fills readTo completely from the buffered pipe data, refilling the buffer
    // as needed. Returns false when no more data can be obtained.
    bool readBytes(Span<byte> readTo)
    {
        int filled = 0;
        while (filled < readTo.Length)
        {
            if (readIncOffset >= readIncTotal)
            {
                if (!ReadIncomingMsgs().TryUnwrap(out readIncTotal)) { return false; }
                readIncOffset = 0;
                if (readIncTotal == 0)
                {
                    // Nothing was read. Retry the SAME destination position rather than
                    // skipping it, and surface a broken pipe instead of spinning on it.
                    CheckPipeConnected(nameof(readStream), readStream);
                    Thread.Yield();
                    continue;
                }
            }

            // Copy as much as is both wanted and available in one go.
            int chunk = Math.Min(readTo.Length - filled, readIncTotal - readIncOffset);
            readTempBytes.AsSpan(readIncOffset, chunk).CopyTo(readTo.Slice(filled, chunk));
            readIncOffset += chunk;
            filled += chunk;
        }
        return true;
    }

    while (!HasShutDown)
    {
        CheckPipeConnected(nameof(readStream), readStream);

        if (!readBytes(header)) { status = StatusEnum.ShutDown; break; }

        int msgLength = header[0]
                        | (header[1] << 8)
                        | (header[2] << 16)
                        | (header[3] << 24);
        WriteStatus writeStatus = (WriteStatus)header[4];

        // A non-positive length means there is no payload to read.
        byte[] msg = msgLength > 0 ? new byte[msgLength] : Array.Empty<byte>();
        if (msg.Length > 0 && !readBytes(msg.AsSpan())) { status = StatusEnum.ShutDown; break; }

        switch (writeStatus)
        {
            case WriteStatus.Success:
                msgsToRead.Enqueue(msg);
                break;
            case WriteStatus.Heartbeat:
                //do nothing
                break;
            case WriteStatus.RequestShutdown:
                status = StatusEnum.ShutDown;
                break;
            case WriteStatus.Crash:
                HandleCrashString(Encoding.UTF8.GetString(msg));
                status = StatusEnum.ShutDown;
                break;
        }

        Thread.Yield();
    }
}
237 
/// <summary>
/// Body of the write thread. Each pass sends a pending shutdown request, then
/// all queued crash strings, then all queued messages; afterwards it sleeps
/// until it is signalled or 1000 ms pass, in which case it sends a heartbeat
/// frame. The loop ends once the relay has shut down.
/// </summary>
private static void UpdateWrite()
{
    const int heartbeatIntervalMilliseconds = 1000;

    // Writes one frame: 4-byte little-endian length, status byte, payload.
    void writeMsg(WriteStatus writeStatus, byte[] msg)
    {
        // Keep this stackalloc inside the local function: stack-allocated memory
        // is only released when the function that allocated it returns, so
        // allocating directly inside the surrounding loop would grow the stack
        // on every pass and eventually overflow it.
        Span<byte> headerBytes = stackalloc byte[4 + 1];

        headerBytes[0] = (byte)(msg.Length & 0xFF);
        headerBytes[1] = (byte)((msg.Length >> 8) & 0xFF);
        headerBytes[2] = (byte)((msg.Length >> 16) & 0xFF);
        headerBytes[3] = (byte)((msg.Length >> 24) & 0xFF);

        headerBytes[4] = (byte)writeStatus;

        try
        {
            writeStream?.Write(headerBytes);
            writeStream?.Write(msg);
        }
        catch (Exception exception) when (exception is ObjectDisposedException or System.IO.IOException)
        {
            // A disposed or broken pipe is only tolerated once the relay has shut down.
            if (!HasShutDown)
            {
                CheckPipeConnected(nameof(writeStream), writeStream);
                throw;
            }
        }
    }

    while (!HasShutDown)
    {
        CheckPipeConnected(nameof(writeStream), writeStream);

        // Reset BEFORE draining the queues. Producers enqueue first and signal
        // afterwards, so anything signalled before this point is already visible
        // to the drain below, and anything signalled later keeps the event set
        // and makes the wait at the bottom return immediately. Resetting after
        // the drain could erase such a signal and delay the work by a full
        // heartbeat interval.
        writeManualResetEvent.Reset();

        if (status is StatusEnum.RequestedShutDown)
        {
            writeMsg(WriteStatus.RequestShutdown, Array.Empty<byte>());
            status = StatusEnum.ShutDown;
        }

        while (errorsToWrite.TryDequeue(out var error))
        {
            writeMsg(WriteStatus.Crash, Encoding.UTF8.GetBytes(error));
            status = StatusEnum.ShutDown;
        }

        while (msgsToWrite.TryDequeue(out var msg))
        {
            writeMsg(WriteStatus.Success, msg);

            if (HasShutDown) { break; }
        }

        if (HasShutDown) { break; }

        if (!writeManualResetEvent.WaitOne(heartbeatIntervalMilliseconds))
        {
            if (HasShutDown) { return; }

            //heartbeat to keep the other end alive
            writeMsg(WriteStatus.Heartbeat, Array.Empty<byte>());
        }
    }
}
316 
/// <summary>
/// Queues a message for the write thread and wakes it up. The message is
/// silently dropped when the relay has shut down or when it is longer than
/// 0x1fff_ffff bytes.
/// </summary>
/// <param name="msg">Payload to send.</param>
public static void Write(byte[] msg)
{
    // Messages beyond this size are extremely long and close to breaking
    // ChildServerRelay, so they are not allowed through.
    const int maxMessageLength = 0x1fff_ffff;

    bool accepted = !HasShutDown && msg.Length <= maxMessageLength;
    if (!accepted)
    {
        return;
    }

    msgsToWrite.Enqueue(msg);
    writeManualResetEvent.Set();
}
330 
// Time budget for a single Read() enumeration and the stopwatch that measures it.
private const int MaxMilliseconds = 8;
private static readonly Stopwatch stopwatch = new Stopwatch();

/// <summary>
/// Lazily yields received messages until none are left or roughly
/// <see cref="MaxMilliseconds"/> milliseconds have been spent in this enumeration.
/// </summary>
/// <returns>The dequeued messages, in queue order.</returns>
public static IEnumerable<byte[]> Read()
{
    stopwatch.Restart();

    // do/while guarantees one dequeue attempt even if the stopwatch already
    // reports the budget as spent before the first check. Past the budget we
    // stop and let the rest of the tick's logic run; otherwise the server may
    // overwhelm the host client with redundant messages that are read too slowly.
    do
    {
        bool gotMessage = ReadSingleMessage(out byte[] next);
        if (!gotMessage)
        {
            // Queue is empty (or the relay has shut down): no need to wait out the budget.
            break;
        }
        yield return next;
    }
    while (stopwatch.ElapsedMilliseconds < MaxMilliseconds);

    stopwatch.Stop();
}
359 
/// <summary>
/// Tries to dequeue one received message.
/// </summary>
/// <param name="msg">The dequeued message, or null when nothing was dequeued.</param>
/// <returns>False when the relay has shut down or the queue is empty; true otherwise.</returns>
private static bool ReadSingleMessage(out byte[] msg)
{
    msg = null;
    return !HasShutDown && msgsToRead.TryDequeue(out msg);
}
366  }
367 }
368