Apprendre Rust à la dure, pour une production pipeline Kafka+ScyllaDB

par notre CTO Alexys Jacob

Contexte du projet à Numberly

Vous vous souvenez peut-être que Numberly est un expert du data marketing qui aide les marques à se connecter avec leurs clients en utilisant tous les canaux digitaux disponibles.

En tant qu’entreprise de data, nous opérons sur un grand nombre de données qui évoluent rapidement. Cette approche data orientée événements oriente nos choix technologiques vers des plateformes qui nous permettent de traiter et de réagir aux stimulis en temps aussi proche que possible du temps réel.

À ce titre, nous combinons les superpouvoirs de Kafka et de Scylla grâce à des flux et des applications spécialisés que nous appellerons ici des processeurs de données.

Chacune de ces applications de traitement des données en flux prépare et enrichit les données entrantes en amont afin qu’elles soient utiles aux applications des entreprises, des partenaires ou des clients en aval.

La pertinence d’une décision fondée sur des données est optimale lorsqu’elle est proche du moment où l’événement se produit, ce qui signifie que la disponibilité et la latence sont essentielles pour nous.

La latence et la résilience sont les piliers sur lesquels nous construisons nos plateformes pour rendre notre activité fiable pour nos clients et partenaires.

Ces applications de traitement des données, Kafka et, bien sûr, Scylla ne peuvent pas tomber en panne, sinon nos partenaires et nos clients seront en colère… Et personne ne veut avoir affaire à un client en colère !

L'opportunité (rouillée)

L’industrie de la data et ses écosystèmes sont en constante évolution. L’automne dernier, nous avons dû adapter trois des processeurs de données les plus exigeants écrits en Python.

Ces applications de traitement faisaient le travail depuis plus de cinq ans, elles étaient éprouvées et dignes de confiance.

Mais je suivais la maturation de Rust depuis un certain temps, j’étais curieux et il m’a toujours semblé moins intimidant que le C ou le C++. Alors quand cette opportunité s’est présentée, je suis allé voir mes collègues et je leur ai dit :

“Pourquoi ne pas réécrire ces 3 applications Python dont nous savons qu’elles fonctionnent très bien avec une application Rust dont nous ne connaissons même pas le langage ?”

Je dois admettre que j’ai perdu mon badge de CTO pendant quelques secondes quand j’ai vu leurs visages…

Les promesses de Rust (jamais tentées auparavant)

Rust fait des promesses avec lesquelles certaines personnes semblent d’accord, c’est intriguant !

Il est censé…

– Être sécure

  • Sécurité de la mémoire et des threads
  • Pas de runtime ou de garbage collector

– Facilité de déploiement

  • Les binaires compilés sont autosuffisants

– Ne fait pas de compromis

  • Fortement et statiquement typé
  • L’exhaustivité est obligatoire
  • Syntaxe et primitives de gestion des erreurs intégrées

– S’interface facilement avec Python

  • PyO3 peut être utilisé pour exécuter Rust depuis Python (ou le contraire)

Mais en plus, leur devise marketing parle au marketeur qui sommeille en moi :
“Un langage qui permet à chacun de créer des logiciels fiables et efficaces.”

Ceci étant dit, les lecteurs attentifs remarqueront que je n’ai pas mentionné la vitesse dans les promesses de Rust.
Rust n’est-il pas censé être rapide? Non, ils parlent d’efficacité et ce n’est pas la même chose !

 

Efficient software! = Faster software

Un code efficace ne signifie pas toujours une application plus rapide.

Brett Cannon, core developer de Python, estime que choisir un langage de programmation parce qu’il est plus rapide sur le papier est une forme d’optimisation prématurée.

Je suis d’accord avec lui dans le sens où le mot rapide à des significations différentes selon vos objectifs.

Pour moi, on peut dire que Rust est plus rapide parce qu’il est efficace, ce qui ne couvre pas tous les éléments de la liste ici.

Appliquons-les à mon contexte :

  • Rapide à développer ?
    • Non. Python est beaucoup plus rapide, j’ai fait ça pendant 15 ans.
  • Rapide à maintenir ?
    • Non. Personne chez Numberly ne connaît encore Rust.
  • Rapide à prototyper ?
    • Non. Le code doit être complet pour être compilé et exécuté.
  • Rapide pour traiter les données ?
    • Bien sûr. Mais pour le prouver, il faut d’abord le mesurer !
  • Rapide pour couvrir tous les cas d’échec ?
    • Absolument. Exhaustivité obligatoire + primitives de gestion des erreurs incluses dans le langage

