How to write a full duplex server in Go

How to write a full duplex server in Go

Full duplex connections lead to enhanced user experiences, improved application performance, and optimized use of network resources

In the previous article How to write a concurrent TCP server in Go we saw how to implement a concurrent TCP server in Go. This time we are going to see how to take the server to the next level and allow it to broadcast messages to its clients. That way, we can start a communication from either end of the connection. Thus, ending up in a full duplex scenario.

We are going to implement a mechanism that allows the server to send messages to all of its clients at the same time. If we wanted to send messages to a specific client, it would just be a matter of keeping track of an ID for each client associated with their corresponding connection. And use that mapping to send the messages to a specific client.

Client connection

We will start by modeling each client connection with a reference to the server, a reference to the actual network connection and a buffered channel of 256 strings. This channel will be used to store responses as a buffer before sending them back to the client. That way, we can detach the process of sending messages back to the clients from the process of handling their requests.


type connection struct {
    s         *server
    conn      net.Conn
    responses chan string
}

func newConnection(s *server, conn net.Conn) *connection {
    var c connection
    c.s = s
    c.conn = conn
    c.responses = make(chan string, 256)
    return &c
}

The client connection processing will be composed of two goroutines. One of them will be in charge of reading requests from the client, and sending them to the server. Each client request will be a text line.


func (c *connection) readConnection() {
    defer c.s.removeConnection(c)

    buf := bufio.NewReader(c.conn)

    for {
        data, err := buf.ReadString('\n')
        if err != nil {
            break
        }
        c.s.submitRequest(c, data)
    }
}

The other goroutine will iterate over the responses channel, and every time a new response is added to the channel, this goroutine will send it to the client through the network connection. It will keep iterating over the responses channel, so in case there are no more responses ready to be sent back to the client, this goroutine will wait until a new one gets to the channel.


func (c *connection) writeConnection() {
    for message := range c.responses {
        c.conn.Write([]byte(message))
    }
}

Server

The server object will keep the client connections in a sync.Map. This is needed because the list of connections can be accessed from different goroutines at the same time: the ones that add and remove references to the map, and the ones that broadcast responses to the clients. Similarly to the client connection, the server will contain a channel of 256 requests. That way, it can buffer the requests coming from the clients without the need to block any of them. It will also contain a handle function that allows the user to specify a function to process the requests.


type server struct {
    connections sync.Map
    handle      handleFn
    requests    chan *request
}

func newServer(handle handleFn) *server {
    var s server
    s.handle = handle
    s.requests = make(chan *request, 256)
    return &s
}

The server processing will be composed of a goroutine that accepts new network connections and stores them in the sync.Map.


func (s *server) serve(network, address string) {
    l, err := net.Listen(network, address)
    if err != nil {
        log.Fatal(err)
    }
    defer l.Close()

    for {
        c, err := l.Accept()
        if err != nil {
            log.Fatal(err)
        }
        connection := newConnection(s, c)
        s.connections.Store(c.RemoteAddr().String(), connection)
        connection.start()

        log.Printf("New connection %s", c.RemoteAddr().String())
    }
}

It will also contain several goroutines that handle client requests. This number of goroutines is the same as the number of CPUs available in the server machine. Each one of these worker goroutines will execute the handle function of the server to process the requests.


func (s *server) start(network, address string) {
    go s.serve(network, address)

    numCpu := runtime.NumCPU()
    for i := 0; i < numCpu; i++ {
        go s.worker()
    }
}

func (s *server) worker() {
    for req := range s.requests {
        s.handle(req.c, req.data)
    }
}

The server will also expose a method to broadcast messages to the clients. This method will iterate over the client connections sync.Map, and will send the message to each client. That message will then be queued in the requests channel of the client connection.


func (s *server) broadcast(message string) {
    s.connections.Range(func(k, v interface{}) bool {
        c := v.(*connection)
        c.send(message)
        return true
    })
}

Complete code

After going over the main parts of the example, here you have the complete code of the server.


package main

import (
    "bufio"
    "fmt"
    "log"
    "net"
    "os"
    "runtime"
    "sync"
)

type connection struct {
    s         *server
    conn      net.Conn
    responses chan string
}

