Back

Create Distributed, Scalable, Durable, and Highly Available Software— With Cadence

Managing microservices or distributed systems has many challenges, Uber has a solution for many of the issues.

by Percy Bolmér, March 7, 2022

By Percy Bolmér. Gopher by Takuya Ueda, Original Go Gopher by Renée French (CC BY 3.0)
By Percy Bolmér. Gopher by Takuya Ueda, Original Go Gopher by Renée French (CC BY 3.0)

Uber is a small startup company, you might have heard about them before. They provide many open-source tools that their engineering team builds. And let me tell you, they are no jokers, they seem like a very competent bunch of developers.

Cadence is a distributed, scalable, durable, and highly available orchestration engine — Uber GitHub Repository

Have you tried Cadence before? If not, I can promise that you are in for a treat. Cadence is a framework for developing distributed systems.

I know the word framework scares many people in the Go community. Building a scalable, fault tolerant system that performs is very hard, and not something you do easily on your own. Cadence helps obscure much of the heavy lifting needed for a large-scale distributed system.

If you want to develop a distributed system that has to trigger certain actions based on the events from other services, and you want those events to be fail-proof, have retry logic implemented, then you have come to the right place.

If you wonder about the detailed structure of how cadence solves this, you can read their deployment topology.

Cadence allows us to develop services that run workflows, a set of functions in a certain order based on events and signals from other services.

In advance I want to say sorry for the very lengthy article, make sure to bring coffee

Let us begin learning how to use this amazing tool. The full code of what we will build can be found on GitHub.

Setup Cadence Server By Docker-Compose

The first thing we need is to run a Cadence server. The easiest way to do this is by using Docker-Compose. There are a few examples of docker-compose files in the Cadence Github, the different files use different persistent storage solutions. In my case, I will use MySQL.

I am using WSL 2 in my development environment, So I had to make changes to the Compose for it to work. It seems there is trouble with the compose to mount files, you can view the git issue here.

We also want the persistent storage selected to store data across restarts, this is done by adding a mount to the compose, in this case, I use MySQL. We will add a mount for both Mysql and Prometheus so we persist the data.

version: '3'
services:
  mysql:
    image: mysql:5.7
    ports:
      - "3306:3306"
    environment:
      - "MYSQL_ROOT_PASSWORD=root"
    volumes:
      - ./data/mysql/:/var/lib/mysql
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus:/etc/prometheus
      - ./data/prometheus/:/prometheus
    user: "1000:1000"
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
    ports:
      - '9090:9090'
  cadence:
    image: ubercadence/server:0.23.2-auto-setup
    ports:
      - "8000:8000"
      - "8001:8001"
      - "8002:8002"
      - "8003:8003"
      - "7933:7933"
      - "7934:7934"
      - "7935:7935"
      - "7939:7939"
      - "7833:7833"
    environment:
      - "DB=mysql"
      - "MYSQL_USER=root"
      - "MYSQL_PWD=root"
      - "MYSQL_SEEDS=mysql"
      - "PROMETHEUS_ENDPOINT_0=0.0.0.0:8000"
      - "PROMETHEUS_ENDPOINT_1=0.0.0.0:8001"
      - "PROMETHEUS_ENDPOINT_2=0.0.0.0:8002"
      - "PROMETHEUS_ENDPOINT_3=0.0.0.0:8003"
      - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml"
    depends_on:
      - mysql
      - prometheus
  cadence-web:
    image: ubercadence/web:v3.29.4
    environment:
      - "CADENCE_TCHANNEL_PEERS=cadence:7933"
    ports:
      - "8088:8088"
    depends_on:
      - cadence
  grafana:
    image: grafana/grafana
    user: "1000"
    depends_on:
      - prometheus
    ports:
      - '3000:3000'
This is the Compose I use, with a few changes to the default one.

Setup Cadence CLI — Manage With Terminal

To manage the cadence server you will use the cadence CLI. You can run the cadence CLI either using Docker or building a binary. I think using docker is super easy and fast if you just want to try it out, but I still recommend building the CLI binary.

It is simple, checkout or clone the Cadence GitHub repo. In the repository there is a make file that we will run, but before that, there is a few dependencies that might need to be resolved.

You will have to have Go installed. If you don’t, follow this link to download it.

There is a few binaries in the project, they each control their own part of the system. We are only interested in the CLI for now.

git clone https://github.com/uber/cadence.git
cd cadence && make bins
#Verify it works
./cadence --help 

This will output a few binaries we can use to manage Cadence in the folder that you are working in. I recommend moving those binaries into a bin folder and adding it to your path.

Setup the project

We will be building an application that connects to Cadence and starts adding things as we go. I recommend that we copy the docker-compose that we want to run into a new project folder. In my examples, I will be using Mysql as a backend persistence storage. Cadence supports a few others, defaulting to Cassandra.

We will also need the Prometheus configurations. I will create a project named Tavern, because we are building one. Copy over the wanted compose and the Prometheus folder.

This is what your project structure should look like for now.

├── /app/
├── /cadence/                                    # All Related Cadence stuff goes here
│     ├── /data/                                 # 
|     |     └── /mysql/                          # The folder mounted onto the Docker to persist data for Mysql
|     |     └── /prometheus/                     # The folder mounted onto the Docker to persist data for Prometheus
│     ├── /prometheus/                           # Folder related to Prometheus
|     |     └── prometheus.yml                  # Prometheus config file
|     |     └── prometheus_config_multicluster.yml     # Prometheus config file for multi clusters
│     └── docker-compose.yml                     # Your docker-compose selected from the Cadence repo based on your persistent storage   
The project structure so far

Running The Cadence Server

Before we begin working with Cadence we need to make sure the server is running. Inside the project, navigate to the location of your docker-compose.yml and run it.

docker-compose up

The first step after getting the server running is to add a Domain. A domain is a namespace for tasks and workflows, this is used to isolate different tasks from each other.

Cadence supports clustering so you can run the system across many machines. This is often used to secure production uptime, in case one server fails, etc the next one can step in. If you are going to release cadence in production, remember that one domain can only be run as a singleton on the cluster.

It will probably be easier to understand if we start using a domain.

We will create a domain called tavern using the CLI. In the command, we will use –do to specify the name of the domain. We will tell the CLI to domain register to make it run a register of the domain.

cadence --do tavern domain register

Hopefully, you will receive a Domain tavern successfully registered.

We now have a Domain to start working in.

Let’s explore one of the beautiful things about Cadence, It provides you with a UI to manage domains and view workflows. Visit http://localhost:8088/ and you should be presented with a UI if you have the compose running. Try searching for Tavern and you should be presented with a domain you can visit.

The Cadence UI showing the Tavern domain when searched for
The Cadence UI showing the Tavern domain when searched for

Entering the domain won’t be super exciting yet, we have 0 workflows running. But this leads us to the next component to learn, Workflows.

Workflow is one of the main components that Cadence evolves around. A workflow is a set of tasks (activities) to run in a certain order. The workflow will help you manage the state of the processers you will have running. A workflow has many great built-in helping features such as retries for failing events. We will learn more about workflows soon by implementing one.

Before we can have a workflow running, we need something to manage the workflows. This is done by a client application that connects to the previously started cadence server. The client will connect and poll for any tasks that have to be performed, the client is known as a Worker Service.

The Worker Service — The Scrum Master

The second thing we need to run is a worker service, a worker service is like a scrum master. It will make sure jobs are accepted into the workflow and distribute it onto a worker, then push the response back to the server. The worker service will also make sure that any jobs that are pushed, will only be executed once. Even if you have multiple service workers listening on the same jobs.

The worker service is a processor running as a client that polls for jobs and is responsible for hosting the actual workflow to perform. You can read more about it here.

Before we started the Cadence server with the docker-compose, the worker service will be responsible for connecting to the server and making sure tasks are handled in a workflow (our set of activities that should be applied).

Hopefully, it will become easier to understand after implementing it. The first thing we will build is a worker service that we will use to register a workflow that will greet new customers. The workflow runs a set of functions in the order we define once a new customer enters the tavern.

I know, saying welcome to new customers might seem a lot like a hello world. However let us be clear, Cadence is a big tool, and to understand it I think we should start simple. Let us build the hello world and go through what everything means, then progress from there into the kick-arse tavern.

Start by making sure you have Cadence SDK installed in Go and that we have a go module set up. The name you give your module is very important, it is used when we shall trigger the workflow.

mkdir app && cd app
go mod init programmingpercy/cadence-tavern
go get go.uber.org/cadence

We will keep a simple and basic project setup, more advanced and scalable ways are discussed later once we have the understanding of the Cadence tooling done.

Inside the app folder create a main.go file. This is my current folder layout.

├── /app/
│     ├── main.go                               # the place where you will put the worker service code
│     ├── go.mod                                # the go module delaraction 
│     ├── go.sum                                # the go sum with versions 
├── /cadence/                                    # All Related Cadence stuff goes here
│     ├── /data/                                 # 
|     |     └── /mysql/                          # The folder mounted onto the Docker to persist data for Mysql
|     |     └── /prometheus/                     # The folder mounted onto the Docker to persist data for Prometheus
│     ├── /prometheus/                           # Folder related to Prometheus
|     |     └── prometheus.yml                  # Prometheus config file
|     |     └── prometheus_config_multicluster.yml     # Prometheus config file for multi clusters
│     └── docker-compose.yml                     # Your docker-compose selected from the Cadence repo based on your persistent storage  
The current project layout, adding our worker service