Comme nous pouvons le voir dans mon cas, choisir Rust plutôt que Python signifie que je vais définitivement perdre du temps !

Je n’ai pas choisi Rust pour être “plus rapide”. Notre code Python était suffisamment rapide pour assurer le traitement de leur pipeline.

Alors pourquoi voudrais-je perdre du temps ? La réponse courte est l’innovation !

Les paradigmes d’un code fiable

Maintenant, qu’est-ce que je vais gagner en perdant du temps autre que la douleur d’utiliser des points-virgules et des parenthèses ?

Un logiciel supposé plus fiable grâce à la conception unique et aux paradigmes de Rust.

En d’autres termes, ce qui me ralentit est aussi une occasion de rendre mon logiciel plus robuste !

 

  • Paradigmes de bas niveau (ownership, borrowing, lifetime)
    • Si ça compile, c’est sûr
  • Sécurité de type forte
    • Prévisible, lisible, maintenable
  • Compilation (débogage, version)
    • Le compilateur Rust est très utile par rapport à une exception Python aléatoire.
  • Gestion des dépendances
    • Enfin quelque chose de sain par rapport à la pagaille de Python
  • Correspondance exhaustive des motifs
    • La certitude de ne pas oublier quelque chose
  • Primitives de gestion des erreurs (Résultat)
    • Gérer l’échec nativement dans la syntaxe du langage

“J’ai choisi Rust car il m’a fourni les paradigmes de programmation avec le niveau d’abstraction dont j’avais besoin pour enfin comprendre et mieux expliquer la fiabilité et les performances d’une application.

Apprendre Rust à la dure

Voici un aperçu de tous les aspects et de toutes les technologies auxquels j’ai dû faire face. Je vais détailler les parties les plus intéressantes ci-dessous.

Registre des schémas Confluent Kafka

Nous utilisons Confluent Kafka Community edition avec un Schema Registry pour gérer nos messages encodés en Avro.

Confluent Schema Registry ajoute un octet magique aux messages kafka, ce qui casse la désérialisation du désérialiseur Apache Avro classique.

Heureusement pour moi, Gerard Klij avait fait le gros du travail dans sa crate, ce qui m’a beaucoup aidé avant que je ne découvre des problèmes de performance.

Nous travaillons à l’améliorer et j’espère pouvoir revenir à son projet utile une fois que nous aurons terminé.

En attendant, j’ai décidé d’utiliser l’approche manuelle présentée ici et de décoder moi-même les messages Avro en respectant leur schéma :

/// Deserialize the given kafka raw `message` using the provided
/// Avro `schema` and return a Navigation struct message to be
/// used by the processors.
pub fn get_decoded_message(schema: &Schema, message: &BorrowedMessage) -> Result<Navigation> {
     /*
     DO NOT pass the reader schema as last argument to from_avro_datum
     this implies a resolve() on each value and impacts performances badly
     > Ok(from_avro_datum(&schema, &mut reader, Some(&schema)).unwrap()) // decode + resolve
     */
          let mut reader = Cursor::new(&message.payload().unwrap()[5..]);
          let val = match from_avro_datum(&schema, &mut reader, None) {
               Ok(inner) => inner,
               Err(err) => {
                    return Err(anyhow!(err));
               }
          };
          let navigation: Navigation = match &val {
          Value::Record(_) => from_value::<Navigation>(&val).unwrap(),
               _ => {
                    return Err(anyhow!("could not map avro data to struct"));
               }
          };
          Ok(navigation)
}

Apache Avro Rust était cassé

Puis je me suis heurté au deuxième mur lorsque, même si ma lecture des messages Avro était faite correctement, je ne pouvais pas les désérialiser…

En tant que novice total en Rust, je me suis blâmé pendant des jours avant même d’oser ouvrir le code source d’Apache Avro et d’y jeter un oeil. J’ai fini par découvrir qu’Apache Avro n’était pas complet et ne supportait pas les schémas complexes comme le nôtre !

Je me suis demandé si quelqu’un dans le monde entier utilisait réellement Avro avec Rust en production… d’autant plus que le projet avait été donné à la fondation Apache sans qu’aucun committer ne soit capable de fusionner les PRs à l’époque.

