mirror of
https://github.com/DerTyp7/defrain-shooter-unity.git
synced 2025-11-03 14:48:58 +01:00
CHANGED TO MIRROR
This commit is contained in:
362
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Client.cs
Normal file
362
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Client.cs
Normal file
@@ -0,0 +1,362 @@
|
||||
using System;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
|
||||
namespace Telepathy
|
||||
{
|
||||
// ClientState OBJECT that can be handed to the ReceiveThread safely.
|
||||
// => allows us to create a NEW OBJECT every time we connect and start a
|
||||
// receive thread.
|
||||
// => perfectly protects us against data races. fixes all the flaky tests
|
||||
// where .Connecting or .client would still be used by a dieing thread
|
||||
// while attempting to use it for a new connection attempt etc.
|
||||
// => creating a fresh client state each time is the best solution against
|
||||
// data races here!
|
||||
class ClientConnectionState : ConnectionState
|
||||
{
|
||||
public Thread receiveThread;
|
||||
|
||||
// TcpClient.Connected doesn't check if socket != null, which
|
||||
// results in NullReferenceExceptions if connection was closed.
|
||||
// -> let's check it manually instead
|
||||
public bool Connected => client != null &&
|
||||
client.Client != null &&
|
||||
client.Client.Connected;
|
||||
|
||||
// TcpClient has no 'connecting' state to check. We need to keep track
|
||||
// of it manually.
|
||||
// -> checking 'thread.IsAlive && !Connected' is not enough because the
|
||||
// thread is alive and connected is false for a short moment after
|
||||
// disconnecting, so this would cause race conditions.
|
||||
// -> we use a threadsafe bool wrapper so that ThreadFunction can remain
|
||||
// static (it needs a common lock)
|
||||
// => Connecting is true from first Connect() call in here, through the
|
||||
// thread start, until TcpClient.Connect() returns. Simple and clear.
|
||||
// => bools are atomic according to
|
||||
// https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/language-specification/variables
|
||||
// made volatile so the compiler does not reorder access to it
|
||||
public volatile bool Connecting;
|
||||
|
||||
// thread safe pipe for received messages
|
||||
// => inside client connection state so that we can create a new state
|
||||
// each time we connect
|
||||
// (unlike server which has one receive pipe for all connections)
|
||||
public readonly MagnificentReceivePipe receivePipe;
|
||||
|
||||
// constructor always creates new TcpClient for client connection!
|
||||
public ClientConnectionState(int MaxMessageSize) : base(new TcpClient(), MaxMessageSize)
|
||||
{
|
||||
// create receive pipe with max message size for pooling
|
||||
receivePipe = new MagnificentReceivePipe(MaxMessageSize);
|
||||
}
|
||||
|
||||
// dispose all the state safely
|
||||
public void Dispose()
|
||||
{
|
||||
// close client
|
||||
client.Close();
|
||||
|
||||
// wait until thread finished. this is the only way to guarantee
|
||||
// that we can call Connect() again immediately after Disconnect
|
||||
// -> calling .Join would sometimes wait forever, e.g. when
|
||||
// calling Disconnect while trying to connect to a dead end
|
||||
receiveThread?.Interrupt();
|
||||
|
||||
// we interrupted the receive Thread, so we can't guarantee that
|
||||
// connecting was reset. let's do it manually.
|
||||
Connecting = false;
|
||||
|
||||
// clear send pipe. no need to hold on to elements.
|
||||
// (unlike receiveQueue, which is still needed to process the
|
||||
// latest Disconnected message, etc.)
|
||||
sendPipe.Clear();
|
||||
|
||||
// IMPORTANT: DO NOT CLEAR RECEIVE PIPE.
|
||||
// we still want to process disconnect messages in Tick()!
|
||||
|
||||
// let go of this client completely. the thread ended, no one uses
|
||||
// it anymore and this way Connected is false again immediately.
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
|
||||
public class Client : Common
|
||||
{
|
||||
// events to hook into
|
||||
// => OnData uses ArraySegment for allocation free receives later
|
||||
public Action OnConnected;
|
||||
public Action<ArraySegment<byte>> OnData;
|
||||
public Action OnDisconnected;
|
||||
|
||||
// disconnect if send queue gets too big.
|
||||
// -> avoids ever growing queue memory if network is slower than input
|
||||
// -> disconnecting is great for load balancing. better to disconnect
|
||||
// one connection than risking every connection / the whole server
|
||||
// -> huge queue would introduce multiple seconds of latency anyway
|
||||
//
|
||||
// Mirror/DOTSNET use MaxMessageSize batching, so for a 16kb max size:
|
||||
// limit = 1,000 means 16 MB of memory/connection
|
||||
// limit = 10,000 means 160 MB of memory/connection
|
||||
public int SendQueueLimit = 10000;
|
||||
public int ReceiveQueueLimit = 10000;
|
||||
|
||||
// all client state wrapped into an object that is passed to ReceiveThread
|
||||
// => we create a new one each time we connect to avoid data races with
|
||||
// old dieing threads still using the previous object!
|
||||
ClientConnectionState state;
|
||||
|
||||
// Connected & Connecting
|
||||
public bool Connected => state != null && state.Connected;
|
||||
public bool Connecting => state != null && state.Connecting;
|
||||
|
||||
// pipe count, useful for debugging / benchmarks
|
||||
public int ReceivePipeCount => state != null ? state.receivePipe.TotalCount : 0;
|
||||
|
||||
// constructor
|
||||
public Client(int MaxMessageSize) : base(MaxMessageSize) {}
|
||||
|
||||
// the thread function
|
||||
// STATIC to avoid sharing state!
|
||||
// => pass ClientState object. a new one is created for each new thread!
|
||||
// => avoids data races where an old dieing thread might still modify
|
||||
// the current thread's state :/
|
||||
static void ReceiveThreadFunction(ClientConnectionState state, string ip, int port, int MaxMessageSize, bool NoDelay, int SendTimeout, int ReceiveTimeout, int ReceiveQueueLimit)
|
||||
|
||||
{
|
||||
Thread sendThread = null;
|
||||
|
||||
// absolutely must wrap with try/catch, otherwise thread
|
||||
// exceptions are silent
|
||||
try
|
||||
{
|
||||
// connect (blocking)
|
||||
state.client.Connect(ip, port);
|
||||
state.Connecting = false; // volatile!
|
||||
|
||||
// set socket options after the socket was created in Connect()
|
||||
// (not after the constructor because we clear the socket there)
|
||||
state.client.NoDelay = NoDelay;
|
||||
state.client.SendTimeout = SendTimeout;
|
||||
state.client.ReceiveTimeout = ReceiveTimeout;
|
||||
|
||||
// start send thread only after connected
|
||||
// IMPORTANT: DO NOT SHARE STATE ACROSS MULTIPLE THREADS!
|
||||
sendThread = new Thread(() => { ThreadFunctions.SendLoop(0, state.client, state.sendPipe, state.sendPending); });
|
||||
sendThread.IsBackground = true;
|
||||
sendThread.Start();
|
||||
|
||||
// run the receive loop
|
||||
// (receive pipe is shared across all loops)
|
||||
ThreadFunctions.ReceiveLoop(0, state.client, MaxMessageSize, state.receivePipe, ReceiveQueueLimit);
|
||||
}
|
||||
catch (SocketException exception)
|
||||
{
|
||||
// this happens if (for example) the ip address is correct
|
||||
// but there is no server running on that ip/port
|
||||
Log.Info("Client Recv: failed to connect to ip=" + ip + " port=" + port + " reason=" + exception);
|
||||
|
||||
// add 'Disconnected' event to receive pipe so that the caller
|
||||
// knows that the Connect failed. otherwise they will never know
|
||||
state.receivePipe.Enqueue(0, EventType.Disconnected, default);
|
||||
}
|
||||
catch (ThreadInterruptedException)
|
||||
{
|
||||
// expected if Disconnect() aborts it
|
||||
}
|
||||
catch (ThreadAbortException)
|
||||
{
|
||||
// expected if Disconnect() aborts it
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
// expected if Disconnect() aborts it and disposed the client
|
||||
// while ReceiveThread is in a blocking Connect() call
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
// something went wrong. probably important.
|
||||
Log.Error("Client Recv Exception: " + exception);
|
||||
}
|
||||
|
||||
// sendthread might be waiting on ManualResetEvent,
|
||||
// so let's make sure to end it if the connection
|
||||
// closed.
|
||||
// otherwise the send thread would only end if it's
|
||||
// actually sending data while the connection is
|
||||
// closed.
|
||||
sendThread?.Interrupt();
|
||||
|
||||
// Connect might have failed. thread might have been closed.
|
||||
// let's reset connecting state no matter what.
|
||||
state.Connecting = false;
|
||||
|
||||
// if we got here then we are done. ReceiveLoop cleans up already,
|
||||
// but we may never get there if connect fails. so let's clean up
|
||||
// here too.
|
||||
state.client?.Close();
|
||||
}
|
||||
|
||||
public void Connect(string ip, int port)
|
||||
{
|
||||
// not if already started
|
||||
if (Connecting || Connected)
|
||||
{
|
||||
Log.Warning("Telepathy Client can not create connection because an existing connection is connecting or connected");
|
||||
return;
|
||||
}
|
||||
|
||||
// overwrite old thread's state object. create a new one to avoid
|
||||
// data races where an old dieing thread might still modify the
|
||||
// current state! fixes all the flaky tests!
|
||||
state = new ClientConnectionState(MaxMessageSize);
|
||||
|
||||
// We are connecting from now until Connect succeeds or fails
|
||||
state.Connecting = true;
|
||||
|
||||
// create a TcpClient with perfect IPv4, IPv6 and hostname resolving
|
||||
// support.
|
||||
//
|
||||
// * TcpClient(hostname, port): works but would connect (and block)
|
||||
// already
|
||||
// * TcpClient(AddressFamily.InterNetworkV6): takes Ipv4 and IPv6
|
||||
// addresses but only connects to IPv6 servers (e.g. Telepathy).
|
||||
// does NOT connect to IPv4 servers (e.g. Mirror Booster), even
|
||||
// with DualMode enabled.
|
||||
// * TcpClient(): creates IPv4 socket internally, which would force
|
||||
// Connect() to only use IPv4 sockets.
|
||||
//
|
||||
// => the trick is to clear the internal IPv4 socket so that Connect
|
||||
// resolves the hostname and creates either an IPv4 or an IPv6
|
||||
// socket as needed (see TcpClient source)
|
||||
state.client.Client = null; // clear internal IPv4 socket until Connect()
|
||||
|
||||
// client.Connect(ip, port) is blocking. let's call it in the thread
|
||||
// and return immediately.
|
||||
// -> this way the application doesn't hang for 30s if connect takes
|
||||
// too long, which is especially good in games
|
||||
// -> this way we don't async client.BeginConnect, which seems to
|
||||
// fail sometimes if we connect too many clients too fast
|
||||
state.receiveThread = new Thread(() => {
|
||||
ReceiveThreadFunction(state, ip, port, MaxMessageSize, NoDelay, SendTimeout, ReceiveTimeout, ReceiveQueueLimit);
|
||||
});
|
||||
state.receiveThread.IsBackground = true;
|
||||
state.receiveThread.Start();
|
||||
}
|
||||
|
||||
public void Disconnect()
|
||||
{
|
||||
// only if started
|
||||
if (Connecting || Connected)
|
||||
{
|
||||
// dispose all the state safely
|
||||
state.Dispose();
|
||||
|
||||
// IMPORTANT: DO NOT set state = null!
|
||||
// we still want to process the pipe's disconnect message etc.!
|
||||
}
|
||||
}
|
||||
|
||||
// send message to server using socket connection.
|
||||
// arraysegment for allocation free sends later.
|
||||
// -> the segment's array is only used until Send() returns!
|
||||
public bool Send(ArraySegment<byte> message)
|
||||
{
|
||||
if (Connected)
|
||||
{
|
||||
// respect max message size to avoid allocation attacks.
|
||||
if (message.Count <= MaxMessageSize)
|
||||
{
|
||||
// check send pipe limit
|
||||
if (state.sendPipe.Count < SendQueueLimit)
|
||||
{
|
||||
// add to thread safe send pipe and return immediately.
|
||||
// calling Send here would be blocking (sometimes for long
|
||||
// times if other side lags or wire was disconnected)
|
||||
state.sendPipe.Enqueue(message);
|
||||
state.sendPending.Set(); // interrupt SendThread WaitOne()
|
||||
return true;
|
||||
}
|
||||
// disconnect if send queue gets too big.
|
||||
// -> avoids ever growing queue memory if network is slower
|
||||
// than input
|
||||
// -> avoids ever growing latency as well
|
||||
//
|
||||
// note: while SendThread always grabs the WHOLE send queue
|
||||
// immediately, it's still possible that the sending
|
||||
// blocks for so long that the send queue just gets
|
||||
// way too big. have a limit - better safe than sorry.
|
||||
else
|
||||
{
|
||||
// log the reason
|
||||
Log.Warning($"Client.Send: sendPipe reached limit of {SendQueueLimit}. This can happen if we call send faster than the network can process messages. Disconnecting to avoid ever growing memory & latency.");
|
||||
|
||||
// just close it. send thread will take care of the rest.
|
||||
state.client.Close();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Log.Error("Client.Send: message too big: " + message.Count + ". Limit: " + MaxMessageSize);
|
||||
return false;
|
||||
}
|
||||
Log.Warning("Client.Send: not connected!");
|
||||
return false;
|
||||
}
|
||||
|
||||
// tick: processes up to 'limit' messages
|
||||
// => limit parameter to avoid deadlocks / too long freezes if server or
|
||||
// client is too slow to process network load
|
||||
// => Mirror & DOTSNET need to have a process limit anyway.
|
||||
// might as well do it here and make life easier.
|
||||
// => returns amount of remaining messages to process, so the caller
|
||||
// can call tick again as many times as needed (or up to a limit)
|
||||
//
|
||||
// Tick() may process multiple messages, but Mirror needs a way to stop
|
||||
// processing immediately if a scene change messages arrives. Mirror
|
||||
// can't process any other messages during a scene change.
|
||||
// (could be useful for others too)
|
||||
// => make sure to allocate the lambda only once in transports
|
||||
public int Tick(int processLimit, Func<bool> checkEnabled = null)
|
||||
{
|
||||
// only if state was created yet (after connect())
|
||||
// note: we don't check 'only if connected' because we want to still
|
||||
// process Disconnect messages afterwards too!
|
||||
if (state == null)
|
||||
return 0;
|
||||
|
||||
// process up to 'processLimit' messages
|
||||
for (int i = 0; i < processLimit; ++i)
|
||||
{
|
||||
// check enabled in case a Mirror scene message arrived
|
||||
if (checkEnabled != null && !checkEnabled())
|
||||
break;
|
||||
|
||||
// peek first. allows us to process the first queued entry while
|
||||
// still keeping the pooled byte[] alive by not removing anything.
|
||||
if (state.receivePipe.TryPeek(out int _, out EventType eventType, out ArraySegment<byte> message))
|
||||
{
|
||||
switch (eventType)
|
||||
{
|
||||
case EventType.Connected:
|
||||
OnConnected?.Invoke();
|
||||
break;
|
||||
case EventType.Data:
|
||||
OnData?.Invoke(message);
|
||||
break;
|
||||
case EventType.Disconnected:
|
||||
OnDisconnected?.Invoke();
|
||||
break;
|
||||
}
|
||||
|
||||
// IMPORTANT: now dequeue and return it to pool AFTER we are
|
||||
// done processing the event.
|
||||
state.receivePipe.TryDequeue();
|
||||
}
|
||||
// no more messages. stop the loop.
|
||||
else break;
|
||||
}
|
||||
|
||||
// return what's left to process for next time
|
||||
return state.receivePipe.TotalCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: a5b95294cc4ec4b15aacba57531c7985
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1,39 @@
|
||||
// common code used by server and client
|
||||
namespace Telepathy
|
||||
{
|
||||
public abstract class Common
|
||||
{
|
||||
// IMPORTANT: DO NOT SHARE STATE ACROSS SEND/RECV LOOPS (DATA RACES)
|
||||
// (except receive pipe which is used for all threads)
|
||||
|
||||
// NoDelay disables nagle algorithm. lowers CPU% and latency but
|
||||
// increases bandwidth
|
||||
public bool NoDelay = true;
|
||||
|
||||
// Prevent allocation attacks. Each packet is prefixed with a length
|
||||
// header, so an attacker could send a fake packet with length=2GB,
|
||||
// causing the server to allocate 2GB and run out of memory quickly.
|
||||
// -> simply increase max packet size if you want to send around bigger
|
||||
// files!
|
||||
// -> 16KB per message should be more than enough.
|
||||
public readonly int MaxMessageSize;
|
||||
|
||||
// Send would stall forever if the network is cut off during a send, so
|
||||
// we need a timeout (in milliseconds)
|
||||
public int SendTimeout = 5000;
|
||||
|
||||
// Default TCP receive time out can be huge (minutes).
|
||||
// That's way too much for games, let's make it configurable.
|
||||
// we need a timeout (in milliseconds)
|
||||
// => '0' means disabled
|
||||
// => disabled by default because some people might use Telepathy
|
||||
// without Mirror and without sending pings, so timeouts are likely
|
||||
public int ReceiveTimeout = 0;
|
||||
|
||||
// constructor
|
||||
protected Common(int MaxMessageSize)
|
||||
{
|
||||
this.MaxMessageSize = MaxMessageSize;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: c4d56322cf0e248a89103c002a505dab
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1,35 @@
|
||||
// both server and client need a connection state object.
|
||||
// -> server needs it to keep track of multiple connections
|
||||
// -> client needs it to safely create a new connection state on every new
|
||||
// connect in order to avoid data races where a dieing thread might still
|
||||
// modify the current state. can't happen if we create a new state each time!
|
||||
// (fixes all the flaky tests)
|
||||
//
|
||||
// ... besides, it also allows us to share code!
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
|
||||
namespace Telepathy
|
||||
{
|
||||
public class ConnectionState
|
||||
{
|
||||
public TcpClient client;
|
||||
|
||||
// thread safe pipe to send messages from main thread to send thread
|
||||
public readonly MagnificentSendPipe sendPipe;
|
||||
|
||||
// ManualResetEvent to wake up the send thread. better than Thread.Sleep
|
||||
// -> call Set() if everything was sent
|
||||
// -> call Reset() if there is something to send again
|
||||
// -> call WaitOne() to block until Reset was called
|
||||
public ManualResetEvent sendPending = new ManualResetEvent(false);
|
||||
|
||||
public ConnectionState(TcpClient client, int MaxMessageSize)
|
||||
{
|
||||
this.client = client;
|
||||
|
||||
// create send pipe with max message size for pooling
|
||||
sendPipe = new MagnificentSendPipe(MaxMessageSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: af95e2b6f6343411aa8bdf871abd7b1b
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1,8 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 885e89897e3a03241827ab7a14fe5fa0
|
||||
folderAsset: yes
|
||||
DefaultImporter:
|
||||
externalObjects: {}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1 @@
|
||||
// removed 2021-02-04
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: aa8d703f0b73f4d6398b76812719b68b
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1 @@
|
||||
// removed 2021-02-04
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: aedf812e9637b4f92a35db1aedca8c92
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1 @@
|
||||
// removed 2021-02-04
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 8fc06e2fb29854a0c9e90c0188d36a08
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1 @@
|
||||
// removed 2021-02-04
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 64df4eaebe4ff9a43a9fb318c3e8e321
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1,9 @@
|
||||
namespace Telepathy
|
||||
{
|
||||
public enum EventType
|
||||
{
|
||||
Connected,
|
||||
Data,
|
||||
Disconnected
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 49f1a330755814803be5f27f493e1910
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
21
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/LICENSE
Normal file
21
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/LICENSE
Normal file
@@ -0,0 +1,21 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2018, vis2k
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
@@ -0,0 +1,7 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 0ba11103b95fd4721bffbb08440d5b8e
|
||||
DefaultImporter:
|
||||
externalObjects: {}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
15
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Log.cs
Normal file
15
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Log.cs
Normal file
@@ -0,0 +1,15 @@
|
||||
// A simple logger class that uses Console.WriteLine by default.
|
||||
// Can also do Logger.LogMethod = Debug.Log for Unity etc.
|
||||
// (this way we don't have to depend on UnityEngine.DLL and don't need a
|
||||
// different version for every UnityEngine version here)
|
||||
using System;
|
||||
|
||||
namespace Telepathy
|
||||
{
|
||||
public static class Log
|
||||
{
|
||||
public static Action<string> Info = Console.WriteLine;
|
||||
public static Action<string> Warning = Console.WriteLine;
|
||||
public static Action<string> Error = Console.Error.WriteLine;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 0a123d054bef34d059057ac2ce936605
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1,222 @@
|
||||
// a magnificent receive pipe to shield us from all of life's complexities.
|
||||
// safely sends messages from receive thread to main thread.
|
||||
// -> thread safety built in
|
||||
// -> byte[] pooling coming in the future
|
||||
//
|
||||
// => hides all the complexity from telepathy
|
||||
// => easy to switch between stack/queue/concurrentqueue/etc.
|
||||
// => easy to test
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace Telepathy
|
||||
{
|
||||
public class MagnificentReceivePipe
|
||||
{
|
||||
// queue entry message. only used in here.
|
||||
// -> byte arrays are always of 4 + MaxMessageSize
|
||||
// -> ArraySegment indicates the actual message content
|
||||
struct Entry
|
||||
{
|
||||
public int connectionId;
|
||||
public EventType eventType;
|
||||
public ArraySegment<byte> data;
|
||||
public Entry(int connectionId, EventType eventType, ArraySegment<byte> data)
|
||||
{
|
||||
this.connectionId = connectionId;
|
||||
this.eventType = eventType;
|
||||
this.data = data;
|
||||
}
|
||||
}
|
||||
|
||||
// message queue
|
||||
// ConcurrentQueue allocates. lock{} instead.
|
||||
//
|
||||
// IMPORTANT: lock{} all usages!
|
||||
readonly Queue<Entry> queue = new Queue<Entry>();
|
||||
|
||||
// byte[] pool to avoid allocations
|
||||
// Take & Return is beautifully encapsulated in the pipe.
|
||||
// the outside does not need to worry about anything.
|
||||
// and it can be tested easily.
|
||||
//
|
||||
// IMPORTANT: lock{} all usages!
|
||||
Pool<byte[]> pool;
|
||||
|
||||
// unfortunately having one receive pipe per connetionId is way slower
|
||||
// in CCU tests. right now we have one pipe for all connections.
|
||||
// => we still need to limit queued messages per connection to avoid one
|
||||
// spamming connection being able to slow down everyone else since
|
||||
// the queue would be full of just this connection's messages forever
|
||||
// => let's use a simpler per-connectionId counter for now
|
||||
Dictionary<int, int> queueCounter = new Dictionary<int, int>();
|
||||
|
||||
// constructor
|
||||
public MagnificentReceivePipe(int MaxMessageSize)
|
||||
{
|
||||
// initialize pool to create max message sized byte[]s each time
|
||||
pool = new Pool<byte[]>(() => new byte[MaxMessageSize]);
|
||||
}
|
||||
|
||||
// return amount of queued messages for this connectionId.
|
||||
// for statistics. don't call Count and assume that it's the same after
|
||||
// the call.
|
||||
public int Count(int connectionId)
|
||||
{
|
||||
lock (this)
|
||||
{
|
||||
return queueCounter.TryGetValue(connectionId, out int count)
|
||||
? count
|
||||
: 0;
|
||||
}
|
||||
}
|
||||
|
||||
// total count
|
||||
public int TotalCount
|
||||
{
|
||||
get { lock (this) { return queue.Count; } }
|
||||
}
|
||||
|
||||
// pool count for testing
|
||||
public int PoolCount
|
||||
{
|
||||
get { lock (this) { return pool.Count(); } }
|
||||
}
|
||||
|
||||
// enqueue a message
|
||||
// -> ArraySegment to avoid allocations later
|
||||
// -> parameters passed directly so it's more obvious that we don't just
|
||||
// queue a passed 'Message', instead we copy the ArraySegment into
|
||||
// a byte[] and store it internally, etc.)
|
||||
public void Enqueue(int connectionId, EventType eventType, ArraySegment<byte> message)
|
||||
{
|
||||
// pool & queue usage always needs to be locked
|
||||
lock (this)
|
||||
{
|
||||
// does this message have a data array content?
|
||||
ArraySegment<byte> segment = default;
|
||||
if (message != default)
|
||||
{
|
||||
// ArraySegment is only valid until returning.
|
||||
// copy it into a byte[] that we can store.
|
||||
// ArraySegment array is only valid until returning, so copy
|
||||
// it into a byte[] that we can queue safely.
|
||||
|
||||
// get one from the pool first to avoid allocations
|
||||
byte[] bytes = pool.Take();
|
||||
|
||||
// copy into it
|
||||
Buffer.BlockCopy(message.Array, message.Offset, bytes, 0, message.Count);
|
||||
|
||||
// indicate which part is the message
|
||||
segment = new ArraySegment<byte>(bytes, 0, message.Count);
|
||||
}
|
||||
|
||||
// enqueue it
|
||||
// IMPORTANT: pass the segment around pool byte[],
|
||||
// NOT the 'message' that is only valid until returning!
|
||||
Entry entry = new Entry(connectionId, eventType, segment);
|
||||
queue.Enqueue(entry);
|
||||
|
||||
// increase counter for this connectionId
|
||||
int oldCount = Count(connectionId);
|
||||
queueCounter[connectionId] = oldCount + 1;
|
||||
}
|
||||
}
|
||||
|
||||
// peek the next message
|
||||
// -> allows the caller to process it while pipe still holds on to the
|
||||
// byte[]
|
||||
// -> TryDequeue should be called after processing, so that the message
|
||||
// is actually dequeued and the byte[] is returned to pool!
|
||||
// => see TryDequeue comments!
|
||||
//
|
||||
// IMPORTANT: TryPeek & Dequeue need to be called from the SAME THREAD!
|
||||
public bool TryPeek(out int connectionId, out EventType eventType, out ArraySegment<byte> data)
|
||||
{
|
||||
connectionId = 0;
|
||||
eventType = EventType.Disconnected;
|
||||
data = default;
|
||||
|
||||
// pool & queue usage always needs to be locked
|
||||
lock (this)
|
||||
{
|
||||
if (queue.Count > 0)
|
||||
{
|
||||
Entry entry = queue.Peek();
|
||||
connectionId = entry.connectionId;
|
||||
eventType = entry.eventType;
|
||||
data = entry.data;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// dequeue the next message
|
||||
// -> simply dequeues and returns the byte[] to pool (if any)
|
||||
// -> use Peek to actually process the first element while the pipe
|
||||
// still holds on to the byte[]
|
||||
// -> doesn't return the element because the byte[] needs to be returned
|
||||
// to the pool in dequeue. caller can't be allowed to work with a
|
||||
// byte[] that is already returned to pool.
|
||||
// => Peek & Dequeue is the most simple, clean solution for receive
|
||||
// pipe pooling to avoid allocations!
|
||||
//
|
||||
// IMPORTANT: TryPeek & Dequeue need to be called from the SAME THREAD!
|
||||
public bool TryDequeue()
|
||||
{
|
||||
// pool & queue usage always needs to be locked
|
||||
lock (this)
|
||||
{
|
||||
if (queue.Count > 0)
|
||||
{
|
||||
// dequeue from queue
|
||||
Entry entry = queue.Dequeue();
|
||||
|
||||
// return byte[] to pool (if any).
|
||||
// not all message types have byte[] contents.
|
||||
if (entry.data != default)
|
||||
{
|
||||
pool.Return(entry.data.Array);
|
||||
}
|
||||
|
||||
// decrease counter for this connectionId
|
||||
queueCounter[entry.connectionId]--;
|
||||
|
||||
// remove if zero. don't want to keep old connectionIds in
|
||||
// there forever, it would cause slowly growing memory.
|
||||
if (queueCounter[entry.connectionId] == 0)
|
||||
queueCounter.Remove(entry.connectionId);
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public void Clear()
|
||||
{
|
||||
// pool & queue usage always needs to be locked
|
||||
lock (this)
|
||||
{
|
||||
// clear queue, but via dequeue to return each byte[] to pool
|
||||
while (queue.Count > 0)
|
||||
{
|
||||
// dequeue
|
||||
Entry entry = queue.Dequeue();
|
||||
|
||||
// return byte[] to pool (if any).
|
||||
// not all message types have byte[] contents.
|
||||
if (entry.data != default)
|
||||
{
|
||||
pool.Return(entry.data.Array);
|
||||
}
|
||||
}
|
||||
|
||||
// clear counter too
|
||||
queueCounter.Clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 010a208972a9a4e0cb0e7c18a60b4494
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1,165 @@
|
||||
// a magnificent send pipe to shield us from all of life's complexities.
|
||||
// safely sends messages from main thread to send thread.
|
||||
// -> thread safety built in
|
||||
// -> byte[] pooling coming in the future
|
||||
//
|
||||
// => hides all the complexity from telepathy
|
||||
// => easy to switch between stack/queue/concurrentqueue/etc.
|
||||
// => easy to test
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace Telepathy
|
||||
{
|
||||
public class MagnificentSendPipe
|
||||
{
|
||||
// message queue
|
||||
// ConcurrentQueue allocates. lock{} instead.
|
||||
// -> byte arrays are always of MaxMessageSize
|
||||
// -> ArraySegment indicates the actual message content
|
||||
//
|
||||
// IMPORTANT: lock{} all usages!
|
||||
readonly Queue<ArraySegment<byte>> queue = new Queue<ArraySegment<byte>>();
|
||||
|
||||
// byte[] pool to avoid allocations
|
||||
// Take & Return is beautifully encapsulated in the pipe.
|
||||
// the outside does not need to worry about anything.
|
||||
// and it can be tested easily.
|
||||
//
|
||||
// IMPORTANT: lock{} all usages!
|
||||
Pool<byte[]> pool;
|
||||
|
||||
// constructor
|
||||
public MagnificentSendPipe(int MaxMessageSize)
|
||||
{
|
||||
// initialize pool to create max message sized byte[]s each time
|
||||
pool = new Pool<byte[]>(() => new byte[MaxMessageSize]);
|
||||
}
|
||||
|
||||
// for statistics. don't call Count and assume that it's the same after
|
||||
// the call.
|
||||
public int Count
|
||||
{
|
||||
get { lock (this) { return queue.Count; } }
|
||||
}
|
||||
|
||||
// pool count for testing
|
||||
public int PoolCount
|
||||
{
|
||||
get { lock (this) { return pool.Count(); } }
|
||||
}
|
||||
|
||||
// enqueue a message
|
||||
// arraysegment for allocation free sends later.
|
||||
// -> the segment's array is only used until Enqueue() returns!
|
||||
public void Enqueue(ArraySegment<byte> message)
|
||||
{
|
||||
// pool & queue usage always needs to be locked
|
||||
lock (this)
|
||||
{
|
||||
// ArraySegment array is only valid until returning, so copy
|
||||
// it into a byte[] that we can queue safely.
|
||||
|
||||
// get one from the pool first to avoid allocations
|
||||
byte[] bytes = pool.Take();
|
||||
|
||||
// copy into it
|
||||
Buffer.BlockCopy(message.Array, message.Offset, bytes, 0, message.Count);
|
||||
|
||||
// indicate which part is the message
|
||||
ArraySegment<byte> segment = new ArraySegment<byte>(bytes, 0, message.Count);
|
||||
|
||||
// now enqueue it
|
||||
queue.Enqueue(segment);
|
||||
}
|
||||
}
|
||||
|
||||
// send threads need to dequeue each byte[] and write it into the socket
|
||||
// -> dequeueing one byte[] after another works, but it's WAY slower
|
||||
// than dequeueing all immediately (locks only once)
|
||||
// lock{} & DequeueAll is WAY faster than ConcurrentQueue & dequeue
|
||||
// one after another:
|
||||
//
|
||||
// uMMORPG 450 CCU
|
||||
// SafeQueue: 900-1440ms latency
|
||||
// ConcurrentQueue: 2000ms latency
|
||||
//
|
||||
// -> the most obvious solution is to just return a list with all byte[]
|
||||
// (which allocates) and then write each one into the socket
|
||||
// -> a faster solution is to serialize each one into one payload buffer
|
||||
// and pass that to the socket only once. fewer socket calls always
|
||||
// give WAY better CPU performance(!)
|
||||
// -> to avoid allocating a new list of entries each time, we simply
|
||||
// serialize all entries into the payload here already
|
||||
// => having all this complexity built into the pipe makes testing and
|
||||
// modifying the algorithm super easy!
|
||||
//
|
||||
// IMPORTANT: serializing in here will allow us to return the byte[]
|
||||
// entries back to a pool later to completely avoid
|
||||
// allocations!
|
||||
public bool DequeueAndSerializeAll(ref byte[] payload, out int packetSize)
|
||||
{
|
||||
// pool & queue usage always needs to be locked
|
||||
lock (this)
|
||||
{
|
||||
// do nothing if empty
|
||||
packetSize = 0;
|
||||
if (queue.Count == 0)
|
||||
return false;
|
||||
|
||||
// we might have multiple pending messages. merge into one
|
||||
// packet to avoid TCP overheads and improve performance.
|
||||
//
|
||||
// IMPORTANT: Mirror & DOTSNET already batch into MaxMessageSize
|
||||
// chunks, but we STILL pack all pending messages
|
||||
// into one large payload so we only give it to TCP
|
||||
// ONCE. This is HUGE for performance so we keep it!
|
||||
packetSize = 0;
|
||||
foreach (ArraySegment<byte> message in queue)
|
||||
packetSize += 4 + message.Count; // header + content
|
||||
|
||||
// create payload buffer if not created yet or previous one is
|
||||
// too small
|
||||
// IMPORTANT: payload.Length might be > packetSize! don't use it!
|
||||
if (payload == null || payload.Length < packetSize)
|
||||
payload = new byte[packetSize];
|
||||
|
||||
// dequeue all byte[] messages and serialize into the packet
|
||||
int position = 0;
|
||||
while (queue.Count > 0)
|
||||
{
|
||||
// dequeue
|
||||
ArraySegment<byte> message = queue.Dequeue();
|
||||
|
||||
// write header (size) into buffer at position
|
||||
Utils.IntToBytesBigEndianNonAlloc(message.Count, payload, position);
|
||||
position += 4;
|
||||
|
||||
// copy message into payload at position
|
||||
Buffer.BlockCopy(message.Array, message.Offset, payload, position, message.Count);
|
||||
position += message.Count;
|
||||
|
||||
// return to pool so it can be reused (avoids allocations!)
|
||||
pool.Return(message.Array);
|
||||
}
|
||||
|
||||
// we did serialize something
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void Clear()
|
||||
{
|
||||
// pool & queue usage always needs to be locked
|
||||
lock (this)
|
||||
{
|
||||
// clear queue, but via dequeue to return each byte[] to pool
|
||||
while (queue.Count > 0)
|
||||
{
|
||||
pool.Return(queue.Dequeue().Array);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: d490021c2e6a64374bc88168cec75c70
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1,67 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Net.Sockets;
|
||||
|
||||
namespace Telepathy
|
||||
{
|
||||
public static class NetworkStreamExtensions
|
||||
{
|
||||
// .Read returns '0' if remote closed the connection but throws an
|
||||
// IOException if we voluntarily closed our own connection.
|
||||
//
|
||||
// let's add a ReadSafely method that returns '0' in both cases so we don't
|
||||
// have to worry about exceptions, since a disconnect is a disconnect...
|
||||
public static int ReadSafely(this NetworkStream stream, byte[] buffer, int offset, int size)
|
||||
{
|
||||
try
|
||||
{
|
||||
return stream.Read(buffer, offset, size);
|
||||
}
|
||||
// IOException happens if we voluntarily closed our own connection.
|
||||
catch (IOException)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
// ObjectDisposedException can be thrown if Client.Disconnect()
|
||||
// disposes the stream, while we are still trying to read here.
|
||||
// catching it fixes https://github.com/vis2k/Telepathy/pull/104
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// helper function to read EXACTLY 'n' bytes
|
||||
// -> default .Read reads up to 'n' bytes. this function reads exactly
|
||||
// 'n' bytes
|
||||
// -> this is blocking until 'n' bytes were received
|
||||
// -> immediately returns false in case of disconnects
|
||||
public static bool ReadExactly(this NetworkStream stream, byte[] buffer, int amount)
|
||||
{
|
||||
// there might not be enough bytes in the TCP buffer for .Read to read
|
||||
// the whole amount at once, so we need to keep trying until we have all
|
||||
// the bytes (blocking)
|
||||
//
|
||||
// note: this just is a faster version of reading one after another:
|
||||
// for (int i = 0; i < amount; ++i)
|
||||
// if (stream.Read(buffer, i, 1) == 0)
|
||||
// return false;
|
||||
// return true;
|
||||
int bytesRead = 0;
|
||||
while (bytesRead < amount)
|
||||
{
|
||||
// read up to 'remaining' bytes with the 'safe' read extension
|
||||
int remaining = amount - bytesRead;
|
||||
int result = stream.ReadSafely(buffer, bytesRead, remaining);
|
||||
|
||||
// .Read returns 0 if disconnected
|
||||
if (result == 0)
|
||||
return false;
|
||||
|
||||
// otherwise add to bytes read
|
||||
bytesRead += result;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 7a8076c43fa8d4d45831adae232d4d3c
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
34
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Pool.cs
Normal file
34
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Pool.cs
Normal file
@@ -0,0 +1,34 @@
|
||||
// pool to avoid allocations. originally from libuv2k.
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
|
||||
namespace Telepathy
|
||||
{
|
||||
public class Pool<T>
|
||||
{
|
||||
// objects
|
||||
readonly Stack<T> objects = new Stack<T>();
|
||||
|
||||
// some types might need additional parameters in their constructor, so
|
||||
// we use a Func<T> generator
|
||||
readonly Func<T> objectGenerator;
|
||||
|
||||
// constructor
|
||||
public Pool(Func<T> objectGenerator)
|
||||
{
|
||||
this.objectGenerator = objectGenerator;
|
||||
}
|
||||
|
||||
// take an element from the pool, or create a new one if empty
|
||||
public T Take() => objects.Count > 0 ? objects.Pop() : objectGenerator();
|
||||
|
||||
// return an element to the pool
|
||||
public void Return(T item) => objects.Push(item);
|
||||
|
||||
// clear the pool with the disposer function applied to each object
|
||||
public void Clear() => objects.Clear();
|
||||
|
||||
// count to see how many objects are in the pool. useful for tests.
|
||||
public int Count() => objects.Count;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 6d3e530f6872642ec81e9b8b76277c93
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
401
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Server.cs
Normal file
401
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Server.cs
Normal file
@@ -0,0 +1,401 @@
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
|
||||
namespace Telepathy
|
||||
{
|
||||
public class Server : Common
|
||||
{
|
||||
// events to hook into
|
||||
// => OnData uses ArraySegment for allocation free receives later
|
||||
public Action<int> OnConnected;
|
||||
public Action<int, ArraySegment<byte>> OnData;
|
||||
public Action<int> OnDisconnected;
|
||||
|
||||
// listener
|
||||
public TcpListener listener;
|
||||
Thread listenerThread;
|
||||
|
||||
// disconnect if send queue gets too big.
|
||||
// -> avoids ever growing queue memory if network is slower than input
|
||||
// -> disconnecting is great for load balancing. better to disconnect
|
||||
// one connection than risking every connection / the whole server
|
||||
// -> huge queue would introduce multiple seconds of latency anyway
|
||||
//
|
||||
// Mirror/DOTSNET use MaxMessageSize batching, so for a 16kb max size:
|
||||
// limit = 1,000 means 16 MB of memory/connection
|
||||
// limit = 10,000 means 160 MB of memory/connection
|
||||
public int SendQueueLimit = 10000;
|
||||
public int ReceiveQueueLimit = 10000;
|
||||
|
||||
// thread safe pipe for received messages
|
||||
// IMPORTANT: unfortunately using one pipe per connection is way slower
|
||||
// when testing 150 CCU. we need to use one pipe for all
|
||||
// connections. this scales beautifully.
|
||||
protected MagnificentReceivePipe receivePipe;
|
||||
|
||||
// pipe count, useful for debugging / benchmarks
|
||||
public int ReceivePipeTotalCount => receivePipe.TotalCount;
|
||||
|
||||
// clients with <connectionId, ConnectionState>
|
||||
readonly ConcurrentDictionary<int, ConnectionState> clients = new ConcurrentDictionary<int, ConnectionState>();
|
||||
|
||||
// connectionId counter
|
||||
int counter;
|
||||
|
||||
// public next id function in case someone needs to reserve an id
|
||||
// (e.g. if hostMode should always have 0 connection and external
|
||||
// connections should start at 1, etc.)
|
||||
public int NextConnectionId()
|
||||
{
|
||||
int id = Interlocked.Increment(ref counter);
|
||||
|
||||
// it's very unlikely that we reach the uint limit of 2 billion.
|
||||
// even with 1 new connection per second, this would take 68 years.
|
||||
// -> but if it happens, then we should throw an exception because
|
||||
// the caller probably should stop accepting clients.
|
||||
// -> it's hardly worth using 'bool Next(out id)' for that case
|
||||
// because it's just so unlikely.
|
||||
if (id == int.MaxValue)
|
||||
{
|
||||
throw new Exception("connection id limit reached: " + id);
|
||||
}
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
// check if the server is running
|
||||
public bool Active => listenerThread != null && listenerThread.IsAlive;
|
||||
|
||||
// constructor
|
||||
public Server(int MaxMessageSize) : base(MaxMessageSize) {}
|
||||
|
||||
// the listener thread's listen function
|
||||
// note: no maxConnections parameter. high level API should handle that.
|
||||
// (Transport can't send a 'too full' message anyway)
|
||||
void Listen(int port)
|
||||
{
|
||||
// absolutely must wrap with try/catch, otherwise thread
|
||||
// exceptions are silent
|
||||
try
|
||||
{
|
||||
// start listener on all IPv4 and IPv6 address via .Create
|
||||
listener = TcpListener.Create(port);
|
||||
listener.Server.NoDelay = NoDelay;
|
||||
// IMPORTANT: do not set send/receive timeouts on listener.
|
||||
// On linux setting the recv timeout will cause the blocking
|
||||
// Accept call to timeout with EACCEPT (which mono interprets
|
||||
// as EWOULDBLOCK).
|
||||
// https://stackoverflow.com/questions/1917814/eagain-error-for-accept-on-blocking-socket/1918118#1918118
|
||||
// => fixes https://github.com/vis2k/Mirror/issues/2695
|
||||
//
|
||||
//listener.Server.SendTimeout = SendTimeout;
|
||||
//listener.Server.ReceiveTimeout = ReceiveTimeout;
|
||||
listener.Start();
|
||||
Log.Info("Server: listening port=" + port);
|
||||
|
||||
// keep accepting new clients
|
||||
while (true)
|
||||
{
|
||||
// wait and accept new client
|
||||
// note: 'using' sucks here because it will try to
|
||||
// dispose after thread was started but we still need it
|
||||
// in the thread
|
||||
TcpClient client = listener.AcceptTcpClient();
|
||||
|
||||
// set socket options
|
||||
client.NoDelay = NoDelay;
|
||||
client.SendTimeout = SendTimeout;
|
||||
client.ReceiveTimeout = ReceiveTimeout;
|
||||
|
||||
// generate the next connection id (thread safely)
|
||||
int connectionId = NextConnectionId();
|
||||
|
||||
// add to dict immediately
|
||||
ConnectionState connection = new ConnectionState(client, MaxMessageSize);
|
||||
clients[connectionId] = connection;
|
||||
|
||||
// spawn a send thread for each client
|
||||
Thread sendThread = new Thread(() =>
|
||||
{
|
||||
// wrap in try-catch, otherwise Thread exceptions
|
||||
// are silent
|
||||
try
|
||||
{
|
||||
// run the send loop
|
||||
// IMPORTANT: DO NOT SHARE STATE ACROSS MULTIPLE THREADS!
|
||||
ThreadFunctions.SendLoop(connectionId, client, connection.sendPipe, connection.sendPending);
|
||||
}
|
||||
catch (ThreadAbortException)
|
||||
{
|
||||
// happens on stop. don't log anything.
|
||||
// (we catch it in SendLoop too, but it still gets
|
||||
// through to here when aborting. don't show an
|
||||
// error.)
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
Log.Error("Server send thread exception: " + exception);
|
||||
}
|
||||
});
|
||||
sendThread.IsBackground = true;
|
||||
sendThread.Start();
|
||||
|
||||
// spawn a receive thread for each client
|
||||
Thread receiveThread = new Thread(() =>
|
||||
{
|
||||
// wrap in try-catch, otherwise Thread exceptions
|
||||
// are silent
|
||||
try
|
||||
{
|
||||
// run the receive loop
|
||||
// (receive pipe is shared across all loops)
|
||||
ThreadFunctions.ReceiveLoop(connectionId, client, MaxMessageSize, receivePipe, ReceiveQueueLimit);
|
||||
|
||||
// IMPORTANT: do NOT remove from clients after the
|
||||
// thread ends. need to do it in Tick() so that the
|
||||
// disconnect event in the pipe is still processed.
|
||||
// (removing client immediately would mean that the
|
||||
// pipe is lost and the disconnect event is never
|
||||
// processed)
|
||||
|
||||
// sendthread might be waiting on ManualResetEvent,
|
||||
// so let's make sure to end it if the connection
|
||||
// closed.
|
||||
// otherwise the send thread would only end if it's
|
||||
// actually sending data while the connection is
|
||||
// closed.
|
||||
sendThread.Interrupt();
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
Log.Error("Server client thread exception: " + exception);
|
||||
}
|
||||
});
|
||||
receiveThread.IsBackground = true;
|
||||
receiveThread.Start();
|
||||
}
|
||||
}
|
||||
catch (ThreadAbortException exception)
|
||||
{
|
||||
// UnityEditor causes AbortException if thread is still
|
||||
// running when we press Play again next time. that's okay.
|
||||
Log.Info("Server thread aborted. That's okay. " + exception);
|
||||
}
|
||||
catch (SocketException exception)
|
||||
{
|
||||
// calling StopServer will interrupt this thread with a
|
||||
// 'SocketException: interrupted'. that's okay.
|
||||
Log.Info("Server Thread stopped. That's okay. " + exception);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
// something went wrong. probably important.
|
||||
Log.Error("Server Exception: " + exception);
|
||||
}
|
||||
}
|
||||
|
||||
// start listening for new connections in a background thread and spawn
|
||||
// a new thread for each one.
|
||||
public bool Start(int port)
|
||||
{
|
||||
// not if already started
|
||||
if (Active) return false;
|
||||
|
||||
// create receive pipe with max message size for pooling
|
||||
// => create new pipes every time!
|
||||
// if an old receive thread is still finishing up, it might still
|
||||
// be using the old pipes. we don't want to risk any old data for
|
||||
// our new start here.
|
||||
receivePipe = new MagnificentReceivePipe(MaxMessageSize);
|
||||
|
||||
// start the listener thread
|
||||
// (on low priority. if main thread is too busy then there is not
|
||||
// much value in accepting even more clients)
|
||||
Log.Info("Server: Start port=" + port);
|
||||
listenerThread = new Thread(() => { Listen(port); });
|
||||
listenerThread.IsBackground = true;
|
||||
listenerThread.Priority = ThreadPriority.BelowNormal;
|
||||
listenerThread.Start();
|
||||
return true;
|
||||
}
|
||||
|
||||
public void Stop()
|
||||
{
|
||||
// only if started
|
||||
if (!Active) return;
|
||||
|
||||
Log.Info("Server: stopping...");
|
||||
|
||||
// stop listening to connections so that no one can connect while we
|
||||
// close the client connections
|
||||
// (might be null if we call Stop so quickly after Start that the
|
||||
// thread was interrupted before even creating the listener)
|
||||
listener?.Stop();
|
||||
|
||||
// kill listener thread at all costs. only way to guarantee that
|
||||
// .Active is immediately false after Stop.
|
||||
// -> calling .Join would sometimes wait forever
|
||||
listenerThread?.Interrupt();
|
||||
listenerThread = null;
|
||||
|
||||
// close all client connections
|
||||
foreach (KeyValuePair<int, ConnectionState> kvp in clients)
|
||||
{
|
||||
TcpClient client = kvp.Value.client;
|
||||
// close the stream if not closed yet. it may have been closed
|
||||
// by a disconnect already, so use try/catch
|
||||
try { client.GetStream().Close(); } catch {}
|
||||
client.Close();
|
||||
}
|
||||
|
||||
// clear clients list
|
||||
clients.Clear();
|
||||
|
||||
// reset the counter in case we start up again so
|
||||
// clients get connection ID's starting from 1
|
||||
counter = 0;
|
||||
}
|
||||
|
||||
// send message to client using socket connection.
|
||||
// arraysegment for allocation free sends later.
|
||||
// -> the segment's array is only used until Send() returns!
|
||||
public bool Send(int connectionId, ArraySegment<byte> message)
|
||||
{
|
||||
// respect max message size to avoid allocation attacks.
|
||||
if (message.Count <= MaxMessageSize)
|
||||
{
|
||||
// find the connection
|
||||
if (clients.TryGetValue(connectionId, out ConnectionState connection))
|
||||
{
|
||||
// check send pipe limit
|
||||
if (connection.sendPipe.Count < SendQueueLimit)
|
||||
{
|
||||
// add to thread safe send pipe and return immediately.
|
||||
// calling Send here would be blocking (sometimes for long
|
||||
// times if other side lags or wire was disconnected)
|
||||
connection.sendPipe.Enqueue(message);
|
||||
connection.sendPending.Set(); // interrupt SendThread WaitOne()
|
||||
return true;
|
||||
}
|
||||
// disconnect if send queue gets too big.
|
||||
// -> avoids ever growing queue memory if network is slower
|
||||
// than input
|
||||
// -> disconnecting is great for load balancing. better to
|
||||
// disconnect one connection than risking every
|
||||
// connection / the whole server
|
||||
//
|
||||
// note: while SendThread always grabs the WHOLE send queue
|
||||
// immediately, it's still possible that the sending
|
||||
// blocks for so long that the send queue just gets
|
||||
// way too big. have a limit - better safe than sorry.
|
||||
else
|
||||
{
|
||||
// log the reason
|
||||
Log.Warning($"Server.Send: sendPipe for connection {connectionId} reached limit of {SendQueueLimit}. This can happen if we call send faster than the network can process messages. Disconnecting this connection for load balancing.");
|
||||
|
||||
// just close it. send thread will take care of the rest.
|
||||
connection.client.Close();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// sending to an invalid connectionId is expected sometimes.
|
||||
// for example, if a client disconnects, the server might still
|
||||
// try to send for one frame before it calls GetNextMessages
|
||||
// again and realizes that a disconnect happened.
|
||||
// so let's not spam the console with log messages.
|
||||
//Logger.Log("Server.Send: invalid connectionId: " + connectionId);
|
||||
return false;
|
||||
}
|
||||
Log.Error("Server.Send: message too big: " + message.Count + ". Limit: " + MaxMessageSize);
|
||||
return false;
|
||||
}
|
||||
|
||||
// client's ip is sometimes needed by the server, e.g. for bans
|
||||
public string GetClientAddress(int connectionId)
|
||||
{
|
||||
// find the connection
|
||||
if (clients.TryGetValue(connectionId, out ConnectionState connection))
|
||||
{
|
||||
return ((IPEndPoint)connection.client.Client.RemoteEndPoint).Address.ToString();
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
// disconnect (kick) a client
|
||||
public bool Disconnect(int connectionId)
|
||||
{
|
||||
// find the connection
|
||||
if (clients.TryGetValue(connectionId, out ConnectionState connection))
|
||||
{
|
||||
// just close it. send thread will take care of the rest.
|
||||
connection.client.Close();
|
||||
Log.Info("Server.Disconnect connectionId:" + connectionId);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// tick: processes up to 'limit' messages for each connection
|
||||
// => limit parameter to avoid deadlocks / too long freezes if server or
|
||||
// client is too slow to process network load
|
||||
// => Mirror & DOTSNET need to have a process limit anyway.
|
||||
// might as well do it here and make life easier.
|
||||
// => returns amount of remaining messages to process, so the caller
|
||||
// can call tick again as many times as needed (or up to a limit)
|
||||
//
|
||||
// Tick() may process multiple messages, but Mirror needs a way to stop
|
||||
// processing immediately if a scene change messages arrives. Mirror
|
||||
// can't process any other messages during a scene change.
|
||||
// (could be useful for others too)
|
||||
// => make sure to allocate the lambda only once in transports
|
||||
public int Tick(int processLimit, Func<bool> checkEnabled = null)
|
||||
{
|
||||
// only if pipe was created yet (after start())
|
||||
if (receivePipe == null)
|
||||
return 0;
|
||||
|
||||
// process up to 'processLimit' messages for this connection
|
||||
for (int i = 0; i < processLimit; ++i)
|
||||
{
|
||||
// check enabled in case a Mirror scene message arrived
|
||||
if (checkEnabled != null && !checkEnabled())
|
||||
break;
|
||||
|
||||
// peek first. allows us to process the first queued entry while
|
||||
// still keeping the pooled byte[] alive by not removing anything.
|
||||
if (receivePipe.TryPeek(out int connectionId, out EventType eventType, out ArraySegment<byte> message))
|
||||
{
|
||||
switch (eventType)
|
||||
{
|
||||
case EventType.Connected:
|
||||
OnConnected?.Invoke(connectionId);
|
||||
break;
|
||||
case EventType.Data:
|
||||
OnData?.Invoke(connectionId, message);
|
||||
break;
|
||||
case EventType.Disconnected:
|
||||
OnDisconnected?.Invoke(connectionId);
|
||||
// remove disconnected connection now that the final
|
||||
// disconnected message was processed.
|
||||
clients.TryRemove(connectionId, out ConnectionState _);
|
||||
break;
|
||||
}
|
||||
|
||||
// IMPORTANT: now dequeue and return it to pool AFTER we are
|
||||
// done processing the event.
|
||||
receivePipe.TryDequeue();
|
||||
}
|
||||
// no more messages. stop the loop.
|
||||
else break;
|
||||
}
|
||||
|
||||
// return what's left to process for next time
|
||||
return receivePipe.TotalCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: fb98a16841ccc4338a7e0b4e59136563
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"name": "Telepathy",
|
||||
"references": [],
|
||||
"optionalUnityReferences": [],
|
||||
"includePlatforms": [],
|
||||
"excludePlatforms": [],
|
||||
"allowUnsafeCode": false,
|
||||
"overrideReferences": false,
|
||||
"precompiledReferences": [],
|
||||
"autoReferenced": true,
|
||||
"defineConstraints": []
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 725ee7191c021de4dbf9269590ded755
|
||||
AssemblyDefinitionImporter:
|
||||
externalObjects: {}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
@@ -0,0 +1,244 @@
|
||||
// IMPORTANT
|
||||
// force all thread functions to be STATIC.
|
||||
// => Common.Send/ReceiveLoop is EXTREMELY DANGEROUS because it's too easy to
|
||||
// accidentally share Common state between threads.
|
||||
// => header buffer, payload etc. were accidentally shared once after changing
|
||||
// the thread functions from static to non static
|
||||
// => C# does not automatically detect data races. best we can do is move all of
|
||||
// our thread code into static functions and pass all state into them
|
||||
//
|
||||
// let's even keep them in a STATIC CLASS so it's 100% obvious that this should
|
||||
// NOT EVER be changed to non static!
|
||||
using System;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading;
|
||||
|
||||
namespace Telepathy
|
||||
{
|
||||
public static class ThreadFunctions
|
||||
{
|
||||
// send message (via stream) with the <size,content> message structure
|
||||
// this function is blocking sometimes!
|
||||
// (e.g. if someone has high latency or wire was cut off)
|
||||
// -> payload is of multiple <<size, content, size, content, ...> parts
|
||||
public static bool SendMessagesBlocking(NetworkStream stream, byte[] payload, int packetSize)
|
||||
{
|
||||
// stream.Write throws exceptions if client sends with high
|
||||
// frequency and the server stops
|
||||
try
|
||||
{
|
||||
// write the whole thing
|
||||
stream.Write(payload, 0, packetSize);
|
||||
return true;
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
// log as regular message because servers do shut down sometimes
|
||||
Log.Info("Send: stream.Write exception: " + exception);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// read message (via stream) blocking.
|
||||
// writes into byte[] and returns bytes written to avoid allocations.
|
||||
public static bool ReadMessageBlocking(NetworkStream stream, int MaxMessageSize, byte[] headerBuffer, byte[] payloadBuffer, out int size)
|
||||
{
|
||||
size = 0;
|
||||
|
||||
// buffer needs to be of Header + MaxMessageSize
|
||||
if (payloadBuffer.Length != 4 + MaxMessageSize)
|
||||
{
|
||||
Log.Error($"ReadMessageBlocking: payloadBuffer needs to be of size 4 + MaxMessageSize = {4 + MaxMessageSize} instead of {payloadBuffer.Length}");
|
||||
return false;
|
||||
}
|
||||
|
||||
// read exactly 4 bytes for header (blocking)
|
||||
if (!stream.ReadExactly(headerBuffer, 4))
|
||||
return false;
|
||||
|
||||
// convert to int
|
||||
size = Utils.BytesToIntBigEndian(headerBuffer);
|
||||
|
||||
// protect against allocation attacks. an attacker might send
|
||||
// multiple fake '2GB header' packets in a row, causing the server
|
||||
// to allocate multiple 2GB byte arrays and run out of memory.
|
||||
//
|
||||
// also protect against size <= 0 which would cause issues
|
||||
if (size > 0 && size <= MaxMessageSize)
|
||||
{
|
||||
// read exactly 'size' bytes for content (blocking)
|
||||
return stream.ReadExactly(payloadBuffer, size);
|
||||
}
|
||||
Log.Warning("ReadMessageBlocking: possible header attack with a header of: " + size + " bytes.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// thread receive function is the same for client and server's clients
|
||||
public static void ReceiveLoop(int connectionId, TcpClient client, int MaxMessageSize, MagnificentReceivePipe receivePipe, int QueueLimit)
|
||||
{
|
||||
// get NetworkStream from client
|
||||
NetworkStream stream = client.GetStream();
|
||||
|
||||
// every receive loop needs it's own receive buffer of
|
||||
// HeaderSize + MaxMessageSize
|
||||
// to avoid runtime allocations.
|
||||
//
|
||||
// IMPORTANT: DO NOT make this a member, otherwise every connection
|
||||
// on the server would use the same buffer simulatenously
|
||||
byte[] receiveBuffer = new byte[4 + MaxMessageSize];
|
||||
|
||||
// avoid header[4] allocations
|
||||
//
|
||||
// IMPORTANT: DO NOT make this a member, otherwise every connection
|
||||
// on the server would use the same buffer simulatenously
|
||||
byte[] headerBuffer = new byte[4];
|
||||
|
||||
// absolutely must wrap with try/catch, otherwise thread exceptions
|
||||
// are silent
|
||||
try
|
||||
{
|
||||
// add connected event to pipe
|
||||
receivePipe.Enqueue(connectionId, EventType.Connected, default);
|
||||
|
||||
// let's talk about reading data.
|
||||
// -> normally we would read as much as possible and then
|
||||
// extract as many <size,content>,<size,content> messages
|
||||
// as we received this time. this is really complicated
|
||||
// and expensive to do though
|
||||
// -> instead we use a trick:
|
||||
// Read(2) -> size
|
||||
// Read(size) -> content
|
||||
// repeat
|
||||
// Read is blocking, but it doesn't matter since the
|
||||
// best thing to do until the full message arrives,
|
||||
// is to wait.
|
||||
// => this is the most elegant AND fast solution.
|
||||
// + no resizing
|
||||
// + no extra allocations, just one for the content
|
||||
// + no crazy extraction logic
|
||||
while (true)
|
||||
{
|
||||
// read the next message (blocking) or stop if stream closed
|
||||
if (!ReadMessageBlocking(stream, MaxMessageSize, headerBuffer, receiveBuffer, out int size))
|
||||
// break instead of return so stream close still happens!
|
||||
break;
|
||||
|
||||
// create arraysegment for the read message
|
||||
ArraySegment<byte> message = new ArraySegment<byte>(receiveBuffer, 0, size);
|
||||
|
||||
// send to main thread via pipe
|
||||
// -> it'll copy the message internally so we can reuse the
|
||||
// receive buffer for next read!
|
||||
receivePipe.Enqueue(connectionId, EventType.Data, message);
|
||||
|
||||
// disconnect if receive pipe gets too big for this connectionId.
|
||||
// -> avoids ever growing queue memory if network is slower
|
||||
// than input
|
||||
// -> disconnecting is great for load balancing. better to
|
||||
// disconnect one connection than risking every
|
||||
// connection / the whole server
|
||||
if (receivePipe.Count(connectionId) >= QueueLimit)
|
||||
{
|
||||
// log the reason
|
||||
Log.Warning($"receivePipe reached limit of {QueueLimit} for connectionId {connectionId}. This can happen if network messages come in way faster than we manage to process them. Disconnecting this connection for load balancing.");
|
||||
|
||||
// IMPORTANT: do NOT clear the whole queue. we use one
|
||||
// queue for all connections.
|
||||
//receivePipe.Clear();
|
||||
|
||||
// just break. the finally{} will close everything.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
// something went wrong. the thread was interrupted or the
|
||||
// connection closed or we closed our own connection or ...
|
||||
// -> either way we should stop gracefully
|
||||
Log.Info("ReceiveLoop: finished receive function for connectionId=" + connectionId + " reason: " + exception);
|
||||
}
|
||||
finally
|
||||
{
|
||||
// clean up no matter what
|
||||
stream.Close();
|
||||
client.Close();
|
||||
|
||||
// add 'Disconnected' message after disconnecting properly.
|
||||
// -> always AFTER closing the streams to avoid a race condition
|
||||
// where Disconnected -> Reconnect wouldn't work because
|
||||
// Connected is still true for a short moment before the stream
|
||||
// would be closed.
|
||||
receivePipe.Enqueue(connectionId, EventType.Disconnected, default);
|
||||
}
|
||||
}
|
||||
// thread send function
|
||||
// note: we really do need one per connection, so that if one connection
|
||||
// blocks, the rest will still continue to get sends
|
||||
public static void SendLoop(int connectionId, TcpClient client, MagnificentSendPipe sendPipe, ManualResetEvent sendPending)
|
||||
{
|
||||
// get NetworkStream from client
|
||||
NetworkStream stream = client.GetStream();
|
||||
|
||||
// avoid payload[packetSize] allocations. size increases dynamically as
|
||||
// needed for batching.
|
||||
//
|
||||
// IMPORTANT: DO NOT make this a member, otherwise every connection
|
||||
// on the server would use the same buffer simulatenously
|
||||
byte[] payload = null;
|
||||
|
||||
try
|
||||
{
|
||||
while (client.Connected) // try this. client will get closed eventually.
|
||||
{
|
||||
// reset ManualResetEvent before we do anything else. this
|
||||
// way there is no race condition. if Send() is called again
|
||||
// while in here then it will be properly detected next time
|
||||
// -> otherwise Send might be called right after dequeue but
|
||||
// before .Reset, which would completely ignore it until
|
||||
// the next Send call.
|
||||
sendPending.Reset(); // WaitOne() blocks until .Set() again
|
||||
|
||||
// dequeue & serialize all
|
||||
// a locked{} TryDequeueAll is twice as fast as
|
||||
// ConcurrentQueue, see SafeQueue.cs!
|
||||
if (sendPipe.DequeueAndSerializeAll(ref payload, out int packetSize))
|
||||
{
|
||||
// send messages (blocking) or stop if stream is closed
|
||||
if (!SendMessagesBlocking(stream, payload, packetSize))
|
||||
// break instead of return so stream close still happens!
|
||||
break;
|
||||
}
|
||||
|
||||
// don't choke up the CPU: wait until queue not empty anymore
|
||||
sendPending.WaitOne();
|
||||
}
|
||||
}
|
||||
catch (ThreadAbortException)
|
||||
{
|
||||
// happens on stop. don't log anything.
|
||||
}
|
||||
catch (ThreadInterruptedException)
|
||||
{
|
||||
// happens if receive thread interrupts send thread.
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
// something went wrong. the thread was interrupted or the
|
||||
// connection closed or we closed our own connection or ...
|
||||
// -> either way we should stop gracefully
|
||||
Log.Info("SendLoop Exception: connectionId=" + connectionId + " reason: " + exception);
|
||||
}
|
||||
finally
|
||||
{
|
||||
// clean up no matter what
|
||||
// we might get SocketExceptions when sending if the 'host has
|
||||
// failed to respond' - in which case we should close the connection
|
||||
// which causes the ReceiveLoop to end and fire the Disconnected
|
||||
// message. otherwise the connection would stay alive forever even
|
||||
// though we can't send anymore.
|
||||
stream.Close();
|
||||
client.Close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: d01598bf851164dc48a24c26913460b9
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
23
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Utils.cs
Normal file
23
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/Utils.cs
Normal file
@@ -0,0 +1,23 @@
|
||||
namespace Telepathy
|
||||
{
|
||||
public static class Utils
|
||||
{
|
||||
// IntToBytes version that doesn't allocate a new byte[4] each time.
|
||||
// -> important for MMO scale networking performance.
|
||||
public static void IntToBytesBigEndianNonAlloc(int value, byte[] bytes, int offset = 0)
|
||||
{
|
||||
bytes[offset + 0] = (byte)(value >> 24);
|
||||
bytes[offset + 1] = (byte)(value >> 16);
|
||||
bytes[offset + 2] = (byte)(value >> 8);
|
||||
bytes[offset + 3] = (byte)value;
|
||||
}
|
||||
|
||||
public static int BytesToIntBigEndian(byte[] bytes)
|
||||
{
|
||||
return (bytes[0] << 24) |
|
||||
(bytes[1] << 16) |
|
||||
(bytes[2] << 8) |
|
||||
bytes[3];
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
fileFormatVersion: 2
|
||||
guid: 951d08c05297f4b3e8feb5bfcab86531
|
||||
MonoImporter:
|
||||
externalObjects: {}
|
||||
serializedVersion: 2
|
||||
defaultReferences: []
|
||||
executionOrder: 0
|
||||
icon: {instanceID: 0}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
62
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/VERSION
Normal file
62
Assets/Mirror/Runtime/Transport/Telepathy/Telepathy/VERSION
Normal file
@@ -0,0 +1,62 @@
|
||||
V1.8 [2021-06-02]
|
||||
- fix: Do not set timeouts on listener (fixes https://github.com/vis2k/Mirror/issues/2695)
|
||||
- fix: #104 - ReadSafely now catches ObjectDisposedException too
|
||||
|
||||
V1.7 [2021-02-20]
|
||||
- ReceiveTimeout: disabled by default for cases where people use Telepathy by
|
||||
itself without pings etc.
|
||||
|
||||
V1.6 [2021-02-10]
|
||||
- configurable ReceiveTimeout to avoid TCPs high default timeout
|
||||
- Server/Client receive queue limit now disconnects instead of showing a
|
||||
warning. this is necessary for load balancing to avoid situations where one
|
||||
spamming connection might fill the queue and slow down everyone else.
|
||||
|
||||
V1.5 [2021-02-05]
|
||||
- fix: client data races & flaky tests fixed by creating a new client state
|
||||
object every time we connect. fixes data race where an old dieing thread
|
||||
might still try to modify the current state
|
||||
- fix: Client.ReceiveThreadFunction catches and ignores ObjectDisposedException
|
||||
which can happen if Disconnect() closes and disposes the client, while the
|
||||
ReceiveThread just starts up and still uses the client.
|
||||
- Server/Client Tick() optional enabled check for Mirror scene changing
|
||||
|
||||
V1.4 [2021-02-03]
|
||||
- Server/Client.Tick: limit parameter added to process up to 'limit' messages.
|
||||
makes Mirror & DOTSNET transports easier to implement
|
||||
- stability: Server/Client send queue limit disconnects instead of showing a
|
||||
warning. allows for load balancing. better to kick one connection and keep
|
||||
the server running than slowing everything down for everyone.
|
||||
|
||||
V1.3 [2021-02-02]
|
||||
- perf: ReceivePipe: byte[] pool for allocation free receives (╯°□°)╯︵ ┻━┻
|
||||
- fix: header buffer, payload buffer data races because they were made non
|
||||
static earlier. server threads would all access the same ones.
|
||||
=> all threaded code was moved into a static ThreadFunctions class to make it
|
||||
100% obvious that there should be no shared state in the future
|
||||
|
||||
V1.2 [2021-02-02]
|
||||
- Client/Server Tick & OnConnected/OnData/OnDisconnected events instead of
|
||||
having the outside process messages via GetNextMessage. That's easier for
|
||||
Mirror/DOTSNET and allows for allocation free data message processing later.
|
||||
- MagnificientSend/RecvPipe to shield Telepathy from all the complexity
|
||||
- perf: SendPipe: byte[] pool for allocation free sends (╯°□°)╯︵ ┻━┻
|
||||
|
||||
V1.1 [2021-02-01]
|
||||
- stability: added more tests
|
||||
- breaking: Server/Client.Send: ArraySegment parameter and copy internally so
|
||||
that Transports don't need to worry about it
|
||||
- perf: Buffer.BlockCopy instead of Array.Copy
|
||||
- perf: SendMessageBlocking puts message header directly into payload now
|
||||
- perf: receiveQueues use SafeQueue instead of ConcurrentQueue to avoid
|
||||
allocations
|
||||
- Common: removed static state
|
||||
- perf: SafeQueue.TryDequeueAll: avoid queue.ToArray() allocations. copy into a
|
||||
list instead.
|
||||
- Logger.Log/LogWarning/LogError renamed to Log.Info/Warning/Error
|
||||
- MaxMessageSize is now specified in constructor to prepare for pooling
|
||||
- flaky tests are ignored for now
|
||||
- smaller improvements
|
||||
|
||||
V1.0
|
||||
- first stable release
|
||||
@@ -0,0 +1,7 @@
|
||||
fileFormatVersion: 2
|
||||
guid: d942af06608be434dbeeaa58207d20bd
|
||||
DefaultImporter:
|
||||
externalObjects: {}
|
||||
userData:
|
||||
assetBundleName:
|
||||
assetBundleVariant:
|
||||
Reference in New Issue
Block a user