Inside main.go we will create a very simple Cadence Client for now. This will be the worker service that we will run. To create a worker service we need to connect to the cadence server that we are running via docker-compose.

Connection to the server is done by Yarpc, a Uber-created communication protocol. It is very easy to use, so don’t be afraid.

To create a worker service, uber provides the go.uber.org/cadence/worker SDK. There is a function for creating a new worker named New that we will use, but there are some configurations that are needed for it to work.

func New(
	service workflowserviceclient.Interface,
	domain string,
	taskList string,
	options Options,
) Worker {
	return internal.NewWorker(service, domain, taskList, options)
}
go.uber.org/cadence/worker.New function definition.
  • service = The Yarpc Connection to the server, we will create this with the SDK easily.
  • domain = The domain namespace to opera in, we previously created the tavern domain which we will use
  • tasklist = A string used to identify the client worker also used to identify workflows and activities run by the worker service. Think of the tasklist as a named queue.
  • options = An struct used to configure the running service, this can be used to configure logging, metrics, etc. We will use the most basic setup at first.
package main

import (
	"fmt"

	_ "go.uber.org/cadence/.gen/go/cadence"
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"

	"github.com/uber-go/tally"
	"go.uber.org/yarpc"
	_ "go.uber.org/yarpc/api/transport"
	"go.uber.org/yarpc/transport/tchannel"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

const (
	// cadenceService should always be cadence-frontend
	CadenceService = "cadence-frontend"
	// ClientName is the identifier for the service
	ClientName = "greetings-worker"
	// Domain is the domain you have registered and want to operate in
	Domain = "tavern"
	// Host is the Cadence server IP:Port
	Host = "127.0.0.1:7933"
	// TaskList is the identifier for tasks, activites and workflows
	TaskList = "greetings"
)

func main() {
	// Create the Worker service
	worker, logger, err := newWorkerServiceClient()
	if err != nil {
		panic(err)
	}

	// Start worker
	if err := worker.Start(); err != nil {
		panic(fmt.Errorf("failed to start the worker: %v", err))
	}

	logger.Info("Started Worker.", zap.String("worker", TaskList))

	// Block Forever
	select {}

}

// newWorkerServiceClient is used to initialize a new Worker service
// It will handle Connecting and configuration of the client
// Returns a Worker, the logger applied or an error
// TODO expand this function to allow more configurations, will be done later in the article.
func newWorkerServiceClient() (worker.Worker, *zap.Logger, error) {

	// Create a logger to use for the service
	logger, err := newLogger()
	if err != nil {
		return nil, nil, err
	}
	// build the most basic Options for now
	workerOptions := worker.Options{
		Logger:       logger,
		MetricsScope: tally.NewTestScope(TaskList, map[string]string{}),
	}
	// Create the connection that the worker should use
	connection, err := newCadenceConnection(ClientName)
	if err != nil {
		return nil, nil, err
	}
	//  Create the worker and return
	return worker.New(connection, Domain, TaskList, workerOptions), logger, nil
}

// newCadenceConnection is used to create a new YARPC connection to the Cadence server
// @clientName - used to identify the connection on YARPC
func newCadenceConnection(clientName string) (workflowserviceclient.Interface, error) {
	// Create a new Channel to communicate through
	// Set the service name to our Client name so we can Identify the connection
	ch, err := tchannel.NewChannelTransport(tchannel.ServiceName(ClientName))
	if err != nil {
		return nil, fmt.Errorf("failed to set up Transport channel: %v", err)
	}
	// Set up the dispatcher
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name: ClientName,
		Outbounds: yarpc.Outbounds{
			CadenceService: {Unary: ch.NewSingleOutbound(Host)},
		},
	})
	// Start the dispatcher to allow incomming/outgoing messages
	if err := dispatcher.Start(); err != nil {
		return nil, fmt.Errorf("failed to start dispatcher: %v", err)
	}
	// Return a new workflowserviceclient with the connection assigned
	return workflowserviceclient.New(dispatcher.ClientConfig(CadenceService)), nil
}

// newLogger will create a new logger to be used by the Worker Services
// For now use DevelopmentConfig and Info level
func newLogger() (*zap.Logger, error) {
	config := zap.NewDevelopmentConfig()

	config.Level.SetLevel(zapcore.InfoLevel)

	var err error
	logger, err := config.Build()
	if err != nil {
		return nil, fmt.Errorf("failed to build logger: %v", err)
	}

	return logger, nil
}
main.go — This is the Worker service that will help us distribute and manage workflows.

That is it, note that this is the most basic worker-service for now. We can later add much more fancy options. You can try running the worker-service now by executing the main file.

go run main.go
percy@awesome:~/development/cadence-demo/app$ go run main.go 
2022-02-27T08:08:08.411+0100    INFO    internal/internal_worker.go:826 Worker has no workflows registered, so workflow worker will not be started.     {"Domain": "tavern", "TaskList": "greetings", "WorkerID": "16765@DESKTOP-3DP516F@greetings@b2b05411-9dbc-4bf3-a241-2a949da322b6"}
2022-02-27T08:08:08.411+0100    INFO    internal/internal_worker.go:834 Started Workflow Worker {"Domain": "tavern", "TaskList": "greetings", "WorkerID": "16765@DESKTOP-3DP516F@greetings@b2b05411-9dbc-4bf3-a241-2a949da322b6"}
2022-02-27T08:08:08.412+0100    INFO    internal/internal_worker.go:838 Worker has no activities registered, so activity worker will not be started.    {"Domain": "tavern", "TaskList": "greetings", "WorkerID": "16765@DESKTOP-3DP516F@greetings@b2b05411-9dbc-4bf3-a241-2a949da322b6"}
2022-02-27T08:08:08.412+0100    INFO    app/main.go:44  Started Worker. {"worker": "greetings"}
Cadance running the worker service

You should see the logs saying that the worker has been started, but that there are no workflows or activities registered. This brings us to the funny parts, the actual processing!

Workflows — A Set Of Activities To Perform In Order

As previously mentioned, a workflow is a set of activities to run in a certain order. An activity is a function or a struct method call, so this is the place where you can start performing your actual work. The rule of thumb is that no database connections etc should be done inside a workflow, but rather inside the Activity.

What is great with Cadence is that workflows provide so many good utilities for creating a reliable and stable environment. I won’t cover them all, check out their docs for that.

In the tavern we are building, we want to greet new customers, and also store customers in a database to keep track of them. This will be two separate activities, one for greeting, one for storing.

We will keep track of the customer’s number of visits and the last time a customer came by the Tavern. To keep the article shorter I will use an in-memory database solution. The Customer code will probably be shared across many workflows and activities in the Tavern, so we store it in a package named Customer in the app folder.

package customer

import (
	"fmt"
	"time"
)


var (
	// Bad Solution for in mem during tutorial
	Database = NewMemoryCustomers()
)

// Customer is representation of a client in the Tavern
type Customer struct {
	Name string `json:"name"`
	// LastVisit is a timestamp of the last time this visitor came by the tavern
	LastVisit time.Time `json:"lastVisit"`
	// TimesVisited is how many times a user has visited
	TimesVisited int `json:"timesVisited"`
}

// Repository is the needed methods to be a customer repo
type Repository interface {
	Get(string) (Customer, error)
	Update(Customer) error
}

// MemoryCustomers is used to store information in Memory
type MemoryCustomers struct {
	Customers map[string]Customer
}

// NewMemoryCustomers will init a new in memory storage for customers
func NewMemoryCustomers() MemoryCustomers {
	return MemoryCustomers{
		Customers: make(map[string]Customer),
	}
}

// Get is used to fetch a customer by Name
func (mc *MemoryCustomers) Get(name string) (Customer, error) {
	if mc.Customers == nil {
		mc.Customers = make(map[string]Customer)
	}
	if cust, ok := mc.Customers[name]; ok {
		return cust, nil
	}
	return Customer{}, fmt.Errorf("no such customer: %s", name)
}

// Update will override the information about a customer in storage
func (mc *MemoryCustomers) Update(customer Customer) error {
	if mc.Customers == nil {
		mc.Customers = make(map[string]Customer)
	}

	mc.Customers[customer.Name] = customer
	return nil
}
app/customer/repository.go — The Customer repository we use to demonstrate Cadence Activities

We also create a new folder where we can store different workflows named worksflows. In it, we will also create the greetings folder in which we will store all code related to greeting a new customer.

This is my project layout at this point.

