Scaling massive, real-time data pipelines with Go

Last week at the Open Source Summit in Prague, Jean de Klerk gave a talk called Scaling massive, real-time data pipelines with Go.

This article goes over the main points of the talk, detailing the steps Jean went through when optimising his pipelines, explaining critical parts of his code and reproducing his benchmark results.

All of Jean’s code, as well as the slides he used during his talk, are available in this GitHub repository.

What is Go?

Go, or Golang, is a programming language created by Google in 2007. Like C or Algol, it is a compiled, statically typed language. It provides garbage collection and supports concurrency at the language level. Go’s syntax is straightforward and easy to understand, and it compiles very quickly.

Because of its core features, Go is best applied to networking, systems, and concurrency. As Mark C. Chu-Caroll put it:

Goroutines and channels provide the best support I’ve seen outside of Erlang for making use of concurrency. And frankly, I think Go is a lot less ugly than Erlang. (Sorry Erlang fans, but I really don’t like Erlang.) Compared to Java, which I think is the main competitor to Go in this area, Go’s goroutines and channels are just so much easier to work with than Java threads and locks, there’s just absolutely no comparison at all. Go pretty much destroys the competition in this area.

Our data pipelines

We want our data pipelines to be highly available, to have a low message drop rate, and to have high throughput.

High availability is achieved by implementing our pipelines as stateless, horizontally scaled microservices. Enabling highly concurrent access will result in high throughput. To limit message drop rate, we will work with small buffers with flush capability.

Example pipelines

Pipelines are a simple concept: gateways process messages (packets of data) sent from a source before passing them on to a destination.

Example 1: IoT devices, like smart lightbulbs, send data through gateways to a server that provides services, like auto-dimming.

Example 2: Messaging apps send messages through gateways where they are forwarded to other messaging apps.

Example 3: Logs or metrics are sent through gateways where they are processed and stored in a database.

So what exactly do gateways do? Their work can usually be broken down into four steps: ingestion, parsing, processing, and passing along.

Our barebone pipeline

To benchmark different optimizations, Jean created an example application structure:

type Queue interface {
	Enqueue(data []byte)
	Dequeue() ([]byte, bool)
}

type inputter interface {
	StartAccepting(q queues.Queue)
}

type outputter interface {
	StartOutputting(q queues.Queue)
}

type Processor struct {
	i  inputter
	q  queues.Queue
	o  outputter
	wg *sync.WaitGroup
}

func (p *Processor) Start() {
	go p.i.StartAccepting(p.q)
	go p.o.StartOutputting(p.q)
	p.wg.Wait()
}

This code defines interfaces for functions that can be implemented in different ways. Which way is best is precisely the point of Jean’s talk.

The StartAccepting and StartOutputting functions determine which network protocols are used to send messages to gateways, which is the first aspect we will optimise. The Enqueue and Dequeue functions define which data structures our gateways use to store messages as they pass through, which we will also optimise later on.
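
To make the structure concrete, here is a minimal, hypothetical outputter (not part of Jean’s repository) that simply drains the queue and prints each message; the names are illustrative only.

// stdoutOutputter is an illustrative outputter: it drains the queue
// and prints every message it finds.
type stdoutOutputter struct{}

func (o stdoutOutputter) StartOutputting(q queues.Queue) {
	for {
		msg, ok := q.Dequeue()
		if !ok {
			// Nothing to read yet: back off briefly instead of spinning.
			time.Sleep(time.Millisecond)
			continue
		}
		fmt.Printf("forwarding message: %s\n", msg)
	}
}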

Optimising network I/O

Jean’s talk looked at several different methods of Network I/O:

  1. Unary HTTP
  2. Unary HTTP/2 with gRPC
  3. UDP
  4. Streaming TCP
  5. Streaming web sockets
  6. Streaming HTTP/2 with gRPC

Unary HTTP

Unary HTTP allows sending one message through a typical HTTP connection.

Advantages:

  • It is easy to know if the connection succeeded and the gateway correctly received the message.
  • It is easy to implement.

Disadvantages:

  • Unary connections are slow.

Here is Jean’s implementation:

func (l *HttpListener) StartAccepting(q queues.Queue) {
	fmt.Printf("Starting HTTP listening on port %d\n", l.port)

	m := http.NewServeMux()
	m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			panic(err)
		}
		q.Enqueue(body)
	})

	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", l.port), m))
}

The above code is simple: it listens for HTTP requests and adds the incoming data to the gateway’s queue.
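
On the sending side, one message per request could be posted with the standard net/http client. This sketch is not from Jean’s repository; the function name and content type are only examples.

// sendUnaryHTTP posts a single message to the gateway. Every message
// pays for a full HTTP request/response round trip.
func sendUnaryHTTP(gatewayURL string, message []byte) error {
	resp, err := http.Post(gatewayURL, "application/octet-stream", bytes.NewReader(message))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Drain the body so the underlying connection can be reused.
	_, err = io.Copy(ioutil.Discard, resp.Body)
	return err
}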

Unary HTTP/2 with gRPC

Moving from HTTP to HTTP/2, this next implementation uses gRPC instead of a typical HTTP request.

Google developed and then open-sourced gRPC; they describe it like this:

gRPC is a modern, open source remote procedure call (RPC) framework that can run anywhere. It enables client and server applications to communicate transparently, and makes it easier to build connected systems.

Google developed protocol buffers, or protobuf, and describes them as follows:

Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.

Advantages:

  • gRPC uses protobuf, which allows for binary-decoding into models.
  • gRPC uses protobuf, which makes service and spec language agnostic.
  • HTTP/2 improves on HTTP/1.1 with multiplexing, binary framing, and header compression.
  • gRPC is built with load balancing in mind.

Disadvantages:

  • Unary connections are slow.

Here is Jean’s implementation:

func (l *UnaryGrpcListener) StartAccepting(q queues.Queue) {
	fmt.Printf("Starting gRPC listening on port %d\n", l.port)

	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", l.port))
	if err != nil {
		panic(err)
	}

	s := grpc.NewServer()
	r := unaryGrpcServerReplier{q: q}
	model.RegisterGrpcUnaryInputterServiceServer(s, r)

	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		panic(err)
	}
}

func (r unaryGrpcServerReplier) MakeRequest(ctx context.Context, in *model.Request) (*model.Empty, error) {
	r.q.Enqueue([]byte(in.Message))
	return &model.Empty{}, nil
}

This code uses the gRPC framework to receive messages and insert them into the gateway’s queue.
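
For completeness, the client side of this unary call might look as follows. The generated constructor name (model.NewGrpcUnaryInputterServiceClient) is an assumption based on the server registration above, and the sketch uses the google.golang.org/grpc package.

// sendUnaryGrpc opens a connection and makes one unary call per message.
// The generated client constructor is assumed, not taken from Jean's repository.
func sendUnaryGrpc(addr, message string) error {
	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()

	client := model.NewGrpcUnaryInputterServiceClient(conn)
	_, err = client.MakeRequest(context.Background(), &model.Request{Message: message})
	return err
}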

UDP

HTTP uses TCP on the transport layer. An alternative transport layer protocol is UDP, which this next implementation uses to achieve increased speed.

As opposed to TCP, UDP is a connection-less protocol, which means that the sender does not get any acknowledgment from the receiver stating whether or not the desired data was received.

Advantages:

  • UDP is fast since no time is lost acknowledging.

Disadvantages:

  • UDP is lossy since no connection is established.
  • It is hard to know how much is lost during transmission.

I exchanged a few emails with Jean, and he explained what can impact UDP’s lossiness:

The lossiness of UDP is exacerbated the more ‘busy’ intermediary nodes are; TCP will resend dropped packets (and in fact treats dropped packets as a rate-limiting indication), but in UDP’s case they are just lost. The more ‘busy’ a node is (router, proxy, load balancer, whatever), the more likely it is for a packet to be dropped. Loss of connection and those types of things can also cause packet loss.

Here is Jean’s implementation:

func (l *UdpListener) StartAccepting(q queues.Queue) {
	fmt.Printf("Starting UDP listening on port %d\n", l.port)

	addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", l.port))
	if err != nil {
		fmt.Println(err)
	}

	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		fmt.Println(err)
	}
	defer conn.Close()

	buf := make([]byte, 65536)

	for {
		n, _, err := conn.ReadFromUDP(buf)
		if err != nil {
			fmt.Println("Error: ", err)
		}

		message := buf[0:n]
		q.Enqueue(message)
	}
}

This code continuously listens for messages sent through UDP and adds those messages to the gateway’s queue.
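
A corresponding sender is short with the standard library alone. This sketch (not from Jean’s repository) simply fires a datagram and never learns whether it arrived.

// sendUDP writes a single datagram to the gateway. No acknowledgment
// is ever received, which is exactly why UDP is fast but lossy.
func sendUDP(addr string, message []byte) error {
	conn, err := net.Dial("udp", addr)
	if err != nil {
		return err
	}
	defer conn.Close()

	_, err = conn.Write(message)
	return err
}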

Streaming TCP

Moving away from unary connections, we can try streaming, which will allow us to send multiple messages through a single connection. Let’s start with TCP.

Advantages:

  • Streaming is fast because only one connection is made.

Disadvantages:

  • This requires building your own protocol.
  • This requires building your own security model.
  • This requires building your own status codes.
  • etc.

Here is Jean’s implementation:

func (l *TcpListener) StartAccepting(q queues.Queue) {
	fmt.Printf("Starting TCP listening on port %d\n", l.port)

	ln, err := net.Listen("tcp", fmt.Sprintf(":%d", l.port))
	if err != nil {
		panic(err)
	}

	for {
		conn, err := ln.Accept()
		if err != nil {
			panic(err)
		}

		go readFromConn(conn, q) // incl process buffer manually, retry logic, etc
	}
}

func readFromConn(c net.Conn, q queues.Queue) {
	var add = make(chan ([]byte), 1024)

	go processBuffer(add, q)

	for {
		msg := make([]byte, 1024)
		_, err := c.Read(msg)

		if err != nil {
			if err == io.EOF {
				c.Close()
				return
			}

			panic(err)
		}

		add <- msg
	}
}

func processBuffer(add chan ([]byte), q queues.Queue) {
	b := bytes.NewBuffer([]byte{})

	for {
		select {
		case msg := <-add:
			b.Write(msg)
			break
		default:
			l, err := b.ReadBytes('\n')
			if err != nil {
				if err == io.EOF {
					break
				}

				panic(err)
			}

			if len(l) <= 1 || l[0] == 0 { // kinda hacky way to check if it's a fully-formed message
				break
			}

			q.Enqueue(l)
		}
	}
}

This code listens for incoming TCP connections and continuously reads data into a buffer, enqueuing each complete message onto the gateway’s queue.

Note: This implementation is simple and does not work when sending large messages.
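
A matching client (again a sketch, not from the repository) keeps a single connection open and writes newline-delimited messages, which is the framing the listener above expects.

// streamOverTCP sends many messages over one TCP connection,
// terminating each with '\n' so the listener can split them.
func streamOverTCP(addr string, messages [][]byte) error {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return err
	}
	defer conn.Close()

	for _, msg := range messages {
		if _, err := conn.Write(append(msg, '\n')); err != nil {
			return err
		}
	}
	return nil
}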

Streaming web sockets

Instead of streaming over raw TCP, perhaps we can use a higher-level protocol like web sockets.

Mozilla describes web sockets like this:

WebSockets is an advanced technology that makes it possible to open an interactive communication session between the user’s browser and a server. With this API, you can send messages to a server and receive event-driven responses without having to poll the server for a reply.

Advantages:

  • Streaming with web sockets is very fast.

Disadvantages:

  • It is difficult to know if the web socket connection is alive.
  • Load balancing can be tricky with web sockets.

Here is Jean’s implementation:

func (l *WebsocketListener) StartAccepting(q queues.Queue) {
	fmt.Printf("Starting websocket listening on port %d\n", l.port)

	m := http.NewServeMux()
	m.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		c, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			return
		}

		defer c.Close()

		for {
			_, message, err := c.ReadMessage()
			if err != nil {
				break
			}
			q.Enqueue(message)
		}
	})

	log.Fatal(http.ListenAndServe(fmt.Sprintf("localhost:%d", l.port), m))
}

This code listens continuously on a web socket, adding incoming messages to the gateway’s queue.
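
The upgrader and ReadMessage calls above match the gorilla/websocket package; assuming that package, a client could stream messages over a single connection along these lines (a sketch, not Jean’s code).

// streamOverWebsocket dials the gateway once and pushes every message
// through the same websocket connection.
func streamOverWebsocket(url string, messages [][]byte) error {
	c, _, err := websocket.DefaultDialer.Dial(url, nil)
	if err != nil {
		return err
	}
	defer c.Close()

	for _, msg := range messages {
		if err := c.WriteMessage(websocket.BinaryMessage, msg); err != nil {
			return err
		}
	}
	return nil
}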

Streaming HTTP/2 with gRPC

gRPC can work through streaming, adding the strengths of the technology to the speed of streaming.

Advantages:

  • Streaming with gRPC is very fast.
  • gRPC uses protobuf, which allows for binary-decoding into models.
  • gRPC uses protobuf, which makes service and spec language agnostic.
  • gRPC uses protobuf, which allows for smaller packets moving through the network.
  • HTTP/2 improves on HTTP/1.1 with multiplexing, binary framing, and header compression.
  • gRPC is built with load balancing in mind.

Disadvantages:

  • It is difficult to know if the connection is alive.

Here is Jean’s implementation:

func (l *StreamingGrpcListener) StartAccepting(q queues.Queue) {
	fmt.Printf("Starting gRPC listening on port %d\n", l.port)

	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", l.port))
	if err != nil {
		panic(err)
	}

	s := grpc.NewServer()
	model.RegisterGrpcStreamingInputterServiceServer(s, &streamingGrpcServerReplier{q: q})
	s.Serve(lis)
}

func (r streamingGrpcServerReplier) MakeRequest(request model.GrpcStreamingInputterService_MakeRequestServer) error {
	for {
		req, err := request.Recv()
		if err != nil {
			return err
		}

		r.q.Enqueue([]byte(req.Message))
	}

	return nil
}

This code uses the gRPC framework to continuously receive messages and insert them into the gateway’s queue.
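
The client side would open one stream and push every message through it. The generated names below (NewGrpcStreamingInputterServiceClient, CloseAndRecv) are assumptions based on the server code above, not taken from the repository.

// streamOverGrpc opens a single client-side stream and sends all
// messages through it before closing.
func streamOverGrpc(addr string, messages []string) error {
	conn, err := grpc.Dial(addr, grpc.WithInsecure())
	if err != nil {
		return err
	}
	defer conn.Close()

	stream, err := model.NewGrpcStreamingInputterServiceClient(conn).MakeRequest(context.Background())
	if err != nil {
		return err
	}

	for _, msg := range messages {
		if err := stream.Send(&model.Request{Message: msg}); err != nil {
			return err
		}
	}

	_, err = stream.CloseAndRecv()
	return err
}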

Benchmarks

Jean wrote code to benchmark the different methods described above. I ran those benchmarks and here are the results.

Protocol              Parallel Sending   Small Message   Medium Message   Large Message   Very Large Message
Unary HTTP            No                 46942           57682            58412           170765
Unary HTTP            Yes                44874           40308            40461           98686
Unary UDP             No                 10647           12890            13388           -
Unary UDP             Yes                1775            2801             3063            -
Unary gRPC            No                 70816           93274            92594           212194
Unary gRPC            Yes                18813           25950            28199           127414
Streaming TCP         No                 3261            -                -               -
Streaming TCP         Yes                3106            -                -               -
Streaming UDP         No                 1955            3544             4400            -
Streaming UDP         Yes                2324            4010             4543            -
Streaming Websocket   No                 3769            10107            10646           120604
Streaming Websocket   Yes                1090            4498             4951            47776
Streaming gRPC        No                 3004            9883             13296           123162
Streaming gRPC        Yes                1075            6810             8813            73845

Results are in nanoseconds per operation.

Here are the different message sizes:

  • Small: 18 bytes
  • Medium: 4266 bytes
  • Large: 6786 bytes
  • Very Large: 79435 bytes

Some results are missing. Very large messages sent through UDP seem to have been lost, and Jean’s implementation of streaming TCP was unable to process messages above a certain size. As he put it in an email:

The TCP decoding is just a reference implementation, and isn’t able to handle stitching together incomplete data transmission. Small packets tend to arrive whole, and larger packets tend to arrive broken up.

By looking at these benchmark results, we can see that streaming messages to our gateway is much faster than using unary connections. The increased speed is even more visible when sending multiple messages concurrently.

Optimising data ingestion

Once the messages have reached our gateway, they must be stored in a queue as they wait for processing. While some threads will write - or add - new messages into the queue, others will read - or remove - them to process them.

Jean’s talk looked at several different data structures:

  1. Array-backed queue using mutices
  2. Channel-backed queue
  3. Ring-buffer-backed queue using mutices
  4. Ring-buffer-backed queue using atomics

Array-backed queue using mutices

This queue is implemented as a simple array, with a mutex to avoid simultaneous writing or reading by different threads.

If you don’t know what a mutex, or MUTually EXclusive semaphore, is then Xetius and Петър Петров’s explanation should help:

When I am having a big heated discussion at work, I use a rubber chicken which I keep in my desk for just such occasions. The person holding the chicken is the only person who is allowed to talk. If you don’t hold the chicken you cannot speak. You can only indicate that you want the chicken and wait until you get it before you speak. Once you have finished speaking, you can hand the chicken back to the moderator who will hand it to the next person to speak. This ensures that people do not speak over each other, and also have their own space to talk.

Replace Chicken with Mutex and person with thread and you basically have the concept of a mutex.

The chicken is the mutex. People holding the mu… chicken are competing threads. The moderator is the OS. When a person requests the chicken, they make a lock request. When you call mutex.lock(), your thread stalls in lock() and makes a lock request to the OS. When the OS detects that the mutex was released by a thread, it merely gives it to you, and lock() returns - the mutex is now yours and only yours. Nobody else can steal it, because calling lock() will block them. There is also try_lock(), which does not block: it immediately returns true if the mutex is now yours and false if the mutex is in use.

Advantages:

  • None. This is not a proper implementation.

Disadvantages:

  • Continually locking and unlocking the mutex is slow.
  • Resizing the array is agonizingly slow.
  • If your reads are slower than your writes, the array will grow uncontrollably.

Here is Jean’s implementation:

func (q *MutexArrayQueue) Enqueue(data []byte) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.data = append(q.data, data)
}

func (q *MutexArrayQueue) Dequeue() ([]byte, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if len(q.data) == 0 {
		return nil, false
	}

	data := q.data[0]
	q.data = q.data[1:len(q.data)]
	return data, true
}

This code adds/writes new messages to the end of the array/queue and removes/reads the oldest messages from the beginning of the array/queue.

While this is probably the most straightforward and most intuitive solution, it is one of the slowest.

Channel-backed queue

In Go, channels are pre-allocated structures to which one goroutine can add/write data at one end while another removes/reads data from the other end. Channels are, in essence, queues.

Advantages:

  • Channels are fast, as they are a core feature of Go.

Disadvantages:

  • Channels eventually fill up; with a non-blocking send, the newest data is dropped and the oldest data is prioritized.
  • Channels use a mutex internally, which can slow them down a bit.

Here is Jean’s implementation:

func (q *ChannelQueue) Enqueue(data []byte) {
	select {
	case q.c <- data:
	default:
		// Queue was full! Data dropped
		// metrics.Record("dropped_messages", 1)
	}
}

func (q *ChannelQueue) Dequeue() ([]byte, bool) {
	select {
	case data := <-q.c:
		return data, true
	default:
		return nil, false
	}
}

This code writes/adds messages to the queue if it is not full. If the queue is full, then any new messages are dropped. Messages in the channel can be read/removed.
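
The struct behind these methods is not shown in the talk; it presumably just wraps a buffered channel, something along these lines (the constructor name is invented for illustration):

// ChannelQueue wraps a buffered channel; its capacity bounds how many
// messages can wait between the inputter and the outputter.
type ChannelQueue struct {
	c chan []byte
}

func NewChannelQueue(capacity int) *ChannelQueue {
	return &ChannelQueue{c: make(chan []byte, capacity)}
}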

Ring-buffer-backed queue using mutices

If our writes are faster than our reads, our queue will fill up. Channels prioritize older messages, but we would prefer to prioritize newer messages by flushing out older messages when writes are too far ahead of reads.

The solution for this is to use a ring buffer. A ring buffer is an array that loops around onto itself. When you reach the end, you start back at the start. Two pointers keep track of where messages should be written to or read from. A mutex is used to avoid simultaneous writing or reading by different threads.

Figures from the talk illustrate a ring buffer, a ring buffer being consumed, and a ring buffer being written to.

Advantages:

  • If the writes exceed the reads by a full loop, then the buffer flushes: reads act on new messages, and oldest messages are overwritten. This prioritizes new messages.

Disadvantages:

  • Continually locking and unlocking the mutex is slow.

This is Jean’s implementation:

func (q *MutexRingBufferQueue) Enqueue(data []byte) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.buffer[q.inputCursor] = data

	if q.inputCursor == len(q.buffer)-1 {
		q.inputCursor = -1
	}
	q.inputCursor++
}

func (q *MutexRingBufferQueue) Dequeue() ([]byte, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.outputCursor == q.inputCursor {
		return nil, false
	}

	data := q.buffer[q.outputCursor]

	if q.outputCursor == len(q.buffer)-1 {
		q.outputCursor = -1
	}
	q.outputCursor++

	return data, true
}

This code implements a ring buffer as a queue. The pointers that keep track of where to write to and read from are called cursors here.
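
The struct itself is not shown above; the methods imply state along these lines (a sketch, with the constructor name invented for illustration):

// MutexRingBufferQueue holds a fixed-size buffer, two cursors, and a
// mutex guarding both of them.
type MutexRingBufferQueue struct {
	mu           sync.Mutex
	buffer       [][]byte
	inputCursor  int
	outputCursor int
}

func NewMutexRingBufferQueue(size int) *MutexRingBufferQueue {
	return &MutexRingBufferQueue{buffer: make([][]byte, size)}
}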

Ring-buffer-backed queue using atomics

In Go, the sync/atomic package provides atomics: low-level implementations of simple operations like adding, swapping, and compare-and-swap. Because these operations are atomic, they do not require mutices.

We can reimplement a ring buffer without mutices by using Go’s atomics.

Advantages:

  • If the writes exceed the reads by a full loop, then the buffer effectively flushes: reads act on new messages, and oldest messages are overwritten. This prioritizes new messages.
  • Atomics are fast.

Disadvantages:

  • None.

Here is Jean’s implementation:

func (d *OneToOne) Enqueue(data []byte) {
	// Atomically claim the next write slot; writers never block.
	writeIndex := atomic.AddUint64(&d.writeIndex, 1)
	idx := writeIndex % uint64(len(d.buffer))
	newBucket := &bucket{
		data: data,
		seq:  writeIndex,
	}

	// Overwrite whatever is in the slot, even if it was never read.
	atomic.StorePointer(&d.buffer[idx], unsafe.Pointer(newBucket))
}

func (d *OneToOne) Dequeue() ([]byte, bool) {
	readIndex := atomic.LoadUint64(&d.readIndex)
	idx := readIndex % uint64(len(d.buffer))

	value, ok := d.tryNext(idx)
	if ok {
		atomic.AddUint64(&d.readIndex, 1)
	}
	return value, ok
}

func (d *OneToOne) tryNext(idx uint64) ([]byte, bool) {
	// Swap the slot with nil so every message is consumed at most once.
	result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))

	if result == nil {
		return nil, false
	}

	// If the writer has lapped the reader, jump the read index forward
	// to the sequence just read: this is the buffer flushing itself.
	if result.seq > d.readIndex {
		atomic.StoreUint64(&d.readIndex, result.seq)
	}

	return result.data, true
}

This code implements a ring buffer with atomics instead of mutices. It requires a bit more code than the previous implementation precisely because of this.
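
Again, the struct is not shown above; the methods imply roughly the following state (a sketch, with the constructor invented for illustration):

// bucket pairs a message with the write sequence number that produced it.
type bucket struct {
	data []byte
	seq  uint64
}

// OneToOne holds a fixed-size slice of atomically swapped pointers plus
// two monotonically increasing indices.
type OneToOne struct {
	buffer     []unsafe.Pointer // each slot holds a *bucket or nil
	writeIndex uint64
	readIndex  uint64
}

func NewOneToOne(size int) *OneToOne {
	return &OneToOne{buffer: make([]unsafe.Pointer, size)}
}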

Benchmarks

Jean wrote code to benchmark the different data structures described above. I ran those benchmarks and here are the results.

Data Structure          Small Message   Medium Message   Large Message   Very Large Message
Array & Mutices         15969           16370            15961           15947
Channel                 6888            6862             6805            6416
Ring Buffer & Mutices   11812           11869            13097           11891
Ring Buffer & Atomics   6339            6199             6317            6177

Results are in nanoseconds per operation.

Here are the different message sizes:

  • Small: 18 bytes
  • Medium: 4266 bytes
  • Large: 6786 bytes
  • Very Large: 79435 bytes

These results barely depend on message size because when a message is added to or removed from the queue, the data itself is not copied; only its address in memory is.

By looking at these benchmark results, we can see that mutices are quite slow and that atomics perform much better. While the ring buffer requires more complex code than a channel, it prioritizes newer data over older data. Fancy data structures can be advantageous.

Profiling in Go

Before you start optimising, Jean recommends that you profile your code.

Wikipedia explains what profiling is reasonably well:

In software engineering, profiling is a form of dynamic program analysis that measures, for example, the space (memory) or time complexity of a program, the usage of particular instructions, or the frequency and duration of function calls. Most commonly, profiling information serves to aid program optimization.

You can profile your code with pprof, which reports where your program spends its time and allocates its memory at runtime, or with gotrace, which creates a dynamic visualisation of goroutines and channels at execution time.
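
For a long-running service, a common way to expose pprof is to import net/http/pprof for its side effects and serve its handlers on a separate port; this generic sketch is not tied to Jean’s code, and the port is only an example.

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
)

func main() {
	// Profiles can then be fetched with, for example:
	//   go tool pprof http://localhost:6060/debug/pprof/profile
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}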

Conclusion

After trying all these different optimisations, we can safely say that to obtain the best performance we need to stream our messages to our gateways, and our gateways should implement their queues as atomic ring buffers.

Jean shared some insight at the end of his talk:

  • Metrics are more useful than logs, and they also tend to be smaller.
  • Queues are great, especially when they are correctly implemented.
  • Be sure to profile your applications.
  • Channels are fantastic, but not always perfect; you may need to profile your code to be sure.

On a personal note, I believe that all developers should have some knowledge of algorithms and data structures, so that they may optimise their code in the best possible way. Optimisation starts with knowing your options.