func newConnection(s *server, conn net.Conn) *connection {
    var c connection
    c.s = s
    c.conn = conn
    c.responses = make(chan string, 256)
    return &c
}

func (c *connection) start() {
    go c.readConnection()
    go c.writeConnection()
}

func (c *connection) stop() {
    close(c.responses)
    c.conn.Close()
}

func (c *connection) send(data string) {
    c.responses <- data
}

func (c *connection) readConnection() {
    defer c.s.removeConnection(c)

    buf := bufio.NewReader(c.conn)

    for {
        data, err := buf.ReadString('\n')
        if err != nil {
            break
        }
        c.s.submitRequest(c, data)
    }
}

func (c *connection) writeConnection() {
    for message := range c.responses {
        c.conn.Write([]byte(message))
    }
}

type request struct {
    c    *connection
    data string
}

type handleFn func(*connection, string)

type server struct {
    connections sync.Map
    handle      handleFn
    requests    chan *request
}

func newServer(handle handleFn) *server {
    var s server
    s.handle = handle
    s.requests = make(chan *request, 256)
    return &s
}

func (s *server) submitRequest(c *connection, data string) {
    req := request{c, data}
    s.requests <- &req
}

func (s *server) start(network, address string) {
    go s.serve(network, address)

    numCpu := runtime.NumCPU()
    for i := 0; i < numCpu; i++ {
        go s.worker()
    }
}

func (s *server) worker() {
    for req := range s.requests {
        s.handle(req.c, req.data)
    }
}

func (s *server) serve(network, address string) {
    l, err := net.Listen(network, address)
    if err != nil {
        log.Fatal(err)
    }
    defer l.Close()

    for {
        c, err := l.Accept()
        if err != nil {
            log.Fatal(err)
        }
        connection := newConnection(s, c)
        s.connections.Store(c.RemoteAddr().String(), connection)
        connection.start()

        log.Printf("New connection %s", c.RemoteAddr().String())
    }
}

func (s *server) removeConnection(c *connection) {
    c.stop()
    s.connections.Delete(c.conn.RemoteAddr().String())

    log.Printf("Closed connection %s", c.conn.RemoteAddr().String())
}

func (s *server) broadcast(message string) {
    s.connections.Range(func(k, v interface{}) bool {
        c := v.(*connection)
        c.send(message)
        return true
    })
}

func main() {
    arguments := os.Args
    if len(arguments) != 3 {
        log.Fatal("Usage: server <network> <address>")
    }

    network := arguments[1]
    address := ":" + arguments[2]

    s := newServer(func(c *connection, request string) {
        c.send(request)
    })
    s.start(network, address)

    fmt.Print("Enter message: \n")
    reader := bufio.NewReader(os.Stdin)
    for {
        text, _ := reader.ReadString('\n')
        s.broadcast(text)
    }
}

Testing the example

We can test the server by running it on a terminal specifying the kind of network for the sockets, which can be TCP or UNIX.

TCP sockets example

We start the sever in one terminal, and test it from another terminal using netcat (nc).


# TCP Server
./server tcp <port>

# TCP Client
nc localhost <port>

As soon as we type any line on the netcat terminal, we will see an echo message coming back from the server.


>nc localhost 8080
test1
test1
test2
test2

From the server terminal we can also broadcast messages to the connected clients. If type a line on the server terminal, we will see it appearing on the client terminal.


>./main tcp 8080
Enter message:
2023/08/11 12:32:50 New connection 127.0.0.1:34498
message1

>nc localhost 8080
test1
test1
test2
test2
message1

UNIX Domain Sockets example

For the case of UNIX sockets it is pretty similar. It is just a matter of starting the server in one terminal using the unix as network type.

# UNIX Server
./server unix <socket>
# example
./server unix test

# UNIX Client
nc -U <socket>
# example:
nc -U :test

Conclusion

In this article, we have seen an example of how to code a simple full duplex server that can be used with TCP or UNIX Domain Sockets. Unlike half-duplex connections, where data can only flow in one direction at a time, a full duplex connection allows for seamless and real-time exchange of information in both directions concurrently. This capability enhances the efficiency and speed of communication, making it ideal for applications such as video conferencing, online gaming, and data-intensive tasks.