├── /app/
│     ├── main.go                               # the place where you will put the worker service code
│     ├── go.mod                                # the go module delaraction 
│     ├── go.sum                                # the go sum with versions 
│     ├── /customer                             # Customer folder holds any code related to customer domain
|     |     └── repository.go                   # Repository holds the inMemory Cache for Customeres, Replace with a DB if wanted
│     ├── /workflows                            # workflows holds all the available workflows for the Domain
|     |     └── /greetings/                     # Workflow and Activities for Greetings
|     |          └── greetings.go  
├── /cadence/                                    # All Related Cadence stuff goes here
│     ├── /data/                                 # 
|     |     └── /mysql/                          # The folder mounted onto the Docker to persist data for Mysql
|     |     └── /prometheus/                     # The folder mounted onto the Docker to persist data for Prometheus
│     ├── /prometheus/                           # Folder related to Prometheus
|     |     └── prometheus.yml                  # Prometheus config file
|     |     └── prometheus_config_multicluster.yml     # Prometheus config file for multi clusters
│     └── docker-compose.yml                     # Your docker-compose selected from the Cadence repo based on your persistent storage 
Project layout for the first Greetings workflow

Let’s begin by building the Workflow. The workflow takes care of Input data and triggers the set of activities to run. A workflow is defined by a simple function that we later register.

The only thing to remember is that the Workflow function should accept a workflow.Context as the first parameter. It can then accept any other input parameters, as long as they are serializable ! In our case, we will accept the workflow.Context and a Customer struct.

The output from the workflow should be an error and any other serializable output.

The following gist shows a few examples of possible workflow functions.

// This function is valid, accepts a Customer in JSON format as Input, Outputs a Customer and an error
func workflowGreetings(ctx workflow.Context, visitor customer.Customer) (customer.Customer, error) 
// This could be valid, accept No input and Output a Customer and Error.
func workflowGreetings(ctx workflow.Context) (customer.Customer, error) 
// Accept string as input, output only an Error  
func workflowGreetings(ctx workflow.Context, input string) error 
A few examples of valid Workflow signatures.

In the workflow, we can add configurations about the running process such as Timeouts, Retry policies, Heartbeat configs. This is added by using a workflow.ActivityOptions.

To make sure logs are outputted you have to use the workflow.Logger or the activity.Logger so that they are used in the Cadence framework. Both the workflow and activity package exposes the logger with a GetLogger function.

Inside the workflow we create now, we will say Hello by Greeting and then store the user information in an Update function. To execute these functions we make the workflow trigger them with ExecuteActivity which will expect the workflow context as the first input parameter.

The second parameter is the Activity to run. Any parameter afterward has to align with the Activity input parameters. Both our activities will want a Customer as input, so we pass that into the ExecuteActivity.

Activities are run asynchronous, so the ExecuteActivity will return a Promise instead of the actual results. If you want to wait and run the activities synchronously instead we can use Get on the promise to await the results.

// workflowGreetings is the Workflow that is used to handle new Customers in the Tavern.
// our Workflow accepts a customer as Input, and Outputs a Customer, and an Error
func workflowGreetings(ctx workflow.Context, visitor customer.Customer) (customer.Customer, error) {
	// workflow Options for HeartBeat Timeout and other Timeouts.
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		HeartbeatTimeout:       time.Second * 20,
		// Here we will Add Retry policies etc later
	}
	// Add the Options to Context to apply configurations
	ctx = workflow.WithActivityOptions(ctx, ao)
	// Grab the Logger that is configured on the Workflow
	logger := workflow.GetLogger(ctx)
	logger.Info("greetings workflow started")

	// Execute the activityGreetings and Wait for the Response with GET
	// GET() will Block until the activitiy is Completed.
	// Get accepts input to marshal result to,
	// ExecuteActivity returns a FUTURE, so if you want async you can simply Skip .Get
	// Get takes in a interface{} as input that we can use to Scan the result into.
	err := workflow.ExecuteActivity(ctx, activityGreetings, visitor).Get(ctx, &visitor)
	if err != nil {
		logger.Error("Greetings Activity failed", zap.Error(err))
		return customer.Customer{}, err
	}

	err = workflow.ExecuteActivity(ctx, activityStoreCustomer, visitor).Get(ctx, nil)
	if err != nil {
		logger.Error("Failed to update customer", zap.Error(err))
		return customer.Customer{}, err
	}
	// The output of the Workflow is a Visitor with filled information
	return visitor, nil
}
workflows/greetings/greetings.go — The first workflow we create

This is the complete solution for our workflow, it is a very basic workflow with only 2 activities ran synchronously. Hopefully, you get the idea.

Activites — The Business Logic Functions

Activities is where we apply the actual business logic, up until this point we have simply implemented the framework for running the workflow.

Activities work the same way as Workflows. They are simple functions, that expect a context.Context as the first parameter, followed by any number of serializable parameters. And can output any number of outputs as long as they are serializable. There should also be an error as an output parameter.

There is not more to say about activities, not until you implement heartbeats for long-running activities.


// activityGreetings is used to say Hello to a Customer and change their LastVisit and TimesVisisted
// The returned value will be a Customer struct filled with this information
func activityGreetings(ctx context.Context, visitor customer.Customer) (customer.Customer, error) {
	logger := activity.GetLogger(ctx)
	logger.Info("Greetings activity started")
	logger.Info("New Visitor", zap.String("customer", visitor.Name), zap.Int("visitorCount", visitorCount))
	visitorCount++

	oldCustomerInfo, _ := customer.Database.Get(visitor.Name)

	visitor.LastVisit = time.Now()
	visitor.TimesVisited = oldCustomerInfo.TimesVisited + 1
	return visitor, nil
}

// activityStoreCustomer is used to store the Customer in the configured Customer Storage.
func activityStoreCustomer(ctx context.Context, visitor customer.Customer) error {
	logger := activity.GetLogger(ctx)
	logger.Info("Store Customer activity started")
	logger.Info("Updating Customer", zap.String("customer", visitor.Name), zap.Time("lastVisit", visitor.LastVisit),
		zap.Int("timesVisited", visitor.TimesVisited))

	// Store Customer in Database (Memory Cache during this Example)
	err := customer.Database.Update(visitor)
	if err != nil {
		return err
	}
	return nil
}
workflows/greetings/greetings.go — The first activities we create

Registering Workflows And Activities

Each Workflow and Activity is responsible for Registering to the Cadence server that they exist. This is done using the respective Register function from the workflow and activity packages.

This is usually done in a init function in the workflows and activities packages.

func init() {
	// init will be called once the workflow file is imported
	// this will Register the workflow to the Worker service
	workflow.Register(workflowGreetings)
	// Register the activities also
	activity.Register(activityGreetings)
	activity.Register(activityStoreCustomer)
}

Init functions are functions that run as soon as a package is imported. We will want the Register functions to happen whenever our Worker Service starts up.

To make this happen we will add the import in the main.go which is our worker in this example.

package main

import (
	"fmt"
  // Here we Import the Package, which will make Register Kick in
	_ "programmingpercy/cadence-tavern/workflows/greetings"

	_ "go.uber.org/cadence/.gen/go/cadence"
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"

	"github.com/uber-go/tally"
	"go.uber.org/yarpc"
	_ "go.uber.org/yarpc/api/transport"
	"go.uber.org/yarpc/transport/tchannel"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

........
unchanged
app/main.go — Updated import statement to make Init function register the workflow and activities

Execute Workflows and Activities

We are finally ready to start running the activities and get familiar with the whole Cadence framework.

For reference, this is how my current workflows/greetings/greetings.go looks like.

package greetings

import (
	"context"
	"programmingpercy/cadence-tavern/customer"
	"time"

	"go.uber.org/cadence/activity"
	"go.uber.org/cadence/workflow"
	"go.uber.org/zap"
)

var (
	visitorCount = 0
)

func init() {
	// init will be called once the workflow file is imported
	// this will Register the workflow to the Worker service
	workflow.Register(workflowGreetings)
	// Register the activities also
	activity.Register(activityGreetings)
	activity.Register(activityStoreCustomer)
}

// workflowGreetings is the Workflow that is used to handle new Customers in the Tavern.
// our Workflow accepts a customer as Input, and Outputs a Customer, and an Error
func workflowGreetings(ctx workflow.Context, visitor customer.Customer) (customer.Customer, error) {
	// workflow Options for HeartBeat Timeout and other Timeouts.
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		HeartbeatTimeout:       time.Second * 20,
		// Here we will Add Retry policies etc later
	}
	// Add the Options to Context to apply configurations
	ctx = workflow.WithActivityOptions(ctx, ao)
	// Grab the Logger that is configured on the Workflow
	logger := workflow.GetLogger(ctx)
	logger.Info("greetings workflow started")

	// Execute the activityGreetings and Wait for the Response with GET
	// GET() will Block until the activitiy is Completed.
	// Get accepts input to marshal result to,
	// ExecuteActivity returns a FUTURE, so if you want async you can simply Skip .Get
	// Get takes in a interface{} as input that we can use to Scan the result into.
	err := workflow.ExecuteActivity(ctx, activityGreetings, visitor).Get(ctx, &visitor)
	if err != nil {
		logger.Error("Greetings Activity failed", zap.Error(err))
		return customer.Customer{}, err
	}

	err = workflow.ExecuteActivity(ctx, activityStoreCustomer, visitor).Get(ctx, nil)
	if err != nil {
		logger.Error("Failed to update customer", zap.Error(err))
		return customer.Customer{}, err
	}
	// The output of the Workflow is a Visitor with filled information
	return visitor, nil
}

