Streaming data with gRPC

A necessary guide to learn how to survive when frustrated machines take over the world

by Percy Bolmér, February 5, 2021

By [Alejandro Mendoza] on Unsplash

Have you ever driven a car with a kid in the backseat?

The constant questioning.

Are we there yet? Are we there yet? — annoying backseat kids

This could drive people mad and probably has done already. It’s strange because even if we know how frustrating that is, software tends to be designed this way.

You have a frontend or a service running as a client that loads data from the server. This loading is in many cases done by frequently polling for new data. Imagine being an HTTP API with tons of clients, all asking you “Hey, got any new shiny data for me?” every 10 seconds or so.

This pattern is often called Polling. Polling is a design pattern in which a client retrieves data by repeatedly asking the server for it.

Example of a Polling design pattern, repeated requests over a course of time
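To put a number on how wasteful polling can be, here is a minimal sketch in Go. Everything in it is made up for illustration (the source type, the poll function, the step numbers); it just counts how many requests a client makes compared to how many of them actually return something new.

```go
package main

import "fmt"

// source is a hypothetical data source; version increases when new data exists.
type source struct {
	version int
}

// pollResult counts how many requests were made and how many returned new data.
type pollResult struct {
	requests int
	updates  int
}

// poll asks the source for data once per step. The source only changes on the
// steps listed in changesAt, so every other request is wasted work.
func poll(src *source, steps int, changesAt map[int]bool) pollResult {
	var res pollResult
	lastSeen := src.version
	for step := 0; step < steps; step++ {
		if changesAt[step] {
			src.version++ // new data becomes available on the server
		}
		res.requests++ // the client asks regardless
		if src.version != lastSeen {
			lastSeen = src.version
			res.updates++
		}
	}
	return res
}

func main() {
	res := poll(&source{}, 10, map[int]bool{3: true, 7: true})
	// prints "requests=10 useful=2 wasted=8"
	fmt.Printf("requests=%d useful=%d wasted=%d\n", res.requests, res.updates, res.requests-res.updates)
}
```

Ten requests for two updates, and that is for a single client. Multiply it by every client you have.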

I’m happy computers haven’t grown a mind of their own yet, because when they do, oh boy, we software developers are screwed. All those servers must be frustrated.

Thankfully you’re reading this blog post that will save your life when the machines take over the world. They might even spare you for redesigning your communication pattern after this post.

In this article, we will look at a streaming architecture using gRPC streams. There are many modern solutions for streaming data, such as WebSocket and WebRTC. The reason why I’m choosing gRPC is that it allows us to stream data between microservices too, not only to web applications. (Also because I love working with Go).

In this article, we will create one gRPC server written in Go, one client written in Go, and one client in React. You will see how both clients use the same server.

The design will look like the picture below, as opposed to Polling. We will set up a connection with one request and then wait for data from the server. So instead of the client asking for new data, the server will push new data out to clients. This is called Server-side streaming.

Streaming design leverages an established communication channel to deliver data from the server without recurring requests.
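Before we bring gRPC into it, the push idea can be sketched with nothing but the Go standard library. This is only an illustration under my own assumptions: the push and receive functions are hypothetical, a channel stands in for the stream, and a counter stands in for real data. The client subscribes once and the "server" goroutine sends on every tick until the context is cancelled.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// push sends an increasing counter on the returned channel at every tick
// until ctx is cancelled, then closes the channel.
func push(ctx context.Context, interval time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		n := 0
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				n++
				select {
				case out <- n:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// receive subscribes once and collects max updates, then cancels the stream.
func receive(max int, interval time.Duration) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var got []int
	for v := range push(ctx, interval) {
		got = append(got, v)
		if len(got) == max {
			cancel()
			break
		}
	}
	return got
}

func main() {
	fmt.Println(receive(3, 10*time.Millisecond)) // prints "[1 2 3]"
}
```

Notice that the receiving side never asks for anything after it subscribes. It just waits.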

There are two other ways of streaming data with gRPC, called Client-side streaming and Bidirectional streaming. In this article, we will only use Server-side streaming. Client-side streaming is when the client pushes data to the server.

Example of Client-side streaming, where client pushes updates to server

Bidirectional streaming is when both the client and the server can push data between each other.

Bidirectional streaming where both client and server can stream data

Let’s begin implementing the design in a Go backend. We will also implement a client in a React application. So after this article, you should have a Go server, a Go client, and a React client that talk to the same server.

Project setup and needed software

Let’s begin by setting up the project structure. I’ve started by creating a folder for each of our wanted clients and the backend code. In my case, I’ve named the project grpcstreams, and inside are 3 empty folders and 2 empty .go files. The complete code example can be found here.

My project setup.

If you’re unfamiliar with gRPC, I suggest reading my other article about it first.

If you want to continue without prior knowledge, you’ll need to install Protoc.

Protoc is the compiler that turns our protobuf and gRPC service definitions into Go code.

You will also need the Go gRPC plugins so that we can generate gRPC code for the Go backend. They can be retrieved with the go get command if you have Go installed (which I will assume). If not, go get it.

go get -u google.golang.org/protobuf/cmd/protoc-gen-go
go install google.golang.org/protobuf/cmd/protoc-gen-go

go get -u google.golang.org/grpc/cmd/protoc-gen-go-grpc
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc

One more thing before we start: we will need to generate a client for the web application as well, which requires the grpc-web plugin. You can find installation instructions for it here.

So, to make sure you have everything, here is a list of what you should have.

  • Protoc
  • protoc-gen-go and protoc-gen-go-
  • grpc-web

Setting up the gRPC service

We will begin by creating a file called service.proto, which will be located inside the proto folder. This file is a protobuf file that defines what our backend service will look like. We will create a hardware monitoring service that will publish hardware stats to clients.

service.proto will look like this.

syntax = "proto3";
package main;
option go_package=".;hardwaremonitoring";

// HardwareStats is a struct containing information about the systems Memory usage and CPU
message HardwareStats {
    int32 cpu = 1;
    int32 memory_free = 2;
    int32 memory_used = 3;
}
// EmptyRequest is a way to call gRPC functions without any input data
message EmptyRequest {}

// HardwareMonitor is our service, the service is the API, and rpc statements below will be methods that are available by the API
service HardwareMonitor{
    // Monitor will output stats about the hardware on the system host
    rpc Monitor(EmptyRequest) returns (stream HardwareStats) {};
}

In gRPC, you create your protobuf file and generate the code based on that file. The service.proto file is the blueprint for how the generated code will look.

We have created a HardwareMonitor service that will expose one method called Monitor. The Monitor method will return a stream as indicated by the (stream HardwareStats).

Let’s use protoc to generate the code needed based on the service.proto file.

protoc service.proto --js_out=import_style=commonjs,binary:. --grpc-web_out=import_style=commonjs,mode=grpcwebtext:. --go-grpc_out=. --go_out=.

Now that command might look frightening, but it’s quite simple. What we do is call protoc on our service.proto.

We then add the flags (js_out, grpc-web_out, go-grpc_out, go_out), which select the code generators to run. Each flag ends with =. or :. which means the code is written to the current directory. This means protoc will generate four files for us.

We will use them soon. So if you’d like to add another type of client, like a Java client, you would add the java_out option.

Go gRPC server

If we take a look at the service_grpc.pb.go file generated by protoc, we will find that it has created an interface for us. Protoc generates a lot of code, but it cannot know what the Monitor method should do, so that part is up to us developers. The rest of the generated code sets up connections and everything else needed to make gRPC work, and we will leave it as is.

If you’re not very familiar with interfaces, you can read about them in Interfaces In Go. The generated interface looks like the following gist.

The generated gRPC interface.

// HardwareMonitorServer is the server API for HardwareMonitor service.
// All implementations must embed UnimplementedHardwareMonitorServer
// for forward compatibility
type HardwareMonitorServer interface {
	// Monitor will output stats about the hardware on the system host
	Monitor(*EmptyRequest, HardwareMonitor_MonitorServer) error
	mustEmbedUnimplementedHardwareMonitorServer()
}
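The mustEmbed method and the "Unimplemented" struct can look a bit odd at first. Here is a small sketch of the pattern using made-up names (Monitor, UnimplementedMonitor, and Server below are hypothetical stand-ins, not the generated code). The unexported method forces implementations to embed the default struct, and the default struct supplies a "not implemented" answer for every method you have not written yet.

```go
package main

import (
	"errors"
	"fmt"
)

// Monitor is a hypothetical stand-in for a generated service interface.
// The unexported method can only be satisfied by types in this package,
// or by types that embed UnimplementedMonitor.
type Monitor interface {
	Stats() (string, error)
	Health() (string, error)
	mustEmbedUnimplementedMonitor()
}

// UnimplementedMonitor provides default methods that report "not implemented".
type UnimplementedMonitor struct{}

func (UnimplementedMonitor) Stats() (string, error) {
	return "", errors.New("method Stats not implemented")
}

func (UnimplementedMonitor) Health() (string, error) {
	return "", errors.New("method Health not implemented")
}

func (UnimplementedMonitor) mustEmbedUnimplementedMonitor() {}

// Server embeds the defaults and overrides only Stats.
type Server struct {
	UnimplementedMonitor
}

// Stats replaces the default implementation.
func (s *Server) Stats() (string, error) { return "cpu=42", nil }

func main() {
	var m Monitor = &Server{}
	stats, _ := m.Stats()
	_, err := m.Health()
	fmt.Println(stats, "|", err) // prints "cpu=42 | method Health not implemented"
}
```

If a new method is later added to the interface, Server still compiles because the embedded struct covers it. That is the forward compatibility the generated comment talks about.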

What we need to do is open up server.go and add a type that implements the HardwareMonitorServer interface. This means we need to implement the Monitor method.

Server that will output stats every 2 seconds.

package main

import (
	"log"
	"time"

	// Dont forget this import :)
	"github.com/mackerelio/go-osstat/cpu"
	"github.com/mackerelio/go-osstat/memory"
	hardwaremonitoring "github.com/percybolmer/grpcstreams/proto"
)

// Server is our struct that will handle the Hardware monitoring Logic
// It will fulfill the gRPC interface generated
type Server struct {
	hardwaremonitoring.UnimplementedHardwareMonitorServer
}

// Monitor is used to start a stream of HardwareStats
func (s *Server) Monitor(req *hardwaremonitoring.EmptyRequest, stream hardwaremonitoring.HardwareMonitor_MonitorServer) error {
	// Start a ticker that fires every 2 seconds, and stop it when the stream ends
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		// Exit on stream context done
		case <-stream.Context().Done():
			return nil
		case <-ticker.C:
			// Grab stats; on failure, log it and wait for the next tick
			hwStats, err := s.GetStats()
			if err != nil {
				log.Println(err.Error())
				continue
			}
			// Send the Hardware stats on the stream
			if err := stream.Send(hwStats); err != nil {
				// The stream is broken, so stop serving it
				return err
			}
		}
	}
}

// GetStats will extract system stats and output a Hardware Object, or an error
// if extraction fails
func (s *Server) GetStats() (*hardwaremonitoring.HardwareStats, error) {
	// Extract memory stats
	mem, err := memory.Get()
	if err != nil {
		return nil, err
	}
	// Extract CPU stats
	cpu, err := cpu.Get()
	if err != nil {
		return nil, err
	}
	// Create our response object
	hwStats := &hardwaremonitoring.HardwareStats{
		Cpu:        int32(cpu.Total),
		MemoryFree: int32(mem.Free),
		MemoryUsed: int32(mem.Used),
	}

	return hwStats, nil
}
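One thing to watch out for in GetStats is the int32 conversions. I’m assuming here that the stats library reports memory as a uint64 number of bytes, in which case anything above roughly 2 GiB no longer fits in an int32 and the conversion silently wraps around. The sketch below shows the problem and one possible workaround; toMiB32 is a hypothetical helper, not part of the project, and using it would mean the fields carry MiB instead of bytes.

```go
package main

import (
	"fmt"
	"math"
)

const bytesPerMiB = 1024 * 1024

// toMiB32 converts a byte count to whole mebibytes, clamped to the int32 range.
func toMiB32(bytes uint64) int32 {
	mib := bytes / bytesPerMiB
	if mib > math.MaxInt32 {
		return math.MaxInt32
	}
	return int32(mib)
}

func main() {
	// 8 GiB, a hypothetical reading
	free := uint64(8) * 1024 * 1024 * 1024

	// Direct conversion keeps only the low 32 bits
	fmt.Println(int32(free)) // prints "0"

	// Scaling down first keeps the value meaningful
	fmt.Println(toMiB32(free)) // prints "8192"
}
```

Another option would be to change the fields in the protobuf file to int64, but then the generated code for all clients changes as well, so I’ll leave that as an exercise.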

When the server.go is set, we also need some way of running the server. This is done in main.go.

The main function will just listen for TCP connections on port 7777 and host our gRPC hardware monitoring server on that port. This will allow clients, whether the web client or the Go client, to execute the Monitor method.

Remember, this is RPC, so any requests are just instructions to execute the method on the server. This is the reason why we can have clients in many languages.

Note, this gRPC API will run on an insecure connection. In production, you should add TLS, see my article about gRPC and TLS.

This is our simple Main that runs the server

package main

import (
	"fmt"
	"log"
	"net"

	hardwaremonitoring "github.com/percybolmer/grpcstreams/proto"
	"google.golang.org/grpc"
)

func main() {

	fmt.Println("Welcome to streaming HW monitoring")
	// Setup a tcp connection to port 7777
	lis, err := net.Listen("tcp", ":7777")
	if err != nil {
		panic(err)
	}
	// Create a gRPC server
	gRPCserver := grpc.NewServer()

	// Create a server object of the type we created in server.go
	s := &Server{}

	// Register our server as a gRPC server
	hardwaremonitoring.RegisterHardwareMonitorServer(gRPCserver, s)

	log.Println(gRPCserver.Serve(lis))

}

Client in Go

Now that the server is up and running, we need to create a client to test it. I’m going to start with the client written in Go. The client will be very simple: it will connect to the gRPC server, set up a stream, listen for 7 seconds, and then terminate.

The client sets up a stream and listens for 7 seconds.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	hardwaremonitoring "github.com/percybolmer/grpcstreams/proto"
	"google.golang.org/grpc"
)

func main() {

	// Create our context
	ctx := context.Background()
	// Setup connection
	conn, err := grpc.DialContext(ctx, "localhost:7777", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	// Close connection when we are done
	defer conn.Close()
	// Use the generated NewHardwareMonitorClient method and pass our Connection
	client := hardwaremonitoring.NewHardwareMonitorClient(conn)

	// Call Monitor to receive the Stream of data
	// With an empty request
	emptyreq := &hardwaremonitoring.EmptyRequest{}
	// call Monitor function, this will return a stream of data
	stream, err := client.Monitor(ctx, emptyreq)
	if err != nil {
		panic(err)
	}
	// Create a timer that fires once after 7 seconds
	stop := time.NewTimer(7 * time.Second)
	defer stop.Stop()
	// Iterate over the stream
	for {
		select {
		case <-stop.C:
			// Close the sending side of the stream before we exit,
			// the deferred conn.Close() then tears down the connection
			err := stream.CloseSend()
			if err != nil {
				log.Fatal("Failed to close stream: ", err.Error())
			}
			return
		default:
			// Receive on the stream, Recv blocks until the next message arrives
			res, err := stream.Recv()
			if err != nil {
				panic(err)
			}
			fmt.Println("New Hardware state received")
			fmt.Println("CPU Usage: ", res.Cpu)
			fmt.Println("Memory Used: ", res.MemoryUsed)
			fmt.Println("Memory Free: ", res.MemoryFree)
		}
	}
}
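Since Recv blocks, the timer in the client above is only checked between messages. Another way to bound how long a client listens is to put a deadline on the context instead. Here is a minimal sketch of that idea using only the standard library. The recv, listen, and timedOut functions are hypothetical, and a channel stands in for the gRPC stream, so treat it as an illustration of the pattern and not as a drop-in replacement.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// recv is a hypothetical stand-in for stream.Recv: it blocks until a value
// arrives or the context ends.
func recv(ctx context.Context, updates <-chan int) (int, error) {
	select {
	case v := <-updates:
		return v, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// timedOut reports whether err was caused by the context deadline.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

// listen receives updates until the timeout expires and returns how many it
// saw, together with the error that ended the loop.
func listen(timeout, interval time.Duration) (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// A goroutine plays the server and pushes a value on every tick
	updates := make(chan int)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for i := 1; ; i++ {
			select {
			case <-ticker.C:
				select {
				case updates <- i:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()

	count := 0
	for {
		if _, err := recv(ctx, updates); err != nil {
			return count, err
		}
		count++
	}
}

func main() {
	count, err := listen(70*time.Millisecond, 20*time.Millisecond)
	fmt.Println("updates:", count, "timed out:", timedOut(err))
}
```

With a real gRPC stream you would pass the context with the deadline to client.Monitor. As far as I know the timeout then comes back from Recv as a gRPC status error with the DeadlineExceeded code, so the check would look a little different from timedOut above.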

When you’ve filled in client.go, it’s time to try it out. Simply run go run on the server and then the client.

Example showing the client and server running.

Client in web application

For the web application, I will use React and create-react-app. So creating the web application will require installing some more software, I’m so sorry! :)

We will use NPM, so start by getting that.

We will also need some protobuf and gRPC libraries so that we can use the client.

npm install -g npx
npm install grpc-web
npm install google-protobuf

We will create a react application called hwmonitor. So first of all, you need to enter the webapp folder and run the following commands to generate the react application.

npx create-react-app hwmonitor

Once the command has completed running we will need to create a new directory and copy our generated protobuf files into it. I’ll create the proto folder inside grpcstreams/webapp/hwmonitor/src/proto.

Don’t forget to copy both service_pb.js and service_grpc_web_pb.js into the new proto folder. It’s a good time to open service_pb.js.

In this file, we can see the protobuf object that is generated, which is important when we want to reach the data.

The toObject function, used to see what fields are available.

/**
 * Static version of the {@see toObject} method.
 * @param {boolean|undefined} includeInstance Deprecated. Whether to include
 *     the JSPB instance for transitional soy proto support:
 *     http://goto/soy-param-migration
 * @param {!proto.main.HardwareStats} msg The msg instance to transform.
 * @return {!Object}
 * @suppress {unusedLocalVariables} f is only used for nested messages
 */
proto.main.HardwareStats.toObject = function(includeInstance, msg) {
  var f, obj = {
    cpu: jspb.Message.getFieldWithDefault(msg, 1, 0),
    memoryFree: jspb.Message.getFieldWithDefault(msg, 2, 0),
    memoryUsed: jspb.Message.getFieldWithDefault(msg, 3, 0)
  };

  if (includeInstance) {
    obj.$jspbMessageInstance = msg;
  }
  return obj;
};

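We can see that the object contains 3 fields: cpu, memoryFree, and memoryUsed. Note that the field names are camelCase and start with a lowercase letter, even though we wrote memory_free and memory_used in the protobuf file. This is good to know since we need those fields to display the data.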
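If you want a feel for how the protobuf names map to the JavaScript ones, here is a tiny sketch. The lowerCamel function is my own hypothetical helper that mimics the naming we see in toObject for our three fields. It is not the algorithm the code generator uses, so don’t rely on it for edge cases.

```go
package main

import (
	"fmt"
	"strings"
)

// lowerCamel turns a snake_case name into lowerCamelCase.
func lowerCamel(name string) string {
	parts := strings.Split(name, "_")
	for i := 1; i < len(parts); i++ {
		if parts[i] == "" {
			continue
		}
		// Upper-case the first letter of every part except the first one
		parts[i] = strings.ToUpper(parts[i][:1]) + parts[i][1:]
	}
	return strings.Join(parts, "")
}

func main() {
	for _, field := range []string{"cpu", "memory_free", "memory_used"} {
		fmt.Println(field, "->", lowerCamel(field))
	}
	// prints:
	// cpu -> cpu
	// memory_free -> memoryFree
	// memory_used -> memoryUsed
}
```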

Next, we will open up the file called App.js in the React application and modify it. It will do the same as the client in Go: start a stream and print out the data.

The gist shows how we use the Stream to update the stats.

import logo from './logo.svg';
import './App.css';
import React, { useState, useEffect } from 'react';
import { HardwareMonitorClient } from './proto/service_grpc_web_pb';
import { EmptyRequest } from './proto/service_pb';

// Create a new HardwareMonitorClient like this, correct the ADDR and Port used
// If you use something else. 
var client = new HardwareMonitorClient('http://localhost:8080');

function App() {
  const [CPU, setCPU] = useState(0);
  const [MemoryFree, setMemoryFree] = useState(0);
  const [MemoryUsed, setMemoryUsed] = useState(0);

  // useEffect with an empty dependency array runs once, when the component mounts
  useEffect(() => {
    // Create our EmptyRequest that we will use to start the stream
    var request = new EmptyRequest();
    // Don't worry about the empty Metadata for now, that's covered in another article :)
    var stream = client.monitor(request, {});
    // Start listening on the data event, this is the event that is used to notify that new data arrives
    stream.on('data', function (response) {
      // Convert Response to Object
      var stats = response.toObject();
      // Set our variable values
      setCPU(stats.cpu);
      setMemoryFree(stats.memoryFree);
      setMemoryUsed(stats.memoryUsed);
    });
    // Cancel the stream when the component unmounts
    return () => stream.cancel();
  }, []);

  return (
    <div className="App">
      <p>CPU : {CPU}</p>
      <p>MemoryFree: {MemoryFree}</p>
      <p>MemoryUsed: {MemoryUsed}</p>
    </div>
  );
}

export default App;

After modifying your App.js with the above code, execute the npm run build command to build the application.

npm run build

If your build fails due to PROTO being unknown, please visit Using gRPC With TLS, Golang, React without a Reverse Proxy (Envoy), where I describe how to fix it at the bottom of the article.

If your NPM build command worked you should see a new directory named build. We will need to update the main.go to host this directory as a file server and also update it to host the gRPC API over HTTP.

Updating main.go

We will update main.go to host the gRPC and the React application. We will add some code that serves the react application over HTTP, and also multiplexes the gRPC on that HTTP connection.

main.go now hosts the gRPC server over HTTP and TCP.

package main

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"time"

	"github.com/improbable-eng/grpc-web/go/grpcweb"
	hardwaremonitoring "github.com/percybolmer/grpcstreams/proto"
	"google.golang.org/grpc"
)

func main() {

	fmt.Println("Welcome to streaming HW monitoring")
	// Setup a tcp connection to port 7777
	lis, err := net.Listen("tcp", ":7777")
	if err != nil {
		panic(err)
	}
	// Create a gRPC server
	gRPCserver := grpc.NewServer()

	// Create a server object of the type we created in server.go
	s := &Server{}

	// Register our server as a gRPC server
	hardwaremonitoring.RegisterHardwareMonitorServer(gRPCserver, s)

	// Host the regular gRPC api on a goroutine
	go func() {
		log.Fatal(gRPCserver.Serve(lis))
	}()

	// We need to wrap the gRPC server so that it can also speak gRPC-Web,
	// which browsers can use over regular HTTP requests
	grpcWebServer := grpcweb.WrapServer(gRPCserver)

	multiplex := grpcMultiplexer{
		grpcWebServer,
	}

	// a regular http router
	r := http.NewServeMux()
	// Load our React application
	webapp := http.FileServer(http.Dir("webapp/hwmonitor/build"))
	// Host the web app at / and wrap it in a multiplexer
	r.Handle("/", multiplex.Handler(webapp))

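	// create a http server with some defaults
	// WriteTimeout is left unset on purpose: it bounds the whole response,
	// so it would cut off the long-lived Monitor stream after 15 seconds
	srv := &http.Server{
		Handler:     r,
		Addr:        "localhost:8080",
		ReadTimeout: 15 * time.Second,
	}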

	// host it
	log.Fatal(srv.ListenAndServe())
}

// grpcMultiplexer enables HTTP requests and gRPC requests to be multiplexed on the same port
// this is needed since browsers can't make native gRPC (http2) calls and use gRPC-Web instead
type grpcMultiplexer struct {
	*grpcweb.WrappedGrpcServer
}

// Handler is used to route requests to either grpc or to regular http
func (m *grpcMultiplexer) Handler(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if m.IsGrpcWebRequest(r) {
			m.ServeHTTP(w, r)
			return
		}
		next.ServeHTTP(w, r)
	})
}
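If you’re curious what the routing decision boils down to, here is a stripped-down sketch that uses only the standard library. The isGrpcWeb check is my simplified assumption of how such a request can be recognized (a POST with a grpc-web content type). The real IsGrpcWebRequest in the library may check more than this, and the route and serve functions are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// isGrpcWeb is a simplified, assumed check: a POST with a grpc-web content type.
func isGrpcWeb(r *http.Request) bool {
	return r.Method == http.MethodPost &&
		strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc-web")
}

// route sends grpc-web requests to api and everything else to next.
func route(api, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if isGrpcWeb(r) {
			api.ServeHTTP(w, r)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// serve runs one request through the router and returns the response body.
func serve(method, contentType string) string {
	// Two dummy handlers that reveal which branch was taken
	api := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "grpc")
	})
	static := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "static")
	})

	req := httptest.NewRequest(method, "/", nil)
	if contentType != "" {
		req.Header.Set("Content-Type", contentType)
	}
	rec := httptest.NewRecorder()
	route(api, static).ServeHTTP(rec, req)
	return rec.Body.String()
}

func main() {
	fmt.Println(serve(http.MethodPost, "application/grpc-web-text")) // prints "grpc"
	fmt.Println(serve(http.MethodGet, ""))                           // prints "static"
}
```

So the browser’s page loads fall through to the file server, while the calls made by the generated client end up at the gRPC handler, all on the same port.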

Now we are hosting the API for both the Go client and the web client. Visit localhost:8080 and you should see the server’s hardware stats.

Stats are being updated every time the server pushes updates

That’s about it for this article. You’ve learned how to make your server a lot happier by letting it push data out to clients, instead of having all those clients frequently ask it for updates. Just hope that it’s enough for the machines to spare your life when they rule.

Feel free to reach out to me with any ideas, questions, criticism, or feedback.

And remember to Follow the Stream.
