In this example we are going to create a Rest API where we will return paginated list of users. The important point here is that, the response will contain a key called next which represent the "next page". There won't be a "previous page" option. However, if you wish to go backwards as well, I added an example solution/suggestion at the end of the post.


Pagination requires us to expose Cassandra query's "page state" value. When the client sends a new request this value will be used in the query to fetch the next page. Everytime we run the query, this value will be updated and returned to the client so that they can use it in next request. By the way, encrypted version of this value is being transmitted for security reasons.


Cassandra keyspace


DROP KEYSPACE IF EXISTS blog;

CREATE KEYSPACE IF NOT EXISTS blog
WITH replication = {
'class': 'NetworkTopologyStrategy',
'datacenter1': 1
};

DROP TABLE IF EXISTS blog.users;

CREATE TABLE IF NOT EXISTS blog.users (
id int,
username text,
created_at timestamp,
PRIMARY KEY (username, id)
);

Structure


├── Makefile
├── docker
│   └── docker-compose.yaml
├── internal
│   ├── pkg
│   │   ├── cryptography
│   │   │   └── cryptography.go
│   │   ├── http
│   │   │   ├── router.go
│   │   │   └── server.go
│   │   └── storage
│   │   ├── cassandra
│   │   │   ├── cassandra.go
│   │   │   └── user.go
│   │   ├── manager.go
│   │   └── model.go
│   └── user
│   ├── controller.go
│   ├── request.go
│   └── response.go
└── main.go

Files


Makefile


.PHONY: docker-up
docker-up:
docker-compose -f docker/docker-compose.yaml up

.PHONY: docker-down
docker-down:
docker-compose -f docker/docker-compose.yaml down
docker system prune --volumes --force

.PHONY: up
up:
go run -race main.go

docker-compose.yaml


version: "3.7"

services:

blog-cassandra:
image: "cassandra:3.11.9"
container_name: "blog-cassandra"
ports:
- "9042:9042"
environment:
- "MAX_HEAP_SIZE=256M"
- "HEAP_NEWSIZE=128M"

main.go


package main

import (
"log"
"time"

"github.com/you/blog/internal/pkg/cryptography"
"github.com/you/blog/internal/pkg/http"
"github.com/you/blog/internal/pkg/storage/cassandra"
"github.com/you/blog/internal/user"
)

func main() {
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()

// Cassandra connection
con, err := cassandra.NewConnection(cassandra.ConnectionConfig{
Hosts: []string{"127.0.0.1"},
Port: 9042,
ProtoVersion: 4,
Consistency: "Quorum",
Keyspace: "blog",
Timeout: time.Second * 5,
})
if err != nil {
log.Fatalln("CON:", err)
}
defer con.Close()

// User storage manager.
usr := cassandra.User{
Connection: con,
Timeout: time.Second * 5,
}

// Initialise cryptography with the application wide secret key.
// Secret generation and dump: `cry.Secret(cryptography.SecretAES256)`
cry := cryptography.New("b09e58536e4df2a4fc6dd3c9773e4f3d")

ctr := user.Controller{
UserManager: usr,
Cryptography: cry,
}

rtr := http.NewRouter()
rtr.HandleFunc("/api/v1/users", ctr.List)

srv := http.NewServer(rtr, ":8080")
log.Fatal(srv.ListenAndServe())
}

cryptography.go


This is the one we use to handle Cassandra query's "page state" value.


package cryptography

import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"fmt"
"io"
)

type SecretSize int

const (
// AES-128 = 128 bits = 16 bytes (8*2)
SecretAES128 SecretSize = 8
// AES-192 = 192 bits = 24 bytes (12*2)
SecretAES192 SecretSize = 12
// AES-256 = 256 bits = 32 bytes (16*2)
SecretAES256 SecretSize = 16
)

// Cryptography helps encrypting and decrypting private data which is often
// required to be exposed externally.
type Cryptography struct {
secret string
}

// New returns `Cryptography` type. It accepts an application wide `secret`
// which will be used as the "fallback" value if the methods are called with a
// nil `secret` argument.
func New(secret string) Cryptography {
return Cryptography{
secret: secret,
}
}