// activityGreetings is used to say Hello to a Customer and change their LastVisit and TimesVisisted
// The returned value will be a Customer struct filled with this information
func activityGreetings(ctx context.Context, visitor customer.Customer) (customer.Customer, error) {
	logger := activity.GetLogger(ctx)
	logger.Info("Greetings activity started")
	logger.Info("New Visitor", zap.String("customer", visitor.Name), zap.Int("visitorCount", visitorCount))
	visitorCount++

	oldCustomerInfo, _ := customer.Database.Get(visitor.Name)

	visitor.LastVisit = time.Now()
	visitor.TimesVisited = oldCustomerInfo.TimesVisited + 1
	return visitor, nil
}

// activityStoreCustomer is used to store the Customer in the configured Customer Storage.
func activityStoreCustomer(ctx context.Context, visitor customer.Customer) error {
	logger := activity.GetLogger(ctx)
	logger.Info("Store Customer activity started")
	logger.Info("Updating Customer", zap.String("customer", visitor.Name), zap.Time("lastVisit", visitor.LastVisit),
		zap.Int("timesVisited", visitor.TimesVisited))

	// Store Customer in Database (Memory Cache during this Example)
	err := customer.Database.Update(visitor)
	if err != nil {
		return err
	}
	return nil
}
workflows/greetings/greetings.go — The first complete workflow to run

Make sure you have the Cadence docker-compose still running.

Navigate to the /app folder and execute the worker service.

percy@tavern: go run main.go
2022-02-28T20:12:23.215+0100    INFO    internal/internal_worker.go:834 Started Workflow Worker {"Domain": "tavern", "TaskList": "greetings", "WorkerID": "10484@DESKTOP-3DP516F@greetings@b2156771-c3f8-4e3e-845f-375b1f58081a"}
2022-02-28T20:12:23.239+0100    INFO    internal/internal_worker.go:859 Started Activity Worker {"Domain": "tavern", "TaskList": "greetings", "WorkerID": "10484@DESKTOP-3DP516F@greetings@b2156771-c3f8-4e3e-845f-375b1f58081a"}
2022-02-28T20:12:23.239+0100    INFO    app/main.go:44  Started Worker. {"worker": "greetings"}
Running the Worker Service should print that the Workflow workers and activities are up

Now, why are there no greetings being printed? Let’s make sure we have the understanding of what we are doing.

First up, we ran the docker-compose. This started the Cadence server, which manages the state of all the jobs that are to be executed.

Secondly, we ran the worker service, which registers a workflow that exists and a few activities that are part of it. It also handles polling the server for any jobs to perform.

So we have a Scrum Master and a Developer, but no actual Issues to solve!

This is where the cadence CLI is amazing, we can push new jobs using the CLI. Let us visit the Tavern.

I won’t cover all the commands inside the CLI tool, some of them you should know by now (domain, tasklist). In the cadence command, you will see the –tl flag, which is short for tasklist. This should match the Tasklist you wrote inside the configuration for the worker service.

–wt is the option for the workflow type. This is going a string composed of the full path to the workflow, so the Go module + the file and function where the workflow is declared. In my case, my module is programmingpercy/cadence-tavern. The workflow is stored inside the workflows/greetings folder, and the function is named workflowGreetings. This makes the complete path programmingpercy/cadence-tavern/workflows/greetings.workflowGreetings.

–et is the execution timeout in seconds, this is how long the max amount of time we allow the execution to run before triggering a timeout failure.

cadence --domain tavern workflow run --tl greetings --wt programmingpercy/cadence-tavern/workflows/greetings.workflowGreetings --et 60
Cadence — Broken command that is executed

Run that command in a new terminal, and you should see a log message similar to my gist.

Running execution:
  Workflow Id : 21e30ae8-8246-412d-a6d2-cc0782cf66d4                                   
  Run Id      : 7c3514b4-6839-43ee-9d45-d9d54c7c9a44                                   
  Type        : programmingpercy/cadence-tavern/workflows/greetings.workflowGreetings  
  Domain      : tavern                                                                 
  Task List   : greetings                                                              
  Args        :                                                                        
Progress:
  1, 2022-02-28T20:27:25+01:00, WorkflowExecutionStarted
  2, 2022-02-28T20:27:25+01:00, DecisionTaskScheduled
  3, 2022-02-28T20:27:25+01:00, DecisionTaskStarted
  4, 2022-02-28T20:27:25+01:00, DecisionTaskCompleted
  5, 2022-02-28T20:27:25+01:00, WorkflowExecutionFailed

Result:
  Run Time: 1 seconds
  Status: FAILED
  Reason: cadenceInternal:Generic
  Detail: unable to decode the workflow function input bytes with error: unable to decode argument: 0, *customer.Customer, with json error: EOF, function name: programmingpercy/cadence-tavern/workflows/greetings.workflowGreetings
Cadence CLI failing to run the command

The log shows you a few signs of progress that are made, this is used to show the events that happen, you can see that the Cadence server has gotten the job and successfully decides on what workflow to execute. However, the execution fails.

Let us view a bit more data about this execution, you can open up the Cadence UI by visiting http://localhost:8088/.

It will ask you for the domain, enter tavern and you should be shown all the executions that are performed on that domain.

You can use this to track down executions and their status of them, and also enter them for more information.

Small Image I know, but it shows the Faillure of the workflow execution we just triggerd.
Small Image I know, but it shows the Faillure of the workflow execution we just triggerd.

Try clicking around and navigating into the work to see more information displayed. Remember the workflow declaration?

func workflowGreetings(ctx workflow.Context, visitor customer.Customer) (customer.Customer, error) {

We specify that the input should be a Customer, so we should input JSON data that confirms the Customer struct declaration.

We can do this by using the –i flag followed by JSON.

cadence --domain tavern workflow run --tl greetings --wt programmingpercy/cadence-tavern/workflows/greetings.workflowGreetings --et 60 -i '{"name": "Percy"}'
Cadence CLI using the Input of a Customer

You should see a result from the Workflow stating the number of times we have visited and the time of the last visit, try executing the command multiple times and play around with it. Also, try viewing the results in the UI.

Result:
  Run Time: 1 seconds
  Status: COMPLETED
  Output: {"name":"Percy","lastVisit":"2022-02-28T20:38:49.5461618+01:00","timesVisited":1}
The execution results of the workflow

Don’t forget to check out the logs in the worker-service to see what is happening.

At this time you can try out one of the orchestration utilities of Cadence. Turn off your running worker-service so that you have no process that fetches the jobs that are pushed to the cadence server. And then use the same cadence CLI to push a job.

After you have pushed a job, start the worker service up, and see how it successfully manages to find that job from the queue and perform it.

Signals — A durable asynchronous way to provide data

So far, we have run synchronous activities that rely on each other in the workflow.

But let us imagine that we have welcomed a customer in the tavern, we will expect him to make an order. We don’t know when and we don’t know what so we can’t process this yet.

Signals allow us to provide this data at a later stage, but it maintains the events and payloads in history for the rest of the workflow.

Let’s try this out by creating a second workflow, the workflowOrder that accepts an Order from a Customer, and handles the order, such as serving, and accepting payment.

I will not cover the workflow stuff here, basically, we will redo what we have for the workflowGreetings but instead have a long-running workflow.

I will create a new folder named orders inside the workflows folder. This folder will hold any workflows and activities related to orders.

To begin listening to Signals we will need a Selector. The selector is the component that is responsible for running the workflow, in the cadence source they explain it as the replacement for the regular select statement. Signals are sent on the workflow.Channel and we can subscribe to a topic by using the workflow.GetSignalChannel(ctx, TOPICNAME).

We have to tell the selector how to handle the payloads that come on the Signal channel, this is done by adding receivers which are handler functions to run on each signal that arrives.

You can add receivers by running selector.AddRecieve which accepts a workflow.Channel and a handler function.

Finally, you can make the Selector start picking up signals by running the selector.Select(ctx).

It may sound like a lot, but look at how little code we need.

package orders

import (
	"time"

	"go.uber.org/cadence/workflow"
	"go.uber.org/zap"
)

// Order is a simple type to represent orders made
type Order struct {
	Item  string  `json:"item"`
	Price float32 `json:"price"`
	By    string  `json:"by"`
}

func init() {
	workflow.Register(workflowOrder)
}

// workflowOrder will handle incomming Orders
func workflowOrder(ctx workflow.Context) error {
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute * 60,
		StartToCloseTimeout:    time.Minute * 60,
		HeartbeatTimeout:       time.Hour * 20,
		// Here we will Add Retry policies etc later
	}
	// Add the Options to Context to apply configurations
	ctx = workflow.WithActivityOptions(ctx, ao)

	logger := workflow.GetLogger(ctx)
	logger.Info("Waiting for Orders")

	// Grab the Selector from the workflow Context,
	selector := workflow.NewSelector(ctx)
	// For ever running loop
	for {
		// Get the Signal used to identify an Event, we named our Order event into order
		signalChan := workflow.GetSignalChannel(ctx, "order")

		// We add a "Receiver" to the Selector, The receiver is a function that will trigger once a new Signal is recieved
		selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
			// Create the Order to marshal the Input into
			var order Order
			// Receive will read input data into the struct
			c.Receive(ctx, &order)

			logger.Info("Order made", zap.String("item", order.Item), zap.Float32("price", order.Price))
		})

		selector.Select(ctx)
		// TODO Add Signal for Leaving the Tavern, Close this workflow

	}

}
app/workflows/orders/order.go — The simple Long running workflow

