2 using System.Collections.Concurrent;
3 using System.Collections.Generic;
4 using System.Diagnostics;
6 using System.Threading;
7 using System.Threading.Tasks;
10 using PipeType = System.IO.Pipes.AnonymousPipeClientStream;
12 using PipeType = System.IO.Pipes.AnonymousPipeServerStream;
/// <summary>
/// Static relay that exchanges framed byte messages over a pair of anonymous pipes, with one
/// dedicated thread per direction (see the readThread / writeThread fields).
/// </summary>
17 static partial class ChildServerRelay
// Pipe endpoints for the two directions. PipeType is aliased by the using directives to an
// anonymous pipe client or server stream.
19 private static PipeType writeStream;
20 private static PipeType readStream;
// One-byte tag stored after the 4-byte length in every message header; the reader switches on it.
22 private enum WriteStatus :
byte
26 RequestShutdown = 0xCC,
// Signalled by Write and PrivateShutDown to wake the write thread out of its timed wait.
30 private static ManualResetEvent writeManualResetEvent;
// Lifecycle state of the relay.
32 private enum StatusEnum
// volatile: assigned and read from the read thread, the write thread and the callers of
// PrivateStart / PrivateShutDown, with no lock around it.
40 private static volatile StatusEnum status = StatusEnum.NeverStarted;
/// <summary>True once the relay status is StatusEnum.ShutDown.</summary>
41 public static bool HasShutDown => status is StatusEnum.ShutDown;
// Receive buffer for pipe reads, the position of the next unconsumed byte in it, and the
// number of bytes the most recent read delivered.
43 private const int ReadBufferSize = MsgConstants.MTU * 2;
44 private static byte[] readTempBytes;
45 private static int readIncOffset;
46 private static int readIncTotal;
// Outgoing payloads and outgoing error strings; both are drained by UpdateWrite.
48 private static ConcurrentQueue<byte[]> msgsToWrite;
49 private static ConcurrentQueue<string> errorsToWrite;
// Payloads received with WriteStatus.Success; drained by ReadSingleMessage.
51 private static ConcurrentQueue<byte[]> msgsToRead;
// Threads constructed in PrivateStart around UpdateRead and UpdateWrite.
53 private static Thread readThread;
54 private static Thread writeThread;
// Cancels the pending pipe read issued by ReadIncomingMsgs.
56 private static CancellationTokenSource readCancellationToken;
/// <summary>
/// Marks the relay as active, allocates fresh per-run state (receive buffer, queues,
/// cancellation source, wake-up event) and constructs the named read and write threads.
/// </summary>
58 private static void PrivateStart()
60 status = StatusEnum.Active;
// Receive buffer sized by ReadBufferSize.
65 readTempBytes =
new byte[ReadBufferSize];
// New queues, so nothing left over from an earlier run is carried into this one.
67 msgsToWrite =
new ConcurrentQueue<byte[]>();
68 errorsToWrite =
new ConcurrentQueue<string>();
70 msgsToRead =
new ConcurrentQueue<byte[]>();
72 readCancellationToken =
new CancellationTokenSource();
// The event starts unsignalled; Write and PrivateShutDown set it.
74 writeManualResetEvent =
new ManualResetEvent(
false);
// One worker thread per pipe direction, named so they are identifiable in a debugger.
76 readThread =
new Thread(UpdateRead)
78 Name =
"ChildServerRelay.ReadThread",
81 writeThread =
new Thread(UpdateWrite)
83 Name =
"ChildServerRelay.WriteThread",
/// <summary>
/// Stops the relay: flags shutdown, wakes the writer, cancels the pending read, joins both
/// worker threads, then disposes the cancellation source and both pipe streams and clears the
/// message queues. Returns immediately if the relay was never started.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The calling thread is not GameMain.MainThread.
/// </exception>
90 private static void PrivateShutDown()
92 if (Thread.CurrentThread != GameMain.MainThread)
94 throw new InvalidOperationException(
95 $
"Cannot call {nameof(ChildServerRelay)}.{nameof(PrivateShutDown)} from a thread other than the main one");
97 if (status is StatusEnum.NeverStarted) {
return; }
// Order matters: publish the shutdown state, then unblock the writer's timed wait and the
// reader's pending pipe read, so that the Join calls below are not left waiting on them.
98 status = StatusEnum.ShutDown;
99 writeManualResetEvent?.Set();
100 readCancellationToken?.Cancel();
101 readThread?.Join(); readThread =
null;
102 writeThread?.Join(); writeThread =
null;
// Resources are released only after both threads have been joined.
// NOTE(review): writeManualResetEvent is set above but not disposed in this method - confirm.
103 readCancellationToken?.Dispose();
104 readCancellationToken =
null;
105 readStream?.Dispose(); readStream =
null;
106 writeStream?.Dispose(); writeStream =
null;
// NOTE(review): errorsToWrite is not cleared here, unlike the other two queues - confirm intended.
107 msgsToRead?.Clear(); msgsToWrite?.Clear();
/// <summary>
/// Issues one asynchronous read from readStream into readTempBytes and polls it in 100 ms
/// slices (at most 150 iterations) until it finishes, the relay shuts down, or the read is
/// cancelled.
/// </summary>
/// <returns>
/// Some(byte count) when the read task ran to completion. None when readStream is null, the
/// relay has shut down, the read was cancelled, or the read failed with an
/// ObjectDisposedException / IOException while the relay was not active.
/// </returns>
111 private static Option<int> ReadIncomingMsgs()
// readStream may be null; the null-conditional call then produces a null task.
113 Task<int> readTask = readStream?.ReadAsync(readTempBytes, 0, readTempBytes.Length, readCancellationToken.Token);
114 if (readTask is
null) {
return Option<int>.None(); }
// Poll in short slices so a shutdown is noticed while the read is still pending.
116 int timeOutMilliseconds = 100;
117 for (
int i = 0; i < 150; i++)
119 if (status is StatusEnum.ShutDown)
121 readCancellationToken?.Cancel();
122 return Option<int>.None();
// Wait observes the same token as the read, so cancellation can surface from either one
// (see the two catch clauses below).
127 if (readTask.IsCompleted || readTask.Wait(timeOutMilliseconds, readCancellationToken.Token))
// A faulted or cancelled task is rethrown by Wait wrapped in an AggregateException.
132 catch (AggregateException aggregateException)
134 if (aggregateException.InnerException is OperationCanceledException) {
return Option<int>.None(); }
// For any other failure, check whether the pipe itself has gone away.
135 CheckPipeConnected(nameof(readStream), readStream);
// Cancellation of the Wait call itself.
138 catch (OperationCanceledException)
140 return Option<int>.None();
144 if (readTask.Status == TaskStatus.RanToCompletion)
146 return Option<int>.Some(readTask.Result);
// Disposed/broken-pipe failures are ignored when the relay is not active.
// `is A or B` is a single pattern, so this reads: notActive && (inner is A or inner is B).
149 bool swallowException =
150 status is not StatusEnum.Active
151 && readTask.Exception?.InnerException is ObjectDisposedException or System.IO.IOException;
152 if (swallowException)
154 readCancellationToken?.Cancel();
155 return Option<int>.None();
// Message used when the read ended in any other state.
158 $
"ChildServerRelay readTask did not run to completion: status was {readTask.Status}.",
/// <summary>
/// Throws when the relay is active but <paramref name="pipe"/> is null or not connected. The
/// message includes the child process's exit state when Process is available.
/// </summary>
/// <param name="name">Name of the pipe, used at the start of the exception message.</param>
/// <param name="pipe">Pipe stream to check; may be null.</param>
/// <exception cref="Exception">The pipe is null or disconnected while the status is Active.</exception>
162 private static void CheckPipeConnected(
string name, PipeType pipe)
// `is not { IsConnected: true }` also matches a null pipe.
164 if (status is StatusEnum.Active && pipe is not { IsConnected: true })
166 string exceptionMsg = $
"{name} was disconnected unexpectedly.";
// Exit code is rendered as 8 hex digits of its unsigned 32-bit value.
168 if (Process is { HasExited:
true, ExitCode: var exitCode })
170 exceptionMsg += $
" Child process exit code was {(uint)exitCode:X8}.";
// Neither branch runs when Process is null, so the message then carries no process detail.
172 else if (Process is { HasExited:
false })
174 exceptionMsg +=
" Child process has not exited.";
177 throw new Exception(exceptionMsg);
/// <summary>
/// Partial-method hook called by UpdateRead with the UTF-8 decoded payload of a Crash message.
/// Any implementation lives in another part of this partial class.
/// </summary>
/// <param name="str">The decoded crash text received from the other end of the pipe.</param>
181 static partial
void HandleCrashString(
string str);
/// <summary>
/// Body of the read thread. Reads a 5-byte header (little-endian 32-bit payload length followed
/// by a WriteStatus byte), then the payload, and dispatches on the status byte. A failed read
/// marks the relay as shut down.
/// </summary>
183 private static void UpdateRead()
// Header buffer: 4 length bytes + 1 status byte.
185 Span<byte> msgLengthSpan = stackalloc
byte[4 + 1];
188 CheckPipeConnected(nameof(readStream), readStream);
// Fills readTo byte by byte from readTempBytes, pulling more data from the pipe whenever the
// buffered bytes are used up. Returns false when ReadIncomingMsgs yields no data.
190 bool readBytes(Span<byte> readTo)
192 for (
int i = 0; i < readTo.Length; i++)
194 if (readIncOffset >= readIncTotal)
196 if (!ReadIncomingMsgs().TryUnwrap(out readIncTotal)) {
return false; }
// NOTE(review): `continue` still runs the loop's i++, so after a zero-byte read slot i is
// left unwritten - confirm a zero-length read cannot occur mid-message, or that this is intended.
198 if (readIncTotal == 0) { Thread.Yield();
continue; }
200 readTo[i] = readTempBytes[readIncOffset];
// Being unable to read a full header is treated as shutdown.
206 if (!readBytes(msgLengthSpan)) { status = StatusEnum.ShutDown;
break; }
// Little-endian length, matching the byte order UpdateWrite uses for the header.
208 int msgLength = msgLengthSpan[0]
209 | (msgLengthSpan[1] << 8)
210 | (msgLengthSpan[2] << 16)
211 | (msgLengthSpan[3] << 24);
212 WriteStatus writeStatus = (WriteStatus)msgLengthSpan[4];
// NOTE(review): msgLength comes straight off the pipe; a non-positive value yields an empty
// payload and any positive value allocates that many bytes - confirm no upper bound is needed.
214 byte[] msg = msgLength > 0 ?
new byte[msgLength] : Array.Empty<
byte>();
215 if (msg.Length > 0 && !readBytes(msg.AsSpan())) { status = StatusEnum.ShutDown;
break; }
// Normal payload: hand it over to the consumer queue.
219 case WriteStatus.Success:
220 msgsToRead.Enqueue(msg);
222 case WriteStatus.Heartbeat:
// The other end asked for a shutdown.
225 case WriteStatus.RequestShutdown:
226 status = StatusEnum.ShutDown;
// The other end reported a crash: the payload is a UTF-8 string passed to the partial hook.
228 case WriteStatus.Crash:
229 HandleCrashString(Encoding.UTF8.GetString(msg));
230 status = StatusEnum.ShutDown;
/// <summary>
/// Body of the write thread. Sends a shutdown request when one is pending, then queued error
/// strings (tagged Crash) and queued payloads (tagged Success), and sends a Heartbeat when the
/// wake-up event is not signalled within 1000 ms.
/// </summary>
238 private static void UpdateWrite()
242 CheckPipeConnected(nameof(writeStream), writeStream);
// Writes one framed message: a 5-byte header (little-endian length + status byte), then the payload.
244 void writeMsg(WriteStatus writeStatus,
byte[] msg)
252 Span<byte> headerBytes = stackalloc
byte[4 + 1];
254 headerBytes[0] = (byte)(msg.Length & 0xFF);
255 headerBytes[1] = (byte)((msg.Length >> 8) & 0xFF);
256 headerBytes[2] = (byte)((msg.Length >> 16) & 0xFF);
257 headerBytes[3] = (byte)((msg.Length >> 24) & 0xFF);
259 headerBytes[4] = (byte)writeStatus;
// Header and payload go out as two separate writes; both are skipped if writeStream is null.
263 writeStream?.Write(headerBytes);
264 writeStream?.Write(msg);
// Disposed-stream and I/O failures are matched explicitly below.
266 catch (Exception exception)
270 case ObjectDisposedException _:
271 case System.IO.IOException _:
274 CheckPipeConnected(nameof(writeStream), writeStream);
// A requested shutdown is forwarded to the other end, then the relay is marked shut down.
284 if (status is StatusEnum.RequestedShutDown)
286 writeMsg(WriteStatus.RequestShutdown, Array.Empty<
byte>());
287 status = StatusEnum.ShutDown;
// Error strings are sent as Crash messages; sending one also marks the relay shut down.
290 while (errorsToWrite.TryDequeue(out var error))
292 writeMsg(WriteStatus.Crash, Encoding.UTF8.GetBytes(error));
293 status = StatusEnum.ShutDown;
296 while (msgsToWrite.TryDequeue(out var msg))
298 writeMsg(WriteStatus.Success, msg);
300 if (HasShutDown) {
break; }
// Reset, then wait up to one second for Write or PrivateShutDown to signal.
// NOTE(review): a Set that lands between the queue drain above and this Reset is discarded,
// so that message waits for the next signal or the timeout - confirm this delay is acceptable.
305 writeManualResetEvent.Reset();
306 if (!writeManualResetEvent.WaitOne(1000))
308 if (HasShutDown) {
return; }
// Presumably reached only on timeout (inside the `if` above): send a heartbeat.
311 writeMsg(WriteStatus.Heartbeat, Array.Empty<
byte>());
/// <summary>
/// Queues <paramref name="msg"/> for the write thread and wakes it. Does nothing once the
/// relay has shut down.
/// </summary>
/// <param name="msg">Payload bytes to send; its Length is read, so it must not be null.</param>
317 public static void Write(
byte[] msg)
319 if (HasShutDown) {
return; }
// Upper bound on payload size: 0x1fff_ffff = 512 MiB - 1 bytes.
321 if (msg.Length > 0x1fff_ffff)
// Enqueue before signalling so the writer finds the message when it wakes.
327 msgsToWrite.Enqueue(msg);
328 writeManualResetEvent.Set();
// Stopwatch and time budget (milliseconds) that Read's loop condition compares against to
// bound how long a single call keeps dequeuing.
331 private static readonly Stopwatch stopwatch =
new Stopwatch();
332 private const int MaxMilliseconds = 8;
/// <summary>
/// Returns received messages. Always attempts at least one dequeue, and keeps going only while
/// the stopwatch's elapsed time stays below MaxMilliseconds.
/// </summary>
/// <returns>A sequence of received message payloads (presumably produced lazily - confirm).</returns>
334 public static IEnumerable<byte[]> Read()
340 bool hasIteratedAtLeastOnce =
false;
// The flag guarantees one attempt even if the time budget is already spent.
346 while (!hasIteratedAtLeastOnce || stopwatch.ElapsedMilliseconds < MaxMilliseconds)
348 hasIteratedAtLeastOnce =
true;
349 if (!ReadSingleMessage(out var msg))
/// <summary>
/// Tries to dequeue one received message from msgsToRead.
/// </summary>
/// <param name="msg">The dequeued payload; null when nothing was dequeued.</param>
/// <returns>
/// true if a message was dequeued; false if the relay has shut down or the queue is empty.
/// </returns>
360 private static bool ReadSingleMessage(out
byte[] msg)
// After shutdown nothing further is handed out, whatever is still queued.
362 if (HasShutDown) { msg =
null;
return false; }
364 return msgsToRead.TryDequeue(out msg);