none
Probleme mit TCP Server RRS feed

  • Frage

  • Hallo zusammen,

    ich habe mir eine kleine Server Klasse geschrieben die alle Daten die sie empfängt an alle bekannten Client´s schickt.

    Dies klappt allerdings nicht zu 100%. In einigen Testläufen hat alles geklappt, deswegen gehe ich nicht davon aus das die einzelnen Methoden logische Fehler haben. 

    Da ich ein absoluter Neuling in Sachen Thread´s und asynchroner Programmierung bin schätze ich mal das der Fehler dort liegt...

    Allerdings weis ich im Moment auch nicht wo ich den Fehler suchen sollte. 

    Hat da jemand vielleicht eine Idee?

    Schon mal danke im Voraus :)

        class TCPServer
        {
            /*
             * Server versorgt die Clients über den Port 6000 mit den Daten und empfängt auf der 8000. Client gegensätzlich dazu.
             * Der BackgroundWorker führt die Endlosschleife asynchron im Hintergrund aus.
             *
             * */
    
            BackgroundWorker bgWorker = new BackgroundWorker();
            bool close = false;
            TcpListener listner = null;
            private List<string> clients = new List<string>();
            private List<EinheitServer> units;
    
            public TCPServer()
            {
                listner = new TcpListener(IPAddress.Any, 6000);
                this.units = new List<EinheitServer>();
    
                bgWorker.DoWork += new DoWorkEventHandler(DoWork);
                bgWorker.WorkerReportsProgress = true;
                bgWorker.WorkerSupportsCancellation = true;
                bgWorker.RunWorkerAsync();
            }       
            /*
             Sendet die Liste "units" an alle bekannten IP Adressen
                 */
            public async void Send()
            {
                foreach (string str in clients)
                {
                    bool exception = false;
                    try
                    {
                        using (Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                        {
                            s.Connect(IPAddress.Parse(str), 8000);
                            var sendString = Encoding.Unicode.GetBytes(Serialize(this.units));
    
                            using (NetworkStream ns = new NetworkStream(s))
                            {
                                await ns.WriteAsync(sendString, 0, sendString.Length);
                                s.Shutdown(SocketShutdown.Send);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        exception = true;
                        Console.WriteLine(ex.Message);
                    }
                    if (!exception)
                    {
                        Console.WriteLine("Send: {1} Units", str, this.units.Count);
                    }
                }
            }
            public static string Serialize<T>(T value)
            {
               //...
            }
            public static T Deserialize<T>(string xml)
            {
               //.....
            }
    
            /*
            Vergleich die alte Liste mit Einheiten mit der neuen Liste die der Server gerad empfangen hat. Bestehende Einheiten erhalten den neuen Status, fehlende Werden ergänzt.
            */
            private void ForgeLists(List<EinheitServer> NewUnits)
            {
                foreach (EinheitServer e in NewUnits)
                {
                    EinheitServer tmp = units.Find(x => x.Name == e.Name);
                    if (tmp == null)
                    {
                        this.units.Add(e);
                    }
                    else
                    {
                        tmp.Status = e.Status;                   
                    }
                }
            }
    
            private async void DoWork(object sender, DoWorkEventArgs e)
            {
                try
                {               
                    if (!close)
                    {
                        listner.Start();
                        while (!close)
                        {
    
                            TcpClient tcpClient = await listner.AcceptTcpClientAsync();
    
                            var ns = tcpClient.GetStream();
                            string data = string.Empty;
    
                            using (MemoryStream ms = new MemoryStream())
                            {                            
                                await ns.CopyToAsync(ms);                                                 
    
                                var tForge = Task.Run(() => ForgeLists(Deserialize<List<EinheitServer>>(Encoding.Unicode.GetString(ms.ToArray()))));
                                tForge.Wait(); // Warte darauf das Daten abgeglichen werden.
                                                           
                                string ip = clients.Find(x => x == ((IPEndPoint)tcpClient.Client.RemoteEndPoint).Address.ToString());
                                if (ip == null)
                                {
                                    clients.Add(((IPEndPoint)tcpClient.Client.RemoteEndPoint).Address.ToString());
                                }
                                var tSenden = Task.Run(() => Send());
                                tSenden.Wait(); // Danach soll an alle bekannten IPs die neuen Daten gesendet werden. 
                            }                      
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    


    • Bearbeitet Chilltok Freitag, 20. Januar 2017 12:22
    Freitag, 20. Januar 2017 12:20

Antworten

  • Gleich mal vorab: Das ist etwas doppelt-gemoppelt: Du verwendest Async und einen Background-Worker. Unabhängig von dem Konzept, das man im allgemeinen diskutieren könnte, hier ein Vorschlag welches zu Deinem Backgroundworker-Konzept passt: 

    Was mir aufgefallen ist: Du empfängst uns sendest die Daten in dem gleichen Thread (Backogrundworker). Solange Du nicht viele Clients hast und es keine Timeouts gibt, ist das kein Problem - so wie in Deiner Testumgebung.

    Ich würde zumindest das Senden und Empfangen in separate Threads auslagern (Optimal währe es natürlich, jede einzelne Kommunikatoin in einem eigenen Thread auszuführen). Um jedoch Dein Konzept mit Backgroundworkern fortzusetzen, würde ich einen 2. Worker implementieren, welcher die Daten versendet.

    Eine Job-Queue kann die Daten zwischen den Workern austauschen. Das könnte z.B. so aussehen:

        class TCPServer
        {
            public class EinheitServer
            {
                public String Name;
                public String Status;
            }
    
            public class Job
            {
                public List<string> Clients = new List<string>();
                public List<EinheitServer> Units = new List<EinheitServer>();
            }
    
            /*
             * Server versorgt die Clients über den Port 6000 mit den Daten und empfängt auf der 8000. Client gegensätzlich dazu.
             * Der BackgroundWorker führt die Endlosschleife asynchron im Hintergrund aus.
             *
             * */
    
    
            List<Job> JobQueue = new List<Job>();
            BackgroundWorker bgWorkerReceive, bgWorkerSend;
            bool close = false;
            TcpListener listner = null;
            private List<string> clients = new List<string>();
            private List<EinheitServer> units;
    
            public TCPServer()
            {
                listner = new TcpListener(IPAddress.Any, 6000);
                this.units = new List<EinheitServer>();
    
                //Worker, welcher Daten empfängt
                bgWorkerReceive = new BackgroundWorker();
                bgWorkerReceive.DoWork += new DoWorkEventHandler(DoWork_Receive);
                bgWorkerReceive.WorkerReportsProgress = true;
                bgWorkerReceive.WorkerSupportsCancellation = true;
                bgWorkerReceive.RunWorkerAsync();
    
                //Worker, welcher Daten 
                bgWorkerSend = new BackgroundWorker();
                bgWorkerSend.DoWork += new DoWorkEventHandler(DoWork_Send);
                bgWorkerSend.WorkerReportsProgress = true;
                bgWorkerSend.WorkerSupportsCancellation = true;
                bgWorkerSend.RunWorkerAsync();
    
                //Worker, welcher Daten sendet
            }
            /*
             Sendet die Liste "units" an alle bekannten IP Adressen
                 */
            public void Send(List<string> Clients, List<EinheitServer> Units)
            {
                foreach (string str in Clients)
                {
                    bool exception = false;
                    try
                    {
                        using (Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                        {
                            s.Connect(IPAddress.Parse(str), 8000);
                            var sendString = Encoding.Unicode.GetBytes(Serialize(Units));
    
                            using (NetworkStream ns = new NetworkStream(s))
                            {                            
                                ns.Write(sendString, 0, sendString.Length);
                                s.Shutdown(SocketShutdown.Send);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        exception = true;
                        Console.WriteLine(ex.Message);
                    }
                    if (!exception)
                    {
                        Console.WriteLine("Send: {1} Units", str, Units.Count);
                    }
                }
            }
            public static string Serialize<T>(T value)
            {
                throw new NotImplementedException();
            }
            public static T Deserialize<T>(string xml)
            {
                throw new NotImplementedException();
            }
    
            /*
            Vergleich die alte Liste mit Einheiten mit der neuen Liste die der Server gerad empfangen hat. Bestehende Einheiten erhalten den neuen Status, fehlende Werden ergänzt.
            */
            private void ForgeLists(List<EinheitServer> NewUnits)
            {
                foreach (EinheitServer e in NewUnits)
                {
                    EinheitServer tmp = units.Find(x => x.Name == e.Name);
                    if (tmp == null)
                    {
                        this.units.Add(e);
                    }
                    else
                    {
                        tmp.Status = e.Status;
                    }
                }
            }
    
    
            private async void DoWork_Receive(object sender, DoWorkEventArgs e)
            {
                try
                {
                    if (!close)
                    {
                        listner.Start();
                        while (!close)
                        {
    
                            TcpClient tcpClient = await listner.AcceptTcpClientAsync();
    
                            var ns = tcpClient.GetStream();
                            string data = string.Empty;
    
                            using (MemoryStream ms = new MemoryStream())
                            {
                                await ns.CopyToAsync(ms);
    
                                var tForge = Task.Run(() => ForgeLists(Deserialize<List<EinheitServer>>(Encoding.Unicode.GetString(ms.ToArray()))));
                                tForge.Wait(); // Warte darauf das Daten abgeglichen werden.
    
                                string ip = clients.Find(x => x == ((IPEndPoint)tcpClient.Client.RemoteEndPoint).Address.ToString());
                                if (ip == null)
                                {
                                    clients.Add(((IPEndPoint)tcpClient.Client.RemoteEndPoint).Address.ToString());
                                }
    
    
                                //Daten nicht senden, sondern nur speichern
                                lock (JobQueue)
                                {
                                    JobQueue.Add(new Job()
                                    {
                                        Clients = clients.ToList(), //Kopie der der Listen verwenden, um 
                                        Units = units.ToList()      //Konflikte zu vermeiden
                                    });
                                }
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
    
            private async void DoWork_Send(object sender, DoWorkEventArgs e)
            {
                try
                {
                    while (!close)
                    {
                        //Job aus der Queue holen
                        System.Threading.Thread.Sleep(100);
                        Job nextJob = null;
                        lock (JobQueue)
                        {
                            if (JobQueue.Count > 0)
                            {
                                nextJob = JobQueue[0];
                                JobQueue.RemoveAt(0);
                            }
                        }
    
                        //Job verarbeiten, falls es einen gibt
                        if (nextJob != null)                                        
                            Send(nextJob.Clients, nextJob.Units);                    
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }


    Montag, 23. Januar 2017 08:02