Before you can try this out, Don’t forget to add the Import in the main.go for the new workflow, or it won’t be registered.

import (
_ "programmingpercy/cadence-tavern/workflows/orders"
......
)

Once you have the import, restart the worker service (main.go).

go run main.go

This next part is new, so read carefully! To send a Signal, we will need to provide a workflow-id. You can find this ID when you start the worker with the cadence CLI. You can use the same command as before when we triggered the greetingsWorkflow, but exchange the workflow_type, and remove the input. Remember that the service will shut down after the –et time has passed, we will talk more about this soon.

percy@awesome:~/development/cadence-demo$ cadence --domain tavern workflow run --tl greetings --wt programmingpercy/cadence-tavern/workflows/orders.workflowOrder --et 60
Running execution:
  Workflow Id : a49df98b-34ed-46b6-ac74-8a2d6e39db0e                            
  Run Id      : bd989409-8688-4eed-b08f-d871d035c77b                            
  Type        : programmingpercy/cadence-tavern/workflows/orders.workflowOrder  
  Domain      : tavern                                                          
  Task List   : greetings                                                       
  Args        :                                                                 
Progress:
  1, 2022-03-01T19:34:15+01:00, WorkflowExecutionStarted
  2, 2022-03-01T19:34:15+01:00, DecisionTaskScheduled
  3, 2022-03-01T19:34:15+01:00, DecisionTaskStarted
  4, 2022-03-01T19:34:15+01:00, DecisionTaskCompleted
  5, 2022-03-01T19:34:30+01:00, WorkflowExecutionSignaled
  6, 2022-03-01T19:34:30+01:00, DecisionTaskScheduled
  7, 2022-03-01T19:34:30+01:00, DecisionTaskStarted
  8, 2022-03-01T19:34:30+01:00, DecisionTaskCompleted

Result:
  Run Time: 41 seconds
Cadence starting the Workflow waiting for a Signal

At the top of the output, grab the workflow ID.

Our service is now up and running and waiting for orders, let us send a signal that we want to order a Beer. We can yet again, use the Cadence CLI.

We enter the domain the same as before, use the workflow command, and the signal subcommand. We set the workflow-ID using the -w flag, and the signal name with -n. Remember you have to reuse the same signal name as in the code.

The -i flag is used the same as before, with the item we want to buy, and the name of the buyer.

cadence --domain tavern workflow signal -w 71b24355–4d9a-4460–818a-50c9058a837e -n order -i '{ "item": "Beer", "by": "Percy" }'
Cadence CLI sending the Signal to our worker

You should see a status message is printed with the Signal workflow Succeeded. This means that the signal was sent to the workflow, and you can view the workflow logs by either opening the terminal that runs your worker service or visiting the UI.

I recommend exploring the UI, if you set the –et timer high enough for the workflow to not time out, send a few signals, and then view the history tab in the UI. This is great for seeing all the events that occur on the worker.

The UI can plot you the workflows history, signals received etc, and show you a lot of very useful information about the running processes etc.

Workflow Running Forever Without Timeout

It doesn’t make any sense that we have to set the –et flag to make the workflow run for a period of time. In a real system, you will solve this in two ways.

Let us discuss the issue a bit. You want a workflow to run forever and always be ready, but a workflow saves history and states. This will lead to a memory “leakage”, or growth. The workflow will grow and grow, and we cant have that.

To solve this, I would recommend two solutions. The first one is the best according to me but harder to implement. The scenario would be that we only want the workflowOrder to run during open hours. No sense to run it if the Tavern is closed. The way to implement this is to have an event/signal pushed once the Tavern opens that triggers the Order workflow.

The second solution is to just have it run forever, but with a twist. The guys at uber are no jokers, of course, they have thought of this and solved it.

A workflow can be retriggered by itself, clearing all history and state. This is done with the workflow.NewContinueAsNewError. When this error is returned, Cadence will effectively await all current work to be completed, and restart the workflow with the same ID. It will also reset all timeout timers!

One way of knowing when to return this error is either by counting the number of signals received or by a timer that is shorter than the –et timer.

I recommend the Signal counter, you dont want a workflows history to grow to large, this is also the way Cadence themselfs has examples in their git.

Let us add a signal counter, and a max amount of signals, and if we processed enough signals, let us return the NewContinueAsNewError.

You must use the AddDefault to the selector instead of just setting the restart bool into true. This is because the selector has a lot of logic implemented to finish durably and safely.

package orders

import (
	"time"

	"go.uber.org/cadence/workflow"
	"go.uber.org/zap"
)

// Order is a simple type to represent orders made
type Order struct {
	Item  string  `json:"item"`
	Price float32 `json:"price"`
	By    string  `json:"by"`
}

func init() {
	workflow.Register(workflowOrder)
}

// MaxSignalsAmount is how many signals we accept before restart
// Cadence recommends a production workflow to have <1000
const MaxSignalsAmount = 3

// workflowOrder will handle incomming Orders
func workflowOrder(ctx workflow.Context) error {
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute * 60,
		StartToCloseTimeout:    time.Minute * 60,
		HeartbeatTimeout:       time.Hour * 20,
		// Here we will Add Retry policies etc later
	}
	// Add the Options to Context to apply configurations
	ctx = workflow.WithActivityOptions(ctx, ao)

	logger := workflow.GetLogger(ctx)
	logger.Info("Waiting for Orders")

	// restartWorkflow
	var restartWorkflow bool
	// signalCounter
	signalCount := 0

	// Grab the Selector from the workflow Context,
	selector := workflow.NewSelector(ctx)
	// For ever running loop
	for {
		// Get the Signal used to identify an Event, we named our Order event into order
		signalChan := workflow.GetSignalChannel(ctx, "order")

		// We add a "Receiver" to the Selector, The receiver is a function that will trigger once a new Signal is recieved
		selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
			// Create the Order to marshal the Input into
			var order Order
			// Receive will read input data into the struct
			c.Receive(ctx, &order)

			// increment signal counter
			signalCount++

			logger.Info("Order made", zap.String("item", order.Item), zap.Float32("price", order.Price))
		})

		if signalCount >= MaxSignalsAmount {
			// We should restart
			// Add a Default to the selector, which will make sure that this is triggered once all jobs in queue are done
			selector.AddDefault(func() {
				restartWorkflow = true
			})
		}

		selector.Select(ctx)

		// If its time to restart, return the ContinueAsNew
		if restartWorkflow {
			return workflow.NewContinueAsNewError(ctx, workflowOrder)
		}

	}

}
orders.go — The Workflow will restart after 3 signals

Once you have that change, restart it and try sending more than 3 signals and see how everything just works smoothly.

Great, we can now welcome new customers and take orders. In a real application I would want events for customers leaving, validation inside the workflow that the customer is present in the tavern before ordering etc.

One thing we will do is refactor the order workflow to learn a bit more about child workflows. It is what it sounds like, a Workflow can spawn Child workflows. This is very useful when your workflow grows large and complex, and/or has many async activities, etc.

I like structuring it so that the parent workflow is responsible for the signals etc, and the child workflow is responsible for the activities to run on certain signals. Remember, a single workflow can listen to many signals.

Child workflows

Let us start the simple workflow that will take care of the order, as a child.

Triggering a child workflow is done in the same way we trigger Activities. There is a ExecuteChildWorkflow function provided by the workflow package.

You need to create a configuration for the child workflow, in it, you set the rules about retries, how long it can run before time out, etc. Since this is a Child workflow we use the ChildWorkflowOptions configuration object found in the workflow package.

We then need to create a Child Context and Execute the child workflow. This is pretty straight forward so I won’t cover the code in detail.

The gist shows a small part of the workflowOrder where we execute the Child workflow for handling orders, we will create the actual workflow after.

	// Preconfigure ChildWorkflow Options
	orderWaiterCfg := workflow.ChildWorkflowOptions{
		ExecutionStartToCloseTimeout: time.Minute * 2, // Each Order can tops take 2 min
	}

	// Grab the Selector from the workflow Context,
	selector := workflow.NewSelector(ctx)
	// For ever running loop
	for {
		// Get the Signal used to identify an Event, we named our Order event into order
		signalChan := workflow.GetSignalChannel(ctx, "order")

		// We add a "Receiver" to the Selector, The receiver is a function that will trigger once a new Signal is recieved
		selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
			// Create the Order to marshal the Input into
			var order Order
			// Receive will read input data into the struct
			c.Receive(ctx, &order)

			// increment signal counter
			signalCount++
			// Create ctx for Child flow
			orderCtx := workflow.WithChildOptions(ctx, orderWaiterCfg)
			// Trigger the child workflow
			waiter := workflow.ExecuteChildWorkflow(orderCtx, workflowProcessOrder, order)
			if err := waiter.Get(ctx, nil); err != nil {
				workflow.GetLogger(ctx).Error("Order has failed.", zap.Error(err))
			}

		})
