Streaming data with gRPC
A necessary guide to learn how to survive when frustrated machines take over the world
by Percy Bolmér, February 5, 2021
Have you ever driven a car with a kid in the backseat?
The constant questioning.
Are we there yet? Are we there yet? — annoying backseat kids
This could drive people mad and probably already has. It’s strange: even though we know how frustrating that is, software tends to be designed the same way.
You have a frontend or a service running as a client that loads data from the server. This loading is in many cases done by frequently polling for new data. Imagine being an HTTP API with tons of clients, all asking you “Hey, got any new shiny data for me?” every 10 seconds or so.
This pattern is called polling: the client retrieves data by repeatedly asking the server whether anything has changed.
I’m happy computers haven’t grown a mind of their own yet, because when they do, oh boy, we software developers are screwed. All those servers must be frustrated.
Thankfully you’re reading this blog post that will save your life when the machines take over the world. They might even spare you for redesigning your communication pattern after this post.
In this article, we will look at a streaming architecture using gRPC streams. There are many modern solutions to streaming data, such as WebSocket and WebRTC. The reason why I’m choosing gRPC is that it allows us to stream data between microservices as well, and not only to web applications. (Also because I love working with Go.)
In this article, we will create one gRPC server written in Go, one client written in Go, and one client in React. Both clients will use the same server.
Instead of polling, we will set up a connection with one request and then wait for data from the server. The client no longer asks for new data; the server pushes new data out to the clients. This is called server-side streaming.
There are two other ways of streaming data with gRPC: client-side streaming and bidirectional streaming. Client-side streaming is when the client pushes a stream of data to the server.
Bidirectional streaming is when both the client and the server can push data to each other. In this article, we will only use server-side streaming.
Let’s begin implementing the design in a Go backend. We will also implement a client in a React application. So after this article, you should have a Go server, a Go client, and a React client that all talk to the same server.
Project setup and needed software
Let’s begin by setting up the project structure. I’ve started by creating a folder for each of the clients and one for the backend code. In my case, I’ve named the project grpcstreams, and inside are 3 empty folders and 2 empty .go files. The complete code example can be found here.
If you’re unfamiliar with gRPC, I suggest reading my other article about it first.
If you want to continue without prior knowledge, you’ll need to install Protoc.
Protoc is a compiler used to generate Go code from our protobuf and gRPC service definitions.
You will also need the Go gRPC plugins so that we can generate gRPC code for the Go backend. They can be fetched with the go get command if you have Go installed (which I will assume); if not, go get it.
go get -u google.golang.org/protobuf/cmd/protoc-gen-go
go install google.golang.org/protobuf/cmd/protoc-gen-go
go get -u google.golang.org/grpc/cmd/protoc-gen-go-grpc
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc
One more thing before we start: we will also need to generate a client for the web application, which requires the grpc-web plugin. You can find installation instructions for it here.
So, to make sure you have everything, here is a list of what you should have.
- Protoc
- protoc-gen-go and protoc-gen-go-
- grpc-web
Setting up the gRPC service
We will begin by creating a file called service.proto, which will be located inside the proto folder. This file is a protobuf file that will define what our backend service will look like. We will create a hardware monitoring service that will publish hardware stats to clients.
service.proto will look like this.
syntax = "proto3";

package main;

option go_package=".;hardwaremonitoring";

// HardwareStats is a struct containing information about the system's memory usage and CPU
message HardwareStats {
    int32 cpu = 1;
    int32 memory_free = 2;
    int32 memory_used = 3;
}

// EmptyRequest is a way to call gRPC functions without any input data
message EmptyRequest {}

// HardwareMonitor is our service. The service is the API, and the rpc statements below are the methods made available by the API
service HardwareMonitor {
    // Monitor will output stats about the hardware on the system host
    rpc Monitor(EmptyRequest) returns (stream HardwareStats) {};
}
In gRPC, you create your protobuf file and generate the code based on that file. The service.proto file is a template for how the generated code will look.
We have created a HardwareMonitor service that will expose one method called Monitor. The Monitor method will return a stream as indicated by the (stream HardwareStats).
Let’s use protoc to generate the code needed based on service.proto.
protoc service.proto --js_out=import_style=commonjs,binary:. --grpc-web_out=import_style=commonjs,mode=grpcwebtext:. --go-grpc_out=. --go_out=.
Now that command might look frightening, but it’s quite simple. What we do is call protoc on our service.proto.
We then add the flags (js_out, grpc-web_out, go-grpc_out, go_out), one for each code generator we installed. The trailing . on each flag (after the = or the :) means output the code in the current directory. This means protoc will generate four files for us.
We will use them soon. If you’d like to add another type of client, like a Java client, you would add the java_out option.
Go gRPC server
If we take a look at the service_grpc.pb.go file, which is generated by protoc, we will find that it has created an interface for us. It has generated a lot of code for us, but it cannot know what the Monitor method should do, so that part is up to the developers. The generated code is all about setting up a connection and everything else needed to make gRPC work, and we will leave it as is.
If you’re not very familiar with interfaces, you can read about them in Interfaces In Go. The generated interface looks like the following gist.
The generated gRPC interface.
// HardwareMonitorServer is the server API for HardwareMonitor service.
// All implementations must embed UnimplementedHardwareMonitorServer
// for forward compatibility
type HardwareMonitorServer interface {
	// Monitor will output stats about the hardware on the system host
	Monitor(*EmptyRequest, HardwareMonitor_MonitorServer) error
	mustEmbedUnimplementedHardwareMonitorServer()
}
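The comment about embedding UnimplementedHardwareMonitorServer can look a bit mysterious. Here is a minimal sketch of the idea using hypothetical stand-in types (monitorServer, unimplemented, stubServer, realServer) instead of the real generated code: the embedded struct supplies every method of the interface, and your own type overrides only the ones it actually implements.

```go
package main

import (
	"errors"
	"fmt"
)

// monitorServer is a hypothetical stand-in for the generated
// HardwareMonitorServer interface, reduced to plain types.
type monitorServer interface {
	Monitor(req string) error
	mustEmbedUnimplemented()
}

// unimplemented is a stand-in for UnimplementedHardwareMonitorServer.
type unimplemented struct{}

func (unimplemented) Monitor(string) error    { return errors.New("method Monitor not implemented") }
func (unimplemented) mustEmbedUnimplemented() {}

// stubServer embeds the stand-in and overrides nothing yet.
type stubServer struct{ unimplemented }

// realServer embeds the stand-in and overrides Monitor.
type realServer struct{ unimplemented }

func (realServer) Monitor(req string) error { return nil }

// Compile-time checks that both types satisfy the interface.
var (
	_ monitorServer = stubServer{}
	_ monitorServer = realServer{}
)

func main() {
	fmt.Println("stub:", stubServer{}.Monitor("ping"))
	fmt.Println("real:", realServer{}.Monitor("ping"))
}
```

In the real generated package the marker method is unexported, so embedding the generated struct is the only way for a type in another package to satisfy the interface. That is also what keeps old servers compiling when new methods are added to the service.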
What we need to do is open up server.go and edit it so that our server implements the HardwareMonitorServer interface. This means we need to implement the Monitor method.
Server that will output stats every 2 seconds.
package main

import (
	"log"
	"time"

	// Don't forget these imports :)
	"github.com/mackerelio/go-osstat/cpu"
	"github.com/mackerelio/go-osstat/memory"
	hardwaremonitoring "github.com/percybolmer/grpcstreams/proto"
)

// Server is our struct that will handle the hardware monitoring logic.
// It fulfills the generated gRPC interface.
type Server struct {
	hardwaremonitoring.UnimplementedHardwareMonitorServer
}

// Monitor is used to start a stream of HardwareStats
func (s *Server) Monitor(req *hardwaremonitoring.EmptyRequest, stream hardwaremonitoring.HardwareMonitor_MonitorServer) error {
	// Start a ticker that fires every 2 seconds, and stop it when the stream ends
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		select {
		// Exit on stream context done
		case <-stream.Context().Done():
			return nil
		case <-ticker.C:
			// Grab stats and output
			hwStats, err := s.GetStats()
			if err != nil {
				// Skip this tick instead of sending a nil message
				log.Println(err.Error())
				continue
			}
			// Send the hardware stats on the stream, and stop if the client is gone
			if err := stream.Send(hwStats); err != nil {
				log.Println(err.Error())
				return err
			}
		}
	}
}

// GetStats will extract system stats and output a HardwareStats object, or an error
// if extraction fails
func (s *Server) GetStats() (*hardwaremonitoring.HardwareStats, error) {
	// Extract memory stats
	mem, err := memory.Get()
	if err != nil {
		return nil, err
	}
	// Extract CPU stats
	cpuStats, err := cpu.Get()
	if err != nil {
		return nil, err
	}
	// Create our response object.
	// Note: the proto uses int32, so the uint64 values are narrowed here and
	// large byte counts can overflow. A wider field type in service.proto
	// would avoid that.
	hwStats := &hardwaremonitoring.HardwareStats{
		Cpu:        int32(cpuStats.Total),
		MemoryFree: int32(mem.Free),
		MemoryUsed: int32(mem.Used),
	}
	return hwStats, nil
}
When server.go is done, we also need some way of running the server. This is done in main.go.
main.go will just listen for TCP connections on port 7777 and host our gRPC hardware monitoring server on that port. This will allow both the web client and the Go client to execute the Monitor method.
Remember, this is RPC, so any requests are just instructions to execute the method on the server. This is the reason why we can have clients in many languages.
Note that this gRPC API will run on an insecure connection. In production, you should add TLS; see my article about gRPC and TLS.
This is our simple main that runs the server.
package main

import (
	"fmt"
	"log"
	"net"

	hardwaremonitoring "github.com/percybolmer/grpcstreams/proto"
	"google.golang.org/grpc"
)

func main() {
	fmt.Println("Welcome to streaming HW monitoring")
	// Set up a TCP listener on port 7777
	lis, err := net.Listen("tcp", ":7777")
	if err != nil {
		panic(err)
	}
	// Create a gRPC server
	gRPCserver := grpc.NewServer()
	// Create a server object of the type we created in server.go
	s := &Server{}
	// Register our server as a gRPC server
	hardwaremonitoring.RegisterHardwareMonitorServer(gRPCserver, s)
	log.Println(gRPCserver.Serve(lis))
}
Client in Go
Now that the server is up and running, we need to create a client to test it. I’m going to start with the client written in Go. The client will be very simple: it will connect to the gRPC server, set up a stream, listen for 7 seconds, and then terminate.
The client sets up a stream and listens for 7 seconds.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	hardwaremonitoring "github.com/percybolmer/grpcstreams/proto"
	"google.golang.org/grpc"
)

func main() {
	// Create our context. Cancelling it ends the stream, which lets the server clean up
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Set up the connection
	conn, err := grpc.DialContext(ctx, "localhost:7777", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	// Close connection when we are done
	defer conn.Close()
	// Use the generated NewHardwareMonitorClient method and pass our connection
	client := hardwaremonitoring.NewHardwareMonitorClient(conn)
	// Call Monitor to receive the stream of data
	// with an empty request
	emptyreq := &hardwaremonitoring.EmptyRequest{}
	// Call the Monitor function, this will return a stream of data
	stream, err := client.Monitor(ctx, emptyreq)
	if err != nil {
		panic(err)
	}
	// Create a timer that fires once after 7 seconds
	stop := time.NewTimer(7 * time.Second)
	defer stop.Stop()
	// Iterate over the stream
	for {
		select {
		case <-stop.C:
			// Returning runs the deferred cancel and conn.Close,
			// which ends the stream on the server side as well
			return
		default:
			// Receive on the stream, this blocks until the next message arrives
			res, err := stream.Recv()
			if err != nil {
				panic(err)
			}
			fmt.Println("New Hardware state received")
			fmt.Println("CPU Usage: ", res.Cpu)
			fmt.Println("Memory Used: ", res.MemoryUsed)
			fmt.Println("Memory Free: ", res.MemoryFree)
		}
	}
}
When you’ve filled in client.go, it’s time to try it out. Simply use go run to start the server and then the client.
Client in web application
For the web application, I will use React and create-react-app. Creating the web application will require installing some more software, I’m so sorry! :)
We will use NPM, so start by getting that.
We will also need some protobuf and gRPC libraries so that we can use the client.
npm install -g npx
npm install grpc-web
npm install google-protobuf
We will create a React application called hwmonitor. First of all, you need to enter the webapp folder and run the following command to generate the React application.
npx create-react-app hwmonitor
Once the command has finished, we need to create a new directory and copy our generated protobuf files into it. I’ll create the proto folder at grpcstreams/webapp/hwmonitor/src/proto.
Don’t forget to copy both service_pb.js and service_grpc_web_pb.js into the new proto folder.
Now is a good time to open service_pb.js. In this file, we can see the protobuf object that is generated, which is important when we want to reach the data.
The toObject function, which shows what fields are available.
/**
 * Static version of the {@see toObject} method.
 * @param {boolean|undefined} includeInstance Deprecated. Whether to include
 *     the JSPB instance for transitional soy proto support:
 *     http://goto/soy-param-migration
 * @param {!proto.main.HardwareStats} msg The msg instance to transform.
 * @return {!Object}
 * @suppress {unusedLocalVariables} f is only used for nested messages
 */
proto.main.HardwareStats.toObject = function(includeInstance, msg) {
  var f, obj = {
    cpu: jspb.Message.getFieldWithDefault(msg, 1, 0),
    memoryFree: jspb.Message.getFieldWithDefault(msg, 2, 0),
    memoryUsed: jspb.Message.getFieldWithDefault(msg, 3, 0)
  };

  if (includeInstance) {
    obj.$jspbMessageInstance = msg;
  }
  return obj;
};
We can see that the object contains 3 fields: cpu, memoryFree, and memoryUsed. Note that the field names are camelCase and start with a lowercase letter, unlike the snake_case names in the proto file. This is good to know since we need those fields to display the data.
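The same proto field ends up with a slightly different name in each language: memory_free became memoryFree in the JavaScript object and MemoryFree in the Go struct we used earlier. The helper below is my own illustration of that mapping for simple names like ours. It is not the algorithm protoc actually uses, and camel is a made-up function.

```go
package main

import (
	"fmt"
	"strings"
)

// camel converts a snake_case protobuf field name to camelCase. With
// upperFirst set, the first letter is capitalised too, as in Go field names.
func camel(name string, upperFirst bool) string {
	var b strings.Builder
	up := upperFirst
	for _, r := range name {
		if r == '_' {
			up = true // capitalise whatever follows the underscore
			continue
		}
		if up {
			b.WriteString(strings.ToUpper(string(r)))
			up = false
		} else {
			b.WriteRune(r)
		}
	}
	return b.String()
}

func main() {
	for _, f := range []string{"cpu", "memory_free", "memory_used"} {
		fmt.Printf("%s -> js: %s, go: %s\n", f, camel(f, false), camel(f, true))
	}
}
```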
Next, we will open up the file called App.js in the React application and modify it. It will do the same as the client in Go: start a stream and print out the data.
The gist shows how we use the stream to update the stats.
import './App.css';
import React, { useState, useEffect } from 'react';
import { HardwareMonitorClient } from './proto/service_grpc_web_pb';
import { EmptyRequest } from './proto/service_pb';

// Create a new HardwareMonitorClient like this, correct the address and port
// if you use something else.
var client = new HardwareMonitorClient('http://localhost:8080');

function App() {
  const [CPU, setCPU] = useState(0);
  const [MemoryFree, setMemoryFree] = useState(0);
  const [MemoryUsed, setMemoryUsed] = useState(0);

  // useEffect with an empty dependency array runs once, when the component mounts.
  // Without the empty array, every state update would re-run it and open another stream.
  useEffect(() => {
    // Create our EmptyRequest that we will use to start the stream
    var request = new EmptyRequest();
    // Don't worry about the empty metadata for now, that's covered in another article :)
    var stream = client.monitor(request, {});
    // Start listening on the data event, this is the event that is used to notify that new data arrives
    stream.on('data', function (response) {
      // Convert the response to an object
      var stats = response.toObject();
      // Set our variable values
      setCPU(stats.cpu);
      setMemoryFree(stats.memoryFree);
      setMemoryUsed(stats.memoryUsed);
    });
    // Cancel the stream when the component unmounts
    return () => stream.cancel();
  }, []);

  return (
    <div className="App">
      <p>CPU : {CPU}</p>
      <p>MemoryFree: {MemoryFree}</p>
      <p>MemoryUsed: {MemoryUsed}</p>
    </div>
  );
}

export default App;
After modifying your App.js with the above code, run the following command to build the application.
npm run build
If your build fails due to PROTO being unknown, please visit Using gRPC With TLS, Golang, React without a Reverse Proxy (Envoy), where I describe how to fix it at the bottom of the article.
If the build command worked, you should see a new directory named build. We will need to update main.go to host this directory as a file server, and also update it to host the gRPC API over HTTP.
Updating main.go
We will update main.go to host both the gRPC API and the React application. We will add some code that serves the React application over HTTP, and also multiplexes the gRPC API on that same HTTP server.
main.go now hosts the gRPC server over both HTTP and TCP.
package main

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"time"

	"github.com/improbable-eng/grpc-web/go/grpcweb"
	hardwaremonitoring "github.com/percybolmer/grpcstreams/proto"
	"google.golang.org/grpc"
)

func main() {
	fmt.Println("Welcome to streaming HW monitoring")
	// Set up a TCP listener on port 7777
	lis, err := net.Listen("tcp", ":7777")
	if err != nil {
		panic(err)
	}
	// Create a gRPC server
	gRPCserver := grpc.NewServer()
	// Create a server object of the type we created in server.go
	s := &Server{}
	// Register our server as a gRPC server
	hardwaremonitoring.RegisterHardwareMonitorServer(gRPCserver, s)
	// Host the regular gRPC API on a goroutine
	go func() {
		log.Fatal(gRPCserver.Serve(lis))
	}()
	// Wrap the gRPC server with grpc-web so that browsers
	// can call it over a regular HTTP connection
	grpcWebServer := grpcweb.WrapServer(gRPCserver)
	multiplex := grpcMultiplexer{
		grpcWebServer,
	}
	// A regular HTTP router
	r := http.NewServeMux()
	// Load our React application
	webapp := http.FileServer(http.Dir("webapp/hwmonitor/build"))
	// Host the web app at / and wrap it in the multiplexer
	r.Handle("/", multiplex.Handler(webapp))
	// Create an HTTP server with some defaults.
	// Note: WriteTimeout applies to the whole response, so it may cut a
	// long-lived stream short. Consider raising or removing it if streams
	// stop after 15 seconds.
	srv := &http.Server{
		Handler:      r,
		Addr:         "localhost:8080",
		WriteTimeout: 15 * time.Second,
		ReadTimeout:  15 * time.Second,
	}
	// Host it
	log.Fatal(srv.ListenAndServe())
}

// grpcMultiplexer lets regular HTTP requests and gRPC-Web requests share the same port.
// This is needed since browsers cannot speak native gRPC over HTTP/2 directly
type grpcMultiplexer struct {
	*grpcweb.WrappedGrpcServer
}

// Handler is used to route requests to either gRPC or to regular HTTP
func (m *grpcMultiplexer) Handler(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if m.IsGrpcWebRequest(r) {
			m.ServeHTTP(w, r)
			return
		}
		next.ServeHTTP(w, r)
	})
}
Now we are hosting the API for both the Go client and the web client. Visit localhost:8080 and you should see the server’s hardware stats.
That’s about it for this article. You’ve learned how to make your server a lot happier by letting it push data out to clients, instead of having all those clients frequently asking it for updates. Just hope that it’s enough for the machines to spare your life when they rule.
Feel free to reach out to me with any ideas, questions, criticism, or feedback.
And remember to Follow the Stream.
If you enjoyed my writing, please support future articles by buying me a coffee.