Bu örnekte, sayfalara ayrılmış kullanıcı listesi döndüreceğimiz bir Rest API oluşturacağız. Burada önemli olan nokta, yanıtın "sonraki sayfayı" temsil eden next adlı bir anahtar içereceğidir. Bir "önceki sayfa" seçeneği olmayacak. Ancak geriye gitmek isterseniz yazının sonuna örnek bir çözüm/öneri ekledim.


Sayfalandırma, Cassandra sorgusunun "sayfa durumu" değerini göstermemizi gerektirir. İstemci yeni bir istek gönderdiğinde, bu değer sonraki sayfayı getirmek için sorguda kullanılacaktır. Sorguyu her çalıştırdığımızda, bu değer güncellenecek ve istemciye bir sonraki istekte kullanabilmesi için iade edilecektir. Bu arada, güvenlik nedeniyle bu değerin şifrelenmiş versiyonu kullanıyoruz.


Cassandra keyspace


DROP KEYSPACE IF EXISTS blog;

CREATE KEYSPACE IF NOT EXISTS blog
WITH replication = {
'class': 'NetworkTopologyStrategy',
'datacenter1': 1
};

DROP TABLE IF EXISTS blog.users;

CREATE TABLE IF NOT EXISTS blog.users (
id int,
username text,
created_at timestamp,
PRIMARY KEY (username, id)
);

Yapı


├── Makefile
├── docker
│   └── docker-compose.yaml
├── internal
│   ├── pkg
│   │   ├── cryptography
│   │   │   └── cryptography.go
│   │   ├── http
│   │   │   ├── router.go
│   │   │   └── server.go
│   │   └── storage
│   │   ├── cassandra
│   │   │   ├── cassandra.go
│   │   │   └── user.go
│   │   ├── manager.go
│   │   └── model.go
│   └── user
│   ├── controller.go
│   ├── request.go
│   └── response.go
└── main.go

Dosyalar


Makefile


.PHONY: docker-up
docker-up:
docker-compose -f docker/docker-compose.yaml up

.PHONY: docker-down
docker-down:
docker-compose -f docker/docker-compose.yaml down
docker system prune --volumes --force

.PHONY: up
up:
go run -race main.go

docker-compose.yaml


version: "3.7"

services:

blog-cassandra:
image: "cassandra:3.11.9"
container_name: "blog-cassandra"
ports:
- "9042:9042"
environment:
- "MAX_HEAP_SIZE=256M"
- "HEAP_NEWSIZE=128M"

main.go


package main

import (
"log"
"time"

"github.com/you/blog/internal/pkg/cryptography"
"github.com/you/blog/internal/pkg/http"
"github.com/you/blog/internal/pkg/storage/cassandra"
"github.com/you/blog/internal/user"
)

func main() {
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()

// Cassandra connection
con, err := cassandra.NewConnection(cassandra.ConnectionConfig{
Hosts: []string{"127.0.0.1"},
Port: 9042,
ProtoVersion: 4,
Consistency: "Quorum",
Keyspace: "blog",
Timeout: time.Second * 5,
})
if err != nil {
log.Fatalln("CON:", err)
}
defer con.Close()

// User storage manager.
usr := cassandra.User{
Connection: con,
Timeout: time.Second * 5,
}

// Initialise cryptography with the application wide secret key.
// Secret generation and dump: `cry.Secret(cryptography.SecretAES256)`
cry := cryptography.New("b09e58536e4df2a4fc6dd3c9773e4f3d")

ctr := user.Controller{
UserManager: usr,
Cryptography: cry,
}

rtr := http.NewRouter()
rtr.HandleFunc("/api/v1/users", ctr.List)

srv := http.NewServer(rtr, ":8080")
log.Fatal(srv.ListenAndServe())
}

cryptography.go


This is the one we use to handle Cassandra query's "page state" value.


package cryptography

import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"fmt"
"io"
)

type SecretSize int

const (
// AES-128 = 128 bits = 16 bytes (8*2)
SecretAES128 SecretSize = 8
// AES-192 = 192 bits = 24 bytes (12*2)
SecretAES192 SecretSize = 12
// AES-256 = 256 bits = 32 bytes (16*2)
SecretAES256 SecretSize = 16
)

// Cryptography helps encrypting and decrypting private data which is often
// required to be exposed externally.
type Cryptography struct {
secret string
}

// New returns `Cryptography` type. It accepts an application wide `secret`
// which will be used as the "fallback" value if the methods are called with a
// nil `secret` argument.
func New(secret string) Cryptography {
return Cryptography{
secret: secret,
}
}