Cadence example of executing a child workflow

I will create the workflowProcessOrder which will run two activities, the first one will find the customer given the name, the second will validate that the customer is old enough to order. There is nothing new to learn here, this works as the activities and workflows we made earlier, no changes.

Here is the full code of my orders.go.

package orders

import (
	"context"
	"errors"
	"programmingpercy/cadence-tavern/customer"
	"time"

	"go.uber.org/cadence/activity"
	"go.uber.org/cadence/workflow"
	"go.uber.org/zap"
)

// Order is a simple type to represent orders made
type Order struct {
	Item  string  `json:"item"`
	Price float32 `json:"price"`
	By    string  `json:"by"`
}

func init() {
	workflow.Register(workflowOrder)
	workflow.Register(workflowProcessOrder)

	activity.Register(activityIsCustomerLegal)
	activity.Register(activitiyFindCustomerByName)
}

// MaxSignalsAmount is how many signals we accept before restart
// Cadence recommends a production workflow to have <1000
const MaxSignalsAmount = 3

// workflowOrder will handle incomming Orders
func workflowOrder(ctx workflow.Context) error {
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute * 60,
		StartToCloseTimeout:    time.Minute * 60,
		HeartbeatTimeout:       time.Hour * 20,
		// Here we will Add Retry policies etc later
	}
	// Add the Options to Context to apply configurations
	ctx = workflow.WithActivityOptions(ctx, ao)

	logger := workflow.GetLogger(ctx)
	logger.Info("Waiting for Orders")

	// restartWorkflow
	var restartWorkflow bool
	// signalCounter
	signalCount := 0

	// Preconfigure ChildWorkflow Options
	orderWaiterCfg := workflow.ChildWorkflowOptions{
		ExecutionStartToCloseTimeout: time.Minute * 2, // Each Order can tops take 2 min
	}

	// Grab the Selector from the workflow Context,
	selector := workflow.NewSelector(ctx)
	// For ever running loop
	for {
		// Get the Signal used to identify an Event, we named our Order event into order
		signalChan := workflow.GetSignalChannel(ctx, "order")

		// We add a "Receiver" to the Selector, The receiver is a function that will trigger once a new Signal is recieved
		selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
			// Create the Order to marshal the Input into
			var order Order
			// Receive will read input data into the struct
			c.Receive(ctx, &order)

			// increment signal counter
			signalCount++
			// Create ctx for Child flow
			orderCtx := workflow.WithChildOptions(ctx, orderWaiterCfg)
			// Trigger the child workflow
			waiter := workflow.ExecuteChildWorkflow(orderCtx, workflowProcessOrder, order)
			if err := waiter.Get(ctx, nil); err != nil {
				workflow.GetLogger(ctx).Error("Order has failed.", zap.Error(err))
			}

		})

		if signalCount >= MaxSignalsAmount {
			// We should restart
			// Add a Default to the selector, which will make sure that this is triggered once all jobs in queue are done
			selector.AddDefault(func() {
				restartWorkflow = true
			})
		}

		selector.Select(ctx)

		// If its time to restart, return the ContinueAsNew
		if restartWorkflow {
			return workflow.NewContinueAsNewError(ctx, workflowOrder)
		}

	}
}

// workflowProcessOrder is used to handle orders and will be ran as a CHILD
func workflowProcessOrder(ctx workflow.Context, order Order) error {

	logger := workflow.GetLogger(ctx)
	logger.Info("process order workflow started")
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute,
		StartToCloseTimeout:    time.Minute,
		HeartbeatTimeout:       time.Second * 20,
		// Here we will Add Retry policies etc later
	}
	// Add the Options to Context to apply configurations
	ctx = workflow.WithActivityOptions(ctx, ao)

	// Find Customer from Repo
	var cust customer.Customer
	err := workflow.ExecuteActivity(ctx, activitiyFindCustomerByName, order.By).Get(ctx, &cust)

	if err != nil {
		logger.Error("Customer is not in the Tavern", zap.Error(err))
		return err
	}

	var allowed bool
	err = workflow.ExecuteActivity(ctx, activityIsCustomerLegal, cust).Get(ctx, &allowed)
	if err != nil {
		logger.Error("Customer is not of age", zap.Error(err))
		return err
	}

	logger.Info("Order made", zap.String("item", order.Item), zap.Float32("price", order.Price))
	return nil

}

// activityFindCustomerByName is used to find the Customer is in the Tavern
func activitiyFindCustomerByName(ctx context.Context, name string) (customer.Customer, error) {
	return customer.Database.Get(name)
}

// activityIsCustomerLegal is used to check the age of the customer
func activityIsCustomerLegal(ctx context.Context, visitor customer.Customer) (bool, error) {

	if visitor.Age < 18 {
		return false, errors.New("customer is not old enough, dont serve him")
	}
	return true, nil
}
Cadence full example of child workflow.

To try this we need to do 4 things now.

  • Restart the worker service
go run main.go
  • Trigger the Order workflow
cadence --domain tavern workflow run --tl greetings --wt programmingpercy/cadence-tavern/workflows/orders.workflowOrder --et 1000
  • Visit the Tavern with a User
cadence --domain tavern workflow run --tl greetings --wt programmingpercy/cadence-tavern/workflows/greetings.workflowGreetings --et 20 -i '{"name": "Percy", "age": 22"}'
  • Make an Order using a signal
cadence --domain tavern workflow signal -w YOUR-WORKFLOW-ID -n order -i '{"item": "Beer", "by": "Percy"}'

Doing so should show you logs about the order being processed, and you can visit the UI to see information about the workflow being executed.

Cadence Child Workflow showing executing information about parent etc.
Cadence Child Workflow showing executing information about parent etc.

Amazing, we have a tavern with 2 workflows running. Remember, spinning up worker services allows you to scale this very easily. And adding functionality is very easy and does not break the other workflows. All events being transmitted are also handled in the best possible way with retries, if a worker goes down it won’t be lost etc.

API —Users Won’t have the CLI

Great, we can publish jobs to the server and have workers accept them. This is all great, but let us be honest, we will not have the users of our application using the CLI.

The CLI has helped us test the things during development, and they allow you to also manage the production environment.

Usually, you will want your Cadence workers and workflows running but have some ways for the work to be inserted. This usually is done by a regular Rest API that triggers a signal for us.

We will set up a simple HTTP endpoint that allows us to wrap both the workflows that we have.

I will create a folder named api inside the app folder.

package main

import (
	"encoding/json"
	"log"
	"net/http"
	"programmingpercy/cadence-tavern/customer"
	"programmingpercy/cadence-tavern/workflows/orders"
)

func main() {

	mux := http.NewServeMux()
	mux.HandleFunc("/greetings", GreetUser)
	mux.HandleFunc("/order", Order)

	log.Fatal(http.ListenAndServe("localhost:8080", mux))
}

func GreetUser(w http.ResponseWriter, r *http.Request) {

	// Grab user info from body
	var visitor customer.Customer

	err := json.NewDecoder(r.Body).Decode(&visitor)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// Trigger Workflow here
	log.Print(visitor)
}

func Order(w http.ResponseWriter, r *http.Request) {
	// Grab order info from body
	var orderInfo orders.Order

	err := json.NewDecoder(r.Body).Decode(&orderInfo)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	log.Print(orderInfo)
	// Trigger Workflow here
}
app/api/main.go — The Simple HTTP server that acts as a gateway into cadence

Now let us add a Cadence Client, which can be used to talk to the cadence server and replace what we do with the CLI.

To do this we need to initiate a yarpc connection to the server, then create a workflowserviceclient which is a SDK for controlling workflows. This workflowserviceclient has to be inputted into a cadence.Client which we will be using to execute workflows.

All code related to the cadence client will be put into a file called client.go. We will have hardcoded values for this demonstration, but in a real app, you can make this configurable.

We will also have the workflow type names hardcoded. These could be fetched using the cadence client, but that is something you can experiment with yourself.

const (
	cadenceClientName = "cadence-client"
	cadenceService    = "cadence-frontend"
)

const (
	// The names of the Workflows we will be using
	OrderWorkflow     = "programmingpercy/cadence-tavern/workflows/orders.WorkflowOrder"
	GreetingsWorkflow = "programmingpercy/cadence-tavern/workflows/greetings.workflowGreetings"
)

type CadenceClient struct {
	//dispatcher used to communicate
	dispatcher *yarpc.Dispatcher
	// wfClient is the workflow Client
	wfClient workflowserviceclient.Interface
	// client is the client used for cadence
	client client.Client
	// orderWorkflowID is used to remember the workflow id
	orderWorkflowID string
	// orderWorkflowRunID is the run id of the order workflow
	orderWorkflowRunID string
}