// Secret generates a random bytes key in given size. The key size must be
// either 16, 24 or 32 bytes to select AES-128, AES-192 or AES-256 modes
// respectively. Depending on the requiremenets, this function can be used only
// once to create an application wide secret key then stored as an environment
// variable or repeatedly for each encryption/decryption. You can dump the
// generated secret with with `fmt.Printf("%x", byte)` method.
func (c Cryptography) Secret(size SecretSize) ([]byte, error) {
key := make([]byte, size)

if _, err := rand.Read(key); err != nil {
return nil, err
}

return key, nil
}

// EncryptAsString returns an encrypted string version of the given data using a
// secret key. Decryption requires the same secret key and the `DecryptString`
// method.
func (c Cryptography) EncryptAsString(data, secret []byte) (string, error) {
if secret == nil {
secret = []byte(c.secret)
}

val, _, err := c.encrypt(data, secret, true)
if err != nil {
return "", err
}

return val, nil
}

// EncryptAsByte returns an encrypted byte version of the given data using a
// secret key. Decryption requires the same secret key and the `DecryptByte`
// method.
func (c Cryptography) EncryptAsByte(data, secret []byte) ([]byte, error) {
if secret == nil {
secret = []byte(c.secret)
}

_, val, err := c.encrypt(data, secret, false)
if err != nil {
return nil, err
}

return val, nil
}

// encrypt returns an encrypted version of the given data using a secret key.
// Decryption requires the same secret key.
func (c Cryptography) encrypt(data, secret []byte, isString bool) (string, []byte, error) {
block, err := aes.NewCipher(secret)
if err != nil {
return "", nil, fmt.Errorf("new cipher: %w", err)
}

aead, err := cipher.NewGCM(block)
if err != nil {
return "", nil, fmt.Errorf("new gcm: %w", err)
}

nonce := make([]byte, aead.NonceSize())
if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
return "", nil, fmt.Errorf("io read: %w", err)
}

byte := aead.Seal(nonce, nonce, data, nil)

if isString {
return base64.URLEncoding.EncodeToString(byte), nil, nil
}

return "", byte, nil
}

// DecryptString returns an decrypted byte version of the given string data
// using a secret key. It requires the same secret key as the `EncryptAsString`
// method used.
func (c Cryptography) DecryptString(data string, secret []byte) ([]byte, error) {
if secret == nil {
secret = []byte(c.secret)
}

byte, err := base64.URLEncoding.DecodeString(data)
if err != nil {
return nil, fmt.Errorf("decode string: %w", err)
}

return c.decrypt(byte, secret)
}

// DecryptByte returns an decrypted byte version of the given byte data using
// a secret key. It requires the same secret key as the `EncryptAsByte` method
// used.
func (c Cryptography) DecryptByte(data, secret []byte) ([]byte, error) {
if secret == nil {
secret = []byte(c.secret)
}

return c.decrypt(data, secret)
}

// decrypt returns a decrypted version of the given data using a secret key. It
// requires the same secret key as the relevant encryption method used.
func (c Cryptography) decrypt(data, secret []byte) ([]byte, error) {
block, err := aes.NewCipher(secret)
if err != nil {
return nil, fmt.Errorf("new cipher: %w", err)
}

aead, err := cipher.NewGCM(block)
if err != nil {
return nil, fmt.Errorf("new gcm: %w", err)
}

size := aead.NonceSize()
if len(data) < size {
return nil, fmt.Errorf("nonce size: invalid length")
}

nonce, text := data[:size], data[size:]

res, err := aead.Open(nil, nonce, text, nil)
if err != nil {
return nil, fmt.Errorf("aead open: %w", err)
}

return res, nil
}

router.go


package http

import (
"net/http"
)

func NewRouter() *http.ServeMux {
return http.NewServeMux()
}

server.go


package http

import (
"net/http"
)

func NewServer(handler http.Handler, address string) *http.Server {
return &http.Server{
Handler: handler,
Addr: address,
}
}

storage/model.go


package storage

import "time"

type User struct {
ID int
Username string
CreatedAt time.Time
}

storage/manager.go


package storage

import "context"

