Circular buffers

A circular buffer, also known as a ring buffer, is a common data structure used in real-time data processing. It functions similarly to a queue, and once it is full, adding a new item causes the oldest item to be removed to make room for the new one. In this post, I'm going to build a small example application to illustrate when this data structure could be useful!

Stock exchange

The application we'll be creating is going to read from a data stream that emits stock prices. The prices are going to update at random intervals of 0-4 milliseconds. However, our application has to perform some advanced trend line calculation with each price change. This calculation has a fixed processing time of 2 milliseconds.

That means that sometimes, the prices are going to change faster than we're able to process them. This is known as backpressure, and could potentially blow up our system if it were to happen over an extended period of time.
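To get a rough feel for the mismatch, here's a back-of-the-envelope sketch. It assumes the delay between prices is a whole number of milliseconds drawn uniformly from 0 to 3 (which is what the rand.IntN(4) call used later in this post produces), and it ignores scheduler and sleep overhead, so treat the numbers as estimates. The helper names meanDelayMs and perSecond are made up for this illustration.

```go
package main

import "fmt"

// meanDelayMs returns the average of the whole-millisecond delays 0..n-1,
// assuming each one is equally likely (as with rand.IntN(n)).
func meanDelayMs(n int) float64 {
	sum := 0
	for d := 0; d < n; d++ {
		sum += d
	}
	return float64(sum) / float64(n)
}

// perSecond converts a per-item time in milliseconds to items per second.
func perSecond(ms float64) float64 {
	return 1000 / ms
}

func main() {
	produce := meanDelayMs(4) // average gap between two price changes
	consume := 2.0            // fixed trend line calculation time

	fmt.Printf("average gap between prices: %.1fms (~%.0f prices/s)\n", produce, perSecond(produce))
	fmt.Printf("processing time per price:  %.1fms (~%.0f prices/s)\n", consume, perSecond(consume))
}
```

Under those assumptions the producer averages one price every 1.5ms while we need 2ms per price, so we'd fall behind on average and not only during bursts.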

To begin, we'll create one function to generate the stock prices, and another to simulate the trend line calculation:

package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

// producePrices is going to write 100 random stock prices to ch, and then close it.
func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		ch <- rand.IntN(9999)
		// To simulate bursts of price changes we'll add a random delay between 0 and 4ms
		// (rand.IntN(4) yields a whole number of milliseconds from 0 through 3).
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}

// calculateTrendLine is going to sleep for 2ms to simulate an expensive calculation.
func calculateTrendLine() {
	time.Sleep(2 * time.Millisecond)
}

Next, we'll set up the main function to consume these price changes:

func main() {
	stockPriceStream := make(chan int)
	go producePrices(stockPriceStream)

	for v := range stockPriceStream {
		calculateTrendLine()
		fmt.Printf("Updated the trend line with value: %v\n", v)
	}
}

When we run this code using go run . we'll see some output like this:

The stock exchange has opened for the day.
Updated the trend line with value: 6622
Updated the trend line with value: 4554
Updated the trend line with value: 739
...
The stock exchange is closing for the day.
Updated the trend line with value: 3665

Even though the output looks fine, our application has some serious flaws. To expose them, we can increase the processing time to 2 seconds and then run the application again:

func calculateTrendLine() {
	// time.Sleep(2 * time.Millisecond)
	time.Sleep(2000 * time.Millisecond)
}

As you've probably noticed, the writes to our stock price channel are blocking: the producer can't emit the next price until we've finished processing the previous one, which isn't good. We want to be doing our trend line calculation as close to real-time as possible. Otherwise, we might be buying when we should be selling and vice versa!
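If you'd rather measure the blocking than eyeball it, a small sketch along these lines can help. The function timeBlockedSend is a hypothetical helper (it isn't part of the application) that sends a single value on an unbuffered channel while the receiver is busy, and reports how long the send was stuck.

```go
package main

import (
	"fmt"
	"time"
)

// timeBlockedSend sends one value on an unbuffered channel whose receiver only
// becomes ready after consumerDelay, and reports how long the send took.
func timeBlockedSend(consumerDelay time.Duration) time.Duration {
	ch := make(chan int)
	done := make(chan struct{})

	go func() {
		defer close(done)
		time.Sleep(consumerDelay) // the consumer is busy with a calculation
		<-ch
	}()

	start := time.Now()
	ch <- 42 // blocks until the consumer is ready to receive
	blocked := time.Since(start)

	<-done // wait for the consumer goroutine to finish
	return blocked
}

func main() {
	blocked := timeBlockedSend(50 * time.Millisecond)
	fmt.Printf("the producer was stuck in the send for about %v\n", blocked.Round(time.Millisecond))
}
```

The measured time should land close to the consumer's delay, which is exactly the time the producer couldn't spend emitting newer prices.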

One way to address this issue could be to make the producePrices function perform non-blocking writes:

func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		// ch <- rand.IntN(9999)
		select {
		case ch <- rand.IntN(9999):
		default:
			// The consumer isn't ready to receive, so this price is dropped.
		}
		// To simulate bursts of price changes we'll add a random delay between 0 and 4ms.
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}
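The counting of dropped prices isn't shown here, so this is a minimal sketch of one way to do it. trySend and countDropped are hypothetical helpers, and there's no consumer running so that the result is deterministic.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether v was accepted.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // nobody could take v right now, so it is dropped
	}
}

// countDropped tries to send the values 0..n-1 and returns how many were dropped.
func countDropped(ch chan<- int, n int) int {
	dropped := 0
	for i := 0; i < n; i++ {
		if !trySend(ch, i) {
			dropped++
		}
	}
	return dropped
}

func main() {
	// Without a receiver, an unbuffered channel accepts nothing,
	// while a channel with capacity 2 accepts the first two values.
	fmt.Println("unbuffered:", countDropped(make(chan int), 5)) // prints "unbuffered: 5"
	fmt.Println("capacity 2:", countDropped(make(chan int, 2), 5)) // prints "capacity 2: 3"
}
```

In the real application the same counter could live inside the default case of producePrices.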

Remember that the producer adds a random delay between 0 and 4ms to simulate bursts of price changes. For this run, our calculation time is back to the original fixed 2ms.

Running the application again, I was only able to process 54 out of 100 price changes. That means that the application dropped 46% of the events. Our processing time is simply too slow to consume all of the data in real-time. However, we should be able to make our trend line less fragmented if we could "catch up" each time there is a gap of more than 2 milliseconds between price changes. And when we do, we'll want to make sure that we're catching up using the most recent prices. This is where a circular buffer comes in handy.

To build one, we'll start by creating a second, buffered channel that we'll read from so that the producePrices function can go back to performing blocking writes:

func producePrices(ch chan<- int) {
	fmt.Println("The stock exchange has opened for the day.")
	for i := 0; i < 100; i++ {
		ch <- rand.IntN(9999)
		time.Sleep(time.Duration(rand.IntN(4)) * time.Millisecond)
	}
	fmt.Println("The stock exchange is closing for the day.")
	close(ch)
}

func main() {
	originalStream := make(chan int)
	bufferedStream := make(chan int, 3)

	// Simulate a stream of data that is going to produce stock prices at a high pace.
	go producePrices(originalStream)

	// Nothing writes to bufferedStream yet; we'll connect the two streams next.
	for v := range bufferedStream {
		calculateTrendLine()
		fmt.Printf("Updated the trend line with value: %v\n", v)
	}
}

Now we just need to connect these two streams in a way where the original stream never gets blocked, while ensuring that the buffered stream emits the most recent price changes.

To achieve that, we'll create a new CircularBuffer type:

// CircularBuffer forwards values from inputStream to outputStream, dropping
// the oldest buffered value whenever outputStream is full.
type CircularBuffer[T any] struct {
	inputStream  <-chan T
	outputStream chan T
}

// NewCircularBuffer connects inputStream to outputStream. The capacity of
// outputStream determines the size of the buffer.
func NewCircularBuffer[T any](inputStream <-chan T, outputStream chan T) *CircularBuffer[T] {
	return &CircularBuffer[T]{
		inputStream:  inputStream,
		outputStream: outputStream,
	}
}

// Run forwards values until inputStream is closed, and then closes outputStream.
func (cb *CircularBuffer[T]) Run() {
	for v := range cb.inputStream {
		select {
		case cb.outputStream <- v:
		default:
			fmt.Printf("The buffer is full. Dropping the oldest value: %v\n", <-cb.outputStream)
			cb.outputStream <- v
		}
	}
	fmt.Println("The input stream was closed. Closing the output stream.")
	close(cb.outputStream)
}

The interesting part that we should be focusing on here is the select statement:

select {
case cb.outputStream <- v:
default:
	fmt.Printf("The buffer is full. Dropping the oldest value: %v\n", <-cb.outputStream)
	cb.outputStream <- v
}

If the buffer is full, we'll leverage the default case to pop and discard the oldest value, and then simply write the new value again.
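To see the drop-oldest behaviour in isolation, here's a small sketch that pushes five values into a channel with room for three while nobody is reading. The push helper is a hypothetical, slightly more defensive variant of the select above: it also guards the receive, so it can't get stuck if a consumer happens to empty the channel between the failed write and the read. It assumes a buffered channel and a single writer.

```go
package main

import "fmt"

// push writes v to ch, discarding the oldest buffered value if ch is full.
// Assumptions: ch is buffered (capacity >= 1) and push is the only writer.
func push[T any](ch chan T, v T) {
	for {
		select {
		case ch <- v:
			return
		default:
			// ch is full: try to make room below.
		}
		select {
		case <-ch:
			// Dropped the oldest value; the next attempt to write will succeed.
		default:
			// A consumer emptied ch in the meantime, so there is room already.
		}
	}
}

func main() {
	ch := make(chan int, 3)
	for v := 1; v <= 5; v++ {
		push(ch, v)
	}
	close(ch)

	for v := range ch {
		fmt.Println(v)
	}
}
```

Running it prints 3, 4 and 5: the two oldest values were discarded to make room for the newer ones.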

With that, we can head back to the main function and use the CircularBuffer to connect our two streams:

func main() {
	originalStream := make(chan int)
	bufferedStream := make(chan int, 3)

	cb := NewCircularBuffer(originalStream, bufferedStream)
	go cb.Run()

	// The code here stays the same...
}

and then run the program again:

go run .

The stock exchange has opened for the day.
Updated the trend line with value: 5690
Updated the trend line with value: 4011
Updated the trend line with value: 2018
The buffer is full. Dropping the oldest value: 1294
Updated the trend line with value: 9617
...
The stock exchange is closing for the day.
The input stream was closed. Closing the output stream.

This time, we only dropped 21 prices. That is a huge improvement compared to the original 46. The cost for that is, of course, that our trend line calculation could be up to 3 price changes behind (our buffer size).

By using our CircularBuffer we are able to tip the scale between real-time data (small buffer size) and less data fragmentation (large buffer size) while not having to worry about our system imploding due to backpressure.
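To get a feel for this trade-off without timing noise, here's a rough, deterministic model. It is a simplification and not the channel-based implementation above: simulate is a hypothetical function that replays made-up arrival times (in milliseconds) through a drop-oldest buffer and a consumer that needs a fixed time per price.

```go
package main

import "fmt"

// simulate is a simplified model, not the channel-based implementation.
// arrivals holds made-up arrival times in ms (ascending), capacity is the
// buffer size (assumed >= 1) and procMs the processing time per price.
// It returns the number of dropped prices and the longest time a processed
// price waited before its calculation started.
func simulate(arrivals []int, capacity, procMs int) (dropped, maxWaitMs int) {
	var queue []int // arrival times of buffered prices, oldest first
	freeAt := 0     // time at which the consumer is idle again

	// consume hands the oldest buffered price to the consumer.
	consume := func() {
		arrived := queue[0]
		queue = queue[1:]
		start := arrived
		if freeAt > start {
			start = freeAt
		}
		if wait := start - arrived; wait > maxWaitMs {
			maxWaitMs = wait
		}
		freeAt = start + procMs
	}

	for _, t := range arrivals {
		// The consumer picks up buffered prices whenever it is idle at or before t.
		for len(queue) > 0 && freeAt <= t {
			consume()
		}
		if len(queue) == capacity {
			queue = queue[1:] // buffer full: drop the oldest price
			dropped++
		}
		queue = append(queue, t)
	}
	for len(queue) > 0 {
		consume()
	}
	return dropped, maxWaitMs
}

func main() {
	arrivals := []int{0, 0, 0, 0, 10} // a burst of four prices, then a pause
	for _, capacity := range []int{1, 2, 3} {
		dropped, maxWait := simulate(arrivals, capacity, 2)
		fmt.Printf("capacity %d: dropped %d, longest wait %dms\n", capacity, dropped, maxWait)
	}
}
```

With these made-up arrival times, a capacity of 1 drops 2 prices but never lets a price wait longer than 2ms, while a capacity of 3 drops nothing at the cost of a price waiting up to 6ms.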

This concludes this post. I hope you enjoyed it!

The end

I usually tweet something when I've finished writing a new post. You can find me on Twitter.