// SetupCadenceClient is used to create the client we can use
func SetupCadenceClient() (*CadenceClient, error) {
	// Create a dispatcher used to communicate with server
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name: cadenceClientName,
		Outbounds: yarpc.Outbounds{
			// This shouldnt be hard coded in real app
			// This is a map, so we store this communication channel on "cadence-frontend"
			cadenceService: {Unary: grpc.NewTransport().NewSingleOutbound("localhost:7833")},
		},
	})
	// Start dispatcher
	if err := dispatcher.Start(); err != nil {
		return nil, err
	}
	// Grab the Configurations from the Dispatcher based on cadenceService name
	yarpConfig := dispatcher.ClientConfig(cadenceService)
	// Build the workflowserviceClient that handles the workflows
	wfClient := workflowserviceclient.New(yarpConfig)
	// clientoptions used to control metrics etc
	opts := &client.Options{
		MetricsScope: tally.NoopScope, // Do nothing for now
	}
	// Build the Cadence Client
	cadenceClient := client.NewClient(wfClient, "tavern", opts)

	return &CadenceClient{
		dispatcher: dispatcher,
		wfClient:   wfClient,
		client:     cadenceClient,
	}, nil

}

// SetOrderWorkflowIds is used to store workflows IDS in Memory
func (cc *CadenceClient) SetOrderWorkflowIds(id, runID string) {
	cc.orderWorkflowID = id
	cc.orderWorkflowRunID = runID
}
app/api/client.go — the cadence client related code

Next, we will change the GreetUser handler so that it is a method attached to the CadenceClient. We will be adding some execution options and ExecuteWorkflow.

ExecuteWorkflow is a way to trigger a workflow and wait for it to finish, this is mostly used in development according to Cadence them selfs. It is good for synchronous calls such as the greetings workflow.

Remember that the workflow expected a customer.Customer as input? So we have to use that in the execution here.

// GreetUser is used to Welcome a new User into the tavern
func (cc *CadenceClient) GreetUser(w http.ResponseWriter, r *http.Request) {
	// Grab user info from body
	var visitor customer.Customer

	err := json.NewDecoder(r.Body).Decode(&visitor)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// Trigger Workflow here
	log.Print(visitor)

	// Create workflow options, this is the same as the CLI, a task list, a timeout timer
	opts := client.StartWorkflowOptions{
		TaskList:                     "greetings",
		ExecutionStartToCloseTimeout: time.Second * 10,
	}

	log.Println("Starting workflow")
	// This is how you Execute a Workflow and wait for it to finish
	// This is useful if you have synchronous workflows that you want to leverage as functions
	future, err := cc.client.ExecuteWorkflow(r.Context(), opts, GreetingsWorkflow, visitor)

	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	log.Println("Get Result from  workflow")
	// Fetch result once done and marshal into
	if err := future.Get(r.Context(), &visitor); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	data, _ := json.Marshal(visitor)
	w.WriteHeader(http.StatusOK)
	w.Write(data)
}
api/client.go — The HTTP handler that executes a cadence workflow

Before we start trying, we need to also change the Order handler. This handler will not execute a workflow, but only send a Signal that an event should be triggered.

For this to happen we will have to specify a workflow ID, a Run ID, and the Signal name. By now you should know how this works, but they also pass an Order struct as input, since the workflow expects that as input.

Remember that this handler will simply send the signal, we will cover how to start the workflow after this handler.

// Order is used to send a signal to the worker
func (cc *CadenceClient) Order(w http.ResponseWriter, r *http.Request) {
	// Grab order info from body
	var orderInfo orders.Order

	err := json.NewDecoder(r.Body).Decode(&orderInfo)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	log.Print(orderInfo)
	// Send a signal to the Workflow
	// We need to provide a Workflow ID, the RUN ID of the workflow, and the Signal type
	err = cc.client.SignalWorkflow(r.Context(), cc.orderWorkflowID, cc.orderWorkflowRunID, "order", orderInfo)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	log.Println("Signalled system of order")

	w.WriteHeader(http.StatusOK)

}
api/orders.go — The HTTP handler for triggering a Signal

To start a workflow we use StartWorkflow, the difference between Start and Execute is that Start will send the start command and return immediately. Execute will start the workflow and wait for it to complete.

So StartWorkflow is perfect when we want to boot up long-running workflows programmatically instead of the CLI.

We need to provide some runtime options such as timeouts, remember to set the ExecutionStartToCloseTimeout high enough so that you are sure that it will not timeout before ContinueAsNew has been triggered if you want a forever running workflow.

The StartWorkflow will return information about the execution, such as the WorkflowID and the RunID, two items we need to store for the Signal to be sent.

package main

import (
	"context"
	"log"
	"net/http"
	"time"

	"go.uber.org/cadence/client"
)

func main() {

	rootCtx := context.Background()
	cc, err := SetupCadenceClient()
	if err != nil {
		panic(err)
	}

	// Start long running workflow
	opts := client.StartWorkflowOptions{
		TaskList:                     "greetings",
		ExecutionStartToCloseTimeout: time.Hour * 1, // Wait 1 hours, make sure you use a high enough time
		// to make sure that the workflow does not timeout before 3 singals are recieved
	}

	// We use Start here since we want to start it but not wait for it to return
	// Execution contains information about the execution such as Workflow ID etc
	// In production, make sure you check if the WOrkflows are already running to avoid  booting up multiple unless wanted
	execution, err := cc.client.StartWorkflow(rootCtx, opts, OrderWorkflow)
	if err != nil {
		panic(err)
	}

	log.Println("Workflow ID: ", execution.ID)
	// Apply Workflows IDs
	cc.SetOrderWorkflowIds(execution.ID, execution.RunID)

	mux := http.NewServeMux()
	mux.HandleFunc("/greetings", cc.GreetUser)
	mux.HandleFunc("/order", cc.Order)

	log.Fatal(http.ListenAndServe("localhost:8080", mux))
}
api/main.go — The simple HTTP server that starts a orderWorkflow upon boot

Time to test the API! Make sure your worker service is running and then boot up the API. I use CURL to visit the tavern and then make an order.

# Start API
go run *.go

# New Terminal
# Send CURL
percy@dev:~/development/cadence-demo/app/api$ curl localhost:8080/greetings -d '{"name": "Percy", "age": 22}'
# RESPONSE
{"name":"Percy","lastVisit":"2022-03-04T07:21:33.1540528+01:00","timesVisited":1,"age":22}

# Send Order
percy@dev:~/development/cadence-demo/app/api$ curl localhost:8080/order -d '{"by": "Percy", "item": "Beer"}'
Curl — Testing the API

Visit the Cadence logs or the UI and make sure you can see that the Order has been made.

Metrics — Prometheus & Grafana

Maybe some of you noticed before, but the docker-compose contains both Prometheus and Grafana. I won’t cover what they are in this article as it is already super mega lengthy, but shortly, they are used for Metrics.

One great thing about Cadence is that it has Prometheus support for metrics, and right now we have been using an empty Tally for this. We can easily update the code to start outputting some Metrics to Prometheus.

I will create a new folder inside the app named prometheus, in it, we will have a helper function to create a new Prometheus reporter. A Prometheus reporter is a way to expose metrics data.

We will also have helper functions to create two Tally scopes, one for workers and one for the worker services.

package prometheus

import (
	"time"

	prom "github.com/m3db/prometheus_client_golang/prometheus"
	"github.com/uber-go/tally"
	"github.com/uber-go/tally/prometheus"
	"go.uber.org/zap"
)

var (
	safeCharacters = []rune{'_'}

	sanitizeOptions = tally.SanitizeOptions{
		NameCharacters: tally.ValidCharacters{
			Ranges:     tally.AlphanumericRange,
			Characters: safeCharacters,
		},
		KeyCharacters: tally.ValidCharacters{
			Ranges:     tally.AlphanumericRange,
			Characters: safeCharacters,
		},
		ValueCharacters: tally.ValidCharacters{
			Ranges:     tally.AlphanumericRange,
			Characters: safeCharacters,
		},
		ReplacementCharacter: tally.DefaultReplacementCharacter,
	}
)

// NewPrometheusReporter is used to create a new reporter that can send info to prom
// we need a zap logger inputted to make sure we get logs on error
// addr should be the IP:PORT to send metrics
func NewPrometheusReporter(addr string, logger *zap.Logger) (prometheus.Reporter, error) {
	promCfg := prometheus.Configuration{
		ListenAddress: addr,
	}

	reporter, err := promCfg.NewReporter(
		prometheus.ConfigurationOptions{
			Registry: prom.NewRegistry(),
			OnError: func(err error) {
				logger.Warn("error in prometheus reporter", zap.Error(err))
			},
		},
	)
	if err != nil {
		return nil, err
	}

	return reporter, nil
}

// NewServiceScope is used by services and prefixed Service_
func NewServiceScope(reporter prometheus.Reporter) tally.Scope {
	serviceScope, _ := tally.NewRootScope(tally.ScopeOptions{
		Prefix:          "Service_",
		Tags:            map[string]string{},
		CachedReporter:  reporter,
		Separator:       prometheus.DefaultSeparator,
		SanitizeOptions: &sanitizeOptions,
	}, 1*time.Second)

	return serviceScope
}