type UserManager interface {
// List returns paginated results in given size and starting from the given
// page ("page state"). If the page is empty, pagination start from the
// first page. The second returned argument represents the "next" page which
// helps resuming where the pagination was left off.
List(ctx context.Context, size int, page []byte) ([]*User, []byte, error)
}

cassandra/cassandra.go


package cassandra

import (
"time"

"github.com/gocql/gocql"
)

// The `gocql: no response received from cassandra within timeout period` error
// will be prevented by increasing the default timeout value. e.g. 5 sec
type ConnectionConfig struct {
Hosts []string
Port int
ProtoVersion int
Consistency string
Keyspace string
Timeout time.Duration
}

func NewConnection(config ConnectionConfig) (*gocql.Session, error) {
cluster := gocql.NewCluster(config.Hosts...)

cluster.Port = config.Port
cluster.ProtoVersion = config.ProtoVersion
cluster.Keyspace = config.Keyspace
cluster.Consistency = gocql.ParseConsistency(config.Consistency)
cluster.Timeout = config.Timeout

return cluster.CreateSession()
}

cassandra/user.go


package cassandra

import (
"context"
"time"

"github.com/you/blog/internal/pkg/storage"
"github.com/gocql/gocql"
)

var _ storage.UserManager = User{}

type User struct {
Connection *gocql.Session
Timeout time.Duration
}

func (u User) List(ctx context.Context, size int, page []byte) ([]*storage.User, []byte, error) {
ctx, cancel := context.WithTimeout(ctx, u.Timeout)
defer cancel()

qry := `SELECT id, username, created_at FROM users`

itr := u.Connection.Query(qry).WithContext(ctx).PageSize(size).PageState(page).Iter()
defer itr.Close()

// Set next page state.
page = itr.PageState()

users := make([]*storage.User, 0, itr.NumRows())

scanner := itr.Scanner()
for scanner.Next() {
user := &storage.User{}

if err := scanner.Scan(
&user.ID,
&user.Username,
&user.CreatedAt,
); err != nil {
return nil, nil, err
}

users = append(users, user)
}
if err := scanner.Err(); err != nil {
return nil, nil, err
}

return users, page, nil
}

request.go


package user

import (
"net/url"
"strconv"
"strings"
)

type Request struct {
PageCursor string
PageSize int
}

func (r *Request) bind(u *url.URL) {
r.PageCursor = strings.ReplaceAll(u.Query().Get("page[cursor]"), " ", "")

v, err := strconv.Atoi(u.Query().Get("page[size]"))
switch {
case err != nil, v < 1:
r.PageSize = 10
default:
r.PageSize = v
}
}

response.go


package user

import (
"fmt"
)

type Response struct {
Data interface{} `json:"data"`
Meta struct {
Total int `json:"total,omitempty"`
} `json:"meta"`
Links struct {
Next string `json:"next,omitempty"`
} `json:"links"`
}

func (r *Response) bind(data interface{}, total int, cursor string, size int) {
r.Data = data
r.Meta.Total = total

if cursor != "" {
r.Links.Next += fmt.Sprintf("page[cursor]=%s&", cursor)
}
if size != 0 {
r.Links.Next += fmt.Sprintf("page[size]=%d&", size)
}

if r.Links.Next != "" {
r.Links.Next = "?" + r.Links.Next[:len(r.Links.Next)-1]
}
}

controller.go


This file does a lot but you should split it.


package user

import (
"encoding/json"
"log"
"net/http"

"github.com/you/blog/internal/pkg/cryptography"
"github.com/you/blog/internal/pkg/storage"
)

type Controller struct {
UserManager storage.UserManager
Cryptography cryptography.Cryptography
}

// GET http://localhost:8080/api/v1/users
// GET http://localhost:8080/api/v1/users?page[cursor]={encrypted_cursor}&page[size]={number}
func (c Controller) List(w http.ResponseWriter, r *http.Request) {
// Bind client request to the Request object.
var req Request
req.bind(r.URL)

// If the page cursor is not empty decode it.
var page []byte
if req.PageCursor != "" {
var err error
page, err = c.Cryptography.DecryptString(req.PageCursor, nil)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
_, _ = w.Write([]byte("invalid page cursor"))
return
}
}

