In this example we are going to build a client-side streaming gRPC application in Go. It is like a many-to-one relation: the client sends many requests and the server responds with a single response. The client streams until there is nothing left to send. Once the server has read everything, it sends its one response and the call ends on both sides. For more info, read the documentation.


Proto file


syntax = "proto3";

package port;

option go_package = "github.com/you/pirate/pkg/protobuf/port;protoport";

// PortService exposes port creation over gRPC.
service PortService {
// Create is a client-streaming RPC: the client sends a stream of
// CreateRequest messages and receives a single CreateResponse in return.
rpc Create(stream CreateRequest) returns (CreateResponse) {}
}

// CreateRequest is one message of the Create request stream.
message CreateRequest {
// id is the identifier carried by this request.
string id = 1;
// name is the name carried by this request.
string name = 2;
}

// CreateResponse is the single reply sent once the Create request stream ends.
message CreateResponse {
// total is a count reported by the server; the example server sets it to the
// number of CreateRequest messages it received on the stream.
int32 total = 1;
}


Run the following command to generate the compiled Go code: protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative pkg/protobuf/port/*.proto


Client


main.go


package main

import (
"context"
"log"

"github.com/you/pirate/internal/pkg/transport/client/grpc"

gogrpc "google.golang.org/grpc"
)

// main logs a start-up marker, runs the client, and exits with a non-zero
// status if anything fails.
func main() {
	log.Println("client")

	// log.Fatalln calls os.Exit, which skips deferred calls. All work that
	// relies on defer lives in run so the connection is always closed before
	// the process exits.
	if err := run(); err != nil {
		log.Fatalln(err)
	}
}

// run dials the gRPC server on :50051, performs the client-streaming Create
// call, and closes the connection before returning.
//
// It returns the dial error or the error reported by Port.Create, if any.
func run() error {
	// NOTE(review): WithBlock makes Dial wait for the connection to come up and
	// no timeout is set here; newer grpc-go releases also deprecate
	// WithInsecure — confirm against the grpc-go version pinned in go.mod.
	conn, err := gogrpc.Dial(":50051", gogrpc.WithInsecure(), gogrpc.WithBlock())
	if err != nil {
		return err
	}
	defer conn.Close()

	portClient := grpc.NewPort(conn)

	return portClient.Create(context.Background())
}

grpc.go


package grpc

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"

	"google.golang.org/grpc"

	protoport "github.com/you/pirate/pkg/protobuf/port"
)

// Port is a client for the PortService gRPC service.
type Port struct {
// client is the protoport PortService client used to open Create streams.
client protoport.PortServiceClient
}

// NewPort builds a Port whose PortService client is bound to conn.
func NewPort(conn grpc.ClientConnInterface) Port {
	var p Port
	p.client = protoport.NewPortServiceClient(conn)

	return p
}

// Create opens a client-streaming Create call, sends a fixed set of requests
// over it, then closes the send side and logs the single response returned by
// the server.
//
// It returns a wrapped error if the stream cannot be opened, if a send fails
// on the client side, or if the call ends with an error status.
func (p Port) Create(ctx context.Context) error {
	// Cancel the derived context on return so the stream is torn down on
	// every exit path, including early error returns.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// I am hardcoding this here but you should not!
	requests := []*protoport.CreateRequest{
		{Id: "id-1", Name: "name-1"},
		{Id: "id-2", Name: "name-2"},
		{Id: "id-3", Name: "name-3"},
		{Id: "id-4", Name: "name-4"},
	}

	stream, err := p.client.Create(ctx)
	if err != nil {
		return fmt.Errorf("create stream: %w", err)
	}

	for _, request := range requests {
		err := stream.Send(request)
		if errors.Is(err, io.EOF) {
			// The stream was ended by the server side. Send only reports a
			// bare io.EOF in that case; the actual status is surfaced by
			// CloseAndRecv below, so stop sending and fall through to it.
			break
		}
		if err != nil {
			return fmt.Errorf("send stream: %w", err)
		}
	}

	response, err := stream.CloseAndRecv()
	if err != nil {
		return fmt.Errorf("close and receive: %w", err)
	}

	log.Printf("%+v\n", response)

	return nil
}

Server


main.go


package main

import (
"log"
"net"

"github.com/you/pirate/internal/pkg/transport/server/grpc"

gogrpc "google.golang.org/grpc"

protoport "github.com/you/pirate/pkg/protobuf/port"
)

// main listens on TCP port 50051, registers the Port handler as the
// PortService implementation, and serves gRPC requests until Serve returns.
func main() {
	log.Println("server")

	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalln(err)
	}

	srv := gogrpc.NewServer()
	protoport.RegisterPortServiceServer(srv, grpc.Port{})

	// Whatever Serve returns is logged and the process exits.
	serveErr := srv.Serve(lis)
	log.Fatalln(serveErr)
}

grpc.go


package grpc

import (
"io"
"log"

protoport "github.com/you/pirate/pkg/protobuf/port"
)

// Port is the stateless handler type whose Create method serves the
// PortService client-streaming RPC.
type Port struct{}

// Create drains the client's request stream, logging each message and counting
// it. When the client finishes sending (Recv yields io.EOF) it replies with a
// single CreateResponse carrying the count. Any other receive error is
// returned as-is.
func (p Port) Create(stream protoport.PortService_CreateServer) error {
	var received int32

	for {
		msg, err := stream.Recv()

		switch {
		case err == io.EOF:
			// End of the client stream: send the one and only response.
			reply := &protoport.CreateResponse{Total: received}
			return stream.SendAndClose(reply)
		case err != nil:
			return err
		}

		received++
		log.Printf("%+v\n", msg)
	}
}

Test


Run your server first, then the client. The result should be as follows.


server$ go run main.go
2021/04/13 16:27:26 server
2021/04/13 16:27:31 id:"id-1" name:"name-1"
2021/04/13 16:27:31 id:"id-2" name:"name-2"
2021/04/13 16:27:31 id:"id-3" name:"name-3"
2021/04/13 16:27:31 id:"id-4" name:"name-4"


client$ go run main.go
2021/04/13 16:27:31 client
2021/04/13 16:27:31 total:4