Bu örnekte, bir Golang gRPC uygulamasında sunucu taraflı akış (server-side streaming) örneği oluşturacağız. Bu, bire-çok ilişkiye benzer: istemci tek bir istek gönderir, sunucu ise sıfır veya daha fazla yanıt döndürür. İstemci, akış sona erene kadar yanıtları okumaya devam eder ve tüm yanıtlar okunduktan sonra çıkar. Daha fazla bilgi için dokümantasyonu okuyun.


Örneğimiz çok basit. İstemci, bir hesap kimliği içeren bir istek gönderir ve o hesaba ait tüm işlemleri sunucudan tek tek alır.


Proto library


pkg/protobuf/bank/transaction.proto


syntax = "proto3";

package bank;

option go_package = "github.com/you/proto-lib/pkg/protobuf/bank";

import "google/protobuf/timestamp.proto";

// TransactionService exposes account transactions over gRPC.
service TransactionService {
// Fetch is a server-streaming RPC: the client sends a single FetchRequest
// and receives a stream of FetchResponse messages.
rpc Fetch(FetchRequest) returns (stream FetchResponse) {};
}

// FetchRequest is the request message of TransactionService.Fetch.
message FetchRequest {
// Identifier of the account whose transactions are requested.
string accountId = 1;
}

// FetchResponse is one element of the response stream of
// TransactionService.Fetch; it carries a single Transaction.
message FetchResponse {
// This could be repeated as well
Transaction transaction = 1;
}

// Transaction describes a single operation on an account.
message Transaction {
// Operation is the kind of the transaction. Credit is the zero value and
// therefore the default when the field is not set.
enum Operation {
Credit = 0;
Debit = 1;
}

// Timestamp associated with the transaction.
google.protobuf.Timestamp time = 1;
// Whether the transaction is a credit or a debit.
Operation operation = 2;
// Amount of the transaction; the currency/unit is not specified by this schema.
double amount = 3;
}

Makefile