Me voici donc en train de contribuer à des correctifs pour Apache Avro Rust qui ont finalement été fusionnés trois mois plus tard, en janvier 2022 : AVRO-3232 et AVRO-3240.

Quoi qu’il en soit, un autre fait inattendu que Rust m’a permis de prouver est que la désérialisation d’Avro est plus rapide que la désérialisation de JSON dans notre cas de structures de données riches et complexes. Mon collègue Othmane en était sûr, je pouvais enfin le prouver !

Une fois Avro corrigé, j’ai pu commencer à expérimenter avec l’option  --release et à exécuter mon code optimisé, différence importante et inattendue :

Structures asynchrones pour optimiser la latence

Une fois que j’ai été en mesure de récupérer les messages de Kafka, j’ai commencé à chercher le meilleur modèle pour les traiter.

J’ai trouvé le runtime asynchrone de tokio très intuitif, venant de Python asyncio.

J’ai beaucoup joué avec différents modèles de code pour optimiser et rendre la consommation des messages de Kafka stable et fiable.

Une des découvertes intéressantes a été de ne pas reporter le décodage des messages Avro à un green-thread mais de le faire directement dans la boucle principale. La désérialisation est une opération liée au CPU qui bénéficiera de ne pas être coopérative avec d’autres tâches en green-thread !

De même, autoriser et contrôler votre parallélisme aidera à stabiliser vos opérations liées aux E/S, voyons un exemple réel de cela :

Le fait de reporter le reste de ma logique de traitement qui est liée aux E/S sur les green-threads a permis d’absorber les pics de latences sans affecter ma vitesse de consommation de kafka.

Le tableau de bord Grafana ci-dessus montre que vers 9h00, quelque chose a rendu Scylla plus lent que d’habitude, les latences P95 de scylla select et insert ont augmenté d’un facteur 16.

C’est là que la charge du parallélisme (la petite bosse sur le premier graphique) a également commencé à augmenter, car j’avais plus de green-threads actifs en parallèle traitant les messages.

Mais cela n’a fait qu’affecter la latence de consommation de Kafka d’un facteur 2 à P95, absorbant efficacement les pics de latences dus à cette surcharge éphémère !

C’est l’exemple typique de quelque chose qui était plus difficile à cerner et à démontrer en Python mais qui est devenu clair grâce à Rust.

 

Le driver Scylla Rust

J’ai trouvé le driver Scylla Rust intuitif et bien construit, félicitations à l’équipe qui est également très utile sur leur canal dédié sur le service Scylla Slack, rejoignez-nous !

La nouvelle CachingSession

est très pratique pour mettre en cache vos prepared statements afin que vous n’ayez pas à le faire vous-même comme je l’ai fait au début…

Attention : les prepared statements ne sont PAS paginés par défaut, utilisez les avec execute_iter() !

Je présente un exemple de code d’une fonction de connexion de production à scylla, utilisant SSL, le multi-datacenter et une session avec cache :

/// Connect to Scylla and return a `CachingSession` that will
/// be used by the processors.
pub async fn get_scylla_session(
          scylla_nodes: Vec<&str>,
          scylla_user: &str,
          scylla_password: &str,
          ssl_ca_path: &str,
) -> Result<CachingSession> {
          let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
          let ca_dir = fs::canonicalize(PathBuf::from(ssl_ca_path))?;
          context_builder.set_ca_file(ca_dir.as_path())?;
          context_builder.set_verify(SslVerifyMode::PEER);
          // load balancing policy
          let local_dc = &*DATACENTER;
          let dc_robin = Box::new(DcAwareRoundRobinPolicy::new(local_dc.to_string()));
          let policy = Arc::new(TokenAwarePolicy::new(dc_robin));
          // the size of the prepared statement cache has no impact on the underlying
          // DashMap memory consumption as it is only used by the CachingSession
          // struct logic itself.
          info!(
               "scylla connecting to {:?} as user {} on datacenter {}",
               &scylla_nodes, scylla_user, local_dc
          );
          let caching_session = CachingSession::from(
               SessionBuilder::new()
                    .known_nodes(&scylla_nodes)
                    .connection_timeout(Duration::from_secs(15))
                    .ssl_context(Some(context_builder.build()))
                    .user(scylla_user, scylla_password)
                    .load_balancing(policy)
                    .build()
                    .await?,
               1000,
          );
          Ok(caching_session)
}

Exporter correctement les métriques pour Prometheus