// List users and get next page state for pagination.
users, page, err := c.UserManager.List(r.Context(), req.PageSize, page)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("internal error"))
return
}

// If there is a next page to navigate to, generate next page cursor.
var cursor string
if len(page) != 0 {
var err error
cursor, err = c.Cryptography.EncryptAsString(page, nil)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("internal error"))
return
}
}

// Bind server response to the Response object.
var res Response
res.bind(users, len(users), cursor, req.PageSize)

// Prepare response body.
body, err := json.Marshal(res)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("internal error"))
return
}

// Respond to the client.
w.Header().Set("Content-Type", "application/json; charset=utf-8")
_, _ = w.Write(body)
}

Example


Without URL arguments


// REQUEST
// GET http://localhost:8080/api/v1/users

// RESPONSE
{
"data": [
{
"ID": 2,
"Username": "user-3",
"CreatedAt": "2021-02-11T18:47:11.493Z"
},
...
...
...
{
"ID": 34,
"Username": "user-3",
"CreatedAt": "2021-02-11T18:47:11.997Z"
}
],
"meta": {
"total": 10
},
"links": {
"next": "?page[cursor]=_49gW9ig9DzDjd_E-AuAIiL2b4rpkLvcFXXpJSzuKgRunG3aV7-1fkjc_iizIfF38mu3Mw==&page[size]=10"
}
}

With URL arguments


As you can see below, we visited the "next" link but changed the "size" parameter.


// REQUEST
// GET http://localhost:8080/api/v1/users?page[cursor]=_49gW9ig9DzDjd_E-AuAIiL2b4rpkLvcFXXpJSzuKgRunG3aV7-1fkjc_iizIfF38mu3Mw==&page[size]=2

// RESPONSE
{
"data": [
{
"ID": 37,
"Username": "user-3",
"CreatedAt": "2021-02-11T18:47:12.039Z"
},
{
"ID": 42,
"Username": "user-3",
"CreatedAt": "2021-02-11T18:47:12.093Z"
}
],
"meta": {
"total": 2
},
"links": {
"next": "?page[cursor]=5sG9Zwze2tKlWdo8QEomA0KdgPLPPpKrh6Ek-vrLV1bcXuC4rOdRDlQo8AA2VXO2EJXIhg==&page[size]=2"
}
}

About going backwards


If you need to implement "previous page" feature as well then you have a few options. Although there are other options such as benefiting from a kind of storage layer, this is what I can think of without relying on a storage layer.


Require "Partition Key" in the WHERE clause and use ORDER BY statement. This will help you to go backwards (previous page) with DESC key and forward (next page) with ASC key. Your request and query would change as shown below. Obviously depending on the requirements, your current schema might need to be altered or you might need to introduce one/many more schemas to satisfy the request.


// Request
?page[cursor]={encrypted_cursor}&page[size]={number}
Forward: &sort=some_field
Backwards: &sort=-some_field

// CQL example
Forward: SELECT id, username, created_at FROM users WHERE username = 'user-1' ORDER BY id ASC LIMIT 10;
Backwards: SELECT id, username, created_at FROM users WHERE username = 'user-1' ORDER BY id DESC LIMIT 10;

Pagination is a bit tricky. If you delete record X in DB, its place in the current page is filled in by an existing record Y coming from the "next" page which is normal. However, if you navigate to the next page, the record Y will still appear there so what this says is that, deleting a record in DB doesn't necessarily reorganise all the records in the list. Same goes with adding a new record to DB. See examples below.


Assume that this is the current list in the page.


page[cursor]=1 - 13,14,15
page[cursor]=2 - 18,22,28
page[cursor]=3 - 34,38,39

If you delete record 22 above, the list will look like below. As you can see, 34 is in two different pages!


page[cursor]=1 - 13,14,15
page[cursor]=2 - 18,28,34
page[cursor]=3 - 34,38,39

If you add a new record 12, the list will look like below. As you can see, 15 is nowhere to be seen!


page[cursor]=1 - 12,13,14
page[cursor]=2 - 18,28,34
page[cursor]=3 - 34,38,39