# compile runs protoc with the Go gRPC plugin over every .proto file in
# pkg/protobuf/bank/. The recipe line must start with a TAB character,
# otherwise make aborts with "missing separator".
.PHONY: compile
compile:
	protoc -I pkg/protobuf/bank/ --go_out=plugins=grpc:../../.. pkg/protobuf/bank/*.proto

Client


cmd/client/main.go


package main

import (
"context"
"fmt"
"log"

"github.com/you/client/internal/transaction"
"google.golang.org/grpc"
)

// main prints a banner, runs the client and exits with a non-zero status if
// the run fails.
func main() {
	fmt.Println("client")

	// log.Fatalln calls os.Exit, which skips deferred calls. Keeping the
	// connection handling inside run guarantees conn.Close() executes before
	// the process exits on a Fetch error.
	if err := run(); err != nil {
		log.Fatalln(err)
	}
}

// run dials the gRPC server on :50051 (blocking until connected), fetches the
// transactions of account "ACC-1" and closes the connection before returning.
// It returns the dial or fetch error, if any.
func run() error {
	conn, err := grpc.Dial(":50051", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return err
	}
	defer conn.Close()

	trxClient := transaction.NewClient(conn)

	return trxClient.Fetch(context.Background(), "ACC-1")
}

internal/transaction/client.go


package transaction

import (
"context"
"io"
"log"

"github.com/you/proto-lib/pkg/protobuf/bank"
"google.golang.org/grpc"
)

// Client wraps the generated bank.TransactionServiceClient.
type Client struct {
// client is the generated gRPC stub used to call the transaction service.
client bank.TransactionServiceClient
}

// NewClient builds a Client whose transaction service stub uses conn.
func NewClient(conn grpc.ClientConnInterface) Client {
	stub := bank.NewTransactionServiceClient(conn)

	return Client{client: stub}
}

// Fetch opens the Fetch stream for accountID and logs every transaction it
// receives. It returns nil once the server ends the stream (io.EOF), or the
// first error encountered while opening or reading the stream.
func (c Client) Fetch(ctx context.Context, accountID string) error {
	req := &bank.FetchRequest{AccountId: accountID}

	stream, err := c.client.Fetch(ctx, req)
	if err != nil {
		return err
	}

	for {
		msg, recvErr := stream.Recv()

		switch {
		case recvErr == io.EOF:
			// The server closed the stream: everything has been read.
			log.Println("> ALL DONE!")

			return nil
		case recvErr != nil:
			return recvErr
		}

		item := msg.GetTransaction()
		log.Println("TIME:", item.GetTime().String())
		log.Println("OPERATION:", item.GetOperation().String())
		log.Println("AMOUNT:", item.GetAmount())
		log.Println("------")
	}
}

Server


cmd/server/main.go


package main

import (
"fmt"
"log"
"net"

"github.com/you/server/internal/transaction"
"github.com/you/proto-lib/pkg/protobuf/bank"
"google.golang.org/grpc"
)

// main listens on TCP port 50051, registers the transaction service on a new
// gRPC server and serves until Serve returns, at which point the process
// exits through log.Fatalln.
func main() {
	fmt.Println("server")

	lis, listenErr := net.Listen("tcp", ":50051")
	if listenErr != nil {
		log.Fatalln(listenErr)
	}

	srv := grpc.NewServer()
	bank.RegisterTransactionServiceServer(srv, transaction.NewServer())

	// Serve blocks; whatever it returns is logged and terminates the process.
	serveErr := srv.Serve(lis)
	log.Fatalln(serveErr)
}

internal/transaction/transaction.go


package transaction

import (
"time"

"github.com/you/proto-lib/pkg/protobuf/bank"
)

// transaction is the in-memory representation of a single account operation.
type transaction struct {
// time is the timestamp sent as the Time field of bank.Transaction.
time time.Time
// operation is the credit/debit kind, using the generated enum type.
operation bank.Transaction_Operation
// amount is the value sent as the Amount field of bank.Transaction.
amount float64
}

// transactions is the in-memory fixture of transactions keyed by account ID.
// It only contains the account "ACC-1". Every time.Now() call below is
// evaluated once, when this package-level variable is initialized, so the
// timestamps are offsets from (roughly) process start, not from request time.
var transactions = map[string][]transaction{
"ACC-1": {
{
time: time.Now().Add(time.Second),
operation: bank.Transaction_Credit,
amount: 10.00,
},
{
time: time.Now().Add(time.Millisecond),
operation: bank.Transaction_Credit,
amount: 20.99,
},
{
time: time.Now().Add(time.Hour),
operation: bank.Transaction_Debit,
amount: 30.62,
},
{
time: time.Now().Add(time.Minute),
operation: bank.Transaction_Credit,
amount: 40,
},
{
time: time.Now().Add(time.Minute),
operation: bank.Transaction_Debit,
amount: 50.55,
},
{
time: time.Now().Add(time.Minute),
operation: bank.Transaction_Debit,
amount: 60.60,
},
},
}

internal/transaction/server.go


package transaction

import (
"log"
"math/rand"
"time"

"github.com/you/proto-lib/pkg/protobuf/bank"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// sleeper bounds the random pause Fetch takes between two sent transactions:
// Fetch draws rand.Intn(len(sleeper)) and sleeps that many seconds.
var sleeper = []int{0, 1, 2, 3, 4}

// Server serves the Fetch RPC from an in-memory set of transactions.
type Server struct {
// transactions maps an account ID to the transactions streamed for it.
transactions map[string][]transaction
}

// NewServer returns a Server backed by the package-level transactions fixture.
func NewServer() Server {
	var srv Server
	srv.transactions = transactions

	return srv
}

// Fetch streams every stored transaction of the requested account to the
// client, one FetchResponse per transaction, pausing a random number of whole
// seconds (0 to len(sleeper)-1) after each message.
//
// An account with no stored transactions yields an empty stream and a nil
// error. Fetch returns an Internal status when a timestamp cannot be converted
// or a message cannot be sent, and the status derived from the stream context
// (e.g. Canceled or DeadlineExceeded) when the context ends during a pause.
func (s Server) Fetch(request *bank.FetchRequest, stream bank.TransactionService_FetchServer) error {
	// NOTE(review): rand.Seed is deprecated since Go 1.20 and reseeding on
	// every call is unnecessary; it is kept so that toolchains older than
	// 1.20 still produce a non-deterministic pause sequence.
	rand.Seed(time.Now().UnixNano())

	log.Println("Fetching transactions for account:", request.GetAccountId())

	ctx := stream.Context()

	for _, trx := range s.transactions[request.GetAccountId()] {
		ts, err := ptypes.TimestampProto(trx.time)
		if err != nil {
			return status.Errorf(codes.Internal, "fetch: invalid time: %v", err)
		}

		if err := stream.Send(&bank.FetchResponse{
			Transaction: &bank.Transaction{
				Time:      ts,
				Operation: trx.operation,
				Amount:    trx.amount,
			},
		}); err != nil {
			return status.Errorf(codes.Internal, "fetch: unexpected stream: %v", err)
		}

		// Pause between messages, but stop as soon as the stream context is
		// done, so a cancelled or timed-out call does not keep the handler
		// sleeping.
		delay := time.Duration(rand.Intn(len(sleeper))) * time.Second
		timer := time.NewTimer(delay)

		select {
		case <-ctx.Done():
			timer.Stop()

			return status.FromContextError(ctx.Err()).Err()
		case <-timer.C:
		}
	}

	log.Println("Completed")

	return nil
}

Test


# Run server
$ go run --race cmd/server/main.go

# Run client
$ go run --race cmd/client/main.go

Sonuç


# Server
2020/10/28 17:00:58 Fetching transactions for account: ACC-1
2020/10/28 17:01:14 Completed

# Client
2020/10/28 17:00:58 TIME: seconds:1603904454 nanos:702436000
2020/10/28 17:00:58 OPERATION: Credit
2020/10/28 17:00:58 AMOUNT: 10
2020/10/28 17:00:58 ------
2020/10/28 17:00:59 TIME: seconds:1603904453 nanos:703439000
2020/10/28 17:00:59 OPERATION: Credit
2020/10/28 17:00:59 AMOUNT: 20.99
2020/10/28 17:00:59 ------
2020/10/28 17:01:01 TIME: seconds:1603908053 nanos:702440000
2020/10/28 17:01:01 OPERATION: Debit
2020/10/28 17:01:01 AMOUNT: 30.62
2020/10/28 17:01:01 ------
2020/10/28 17:01:03 TIME: seconds:1603904513 nanos:702440000
2020/10/28 17:01:03 OPERATION: Credit
2020/10/28 17:01:03 AMOUNT: 40
2020/10/28 17:01:03 ------
2020/10/28 17:01:06 TIME: seconds:1603904513 nanos:702440000
2020/10/28 17:01:06 OPERATION: Debit
2020/10/28 17:01:06 AMOUNT: 50.55
2020/10/28 17:01:06 ------
2020/10/28 17:01:10 TIME: seconds:1603904513 nanos:702440000
2020/10/28 17:01:10 OPERATION: Debit
2020/10/28 17:01:10 AMOUNT: 60.6
2020/10/28 17:01:10 ------
2020/10/28 17:01:14 > ALL DONE!