Passons maintenant à Prometheus qui, même s’il arrive tard dans cette présentation, est en fait l’une des premières choses que j’ai mises en place sur mon application.

Pour toutes les expériences que j’ai faites, j’ai mesuré leur impact sur la latence et le débit grâce à Prometheus.

Pour qu’un test soit significatif, ces mesures doivent être effectuées correctement, puis représentées graphiquement correctement.

Voici donc un exemple de la façon dont je mesure la latence d’insertion des requêtes de Scylla.

La première et importante difficulté est de configurer correctement votre bucket d’histogramme avec la finesse du graphique attendu :

pub static ref SCYLLA_INSERT_QUERIES_LATENCY_HIST_SEC: Histogram = register_histogram!(
          "scylla_insert_queries_latency_seconds",
          "Scylla INSERT query latency histogram in seconds",
          vec![0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 5.0, 15.0],
)
.expect("failed to create prometheus metric");

 

Ici, je m’attends à ce que la latence de scylla varie entre 50µs et 15s, ce qui est le délai d’attente maximal du serveur que j’autorise pour les écritures.

Ensuite, je l’utilise comme ceci : je démarre un timer sur l’histogramme et j’enregistre sa durée en cas de succès; je l’abandonne en cas d’échec afin que mes mesures ne soient pas polluées par d’éventuelles erreurs.

let timer = SCYLLA_INSERT_QUERIES_LATENCY_HIST_SEC.start_timer();
match scylla_session
          .execute(scylla_statement, &scylla_parameters)
          .await
{
          Ok(_) => {
               timer.observe_duration();
               Ok(())
          }
          Err(err) => {
               drop(timer);
               PROCESSING_ERRORS_TOTAL
                    .with_label_values(&["scylla_insert"])
                    .inc();
               error!("insert_in_scylla: {:?}", err);
               Err(anyhow!(err))
          }
}

Tableau de bord Grafana

L’un de mes meilleurs investissements en temps dans ce projet a été la création d’un tableau de bord Grafana détaillé et significatif afin que je puisse voir et comparer les résultats de mes expérimentations de l’application Rust.

Assurez-vous de grapher autant de choses que possible !

  • tailles des caches
  • fréquence des requêtes et occurrence de tout, en discernant la différence entre les deux
  • rendez les mesures d’erreurs significatives en utilisant des labels (par type)
  • mémoire des pods kubernetes

Lisez aussi l’excellent article que les gens de Grafana ont écrit sur la façon de visualiser les histogrammes de Prometheus directement dans Grafana, ce n’est pas aussi évident qu’on pourrait le croire !

Ai-je vraiment perdu du temps à cause de Rust ?

La vraie question est : ai-je le sentiment d’avoir perdu du temps ? Réponse courte : non !

La syntaxe était étonnamment simple et intuitive à adopter, même en venant de Python.

Au final, je dois avouer que Rust m’a donné envie de tout tester et analyser à un niveau inférieur et je n’ai absolument pas réussi à résister à la tentation !

J’ai donc passé la majeure partie de mon temps à faire des tests, à établir des graphiques, à analyser et à essayer de trouver une explication décente et perspicace : pour moi, ce n’est certainement pas du temps perdu !

Pour ceux d’entre vous qui ont faim de chiffres, en voici tirés de l’application en production :

  • Débit maximal de lecture Kafka avec traitement ? 200K msg/s sur 20 partitions
  • Latence P50 de la désérialisation Avro ? 75µs
  • Scylla SELECT latence P50 sur des tables de 1.5B+ lignes ? 250µs
  • Scylla INSERT latence P50 sur des tables de 1.5B+ lignes ? 660µs

Conclusion : ça s'est mieux passé que prévu !

  • L’écosystème des crates Rust est mature, similaire au Python Package Index
  • Le driver scylla-rust est stable et efficace !
  • Il m’a fallu un certain temps pour accepter qu’Apache Avro était cassé, pas moi…
  • 3 applications Python totalisant 54 pods remplacées par 1 application Rust totalisant 20 pods
  • C’est le logiciel le plus fiable et le plus efficace que j’ai jamais écrit !

Même s’il s’agissait de ma première application Rust, je me suis senti en confiance pendant le processus de développement, ce qui s’est transformé en confiance dans un logiciel prévisible et résilient.

Après des semaines de production, le nouveau processeur pipeline Rust s’avère être très stable et fiable.

Je peux donc dire que oui, les promesses de Rust sont à la hauteur des attentes !