// Secret generates a random bytes key in given size. The key size must be
// either 16, 24 or 32 bytes to select AES-128, AES-192 or AES-256 modes
// respectively. Depending on the requiremenets, this function can be used only
// once to create an application wide secret key then stored as an environment
// variable or repeatedly for each encryption/decryption. You can dump the
// generated secret with with `fmt.Printf("%x", byte)` method.
func (c Cryptography) Secret(size SecretSize) ([]byte, error) {
key := make([]byte, size)

if _, err := rand.Read(key); err != nil {
return nil, err
}

return key, nil
}

// EncryptAsString returns an encrypted string version of the given data using a
// secret key. Decryption requires the same secret key and the `DecryptString`
// method.
func (c Cryptography) EncryptAsString(data, secret []byte) (string, error) {
if secret == nil {
secret = []byte(c.secret)
}

val, _, err := c.encrypt(data, secret, true)
if err != nil {
return "", err
}

return val, nil
}

// EncryptAsByte returns an encrypted byte version of the given data using a
// secret key. Decryption requires the same secret key and the `DecryptByte`
// method.
func (c Cryptography) EncryptAsByte(data, secret []byte) ([]byte, error) {
if secret == nil {
secret = []byte(c.secret)
}

_, val, err := c.encrypt(data, secret, false)
if err != nil {
return nil, err
}

return val, nil
}

// encrypt returns an encrypted version of the given data using a secret key.
// Decryption requires the same secret key.
func (c Cryptography) encrypt(data, secret []byte, isString bool) (string, []byte, error) {
block, err := aes.NewCipher(secret)
if err != nil {
return "", nil, fmt.Errorf("new cipher: %w", err)
}

aead, err := cipher.NewGCM(block)
if err != nil {
return "", nil, fmt.Errorf("new gcm: %w", err)
}

nonce := make([]byte, aead.NonceSize())
if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
return "", nil, fmt.Errorf("io read: %w", err)
}

byte := aead.Seal(nonce, nonce, data, nil)

if isString {
return base64.URLEncoding.EncodeToString(byte), nil, nil
}

return "", byte, nil
}

// DecryptString returns an decrypted byte version of the given string data
// using a secret key. It requires the same secret key as the `EncryptAsString`
// method used.
func (c Cryptography) DecryptString(data string, secret []byte) ([]byte, error) {
if secret == nil {
secret = []byte(c.secret)
}

byte, err := base64.URLEncoding.DecodeString(data)
if err != nil {
return nil, fmt.Errorf("decode string: %w", err)
}

return c.decrypt(byte, secret)
}

// DecryptByte returns an decrypted byte version of the given byte data using
// a secret key. It requires the same secret key as the `EncryptAsByte` method
// used.
func (c Cryptography) DecryptByte(data, secret []byte) ([]byte, error) {
if secret == nil {
secret = []byte(c.secret)
}

return c.decrypt(data, secret)
}

// decrypt returns a decrypted version of the given data using a secret key. It
// requires the same secret key as the relevant encryption method used.
func (c Cryptography) decrypt(data, secret []byte) ([]byte, error) {
block, err := aes.NewCipher(secret)
if err != nil {
return nil, fmt.Errorf("new cipher: %w", err)
}

aead, err := cipher.NewGCM(block)
if err != nil {
return nil, fmt.Errorf("new gcm: %w", err)
}

size := aead.NonceSize()
if len(data) < size {
return nil, fmt.Errorf("nonce size: invalid length")
}

nonce, text := data[:size], data[size:]

res, err := aead.Open(nil, nonce, text, nil)
if err != nil {
return nil, fmt.Errorf("aead open: %w", err)
}

return res, nil
}

router.go


package http

import (
"net/http"
)

func NewRouter() *http.ServeMux {
return http.NewServeMux()
}

server.go


package http

import (
"net/http"
)

func NewServer(handler http.Handler, address string) *http.Server {
return &http.Server{
Handler: handler,
Addr: address,
}
}

storage/model.go


package storage

import "time"

type User struct {
ID int
Username string
CreatedAt time.Time
}

storage/manager.go


package storage

import "context"

type UserManager interface {
// List returns paginated results in given size and starting from the given
// page ("page state"). If the page is empty, pagination start from the
// first page. The second returned argument represents the "next" page which
// helps resuming where the pagination was left off.
List(ctx context.Context, size int, page []byte) ([]*User, []byte, error)
}