// NewWorkerScope is used by Workers and prefixed Worker_
func NewWorkerScope(reporter prometheus.Reporter) tally.Scope {
	serviceScope, _ := tally.NewRootScope(tally.ScopeOptions{
		Prefix:          "Worker_",
		Tags:            map[string]string{},
		CachedReporter:  reporter,
		Separator:       prometheus.DefaultSeparator,
		SanitizeOptions: &sanitizeOptions,
	}, 1*time.Second)

	return serviceScope
}
Prometheus code and helper functions for tally,

If you are unfamiliar with Tally, it is a Uber-created metrics package. We will only publish default cadence metrics, but you can easily add your metrics. I won’t cover this here, but you can read up on it on their GitHub.

To start publishing metrics we need to change the Tally Scope in both the API main.go and the worker service.

The worker service will look like the following gist, what we change is that we create a reporter on localhost:9098, which will host a Prometheus metrics website.

package main

import (
	"fmt"
	localprom "programmingpercy/cadence-tavern/prometheus"
	_ "programmingpercy/cadence-tavern/workflows/greetings"
	_ "programmingpercy/cadence-tavern/workflows/orders"

	_ "go.uber.org/cadence/.gen/go/cadence"
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"

	"go.uber.org/yarpc"
	_ "go.uber.org/yarpc/api/transport"
	"go.uber.org/yarpc/transport/tchannel"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)
..... unchanged until

// newWorkerServiceClient is used to initialize a new Worker service
// It will handle Connecting and configuration of the client
// Returns a Worker, the logger applied or an error
// TODO expand this function to allow more configurations, will be done later in the article.
func newWorkerServiceClient() (worker.Worker, *zap.Logger, error) {

	// Create a logger to use for the service
	logger, err := newLogger()
	if err != nil {
		return nil, nil, err
	}

	reporter, err := localprom.NewPrometheusReporter("127.0.0.1:9098", logger)
	if err != nil {
		return nil, nil, err
	}

	metricsScope := localprom.NewServiceScope(reporter)

	// build the most basic Options for now
	workerOptions := worker.Options{
		Logger:       logger,
		MetricsScope: metricsScope,
	}
	// Create the connection that the worker should use
	connection, err := newCadenceConnection(ClientName)
	if err != nil {
		return nil, nil, err
	}
	//  Create the worker and return
	return worker.New(connection, Domain, TaskList, workerOptions), logger, nil
}
app/main.go — Updated to use the Metrics scope.

The same thing goes for the API. Here we use localhost:9099 instead, to avoid conflicting ports.

// SetupCadenceClient is used to create the client we can use
func SetupCadenceClient() (*CadenceClient, error) {
	// Create a dispatcher used to communicate with server
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name: cadenceClientName,
		Outbounds: yarpc.Outbounds{
			// This shouldnt be hard coded in real app
			// This is a map, so we store this communication channel on "cadence-frontend"
			cadenceService: {Unary: grpc.NewTransport().NewSingleOutbound("localhost:7833")},
		},
	})
	// Start dispatcher
	if err := dispatcher.Start(); err != nil {
		return nil, err
	}
	// Grab the Configurations from the Dispatcher based on cadenceService name
	yarpConfig := dispatcher.ClientConfig(cadenceService)
	// Build the workflowserviceClient that handles the workflows
	wfClient := workflowserviceclient.New(yarpConfig)
	// clientoptions used to control metrics etc

	config := zap.NewDevelopmentConfig()

	config.Level.SetLevel(zapcore.InfoLevel)

	var err error
	logger, err := config.Build()
	if err != nil {
		return nil, fmt.Errorf("failed to build logger: %v", err)
	}

	// Start prom scope
	reporter, err := localprom.NewPrometheusReporter("127.0.0.1:9099", logger)
	if err != nil {
		return nil, err
	}
	// use WorkerScope
	metricsScope := localprom.NewWorkerScope(reporter)

	opts := &client.Options{
		MetricsScope: metricsScope,
	}

	// Build the Cadence Client
	cadenceClient := client.NewClient(wfClient, "tavern", opts)

	return &CadenceClient{
		dispatcher: dispatcher,
		wfClient:   wfClient,
		client:     cadenceClient,
	}, nil

}// SetupCadenceClient is used to create the client we can use
func SetupCadenceClient() (*CadenceClient, error) {
	// Create a dispatcher used to communicate with server
	dispatcher := yarpc.NewDispatcher(yarpc.Config{
		Name: cadenceClientName,
		Outbounds: yarpc.Outbounds{
			// This shouldnt be hard coded in real app
			// This is a map, so we store this communication channel on "cadence-frontend"
			cadenceService: {Unary: grpc.NewTransport().NewSingleOutbound("localhost:7833")},
		},
	})
	// Start dispatcher
	if err := dispatcher.Start(); err != nil {
		return nil, err
	}
	// Grab the Configurations from the Dispatcher based on cadenceService name
	yarpConfig := dispatcher.ClientConfig(cadenceService)
	// Build the workflowserviceClient that handles the workflows
	wfClient := workflowserviceclient.New(yarpConfig)
	// clientoptions used to control metrics etc

	config := zap.NewDevelopmentConfig()

	config.Level.SetLevel(zapcore.InfoLevel)

	var err error
	logger, err := config.Build()
	if err != nil {
		return nil, fmt.Errorf("failed to build logger: %v", err)
	}

	// Start prom scope
	reporter, err := localprom.NewPrometheusReporter("127.0.0.1:9099", logger)
	if err != nil {
		return nil, err
	}
	// use WorkerScope
	metricsScope := localprom.NewWorkerScope(reporter)

	opts := &client.Options{
		MetricsScope: metricsScope,
	}

	// Build the Cadence Client
	cadenceClient := client.NewClient(wfClient, "tavern", opts)

	return &CadenceClient{
		dispatcher: dispatcher,
		wfClient:   wfClient,
		client:     cadenceClient,
	}, nil

}
app/api/client.go — Updated to use prometheus metrics

Now the services are publishing metrics on those URLs, you can visit them to see the data. We need to tell Prometheus to scrape these URLS, which is done in the prometheus.yml file that docker-compose is using.

We will be adding localhost:9090 and also the two URLs from the workers, we need to use host.docker.internal for this to work.

global:
  scrape_interval: 5s
  external_labels:
    monitor: 'cadence-monitor'
scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: # addresses to scrape
          - 'localhost:9090'
          - 'cadence:8000'
          - 'cadence:8001'
          - 'cadence:8002'
          - 'cadence:8003'
          - 'host.docker.internal:9098'
          - 'host.docker.internal:9099'
Added Scrape targets for Prometheus

Restart the docker-compose and then boot up the API and the Worker service.

We will start by visiting Prometheus to make sure the scrape targets are connecting.

Visit http://localhost:9090/targets and you should see all the scrape targets and their status if they are not OK. Make sure you entered the correct ports and that the services are all running.

Prometheus scrape targets and their status
Prometheus scrape targets and their status

Grafana is the service we can use to visualize the metrics. Cadence comes with two premade Dashboards that we can leverage, you can download them from here.

Grafana should be found on localhost:3000 if you haven’t changed the docker-compose.

In the left side menu, we need to select Data Sources so that we can add the Prometheus server as a data provider to Grafana.

Grafana Data Sources In the Menu
Grafana Data Sources In the Menu

You will be brought to a screen with many options, for this simple example, you only need to provide the URL. The URL should be http://host.docker.internal:9090

At the bottom, you will find a button saying Save & Test.

Grafana adding the Prometheus as a data source
Grafana adding the Prometheus as a data source

Once we have the data source we need to add the JSON dashboards.

Grafana import Dashboard
Grafana import Dashboard

Import both the dashboards and we are good to go. You now have metrics in Grafana.

Try sending a few requests, here you can see how I can view the number of successful workflow executions, etc.

Grafana successful workflow executions
Grafana successful workflow executions

Of course, there are only default metrics about executions, latencies, failures, etc. You can add more by providing a tally of your metrics.

Conclusion

Getting started with Cadence is a bit of work, but it is time well spent. Cadence has a small overhead, the code we have produced is not much, and what did it give you? Fault tolerant, retryable, event managed tasks. Implementing this yourself would be a crazy amount of code.

Cadence offers very much, and the guys at Uber have really proved skilled.

The overhead of starting up a project using Cadence is existing, but it is small and only needs to be done at the start of the project. Once up, adding features and activities and workflows is so easy.

The extra time for setting it up, is gained back in many other ends.

I am impressed by how easy it was getting up and running and how scalable it feels. Many of these frameworks can be confusing to get started with, but I feel that the layout of Cadence makes it very simple. We have Worker services that poll the server for jobs, we have workflows that are a set of functions run by the worker services, and workflows are made of activities (functions).

The framework also is easy to incorporate into the regular go code.

In a production codebase, you should create wrappers/util packages to create the Cadence clients. We have duplicate code for connection a cadence workflowserviceclient inside the Worker service and the API right now. These utility packages will even further reduce the overhead.

Also, you should make everything configurable, and the API should try Listing workflows to make sure they are not already running, etc.

For production, make sure you take a look at the production operation found at their docs.

Thank you for reading this may be too long article, but It was hard to keep short. The full code can be found on GitHub.

I’d love to hear from you on any of my social medias listed below!

If you enjoyed my writing, please support future articles by buying me an Coffee

Sign up for my Awesome newsletter