Circular buffers
A circular buffer, also known as a ring buffer, is a common data structure used in real-time data processing. It
works like a fixed-size queue: once it is full, adding a new item evicts the oldest item to make room for the new one.
In this post, I'm going to build a small example application to illustrate when this data structure can be useful!
Stock exchange
The application we'll be creating is going to read from a data stream that emits stock prices. The prices are going to
update at random intervals of 0 to 3 milliseconds. However, our application has to perform some advanced trend line
calculation with each price change, and this calculation has a fixed processing time of 2 milliseconds.
That means that sometimes, the prices are going to change faster than we're able to process them. This is known as
backpressure, and could potentially blow up our system if it were to happen over an extended period of time.
To begin, we'll create one function to generate the stock prices, and another to simulate the trend line calculation:
package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)
// producePrices is going to write 100 random stock prices to ch, and then close it.
func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		ch <- rand.IntN(9999)
		// To simulate bursts of price changes we'll add a random delay of 0 to 3ms.
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}
// calculateTrendLine is going to sleep for 2ms to simulate an expensive calculation.
func calculateTrendLine() {
	time.Sleep(2 * time.Millisecond)
}
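A quick back-of-the-envelope check supports the backpressure claim. rand.IntN(4) returns 0, 1, 2 or 3 with equal probability, so, ignoring scheduling overhead and the fact that time.Sleep may sleep slightly longer than requested, the expected gap between prices and the resulting rates are roughly:

```latex
\mathbb{E}[\Delta t] = \frac{0 + 1 + 2 + 3}{4}\,\text{ms} = 1.5\,\text{ms} \;<\; t_{\text{calc}} = 2\,\text{ms}

\text{arrival rate} \approx \frac{1}{1.5\,\text{ms}} \approx 667\ \text{prices/s},
\qquad
\text{service rate} = \frac{1}{2\,\text{ms}} = 500\ \text{prices/s}
```

On average, the stream would like to emit prices about a third faster than we can process them, so work piles up unless some prices are skipped.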
Next, we'll set up the main function to consume these price changes:
func main() {
	stockPriceStream := make(chan int)
	go producePrices(stockPriceStream)
	for v := range stockPriceStream {
		calculateTrendLine()
		fmt.Printf("Updated the trend line with value: %v\n", v)
	}
}
When we run this code using go run ., we'll see some output like this:
The stock exchange has opened for the day.
Updated the trend line with value: 6622
Updated the trend line with value: 4554
Updated the trend line with value: 739
...
The stock exchange is closing for the day.
Updated the trend line with value: 3665
Even though the output looks fine, our application has some serious flaws. To expose them, we can increase the
processing time to 2 seconds and then run the application again:
func calculateTrendLine() {
	// time.Sleep(2 * time.Millisecond)
	time.Sleep(2000 * time.Millisecond)
}
As you've probably noticed, the writes to our stock price channel are blocking: the channel is unbuffered, so every send
waits until main has finished its current calculation and is ready to receive. That isn't good. We want to do our trend
line calculation as close to real-time as possible. Otherwise, we might be buying when we should be selling, and vice
versa!
One way to address this issue could be to make the producePrices function perform non-blocking writes:
func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		// ch <- rand.IntN(9999)
		select {
		case ch <- rand.IntN(9999):
		default:
			// Nobody is ready to receive, so this price is dropped.
		}
		// To simulate bursts of price changes we'll add a random delay of 0 to 3ms.
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}
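The select with an empty default case is Go's standard way to attempt a send without waiting. Pulled out into a helper (trySend is a hypothetical name, not part of the original program), its behaviour on unbuffered and buffered channels looks like this:

```go
package main

import "fmt"

// trySend is a hypothetical helper that performs a non-blocking send.
// It returns true if v was handed to a waiting receiver or stored in free
// buffer space, and false if the send would have blocked (v is discarded).
func trySend[T any](ch chan<- T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	// No goroutine is receiving, so the send would block and is skipped.
	fmt.Println(trySend(unbuffered, 42)) // false

	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1)) // true: there is room for one value
	fmt.Println(trySend(buffered, 2)) // false: the buffer is full, so 2 is dropped
	fmt.Println(<-buffered)           // 1
}
```

Note that the dropped value is always the newest one. That is the opposite of what we want for prices, where the most recent value matters most.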
As before, the random delay of 0 to 3ms simulates bursts of price changes, and our calculation time is still fixed at
2ms.
Running the application again, I was only able to process 54 out of 100 price changes. That means the application
dropped 46% of the events. Our processing time is simply too slow to consume all of the data in real-time. However, we
should be able to make our trend line less fragmented if we could "catch up" each time more than 2 milliseconds pass
between price changes. And when we do, we'll want to make sure that we're catching up using the most recent prices.
This is where a circular buffer comes in handy.
To build one, we'll start by creating a second, buffered channel for main to read from, so that the producePrices
function can go back to performing blocking writes:
func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		ch <- rand.IntN(9999)
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}
func main() {
	originalStream := make(chan int)
	bufferedStream := make(chan int, 3)
	// Simulate a stream of data that is going to produce stock prices at a high pace.
	go producePrices(originalStream)
	// Nothing writes to bufferedStream yet; the two streams still need to be connected.
	for v := range bufferedStream {
		calculateTrendLine()
		fmt.Printf("Updated the trend line with value: %v\n", v)
	}
}
Now we just need to connect these two streams in a way where the original stream never gets blocked, while ensuring that
the buffered stream emits the most recent price changes.
To achieve that, we'll create a new CircularBuffer type:
type CircularBuffer[T any] struct {
	inputStream  <-chan T
	outputStream chan T
}

func NewCircularBuffer[T any](inputStream <-chan T, outputStream chan T) *CircularBuffer[T] {
	return &CircularBuffer[T]{
		inputStream:  inputStream,
		outputStream: outputStream,
	}
}

func (cb *CircularBuffer[T]) Run() {
	for v := range cb.inputStream {
		select {
		case cb.outputStream <- v:
		default:
			fmt.Printf("The buffer is full. Dropping the oldest value: %v\n", <-cb.outputStream)
			cb.outputStream <- v
		}
	}
	fmt.Println("The input stream was closed. Closing the output stream.")
	close(cb.outputStream)
}
The interesting part that we should be focusing on here is the select statement:
select {
case cb.outputStream <- v:
default:
	fmt.Printf("The buffer is full. Dropping the oldest value: %v\n", <-cb.outputStream)
	cb.outputStream <- v
}
If the buffer is full, we'll leverage the default case to pop and discard the oldest value, and then write the new
value, which now fits.
With that, we can head back to the main function and use the CircularBuffer to connect our two streams:
func main() {
	originalStream := make(chan int)
	bufferedStream := make(chan int, 3)
	cb := NewCircularBuffer(originalStream, bufferedStream)
	go cb.Run()
	// The code here stays the same...
}
and then run the program again:
go run .
The stock exchange has opened for the day.
Updated the trend line with value: 5690
Updated the trend line with value: 4011
Updated the trend line with value: 2018
The buffer is full. Dropping the oldest value: 1294
Updated the trend line with value: 9617
...
The stock exchange is closing for the day.
The input stream was closed. Closing the output stream.
This time, we only dropped 21 prices. That is a huge improvement compared to the original 46. The cost, of course, is
that our trend line calculation could be up to 3 price changes behind (our buffer size).
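We can also put a rough bound on how stale a price gets. Let C = 3 be the buffer capacity and t_calc = 2ms. When a price enters a full buffer as its newest element, at most C - 1 older prices sit ahead of it, and main may still be in the middle of a calculation (at most t_calc remaining). Assuming main is the only consumer and ignoring scheduling overhead:

```latex
t_{\text{wait}} \;\le\; t_{\text{calc}} + (C - 1)\,t_{\text{calc}} \;=\; C\,t_{\text{calc}} \;=\; 3 \cdot 2\,\text{ms} \;=\; 6\,\text{ms}
```

Doubling the buffer size roughly doubles this worst-case delay while letting more prices through.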
By using our CircularBuffer, we can tune the trade-off between real-time data (small buffer size) and less data
fragmentation (large buffer size), without having to worry about our system imploding due to backpressure.
This concludes this post. I hope you enjoyed it!
The end
I usually tweet something when I've finished writing a new post. You can find me on Twitter.