cassandra/cassandra.go


package cassandra

import (
"time"

"github.com/gocql/gocql"
)

// The `gocql: no response received from cassandra within timeout period` error
// will be prevented by increasing the default timeout value. e.g. 5 sec
type ConnectionConfig struct {
Hosts []string
Port int
ProtoVersion int
Consistency string
Keyspace string
Timeout time.Duration
}

func NewConnection(config ConnectionConfig) (*gocql.Session, error) {
cluster := gocql.NewCluster(config.Hosts...)

cluster.Port = config.Port
cluster.ProtoVersion = config.ProtoVersion
cluster.Keyspace = config.Keyspace
cluster.Consistency = gocql.ParseConsistency(config.Consistency)
cluster.Timeout = config.Timeout

return cluster.CreateSession()
}

cassandra/user.go


package cassandra

import (
"context"
"time"

"github.com/you/blog/internal/pkg/storage"
"github.com/gocql/gocql"
)

var _ storage.UserManager = User{}

type User struct {
Connection *gocql.Session
Timeout time.Duration
}

func (u User) List(ctx context.Context, size int, page []byte) ([]*storage.User, []byte, error) {
ctx, cancel := context.WithTimeout(ctx, u.Timeout)
defer cancel()

qry := `SELECT id, username, created_at FROM users`

itr := u.Connection.Query(qry).WithContext(ctx).PageSize(size).PageState(page).Iter()
defer itr.Close()

// Set next page state.
page = itr.PageState()

users := make([]*storage.User, 0, itr.NumRows())

scanner := itr.Scanner()
for scanner.Next() {
user := &storage.User{}

if err := scanner.Scan(
&user.ID,
&user.Username,
&user.CreatedAt,
); err != nil {
return nil, nil, err
}

users = append(users, user)
}
if err := scanner.Err(); err != nil {
return nil, nil, err
}

return users, page, nil
}

request.go


package user

import (
"net/url"
"strconv"
"strings"
)

type Request struct {
PageCursor string
PageSize int
}

func (r *Request) bind(u *url.URL) {
r.PageCursor = strings.ReplaceAll(u.Query().Get("page[cursor]"), " ", "")

v, err := strconv.Atoi(u.Query().Get("page[size]"))
switch {
case err != nil, v < 1:
r.PageSize = 10
default:
r.PageSize = v
}
}

response.go


package user

import (
"fmt"
)

type Response struct {
Data interface{} `json:"data"`
Meta struct {
Total int `json:"total,omitempty"`
} `json:"meta"`
Links struct {
Next string `json:"next,omitempty"`
} `json:"links"`
}

func (r *Response) bind(data interface{}, total int, cursor string, size int) {
r.Data = data
r.Meta.Total = total

if cursor != "" {
r.Links.Next += fmt.Sprintf("page[cursor]=%s&", cursor)
}
if size != 0 {
r.Links.Next += fmt.Sprintf("page[size]=%d&", size)
}

if r.Links.Next != "" {
r.Links.Next = "?" + r.Links.Next[:len(r.Links.Next)-1]
}
}

controller.go


Bu dosya çok şey yapıyor bu nedenle onu bölmelisiniz.


package user

import (
"encoding/json"
"log"
"net/http"

"github.com/you/blog/internal/pkg/cryptography"
"github.com/you/blog/internal/pkg/storage"
)

type Controller struct {
UserManager storage.UserManager
Cryptography cryptography.Cryptography
}

// GET http://localhost:8080/api/v1/users
// GET http://localhost:8080/api/v1/users?page[cursor]={encrypted_cursor}&page[size]={number}
func (c Controller) List(w http.ResponseWriter, r *http.Request) {
// Bind client request to the Request object.
var req Request
req.bind(r.URL)

// If the page cursor is not empty decode it.
var page []byte
if req.PageCursor != "" {
var err error
page, err = c.Cryptography.DecryptString(req.PageCursor, nil)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("invalid page cursor"))
return
}
}

// List users and get next page state for pagination.
users, page, err := c.UserManager.List(r.Context(), req.PageSize, page)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("internal error"))
return
}

// If there is a next page to navigate to, generate next page cursor.
var cursor string
if len(page) != 0 {
var err error
cursor, err = c.Cryptography.EncryptAsString(page, nil)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("internal error"))
return
}
}

// Bind server response to the Response object.
var res Response
res.bind(users, len(users), cursor, req.PageSize)

// Prepare response body.
body, err := json.Marshal(res)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("internal error"))
return
}

// Respond to the client.
w.Header().Set("Content-Type", "application/json; charset=utf-8")
_, _ = w.Write(body)
}

Örnek


URL değişkenleri olmadan


// REQUEST
// GET http://localhost:8080/api/v1/users

// RESPONSE
{
"data": [
{
"ID": 2,
"Username": "user-3",
"CreatedAt": "2021-02-11T18:47:11.493Z"
},
...
...
...
{
"ID": 34,
"Username": "user-3",
"CreatedAt": "2021-02-11T18:47:11.997Z"
}
],
"meta": {
"total": 10
},
"links": {
"next": "?page[cursor]=_49gW9ig9DzDjd_E-AuAIiL2b4rpkLvcFXXpJSzuKgRunG3aV7-1fkjc_iizIfF38mu3Mw==&page[size]=10"
}
}

URL değişkenleriyle


Aşağıda görebileceğiniz gibi, "sonraki" bağlantısını ziyaret ettik ancak "size" parametresini değiştirdik.


// REQUEST
// GET http://localhost:8080/api/v1/users?page[cursor]=_49gW9ig9DzDjd_E-AuAIiL2b4rpkLvcFXXpJSzuKgRunG3aV7-1fkjc_iizIfF38mu3Mw==&page[size]=2

// RESPONSE
{
"data": [
{
"ID": 37,
"Username": "user-3",
"CreatedAt": "2021-02-11T18:47:12.039Z"
},
{
"ID": 42,
"Username": "user-3",
"CreatedAt": "2021-02-11T18:47:12.093Z"
}
],
"meta": {
"total": 2
},
"links": {
"next": "?page[cursor]=5sG9Zwze2tKlWdo8QEomA0KdgPLPPpKrh6Ek-vrLV1bcXuC4rOdRDlQo8AA2VXO2EJXIhg==&page[size]=2"
}
}

Geriye gitme hakkında


"Önceki sayfa" özelliğini de uygulamanız gerekiyorsa, birkaç seçeneğiniz var. Bir çeşit depolama katmanından yararlanmak gibi bir seçenekte olsa, bir depolama katmanını kullanmaya gerek duymayan düşüncem şudur.


WHERE yan tümcesinde "Partition Key" verisini ve ORDER BY ifadesini kullanın. Bu, DESC anahtarı ile geriye (önceki sayfa) ve ASC anahtarı ile ileri (sonraki sayfa) gitmenize yardımcı olacaktır. İsteğiniz ve sorgunuz aşağıda gösterildiği gibi değişecektir. Açıktır ki, gereksinimlere bağlı olarak, mevcut şemanızın değiştirilmesi gerekebilir veya talebi karşılamak için bir veya daha fazla şema tanıtmanız gerekebilir.


// Request
?page[cursor]={encrypted_cursor}&page[size]={number}
Forward: &sort=some_field
Backwards: &sort=-some_field

// CQL example
Forward: SELECT id, username, created_at FROM users WHERE username = 'user-1' ORDER BY id ASC LIMIT 10;
Backwards: SELECT id, username, created_at FROM users WHERE username = 'user-1' ORDER BY id DESC LIMIT 10;

Sayfalandırma biraz kafa karıştıcı olabilir. DB'deki X kaydını silerseniz, mevcut sayfadaki yeri "sonraki" sayfadan gelen bir Y kaydı tarafından doldurulur ki, bu da normaldir. Fakat bununla birlikte, bir sonraki sayfaya giderseniz, Y kaydı orada yine görünecektir. Bu, DB'deki bir kaydı silmenin tüm listenin yeniden organize edilmeyeceği anlamına gelir. Aynı şey, DB'ye yeni bir kayıt eklendiğinde de geçerlidir. Aşağıdaki örneklere bakın.


Bunun sayfadaki mevcut liste olduğunu varsayın.


page[cursor]=1 - 13,14,15
page[cursor]=2 - 18,22,28
page[cursor]=3 - 34,38,39

Yukarıdaki 22. kaydı silerseniz, liste aşağıdaki gibi görünecektir. Gördüğünüz gibi 34 iki farklı sayfada!


page[cursor]=1 - 13,14,15
page[cursor]=2 - 18,28,34
page[cursor]=3 - 34,38,39

Yeni bir kayıt (12) eklerseniz, liste aşağıdaki gibi görünecektir. Gördüğünüz gibi, 15 listede yok!


page[cursor]=1 - 12,13,14
page[cursor]=2 - 18,28,34
page[cursor]=3 - 34,38,39