By Jay Clifford / 2022 年 4 月 20 日 / 近所、IoT、開発者
カフカ・サミットが急速に近づいてきたので、手を汚してすべてを理解する時が来たと思いましただいたい。 IoT の推奨事項として、Kafka について聞いたことがありますが、以前は複雑なプロトコル MQTT に組み込まれすぎて、比較することができませんでした。 初心者 (ファンシー・ミー) にとって、各プロトコルは、事実上競合しなければ、非常に同一に見えます。 あるいは、これはケースの工夫であり、実際には多くの条件で互いに補完し合っていることがわかりました. この一連のブログでは、Kafka と MQTT とは何か、そしてそれぞれが IoT 構造に適合する仕組みについて要約したいと思います。 アイデアの間で主張をサポートするために、過去のシナリオを利用することは一見意図的に見えるかもしれないと思いました:
以内 以前のウェブログ で、非常用ガソリン発電機の画面を表示したいシナリオについて説明しました。 InfluxDB Python クライアント ライブラリを使用してシミュレーターを作成し、ジェネレーターのレコードデータを InfluxDB Cloud に送信しました。 このブログでは、特にそのシミュレーターを再利用しますが、クライアント ライブラリを MQTT パブリッシャーと Kafka プロデューサーに置き換えて、それぞれのサポート内のコア メカニクスを指定します
このデモ のコードは、ここ
から取得できるようです。 .
- 基礎を固める カフカって何? Kafka は、トーナメント ストリーミング プラットフォームとして説明されています。 これは、レコードデータの永続性が追加されたパブリッシャー – サブスクライバー構造に準拠しています (より多くの基本事項を指定するには、この
- 過剰なスループット
- )
- 過剰な可用性
- スマートに診断されたサード セレブレーション プラットフォームへのコネクタ
- だから、なぜ私はもうKafka を使用して IoT プラットフォーム全体を合法的に作成できますか? うまくいけば、すべての流行はいくつかの重要な懸念事項に要約されます:
- Kafka は、実際のインフラストラクチャを展開する実際のネットワーク用に構築されています
- Aid-Alive に似た重要な記録データ供給の側面を展開しなくなりましたand Final Will
- 以上のことを述べた上で、長い歴史を持つ Kafka プロデューサーの作成の実装を比較し、それを MQTT パブリッシャーと比較してコンテキストの長さを比較してみましょう。非常用発電機のデモ:
前提条件: このデモの機能については、Mosquitto MQTT Dealer と合流プラットフォーム (カフカ)。 ここでは予備的な作成/セットアップを隠蔽するつもりはありませんが、おそらくこれらの指示を適宜参照するように見えるかもしれません:
- 過剰な可用性
- ホスト: Mosquitto サーバーをホストするプラットフォームのハンドル/IP
- ポート: MQTT プロデューサーが参照するポート。 最も継続的に確立された接続の場合は 1883、TLS は 8883 です。
- Aid Alive Interval: 通信間隔 (秒単位)
self.client.connect(ホスト=self.mqttBroker、ポート=self.port、キープアライブ=MQTT_KEEPALIVE_INTERVAL) カフカ Kafka に到達したとき、バックグラウンド作業がほとんどなくなりました。 2 つのさまざまな Kafka エンティティへの接続を配置する必要がありました:
- ポート: MQTT プロデューサーが参照するポート。 最も継続的に確立された接続の場合は 1883、TLS は 8883 です。
- Kafka クラスター: ペイロードをここに送信する必要があります
- スキーマ レジストリ: レジストリは、カフカ クラスタ。 トピック スキーマの格納と表示を処理します。 さまざまな言葉で言えば、これによりプロデューサーは、Kafka ユーザーが予期するレイアウトでレコードデータを会話することを余儀なくされます。 これについては後で詳しく説明します。
:
'} schema_registry_client=SchemaRegistryClient(schema_registry_conf) - url: スキーマ レジストリのハンドル。 Confluent は、ホスティング用のレジストリの作成を支援します。
- authentication: すべてのリポジトリが大好きで、長年にわたって確立されたセキュリティに対応し、実際のスキーマ設計をサポートします.
- Kafkaクラスター
self.json_serializer=JSONSerializer(self.schema_str , schema_registry_client, engine_to_dict) self.p=SerializingProducer({ ‘bootstrap.servers’: ‘pkc-41wq6.ecu-west-2.aws.confluent.cloud: 9092’, ‘sasl.mechanism’: ‘PLAIN’, ‘security .protocol’: ‘SASL_SSL’, ‘sasl.username’: ‘######’, ‘sasl.password’: ‘######’, ‘error_cb’: error_cb, ‘key.serializer’: StringSerializer(‘utf_8’), ‘value.serializer’: self.json_serializer }) 内訳:
- bootstrap.servers: 簡単に、Kafka クラスターをホストする Confluent Cloud へのハンドル部分。 より具体的には、Kafka ディーラーです。 (さらに、Kafka にはブローカーの表記がありますが、トピックごとに異なります)。 Bootstrap は、クラスター内でそのプレゼンスをグローバルに編成するプロデューサーへの参照です。
-
- sasl.*: 簡単なセキュリティ認証プロトコル。 これらは、Confluent Kafka に接続するための最小要件です。 私たちの全体的な比較にとって趣味ではないので、ここでこれを覆い隠すことはおそらくないように見えるかもしれません.
- error_cb: Kafka のエラー処理を処理します。
- key_serializer: これは、メッセージ キーが内部 Kafka に保存される方法を記述します。 キーは、Kafka がペイロードを処理する方法の非常に主要な部分です。 これについては、次のウェブログの長さのために追加します.
-
- Fee.serializer: 次に、これをクロークできます。簡単に言えば、プロデューサーが送信する記録データの流行を描写する必要があります。 スキーマ レジストリの定義が非常に重要な理由はここにあります
事項とプレゼント MQTT パブリッシャーと Kafka プロデューサーを開始したので、Emergency generator recordsdata を出荷する時が来ました。 これを作成するために、各プロトコルは、提供よりも早くトピックとレコードのデータ準備を必要とします: MQTT
MQTT の世界の内部、トピックは、ペイロード間の論理フィルタリングを確立する UTF-8 文字列です。トピックタイトル ペイロード 温度 36 ガソリン 400 の このシーケンスの割り当て 2 では、MQTT と Kafka のサブジェクトの問題の機能とバリエーションをさらに掘り下げます。材料。 今のところ、1 つのトピックを配置して、すべての緊急発生装置の記録データを送信します (ここではもはや教育的な実践ではありませんが、このミッションの複雑な起源の中で論理的です)。
メッセージ=json.dumps(recordsdata) self.client.put up(topic="emergency_generator "、 メッセージ)
- error_cb: Kafka のエラー処理を処理します。
- Kafkaクラスター
- authentication: すべてのリポジトリが大好きで、長年にわたって確立されたセキュリティに対応し、実際のスキーマ設計をサポートします.
内訳:
- のウェブログ を比較してください)。 ))。 Kafka はさらに、IoT セクターの長さに対していくつかのかなり大きな利点を促進します: