Vous le savez déjà peut-être, mais la base du développement d’applications de Big Data Streaming avec Kafka se déroule en 3 étapes, à savoir,
1 – déclarer le Producer,
2- indiquer le topic de stockage
3- et déclarer le Consumer.
En ce qui concerne le Producer, il existe 2 façon de le déclarer : vous pouvez entièrement le créer par programmation, ou alors vous pouvez exposer une application existante comme Producer. C’est ce dernier cas qui nous intéresse et qui est l’objet de cette chronique.
Le développement d’applications streaming est particulier et dans la majorité des cas, les données [streaming] que vous aurez à traiter existeront déjà dans des systèmes de gestion de base de données de l’entreprise (Oracle, SQL Server, IBM DB2, MySQL, Ms Access, Ms Excel, etc), dans ses ERP métiers (Salesforce, SAP), ou encore dans ses systèmes décisionnels (HDFS, Data warehouse, Teradata, Hana, etc) ou même seront hébergées dans les PaaS de ses fournisseurs Cloud.
Cela signifie que vous aurez rarement à développer un Producer par programmation en partant de rien. Vous allez souvent devoir plutôt définir une source existante comme un Producer Kafka.
Le problème le plus évident que vous rencontrerez pour faire apparaître ces sources de données opérationnelles comme Producer Kafka est que pour la plupart, elles ne sont pas nativement streaming. En d’autres termes, les données qui y sont stockées ne présentent aucune caractéristique streaming et sont stockées comme de simples « faits« . De plus, chaque système opérationnel de base de données possède des caractéristiques techniques et technologiques qui lui sont propres.
Ainsi, pour exposer Oracle comme Producer à Kafka, il faut développer un « Producer Oracle » spécifique, pour Salesforces, il faut développer un « Producer Salesforces« , pour Hadoop, il faut développer un « Producer Hadoop« , etc…
Pour résoudre ces 2 problèmes d’un point de vue conceptuel, la solution revient à faire ceci :
1 – développer un connecteur générique côté Kafka compatible avec le protocole de connexion de données utilisé par les systèmes opérationnels de gestion de données (les SGBDR), notamment le protocole ODBC ou le protocole JDBC, puisque les SGBDR sont quasiment tous compatibles avec l’un de ces 2 protocoles. Ce connecteur, à l’aide de la connexion qu’il établira avec le SGBDR pourra envoyer des appels/requêtes de copie de données vers Kafka, tout comme les applications opérationnelles récupèrent les données de la base de données à travers les requêtes SQL qu’elles envoient au SGBDR.
2 – Mettre le connecteur générique en écoute de la base de données de sorte que Kafka soit informé de tout changement ou modification de données dans la base. Ainsi, toute la base de données est changée de faits en événements (donc en source de données streaming).
C’est exactement cette solution qui a été adoptée par les développeurs de Kafka dès sa version 0.9 avec la sortie de Kafka Connect.
Kafka Connect est une extension de Kafka qui établit une passerelle entre une grande variété de systèmes opérationnels (tels que les SGBDR, les ERP, les data warehouse, les outils de journalisation) et le Log de Kafka afin d’y copier/transférer les données. Cela signifie que vous pouvez utiliser Kafka Connect pour établir une connexion avec Salesforce et l’exposer comme Producer, afin de récupérer chaque nouvelle donnée client qui y arrive et l’enregistrer dans Kafka pour un usage immédiat ou différé.
Aussi, vous pouvez utiliser Kafka Connect pour récupérer les données stockées dans Kafka et les pousser vers une destination (un sink) par exemple le data lake, le data warehouse de l’entreprise, Hadoop, HDFS, ou encore une autre base de données relationnelle. Kafka Connect transforme toute source de données opérationnelles (les faits) en source de données streaming (des événements), ce qui favorise l’usage de Kafka à un grand nombre de scénarios de données. Par exemple :
- l’ingestion en temps réel de tout changement dans une base de données relationnelle
- l’ingestion centralisée des données collectées par des agrégateurs de flux tels que Log4J, SysLog pour la détection d’anomalies
- le reporting en temps réel par l’ingestion des données de systèmes opérationnels vers des Data Hub (Data Lake, Data warehouse)
- ingestion de données pour la construction d’index d’un moteur de recherche de contenu (elasticsearch, Solr, etc).
Nous avons décrit le plein potentiel de Kafka dans l’ingestion des données opérationnelles (non-streaming) dans cet article : Data Warehouse – Data Lake en Big Data.
Vous pouvez aussi consulter le chapitre 4 de notre ouvrage « Big Data Streaming : le traitement streaming & temps réel en Big Data« .
Ainsi, Kafka Connect est une extension centrale que vous devez absolument maîtriser pour élargir les cas d’usage de Kafka dans votre travail quotidien. Nous vous aiderons à le maîtriser dans cette chronique spécialisée.
1 – Architecture de Kafka Connect
Kafka Connect est une extension qui permet de transformer un système de données opérationnel en Producer Kafka. Il fournit une large panoplie de connecteurs qui permettent à Kafka de se connecter à des produits spécifiques et d’y importer ou exporter les données. Rendez vous sur le site suivant pour obtenir la liste complète des connecteurs Kafka disponibles actuellement. Vous verrez que vous y trouverez par exemple :
– Kafka Connect for Salesforce
– Kafka Connect for Oracle
– Kafka Connect for HDFS
– Kafka Connect for ElasticSearch
– etc…
Attention !!! Les connecteurs de Kafka Connect listés dans le lien précédent sont des produits propriétaires de Confluent. Ils seront donc payants pour la plupart. En dehors de Confluent, d’autres éditeurs ont développé eux-mêmes des connecteurs Kafka propres à leurs produits. Par exemple, Oracle Golden Gate for Big Data, SQData, QlikView Attunity Replicate, IBM IIDR, etc. Si vous êtes emmené dans votre travail à utiliser ces produits, alors rapprochez-vous directement de leur documentation pour plus de détails sur leur fonctionnement. Aussi, vous trouverez des développeurs qui ont développé des connecteurs Kafka spécifiques libres d’utilisation. Le code de ces connecteurs est souvent disponible en open source. Par exemple, voici un producer Kafka Twitter sur Git pour la connexion et l’ingestion des données Twitter. Une autre chose, gardez à l’esprit que vous pouvez rédiger vous-même votre producer en suivant les principes et les techniques que nous enseignons dans notre formation sur le développement d’applications de Big Data Streaming.
En fait, Kafka Connect peut être utilisé pour établir une connexion avec tout système qui est compatible avec le protocole JDBC. De plus, il s’utilise comme un service REST. Cela signifie que la création, la configuration et la suppression d’une connexion avec un système opérationnel se fait à l’aide d’une simple API REST. Nous y reviendrons plus bas.
La figure suivante représente l’architecture la plus simpliste de Kafka Connect.
L’architecture de Kafka Connect repose sur 3 grands concepts. En d’autres termes, vous devez apprivoiser uniquement 3 concepts pour comprendre et maîtriser l’utilisation de Kafka Connect : le connecteur (connector), la tâche (task) et le worker.
– le Connecteur (Connector) : c’est l’instance logique d’un job Kafka Connect qui gère la copie/transfert des données d’un système source (source system) vers un système cible (sink system). En fait, il fait référence à une classe appelée Connector
dans laquelle vous spécifiez la source de données copiée, ainsi que différentes configurations. Nous y reviendrons plus bas.
– la tâche (Task) : chaque connecteur instancie une ou plusieurs tâches pour l’exécution de la copie des données. Les tâches sont les instances logiques qui exécutent la copie/transfert de données du système source vers Kafka ou de kafka vers le système cible. Les tâches sont en réalités l’exécution distribuée de la copie des données (du job Kafka Connect – Connector).
– les workers : le connecteur est le job qui copie, les tâches sont les instances logiques de l’exécution de la copie, et les workers sont en fait les processus informatiques (deamon) qui s’exécutent sur le cluster pour copier les données. Les workers ce sont les instances physiques des tâches.
worker = connector + task
Les processus workers vont s’exécuter en distribué de façon coordonnée. Par contre, il faudra un gestionnaire de ressource pour la gestion et le monitoring de leur exécution (par exemple YARN).
La figure suivante récapitule l’architecture interne de Kafka Connect avec l’exemple d’une source de données, Oracle. L’utilisation de Kafka Connect revient à déclarer/supprimer les Connecteurs (indiquer la source de données à copier, les paramètres de connexion à cette source, le nombre de tâches, le topic de destination, …etc) et les configurer via un fichier de configuration (mode standalone), ou via une API REST. Nous y reviendrons plus bas.
2 – Fonctionnement de Kafka Connect
Le fonctionnement de Kafka Connect commence avec le démarrage des processus Kafka Connect sur les machines du cluster Kafka (les workers). Chaque worker tourne sur un noeud différent du cluster.
Chaque instance de worker démarre avec comme configuration minimale : l’adresse d’un agent Kafka, le topic de stockage des données à copier et le Kafka_consumer_group_id. Chaque instance de worker est sans état (stateless) et ne partage aucune information (état) avec les autres instances worker. Par contre, toutes ces instances appartiennent au même Kafka_consumer_group_id et se coordonnent à l’aide d’un topic Kafka.
Chaque worker instancie un Connecteur (source ou cible) avec les configurations de la tâche associée (tâche source ou tâche cible). Comme nous l’avons expliqué précédemment, la tâche est le processus logique de copie des données. Par exemple, une tâche HDFSSinkTask est un thread qui copie les données d’un topic Kafka pour les déposer dans le HDFS.
Chaque worker instancie un pool de tâches sur la base de la configuration fournie par le Connecteur et décide en coordination avec les autres workers sur la base du topic kafka, quelles données vont être traitées par chaque tâche.
Attention !!! la coordination distribuée des workers et des différentes tâches est assurée par Kafka. En cas d’arrêt inopiné d’un worker par exemple, le rééquilibrage automatique de charge est pris en charge par Kafka.
Kafka Connect expose une API REST pour la création, la modification et la suppression des connecteurs. Nous vous montrerons comment l’utiliser plus bas.
Quelque soit le Connecteur utilisé et qu’il soit Source ou Cible, ce qui se passe est qu’à des intervalles réguliers (par exemple toutes les 30 secondes), les données sont copiées du système source par les tâches et écrites dans le système cible. Notez que l’un des avantages avec Kafka Connect est qu’il gère automatiquement les offsets. Il les stocke par défaut dans un topic Kafka. Ainsi, vous n’avez pas à gérer les offsets lorsque vous développez vos connecteurs.
Cela signifie qu’il y’a un commit de l’offset automatiquement après la copie des données et/ou après leur dépôt dans le système cible. Grâce à cette gestion automatique des offset, Kafka Connect sait toujours où il en est dans les copies/transferts de données, et celles-ci sont par conséquent idempotents. En cas d’arrêt d’un worker ou de panne sur le cluster, l’offset est rejoué par Kafka Connect pour récupérer l’état de la copie avant la panne lors de la restauration. Il n’y’a donc jamais une double écriture/copie de la même donnée. Ainsi, les opérations de copie/transfert de données de Kafka Connect sont idempotentes et cela garantie la sémantique exactement-une-fois.
Pour la copie des données des systèmes opérationnels (SGBDR), Kafka est compatible avec le protocole JDBC (Java Data Base Connectivity). Le JDBC, tout comme l’ODBC (Open Data Base Connectivity), est un protocole générique de données qui permet de communiquer avec des SGBDR. Pratiquement tous les SGBDR (Oracle, SQL Server, Access, DB2, ..etc) sont compatibles avec les protocoles JDBC et ODBC. Ainsi, en étant compatible avec le protocole JDBC, Kafka Connect est capable de se connecter à n’importe quel SGBDR et l’exposer soit comme un producer Kafka (copier les tables de la BD et la transférer vers un topic Kafka), soit alors comme un Consumer Kafka (copier les données d’un topic Kafka pour les stocker dans une BD relationnelle).
Le processus est le même que celui décrit plus haut : les données sont chargées à des intervalles périodiques via l’exécution d’une requête SQL. Par défaut, toutes les tables dans la base de données sont copiées, chacune dans un topic distinct. Kafka se met en écoute de la base de données pour y détecter tout changement (par exemple création de nouvelles tables, suppression d’enregistrements, ajout de nouveaux enregistrements dans une table, etc.) et s’adapter automatiquement (réplication des changements dans les topics correspondants). Chaque copie ou transfert des résultats de la requête SQL entraîne automatiquement un commit de l’offset.
Attention !!! Lorsque vous placez Kafka en écoute d’une BD relationnelle, vous devez indiquer le champ ou la colonne qui doit être écouté (ou dont les changements entraîne les changements dans le topic). C’est cette colonne qui servira de référence pour l’envoie de nouvelles données.
3 – Utilisation de Kafka Connect
Comme nous l’avons expliqué précédemment, pour utiliser Kafka Connect, vous devez renseigner 3 fichiers de configuration :
– le fichier de configuration du système source, dans lequel vous indiquerez le nom du connecteur, la classe du connecteur (qui correspond le plus souvent au type de système source), le nombre de tâches à instancier pour le job, le topic dans lequel stocker les données copiées et enfin les paramètres spécifiques au type de système source. Le lien suivant de la communauté open source de Kafka explique plus en détails les paramètres de configuration d’un connecteur.
Exemple de paramétrage d’un connecteur (dans ce cas de figure, le fichier plat est le système source).
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt
Attention !!! Pour la configuration de fichier des systèmes sources dont le connecteur a été développé par Confluent, référez-vous directement à la documentation offerte par Confluent. Cela est également valable pour la configuration de fichier des systèmes cibles.
– le fichier de configuration du système cible, qui est quasiment identique à la configuration du fichier du système source, excepté que cette fois-ci, le topic est la source de consommation des données et la cible est le lieu de réception des données.
Si nous reprenons l’exemple de configuration de tout à l’heure, la propriété « file » représente cette fois le nom du fichier cible, c’est-à-dire du fichier dans lequel Kafka Connect écrira les données.
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
– le fichier de configuration du worker, il est question dans ce fichier de fournir les paramètres nécessaires au worker afin qu’il établisse le lien entre le système source et le système cible, et qu’il exécute la lecture des données pour les stocker dans le système cible. Les paramètres à renseigner sont le plus souvent : l’adresse des serveurs kafka, le chemin de stockage de l’offset, le format de conversion des données (json, csv, etc..), la périodicité de rafraîchissement de l’offset, etc… Le lien suivant fournit la liste complète des paramètres possibles :
Voici un exemple de configuration de worker.
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java
Le fichier de configuration du worker se trouve généralement à l’adresse
./kafka-2.3.0-src/config/connect-standalone.properties ou
./kafka-2.3.0-src/config/connect-distributed.properties
Une fois que vous avez paramétrez ces 3 fichiers, vous devez décider du mode d’exécution de Kafka Connect : exécution Standalone ou exécution distribuée ?
Lorsque vous exécutez Kafka Connect en standalone, une seule instance du logiciel est exécutée, et celà en local. Privilégiez ce mode d’exécution uniquement pour les développements et les tests. Lorsque vous allez en production, passez en déploiement distribuée.
Dans notre formation sur le Big Data Streaming, nous fournissons des tutoriels qui vous expliquent comment mettre en pratique Kafka Conect. Cliquez sur l’annonce suivante pour télécharger un extrait de la formation.
4 – Utilisation de l’API REST de Kafka Connect
Kafka Connect a été conçu pour s’exécuter comme un service sur le cluster, par conséquent, à défaut de faire appel au shell pour l’exploiter, vous pouvez utiliser l’API REST qu’il met à disposition. Par défaut, cette API REST est à l’écoute du port 8083, à l’adresse http://localhost:8083.
Voici quelques unes des fonctions que vous pouvez appeler :
- GET /connectors – retourne la liste de tous les connecteurs en cours d’exécution.
- GET /connectors/{name} – retourne les détails d’un connecteur spécifique
- POST /connectors – créé un nouveau connector; dans ce cas de figure, l’objet de la requête doit être un objet JSON dans lequel sont spécifiés les paramètres de configuration du connecteur.
- GET /connectors/{name}/status – retourne le status actuel d’un connecteur, en précisant son état (en cours d’exécution, échec, pause, etc.), le worker auquel il est assigné et l’état de toutes ses tâches.
- DELETE /connectors/{name} – supprime un connecteur, stoppant ainsi toutes ses tâches et détruisant ses configurations.
- GET /connector-plugins – retourne la liste des plugins de connecteurs installés sur le cluster Kafka Connect.
Le lien suivant fournit la liste complète des fonctions REST disponible dans l’API.
Attention !!! Kafka Connect tourne sur son propre cluster, distinct du cluster de Kafka. Aussi, l’API REST n’est utilisable que lors de l’exécution en mode distribuée de Kafka Connect. Elle n’est pas disponible en mode Standalone.
Reprenons l’exemple précédent de la configuration des fichiers pour le fichier plat et faisons la même chose avec l’API REST. Lorsqu’on utilise l’API REST, il n’y’a pas de paramétrage de fichier de configuration. Vous créez directement les connecteurs en envoyant 2 requêtes POST (une pour le système source et l’autre pour le Sink) à l’adresse http://localhost:8083/connectors contenant les objets JSON dans lesquels les paramètres de configuration de chaque connecteur seront au préalable renseignés.
Appelons connect-file-source.json, le fichier json contenant les paramètres du connecteur du système source (en l’occurence le fichier plat).
{
"name": "local-file-source",
"config": {
"connector.class": "FileStreamSource",
"tasks.max": 1,
"file": "test-distributed.txt",
"topic": "connect-distributed"
}
}
Maintenant, on peut exécuter la requête POST :
curl -d @"$VOTRE_CHEMIN_KAFKA/connect-file-source.json" \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
On fait aussi pareil pour le système cible :
{
"name": "local-file-sink",
"config": {
"connector.class": "FileStreamSink",
"tasks.max": 1,
"file": "test-distributed.sink.txt",
"topics": "connect-distributed"
}
}
curl -d @$VOTRE_CHEMIN_KAFKA/connect-file-sink.json \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors
Voilà ! Notre pipeline est en place en quelques minutes ! Pour plus de détails sur l’utilisation de l’API REST Kafka, nous vous recommandons le lien de la documentation officielle :
Nous sommes arrivés au terme de cette chronique sur Kafka Connect. L’objectif n’a pas été de vous rendre expert dessus, mais de vous donner suffisamment de savoir -faire pour collecter les données des bases opérationnelles en source streaming.
Quelles difficultés rencontrez-vous avec Kafka Connect ? Indiquez-les nous en commentaire.