In this example we are going to create a server-side streaming example in Golang gRPC application. It is like one to many relation. Client sends one request and server responds with zero or many. The client streams until there is nothing left. Once all read, it exits. For more info read documentation.


Our example is a simple one. The client sends a request containing an account id in order to fetch all the transactions linked to it one by one from the server.


Proto library


pkg/protobuf/bank/transaction.proto


syntax = "proto3";

package bank;

option go_package = "github.com/you/proto-lib/pkg/protobuf/bank";

import "google/protobuf/timestamp.proto";

service TransactionService {
rpc Fetch(FetchRequest) returns (stream FetchResponse) {};
}

message FetchRequest {
string accountId = 1;
}

message FetchResponse {
// This could be repeated as well
Transaction transaction = 1;
}

message Transaction {
enum Operation {
Credit = 0;
Debit = 1;
}

google.protobuf.Timestamp time = 1;
Operation operation = 2;
double amount = 3;
}

Makefile


.PHONY: compile
compile:
protoc -I pkg/protobuf/bank/ --go_out=plugins=grpc:../../.. pkg/protobuf/bank/*.proto

Client


cmd/client/main.go


package main

import (
"context"
"fmt"
"log"

"github.com/you/client/internal/transaction"
"google.golang.org/grpc"
)

func main() {
fmt.Println("client")

conn, err := grpc.Dial(":50051", grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Fatalln(err)
}
defer conn.Close()

trxClient := transaction.NewClient(conn)
if err := trxClient.Fetch(context.Background(), "ACC-1"); err != nil {
log.Fatalln(err)
}
}

internal/transaction/client.go


package transaction

import (
"context"
"io"
"log"

"github.com/you/proto-lib/pkg/protobuf/bank"
"google.golang.org/grpc"
)

type Client struct {
client bank.TransactionServiceClient
}

func NewClient(conn grpc.ClientConnInterface) Client {
return Client{
client: bank.NewTransactionServiceClient(conn),
}
}

func (c Client) Fetch(ctx context.Context, accountID string) error {
stream, err := c.client.Fetch(ctx, &bank.FetchRequest{AccountId: accountID})
if err != nil {
return err
}

for {
res, err := stream.Recv()
if err != nil {
if err == io.EOF {
log.Println("> ALL DONE!")

return nil
}

return err
}

trx := res.GetTransaction()
log.Println("TIME:", trx.GetTime().String())
log.Println("OPERATION:", trx.GetOperation().String())
log.Println("AMOUNT:", trx.GetAmount())
log.Println("------")
}
}

Server


cmd/server/main.go


package main

import (
"fmt"
"log"
"net"

"github.com/you/server/internal/transaction"
"github.com/you/proto-lib/pkg/protobuf/bank"
"google.golang.org/grpc"
)

func main() {
fmt.Println("server")

listener, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalln(err)
}

grpcServer := grpc.NewServer()
trxServer := transaction.NewServer()

bank.RegisterTransactionServiceServer(grpcServer, trxServer)

log.Fatalln(grpcServer.Serve(listener))
}

internal/transaction/transaction.go


package transaction

import (
"time"

"github.com/you/proto-lib/pkg/protobuf/bank"
)

type transaction struct {
time time.Time
operation bank.Transaction_Operation
amount float64
}

var transactions = map[string][]transaction{
"ACC-1": {
{
time: time.Now().Add(time.Second),
operation: bank.Transaction_Credit,
amount: 10.00,
},
{
time: time.Now().Add(time.Millisecond),
operation: bank.Transaction_Credit,
amount: 20.99,
},
{
time: time.Now().Add(time.Hour),
operation: bank.Transaction_Debit,
amount: 30.62,
},
{
time: time.Now().Add(time.Minute),
operation: bank.Transaction_Credit,
amount: 40,
},
{
time: time.Now().Add(time.Minute),
operation: bank.Transaction_Debit,
amount: 50.55,
},
{
time: time.Now().Add(time.Minute),
operation: bank.Transaction_Debit,
amount: 60.60,
},
},
}

internal/transaction/server.go


package transaction

import (
"log"
"math/rand"
"time"

"github.com/you/proto-lib/pkg/protobuf/bank"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var sleeper = []int{0, 1, 2, 3, 4}

type Server struct {
transactions map[string][]transaction
}

func NewServer() Server {
return Server{
transactions: transactions,
}
}

func (s Server) Fetch(request *bank.FetchRequest, stream bank.TransactionService_FetchServer) error {
rand.Seed(time.Now().UnixNano())

log.Println("Fetching transactions for account:", request.GetAccountId())

trxs := s.transactions[request.GetAccountId()]

for _, trx := range trxs {
ts, err := ptypes.TimestampProto(trx.time)
if err != nil {
return status.Errorf(codes.Internal, "fetch: invalid time: %v", err)
}

if err := stream.Send(&bank.FetchResponse{
Transaction: &bank.Transaction{
Time: ts,
Operation: trx.operation,
Amount: trx.amount,
},
}); err != nil {
return status.Errorf(codes.Internal, "fetch: unexpected stream: %v", err)
}

sleep := rand.Intn(len(sleeper))
time.Sleep(time.Duration(sleep) * time.Second)
}

log.Println("Completed")

return nil
}

Test


# Run server
$ go run --race cmd/server/main.go

# Run client
$ go run --race cmd/client/main.go

Result


# Server
2020/10/28 17:00:58 Fetching transactions for account: ACC-1
2020/10/28 17:01:14 Completed

# Client
2020/10/28 17:00:58 TIME: seconds:1603904454 nanos:702436000
2020/10/28 17:00:58 OPERATION: Credit
2020/10/28 17:00:58 AMOUNT: 10
2020/10/28 17:00:58 ------
2020/10/28 17:00:59 TIME: seconds:1603904453 nanos:703439000
2020/10/28 17:00:59 OPERATION: Credit
2020/10/28 17:00:59 AMOUNT: 20.99
2020/10/28 17:00:59 ------
2020/10/28 17:01:01 TIME: seconds:1603908053 nanos:702440000
2020/10/28 17:01:01 OPERATION: Debit
2020/10/28 17:01:01 AMOUNT: 30.62
2020/10/28 17:01:01 ------
2020/10/28 17:01:03 TIME: seconds:1603904513 nanos:702440000
2020/10/28 17:01:03 OPERATION: Credit
2020/10/28 17:01:03 AMOUNT: 40
2020/10/28 17:01:03 ------
2020/10/28 17:01:06 TIME: seconds:1603904513 nanos:702440000
2020/10/28 17:01:06 OPERATION: Debit
2020/10/28 17:01:06 AMOUNT: 50.55
2020/10/28 17:01:06 ------
2020/10/28 17:01:10 TIME: seconds:1603904513 nanos:702440000
2020/10/28 17:01:10 OPERATION: Debit
2020/10/28 17:01:10 AMOUNT: 60.6
2020/10/28 17:01:10 ------
2020/10/28 17:01:14 > ALL DONE!