Building a 100% ScyllaDB Shard-Aware Application Using Rust

Discover our innovative project rewarded by ScyllaDB's Technical Achievement Award

This breakthrough design has been awarded

We are proud to have received ScyllaDB’s Technical Achievement Award for Outstanding Innovative Project involving ScyllaDB, for designing a data processing application that leverages an innovative low-level architectural design. This breakthrough design brings amazing strengths such as idempotence and deterministic, distributed data processing, along with infinite scalability that benefits our clients. We are truly humbled by this recognition, which rewards both our teams’ technical expertise and our Open-Source community contributions.

It is now in production at Numberly; learn more about it in this article!

 

Certificate of the ScyllaDB Technical Achievement Award with the Numberly logo

Project Context

At Numberly, the Omnichannel Delivery team owns all the message types we support and operate for our clients.

These range from the well-known and established email channel to the still-emerging RCS, not to mention OTT platforms such as WhatsApp.

The team recently got the chance to build a “platform to rule them all” with the goal of streamlining how all our components send and track messages, whatever their form.

The general logic is as follows: clients or programmatic platforms send messages or batches of messages through REST API gateways, which are responsible for validating and rendering the message payloads.

All those gateways then converge towards a Central Message Routing Platform, which implements full-featured scheduling, accounting, tracing and, of course, routing of the messages through the right platform or operator connectors.

Diagram showing gateways that converge towards a central message routing platform

Looking at the Central Messaging Platform

High constraints

Now, putting all your eggs in one basket is always risky, right?

Making this kind of move puts a lot of constraints on our platform requirements: it has to be very reliable!

First, it must be highly available and resilient, because it becomes a single point of failure for all our messages.

Second, it must be able to scale fast to match the growth of one or multiple channels at once as our routing needs change.

 

Strong guarantees

High Availability and scale look easy when compared to our observability and idempotence requirements.

When you imagine all your messages going through a single place, the ability to trace what happened to every single one of them (or a group of them) becomes a real challenge.

Even worse, one of the greatest challenges out there, especially in a distributed system, is the idempotence guarantee, which we had so far lacked on our other pipelines.

Guaranteeing that a message cannot be sent twice is easier said than done!

Design Thinking & Key Concepts

We split up our objectives into three main concepts that we promised to strictly respect to keep up with the constraints and guarantees of our platform.

 

Reliability

  • Simple: few share-(almost?)-nothing components
  • Low coupling: keep remote dependencies to a minimum
  • Coding language: efficient with explicit patterns and strict paradigms

 

Scale

  • Application layer: easy to deploy & scale with strong resilience
  • Data bus: a high-throughput, highly resilient, horizontally scalable message bus with time- and order-preserving capabilities
  • Data querying: low-latency, one-or-many query support

 

Idempotence

  • Processing isolation: workload distribution should be deterministic

Architecture considerations

The possible default choice

Considering Numberly’s stack, the first go-to architecture could have been something like this:

Diagram showing the default choice architecture featuring REST API Gateways and the central message routing platform
  • Application layers running on Kubernetes
  • Kafka as a message passing bus from Gateway APIs
  • Kafka used as a log of messages to be processed and sent
  • ScyllaDB as a storage layer to query the state of individual messages or groups of messages
  • Redis as a hot cache for some optimizations
  • Kafka used as a messaging bus between our Central Message Routing Platform and individual channel routing agents

On paper, it sounds like a solid and proven design, right?

 

A not so default choice after all

This apparently simple go-to architecture has caveats that break too many of the concepts we promised to stick with!

 

Reliability

  • High Availability with low coupling: we would have to design our reliability upon three different data technologies, each of which could fail for different reasons that our platform logic would have to handle

 

Scalability

While we are lucky to have a data technology matching each scalability constraint we set, the combination of the three does not match our reliability and idempotence requirements.

Their combination adds too much complexity and too many points of failure to be implemented together efficiently:

  • Easy to deploy: Kubernetes would do the job alright
  • Data horizontal scaling: while ScyllaDB would surely scale, Kafka scaling requires caution because of its partition logic, and Redis does not scale that well out of the box
  • Data low-latency querying: ScyllaDB and Redis are the clear winners here, while Kafka is obviously not designed to “query” a piece of data easily
  • Data ordered bus: that’s where Kafka excels, and where Redis exposes a queuing capability that scales hazardously. ScyllaDB, on the other hand, might be able to act as an ordered bus if we give it some thought

 

Idempotence

Idempotence, as expected, becomes a nightmare when you imagine achieving it in such a complex ecosystem mixing many technologies.

  • Deterministic workload distribution: can you achieve it when combining ScyllaDB + Kafka + Redis?

 

The daring architecture

So we decided to be bold and make a big statement: we’ll only use ONE data technology to hold everything together!

ScyllaDB was the best suited to face the challenge:

  • It’s highly available
  • It scales amazingly
  • It offers ridiculously fast queries for both single and range queries

This means ScyllaDB can also be thought of as a distributed cache, effectively replacing Redis.

Now, replacing Kafka as an ordered data bus with ScyllaDB is not so trivial, but it seemed doable…

The biggest question still on our plate was:

  • How can we get a deterministic workload distribution, if possible, for free?

That’s where I got what turned out to be a not so crazy idea after all:

  • What if I used ScyllaDB’s shard-per-core architecture inside my own application?

Let’s take a quick detour and explain ScyllaDB’s shard-per-core architecture.

ScyllaDB Shard-Per-Core Architecture

ScyllaDB’s low-level design leverages a shard-per-core architecture to deterministically distribute and process data.

The main idea is that the partition key in your data table design determines not only which node is responsible for a copy of the data but also which CPU CORE gets to handle its I/O processing.

Diagram showing the ScyllaDB cluster, partition key shard calculation, and shards per core

You got it right, ScyllaDB distributes the data in a deterministic fashion down to a single CPU core!
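As an illustration, here is a minimal sketch of the token-to-shard mapping documented for shard-aware drivers. The `nr_shards` and `msb_ignore` parameters are advertised by each ScyllaDB node, and in practice the ScyllaDB Rust driver performs this computation for you:

```rust
/// Minimal sketch of ScyllaDB's documented token -> shard mapping.
/// `token` is the signed Murmur3 token of the partition key,
/// `nr_shards` the number of CPU cores (shards) on the node and
/// `msb_ignore` the node's "ignore most significant bits" setting.
fn shard_of(token: i64, nr_shards: u32, msb_ignore: u32) -> u32 {
    // Bias the signed token into an unsigned, zero-based 64-bit space.
    let mut biased = (token as u64).wrapping_add(1u64 << 63);
    // Discard the most significant bits the node is configured to ignore.
    biased = biased.wrapping_shl(msb_ignore);
    // Multiply by the shard count and keep the high 64 bits:
    // this spreads the token range evenly across the node's CPU cores.
    ((u128::from(biased) * u128::from(nr_shards)) >> 64) as u32
}
```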

So my naive idea was to distribute our messaging platform’s processing using the exact same logic as ScyllaDB:

Diagram showing the ScyllaDB cluster, shards per core, and application pods

The expected effect would be to align ScyllaDB’s per-CPU-core processing with our application’s, and to benefit from all the latency, scaling and reliability advantages that come with it.

The 100% Shard-Aware application

That’s how we effectively created a 100% shard-aware application, and it brings amazing properties to the table!

  • Deterministic workload distribution
  • Super optimized data processing capacity aligned from the application to the storage layer
  • Strong latency and isolation guarantees per application instance (pod)
  • Infinite scale following ScyllaDB’s own ability to grow seamlessly

Building a Shard-Aware Application

Selecting the right programming language

Now that we had our architecture inspiration, it was time to answer the perpetual question: “Which language should we use?”

  • We need a modern language that reflects our desire to build a reliable, secure and efficient platform
  • The shard calculation algorithm requires fast hashing capabilities and great low-level synergy with the ScyllaDB driver

Once we established that, Rust was a no-brainer.

 

The Deterministic Data Ingestion

Incoming messages are handled by a component that we call the ingester.

For each message we receive, after the usual validations, we calculate the shard to which the message belongs, as it will be stored in ScyllaDB. For this, we use the ScyllaDB Rust driver’s internal functions (which we contributed).

Diagram showing the first step of data ingestion

More exactly, from our message partition key we compute a partition key that matches ScyllaDB’s storage replica nodes and CPU core, effectively aligning our application’s processing with ScyllaDB’s.

Once this partition key is calculated to match ScyllaDB’s storage layer, we persist the message with all its data in the message table and at the same time add its metadata to a table named buffer with the calculated partition key.

Diagram showing the second step of data ingestion
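As a rough sketch of what this dual write could look like with the ScyllaDB Rust driver (the table names, columns and the server-side ingestion timestamp below are illustrative assumptions, not Numberly’s actual data model):

```rust
use scylla::batch::Batch;
use scylla::Session;
use uuid::Uuid;

// Hypothetical schema (illustrative only):
//   messaging.message (channel text, message_id uuid, payload text,
//                      PRIMARY KEY ((channel, message_id)))
//   messaging.buffer  (channel text, shard int, ingested_at timestamp, message_id uuid,
//                      PRIMARY KEY ((channel, shard), ingested_at, message_id))
async fn ingest(
    session: &Session,
    channel: &str,
    shard: i32, // computed from the message partition key, as described above
    message_id: Uuid,
    payload: &str,
) -> Result<(), Box<dyn std::error::Error>> {
    // One logged batch so the payload and its buffer metadata are persisted together.
    let mut batch = Batch::default();
    batch.append_statement(
        "INSERT INTO messaging.message (channel, message_id, payload) VALUES (?, ?, ?)",
    );
    batch.append_statement(
        "INSERT INTO messaging.buffer (channel, shard, ingested_at, message_id) \
         VALUES (?, ?, toTimestamp(now()), ?)",
    );
    session
        .batch(&batch, ((channel, message_id, payload), (channel, shard, message_id)))
        .await?;
    Ok(())
}
```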

The Deterministic Data Processing

Now that the data is stored in ScyllaDB let’s talk about the second component which we call schedulers.

Schedulers will consume the ordered data from the buffer table and effectively proceed with the message routing logic.

Following the shard-to-component architecture, a scheduler will exclusively consume the messages of a specific shard just like a CPU core is assigned to a slice of ScyllaDB data.

Diagram showing the first step of deterministic data processing

A scheduler fetches the slice of data it is responsible for from the buffer table.

Diagram showing the second step of deterministic data processing

At this point, a scheduler will have the ids of the messages it should process. Then it fetches the message details from the message table.

Diagram showing the third step of deterministic data processing

The scheduler then processes and sends the messages through the right channel it is responsible for.

Diagram showing the fourth step of deterministic data processing

Each component of the platform is responsible for a slice of messages per channel by leveraging ScyllaDB’s shard-aware algorithm. We obtain data processing that is 100% aligned from the application down to the database.
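To make the two-step read concrete, here is a hypothetical CQL sketch using the same illustrative table names as the ingestion sketch above (again, not the actual production schema):

```rust
// Step 1: fetch the ordered metadata slice owned by this scheduler's shard.
const FETCH_BUFFER_SLICE: &str =
    "SELECT ingested_at, message_id FROM messaging.buffer \
     WHERE channel = ? AND shard = ? AND ingested_at >= ? AND ingested_at < ?";

// Step 2: fetch the full payload of each message by its id.
const FETCH_MESSAGE_DETAILS: &str =
    "SELECT payload FROM messaging.message WHERE channel = ? AND message_id = ?";
```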

Replacing Kafka with ScyllaDB

Replacing Kafka as an ordered data bus with ScyllaDB is not so trivial, but it was surely doable.

Let’s take a deeper look at how it works from the scheduler component’s perspective.

We store message metadata as a time series in the buffer table, ordered by ScyllaDB’s time of ingestion (this is an important detail). Each scheduler keeps a timestamp offset of the last message it successfully processed.

This offset is stored in a dedicated table. When a scheduler starts, it fetches the timestamp offset of the shard of data it is assigned to.

Diagram showing the replacement of Kafka with ScyllaDB
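A minimal sketch of what such an offsets table and its accompanying statements could look like, with purely illustrative names:

```rust
// Hypothetical offsets table: one row per (channel, shard) storing the timestamp
// of the last successfully processed message for that shard.
const CREATE_OFFSETS: &str =
    "CREATE TABLE IF NOT EXISTS messaging.offsets ( \
        channel text, shard int, last_processed timestamp, \
        PRIMARY KEY ((channel, shard)))";
const LOAD_OFFSET: &str =
    "SELECT last_processed FROM messaging.offsets WHERE channel = ? AND shard = ?";
const STORE_OFFSET: &str =
    "UPDATE messaging.offsets SET last_processed = ? WHERE channel = ? AND shard = ?";
```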

A scheduler is essentially an infinite loop fetching the messages it is assigned within a certain, configurable time window.

In fact, a scheduler doesn’t fetch data starting strictly from the last timestamp offset, but from an older one.

This does mean that a single message may be fetched multiple times, but this is handled by our idempotence business logic and optimized with an in-memory cache.

Overlapping the previous time range allows us to prevent any possible message miss that could be caused by a potential write latency or subtle time skew between nodes (since we rely on ScyllaDB’s timestamps).
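Putting it together, a scheduler’s main loop could look roughly like the sketch below. Everything named here is an assumption for illustration: `load_offset`, `store_offset`, `fetch_buffer_slice`, `already_processed` and `route` are hypothetical helpers (for example built on the statements sketched earlier), and the sketch assumes a tokio async runtime:

```rust
use std::time::Duration;

// Sketch of a scheduler loop over one shard; timestamps are plain milliseconds
// to keep the example self-contained. The helpers are hypothetical (see above).
async fn run_scheduler(
    channel: &str,
    shard: i32,
    window_ms: i64,
    overlap_ms: i64,
) -> anyhow::Result<()> {
    // Resume from the last successfully processed timestamp for this shard.
    let mut offset_ms = load_offset(channel, shard).await?;
    loop {
        // Deliberately start a bit before the stored offset: the overlap protects
        // against write latency and small time skews between nodes.
        let from = offset_ms - overlap_ms;
        let to = from + window_ms;
        for msg in fetch_buffer_slice(channel, shard, from, to).await? {
            // Duplicates caused by the overlap are dropped by the idempotence logic.
            if already_processed(&msg) {
                continue;
            }
            route(&msg).await?; // hand the message to the right channel connector
            offset_ms = offset_ms.max(msg.ingested_at_ms);
        }
        // Checkpoint so a restarted scheduler picks up close to where it stopped.
        store_offset(channel, shard, offset_ms).await?;
        tokio::time::sleep(Duration::from_millis(250)).await; // simple pacing between polls
    }
}
```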

Diagram showing the replacing of Kafka with ScyllaDB

Retrospective

Reaching our goal was not easy; we failed many times, but we finally made it and proved that our original idea not only worked but was also very convenient to work with, while being amazingly efficient.

 

What we learned on the road

The first thing we want to emphasize is that load testing is more than useful.

Early enough during development, we set up load tests sending tens of thousands of messages per second. Our goal was to test our data schema design at scale and our idempotence guarantee.

It allowed us to spot multiple issues, sometimes non-trivial ones, like when the execution delay between the statements of our insertion batch was greater than our fetch time window. Yeah, a nightmare to debug…

By the way, our first workload was a naive insert-and-delete, and load testing made large partitions appear very fast!

Fortunately, we also learned about compaction strategies, especially the Time-Window Compaction Strategy, which we are using now and which allowed us to get rid of the large-partitions issue.

Message buffering as time-series processing allowed us to avoid large partitions!
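As an illustration only, the hypothetical buffer table from the earlier sketches could declare Time-Window Compaction and a default TTL like this (window size and TTL values are arbitrary, not our production settings):

```rust
// Expired time-series rows are dropped whole windows at a time instead of
// accumulating tombstones inside ever-growing partitions.
const CREATE_BUFFER: &str =
    "CREATE TABLE IF NOT EXISTS messaging.buffer ( \
        channel text, shard int, ingested_at timestamp, message_id uuid, \
        PRIMARY KEY ((channel, shard), ingested_at, message_id)) \
     WITH compaction = { \
        'class': 'TimeWindowCompactionStrategy', \
        'compaction_window_unit': 'HOURS', \
        'compaction_window_size': 1 } \
     AND default_time_to_live = 86400";
```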

 

We contributed to the ScyllaDB Rust driver

To make this project possible, we contributed to the ScyllaDB ecosystem, especially to the Rust driver, with a few issues and pull requests.

For example, we added code to compute the replica nodes of a primary key, as we needed it to compute the shard of a message.
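The snippet below is not the contributed code itself but a rough usage sketch: it asks the driver’s cluster metadata which replica nodes own a given token, which can then be combined with the shard_of() function shown earlier. The method and field names (`get_cluster_data`, `get_token_endpoints`, `Token { value }`) follow the 0.x versions of the scylla crate and should be checked against the driver version you use:

```rust
use scylla::routing::Token;
use scylla::Session;

// Rough usage sketch only: list the replica nodes that own a given token according
// to the driver's cluster metadata (API names as recalled from the 0.x scylla crate).
fn print_replicas(session: &Session, keyspace: &str, token_value: i64) {
    let cluster_data = session.get_cluster_data();
    for node in cluster_data.get_token_endpoints(keyspace, Token { value: token_value }) {
        // `node.address` is the replica's address as known by the driver.
        println!("token {} is owned by replica {:?}", token_value, node.address);
    }
}
```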

We hope it will help you if you want to use this cool sharding pattern in your future shard-aware application!

We also discovered some ScyllaDB bugs, so of course we worked with ScyllaDB support to have them fixed (thanks for your responsiveness).

 

What we wish we could do

As with all systems, not everything is perfect, and there are some points we wish we could do better.

Obviously, ScyllaDB is not a message queuing platform, and we miss Kafka’s long polling. Currently, our architecture regularly polls each shard buffer, which consumes a lot of useless bandwidth, but we are working on optimizing this.

Also, we encountered some memory issues, for which we suspected the ScyllaDB Rust driver. We didn’t take much time to investigate, but it made us dig into the driver code, where we spotted a lot of memory allocations.

As a side project, we started to think about some optimizations; actually, we did more than think, because we wrote a whole prototype of an (almost) allocation-free ScyllaDB Rust driver.

We may make it the subject of a future blog post, with the Rust driver once again outperforming the Go driver!

 

Going further with ScyllaDB features

So we bet on ScyllaDB, and that’s a good thing, because it has a lot of other features that we want to benefit from.

For example, Change Data Capture: using the CDC Kafka source connector, we could stream our message events to the rest of the infrastructure without touching our application code. Observability made easy.

We are also looking forward to ScyllaDB’s path towards strongly consistent tables with Raft as an alternative to LWT.

Currently, we are using LWT in a few places, especially for dynamic shard workload attribution, so we can’t wait to test this feature!

Written by Alexys Jacob, CTO.